reactor:【IoT 物联网】数据下行,基于 messagebus 实现订阅消费

This commit is contained in:
YunaiV 2025-05-30 22:14:46 +08:00
parent 1b59aa9ccb
commit b4035cb036
7 changed files with 64 additions and 55 deletions

View File

@ -3,18 +3,22 @@ package cn.iocoder.yudao.framework.common.util.json;
import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.util.json.databind.TimestampLocalDateTimeDeserializer;
import cn.iocoder.yudao.framework.common.util.json.databind.TimestampLocalDateTimeSerializer;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -32,7 +36,11 @@ public class JsonUtils {
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); // 忽略 null objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); // 忽略 null
objectMapper.registerModules(new JavaTimeModule()); // 解决 LocalDateTime 的序列化 // 解决 LocalDateTime 的序列化
SimpleModule simpleModule = new JavaTimeModule()
.addSerializer(LocalDateTime.class, TimestampLocalDateTimeSerializer.INSTANCE)
.addDeserializer(LocalDateTime.class, TimestampLocalDateTimeDeserializer.INSTANCE);
objectMapper.registerModules(simpleModule);
} }
/** /**

View File

@ -16,7 +16,7 @@ Authorization: Bearer {{token}}
### 请求 /iot/device/downstream 接口(属性设置) => 成功 ### 请求 /iot/device/downstream 接口(属性设置) => 成功
POST {{baseUrl}}/iot/device/downstream POST {{baseUrl}}/iot/device/downstream
Content-Type: application/json Content-Type: application/json
tenant-id: {{adminTenentId}} tenant-id: {{adminTenantId}}
Authorization: Bearer {{token}} Authorization: Bearer {{token}}
{ {

View File

@ -8,6 +8,7 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceDownstreamReqVO; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceDownstreamReqVO;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum; import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
@ -52,6 +53,8 @@ public class IotDeviceDownstreamServiceImpl implements IotDeviceDownstreamServic
@Resource @Resource
private IotDeviceProducer deviceProducer; private IotDeviceProducer deviceProducer;
@Resource
private IotDeviceMessageProducer deviceMessageProducer;
@Override @Override
public IotDeviceMessage downstreamDevice(IotDeviceDownstreamReqVO downstreamReqVO) { public IotDeviceMessage downstreamDevice(IotDeviceDownstreamReqVO downstreamReqVO) {
@ -150,26 +153,16 @@ public class IotDeviceDownstreamServiceImpl implements IotDeviceDownstreamServic
// TODO @super可优化过滤掉不合法的属性 // TODO @super可优化过滤掉不合法的属性
// 2. 发送请求 // 2. 发送请求
String url = String.format("sys/%s/%s/thing/service/property/set", // TODO @芋艿deviceName 的设置
getProductKey(device, parentDevice), getDeviceName(device, parentDevice)); String deviceName = "xx";
IotDevicePropertySetReqDTO reqDTO = new IotDevicePropertySetReqDTO() Long tenantId = 1L;
.setProperties((Map<String, Object>) downstreamReqVO.getData()); cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage message = cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage
CommonResult<Boolean> result = requestPlugin(url, reqDTO, device); .of(getProductKey(device, parentDevice), getDeviceName(device, parentDevice), deviceName,
null, tenantId);
// 3. 发送设备消息 String serverId = "yy";
IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId()) deviceMessageProducer.sendGatewayDeviceMessage(serverId, message);
.setType(IotDeviceMessageTypeEnum.PROPERTY.getType()) // TODO @芋艿后续可以清理掉
.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier()) return null;
.setData(reqDTO.getProperties());
sendDeviceMessage(message, device, result.getCode());
// 4. 如果不成功抛出异常提示用户
if (result.isError()) {
log.error("[setDeviceProperty][设备({})属性设置失败,请求参数:({}),响应结果:({})]",
device.getDeviceKey(), reqDTO, result);
throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
}
return message;
} }
/** /**

View File

@ -32,7 +32,7 @@ public class IotDeviceMessage {
* *
* 其中%s 就是该server(protocol) 的标识 * 其中%s 就是该server(protocol) 的标识
*/ */
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "/%s"; public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
/** /**
* 请求编号 * 请求编号
@ -71,6 +71,7 @@ public class IotDeviceMessage {
* 例如说属性上报的 properties事件上报的 params * 例如说属性上报的 properties事件上报的 params
*/ */
private Object data; private Object data;
// TODO @芋艿可能会去掉
/** /**
* 响应码 * 响应码
* *
@ -100,6 +101,20 @@ public class IotDeviceMessage {
return this; return this;
} }
public IotDeviceMessage ofPropertySet(Map<String, Object> properties) {
this.setType(IotDeviceMessageTypeEnum.PROPERTY.getType());
this.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier());
this.setData(properties);
return this;
}
public static IotDeviceMessage of(String productKey, String deviceName, String deviceKey,
String serverId, Long tenantId) {
return of(productKey, deviceName, deviceKey,
null, null,
serverId, tenantId);
}
public static IotDeviceMessage of(String productKey, String deviceName, String deviceKey, public static IotDeviceMessage of(String productKey, String deviceName, String deviceKey,
String requestId, LocalDateTime reportTime, String requestId, LocalDateTime reportTime,
String serverId, Long tenantId) { String serverId, Long tenantId) {
@ -117,7 +132,7 @@ public class IotDeviceMessage {
// ========== Topic 相关 ========== // ========== Topic 相关 ==========
public static String getMessageBusGatewayDeviceMessageTopic(String serverId) { public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {
return String.format(MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId); return String.format(MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
} }

View File

@ -30,7 +30,7 @@ public class IotDeviceMessageProducer {
* @param message 设备消息 * @param message 设备消息
*/ */
public void sendGatewayDeviceMessage(String serverId, Object message) { public void sendGatewayDeviceMessage(String serverId, Object message) {
messageBus.post(IotDeviceMessage.getMessageBusGatewayDeviceMessageTopic(serverId), message); messageBus.post(IotDeviceMessage.buildMessageBusGatewayDeviceMessageTopic(serverId), message);
} }
} }

