提交 8f9a85d4 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

引擎构建使用RocketMQ

上级 80756e69
......@@ -22,3 +22,20 @@ spring:
cache:
caffeine:
spec: initialCapacity=5,maximumSize=50000,expireAfterWrite=3600s
rocketmq:
producer:
isOnOff: on
groupName: ${spring.application.name}
namesrvAddr: 172.16.170.163:9876
maxMessageSize: 409600
sendMsgTimeOut: 3000
retryTimesWhenSendFailed: 2
ruleEngineTopic: DSTMSG
consumer:
isOnOff: on
groupName: ${spring.application.name}
namesrvAddr: 172.16.170.163:9876
topics: DSTMSG~*
consumeThreadMin: 5
consumeMessageBatchMaxSize: 1
......@@ -58,6 +58,13 @@
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<!-- MyBatis游标查询 -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.1.2.RELEASE</version>
</dependency>
<!-- MySQL数据库 -->
<dependency>
<groupId>mysql</groupId>
......@@ -98,6 +105,12 @@
<artifactId>jobs-spring-boot-starter</artifactId>
</dependency>
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
</dependencies>
......
package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.lite.extensions.domain.EntityModel;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import cn.ibizlab.core.lite.extensions.domain.FieldModel;
import cn.ibizlab.core.lite.extensions.filter.DbEntitySearchContext;
import cn.ibizlab.core.lite.extensions.model.DataModel;
import cn.ibizlab.core.lite.extensions.service.DbEntityService;
import cn.ibizlab.core.lite.extensions.service.LiteModelService;
import cn.ibizlab.core.lite.service.IMetaModelService;
import cn.ibizlab.core.rule.domain.ExecLog;
import cn.ibizlab.core.rule.domain.RuleItem;
import cn.ibizlab.core.rule.filter.RuleItemSearchContext;
import cn.ibizlab.core.rule.service.IRuleItemService;
import cn.ibizlab.core.rule.service.impl.RuleEngineServiceImpl;
import cn.ibizlab.util.filter.QueryFilter;
import cn.ibizlab.util.helper.CachedBeanCopier;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import cn.ibizlab.core.rule.domain.RuleEngine;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.context.annotation.Primary;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.apache.rocketmq.common.message.Message;
import java.io.File;
import java.util.*;
......@@ -51,6 +70,22 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
@Autowired
private LiteModelService liteModelService;
@Autowired
private IMetaModelService metaModelService;
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Autowired
private DstDataSourceExService dstDataSourceService;
@Autowired
DefaultMQProducer defaultMQProducer;
@Value("${rocketmq.producer.ruleEngineTopic: DSTMSG}")
private String ruleEngineTopic;
@Value("${ibiz.rulepath:/app/file/rules/}")
private String rulePath;
/**
......@@ -66,6 +101,68 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
{
CachedBeanCopier.copy(get(et.getEngineId()), et);
if(true){
DataModel dataModel= JSON.toJavaObject(JSON.parseObject(metaModelService.get(et.getModelId()).getConfig()), DataModel.class);
EntityModel entityModel=dataModel.getFactEntityModel();
FieldModel lastModifyField=entityModel.getLastModifyField();
QueryFilter filter=new QueryFilter();
if(lastModifyField!=null)
filter.ge(lastModifyField.getColumnName(), et.getLastRuntime());
String sql=entityModel.getSqlSegment("CORE");
MyBatisCursorItemReader myMyBatisCursorItemReader =new MyBatisCursorItemReader();
try{
List<EntityObj> kEntityObjs=dbEntityService.selectCore(entityModel, filter);
if(kEntityObjs != null){
this.sendToMQ(et.getEngineId(), kEntityObjs);
return et;
}
myMyBatisCursorItemReader.setSqlSessionFactory(sqlSessionFactory);
myMyBatisCursorItemReader.setQueryId("cn.ibizlab.core.extensions.mapper.DbEntityMapper.search");
DbEntitySearchContext context=new DbEntitySearchContext();
context.setFilter(filter);
QueryWrapper qw=context.getSelectCond();
if(!StringUtils.isEmpty(filter.getCustSqlSegment()))
qw.apply(filter.getCustSqlSegment());
Map<String, Object> paramsMap = new HashMap<>();
paramsMap.put("sql", sql);
paramsMap.put("ew", qw);
myMyBatisCursorItemReader.setParameterValues(paramsMap);// 设置sql传入参数
dstDataSourceService.initDataSource(entityModel.getDsName());
DynamicDataSourceContextHolder.push(entityModel.getDsName());
myMyBatisCursorItemReader.open(new ExecutionContext()); // 开启游标
DynamicDataSourceContextHolder.poll();
List<EntityObj> batch = new ArrayList<>();
EntityObj rowdata;
while ((rowdata = (EntityObj) myMyBatisCursorItemReader.read()) != null) {
batch.add(rowdata);
if(batch.size() > 500){
this.sendToMQ(et.getEngineId(), batch);
}
}
if(batch.size() > 0){
this.sendToMQ(et.getEngineId(), batch);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
myMyBatisCursorItemReader.close();// 关闭游标
} catch (Exception ex) {
log.error(ex.getMessage());
}
DynamicDataSourceContextHolder.poll();
}
}else{
if(!StringUtils.isEmpty(et.getModelId()))
{
BaseRequest msg=new BaseRequest();
......@@ -121,13 +218,73 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
et.setLastRuntime(starttime);
this.update(et);
}
}
}
return super.run(et);
}
protected void sendToMQ(String engineId, List<EntityObj> batch) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(batch);
Message sendMsg = new Message(ruleEngineTopic, engineId, msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("消息发送响应:" + sendResult.toString());
}
public void processData(String engineId, List<EntityObj> batch){
RuleEngine et = get(engineId);
BaseRequest msg=new BaseRequest();
msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName());
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
List<String> rules = new ArrayList<>();
DataModel dataModel=liteModelService.getDataModel(et.getModelId());
HashSet<String> fillpropertys=new HashSet<>();
ruleItemService.list(Wrappers.<RuleItem>lambdaQuery()
.eq(RuleItem::getModelId,et.getModelId()).ne(RuleItem::getGroup,"REP").like(RuleItem::getGroup,et.getGroup()))
.forEach(ruleItem -> {
String path=rulePath + et.getGroup() + File.separator + ruleItem.getRuleId() + ".drl";
File file=new File(path);
if(!file.exists())
{
ruleItemService.buildRuleFile(ruleItem);
}
if(file.exists())
{
rules.add(path);
}
if((!StringUtils.isEmpty(ruleItem.getCond()))&&ruleItem.getCond().startsWith("["))
fillpropertys.addAll(JSON.toJavaObject(JSON.parseArray(ruleItem.getCond()),LinkedHashSet.class));
});
dataModel.getAllProperty().forEach(prop->{
if(fillpropertys.contains(prop.getPropertyName()))
{
DataModel p=prop.getOwnerDataModel().getParentDataModel();
while (p!=null)
{
fillpropertys.add(p.getFactPorperty().getPropertyName());
p=p.getParentDataModel();
}
return super.run(et);
}
});
msg.setRules(rules);
msg.setDatas(dbEntityService.getModelObjs(dataModel,fillpropertys,batch));
msg.setDatas(dbEntityService.getModelObjs(et.getModelId(),fillpropertys,et.getLastRuntime()));
ExecLog execlog = baseEntityService.processAll(msg);
}
/**
* [Check:校验] 行为扩展
* @param et
......
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.service.RuleEngineExService;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
* @author: lockie
* @Date: 2020/4/21 11:05
* @Description: 消费者监听
*/
@Slf4j
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
@Value("${rocketmq.producer.ruleEngineTopic: DSTMSG}")
private String ruleEngineTopic;
@Autowired
private RuleEngineExService ruleEngineExService;
/**
* 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
* 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
* @param msgList
* @param consumeConcurrentlyContext
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty(msgList)) {
log.info("MQ接收消息为空,直接返回成功");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = msgList.get(0);
log.info("MQ接收到的消息为:" + messageExt.toString());
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic,tags,body);
if(ruleEngineTopic.equalsIgnoreCase(topic)){
this.processRuleEngineData(tags, body);
}
} catch (Exception e) {
log.error("获取MQ消息内容异常{}",e);
}
// TODO 处理业务逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
protected void processRuleEngineData(String engineId, String body){
List<EntityObj> batch = JSON.parseObject(body, new TypeReference<ArrayList<EntityObj>>() {});
ruleEngineExService.processData(engineId, batch);
}
}
package cn.ibizlab.core.util.config;
import cn.ibizlab.core.extensions.util.MQConsumeMsgListenerProcessor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* mq消费者配置
*/
@Slf4j
@Configuration
public class MQConsumerConfigure {
@Value("${rocketmq.consumer.groupName: DEFAULT_CONSUMER}")
private String groupName;
@Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr;
@Value("${rocketmq.consumer.topics:}")
private String topics;
// 消费者线程数据量
@Value("${rocketmq.consumer.consumeThreadMin: 5}")
private Integer consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax: 32}")
private Integer consumeThreadMax;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize: 1}")
private Integer consumeMessageBatchMaxSize;
@Autowired
private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;
/**
* mq 消费者配置
* @return
* @throws MQClientException
*/
@Bean
//@ConditionalOnExpression("${rocketmq.consumer.isOnOff:off}.equals('on')")
public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
log.info("defaultConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
// 设置监听
consumer.registerMessageListener(consumeMsgListenerProcessor);
/**
* 设置consumer第一次启动是从队列头部开始还是队列尾部开始
* 如果不是第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
try {
// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
String[] topicArr = topics.split(";");
for (String tag : topicArr) {
String[] tagArr = tag.split("~");
consumer.subscribe(tagArr[0], tagArr[1]);
}
consumer.start();
log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",groupName,topics,namesrvAddr);
} catch (MQClientException e) {
log.error("consumer 创建失败!");
}
return consumer;
}
}
package cn.ibizlab.core.util.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class MQProducerConfigure {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);
@Value("${rocketmq.producer.groupName: DEFAULT_PRODUCER}")
private String groupName;
@Value("${rocketmq.producer.namesrvAddr:}")
private String namesrvAddr;
// 消息最大值
@Value("${rocketmq.producer.maxMessageSize: 409600}")
private Integer maxMessageSize;
// 消息发送超时时间
@Value("${rocketmq.producer.sendMsgTimeOut: 3000}")
private Integer sendMsgTimeOut;
// 失败重试次数
@Value("${rocketmq.producer.retryTimesWhenSendFailed: 2}")
private Integer retryTimesWhenSendFailed;
/**
* mq 生成者配置
* @return
* @throws MQClientException
*/
@Bean
//@ConditionalOnExpression("${rocketmq.producer.isOnOff:off}.equals('on')")
public DefaultMQProducer defaultProducer() throws MQClientException {
log.info("defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
producer.start();
log.info("rocketmq producer server 开启成功----------------------------------");
return producer;
}
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册