提交 eb1ce728 编写于 作者: fengmin's avatar fengmin

实时更新规则引擎订阅主题二

上级 c299ec0c
...@@ -2,7 +2,7 @@ package cn.ibizlab.core.extensions.util; ...@@ -2,7 +2,7 @@ package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg; import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.service.RuleEngineExService; import cn.ibizlab.core.extensions.service.RuleEngineExService;
import cn.ibizlab.core.util.config.SpringContextUtil; import cn.ibizlab.util.security.SpringContextHolder;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -21,7 +21,7 @@ import java.util.List; ...@@ -21,7 +21,7 @@ import java.util.List;
@Slf4j @Slf4j
public class GeneralConsumeMsgListenerProcessor implements MessageListenerOrderly { public class GeneralConsumeMsgListenerProcessor implements MessageListenerOrderly {
private static RuleEngineExService ruleEngineExService = SpringContextUtil.getBean(RuleEngineExService.class); private static RuleEngineExService ruleEngineExService = SpringContextHolder.getApplicationContext().getBean(RuleEngineExService.class);
private String key = "";//标记消费者,与需要操作的规则引擎id相等 private String key = "";//标记消费者,与需要操作的规则引擎id相等
@Override @Override
...@@ -36,13 +36,13 @@ public class GeneralConsumeMsgListenerProcessor implements MessageListenerOrderl ...@@ -36,13 +36,13 @@ public class GeneralConsumeMsgListenerProcessor implements MessageListenerOrderl
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8"); String body = new String(messageExt.getBody(), "utf-8");
if(!StringUtils.isEmpty(body)) { if(!StringUtils.isEmpty(body)) {
EngineMQMsg engineMQMsg = JSON.parseObject((String) JSON.parse(body), new TypeReference<EngineMQMsg>() { EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {
}); });
engineMQMsg.setEngineId(key); engineMQMsg.setEngineId(key);
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body); log.info("MQ接收到的消息为:topic={}, tags={}, 消息内容={}", topic, tags, body);
ruleEngineExService.processData(engineMQMsg); ruleEngineExService.processData(engineMQMsg);
} }
log.info("消费成功"); log.info("MQ消息消费成功");
} catch (Exception e) { } catch (Exception e) {
log.error("获取MQ消息内容异常{}", e); log.error("获取MQ消息内容异常{}", e);
} }
......
...@@ -62,7 +62,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -62,7 +62,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
&& !StringUtils.isEmpty(map.get(TOPIC.getProperty())) && !StringUtils.isEmpty(map.get(TOPIC.getProperty()))
&& !StringUtils.isEmpty(map.get(GROUP.getProperty()))) { && !StringUtils.isEmpty(map.get(GROUP.getProperty()))) {
if (!StringUtils.isEmpty(dataMap.get(ruleEngine.getEngineId()))) { if (!StringUtils.isEmpty(dataMap.get(ruleEngine.getEngineId()))) {
log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngine.getEngineId(), ruleEngine.getModelId(), namesrvAddr); log.warn("ruleEngineConsumer 不能重复创建 EngineId={}, groupName={}, topics={}, namesrvAddr={}", ruleEngine.getEngineId(), map.get(GROUP.getProperty()), ruleEngine.getModelId(), namesrvAddr);
return; return;
} }
try { try {
...@@ -80,7 +80,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner { ...@@ -80,7 +80,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
dataMap.put(ruleEngine.getEngineId(), consumer); dataMap.put(ruleEngine.getEngineId(), consumer);
consumer.subscribe((String) map.get(TOPIC.getProperty()), "*"); consumer.subscribe((String) map.get(TOPIC.getProperty()), "*");
consumer.start(); consumer.start();
log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngine.getEngineId(),map.get(TOPIC.getProperty()), namesrvAddr); log.info("ruleEngineConsumer 创建成功 EngineId={} groupName={}, topics={}, namesrvAddr={}",ruleEngine.getEngineId(), map.get(GROUP.getProperty()),map.get(TOPIC.getProperty()), namesrvAddr);
} catch (MQClientException e) { } catch (MQClientException e) {
e.printStackTrace(); e.printStackTrace();
log.error("ruleEngineConsumer 创建失败!"); log.error("ruleEngineConsumer 创建失败!");
......
...@@ -11,8 +11,6 @@ import org.springframework.util.StringUtils; ...@@ -11,8 +11,6 @@ import org.springframework.util.StringUtils;
import java.util.Map; import java.util.Map;
import static cn.ibizlab.core.enums.RuleEnginerParamEnum.*;
/** /**
* @author 无名小卒 * @author 无名小卒
* @data 2022/8/22 * @data 2022/8/22
...@@ -27,17 +25,15 @@ public class TopicListenter { ...@@ -27,17 +25,15 @@ public class TopicListenter {
public void updruleEngine(RuleEngine ruleEngine) { public void updruleEngine(RuleEngine ruleEngine) {
log.info("更新规则引擎订阅频道收到消息:引擎id:{},引擎名:{}", ruleEngine.getEngineId(), ruleEngine.getEngineName()); log.info("更新规则引擎订阅频道收到消息:引擎id:{},引擎名:{}", ruleEngine.getEngineId(), ruleEngine.getEngineName());
Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap(); Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap();
if (StringUtils.isEmpty(ruleEngine.getEngineId())) {//创建规则引擎 if (StringUtils.isEmpty(ruleEngine.getEngineId())) {//创建规则引擎主题
RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr); RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr);
} else { } else {
Map paramMap = RocktcermqCustmerRunner.getParamMap(ruleEngine.getExtParams()); if (dataMap.containsKey(ruleEngine.getEngineId())) {//更新规则引擎主题
if (dataMap.containsKey(ruleEngine.getEngineId()) && (NO_AUTO_CREATE.getValue().equals(paramMap.get(NO_AUTO_CREATE.getProperty()))
|| StringUtils.isEmpty(paramMap.get(TOPIC.getProperty())))) {//更新规则引擎
dataMap.get(ruleEngine.getEngineId()).shutdown(); dataMap.get(ruleEngine.getEngineId()).shutdown();
log.info("引擎id:{},引擎名:{}被关闭!", ruleEngine.getEngineId(), ruleEngine.getEngineName());
dataMap.remove(ruleEngine.getEngineId()); dataMap.remove(ruleEngine.getEngineId());
} else {
RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr);
} }
RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr);//创建规则引擎主题
} }
} }
...@@ -45,6 +41,9 @@ public class TopicListenter { ...@@ -45,6 +41,9 @@ public class TopicListenter {
log.info("删除规则引擎订阅频道收到消息:引擎id{}", key); log.info("删除规则引擎订阅频道收到消息:引擎id{}", key);
Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap(); Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap();
if (dataMap.containsKey(key)) { if (dataMap.containsKey(key)) {
DefaultMQPushConsumer defaultMQPushConsumer = dataMap.get(key);
defaultMQPushConsumer.shutdown();
log.info("引擎id:{}被关闭!", key);
dataMap.remove(key);//删除相应监听的主题 dataMap.remove(key);//删除相应监听的主题
} }
} }
......
package cn.ibizlab.util.annotation; package cn.ibizlab.util.annotation;
import org.springframework.core.annotation.AliasFor; import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.*; import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD}) @Target({ ElementType.METHOD})
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册