# 平台统一设备消息定义
平台使用自定义的协议包将设备上报的报文解析为平台统一的消息来进行统一管理。
# 消息定义
平台统一消息基本于物模型中的定义相同,主要由属性(property)
,功能(function)
,事件(event)
组成。
# 消息组成
消息主要由deviceId
,messageId
,headers
,timestamp
组成.
deviceId
为设备的唯一标识,messageId
为消息的唯一标识,headers
为消息头,通常用于对自定义消息处理的行为,如是否异步消息,
是否分片消息等.
headers定义在org.jetlinks.core.message.Headers
。常用的headers如下:
名称 | 类型 | 描述 |
async | Boolean | 是否异步 |
timeout | Long | 超时时间,单位为毫秒 |
frag_msg_id | String | 分片主消息ID,为平台下发消息时的消息ID( |
frag_num | Integer | 分片总数 |
frag_num | Integer | 分片总数 |
frag_part | Integer | 当前分片索引 |
frag_last | Integer | 是否为最后一个分片。当无法确定分片数量的时候,可以将分片设置到足够大,最后一个分片设置 |
keepOnline | Boolean | 与 |
keepOnlineTimeoutSeconds | Integer | 指定在线超时时间。在短链接时,如果超过此间隔没有收到消息则认为设备离线。 |
ignoreStorage | Boolean | 不存储此消息数据。如:读写属性回复默认也会记录到属性时序数据库中,设置为true后,将不记录。(1.9版本后支持) |
ignoreLog | Boolean | 不记录此消息的日志。如:设置为true,将不记录此消息的日志。 |
mergeLatest | Boolean | 是否合并最新属性数据。设置此消息头后,将会把最新的消息合并到消息体里(需要开启最新数据存储。 |
提示
messageId通常由平台自动生成,如果设备不支持消息id,可在自定义协议中通过Map的方式来做映射,将设备返回的消息与平台的messageId进行绑定。
分片消息指,设备将一个请求的结果分片返回。通常用于处理大消息。
# 属性相关消息
获取设备属性(ReadPropertyMessage)
对应设备回复的消息ReadPropertyMessageReply
.修改设备属性(WritePropertyMessage)
对应设备回复的消息WritePropertyMessageReply
.设备上报属性(ReportPropertyMessage)
由设备上报.
提示
设备回复的消息是通过messageId
进行绑定,messageId
应该注意要全局唯一。如果设备无法做到,可以在编解码时通过添加前缀等方式实现。
消息定义:
ReadPropertyMessage{
Map<String,Object> headers;
String deviceId;
String messageId;
long timestamp; //时间戳(毫秒)
List<String> properties;//可读取多个属性
}
ReadPropertyMessageReply{
Map<String,Object> headers;
String deviceId;
String messageId;
long timestamp; //时间戳(毫秒)
boolean success;
Map<String,Object> properties;//属性键值对
}
WritePropertyMessage{
Map<String,Object> headers;
String deviceId;
String messageId;
long timestamp; //时间戳(毫秒)
Map<String,Object> properties;
}
WritePropertyMessageReply{
Map<String,Object> headers;
String deviceId;
String messageId;
long timestamp; //时间戳(毫秒)
boolean success;
Map<String,Object> properties; //回复被修改的属性最新值
}
ReportPropertyMessage{
Map<String,Object> headers;
String deviceId;
String messageId; //可为空
long timestamp; //时间戳(毫秒)
Map<String,Object> properties;
}
# 功能相关消息
调用设备功能的消息(FunctionInvokeMessage
)由平台发往设备,对应的返回消息为FunctionInvokeMessageReply
。
消息定义:
FunctionInvokeMessage{
Map<String,Object> headers;
String functionId;//功能标识,在元数据中定义.
String deviceId;
String messageId;
long timestamp; //时间戳(毫秒)
List<FunctionParameter> inputs;//输入参数
}
FunctionParameter{
String name;
Object value;
}
FunctionInvokeMessageReply{
Map<String,Object> headers;
String deviceId;
String messageId;
long timestamp;
boolean success;
Object output; //输出值,需要与元数据定义中的类型一致
}
# 事件消息
事件消息EventMessage
由设备端发往平台。
消息定义:
EventMessage{
Map<String,Object> headers;
String event; //事件标识,在元数据中定义
Object data; //与元数据中定义的类型一致,如果是对象类型,请转为java.util.HashMap,禁止使用自定义类型.
long timestamp; //时间戳(毫秒)
}
# 其他消息
消息类 | 名称 | 描述 |
DeviceOnlineMessage | 设备上线消息 | 通常用于网关代理的子设备的上线操作 |
DeviceOfflineMessage | 设备离线消息 | 通常用于网关代理的子设备的下线操作 |
ChildDeviceMessage | 子设备消息 | 通常用于网关代理的子设备的消息 |
ChildDeviceMessageReply | 子设备消息回复 | 用于平台向网关代理的子设备发送消息后设备回复给平台的结果 |
UpdateTagMessage | 更新设备标签 | |
DerivedMetadataMessage | 更新设备独立物模型 | |
DeviceRegisterMessage | 设备注册消息 | 通过设置消息头 |
DeviceStateCheckMessage | 检查子设备状态 | 如果配置了状态自管理,在检查子设备状态时,会发送指令 |
UpgradeFirmwareMessage | 更新设备固件消息 | 配置了 |
UpgradeFirmwareProgressMessage | 上报固件更新进度消息 | 设备开始 |
消息定义:
DeviceOnlineMessage{
String deviceId;
long timestamp;
}
DeviceOfflineMessage{
String deviceId;
long timestamp;
}
ChildDeviceMessage{
String deviceId;
long timestamp;
String childDeviceId;
Message childDeviceMessage; //子设备消息
}
ChildDeviceMessageReply{
String deviceId;
String messageId;
String childDeviceId;
Message childDeviceMessage;
}
UpdateTagMessage{
String deviceId;
long timestamp;
Map<String, Object> tags;
}
DerivedMetadataMessage{
String deviceId;
long timestamp;
String metadata; //元数据
boolean all; //是否是全量数据
}
DeviceRegisterMessage{
String deviceId;
long timestamp;
}
UpgradeFirmwareMessage{
String deviceId;
long timestamp;
String url; //固件下载地址
String version; //固件版本
Map<String, Object> parameters; //其他参数
String sign; //签名
String signMethod; //签名方式,md5,sha256
String firmwareId; //固件ID
long size; //固件大小
}
UpgradeFirmwareMessageReply{
String deviceId;
String messageId;
boolean success;
}
UpgradeFirmwareProgressMessage{
String deviceId;
int progress; //进度0-100
boolean complete; //是否已完成
String version; //升级中的固件版本
boolean success; //是否成功
String errorReason; //错误原因
String firmwareId; //固件ID
}
父子设备消息处理请看这里
# 设备消息对应事件总线topic
协议包将设备上报后的报文解析为平台统一的设备消息后,会将消息转换为对应的topic 并发送到事件总线,可以通过从事件总线订阅消息来处理这些消息。
所有设备消息的topic
的前缀均为: /device/{productId}/{deviceId}
.
如:产品product-1
下的设备device-1
上线消息: /device/product-1/device-1/online
.
可通过通配符订阅所有设备的指定消息,如:/device/*/*/online
,或者订阅所有消息:/device/**
.
警告
- 此topic和mqtt的topic没有任何关系,仅仅作为系统内部通知的方式
- 使用通配符订阅可能将收到大量的消息,请保证消息的处理速度,否则会影响系统消息吞吐量.
# 总线topic
topic | 消息类型 | 说明 |
/online | DeviceOnlineMessage | 设备上线 |
/offline | DeviceOfflineMessage | 设备离线 |
/message/event/{eventId} | EventMessage | 设备事件 |
/message/property/report | ReportPropertyMessage | 设备上报属性 |
/message/send/property/read | ReadPropertyMessage | 平台下发读取消息指令 |
/message/send/property/write | WritePropertyMessage | 平台下发修改消息指令 |
/message/property/read/reply | ReadPropertyMessageReply | 读取属性回复 |
/message/property/write/reply | WritePropertyMessageReply | 修改属性回复 |
/message/send/function | FunctionInvokeMessage | 平台下发功能调用 |
/message/function/reply | FunctionInvokeMessageReply | 调用功能回复 |
/register | DeviceRegisterMessage | 设备注册,通常与子设备消息配合使用 |
/unregister | DeviceUnRegisterMessage | 设备注销,同上 |
/message/children/{childrenDeviceId}/{topic} | ChildDeviceMessage | 子设备消息,{topic}为子设备消息对应的topic |
/message/children/reply/{childrenDeviceId}/{topic} | ChildDeviceMessageReply | 子设备回复消息,同上 |
/message/direct | DirectDeviceMessage | 透传消息 |
/message/tags/update | UpdateTagMessage | 更新标签消息 |
/firmware/pull | RequestFirmwareMessage | 拉取固件请求 (设备->平台) |
/firmware/pull/reply | RequestFirmwareMessageReply | 拉取固件请求回复 (平台->设备) |
/firmware/report | ReportFirmwareMessage | 上报固件信息 |
/firmware/progresst | UpgradeFirmwareProgressMessage | 上报更新固件进度 |
/firmware/push | UpgradeFirmwareMessage | 推送固件更新 |
/firmware/push/reply | UpgradeFirmwareMessageReply | 固件更新回复 |
/message/log | DeviceLogMessage | 设备日志 |
/metadata/derived | DerivedMetadataMessage | 更新物模型 |
注意
列表中的topic已省略前缀/device/{productId}/{deviceId}
,使用时请加上。
在1.6版本后,支持订阅创建者分组下的设备消息了。
- 创建者topic前缀:
/user/{userId}
例如: /user/{userId}/device/{productId}/{deviceId}/**
# 设备接入流程
# 设备接入最佳实践
设备接入的核心是协议包。无论是直连设备,或者是与其他平台对接,理论上都可以在自定义协议包里进行处理。
# 物模型定义
在接入一个设备时,首先根据设备以及设备接入文档(报文说明), 将设备物模型的属性、功能以及事件设计好。
通常情况下:
对于设备固有不变的信息,建议使用设备标签
进行管理。
属性用于定义一些指标数据,如:电压
,温度
等。
属性都应该是简单的数据类型,如:int
,float
,string
等,避免使用结构体等复杂类型。
功能用于定义设备具有的一些可执行动作,如:消音
,关灯
,云台控制
等。根据情况设计好输入参数和输出参数。
事件用于定义设备在特定条件时,发生的动作,如:火警
,检测到人脸
,通常为结构体类型,用于保存比较复杂的数据。
说明
在设计物模型时,尽量屏蔽掉非必要参数,让物模型简单化、通用化。
# 协议包开发
建议使用策略模式来定义功能码,以及不同功能码对应的解析规则。如:使用枚举来定义功能码。
避免进行array copy
,应该使用偏移量直接处理报文。
常见场景算法实践:
- 使用枚举来处理不同类型的数据
伪代码如下:
@AllArgsConstructor
@Getter
public enum UpstreamCommand {
//注册命令
register((byte) 0x00, RegisterMessage::decode),
//数据命令
data((byte) 0x02, DataMessage::decode),
byte code;
BiFunction<byte[], Integer, ? extends MyDeviceMessage> decoder;
public static UpstreamCommand of(byte code) {
for (UpstreamCommand value : values()) {
if (value.code == code) {
return value;
}
}
throw new UnsupportedOperationException("不支持的命令:0x" + Hex.encodeHexString(new byte[]{data}));
}
//调用此方法解码数据
public static MyDeviceMessage decode(byte[] body) {
//业务流水号
int msgId = BytesUtils.beToInt(body, 0, 2); //2字节小端转int
//命令
UpstreamCommand command = of(body[4]);
return command.getDecoder().apply(body, 5);
}
}
- 使用二进制位来表示状态,0表示正常,1表示异常:
使用枚举定义二进制位表示的含义,使用位运算来判断对应数据是哪一位
@AllArgsConstructor
@Getter
public enum DataType{
OK(-9999,"正常"),
Bit0(0,"测试"),
Bit1(1,"火警"),
Bit2(2,"故障")
//....
;
private int bit;
private String text;
public static DataType of(long value) {
if (value == 0) {
return OK;
}
for (DataType type : values()) {
if ((value & (1 << type.bit)) != 0) {
//数据位支持多选的话,装到集合里即可
return partState;
}
}
throw new UnsupportedOperationException("不支持的状态:" + value);
}
}
注意
应该将对报文处理的类封装为独立的类,然后在开发过程中,使用单元测试验证处理是否正确。
避免直接在DeviceMessageCodec
里编写处理逻辑。
# 协议包特性配置
协议包提供了特性的配置接口,用于激活特定的功能。例如远程升级、数据解析等。
特性定义统一实现了org.jetlinks.core.metadata.Feature
接口。
例如:添加远程升级支持
CompositeProtocolSupport support = new CompositeProtocolSupport();
support.addFeature(DeviceFeatures.supportFirmware);