【功能新增】IoT: 添加 EMQX 插件,支持设备连接认证和 MQTT 连接参数获取,优化配置文件

This commit is contained in:
安浩浩 2025-02-20 18:30:57 +08:00
parent 8e7bbfe0da
commit ca95752266
32 changed files with 685 additions and 163 deletions

View File

@ -71,6 +71,14 @@ public interface IotDeviceUpstreamApi {
@PostMapping(PREFIX + "/add-topology")
CommonResult<Boolean> addDeviceTopology(@Valid @RequestBody IotDeviceTopologyAddReqDTO addReqDTO);
/**
* 认证 Emqx 连接
*
* @param authReqDTO 认证 Emqx 连接 DTO
*/
@PostMapping(PREFIX + "/authenticate-emqx-connection")
CommonResult<Boolean> authenticateEmqxConnection(@Valid @RequestBody IotDeviceEmqxAuthReqDTO authReqDTO);
// ========== 插件相关 ==========
/**

View File

@ -0,0 +1,32 @@
package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream;
import jakarta.validation.constraints.NotEmpty;
import lombok.Data;
/**
* IoT 认证 Emqx 连接 Request DTO
*
* @author 芋道源码
*/
@Data
public class IotDeviceEmqxAuthReqDTO {
/**
* 客户端 ID
*/
@NotEmpty(message = "客户端 ID 不能为空")
private String clientId;
/**
* 用户名
*/
@NotEmpty(message = "用户名不能为空")
private String username;
/**
* 密码
*/
@NotEmpty(message = "密码不能为空")
private String password;
}

View File

@ -4,15 +4,14 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.*;
import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceUpstreamService;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService;
import jakarta.annotation.Resource;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import jakarta.annotation.Resource;
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
/**
* * 设备数据 Upstream 上行 API 实现类
* * 设备数据 Upstream 上行 API 实现类
*/
@RestController
@Validated
@ -61,6 +60,12 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi {
return success(true);
}
@Override
public CommonResult<Boolean> authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
Boolean result = deviceUpstreamService.authenticateEmqxConnection(authReqDTO);
return success(result);
}
// ========== 插件相关 ==========
@Override

View File

@ -7,8 +7,8 @@ import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.framework.excel.core.util.ExcelUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceDownstreamReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.*;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceUpstreamReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.*;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceDownstreamService;
@ -177,4 +177,11 @@ public class IotDeviceController {
return success(true);
}
@GetMapping("/mqtt-connection-params")
@Operation(summary = "获取 MQTT 连接参数")
@PreAuthorize("@ss.hasPermission('iot:device:mqtt-connection-params')")
public CommonResult<IotDeviceMqttConnectionParamsRespVO> getMqttConnectionParams(@RequestParam("deviceId") Long deviceId) {
return success(deviceService.getMqttConnectionParams(deviceId));
}
}

View File

@ -168,4 +168,12 @@ public interface IotDeviceService {
*/
IotDeviceImportRespVO importDevice(List<IotDeviceImportExcelVO> importDevices, boolean updateSupport);
/**
* 获取 MQTT 连接参数
*
* @param deviceId 设备 ID
* @return MQTT 连接参数
*/
IotDeviceMqttConnectionParamsRespVO getMqttConnectionParams(Long deviceId);
}

View File

