写点什么

基于 Pub/Sub 模式的阿里云 IoT 同步调用详解——设备管理运维类

作者:阿里云AIoT
  • 2023-03-16
    浙江
  • 本文字数:2408 字

    阅读完需:约 8 分钟

1.同步调用场景

1.1 背景

MQTT 协议是基于 PUB/SUB 的异步通信模式,无法实现服务端下发指令给设备端,同时需要设备端返回响应结果的场景。

IoT 物联网平台基于 MQTT 协议制定了一套请求和响应的同步机制,无需改动 MQTT 协议即可实现同步通信。应用服务器通过 POP API 发起 Rrpc 调用,IoT 设备端只需要在 Timeout 内,按照固定的格式回复 Pub 消息,服务端即可同步获取 IoT 设备端的响应结果。

 

具体流程如下:

 

1.2 Topic 格式约定

请求:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}

响应:/sys/${productKey}/${deviceName}/rrpc/response/${messageId}

$表示变量,每个设备不同

messageId 为 IoT 平台生成的消息 ID,设备端回复 responseTopic 里的 messageId 要与 requestTopic 一致

示例:

设备端需要订阅:

/sys/${productKey}/${deviceName}/rrpc/request/+

运行中设备收到 Topic:

/sys/PK100101/DN213452/rrpc/request/443859344534

收到消息后,在 timeout 时间内回复 Topic:

/sys/PK100101/DN213452/rrpc/response/443859344534

 

2.同步调用 RRPC 示例

2.1 设备端代码


const mqtt = require('aliyun-iot-mqtt');//设备属性const options = require("./iot-device-config.json");//建立连接const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)client.on('message', function(topic, message) { if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){ handleRrpc(topic, message) }})
function handleRrpc(topic, message){ topic = topic.replace('/request/','/response/'); console.log("topic=" + topic) //普通Rrpc,响应payload自定义 const payloadJson = {code:200,msg:"handle ok"}; client.publish(topic, JSON.stringify(payloadJson));}
复制代码

 

2.2 服务端 POP 调用 Rrpc

const co = require('co');const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
//1.初始化clientconst client = new RPCClient({ accessKeyId: options.accessKey, secretAccessKey: options.accessKeySecret, endpoint: 'https://iot.cn-shanghai.aliyuncs.com', apiVersion: '2017-04-20'});
const payload = { "msg": "hello Rrpc"};
//2.构建requestconst params = { ProductKey:"a1gMu82K4m2", DeviceName:"h5@nuwr5r9hf6l@1532088166923", RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"), Timeout:3000};
co(function*() { //3.发起API调用 const response = yield client.request('Rrpc', params);
console.log(JSON.stringify(response));});
复制代码

 

rrpc 响应:

{    "MessageId": "1037292594536681472",    "RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",    "PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",    "Success": true,    "RrpcCode": "SUCCESS"}
// PayloadBase64Byte 解码: {"code":200,"msg":"handle ok"}
复制代码

 

3.物模型-服务同步调用 InvokeThingService 示例

注意:物模型 服务调用 接口 InvokeThingService,不是 Rrpc

3.1 物模型-同步服务定义

 

3.2 设备端实现

const mqtt = require('aliyun-iot-mqtt');//设备属性const options = require("./iot-device-config.json");//建立连接const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)client.on('message', function(topic, message) { if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){ handleRrpc(topic, message) }})/** 如果存在多个同步调用服务,需要通过payload里的method区分*/function handleRrpc(topic, message){
topic = topic.replace('/request/','/response/'); console.log("topic=" + topic) //物模型 同步服务调用,响应payload结构: const payloadJson = { id: Date.now(), code:200, data: { currentMode: Math.floor((Math.random() * 20) + 10) } }
client.publish(topic, JSON.stringify(payloadJson));}
复制代码

注意:设备端响应的 payload 要满足物模型定义的出参结构

3.3 服务端 POP 接口 InvokeThingService

const co = require('co');const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
//1.初始化clientconst client = new RPCClient({ accessKeyId: options.accessKey, secretAccessKey: options.accessKeySecret, endpoint: 'https://iot.cn-shanghai.aliyuncs.com', apiVersion: '2018-01-20'});
const params = { ProductKey: "a1gMu82K4m2", DeviceName: "h5@nuwr5r9hf6l@1532088166923", Args: JSON.stringify({ "mode": "1" }), Identifier: "thing.service.setMode"};
co(function*() { try { //3.发起API调用 const response = yield client.request('InvokeThingService', params);
console.log(JSON.stringify(response)); } catch (err) { console.log(err); }});
复制代码

 

调用结果:

{    "Data":{        "Result": "{\"currentMode\":12}",        "MessageId": "1536145625658"    },    "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",    "Success": true}
复制代码


物联网平台产品介绍详情:https://www.aliyun.com/product/iot/iot_instc_public_cn


阿里云物联网平台客户交流群

用户头像

阿里云AIoT

关注

物联网内容搬运者 2022-04-22 加入

还未添加个人简介

评论

发布
暂无评论
基于Pub/Sub模式的阿里云IoT同步调用详解——设备管理运维类_物联网_阿里云AIoT_InfoQ写作社区