提交 f70ebd51 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

test

上级 d5ae30d6
...@@ -31,38 +31,38 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly { ...@@ -31,38 +31,38 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
@Override @Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
synchronized (lockObject) { if (CollectionUtils.isEmpty(list)) {
if (CollectionUtils.isEmpty(list)) { log.info("MQ接收消息为空,直接返回成功");
log.info("MQ接收消息为空,直接返回成功"); return ConsumeOrderlyStatus.SUCCESS;
return ConsumeOrderlyStatus.SUCCESS; }
} System.out.println(System.currentTimeMillis());
MessageExt messageExt = list.get(0); MessageExt messageExt = list.get(0);
log.info("MQ接收到的消息为:" + messageExt.toString()); log.info("MQ接收到的消息为:" + messageExt.toString());
try { try {
String topic = messageExt.getTopic(); String topic = messageExt.getTopic();
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8"); String body = new String(messageExt.getBody(), "utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body); log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) { } catch (Exception e) {
log.error("获取MQ消息内容异常{}", e); log.error("获取MQ消息内容异常{}", e);
} }
// 处理规则引擎构建消息 // 处理规则引擎构建消息
if (ruleEngineTopic.equalsIgnoreCase(messageExt.getTopic())) { if (ruleEngineTopic.equalsIgnoreCase(messageExt.getTopic())) {
this.processRuleEngineData(messageExt); this.processRuleEngineData(messageExt);
}
return ConsumeOrderlyStatus.SUCCESS;
} }
return ConsumeOrderlyStatus.SUCCESS;
} }
protected void processRuleEngineData(MessageExt messageExt){ protected void processRuleEngineData(MessageExt messageExt){
try { try {
String body = new String(messageExt.getBody(), "utf-8"); String body = new String(messageExt.getBody(), "utf-8");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {}); Thread.sleep(10000);
ruleEngineExService.processData(engineMQMsg); // EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {});
// ruleEngineExService.processData(engineMQMsg);
} catch (Exception e) { } catch (Exception e) {
log.error("获取MQ消息内容异常{}",e); log.error("获取MQ消息内容异常{}",e);
} }
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册