@ -20,6 +20,8 @@ import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import cn.iocoder.yudao.module.iot.util.MqttSignUtils;
import cn.iocoder.yudao.module.iot.util.MqttSignUtils.MqttSignResult;
import jakarta.annotation.Resource;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
@ -123,10 +125,8 @@ public class IotDeviceServiceImpl implements IotDeviceService {
.setDeviceType(product.getDeviceType());
// 生成并设置必要的字段
// TODO @芋艿各种 mqtt 是不是可以简化
device.setDeviceSecret(generateDeviceSecret())
.setMqttClientId(generateMqttClientId())
.setMqttUsername(generateMqttUsername(device.getDeviceName(), device.getProductKey()))
.setMqttPassword(generateMqttPassword());
// clientIdusernamepassword 根据规则实时生成
device.setDeviceSecret(generateDeviceSecret());
// 设置设备状态为未激活
device.setState(IotDeviceStateEnum.INACTIVE.getState());
}
@ -318,35 +318,6 @@ public class IotDeviceServiceImpl implements IotDeviceService {
return IdUtil.fastSimpleUUID();
}
/**
* 生成 MQTT Client ID
*
* @return 生成的 MQTT Client ID
*/
private String generateMqttClientId() {
return IdUtil.fastSimpleUUID();
}
/**
* 生成 MQTT Username
*
* @param deviceName 设备名称
* @param productKey 产品 Key
* @return 生成的 MQTT Username
*/
private String generateMqttUsername(String deviceName, String productKey) {
return deviceName + "&" + productKey;
}
/**
* 生成 MQTT Password
*
* @return 生成的 MQTT Password
*/
private String generateMqttPassword() {
return RandomUtil.randomString(32);
}
@Override
@Transactional(rollbackFor = Exception.class) // 添加事务异常则回滚所有导入
public IotDeviceImportRespVO importDevice(List<IotDeviceImportExcelVO> importDevices, boolean updateSupport) {
@ -417,6 +388,17 @@ public class IotDeviceServiceImpl implements IotDeviceService {
return respVO;
}
@Override
public IotDeviceMqttConnectionParamsRespVO getMqttConnectionParams(Long deviceId) {
IotDeviceDO device = validateDeviceExists(deviceId);
MqttSignResult mqttSignResult = MqttSignUtils.calculate(device.getProductKey(), device.getDeviceName(),
device.getDeviceSecret());
return new IotDeviceMqttConnectionParamsRespVO()
.setMqttClientId(mqttSignResult.getClientId())
.setMqttUsername(mqttSignResult.getUsername())
.setMqttPassword(mqttSignResult.getPassword());
}
private void deleteDeviceCache(IotDeviceDO device) {
// 保证 Spring AOP 触发
getSelf().deleteDeviceCache0(device);

View File

@ -62,4 +62,11 @@ public interface IotDeviceUpstreamService {
*/
void addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO);
/**
* Emqx 连接认证
*
* @param authReqDTO Emqx 连接认证 DTO
*/
Boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO);
}

View File

