【代码评审】IoT:rocketmq 数据桥接的接入
This commit is contained in:
parent
41c41bc6c3
commit
8e7bbfe0da
|
@ -146,32 +146,28 @@ public class IotDataBridgeDO extends BaseDO {
|
||||||
* RocketMQ 名称服务器地址
|
* RocketMQ 名称服务器地址
|
||||||
*/
|
*/
|
||||||
private String nameServer;
|
private String nameServer;
|
||||||
|
|
||||||
/**
|
|
||||||
* 生产者/消费者组
|
|
||||||
*/
|
|
||||||
private String group;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 主题
|
|
||||||
*/
|
|
||||||
private String topic;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 标签
|
|
||||||
*/
|
|
||||||
private String tags;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 访问密钥
|
* 访问密钥
|
||||||
*/
|
*/
|
||||||
private String accessKey;
|
private String accessKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 秘密钥匙
|
* 秘密钥匙
|
||||||
*/
|
*/
|
||||||
private String secretKey;
|
private String secretKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 生产者组
|
||||||
|
*/
|
||||||
|
private String group;
|
||||||
|
/**
|
||||||
|
* 主题
|
||||||
|
*/
|
||||||
|
private String topic;
|
||||||
|
/**
|
||||||
|
* 标签
|
||||||
|
*/
|
||||||
|
private String tags;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -143,39 +143,38 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
|
private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
|
||||||
// 1. 创建生产者实例,指定生产者组名
|
// 1.1 创建生产者实例,指定生产者组名
|
||||||
DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
|
DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
|
||||||
|
// TODO @puhui999:可以考虑,基于 guava 做 cache,使用 config 作为 key,然后假个 listener 超时,销毁 producer
|
||||||
try {
|
try {
|
||||||
// 2. 设置 NameServer 地址
|
// 1.2 设置 NameServer 地址
|
||||||
producer.setNamesrvAddr(config.getNameServer());
|
producer.setNamesrvAddr(config.getNameServer());
|
||||||
// 3. 启动生产者
|
// 1.3 启动生产者
|
||||||
producer.start();
|
producer.start();
|
||||||
// 4. 创建消息对象,指定Topic、Tag和消息体
|
|
||||||
|
// 2.1 创建消息对象,指定Topic、Tag和消息体
|
||||||
Message msg = new Message(
|
Message msg = new Message(
|
||||||
config.getTopic(),
|
config.getTopic(),
|
||||||
config.getTags(),
|
config.getTags(),
|
||||||
message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
|
message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
|
||||||
);
|
);
|
||||||
|
// 2.2 发送同步消息并处理结果
|
||||||
// 5. 发送同步消息并处理结果
|
|
||||||
SendResult sendResult = producer.send(msg);
|
SendResult sendResult = producer.send(msg);
|
||||||
// 6. 处理发送结果
|
// 2.3 处理发送结果
|
||||||
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
|
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
|
||||||
log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]",
|
log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult);
|
||||||
message, config, sendResult);
|
|
||||||
} else {
|
} else {
|
||||||
log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]",
|
log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult);
|
||||||
message, config, sendResult);
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[executeRocketMQ][message({}) config({}) 发送异常]",
|
log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e);
|
||||||
message, config, e);
|
|
||||||
} finally {
|
} finally {
|
||||||
// 7. 关闭生产者
|
// 3. 关闭生产者
|
||||||
producer.shutdown();
|
producer.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO @芋艿:测试代码,后续清理
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
// 1. 创建 IotRuleSceneDataBridgeAction 实例
|
// 1. 创建 IotRuleSceneDataBridgeAction 实例
|
||||||
IotRuleSceneDataBridgeAction action = new IotRuleSceneDataBridgeAction();
|
IotRuleSceneDataBridgeAction action = new IotRuleSceneDataBridgeAction();
|
||||||
|
|
Loading…
Reference in New Issue