提交 3630bef7 编写于 作者: sq3536's avatar sq3536

换装rocketmq template

上级 b072597e
......@@ -108,8 +108,8 @@
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<!--达梦数据库-->
......
package cn.ibizlab.core.message;
import cn.ibizlab.core.extensions.cql.ExecResultRepository;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Slf4j
@Component
@ConditionalOnExpression("'${rocketmq.consumer.namesrvAddr:}'!='' && ${rocketmq.consumer.enabled:false}")
@RocketMQMessageListener(nameServer = "${rocketmq.consumer.namesrvAddr:}",
topic = "${rocketmq.producer.resultsTopic:DSTRESULTSMSG}",
selectorExpression="*",
consumerGroup = "${rocketmq.consumer.resultsGroupName:resultsCG}",
consumeThreadMax = 20,
consumeMode = ConsumeMode.CONCURRENTLY
)
public class ResultMQMsgListener implements RocketMQListener<MessageExt> {
@Autowired(required = false)
@Lazy
private ExecResultRepository execResultRepository;
@Override
public void onMessage(MessageExt messageExt) {
try {
log.info("MQ消息topic={}, tags={}, 消息内容={}", messageExt.getTopic(), messageExt.getTags(), messageExt.getMsgId());
String body = new String(messageExt.getBody(), "utf-8");
String tags = messageExt.getTags();
ResultsMQMsg resultsMQMsg = JSON.parseObject(body, new TypeReference<ResultsMQMsg>() {});
if(!StringUtils.isEmpty(resultsMQMsg.getKeyValueField()))
execResultRepository.saveResultsMQMsg(resultsMQMsg);
} catch (Exception e) {
log.error("获取MQ消息内容异常{}",e);
}
}
}
\ No newline at end of file
package cn.ibizlab.core.message;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.service.DABuildExService;
import cn.ibizlab.core.extensions.service.RuleEngineExService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@ConditionalOnExpression("'${rocketmq.consumer.namesrvAddr:}'!='' && ${rocketmq.consumer.enabled:false}")
@RocketMQMessageListener(nameServer = "${rocketmq.consumer.namesrvAddr:}",
topic = "${rocketmq.producer.ruleEngineTopic:DSTMSG}",
selectorExpression="*",
consumerGroup = "${rocketmq.consumer.ruleEngineGroupName:dstCG}",
consumeThreadMax = 1,
consumeMode = ConsumeMode.ORDERLY
)
public class RuleEngineMQMsgListener implements RocketMQListener<MessageExt> {
@Autowired
@Lazy
private RuleEngineExService ruleEngineExService;
@Autowired
@Lazy
private DABuildExService daBuildExService;
@Override
public void onMessage(MessageExt messageExt) {
try {
log.info("MQ消息topic={}, tags={}, 消息内容={}", messageExt.getTopic(), messageExt.getTags(), messageExt.getMsgId());
String body = new String(messageExt.getBody(), "utf-8");
String tags = messageExt.getTags();
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {});
System.out.println(engineMQMsg.toString());
if("Engine".equalsIgnoreCase(tags)) {
ruleEngineExService.processData(engineMQMsg);
}else if("Build".equalsIgnoreCase(tags)){
daBuildExService.processData(engineMQMsg);
}
} catch (Exception e) {
log.error("获取MQ消息内容异常{}",e);
}
}
}
\ No newline at end of file
......@@ -16,8 +16,8 @@ import org.springframework.context.annotation.Configuration;
* mq消费者配置
*/
@Slf4j
@Configuration
@ConditionalOnExpression("'${rocketmq.consumer.namesrvAddr:}'!=''")
//@Configuration
//@ConditionalOnExpression("'${rocketmq.consumer.namesrvAddr:}'!=''")
public class MQConsumerConfigure {
@Value("${rocketmq.consumer.ruleEngineGroupName:dstCG}")
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册