提交 f4d901e4 编写于 作者: fengmin's avatar fengmin

实时更新规则引擎订阅主题一

上级 7ea297e0
package cn.ibizlab.core.aspect;
import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.util.job.TopicListenter;
import cn.ibizlab.util.domain.EntityBase;
import lombok.SneakyThrows;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
/**
* @author 无名小卒
* @data 2022/8/22
*/
@Aspect
@Component
@ConditionalOnExpression("${rocketmq.consumer.enabled}&&'${rocketmq.consumer.namesrvAddr:}'!=''")
public class TopicListenerApsect {
@Autowired
private TopicListenter topicListenter;
@Pointcut("execution( * cn.ibizlab.core.rule.service.IRuleEngineService.create(..))")
public void create() {
}
@Pointcut("execution( * cn.ibizlab.core.rule.service.IRuleEngineService.update(..))")
public void update() {
}
/**
* 规则引擎创建切面
*
* @param point
*/
@AfterReturning(value = "create()|| update()")
@SneakyThrows
public void create(JoinPoint point) {
Object[] args = point.getArgs();
if (ObjectUtils.isEmpty(args) || args.length == 0) {
return;
}
Object serviceParam = args[0];
if (serviceParam instanceof EntityBase) {
EntityBase entity = (EntityBase) serviceParam;
String extparams = (String) entity.get("extparams");
if (extparams.contains("isAuto") && extparams.contains("topic")) {
topicListenter.updruleEngine((RuleEngine) entity);
}
}
}
/**
* 实体数据更新切面,在成功更新数据后将新增数据内容记录审计日志内(审计明细【AuditInfo】中只记录审计属性变化情况,审计属性在平台属性中配置)
* 使用环切【@Around】获取要删除的完整数据,并将审计属性相关信息记录到审计日志中
*
* @param point
* @return
* @throws Throwable
*/
@AfterReturning("execution(* cn.ibizlab.core.rule.service.IRuleEngineService.remove(..))")
public void remove(JoinPoint point) throws Throwable {
Object args[] = point.getArgs();
if (ObjectUtils.isEmpty(args) || args.length == 0) {
return;
}
Object idValue = args[0];
topicListenter.delEngine((String) idValue);
}
}
......@@ -34,42 +34,52 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
@Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr;
//mq消费者
public static Map<String, DefaultMQPushConsumer> dataMap = new ConcurrentHashMap();
private static Map<String, DefaultMQPushConsumer> dataMap = new ConcurrentHashMap();
@Override
public void run(ApplicationArguments args) {
List<RuleEngine> ruleEngines = ruleEngineMapper.selectList(new LambdaQueryWrapper<RuleEngine>()
.select(RuleEngine::getEngineId, RuleEngine::getExtParams));
if (!StringUtils.isEmpty(ruleEngines)) {
DefaultMQPushConsumer consumer = null;
for (int i = 0, len = ruleEngines.size(); i < len; i++) {
Map map = getParamMap(ruleEngines.get(i).getExtParams());
if ("Y".equals(map.get("isAuto")) && !StringUtils.isEmpty(map.get("topic"))) {
log.info("ruleEngineConsumer 正在创建---------------------------------------");
consumer = new DefaultMQPushConsumer(ruleEngines.get(i).getEngineId());
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1);
// 设置监听
GeneralConsumeMsgListenerProcessor messageListener = new GeneralConsumeMsgListenerProcessor(ruleEngines.get(i).getEngineId());
consumer.registerMessageListener(messageListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("dstMQMsgConsumer");
try {
consumer.subscribe((String) map.get("topic"), "*");
consumer.start();
if (StringUtils.isEmpty(dataMap.get(ruleEngines.get(i)))) {
dataMap.put(ruleEngines.get(i).getEngineId(), consumer);
log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(i).getEngineId(), ruleEngines.get(i).getModelId(), namesrvAddr);
} else {
log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(i).getEngineId(), ruleEngines.get(i).getModelId(), namesrvAddr);
}
} catch (MQClientException e) {
e.printStackTrace();
log.error("ruleEngineConsumer 创建失败!");
}
}
addtopic(ruleEngines.get(i), namesrvAddr);
}
}
}
/**
* 添加规则引擎
*
* @param ruleEngine
* @param namesrvAddr
*/
public static void addtopic(RuleEngine ruleEngine, String namesrvAddr) {
Map map = getParamMap(ruleEngine.getExtParams());
DefaultMQPushConsumer consumer=null;
if ("Y".equals(map.get("isAuto")) && !StringUtils.isEmpty(map.get("topic"))) {
if (!StringUtils.isEmpty(dataMap.get(ruleEngine.getEngineId()))) {
log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngine.getEngineId(), ruleEngine.getModelId(), namesrvAddr);
return;
}
try {
log.info("ruleEngineConsumer 正在创建---------------------------------------");
consumer = new DefaultMQPushConsumer(ruleEngine.getEngineId());
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1);
// 设置监听
GeneralConsumeMsgListenerProcessor messageListener = new GeneralConsumeMsgListenerProcessor(ruleEngine.getEngineId());
consumer.registerMessageListener(messageListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("dstMQMsgConsumer");
dataMap.put(ruleEngine.getEngineId(), consumer);
consumer.subscribe((String) map.get("topic"), "*");
consumer.start();
log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngine.getEngineId(), ruleEngine.getModelId(), namesrvAddr);
} catch (MQClientException e) {
e.printStackTrace();
log.error("ruleEngineConsumer 创建失败!");
}
}
}
......@@ -80,7 +90,7 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
* @param extParams
* @return
*/
public Map getParamMap(String extParams) {
public static Map getParamMap(String extParams) {
JSONArray objects = JSON.parseArray(extParams);
HashMap map = new HashMap();
if (!CollectionUtils.isEmpty(objects)) {
......@@ -91,5 +101,8 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
return map;
}
public static Map<String, DefaultMQPushConsumer> getDataMap() {
return dataMap;
}
}
package cn.ibizlab.core.util.job;
import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.util.config.RocktcermqCustmerRunner;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
/**
* @author 无名小卒
* @data 2022/8/22
*/
@Component
@Slf4j
@Async("asyncExecutor")
public class TopicListenter {
@Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr;
public void updruleEngine(RuleEngine ruleEngine) {
log.info("更新规则引擎订阅频道收到消息:引擎id:{},引擎名:{}",ruleEngine.getEngineId(),ruleEngine.getEngineName());
Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap();
if (StringUtils.isEmpty(ruleEngine.getEngineId())) {//创建规则引擎
RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr);
} else {
Map paramMap = RocktcermqCustmerRunner.getParamMap(ruleEngine.getExtParams());
if (dataMap.containsKey(ruleEngine.getEngineId())&&("N".equals(paramMap.get("isAuto"))
||StringUtils.isEmpty(paramMap.get("topic")))) {//更新规则引擎
dataMap.get(ruleEngine.getEngineId()).shutdown();
dataMap.remove(ruleEngine.getEngineId());
}else {
RocktcermqCustmerRunner.addtopic(ruleEngine, namesrvAddr);
}
}
}
public void delEngine(String key) {
log.info("删除规则引擎订阅频道收到消息:引擎id{}" , key);
Map<String, DefaultMQPushConsumer> dataMap = RocktcermqCustmerRunner.getDataMap();
if (dataMap.containsKey(key)) {
dataMap.remove(key);//删除相应监听的主题
}
}
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册