提交 09f51350 编写于 作者: fengmin's avatar fengmin

mq消息订阅实现

上级 ae81b32f
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.service.RuleEngineExService;
import cn.ibizlab.core.util.config.SpringContextUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 通用MQ订阅消息处理
*/
@Slf4j
public class GeneralConsumeMsgListenerProcessor implements MessageListenerOrderly {
private static RuleEngineExService ruleEngineExService = SpringContextUtil.getBean(RuleEngineExService.class);
private String key = "";//标记消费者,与需要操作的规则引擎id相等
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
if (CollectionUtils.isEmpty(list)) {
log.info("MQ接收消息为空,直接返回成功");
return ConsumeOrderlyStatus.SUCCESS;
}
MessageExt messageExt = list.get(0);
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {
});
engineMQMsg.setEngineId(key);
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
ruleEngineExService.processData(engineMQMsg);
log.info("消费成功");
} catch (Exception e) {
log.error("获取MQ消息内容异常{}", e);
}
return ConsumeOrderlyStatus.SUCCESS;
}
public GeneralConsumeMsgListenerProcessor(String key) {
this.key = key;
}
}
package cn.ibizlab.core.rule.mapper; package cn.ibizlab.core.rule.mapper;
import java.util.List;
import org.apache.ibatis.annotations.*;
import java.util.Map;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import java.util.Map;
import org.apache.ibatis.annotations.Select;
import cn.ibizlab.core.rule.domain.RuleEngine; import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.rule.filter.RuleEngineSearchContext; import cn.ibizlab.core.rule.filter.RuleEngineSearchContext;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import java.io.Serializable;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.*;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
public interface RuleEngineMapper extends BaseMapper<RuleEngine> { public interface RuleEngineMapper extends BaseMapper<RuleEngine> {
...@@ -64,4 +61,12 @@ public interface RuleEngineMapper extends BaseMapper<RuleEngine> { ...@@ -64,4 +61,12 @@ public interface RuleEngineMapper extends BaseMapper<RuleEngine> {
List<RuleEngine> selectByModelId(@Param("id") Serializable id); List<RuleEngine> selectByModelId(@Param("id") Serializable id);
/**
*
* 查询所有消费者信息
*/
List<RuleEngine> selectCustomerList();
} }
package cn.ibizlab.core.util.config;
import cn.ibizlab.core.extensions.util.GeneralConsumeMsgListenerProcessor;
import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.rule.mapper.RuleEngineMapper;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
public class RocktcermqCustmerRunner implements ApplicationRunner {
@Autowired
private RuleEngineMapper ruleEngineMapper;
@Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr;
//mq消费者
public static Map<String, DefaultMQPushConsumer> dataMap = new ConcurrentHashMap();
@Override
public void run(ApplicationArguments args) {
List<RuleEngine> ruleEngines = ruleEngineMapper.selectCustomerList();
if (!StringUtils.isEmpty(ruleEngines)) {
for (int i = 0, len = ruleEngines.size(); i < len; i++) {
int finalI = i;
if (isAutoCreate(ruleEngines.get(finalI).getExtParams())) {
log.info("ruleEngineConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ruleEngines.get(finalI).getEngineId());
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1);
// 设置监听
GeneralConsumeMsgListenerProcessor messageListener = new GeneralConsumeMsgListenerProcessor(ruleEngines.get(finalI).getEngineId());
consumer.registerMessageListener(messageListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("dstMQMsgConsumer");
try {
consumer.subscribe(ruleEngines.get(finalI).getModelId(), "*");
consumer.start();
if (dataMap.get(ruleEngines.get(finalI)) == null) {
dataMap.put(ruleEngines.get(finalI).getEngineId(), consumer);
log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr);
} else {
log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr);
}
} catch (MQClientException e) {
e.printStackTrace();
log.error("ruleEngineConsumer 创建失败!");
}
}
}
}
}
/**
* 判断是否自动创建
*
* @param extParams
* @return
*/
public boolean isAutoCreate(String extParams) {
JSONArray objects = JSON.parseArray(extParams);
boolean flat = true;
if (!CollectionUtils.isEmpty(objects)) {
for (int j = 0, len1 = objects.size(); j < len1; j++) {
String property = objects.getJSONObject(j).getString("property");
if ("isAuto".equals(property)) {
if ("N".equals(objects.getJSONObject(j).getString("value"))) {
flat = false;
}
break;
}
}
}
return flat;
}
}
package cn.ibizlab.core.util.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* @Description: 获取spring容器中的bean,通过bean名称获取
* @param beanName bean名称
* @return: Object 返回Object,需要做强制类型转换
*/
public static Object getBean(String beanName){
return applicationContext.getBean(beanName);
}
/**
* @Description: 获取spring容器中的bean, 通过bean类型获取
* @param beanClass bean 类型
* @return: T 返回指定类型的bean实例
*/
public static <T> T getBean(Class<T> beanClass) {
return applicationContext.getBean(beanClass);
}
public static <T> Optional<T> getBeanOptional(Class<T> beanClass) {
try {
T bean = applicationContext.getBean(beanClass);
return Optional.of(bean);
} catch (Exception e) {
return Optional.empty();
}
}
/**
* @Description: 获取spring容器中的bean, 通过bean名称和bean类型精确获取
* @param beanName bean 名称
* @param beanClass bean 类型
* @return: T 返回指定类型的bean实例
*/
public static <T> T getBean(String beanName, Class<T> beanClass){
return applicationContext.getBean(beanName,beanClass);
}
}
...@@ -41,6 +41,9 @@ ...@@ -41,6 +41,9 @@
<where><if test="ew!=null and ew.sqlSegment!=null and !ew.emptyOfWhere">${ew.sqlSegment}</if></where> <where><if test="ew!=null and ew.sqlSegment!=null and !ew.emptyOfWhere">${ew.sqlSegment}</if></where>
<if test="ew!=null and ew.sqlSegment!=null and ew.emptyOfWhere">${ew.sqlSegment}</if> <if test="ew!=null and ew.sqlSegment!=null and ew.emptyOfWhere">${ew.sqlSegment}</if>
</select> </select>
<select id="selectCustomerList" resultType="cn.ibizlab.core.rule.domain.RuleEngine">
SELECT ENGINEID ,(SELECT CODENAME FROM IBZMODEL WHERE E.MODELID=MODELID) AS MODELID,EXTPARAMS FROM IBZRULEENGINE E WHERE MODELID IS NOT NULL
</select>
<!--数据查询[Default]--> <!--数据查询[Default]-->
<sql id="Default" databaseId="mysql"> <sql id="Default" databaseId="mysql">
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册