1.同步调用场景
1.1 背景
MQTT 协议是基于 PUB/SUB 的异步通信模式,无法实现服务端下发指令给设备端,同时需要设备端返回响应结果的场景。
IoT 物联网平台基于 MQTT 协议制定了一套请求和响应的同步机制,无需改动 MQTT 协议即可实现同步通信。应用服务器通过 POP API 发起 Rrpc 调用,IoT 设备端只需要在 Timeout 内,按照固定的格式回复 Pub 消息,服务端即可同步获取 IoT 设备端的响应结果。
 
具体流程如下:
 
1.2 Topic 格式约定
请求:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}
响应:/sys/${productKey}/${deviceName}/rrpc/response/${messageId}
$表示变量,每个设备不同
messageId 为 IoT 平台生成的消息 ID,设备端回复 responseTopic 里的 messageId 要与 requestTopic 一致
示例:
设备端需要订阅:
/sys/${productKey}/${deviceName}/rrpc/request/+
运行中设备收到 Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
收到消息后,在 timeout 时间内回复 Topic:
/sys/PK100101/DN213452/rrpc/response/443859344534
 
2.同步调用 RRPC 示例
2.1 设备端代码
 
const mqtt = require('aliyun-iot-mqtt');//设备属性const options = require("./iot-device-config.json");//建立连接const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)client.on('message', function(topic, message) {        if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){    	handleRrpc(topic, message)    }})
function handleRrpc(topic, message){	topic = topic.replace('/request/','/response/');	console.log("topic=" + topic)	//普通Rrpc,响应payload自定义    const payloadJson = {code:200,msg:"handle ok"};	client.publish(topic, JSON.stringify(payloadJson));}
   复制代码
  
2.2 服务端 POP 调用 Rrpc
 const co = require('co');const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
//1.初始化clientconst client = new RPCClient({    accessKeyId: options.accessKey,    secretAccessKey: options.accessKeySecret,    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',    apiVersion: '2017-04-20'});
const payload = {  "msg": "hello Rrpc"};
//2.构建requestconst params = {    ProductKey:"a1gMu82K4m2",    DeviceName:"h5@nuwr5r9hf6l@1532088166923",    RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"),    Timeout:3000};
co(function*() {    //3.发起API调用    const response = yield client.request('Rrpc', params);
    console.log(JSON.stringify(response));});
   复制代码
  
rrpc 响应:
 {    "MessageId": "1037292594536681472",    "RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",    "PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",    "Success": true,    "RrpcCode": "SUCCESS"}
// PayloadBase64Byte 解码: {"code":200,"msg":"handle ok"}
   复制代码
  
3.物模型-服务同步调用 InvokeThingService 示例
注意:物模型 服务调用 接口 InvokeThingService,不是 Rrpc
3.1 物模型-同步服务定义
 
3.2 设备端实现
 const mqtt = require('aliyun-iot-mqtt');//设备属性const options = require("./iot-device-config.json");//建立连接const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)client.on('message', function(topic, message) {        if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){    	handleRrpc(topic, message)    }})/** 如果存在多个同步调用服务,需要通过payload里的method区分*/function handleRrpc(topic, message){
	topic = topic.replace('/request/','/response/');	console.log("topic=" + topic)	//物模型 同步服务调用,响应payload结构:    const payloadJson = {        id: Date.now(),        code:200,        data: {            currentMode: Math.floor((Math.random() * 20) + 10)        }    }
	client.publish(topic, JSON.stringify(payloadJson));}
   复制代码
 注意:设备端响应的 payload 要满足物模型定义的出参结构
3.3 服务端 POP 接口 InvokeThingService
 const co = require('co');const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
//1.初始化clientconst client = new RPCClient({    accessKeyId: options.accessKey,    secretAccessKey: options.accessKeySecret,    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',    apiVersion: '2018-01-20'});
const params = {    ProductKey: "a1gMu82K4m2",    DeviceName: "h5@nuwr5r9hf6l@1532088166923",    Args: JSON.stringify({ "mode": "1" }),    Identifier: "thing.service.setMode"};
co(function*() {    try {        //3.发起API调用        const response = yield client.request('InvokeThingService', params);
        console.log(JSON.stringify(response));    } catch (err) {        console.log(err);    }});
   复制代码
  
调用结果:
 {    "Data":{        "Result": "{\"currentMode\":12}",        "MessageId": "1536145625658"    },    "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",    "Success": true}
   复制代码
 
物联网平台产品介绍详情:https://www.aliyun.com/product/iot/iot_instc_public_cn
阿里云物联网平台客户交流群
评论