提交 62eeaf34 编写于 作者: fengmin's avatar fengmin

mq消息订阅实现

上级 ae81b32f
......@@ -25,6 +25,7 @@ import cn.ibizlab.util.filter.SearchContextBase;
@Slf4j
@Data
public class DADimensionSearchContext extends SearchContextBase {
private String n_dimname_like;//[维度名称]
private String n_buildid_eq;//[分析标识]
......
......@@ -208,7 +208,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
if(!file.exists())
{
ruleItemService.buildRuleFile(ruleItem);
}
if(file.exists())
{
......
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.service.RuleEngineExService;
import cn.ibizlab.core.util.config.SpringContextUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 通用MQ订阅消息处理
*/
@Slf4j
public class GeneralConsumeMsgListenerProcessor implements MessageListenerOrderly {
private static RuleEngineExService ruleEngineExService = SpringContextUtil.getBean(RuleEngineExService.class);
private String key = "";//标记消费者,与需要操作的规则引擎id相等
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
if (CollectionUtils.isEmpty(list)) {
log.info("MQ接收消息为空,直接返回成功");
return ConsumeOrderlyStatus.SUCCESS;
}
MessageExt messageExt = list.get(0);
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {
});
engineMQMsg.setEngineId(key);
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
ruleEngineExService.processData(engineMQMsg);
log.info("消费成功");
} catch (Exception e) {
log.error("获取MQ消息内容异常{}", e);
}
return ConsumeOrderlyStatus.SUCCESS;
}
public GeneralConsumeMsgListenerProcessor(String key) {
this.key = key;
}
}
......@@ -6,14 +6,15 @@ import cn.ibizlab.core.extensions.service.RuleEngineExService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
......@@ -25,8 +26,6 @@ import java.util.List;
public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
private Object lockObject =new Object();
@Value("${rocketmq.producer.ruleEngineTopic:DSTMSG}")
private String ruleEngineTopic;
......@@ -51,9 +50,7 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) {
log.error("获取MQ消息内容异常{}", e);
}
......@@ -74,6 +71,7 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
// Thread.sleep(80000);
// System.out.println("end sleep");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {});
System.out.println(engineMQMsg.toString());
if("Engine".equalsIgnoreCase(tags)) {
ruleEngineExService.processData(engineMQMsg);
}else if("Build".equalsIgnoreCase(tags)){
......
......@@ -64,4 +64,12 @@ public interface RuleEngineMapper extends BaseMapper<RuleEngine> {
List<RuleEngine> selectByModelId(@Param("id") Serializable id);
/**
*
* 查询所有消费者信息
*/
List<RuleEngine> selectCustomerList();
}
......@@ -7,7 +7,6 @@ import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
......@@ -61,6 +60,7 @@ public class MQConsumerConfigure {
consumer.registerMessageListener(consumeMsgListenerProcessor);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("dstMQMsgConsumer");
try {
consumer.subscribe(ruleEngineTopic, "*");
consumer.start();
......@@ -82,7 +82,6 @@ public class MQConsumerConfigure {
consumer.registerMessageListener(resultsMQMsgConsumeListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("resultsMQMsgConsumer");
try {
// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
consumer.subscribe(resultsTopic, "*");
......
package cn.ibizlab.core.util.config;
import cn.ibizlab.core.extensions.util.GeneralConsumeMsgListenerProcessor;
import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.rule.mapper.RuleEngineMapper;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
public class RocktcermqCustmerRunner implements ApplicationRunner {
@Autowired
private RuleEngineMapper ruleEngineMapper;
@Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr;
//mq消费者
public static Map<String, DefaultMQPushConsumer> dataMap = new ConcurrentHashMap();
@Override
public void run(ApplicationArguments args) {
List<RuleEngine> ruleEngines = ruleEngineMapper.selectCustomerList();
if (!StringUtils.isEmpty(ruleEngines)) {
for (int i = 0, len = ruleEngines.size(); i < len; i++) {
int finalI = i;
if (isAutoCreate(ruleEngines.get(finalI).getExtParams())) {
log.info("ruleEngineConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ruleEngines.get(finalI).getEngineId());
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1);
// 设置监听
GeneralConsumeMsgListenerProcessor messageListener = new GeneralConsumeMsgListenerProcessor(ruleEngines.get(finalI).getEngineId());
consumer.registerMessageListener(messageListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("dstMQMsgConsumer");
try {
consumer.subscribe(ruleEngines.get(finalI).getModelId(), "*");
consumer.start();
if (dataMap.get(ruleEngines.get(finalI)) == null) {
dataMap.put(ruleEngines.get(finalI).getEngineId(), consumer);
log.info("ruleEngineConsumer 创建成功 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr);
} else {
log.warn("ruleEngineConsumer 不能重复创建 groupName={}, topics={}, namesrvAddr={}", ruleEngines.get(finalI).getEngineId(), ruleEngines.get(finalI).getModelId(), namesrvAddr);
}
} catch (MQClientException e) {
e.printStackTrace();
log.error("ruleEngineConsumer 创建失败!");
}
}
}
}
}
/**
* 判断是否自动创建
*
* @param extParams
* @return
*/
public boolean isAutoCreate(String extParams) {
JSONArray objects = JSON.parseArray(extParams);
boolean flat = true;
if (!CollectionUtils.isEmpty(objects)) {
for (int j = 0, len1 = objects.size(); j < len1; j++) {
String property = objects.getJSONObject(j).getString("property");
if ("isAuto".equals(property)) {
if ("N".equals(objects.getJSONObject(j).getString("value"))) {
flat = false;
}
break;
}
}
}
return flat;
}
}
package cn.ibizlab.core.util.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* @Description: 获取spring容器中的bean,通过bean名称获取
* @param beanName bean名称
* @return: Object 返回Object,需要做强制类型转换
*/
public static Object getBean(String beanName){
return applicationContext.getBean(beanName);
}
/**
* @Description: 获取spring容器中的bean, 通过bean类型获取
* @param beanClass bean 类型
* @return: T 返回指定类型的bean实例
*/
public static <T> T getBean(Class<T> beanClass) {
return applicationContext.getBean(beanClass);
}
public static <T> Optional<T> getBeanOptional(Class<T> beanClass) {
try {
T bean = applicationContext.getBean(beanClass);
return Optional.of(bean);
} catch (Exception e) {
return Optional.empty();
}
}
/**
* @Description: 获取spring容器中的bean, 通过bean名称和bean类型精确获取
* @param beanName bean 名称
* @param beanClass bean 类型
* @return: T 返回指定类型的bean实例
*/
public static <T> T getBean(String beanName, Class<T> beanClass){
return applicationContext.getBean(beanName,beanClass);
}
}
......@@ -41,6 +41,9 @@
<where><if test="ew!=null and ew.sqlSegment!=null and !ew.emptyOfWhere">${ew.sqlSegment}</if></where>
<if test="ew!=null and ew.sqlSegment!=null and ew.emptyOfWhere">${ew.sqlSegment}</if>
</select>
<select id="selectCustomerList" resultType="cn.ibizlab.core.rule.domain.RuleEngine">
SELECT ENGINEID ,(SELECT CODENAME FROM IBZMODEL WHERE E.MODELID=MODELID) AS MODELID,EXTPARAMS FROM IBZRULEENGINE E WHERE MODELID IS NOT NULL
</select>
<!--数据查询[Default]-->
<sql id="Default" databaseId="mysql">
......
spring:
profiles:
include: sys ,nacos, api-prod
include: sys ,nacos, api-prod,dev
application:
name: ibzdst-api
\ No newline at end of file
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册