提交 c5692772 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

引擎构建处理逻辑相关调整

上级 bc028771
...@@ -11,5 +11,6 @@ public class EngineMQMsg { ...@@ -11,5 +11,6 @@ public class EngineMQMsg {
private String engineId; private String engineId;
private String batch; private String batch;
private Timestamp runTime; private Timestamp runTime;
private Integer count;
private List<EntityObj> datas; private List<EntityObj> datas;
} }
...@@ -11,6 +11,7 @@ import java.util.List; ...@@ -11,6 +11,7 @@ import java.util.List;
public interface DbEntityMapper extends BaseMapper<EntityObj>{ public interface DbEntityMapper extends BaseMapper<EntityObj>{
List<EntityObj> search(@Param("sql") String sql, @Param("ew") Wrapper<EntityObj> wrapper); List<EntityObj> search(@Param("sql") String sql, @Param("ew") Wrapper<EntityObj> wrapper);
Integer searchCount(@Param("sql") String sql, @Param("ew") Wrapper<EntityObj> wrapper);
int replaceBatch(List<ExecResult> var1); int replaceBatch(List<ExecResult> var1);
int clearBatch(List<ExecResult> var1); int clearBatch(List<ExecResult> var1);
......
...@@ -89,6 +89,9 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -89,6 +89,9 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
@Value("${rocketmq.producer.ruleEngineTopic: DSTMSG}") @Value("${rocketmq.producer.ruleEngineTopic: DSTMSG}")
private String ruleEngineTopic; private String ruleEngineTopic;
@Value("${rocketmq.producer.ruleEngineBatchSize: 500}")
private int ruleEngineBatchSize;
@Value("${ibiz.rulepath:/app/file/rules/}") @Value("${ibiz.rulepath:/app/file/rules/}")
private String rulePath; private String rulePath;
/** /**
...@@ -116,25 +119,25 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -116,25 +119,25 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
msg.setResultTableName(resultTableName); msg.setResultTableName(resultTableName);
} }
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis()); java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
if(true){ if(true){
DataModel dataModel= JSON.toJavaObject(JSON.parseObject(metaModelService.get(et.getModelId()).getConfig()), DataModel.class); DataModel dataModel= JSON.toJavaObject(JSON.parseObject(metaModelService.get(et.getModelId()).getConfig()), DataModel.class);
EntityModel entityModel=dataModel.getFactEntityModel(); EntityModel entityModel=dataModel.getFactEntityModel();
FieldModel lastModifyField=entityModel.getLastModifyField(); FieldModel lastModifyField=entityModel.getLastModifyField();
QueryFilter filter=new QueryFilter(); QueryFilter filter=new QueryFilter();
if(lastModifyField!=null) if(lastModifyField!=null)
filter.ge(lastModifyField.getColumnName(), et.getLastRuntime()); filter.ge(lastModifyField.getColumnName(), et.getLastRuntime());
//记录处理的总业务数据量
String sql=entityModel.getSqlSegment("CORE"); String sql=entityModel.getSqlSegment("CORE");
Integer total = dbEntityService.selectCount(entityModel, filter);
if(total != null){
et.setTotal(total);
et.setProcessed(0);
this.update(et);
}
MyBatisCursorItemReader myMyBatisCursorItemReader =new MyBatisCursorItemReader(); MyBatisCursorItemReader myMyBatisCursorItemReader =new MyBatisCursorItemReader();
try{ try{
// List<EntityObj> kEntityObjs=dbEntityService.selectCore(entityModel, filter);
// if(kEntityObjs != null){
// this.sendToMQ(et.getEngineId(), kEntityObjs);
// return et;
// }
myMyBatisCursorItemReader.setSqlSessionFactory(sqlSessionFactory); myMyBatisCursorItemReader.setSqlSessionFactory(sqlSessionFactory);
myMyBatisCursorItemReader.setQueryId("cn.ibizlab.core.extensions.mapper.DbEntityMapper.search"); myMyBatisCursorItemReader.setQueryId("cn.ibizlab.core.extensions.mapper.DbEntityMapper.search");
...@@ -157,11 +160,13 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -157,11 +160,13 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
EntityObj rowdata; EntityObj rowdata;
while ((rowdata = (EntityObj) myMyBatisCursorItemReader.read()) != null) { while ((rowdata = (EntityObj) myMyBatisCursorItemReader.read()) != null) {
datas.add(rowdata); datas.add(rowdata);
if(datas.size() > 500){ total ++;
if(datas.size() >= ruleEngineBatchSize){
EngineMQMsg engineMQMsg = new EngineMQMsg(); EngineMQMsg engineMQMsg = new EngineMQMsg();
engineMQMsg.setEngineId(et.getEngineId()); engineMQMsg.setEngineId(et.getEngineId());
engineMQMsg.setBatch(msg.getBatch()); engineMQMsg.setBatch(msg.getBatch());
engineMQMsg.setRunTime(starttime); engineMQMsg.setRunTime(starttime);
engineMQMsg.setCount(datas.size());
engineMQMsg.setDatas(datas); engineMQMsg.setDatas(datas);
this.sendToMQ(engineMQMsg); this.sendToMQ(engineMQMsg);
datas.clear(); datas.clear();
...@@ -172,17 +177,18 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -172,17 +177,18 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
engineMQMsg.setEngineId(et.getEngineId()); engineMQMsg.setEngineId(et.getEngineId());
engineMQMsg.setBatch(msg.getBatch()); engineMQMsg.setBatch(msg.getBatch());
engineMQMsg.setRunTime(starttime); engineMQMsg.setRunTime(starttime);
engineMQMsg.setCount(datas.size());
engineMQMsg.setDatas(datas); engineMQMsg.setDatas(datas);
this.sendToMQ(engineMQMsg); this.sendToMQ(engineMQMsg);
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
// try { try {
// myMyBatisCursorItemReader.close();// 关闭游标 myMyBatisCursorItemReader.close();// 关闭游标
// } catch (Exception ex) { } catch (Exception ex) {
// log.error(ex.getMessage()); //log.error(ex.getMessage());
// } }
DynamicDataSourceContextHolder.poll(); DynamicDataSourceContextHolder.poll();
} }
...@@ -247,16 +253,18 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -247,16 +253,18 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
protected void sendToMQ(EngineMQMsg engineMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { protected void sendToMQ(EngineMQMsg engineMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(engineMQMsg); String msg = JSON.toJSONString(engineMQMsg);
Message sendMsg = new Message(ruleEngineTopic, engineMQMsg.getEngineId(), msg.getBytes()); Message sendMsg = new Message(ruleEngineTopic, "default", msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg); SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("消息发送响应:" + sendResult.toString()); log.info("消息发送响应:" + sendResult.toString());
} }
public void processData(EngineMQMsg engineMQMsg){ public void processData(EngineMQMsg engineMQMsg){
RuleEngine et = get(engineMQMsg.getEngineId()); RuleEngine et = get(engineMQMsg.getEngineId());
try{
BaseRequest msg=new BaseRequest(); BaseRequest msg=new BaseRequest();
msg.setId(IdWorker.getIdStr()); msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName()); msg.setModel(et.getModelName());
msg.setBatch(engineMQMsg.getBatch());
String resultDataSource = Setting.getValue(et.getExtParams(), Setting_ResultDataSource); String resultDataSource = Setting.getValue(et.getExtParams(), Setting_ResultDataSource);
String resultTableName = Setting.getValue(et.getExtParams(), Setting_ResultTableName); String resultTableName = Setting.getValue(et.getExtParams(), Setting_ResultTableName);
if(!StringUtils.isEmpty(resultDataSource)){ if(!StringUtils.isEmpty(resultDataSource)){
...@@ -267,7 +275,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -267,7 +275,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
} }
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis()); java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
List<String> rules = new ArrayList<>(); List<String> rules = new ArrayList<>();
DataModel dataModel=liteModelService.getDataModel(et.getModelId()); DataModel dataModel=liteModelService.getDataModel(et.getModelId());
HashSet<String> fillpropertys=new HashSet<>(); HashSet<String> fillpropertys=new HashSet<>();
...@@ -301,15 +308,21 @@ public class RuleEngineExService extends RuleEngineServiceImpl { ...@@ -301,15 +308,21 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
} }
}); });
msg.setRules(rules); msg.setRules(rules);
msg.setDatas(dbEntityService.getModelObjs(et.getModelId(),fillpropertys,engineMQMsg.getDatas())); msg.setDatas(dbEntityService.getModelObjs(et.getModelId(),fillpropertys,engineMQMsg.getDatas()));
ExecLog execlog = baseEntityService.processAll(msg); ExecLog execlog = baseEntityService.processAll(msg);
}catch (Exception ex){
log.error("构建数据错误:%1$s", ex.getMessage());
}
String updateSql = "UPDATE `ibzruleengine` SET processed=(IFNULL(processed,0)+#{et.count}),lastruntime=CASE WHEN total = processed THEN #{et.runtime} ELSE lastruntime END WHERE engineid = #{et.engineId}";
HashMap<String, Object> param = new HashMap<>();
param.put("count", engineMQMsg.getCount());
param.put("runtime", engineMQMsg.getRunTime());
param.put("engineId", engineMQMsg.getEngineId());
this.execute(updateSql, param);
} }
/** /**
* [Check:校验] 行为扩展 * [Check:校验] 行为扩展
......
...@@ -213,6 +213,10 @@ public class EntityModel { ...@@ -213,6 +213,10 @@ public class EntityModel {
return "select "+columnSet+" from "+this.getTableName()+" "; return "select "+columnSet+" from "+this.getTableName()+" ";
} }
else if("COUNT".equalsIgnoreCase(dataSet))
{
return "select count(1) from "+this.getTableName()+" ";
}
else else
{ {
if(dataSets!=null) if(dataSets!=null)
......
...@@ -14,6 +14,7 @@ public interface CommonEntityService { ...@@ -14,6 +14,7 @@ public interface CommonEntityService {
List<EntityObj> selectBase(EntityModel entityModel, QueryFilter filter); List<EntityObj> selectBase(EntityModel entityModel, QueryFilter filter);
List<EntityObj> selectCore(EntityModel entityModel, QueryFilter filter); List<EntityObj> selectCore(EntityModel entityModel, QueryFilter filter);
Integer selectCount(EntityModel entityModel, QueryFilter filter);
List<EntityObj> search(String dataSet, EntityModel entityModel, QueryFilter filter); List<EntityObj> search(String dataSet, EntityModel entityModel, QueryFilter filter);
......
...@@ -51,6 +51,35 @@ public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> impl ...@@ -51,6 +51,35 @@ public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> impl
return search("CORE",entityModel,filter); return search("CORE",entityModel,filter);
} }
@Override
public Integer selectCount(EntityModel entityModel, QueryFilter filter) {
String sql=entityModel.getSqlSegment("COUNT");
String dsName = entityModel.getDsName();
try
{
dstDataSourceService.initDataSource(dsName);
DynamicDataSourceContextHolder.push(dsName);
DbEntitySearchContext context=new DbEntitySearchContext();
context.setFilter(filter);
QueryWrapper qw=context.getSelectCond();
if(!StringUtils.isEmpty(filter.getCustSqlSegment()))
qw.apply(filter.getCustSqlSegment());
Integer count=baseMapper.searchCount(sql,qw);
return count;
}
catch(Exception ex)
{
log.error("详细错误信息:" + ex.getMessage() + ", 执行sql:" + sql);
return null;
}
finally
{
DynamicDataSourceContextHolder.poll();
}
}
@Override @Override
public List<EntityObj> search(String dataSet, EntityModel entityModel, QueryFilter filter) { public List<EntityObj> search(String dataSet, EntityModel entityModel, QueryFilter filter) {
......
...@@ -24,6 +24,12 @@ public class MongoEntityService implements CommonEntityService{ ...@@ -24,6 +24,12 @@ public class MongoEntityService implements CommonEntityService{
return null; return null;
} }
@Override
public Integer selectCount(EntityModel entityModel, QueryFilter filter) {
return null;
}
@Override @Override
public List<EntityObj> search(String dataSet, EntityModel entityModel, QueryFilter filter) { public List<EntityObj> search(String dataSet, EntityModel entityModel, QueryFilter filter) {
return null; return null;
......
...@@ -55,7 +55,7 @@ public class MQConsumerConfigure { ...@@ -55,7 +55,7 @@ public class MQConsumerConfigure {
* 设置consumer第一次启动是从队列头部开始还是队列尾部开始 * 设置consumer第一次启动是从队列头部开始还是队列尾部开始
* 如果不是第一次启动,那么按照上次消费的位置继续消费 * 如果不是第一次启动,那么按照上次消费的位置继续消费
*/ */
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/** /**
* 设置消费模型,集群还是广播,默认为集群 * 设置消费模型,集群还是广播,默认为集群
*/ */
......
...@@ -9,6 +9,11 @@ ...@@ -9,6 +9,11 @@
<if test="ew!=null and ew.sqlSegment!=null and ew.emptyOfWhere">${ew.sqlSegment}</if> <if test="ew!=null and ew.sqlSegment!=null and ew.emptyOfWhere">${ew.sqlSegment}</if>
</select> </select>
<select id="searchCount" parameterType="cn.ibizlab.core.lite.extensions.filter.DbEntitySearchContext" resultType="Integer">
${sql}
<where><if test="ew!=null and ew.sqlSegment!=null and !ew.emptyOfWhere">${ew.sqlSegment}</if></where>
<if test="ew!=null and ew.sqlSegment!=null and ew.emptyOfWhere">${ew.sqlSegment}</if>
</select>
<insert id="replaceBatch" parameterType="java.util.List" databaseId="mysql"> <insert id="replaceBatch" parameterType="java.util.List" databaseId="mysql">
<foreach collection="list" item="item" index="index" separator=";"> <foreach collection="list" item="item" index="index" separator=";">
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册