写点什么

IoT 平台实现 RRPC 同步调用最佳实战——实践类

作者:阿里云AIoT
  • 2023-03-02
    浙江
  • 本文字数:3658 字

    阅读完需:约 12 分钟

基于 Pub/Sub 模式的同步调用实战


1.同步调用场景


1.1 背景

MQTT 协议是基于 PUB/SUB 的异步通信模式,无法实现服务端下发指令给设备端,同时需要设备端返回响应结果的场景。IoT 物联网平台基于 MQTT 协议制定了一套请求和响应的同步机制,无需改动 MQTT 协议即可实现同步通信。应用服务器通过 POP API 发起 Rrpc 调用,IoT 设备端只需要在 Timeout 内,按照固定的格式回复 Pub 消息,服务端即可同步获取 IoT 设备端的响应结果。


具体流程如下:



1.2 Topic 格式约定

请求:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}**<br />**响应:**/sys/${productKey}/${deviceName}/rrpc/**response**/**${messageId}

$表示变量,每个设备不同

messageId 为 IoT 平台生成的消息 ID,

设备端回复 responseTopic 里的 messageId 要与 requestTopic 一致


示例:

设备端需要订阅:

/sys/${productKey}/${deviceName}/rrpc/request/+

运行中设备收到 Topic:

/sys/PK100101/DN213452/rrpc/request/443859344534

收到消息后,在 timeout 时间内回复 Topic:

/sys/PK100101/DN213452/rrpc/response/443859344534


2.同步调用 RRPC 示例


2.1 设备端代码

const mqtt = require('aliyun-iot-mqtt');//设备属性const options = require("./iot-device-config.json");//建立连接const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)client.on('message', function(topic, message) { if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){ handleRrpc(topic, message) }})
function handleRrpc(topic, message){ topic = topic.replace('/request/','/response/'); console.log("topic=" + topic) //普通Rrpc,响应payload自定义 const payloadJson = {code:200,msg:"handle ok"}; client.publish(topic, JSON.stringify(payloadJson));}
复制代码


2.2 服务端 POP 调用 Rrpc

const co = require('co');const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
//1.初始化clientconst client = new RPCClient({ accessKeyId: options.accessKey, secretAccessKey: options.accessKeySecret, endpoint: 'https://iot.cn-shanghai.aliyuncs.com', apiVersion: '2018-01-20'});
const payload = { "msg": "hello Rrpc"};
//2.构建requestconst params = { ProductKey:"a1gMu82K4m2", DeviceName:"h5@nuwr5r9hf6l@1532088166923", RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"), Timeout:3000};
co(function*() { //3.发起API调用 const response = yield client.request('Rrpc', params);
console.log(JSON.stringify(response));});
复制代码

rrpc 响应:

{    "MessageId": "1037292594536681472",    "RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",    "PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",    "Success": true,    "RrpcCode": "SUCCESS"}
// PayloadBase64Byte 解码: {"code":200,"msg":"handle ok"}
复制代码


3.物模型-服务同步调用 InvokeThingService 示例

注意:物模型 服务调用 接口 InvokeThingService,不是 Rrpc


设备订阅 subTopic

注意:服务同步调用 API 是 InvokeThingService

/sys/${productKey}/${deviceName}/rrpc/request/+

IoT 云端下行的 payload 格式

{

 "id": 3536123,

 "version": "1.0",

 "params": {

   "入参 key1": "入参 value1",

   "入参 key2": "入参 value2"

 },

 "method": "thing.service.{tsl.service.identifier}"

}


设备响应 replyTopic

/sys/${productKey}/${deviceName}/rrpc/response/request 的消息 Id

设备响应 payload 格式

{

 "id": 3536123,

 "code": 200,

 "data": {

   "出参 key1": "出参 value1",

   "出参 key2": "出参 value2"

 }

}


3.1 物模型-同步服务定义



3.2 设备端实现

const mqtt = require('aliyun-iot-mqtt');//设备属性const options = require("./iot-device-config.json");//建立连接const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)client.on('message', function(topic, message) { if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){ handleRrpc(topic, message) }})/** 如果存在多个同步调用服务,需要通过payload里的method区分*/function handleRrpc(topic, message){
topic = topic.replace('/request/','/response/'); console.log("topic=" + topic) //物模型 同步服务调用,响应payload结构: const payloadJson = { id: Date.now(), code:200, data: { currentMode: Math.floor((Math.random() * 20) + 10) } }
client.publish(topic, JSON.stringify(payloadJson));}
复制代码

注意:设备端响应的 payload 要满足物模型定义的出参结构

3.3 服务端 POP 接口 InvokeThingService

const co = require('co');const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
//1.初始化clientconst client = new RPCClient({ accessKeyId: options.accessKey, secretAccessKey: options.accessKeySecret, endpoint: 'https://iot.cn-shanghai.aliyuncs.com', apiVersion: '2018-01-20'});
const params = { ProductKey: "a1gMu82K4m2", DeviceName: "h5@nuwr5r9hf6l@1532088166923", Args: JSON.stringify({ "mode": "1" }), Identifier: "thing.service.setMode"};
co(function*() { try { //3.发起API调用 const response = yield client.request('InvokeThingService', params);
console.log(JSON.stringify(response)); } catch (err) { console.log(err); }});
复制代码

调用结果:

{    "Data":{        "Result": "{\"currentMode\":12}",        "MessageId": "1536145625658"    },    "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",    "Success": true}端实现const mqtt = require('aliyun-iot-mqtt');//设备属性const options = require("./iot-device-config.json");//建立连接const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)client.on('message', function(topic, message) { if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){ handleRrpc(topic, message) }})/** 如果存在多个同步调用服务,需要通过payload里的method区分*/function handleRrpc(topic, message){
topic = topic.replace('/request/','/response/'); console.log("topic=" + topic) //物模型 同步服务调用,响应payload结构: const payloadJson = { id: Date.now(), code:200, data: { currentMode: Math.floor((Math.random() * 20) + 10) } }
client.publish(topic, JSON.stringify(payloadJson));}注意:设备端响应的payload要满足物模型定义的出参结构
3.3 服务端POP 接口InvokeThingServiceconst co = require('co');const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
//1.初始化clientconst client = new RPCClient({ accessKeyId: options.accessKey, secretAccessKey: options.accessKeySecret, endpoint: 'https://iot.cn-shanghai.aliyuncs.com', apiVersion: '2018-01-20'});
const params = { ProductKey: "a1gMu82K4m2", DeviceName: "h5@nuwr5r9hf6l@1532088166923", Args: JSON.stringify({ "mode": "1" }), Identifier: "thing.service.setMode"};
co(function*() { try { //3.发起API调用 const response = yield client.request('InvokeThingService', params);
console.log(JSON.stringify(response)); } catch (err) { console.log(err); }});
复制代码


调用结果:


{    "Data":{        "Result": "{\"currentMode\":12}",        "MessageId": "1536145625658"    },    "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",    "Success": true}
复制代码


物联网平台产品介绍详情:https://www.aliyun.com/product/iot/iot_instc_public_cn

阿里云物联网平台客户交流群

用户头像

阿里云AIoT

关注

物联网内容搬运者 2022-04-22 加入

还未添加个人简介

评论

发布
暂无评论
IoT平台实现RRPC同步调用最佳实战——实践类_物联网_阿里云AIoT_InfoQ写作社区