提交 012b3c12 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

临时关闭引擎自动构建,添加临时订阅主体测试功能

上级 3630bef7
package cn.ibizlab.core.message;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.service.DABuildExService;
import cn.ibizlab.core.extensions.service.RuleEngineExService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
* 临时消息主题订阅功能
*/
@Slf4j
@Component
@ConditionalOnExpression("'${rocketmq.consumer.namesrvAddr:}'!='' && ${rocketmq.consumer.tempTopicEnabled:true}")
@RocketMQMessageListener(nameServer = "${rocketmq.consumer.namesrvAddr:}",
topic = "${rocketmq.consumer.tempTopic:DST_DATA_WSXF_TYYW_KG_XFJSDJ}",
selectorExpression="*",
consumerGroup = "${rocketmq.consumer.tempTopicGroupName:TEMPTOPICCG}",
consumeThreadMax = 1,
consumeMode = ConsumeMode.ORDERLY
)
public class TempMQMsgListener implements RocketMQListener<MessageExt> {
@Value("${rocketmq.consumer.tempTopicEngineId:aa3d79b84396c4f4f2663cafd09a0d2b}")
private String tempTopicEngineId;
@Autowired
@Lazy
private RuleEngineExService ruleEngineExService;
@Autowired
@Lazy
private DABuildExService daBuildExService;
@Override
public void onMessage(MessageExt messageExt) {
try {
log.info("MQ消息topic={}, tags={}, 消息内容={}", messageExt.getTopic(), messageExt.getTags(), messageExt.getMsgId());
String body = new String(messageExt.getBody(), "utf-8");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {});
engineMQMsg.setEngineId(tempTopicEngineId);
log.info(engineMQMsg.toString());
ruleEngineExService.processData(engineMQMsg);
} catch (Exception e) {
log.error("获取MQ消息内容异常{}",e);
}
}
}
\ No newline at end of file
......@@ -26,9 +26,9 @@ import java.util.concurrent.ConcurrentHashMap;
import static cn.ibizlab.core.enums.RuleEnginerParamEnum.*;
@Component
@Slf4j
@ConditionalOnExpression("${rocketmq.consumer.enabled:false}")
//@Component
//@ConditionalOnExpression("${rocketmq.consumer.enabled:false}")
public class RocktcermqCustmerRunner implements ApplicationRunner {
@Autowired
private RuleEngineMapper ruleEngineMapper;
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册