@ -20,6 +20,8 @@ import cn.iocoder.yudao.module.iot.mq.producer.device.IotDeviceProducer;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService;
import cn.iocoder.yudao.module.iot.util.MqttSignUtils;
import cn.iocoder.yudao.module.iot.util.MqttSignUtils.MqttSignResult;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -58,25 +60,26 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
// 2.1 情况一属性上报
String requestId = IdUtil.fastSimpleUUID();
if (Objects.equals(simulatorReqVO.getType(), IotDeviceMessageTypeEnum.PROPERTY.getType())) {
reportDeviceProperty(((IotDevicePropertyReportReqDTO)
new IotDevicePropertyReportReqDTO().setRequestId(requestId).setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
reportDeviceProperty(((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
.setRequestId(requestId).setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
.setProperties((Map<String, Object>) simulatorReqVO.getData()));
return;
}
// 2.2 情况二事件上报
if (Objects.equals(simulatorReqVO.getType(), IotDeviceMessageTypeEnum.EVENT.getType())) {
reportDeviceEvent(((IotDeviceEventReportReqDTO)
new IotDeviceEventReportReqDTO().setRequestId(requestId).setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
.setIdentifier(simulatorReqVO.getIdentifier()).setParams((Map<String, Object>) simulatorReqVO.getData()));
reportDeviceEvent(((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId)
.setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
.setIdentifier(simulatorReqVO.getIdentifier())
.setParams((Map<String, Object>) simulatorReqVO.getData()));
return;
}
// 2.3 情况三状态变更
if (Objects.equals(simulatorReqVO.getType(), IotDeviceMessageTypeEnum.STATE.getType())) {
updateDeviceState(((IotDeviceStateUpdateReqDTO)
new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
updateDeviceState(((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO()
.setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
.setState((Integer) simulatorReqVO.getData()));
return;
}
@ -277,6 +280,37 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
sendDeviceMessage(message, device);
}
@Override
public Boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
log.info("[authenticateEmqxConnection][认证 Emqx 连接: {}]", authReqDTO);
// 1. 校验设备是否存在
// username 格式${DeviceName}&${ProductKey}
String[] usernameParts = authReqDTO.getUsername().split("&");
if (usernameParts.length != 2) {
log.error("[authenticateEmqxConnection][认证失败username 格式不正确]");
return Boolean.FALSE;
}
String deviceName = usernameParts[0];
String productKey = usernameParts[1];
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(
productKey, deviceName);
if (device == null) {
log.error("[authenticateEmqxConnection][设备({}/{}) 不存在]",
productKey, deviceName);
return Boolean.FALSE;
}
// 2. 校验密码
String deviceSecret = device.getDeviceSecret();
String clientId = authReqDTO.getClientId();
MqttSignResult sign = MqttSignUtils.calculate(productKey, deviceName, deviceSecret, clientId);
if (!StrUtil.equals(sign.getPassword(), authReqDTO.getPassword())) {
log.error("[authenticateEmqxConnection][认证失败,密码不正确]");
return Boolean.FALSE;
}
log.info("[authenticateEmqxConnection][认证成功]");
return Boolean.TRUE;
}
private void updateDeviceLastTime(IotDeviceDO device, IotDeviceUpstreamAbstractReqDTO reqDTO) {
// 1. 异步记录设备与插件实例的映射
pluginInstanceService.updateDevicePluginInstanceProcessIdAsync(device.getDeviceKey(), reqDTO.getProcessId());

View File

@ -0,0 +1,96 @@
package cn.iocoder.yudao.module.iot.util;
import lombok.Getter;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
/**
* MQTT 签名工具类
* 提供静态方法来计算 MQTT 连接参数
*/
public class MqttSignUtils {
private static final String SIGN_METHOD = "hmacsha256";
/**
* 计算 MQTT 连接参数
*
* @param productKey 产品密钥
* @param deviceName 设备名称
* @param deviceSecret 设备密钥
* @return 包含 clientId, username, password 的结果对象
*/
public static MqttSignResult calculate(String productKey, String deviceName, String deviceSecret) {
String clientId = productKey + "." + deviceName;
String username = deviceName + "&" + productKey;
String signContent = String.format("clientId%sdeviceName%sdeviceSecret%sproductKey%s",
clientId, deviceName, deviceSecret, productKey);
String password = sign(signContent, deviceSecret);
return new MqttSignResult(clientId, username, password);
}
/**
* 计算 MQTT 连接参数
*
* @param productKey 产品密钥
* @param deviceName 设备名称
* @param deviceSecret 设备密钥
* @param clientId 客户端 ID
* @return 包含 clientId, username, password 的结果对象
*/
public static MqttSignResult calculate(String productKey, String deviceName, String deviceSecret, String clientId) {
String username = deviceName + "&" + productKey;
String signContentBuilder = "clientId" + clientId +
"deviceName" + deviceName +
"deviceSecret" + deviceSecret +
"productKey" + productKey;
String password = sign(signContentBuilder, deviceSecret);
return new MqttSignResult(clientId, username, password);
}
private static String sign(String content, String key) {
try {
Mac mac = Mac.getInstance(SIGN_METHOD);
mac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), SIGN_METHOD));
byte[] signData = mac.doFinal(content.getBytes(StandardCharsets.UTF_8));
return bytesToHex(signData);
} catch (Exception e) {
throw new RuntimeException("Failed to sign content with HmacSHA256", e);
}
}
private static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder(bytes.length * 2);
for (byte b : bytes) {
String hex = Integer.toHexString(0xFF & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
}
/**
* MQTT 签名结果类
*/
@Getter
public static class MqttSignResult {
private final String clientId;
private final String username;
private final String password;
public MqttSignResult(String clientId, String username, String password) {
this.clientId = clientId;
this.username = username;
this.password = password;
}
}
}

View File

@ -44,8 +44,8 @@ public class IotPluginCommonAutoConfiguration {
@Bean(initMethod = "init", destroyMethod = "stop")
public IotPluginInstanceHeartbeatJob pluginInstanceHeartbeatJob(
IotDeviceUpstreamApi deviceDataApi, IotDeviceDownstreamServer deviceDownstreamServer) {
return new IotPluginInstanceHeartbeatJob(deviceDataApi, deviceDownstreamServer);
IotDeviceUpstreamApi deviceDataApi, IotDeviceDownstreamServer deviceDownstreamServer, IotPluginCommonProperties commonProperties) {
return new IotPluginInstanceHeartbeatJob(deviceDataApi, deviceDownstreamServer, commonProperties);
}
}

View File

@ -7,6 +7,11 @@ import org.springframework.validation.annotation.Validated;
import java.time.Duration;
/**
* IoT 插件的通用配置类
*
* @author haohao
*/
@ConfigurationProperties(prefix = "yudao.iot.plugin.common")
@Validated
@Data
@ -45,4 +50,10 @@ public class IotPluginCommonProperties {
*/
private Integer downstreamPort = DOWNSTREAM_PORT_RANDOM;
/**
* 插件包标识符
*/
@NotEmpty(message = "插件包标识符不能为空")
private String pluginKey;
}

View File

@ -16,7 +16,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IOT 设备配置设置 Vertx Handler
* IoT 设备配置设置 Vertx Handler
*
* 芋道源码
*/

View File

@ -5,14 +5,19 @@ import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceOt
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.vertx.core.json.JsonObject;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 设备 OTA 升级 Vertx Handler
* <p>
* 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotDeviceOtaUpgradeVertxHandler implements Handler<RoutingContext> {

View File

@ -16,7 +16,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IOT 设备服务获取 Vertx Handler
* IoT 设备服务获取 Vertx Handler
*
* 芋道源码
*/

View File

@ -16,7 +16,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IOT 设备服务设置 Vertx Handler
* IoT 设备服务设置 Vertx Handler
*
* 芋道源码
*/

View File

@ -5,17 +5,18 @@ import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceSe
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import io.vertx.core.json.JsonObject;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IOT 设备服务调用 Vertx Handler
* IoT 设备服务调用 Vertx Handler
*
* 芋道源码
*/

View File

@ -4,6 +4,7 @@ import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import lombok.RequiredArgsConstructor;
@ -23,6 +24,7 @@ public class IotPluginInstanceHeartbeatJob {
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final IotDeviceDownstreamServer deviceDownstreamServer;
private final IotPluginCommonProperties commonProperties;
public void init() {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(buildPluginInstanceHeartbeatReqDTO(true));
@ -41,9 +43,8 @@ public class IotPluginInstanceHeartbeatJob {
}
private IotPluginInstanceHeartbeatReqDTO buildPluginInstanceHeartbeatReqDTO(Boolean online) {
// TODO @haohaopluginKey 的获取
return new IotPluginInstanceHeartbeatReqDTO()
.setPluginKey("yudao-module-iot-plugin-http").setProcessId(IotPluginCommonUtils.getProcessId())
.setPluginKey(commonProperties.getPluginKey()).setProcessId(IotPluginCommonUtils.getProcessId())
.setHostIp(SystemUtil.getHostInfo().getAddress()).setDownstreamPort(deviceDownstreamServer.getPort())
.setOnline(online);
}

View File

@ -57,6 +57,12 @@ public class IotDeviceUpstreamClient implements IotDeviceUpstreamApi {
return null;
}
@Override
public CommonResult<Boolean> authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/authenticate-emqx-connection";
return doPost(url, authReqDTO);
}
@Override
public CommonResult<Boolean> reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) {
String url = properties.getUpstreamUrl() + URL_PREFIX + "/report-property";

View File

@ -41,4 +41,12 @@ public class IotPluginCommonUtils {
.end(JsonUtils.toJsonString(result));
}
@SuppressWarnings("deprecation")
public static void writeJson(RoutingContext routingContext, String result) {
routingContext.response()
.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(result);
}
}

View File

@ -1,6 +1,6 @@
plugin.id=plugin-emqx
plugin.class=cn.iocoder.yudao.module.iot.plugin.EmqxPlugin
plugin.id=yudao-module-iot-plugin-emqx
plugin.class=cn.iocoder.yudao.module.iot.plugin.emqx.config.IotEmqxPlugin
plugin.version=1.0.0
plugin.provider=ahh
plugin.provider=yudao
plugin.dependencies=
plugin.description=plugin-emqx-1.0.0
plugin.description=yudao-module-iot-plugin-emqx-1.0.0

View File

@ -12,6 +12,7 @@
<packaging>jar</packaging>
<artifactId>yudao-module-iot-plugin-emqx</artifactId>
<version>1.0.0</version>
<name>${project.artifactId}</name>
<description>
@ -21,36 +22,16 @@
<properties>
<!-- 插件相关 -->
<plugin.id>emqx-plugin</plugin.id>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.EmqxPlugin</plugin.class>
<plugin.version>0.0.1</plugin.version>
<plugin.provider>ahh</plugin.provider>
<plugin.description>emqx-plugin-0.0.1</plugin.description>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.emqx.config.IotEmqxPlugin</plugin.class>
<plugin.version>${project.version}</plugin.version>
<plugin.provider>yudao</plugin.provider>
<plugin.description>${project.artifactId}-${project.version}</plugin.description>
<plugin.dependencies/>
</properties>
<build>
<plugins>
<!-- DOESN'T WORK WITH MAVEN 3 (I defined the plugin metadata in properties section)
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<version>1.0-alpha-2</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>read-project-properties</goal>
</goals>
<configuration>
<files>
<file>plugin.properties</file>
</files>
</configuration>
</execution>
</executions>
</plugin>
-->
<!-- 插件模式 zip -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
@ -94,6 +75,7 @@
</executions>
</plugin>
<!-- 插件模式 jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
@ -111,54 +93,72 @@
</archive>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-shade-plugin</artifactId>-->
<!-- <version>3.6.0</version>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>shade</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <minimizeJar>true</minimizeJar>-->
<!-- </configuration>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- <configuration>-->
<!-- <archive>-->
<!-- <manifestEntries>-->
<!-- <Plugin-Id>${plugin.id}</Plugin-Id>-->
<!-- <Plugin-Class>${plugin.class}</Plugin-Class>-->
<!-- <Plugin-Version>${plugin.version}</Plugin-Version>-->
<!-- <Plugin-Provider>${plugin.provider}</Plugin-Provider>-->
<!-- <Plugin-Description>${plugin.description}</Plugin-Description>-->
<!-- <Plugin-Dependencies>${plugin.dependencies}</Plugin-Dependencies>-->
<!-- </manifestEntries>-->
<!-- </archive>-->
<!-- </configuration>-->
<!-- </plugin>-->
<!-- 独立模式 -->
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
<configuration>
<classifier>-standalone</classifier>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<!-- 其他依赖项 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-plugin-common</artifactId>
<version>${revision}</version>
</dependency>
<!-- Web 相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- PF4J Spring 集成 -->
<dependency>
<groupId>org.pf4j</groupId>
<artifactId>pf4j-spring</artifactId>
<scope>provided</scope>
</dependency>
<!-- 项目依赖 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- Vert.x 核心依赖 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<!-- Vert.x Web 模块 -->
<!-- 工具类相关 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- MQTT -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,42 +0,0 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class EmqxPlugin extends Plugin {
private ExecutorService executorService;
public EmqxPlugin(PluginWrapper wrapper) {
super(wrapper);
this.executorService = Executors.newSingleThreadExecutor();
}
@Override
public void start() {
log.info("EmqxPlugin.start()");
if (executorService.isShutdown() || executorService.isTerminated()) {
executorService = Executors.newSingleThreadExecutor();
}
IotDeviceUpstreamApi deviceDataApi = SpringUtil.getBean(IotDeviceUpstreamApi.class);
if (deviceDataApi == null) {
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
return;
}
}
@Override
public void stop() {
log.info("EmqxPlugin.stop()");
}
}

View File

@ -0,0 +1,22 @@
package cn.iocoder.yudao.module.iot.plugin.emqx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* IoT Emqx 插件的独立运行入口
*/
@Slf4j
@SpringBootApplication
public class IotEmqxPluginApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(IotEmqxPluginApplication.class);
application.setWebApplicationType(WebApplicationType.NONE);
application.run(args);
log.info("[main][独立模式启动完成]");
}
}

View File

@ -0,0 +1,57 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.config;
import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginWrapper;
import org.pf4j.spring.SpringPlugin;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
// TODO @芋艿完善注释
/**
* 负责插件的启动和停止
*/
@Slf4j
public class IotEmqxPlugin extends SpringPlugin {
public IotEmqxPlugin(PluginWrapper wrapper) {
super(wrapper);
}
@Override
public void start() {
log.info("[EmqxPlugin][EmqxPlugin 插件启动开始...]");
try {
log.info("[EmqxPlugin][EmqxPlugin 插件启动成功...]");
} catch (Exception e) {
log.error("[EmqxPlugin][EmqxPlugin 插件开启动异常...]", e);
}
}
@Override
public void stop() {
log.info("[EmqxPlugin][EmqxPlugin 插件停止开始...]");
try {
log.info("[EmqxPlugin][EmqxPlugin 插件停止成功...]");
} catch (Exception e) {
log.error("[EmqxPlugin][EmqxPlugin 插件停止异常...]", e);
}
}
@Override
protected ApplicationContext createApplicationContext() {
// 创建插件自己的 ApplicationContext
AnnotationConfigApplicationContext pluginContext = new AnnotationConfigApplicationContext();
// 设置父容器为主应用的 ApplicationContext 确保主应用中提供的类可用
pluginContext.setParent(SpringUtil.getApplicationContext());
// 继续使用插件自己的 ClassLoader 以加载插件内部的类
pluginContext.setClassLoader(getWrapper().getPluginClassLoader());
// 扫描当前插件的自动配置包
pluginContext.scan("cn.iocoder.yudao.module.iot.plugin.emqx.config");
pluginContext.refresh();
return pluginContext;
}
}

View File

@ -0,0 +1,35 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.config;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.plugin.emqx.downstream.IotDeviceDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.IotDeviceUpstreamServer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* IoT 插件 Emqx 的专用自动配置类
*
* @author haohao
*/
@Configuration
@EnableConfigurationProperties(IotPluginEmqxProperties.class)
public class IotPluginEmqxAutoConfiguration {
@Bean(initMethod = "start", destroyMethod = "stop")
public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi,
IotPluginCommonProperties commonProperties,
IotPluginEmqxProperties emqxProperties,
IotDeviceDownstreamServer deviceDownstreamServer) {
return new IotDeviceUpstreamServer(commonProperties, emqxProperties, deviceUpstreamApi, deviceDownstreamServer);
}
@Bean
public IotDeviceDownstreamHandler deviceDownstreamHandler() {
return new IotDeviceDownstreamHandlerImpl();
}
}

View File

@ -0,0 +1,42 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
* 物联网插件 - EMQX 配置
*
* @author 芋道源码
*/
@ConfigurationProperties(prefix = "yudao.iot.plugin.emqx")
@Validated
@Data
public class IotPluginEmqxProperties {
/**
* 服务主机
*/
private String host;
/**
* 服务端口
*/
private int port;
/**
* 是否启用 SSL
*/
private boolean ssl;
/**
* 订阅的主题
*/
private String topics;
/**
* 认证端口
*/
private int authPort;
}

View File

@ -0,0 +1,42 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.downstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
/**
* EMQX 插件的 {@link IotDeviceDownstreamHandler} 实现类
* <p>
* 但是由于设备通过 HTTP 短链接接入导致其实无法下行指导给 device 设备所以基本都是直接返回失败
* 类似 MQTTWebSocketTCP 插件是可以实现下行指令的
*
* @author 芋道源码
*/
public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
@Override
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
return CommonResult.success(true);
}
}

View File

@ -0,0 +1,83 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.upstream;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties;
import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 设备下行服务端接收来自 device 设备的请求转发给 server 服务器
* <p>
* 协议HTTP
*
* @author haohao
*/
@Slf4j
public class IotDeviceUpstreamServer {
private final Vertx vertx;
private final HttpServer server;
private final IotPluginEmqxProperties emqxProperties;
public IotDeviceUpstreamServer(IotPluginCommonProperties commonProperties,
IotPluginEmqxProperties emqxProperties,
IotDeviceUpstreamApi deviceUpstreamApi,
IotDeviceDownstreamServer deviceDownstreamServer) {
this.emqxProperties = emqxProperties;
// 创建 Vertx 实例
this.vertx = Vertx.vertx();
// 创建 Router 实例
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create()); // 处理 Body
router.post(IotDeviceAuthVertxHandler.PATH)
.handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
// 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router);
}
/**
* 启动 HTTP 服务器
*/
public void start() {
log.info("[start][开始启动]");
server.listen(emqxProperties.getAuthPort())
.toCompletionStage()
.toCompletableFuture()
.join();
log.info("[start][启动完成,端口({})]", this.server.actualPort());
}
/**
* 停止所有
*/
public void stop() {
log.info("[stop][开始关闭]");
try {
// 关闭 HTTP 服务器
if (server != null) {
server.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
// 关闭 Vertx 实例
if (vertx != null) {
vertx.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
log.info("[stop][关闭完成]");
} catch (Exception e) {
log.error("[stop][关闭异常]", e);
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,54 @@
package cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEmqxAuthReqDTO;
import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT Emqx 连接认证的 Vert.x Handler
* <a href="https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">...</a>
*
* @author haohao
*/
@RequiredArgsConstructor
@Slf4j
public class IotDeviceAuthVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/mqtt/auth";
private final IotDeviceUpstreamApi deviceUpstreamApi;
@Override
@SuppressWarnings("unchecked")
public void handle(RoutingContext routingContext) {
JsonObject json = routingContext.body().asJsonObject();
String clientId = json.getString("clientid");
String username = json.getString("username");
String password = json.getString("password");
IotDeviceEmqxAuthReqDTO authReqDTO = buildDeviceEmqxAuthReqDTO(clientId, username, password);
CommonResult<Boolean> authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO);
if (authResult.getCode() != 0 || !authResult.getData()) {
denyAccess(routingContext);
return;
}
IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"allow\"}");
}
private void denyAccess(RoutingContext routingContext) {
IotPluginCommonUtils.writeJson(routingContext, "{\"result\": \"deny\"}");
}
private IotDeviceEmqxAuthReqDTO buildDeviceEmqxAuthReqDTO(String clientId, String username, String password) {
return new IotDeviceEmqxAuthReqDTO().setClientId(clientId).setUsername(username).setPassword(password);
}
}

View File

@ -0,0 +1,17 @@
spring:
application:
name: yudao-module-iot-plugin-emqx
yudao:
iot:
plugin:
common:
upstream-url: http://127.0.0.1:48080
downstream-port: 8100
plugin-key: yudao-module-iot-plugin-emqx
emqx:
host: 127.0.0.1
port: 1883
ssl: false
topics: "/sys/#"
auth-port: 8101

View File

@ -21,7 +21,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 设备设备上报的 Vert.x Handler
* IoT 设备事件上报的 Vert.x Handler
*/
@RequiredArgsConstructor
@Slf4j

View File

@ -8,5 +8,6 @@ yudao:
common:
upstream-url: http://127.0.0.1:48080
downstream-port: 8093
plugin-key: yudao-module-iot-plugin-http
http:
server-port: 8092