View File

@ -1,12 +1,11 @@
package cn.iocoder.yudao.module.iot.net.component.http.config; package cn.iocoder.yudao.module.iot.net.component.http.config;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonProperties;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler; import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
import cn.iocoder.yudao.module.iot.net.component.http.downstream.IotDeviceDownstreamHandlerImpl; import cn.iocoder.yudao.module.iot.net.component.http.downstream.IotDeviceDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.net.component.http.upstream.IotDeviceUpstreamServer; import cn.iocoder.yudao.module.iot.net.component.http.upstream.IotDeviceUpstreamServer;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
@ -35,15 +34,6 @@ import org.springframework.context.event.EventListener;
}) })
public class IotNetComponentHttpAutoConfiguration { public class IotNetComponentHttpAutoConfiguration {
/**
* 组件 key
*/
private static final String PLUGIN_KEY = "http";
public IotNetComponentHttpAutoConfiguration() {
// 构造函数中不输出日志移到 initialize 方法中
}
/** /**
* 初始化 HTTP 组件 * 初始化 HTTP 组件
* *
@ -53,27 +43,30 @@ public class IotNetComponentHttpAutoConfiguration {
public void initialize(ApplicationStartedEvent event) { public void initialize(ApplicationStartedEvent event) {
log.info("[IotNetComponentHttpAutoConfiguration][开始初始化]"); log.info("[IotNetComponentHttpAutoConfiguration][开始初始化]");
// 从应用上下文中获取需要的 Bean // TODO @芋艿临时处理
IotNetComponentRegistry componentRegistry = event.getApplicationContext() IotMessageBus messageBus = event.getApplicationContext()
.getBean(IotNetComponentRegistry.class); .getBean(IotMessageBus.class);
IotNetComponentCommonProperties commonProperties = event.getApplicationContext() messageBus.register(new IotMessageBusSubscriber<IotDeviceMessage>() {
.getBean(IotNetComponentCommonProperties.class);
// 设置当前组件的核心标识 @Override
// 注意这里只为当前 HTTP 组件设置 pluginKey不影响其他组件 public String getTopic() {
// TODO @haohao多个会存在冲突的问题哇 return IotDeviceMessage.buildMessageBusGatewayDeviceMessageTopic("yy");
commonProperties.setPluginKey(PLUGIN_KEY); }
// HTTP 组件注册到组件注册表 @Override
componentRegistry.registerComponent( public String getGroup() {
PLUGIN_KEY, return "test";
SystemUtil.getHostInfo().getAddress(), }
0, // 内嵌模式固定为 0自动生成对应的 port 端口号
IotNetComponentCommonUtils.getProcessId());
log.info("[initialize][IoT HTTP 组件初始化完成]"); @Override
public void onMessage(IotDeviceMessage message) {
System.out.println(message);
}
});
} }
// TODO @芋艿貌似这里不用注册 bean
/** /**
* 创建 Vert.x 实例 * 创建 Vert.x 实例
* *

View File

@ -32,7 +32,7 @@ public class IotDeviceUpstreamServer extends AbstractVerticle {
private final IotDeviceMessageProducer deviceMessageProducer; private final IotDeviceMessageProducer deviceMessageProducer;
@Override @Override
public void start() throws Exception { public void start() {
start(Promise.promise()); start(Promise.promise());
} }