提交 8804c18e 编写于 作者: fengmin's avatar fengmin

mq消息订阅实现4

上级 a0dc2381
...@@ -2,9 +2,10 @@ package cn.ibizlab.core.util.config; ...@@ -2,9 +2,10 @@ package cn.ibizlab.core.util.config;
import cn.ibizlab.core.extensions.util.GeneralConsumeMsgListenerProcessor; import cn.ibizlab.core.extensions.util.GeneralConsumeMsgListenerProcessor;
import cn.ibizlab.core.rule.domain.RuleEngine; import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.rule.mapper.RuleEngineExtendMapper; import cn.ibizlab.core.rule.mapper.RuleEngineMapper;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -27,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -27,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
@ConditionalOnExpression("${rocketmq.consumer.enabled}&&'${rocketmq.consumer.namesrvAddr:}'!=''") @ConditionalOnExpression("${rocketmq.consumer.enabled}&&'${rocketmq.consumer.namesrvAddr:}'!=''")
public class RocktcermqCustmerRunner implements ApplicationRunner { public class RocktcermqCustmerRunner implements ApplicationRunner {
@Autowired @Autowired
private RuleEngineExtendMapper ruleEngineMapper; private RuleEngineMapper ruleEngineMapper;
@Value("${rocketmq.consumer.namesrvAddr:}") @Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr; private String namesrvAddr;
...@@ -36,36 +37,37 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -36,36 +37,37 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
List<RuleEngine> ruleEngines = ruleEngineMapper.selectCustomerList(); List<RuleEngine> ruleEngines = ruleEngineMapper.selectList(new LambdaQueryWrapper<RuleEngine>().select(RuleEngine::getEngineId, RuleEngine::getExtParams));
if (!StringUtils.isEmpty(ruleEngines)) { if (!StringUtils.isEmpty(ruleEngines)) {
for (int i = 0, len = ruleEngines.size(); i < len; i++) { for (int i = 0, len = ruleEngines.size(); i < len; i++) {
int finalI = i; int finalI = i;
if (isAutoCreate(ruleEngines.get(finalI).getExtParams())) { String topic = getParamTopic(ruleEngines.get(finalI).getExtParams());
log.info("ruleEngineConsumer 正在创建---------------------------------------"); if (isAutoCreate(ruleEngines.get(finalI).getExtParams()) && topic != null) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ruleEngines.get(finalI).getEngineId()); log.info("ruleEngineConsumer 正在创建---------------------------------------");
consumer.setNamesrvAddr(namesrvAddr); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ruleEngines.get(finalI).getEngineId());
consumer.setConsumeThreadMin(1); consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMax(1); consumer.setConsumeThreadMin(1);
consumer.setConsumeMessageBatchMaxSize(1); consumer.setConsumeThreadMax(1);
// 设置监听 consumer.setConsumeMessageBatchMaxSize(1);
GeneralConsumeMsgListenerProcessor messageListener = new GeneralConsumeMsgListenerProcessor(ruleEngines.get(finalI).getEngineId()); // 设置监听
consumer.registerMessageListener(messageListener); GeneralConsumeMsgListenerProcessor messageListener = new GeneralConsumeMsgListenerProcessor(ruleEngines.get(finalI).getEngineId());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(messageListener);
consumer.setInstanceName("dstMQMsgConsumer"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try { consumer.setInstanceName("dstMQMsgConsumer");
consumer.subscribe(ruleEngines.get(finalI).getModelId(), "*"); try {
consumer.start(); consumer.subscribe(topic, "*");
if (dataMap.get(ruleEngines.get(finalI)) == null) { consumer.start();
dataMap.put(ruleEngines.get(finalI).getEngineId(), consumer); if (dataMap.get(ruleEngines.get(finalI)) == null) {
log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr); dataMap.put(ruleEngines.get(finalI).getEngineId(), consumer);
} else { log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr);
log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr); } else {
} log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr);
} catch (MQClientException e) {
e.printStackTrace();
log.error("ruleEngineConsumer 创建失败!");
} }
} catch (MQClientException e) {
e.printStackTrace();
log.error("ruleEngineConsumer 创建失败!");
} }
}
} }
} }
} }
...@@ -93,4 +95,19 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -93,4 +95,19 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
return flat; return flat;
} }
public String getParamTopic(String extParams) {
JSONArray objects = JSON.parseArray(extParams);
boolean flat = true;
if (!CollectionUtils.isEmpty(objects)) {
for (int j = 0, len1 = objects.size(); j < len1; j++) {
String property = objects.getJSONObject(j).getString("property");
if ("topic".equals(property)) {
return objects.getJSONObject(j).getString("value");
}
}
}
return null;
}
} }
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册