reactor:【IoT 物联网】实现 alink 的 IotAlinkDeviceMessageCodec,并尝试接入 http 协议

This commit is contained in:
YunaiV 2025-06-09 21:44:47 +08:00
parent f58cf282dd
commit 120029bb17
14 changed files with 392 additions and 137 deletions

View File

@ -0,0 +1,25 @@
package cn.iocoder.yudao.module.iot.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* IoT 设备消息的方法枚举
*
* @author haohao
*/
@Getter
@AllArgsConstructor
public enum IotDeviceMessageMethodEnum {
// ========== 设备状态 ==========
STATE_ONLINE("thing.state.online"),
STATE_OFFLINE("thing.state.offline"),
;
private final String method;
}

View File

@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.core.mq.message;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
@ -9,7 +8,6 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.Map;
/**
* IoT 设备消息
@ -32,109 +30,126 @@ public class IotDeviceMessage {
*/
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
// TODO @芋艿thingsboard 对应 id全部由后端生成由于追溯是不是调整下
/**
* 消息编号
*/
private String messageId;
// TODO @芋艿thingsboard 是使用 deviceId
/**
* 设备信息
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
// TODO @芋艿thingsboard 只定义了 type相当于 type + identifier 结合TbMsgType
/**
* 消息类型
*
* 枚举 {@link IotDeviceMessageTypeEnum}
* 由后端生成通过 {@link IotDeviceMessageUtils#generateMessageId()}
*/
private String type;
private String id;
/**
* 标识符
* 上报时间
*
* 枚举 {@link IotDeviceMessageIdentifierEnum}
* 由后端生成当前时间
*/
private String identifier;
private LocalDateTime reportTime;
// TODO @芋艿thingsboard 只有 data 字段没有 code 字段
// TODO @芋艿要不提前序列化成字符串类似 thingsboard data 字段
// ========== codec编解码字段 ==========
/**
* 请求编号
*
* 由设备生成对应阿里云 IoT Alink 协议中的 id华为云 IoTDA 协议的 request_id
*/
private String requestId;
/**
* 请求方法
*
* 枚举 {@link IotDeviceMessageMethodEnum}
* 例如说thing.property.report 属性上报
*/
private String method;
/**
* 请求参数
*
* 例如说属性上报的 properties事件上报的 params
*/
private Object data;
// TODO @芋艿可能会去掉
private Object params;
/**
* 响应码
*
* 目前只有 server 下行消息给 device 设备时才会有响应码
* 响应结果
*/
private Object data;
/**
* 响应错误码
*/
private Integer code;
// ========== 后端字段 ==========
/**
* 上报时间
* 设备编号
*/
private LocalDateTime reportTime;
private Long deviceId;
/**
* 租户编号
*/
private Long tenantId;
/**
* 服务编号该消息由哪个 server 服务进行消费
*/
private String serverId;
public IotDeviceMessage ofPropertyReport(Map<String, Object> properties) {
this.setType(IotDeviceMessageTypeEnum.PROPERTY.getType());
this.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier());
this.setData(properties);
return this;
// public IotDeviceMessage ofPropertyReport(Map<String, Object> properties) {
// this.setType(IotDeviceMessageTypeEnum.PROPERTY.getType());
// this.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier());
// this.setData(properties);
// return this;
// }
//
// public IotDeviceMessage ofPropertySet(Map<String, Object> properties) {
// this.setType(IotDeviceMessageTypeEnum.PROPERTY.getType());
// this.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier());
// this.setData(properties);
// return this;
// }
//
// public IotDeviceMessage ofStateOnline() {
// this.setType(IotDeviceMessageTypeEnum.STATE.getType());
// this.setIdentifier(IotDeviceMessageIdentifierEnum.STATE_ONLINE.getIdentifier());
// return this;
// }
//
// public IotDeviceMessage ofStateOffline() {
// this.setType(IotDeviceMessageTypeEnum.STATE.getType());
// this.setIdentifier(IotDeviceMessageIdentifierEnum.STATE_OFFLINE.getIdentifier());
// return this;
// }
//
// public static IotDeviceMessage of(String productKey, String deviceName) {
// return of(productKey, deviceName,
// null, null);
// }
//
// public static IotDeviceMessage of(String productKey, String deviceName,
// String serverId) {
// return of(productKey, deviceName,
// null, serverId);
// }
//
// public static IotDeviceMessage of(String productKey, String deviceName,
// LocalDateTime reportTime, String serverId) {
// if (reportTime == null) {
// reportTime = LocalDateTime.now();
// }
// String messageId = IotDeviceMessageUtils.generateMessageId();
// return IotDeviceMessage.builder()
// .messageId(messageId).reportTime(reportTime)
// .productKey(productKey).deviceName(deviceName)
// .serverId(serverId).build();
// }
public static IotDeviceMessage of(String requestId, String method, Object params) {
return of(requestId, method, params, null, null);
}
public IotDeviceMessage ofPropertySet(Map<String, Object> properties) {
this.setType(IotDeviceMessageTypeEnum.PROPERTY.getType());
this.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier());
this.setData(properties);
return this;
}
public IotDeviceMessage ofStateOnline() {
this.setType(IotDeviceMessageTypeEnum.STATE.getType());
this.setIdentifier(IotDeviceMessageIdentifierEnum.STATE_ONLINE.getIdentifier());
return this;
}
public IotDeviceMessage ofStateOffline() {
this.setType(IotDeviceMessageTypeEnum.STATE.getType());
this.setIdentifier(IotDeviceMessageIdentifierEnum.STATE_OFFLINE.getIdentifier());
return this;
}
public static IotDeviceMessage of(String productKey, String deviceName) {
return of(productKey, deviceName,
null, null);
}
public static IotDeviceMessage of(String productKey, String deviceName,
String serverId) {
return of(productKey, deviceName,
null, serverId);
}
public static IotDeviceMessage of(String productKey, String deviceName,
LocalDateTime reportTime, String serverId) {
if (reportTime == null) {
reportTime = LocalDateTime.now();
}
String messageId = IotDeviceMessageUtils.generateMessageId();
return IotDeviceMessage.builder()
.messageId(messageId).reportTime(reportTime)
.productKey(productKey).deviceName(deviceName)
.serverId(serverId).build();
public static IotDeviceMessage of(String requestId, String method,
Object params, Object data, Integer code) {
// 通用参数
IotDeviceMessage message = new IotDeviceMessage()
.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now());
// 当前参数
message.setRequestId(requestId).setMethod(method).setParams(params).setData(data).setCode(code);
return message;
}
}

