提交 571cced1 编写于 作者: fengmin's avatar fengmin

实时更新规则引擎订阅主题二

上级 f4d901e4
package cn.ibizlab.core.enums;
import lombok.Getter;
/**
* @author 无名小卒
* @describe 规则引擎参数枚举
* @data 2022/8/22
*/
@Getter
public enum RuleEnginerParamEnum {
AUTO_CREATE("isAuto", "Y"),
NO_AUTO_CREATE("isAuto", "N"),
TOPIC("topic", "topic"),
GROUP("group", "group"),
ELEMENT_NAME("property", "value");
private String property;
private String value;
RuleEnginerParamEnum(String property, String value) {
this.property = property;
this.value = value;
}
}
...@@ -24,6 +24,8 @@ import java.util.List; ...@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import static cn.ibizlab.core.enums.RuleEnginerParamEnum.*;
@Component @Component
@Slf4j @Slf4j
@ConditionalOnExpression("${rocketmq.consumer.enabled}&&'${rocketmq.consumer.namesrvAddr:}'!=''") @ConditionalOnExpression("${rocketmq.consumer.enabled}&&'${rocketmq.consumer.namesrvAddr:}'!=''")
...@@ -34,7 +36,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -34,7 +36,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
@Value("${rocketmq.consumer.namesrvAddr:}") @Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr; private String namesrvAddr;
//mq消费者 //mq消费者
private static Map<String, DefaultMQPushConsumer> dataMap = new ConcurrentHashMap(); private static Map<String, DefaultMQPushConsumer> dataMap = new ConcurrentHashMap();
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
...@@ -55,15 +57,17 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -55,15 +57,17 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
*/ */
public static void addtopic(RuleEngine ruleEngine, String namesrvAddr) { public static void addtopic(RuleEngine ruleEngine, String namesrvAddr) {
Map map = getParamMap(ruleEngine.getExtParams()); Map map = getParamMap(ruleEngine.getExtParams());
DefaultMQPushConsumer consumer=null; DefaultMQPushConsumer consumer = null;
if ("Y".equals(map.get("isAuto")) && !StringUtils.isEmpty(map.get("topic"))) { if (AUTO_CREATE.getValue().equals(map.get(AUTO_CREATE.getProperty()))
&& !StringUtils.isEmpty(map.get(TOPIC.getProperty()))
&& !StringUtils.isEmpty(map.get(GROUP.getProperty()))) {
if (!StringUtils.isEmpty(dataMap.get(ruleEngine.getEngineId()))) { if (!StringUtils.isEmpty(dataMap.get(ruleEngine.getEngineId()))) {
log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngine.getEngineId(), ruleEngine.getModelId(), namesrvAddr); log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngine.getEngineId(), ruleEngine.getModelId(), namesrvAddr);
return; return;
} }
try { try {
log.info("ruleEngineConsumer 正在创建---------------------------------------"); log.info("ruleEngineConsumer 正在创建---------------------------------------");
consumer = new DefaultMQPushConsumer(ruleEngine.getEngineId()); consumer = new DefaultMQPushConsumer((String) map.get(GROUP.getProperty()));
consumer.setNamesrvAddr(namesrvAddr); consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1); consumer.setConsumeThreadMax(1);
...@@ -74,7 +78,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -74,7 +78,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("dstMQMsgConsumer"); consumer.setInstanceName("dstMQMsgConsumer");
dataMap.put(ruleEngine.getEngineId(), consumer); dataMap.put(ruleEngine.getEngineId(), consumer);
consumer.subscribe((String) map.get("topic"), "*"); consumer.subscribe((String) map.get(TOPIC.getProperty()), "*");
consumer.start(); consumer.start();
log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngine.getEngineId(), ruleEngine.getModelId(), namesrvAddr); log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngine.getEngineId(), ruleEngine.getModelId(), namesrvAddr);
} catch (MQClientException e) { } catch (MQClientException e) {
...@@ -95,7 +99,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -95,7 +99,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
HashMap map = new HashMap(); HashMap map = new HashMap();
if (!CollectionUtils.isEmpty(objects)) { if (!CollectionUtils.isEmpty(objects)) {
for (int j = 0, len1 = objects.size(); j < len1; j++) { for (int j = 0, len1 = objects.size(); j < len1; j++) {
map.put(objects.getJSONObject(j).getString("property"), objects.getJSONObject(j).getString("value")); map.put(objects.getJSONObject(j).getString(ELEMENT_NAME.getProperty()), objects.getJSONObject(j).getString(ELEMENT_NAME.getValue()));
} }
} }
return map; return map;
......
...@@ -11,6 +11,8 @@ import org.springframework.util.StringUtils; ...@@ -11,6 +11,8 @@ import org.springframework.util.StringUtils;
import java.util.Map; import java.util.Map;
import static cn.ibizlab.core.enums.RuleEnginerParamEnum.*;
/** /**
* @author 无名小卒 * @author 无名小卒
* @data 2022/8/22 * @data 2022/8/22
...@@ -23,24 +25,24 @@ public class TopicListenter { ...@@ -23,24 +25,24 @@ public class TopicListenter {
private String namesrvAddr; private String namesrvAddr;
public void updruleEngine(RuleEngine ruleEngine) { public void updruleEngine(RuleEngine ruleEngine) {
log.info("更新规则引擎订阅频道收到消息:引擎id:{},引擎名:{}",ruleEngine.getEngineId(),ruleEngine.getEngineName()); log.info("更新规则引擎订阅频道收到消息:引擎id:{},引擎名:{}", ruleEngine.getEngineId(), ruleEngine.getEngineName());
Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap(); Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap();
if (StringUtils.isEmpty(ruleEngine.getEngineId())) {//创建规则引擎 if (StringUtils.isEmpty(ruleEngine.getEngineId())) {//创建规则引擎
RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr); RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr);
} else { } else {
Map paramMap = RocktcermqCustmerRunner.getParamMap(ruleEngine.getExtParams()); Map paramMap = RocktcermqCustmerRunner.getParamMap(ruleEngine.getExtParams());
if (dataMap.containsKey(ruleEngine.getEngineId())&&("N".equals(paramMap.get("isAuto")) if (dataMap.containsKey(ruleEngine.getEngineId()) && (NO_AUTO_CREATE.getValue().equals(paramMap.get(NO_AUTO_CREATE.getProperty()))
||StringUtils.isEmpty(paramMap.get("topic")))) {//更新规则引擎 || StringUtils.isEmpty(paramMap.get(TOPIC.getProperty())))) {//更新规则引擎
dataMap.get(ruleEngine.getEngineId()).shutdown(); dataMap.get(ruleEngine.getEngineId()).shutdown();
dataMap.remove(ruleEngine.getEngineId()); dataMap.remove(ruleEngine.getEngineId());
}else { } else {
RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr); RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr);
} }
} }
} }
public void delEngine(String key) { public void delEngine(String key) {
log.info("删除规则引擎订阅频道收到消息:引擎id{}" , key); log.info("删除规则引擎订阅频道收到消息:引擎id{}", key);
Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap(); Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap();
if (dataMap.containsKey(key)) { if (dataMap.containsKey(key)) {
dataMap.remove(key);//删除相应监听的主题 dataMap.remove(key);//删除相应监听的主题
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册