提交 7ea297e0 编写于 作者: fengmin's avatar fengmin

mq消息订阅实现5

上级 15afdf4c
...@@ -19,6 +19,7 @@ import org.springframework.stereotype.Component; ...@@ -19,6 +19,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -37,31 +38,32 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -37,31 +38,32 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
List<RuleEngine> ruleEngines = ruleEngineMapper.selectList(new LambdaQueryWrapper<RuleEngine>().select(RuleEngine::getEngineId, RuleEngine::getExtParams)); List<RuleEngine> ruleEngines = ruleEngineMapper.selectList(new LambdaQueryWrapper<RuleEngine>()
.select(RuleEngine::getEngineId, RuleEngine::getExtParams));
if (!StringUtils.isEmpty(ruleEngines)) { if (!StringUtils.isEmpty(ruleEngines)) {
DefaultMQPushConsumer consumer = null;
for (int i = 0, len = ruleEngines.size(); i < len; i++) { for (int i = 0, len = ruleEngines.size(); i < len; i++) {
int finalI = i; Map map = getParamMap(ruleEngines.get(i).getExtParams());
String topic = getParamTopic(ruleEngines.get(finalI).getExtParams()); if ("Y".equals(map.get("isAuto")) && !StringUtils.isEmpty(map.get("topic"))) {
if (isAutoCreate(ruleEngines.get(finalI).getExtParams()) && topic != null) {
log.info("ruleEngineConsumer 正在创建---------------------------------------"); log.info("ruleEngineConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ruleEngines.get(finalI).getEngineId()); consumer = new DefaultMQPushConsumer(ruleEngines.get(i).getEngineId());
consumer.setNamesrvAddr(namesrvAddr); consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1); consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1); consumer.setConsumeMessageBatchMaxSize(1);
// 设置监听 // 设置监听
GeneralConsumeMsgListenerProcessor messageListener = new GeneralConsumeMsgListenerProcessor(ruleEngines.get(finalI).getEngineId()); GeneralConsumeMsgListenerProcessor messageListener = new GeneralConsumeMsgListenerProcessor(ruleEngines.get(i).getEngineId());
consumer.registerMessageListener(messageListener); consumer.registerMessageListener(messageListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("dstMQMsgConsumer"); consumer.setInstanceName("dstMQMsgConsumer");
try { try {
consumer.subscribe(topic, "*"); consumer.subscribe((String) map.get("topic"), "*");
consumer.start(); consumer.start();
if (dataMap.get(ruleEngines.get(finalI)) == null) { if (StringUtils.isEmpty(dataMap.get(ruleEngines.get(i)))) {
dataMap.put(ruleEngines.get(finalI).getEngineId(), consumer); dataMap.put(ruleEngines.get(i).getEngineId(), consumer);
log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr); log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(i).getEngineId(), ruleEngines.get(i).getModelId(), namesrvAddr);
} else { } else {
log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr); log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(i).getEngineId(), ruleEngines.get(i).getModelId(), namesrvAddr);
} }
} catch (MQClientException e) { } catch (MQClientException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -73,41 +75,21 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -73,41 +75,21 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
} }
/** /**
* 判断是否自动创建 * 获取参数map
* *
* @param extParams * @param extParams
* @return * @return
*/ */
public boolean isAutoCreate(String extParams) { public Map getParamMap(String extParams) {
JSONArray objects = JSON.parseArray(extParams); JSONArray objects = JSON.parseArray(extParams);
boolean flat = true; HashMap map = new HashMap();
if (!CollectionUtils.isEmpty(objects)) { if (!CollectionUtils.isEmpty(objects)) {
for (int j = 0, len1 = objects.size(); j < len1; j++) { for (int j = 0, len1 = objects.size(); j < len1; j++) {
String property = objects.getJSONObject(j).getString("property"); map.put(objects.getJSONObject(j).getString("property"), objects.getJSONObject(j).getString("value"));
if ("isAuto".equals(property)) {
if ("N".equals(objects.getJSONObject(j).getString("value"))) {
flat = false;
}
break;
}
} }
} }
return flat; return map;
} }
public String getParamTopic(String extParams) {
JSONArray objects = JSON.parseArray(extParams);
boolean flat = true;
if (!CollectionUtils.isEmpty(objects)) {
for (int j = 0, len1 = objects.size(); j < len1; j++) {
String property = objects.getJSONObject(j).getString("property");
if ("topic".equals(property)) {
return objects.getJSONObject(j).getString("value");
}
}
}
return null;
}
} }
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册