View File

@ -30,7 +30,7 @@ public class IotDeviceMessageProducer {
* @param serverId 网关的 serverId 标识
* @param message 设备消息
*/
public void sendGatewayDeviceMessage(String serverId, Object message) {
public void sendDeviceMessageToGateway(String serverId, IotDeviceMessage message) {
messageBus.post(IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(serverId), message);
}

View File

@ -18,6 +18,7 @@ public class IotDeviceMessageUtils {
return IdUtil.fastSimpleUUID();
}
// TODO @芋艿需要优化下
/**
* 是否是上行消息由设备发送
*

View File

@ -0,0 +1,33 @@
package cn.iocoder.yudao.module.iot.gateway.codec;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
/**
* {@link cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage} 的编解码器
*
* @author 芋道源码
*/
public interface IotDeviceMessageCodec {
/**
* 编码消息
*
* @param message 消息
* @return 编码后的消息内容
*/
byte[] encode(IotDeviceMessage message);
/**
* 解码消息
*
* @param bytes 消息内容
* @return 解码后的消息内容
*/
IotDeviceMessage decode(byte[] bytes);
/**
* @return 类型
*/
String type();
}

View File

@ -0,0 +1,81 @@
package cn.iocoder.yudao.module.iot.gateway.codec.alink;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
/**
* 阿里云 Alink {@link IotDeviceMessage} 的编解码器
*
* @author 芋道源码
*/
@Component
public class IotAlinkDeviceMessageCodec implements IotDeviceMessageCodec {
@Data
@NoArgsConstructor
@AllArgsConstructor
private static class AlinkMessage {
public static final String VERSION_1 = "1.0";
/**
* 消息 ID且每个消息 ID 在当前设备具有唯一性
*/
private String id;
/**
* 版本号
*/
private String version;
/**
* 请求方法
*/
private String method;
/**
* 请求参数
*/
private Object params;
/**
* 响应结果
*/
private Object data;
/**
* 响应错误码
*/
private Integer code;
}
@Override
public byte[] encode(IotDeviceMessage message) {
AlinkMessage alinkMessage = new AlinkMessage(message.getRequestId(), AlinkMessage.VERSION_1,
message.getMethod(), message.getParams(), message.getData(), message.getCode());
return JsonUtils.toJsonByte(alinkMessage);
}
@Override
@SuppressWarnings("DataFlowIssue")
public IotDeviceMessage decode(byte[] bytes) {
AlinkMessage alinkMessage = JsonUtils.parseObject(bytes, AlinkMessage.class);
Assert.notNull(alinkMessage, "消息不能为空");
Assert.equals(alinkMessage.getVersion(), AlinkMessage.VERSION_1, "消息版本号必须是 1.0");
return IotDeviceMessage.of(alinkMessage.getId(),
alinkMessage.getMethod(), alinkMessage.getParams(), alinkMessage.getData(), alinkMessage.getCode());
}
@Override
public String type() {
return "alink";
}
}

View File

@ -1 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.codec.alink;

View File

@ -33,7 +33,7 @@ public abstract class IotHttpAbstractHandler implements Handler<RoutingContext>
private final IotDeviceTokenService deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
@Override
public void handle(RoutingContext context) {
public final void handle(RoutingContext context) {
try {
// 1. 前置处理
CommonResult<Object> result = beforeHandle(context);

View File

@ -13,6 +13,7 @@ import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
@ -40,11 +41,14 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
private final IotDeviceCommonApi deviceClientService;
private final IotDeviceMessageService deviceMessageService;
public IotHttpAuthHandler(IotHttpUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
this.deviceClientService = SpringUtil.getBean(IotDeviceCommonApi.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
}
@Override
@ -78,9 +82,9 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
Assert.notBlank(token, "生成 token 不能为空位");
// 3. 执行上线
deviceMessageProducer.sendDeviceMessage(IotDeviceMessage.of(deviceInfo.getProductKey(), deviceInfo.getDeviceName(),
protocol.getServerId())
.ofStateOnline());
IotDeviceMessage message = deviceMessageService.buildDeviceMessageOfStateOnline(
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
deviceMessageProducer.sendDeviceMessage(message);
// 构建响应数据
return success(MapUtil.of("token", token));

View File

@ -1,14 +1,17 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http.router;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import io.vertx.core.Handler;
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
@ -23,49 +26,39 @@ import java.util.Map;
*/
@RequiredArgsConstructor
@Slf4j
public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
// TODO @haohao你说咱要不要把 "/sys/:productKey/:deviceName"
// + IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic()也抽到 IotDeviceTopicEnum build 这种尽量都收敛掉
/**
* 属性上报路径
*/
public static final String PROPERTY_PATH = "/sys/:productKey/:deviceName"
+ IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getTopic();
/**
* 事件上报路径
*/
public static final String EVENT_PATH = "/sys/:productKey/:deviceName"
+ IotDeviceTopicEnum.EVENT_POST_TOPIC_PREFIX.getTopic() + ":identifier"
+ IotDeviceTopicEnum.EVENT_POST_TOPIC_SUFFIX.getTopic();
public static final String PATH = "/topic/sys/:productKey/:deviceName/*";
private final IotHttpUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer;
private final IotDeviceMessageService deviceMessageService;
public IotHttpUpstreamHandler(IotHttpUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
}
@Override
public void handle(RoutingContext context) {
String path = context.request().path();
protected CommonResult<Object> handle0(RoutingContext context) {
// 1. 解析通用参数
String productKey = context.pathParam("productKey");
String deviceName = context.pathParam("deviceName");
JsonObject body = context.body().asJsonObject();
String method = context.pathParam("*").replaceAll(StrPool.SLASH, StrPool.DOT);
// 2. 根据路径模式处理不同类型的请求
if (isPropertyPostPath(path)) {
// 处理属性上报
handlePropertyPost(context, productKey, deviceName, body);
} else if (isEventPostPath(path)) {
// 处理事件上报
String identifier = context.pathParam("identifier");
handleEventPost(context, productKey, deviceName, identifier, body);
}
// 2.1 解析消息
byte[] bytes = context.body().buffer().getBytes();
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(bytes,
productKey, deviceName, protocol.getServerId());
Assert.equals(method, message.getMethod(), "method 不匹配");
// 2.2 发送消息
deviceMessageProducer.sendDeviceMessage(message);
// 3. 返回结果
return CommonResult.success(MapUtil.of("messageId", message.getId()));
}
/**
@ -101,7 +94,8 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
JsonObject body) {
// 1.1 构建设备消息
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId())
.ofPropertyReport(parsePropertiesFromBody(body));
// .ofPropertyReport(parsePropertiesFromBody(body))
;
// 1.2 发送消息
deviceMessageProducer.sendDeviceMessage(message);

View File

@ -1,22 +0,0 @@
/**
* MQTT 协议路由器包
* <p>
* 包含 MQTT 协议的所有路由处理器和抽象基类
* <ul>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAbstractHandler}
* - 抽象路由处理器基类</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamRouter}
* - 上行消息路由器</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttAuthRouter}
* - 认证路由器</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttPropertyHandler}
* - 属性处理器</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttEventHandler}
* - 事件处理器</li>
* <li>{@link cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttServiceHandler}
* - 服务处理器</li>
* </ul>
*
* @author 芋道源码
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;

View File

@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.service.auth;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
/**
* IoT 设备 Token 服务 Service 接口
* IoT 设备 Token Service 接口
*
* @author 芋道源码
*/

View File

@ -0,0 +1,42 @@
package cn.iocoder.yudao.module.iot.gateway.service.message;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
/**
* IoT 设备消息 Service 接口
*
* @author 芋道源码
*/
public interface IotDeviceMessageService {
/**
* 编码消息
*
* @param message 消息
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 编码后的消息内容
*/
byte[] encodeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName);
/**
* 解码消息
*
* @param bytes 消息内容
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param serverId 设备连接的 serverId
* @return 解码后的消息内容
*/
IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName, String serverId);
/**
* 构建设备上线消息
*
* @return 消息
*/
IotDeviceMessage buildDeviceMessageOfStateOnline(String productKey, String deviceName, String serverId);
}

View File

@ -0,0 +1,83 @@
package cn.iocoder.yudao.module.iot.gateway.service.message;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
/**
* IoT 设备消息 Service 实现类
*
* @author 芋道源码
*/
@Service
public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
/**
* 编解码器
*/
private final Map<String, IotAlinkDeviceMessageCodec> codes;
public IotDeviceMessageServiceImpl(List<IotAlinkDeviceMessageCodec> codes) {
this.codes = CollectionUtils.convertMap(codes, IotAlinkDeviceMessageCodec::type);
}
@Override
public byte[] encodeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName) {
// TODO @芋艿获取设备信息
String codecType = "alink";
return codes.get(codecType).encode(message);
}
@Override
public IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName, String serverId) {
// TODO @芋艿获取设备信息
String codecType = "alink";
IotDeviceMessage message = codes.get(codecType).decode(bytes);
// 补充后端字段
Long deviceId = 25L;
Long tenantId = 1L;
appendDeviceMessage(message, deviceId, tenantId, serverId);
return message;
}
@Override
public IotDeviceMessage buildDeviceMessageOfStateOnline(String productKey, String deviceName, String serverId) {
IotDeviceMessage message = IotDeviceMessage.of(null,
IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod(), null);
// 补充后端字段
Long deviceId = 25L;
Long tenantId = 1L;
return appendDeviceMessage(message, deviceId, tenantId, serverId);
}
/**
* 补充消息的后端字段
*
* @param message 消息
* @param deviceId 设备编号
* @param tenantId 租户编号
* @param serverId 设备连接的 serverId
* @return 消息
*/
private IotDeviceMessage appendDeviceMessage(IotDeviceMessage message,
Long deviceId, Long tenantId, String serverId) {
message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now())
.setDeviceId(deviceId).setTenantId(tenantId).setServerId(serverId);
// 特殊如果设备没有指定 requestId则使用 messageId
if (StrUtil.isEmpty(message.getRequestId())) {
message.setRequestId(message.getId());
}
return message;
}
}