提交 1c3df64e 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

MQ消费者使用顺序消费模式

上级 c5692772
...@@ -5,9 +5,7 @@ import cn.ibizlab.core.extensions.service.RuleEngineExService; ...@@ -5,9 +5,7 @@ import cn.ibizlab.core.extensions.service.RuleEngineExService;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -21,7 +19,9 @@ import java.util.List; ...@@ -21,7 +19,9 @@ import java.util.List;
*/ */
@Slf4j @Slf4j
@Component @Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently { public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
private Object lockObject =new Object();
@Value("${rocketmq.producer.ruleEngineTopic: DSTMSG}") @Value("${rocketmq.producer.ruleEngineTopic: DSTMSG}")
private String ruleEngineTopic; private String ruleEngineTopic;
...@@ -29,40 +29,34 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrentl ...@@ -29,40 +29,34 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrentl
@Autowired @Autowired
private RuleEngineExService ruleEngineExService; private RuleEngineExService ruleEngineExService;
/**
* 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
* 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
* @param msgList
* @param consumeConcurrentlyContext
* @return
*/
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
if (CollectionUtils.isEmpty(msgList)) { synchronized (lockObject) {
log.info("MQ接收消息为空,直接返回成功"); if (CollectionUtils.isEmpty(list)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; log.info("MQ接收消息为空,直接返回成功");
} return ConsumeOrderlyStatus.SUCCESS;
MessageExt messageExt = msgList.get(0); }
log.info("MQ接收到的消息为:" + messageExt.toString()); MessageExt messageExt = list.get(0);
try { log.info("MQ接收到的消息为:" + messageExt.toString());
String topic = messageExt.getTopic(); try {
String tags = messageExt.getTags(); String topic = messageExt.getTopic();
String body = new String(messageExt.getBody(), "utf-8"); String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic,tags,body); log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) { } catch (Exception e) {
log.error("获取MQ消息内容异常{}",e); log.error("获取MQ消息内容异常{}", e);
} }
// 处理规则引擎构建消息 // 处理规则引擎构建消息
if(ruleEngineTopic.equalsIgnoreCase(messageExt.getTopic())){ if (ruleEngineTopic.equalsIgnoreCase(messageExt.getTopic())) {
this.processRuleEngineData(messageExt); this.processRuleEngineData(messageExt);
}
return ConsumeOrderlyStatus.SUCCESS;
} }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} }
protected void processRuleEngineData(MessageExt messageExt){ protected void processRuleEngineData(MessageExt messageExt){
try { try {
String body = new String(messageExt.getBody(), "utf-8"); String body = new String(messageExt.getBody(), "utf-8");
...@@ -73,4 +67,5 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrentl ...@@ -73,4 +67,5 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrentl
log.error("获取MQ消息内容异常{}",e); log.error("获取MQ消息内容异常{}",e);
} }
} }
} }
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册