提交 fad5b671 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

引擎构建MQ消息格式调整,构建时传递引擎配置参数到分析数据对象,以便分析结果保存时可用动态指定数据源和表名

上级 134bef4d
...@@ -42,6 +42,8 @@ public class BaseRequest ...@@ -42,6 +42,8 @@ public class BaseRequest
private List<ModelObj> datas; private List<ModelObj> datas;
private String model; private String model;
private String resultDataSource;
private String resultTableName;
private String batch; private String batch;
private List<String> rules; private List<String> rules;
......
package cn.ibizlab.core.extensions.domain;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import lombok.Data;
import java.sql.Timestamp;
import java.util.List;
@Data
public class EngineMQMsg {
private String engineId;
private String batch;
private Timestamp runTime;
private List<EntityObj> datas;
}
...@@ -104,6 +104,12 @@ public class BaseEntityServiceImpl implements BaseEntityService ...@@ -104,6 +104,12 @@ public class BaseEntityServiceImpl implements BaseEntityService
result.setExt1Field(EXT1FIELD.toString()); result.setExt1Field(EXT1FIELD.toString());
if(EXT2FIELD!=null) if(EXT2FIELD!=null)
result.setExt2Field(EXT2FIELD.toString()); result.setExt2Field(EXT2FIELD.toString());
if(param.containsKey(RuleEngineExService.Setting_ResultDataSource)){
result.set(RuleEngineExService.Setting_ResultDataSource, param.get(RuleEngineExService.Setting_ResultDataSource));
}
if(param.containsKey(RuleEngineExService.Setting_ResultTableName)){
result.set(RuleEngineExService.Setting_ResultTableName, param.get(RuleEngineExService.Setting_ResultTableName));
}
if(DIMFIELD == null){ if(DIMFIELD == null){
return; return;
...@@ -385,6 +391,12 @@ public class BaseEntityServiceImpl implements BaseEntityService ...@@ -385,6 +391,12 @@ public class BaseEntityServiceImpl implements BaseEntityService
count++; count++;
modelObj.set("BATCH",msg.getBatch()); modelObj.set("BATCH",msg.getBatch());
if(!StringUtils.isEmpty(msg.getResultDataSource())){
modelObj.set(RuleEngineExService.Setting_ResultDataSource, msg.getResultDataSource());
}
if(!StringUtils.isEmpty(msg.getResultTableName())){
modelObj.set(RuleEngineExService.Setting_ResultTableName, msg.getResultTableName());
}
processRule(modelObj, modelObj.getRowKey(),strResId); processRule(modelObj, modelObj.getRowKey(),strResId);
} }
......
package cn.ibizlab.core.extensions.service; package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.extensions.domain.BaseRequest; import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.lite.extensions.domain.EntityModel; import cn.ibizlab.core.lite.extensions.domain.EntityModel;
import cn.ibizlab.core.lite.extensions.domain.EntityObj; import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import cn.ibizlab.core.lite.extensions.domain.FieldModel; import cn.ibizlab.core.lite.extensions.domain.FieldModel;
import cn.ibizlab.core.lite.extensions.domain.Setting;
import cn.ibizlab.core.lite.extensions.filter.DbEntitySearchContext; import cn.ibizlab.core.lite.extensions.filter.DbEntitySearchContext;
import cn.ibizlab.core.lite.extensions.model.DataModel; import cn.ibizlab.core.lite.extensions.model.DataModel;
import cn.ibizlab.core.lite.extensions.service.DbEntityService; import cn.ibizlab.core.lite.extensions.service.DbEntityService;
import cn.ibizlab.core.lite.extensions.service.LiteModelService; import cn.ibizlab.core.lite.extensions.service.LiteModelService;
import cn.ibizlab.core.lite.service.IMetaModelService; import cn.ibizlab.core.lite.service.IMetaModelService;
import cn.ibizlab.core.rule.domain.ExecLog; import cn.ibizlab.core.rule.domain.ExecLog;
import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.rule.domain.RuleItem; import cn.ibizlab.core.rule.domain.RuleItem;
import cn.ibizlab.core.rule.filter.RuleItemSearchContext;
import cn.ibizlab.core.rule.service.IRuleItemService; import cn.ibizlab.core.rule.service.IRuleItemService;
import cn.ibizlab.core.rule.service.impl.RuleEngineServiceImpl; import cn.ibizlab.core.rule.service.impl.RuleEngineServiceImpl;
import cn.ibizlab.util.filter.QueryFilter; import cn.ibizlab.util.filter.QueryFilter;
import cn.ibizlab.util.helper.CachedBeanCopier; import cn.ibizlab.util.helper.CachedBeanCopier;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONArray;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder; import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.IdWorker; import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import cn.ibizlab.core.rule.domain.RuleEngine;
import org.apache.ibatis.session.SqlSessionFactory; import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.mybatis.spring.batch.MyBatisCursorItemReader; import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.context.annotation.Primary;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.apache.rocketmq.common.message.Message;
import java.io.File; import java.io.File;
import java.util.*; import java.util.*;
...@@ -59,6 +58,9 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -59,6 +58,9 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
return com.baomidou.mybatisplus.core.toolkit.ReflectionKit.getSuperClassGenericType(this.getClass().getSuperclass(), 1); return com.baomidou.mybatisplus.core.toolkit.ReflectionKit.getSuperClassGenericType(this.getClass().getSuperclass(), 1);
} }
public static final String Setting_ResultDataSource = "resultDataSource";
public static final String Setting_ResultTableName = "resultTableName";
@Autowired @Autowired
private DbEntityService dbEntityService; private DbEntityService dbEntityService;
...@@ -102,6 +104,19 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -102,6 +104,19 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
{ {
CachedBeanCopier.copy(get(et.getEngineId()), et); CachedBeanCopier.copy(get(et.getEngineId()), et);
BaseRequest msg=new BaseRequest();
msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName());
String resultDataSource = Setting.getValue(et.getExtParams(), Setting_ResultDataSource);
String resultTableName = Setting.getValue(et.getExtParams(), Setting_ResultTableName);
if(!StringUtils.isEmpty(resultDataSource)){
msg.setResultDataSource(resultDataSource);
}
if(!StringUtils.isEmpty(resultTableName)){
msg.setResultTableName(resultTableName);
}
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
if(true){ if(true){
DataModel dataModel= JSON.toJavaObject(JSON.parseObject(metaModelService.get(et.getModelId()).getConfig()), DataModel.class); DataModel dataModel= JSON.toJavaObject(JSON.parseObject(metaModelService.get(et.getModelId()).getConfig()), DataModel.class);
EntityModel entityModel=dataModel.getFactEntityModel(); EntityModel entityModel=dataModel.getFactEntityModel();
...@@ -111,8 +126,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -111,8 +126,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
if(lastModifyField!=null) if(lastModifyField!=null)
filter.ge(lastModifyField.getColumnName(), et.getLastRuntime()); filter.ge(lastModifyField.getColumnName(), et.getLastRuntime());
String sql=entityModel.getSqlSegment("CORE"); String sql=entityModel.getSqlSegment("CORE");
MyBatisCursorItemReader myMyBatisCursorItemReader =new MyBatisCursorItemReader(); MyBatisCursorItemReader myMyBatisCursorItemReader =new MyBatisCursorItemReader();
try{ try{
...@@ -140,17 +153,27 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -140,17 +153,27 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
DynamicDataSourceContextHolder.push(entityModel.getDsName()); DynamicDataSourceContextHolder.push(entityModel.getDsName());
myMyBatisCursorItemReader.open(new ExecutionContext()); // 开启游标 myMyBatisCursorItemReader.open(new ExecutionContext()); // 开启游标
DynamicDataSourceContextHolder.poll(); DynamicDataSourceContextHolder.poll();
List<EntityObj> batch = new ArrayList<>(); List<EntityObj> datas = new ArrayList<>();
EntityObj rowdata; EntityObj rowdata;
while ((rowdata = (EntityObj) myMyBatisCursorItemReader.read()) != null) { while ((rowdata = (EntityObj) myMyBatisCursorItemReader.read()) != null) {
batch.add(rowdata); datas.add(rowdata);
if(batch.size() > 500){ if(datas.size() > 500){
this.sendToMQ(et.getEngineId(), batch); EngineMQMsg engineMQMsg = new EngineMQMsg();
batch.clear(); engineMQMsg.setEngineId(et.getEngineId());
engineMQMsg.setBatch(msg.getBatch());
engineMQMsg.setRunTime(starttime);
engineMQMsg.setDatas(datas);
this.sendToMQ(engineMQMsg);
datas.clear();
} }
} }
if(batch.size() > 0){ if(datas.size() > 0){
this.sendToMQ(et.getEngineId(), batch); EngineMQMsg engineMQMsg = new EngineMQMsg();
engineMQMsg.setEngineId(et.getEngineId());
engineMQMsg.setBatch(msg.getBatch());
engineMQMsg.setRunTime(starttime);
engineMQMsg.setDatas(datas);
this.sendToMQ(engineMQMsg);
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
...@@ -167,10 +190,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -167,10 +190,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
if(!StringUtils.isEmpty(et.getModelId())) if(!StringUtils.isEmpty(et.getModelId()))
{ {
BaseRequest msg=new BaseRequest();
msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName());
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
List<String> rules = new ArrayList<>(); List<String> rules = new ArrayList<>();
DataModel dataModel=liteModelService.getDataModel(et.getModelId()); DataModel dataModel=liteModelService.getDataModel(et.getModelId());
...@@ -226,18 +245,26 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -226,18 +245,26 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
return super.run(et); return super.run(et);
} }
protected void sendToMQ(String engineId, List<EntityObj> batch) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { protected void sendToMQ(EngineMQMsg engineMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(batch); String msg = JSON.toJSONString(engineMQMsg);
Message sendMsg = new Message(ruleEngineTopic, engineId, msg.getBytes()); Message sendMsg = new Message(ruleEngineTopic, engineMQMsg.getEngineId(), msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg); SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("消息发送响应:" + sendResult.toString()); log.info("消息发送响应:" + sendResult.toString());
} }
public void processData(String engineId, List<EntityObj> batch){ public void processData(EngineMQMsg engineMQMsg){
RuleEngine et = get(engineId); RuleEngine et = get(engineMQMsg.getEngineId());
BaseRequest msg=new BaseRequest(); BaseRequest msg=new BaseRequest();
msg.setId(IdWorker.getIdStr()); msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName()); msg.setModel(et.getModelName());
String resultDataSource = Setting.getValue(et.getExtParams(), Setting_ResultDataSource);
String resultTableName = Setting.getValue(et.getExtParams(), Setting_ResultTableName);
if(!StringUtils.isEmpty(resultDataSource)){
msg.setResultDataSource(resultDataSource);
}
if(!StringUtils.isEmpty(resultTableName)){
msg.setResultTableName(resultTableName);
}
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis()); java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
...@@ -277,14 +304,12 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -277,14 +304,12 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
msg.setRules(rules); msg.setRules(rules);
msg.setDatas(dbEntityService.getModelObjs(et.getModelId(),fillpropertys,batch)); msg.setDatas(dbEntityService.getModelObjs(et.getModelId(),fillpropertys,engineMQMsg.getDatas()));
ExecLog execlog = baseEntityService.processAll(msg); ExecLog execlog = baseEntityService.processAll(msg);
} }
/** /**
* [Check:校验] 行为扩展 * [Check:校验] 行为扩展
......
package cn.ibizlab.core.extensions.util; package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.service.RuleEngineExService; import cn.ibizlab.core.extensions.service.RuleEngineExService;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -9,20 +9,15 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; ...@@ -9,20 +9,15 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* @author: lockie * MQ订阅消息处理
* @Date: 2020/4/21 11:05
* @Description: 消费者监听
*/ */
@Slf4j @Slf4j
@Component @Component
...@@ -56,18 +51,26 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrentl ...@@ -56,18 +51,26 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrentl
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic,tags,body); log.info("MQ消息topic={}, tags={}, 消息内容={}", topic,tags,body);
if(ruleEngineTopic.equalsIgnoreCase(topic)){
this.processRuleEngineData(tags, body);
}
} catch (Exception e) { } catch (Exception e) {
log.error("获取MQ消息内容异常{}",e); log.error("获取MQ消息内容异常{}",e);
} }
// TODO 处理业务逻辑
// 处理规则引擎构建消息
if(ruleEngineTopic.equalsIgnoreCase(messageExt.getTopic())){
this.processRuleEngineData(messageExt);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} }
protected void processRuleEngineData(String engineId, String body){
List<EntityObj> batch = JSON.parseObject(body, new TypeReference<ArrayList<EntityObj>>() {}); protected void processRuleEngineData(MessageExt messageExt){
ruleEngineExService.processData(engineId, batch); try {
String body = new String(messageExt.getBody(), "utf-8");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {});
ruleEngineExService.processData(engineMQMsg);
} catch (Exception e) {
log.error("获取MQ消息内容异常{}",e);
}
} }
} }
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册