提交 e5fcb50c 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

规则及指标构建支持自定义sql条件(扩展参数设置extCond)

上级 0acbf836
......@@ -134,11 +134,12 @@ public class DABuildExService extends DABuildServiceImpl {
msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName());
msg.setEngineId(et.getBuildId());
String extCond = Setting.getValue(et.getExtParams(), "extCond");
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
liteDataService.processDataModel(et.getModelId(), et.getLastRuntime(), new LiteDataCallback<List<EntityObj>>() {
liteDataService.processDataModel(et.getModelId(), extCond, et.getLastRuntime(), new LiteDataCallback<List<EntityObj>>() {
@Override
public void total(Integer total) {
String state = "FINISH";
......@@ -163,6 +164,7 @@ public class DABuildExService extends DABuildServiceImpl {
try {
defaultMQProducerService.sendBuildMsg(engineMQMsg);
} catch (Exception ex) {
log.error(String.format("指标构建发送业务数据到MQ失败,error:%1$s\n data:%2$s",ex.getMessage(),JSON.toJSONString(engineMQMsg)));
return false;
}
return true;
......
......@@ -99,11 +99,12 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName());
msg.setEngineId(et.getEngineId());
String extCond = Setting.getValue(et.getExtParams(), "extCond");
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
liteDataService.processDataModel(et.getModelId(), et.getLastRuntime(), new LiteDataCallback<List<EntityObj>>() {
liteDataService.processDataModel(et.getModelId(), extCond, et.getLastRuntime(), new LiteDataCallback<List<EntityObj>>() {
@Override
public void total(Integer total) {
String state = "FINISH";
......@@ -128,6 +129,7 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
try {
defaultMQProducerService.sendEngineMsg(engineMQMsg);
} catch (Exception ex) {
log.error(String.format("规则引擎发送业务数据到MQ失败,error:%1$s\n data:%2$s",ex.getMessage(),JSON.toJSONString(engineMQMsg)));
return false;
}
return true;
......
......@@ -27,6 +27,7 @@ public interface CommonEntityService {
void processList(DataModel dataModel, Timestamp lastModify, LiteDataCallback callback);
void processList(DataModel dataModel, String extCond, Timestamp lastModify, LiteDataCallback callback);
}
......@@ -64,9 +64,10 @@ public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> impl
DynamicDataSourceContextHolder.push(dsName);
DbEntitySearchContext context = new DbEntitySearchContext();
context.setFilter(filter);
QueryWrapper qw = context.getSelectCond();
if (!StringUtils.isEmpty(filter.getCustSqlSegment()))
qw.apply(filter.getCustSqlSegment());
QueryWrapper<EntityObj> qw = context.getSelectCond();
if (!StringUtils.isEmpty(filter.getCustSqlSegment())) {
qw.and(wrapper -> wrapper.apply(filter.getCustSqlSegment()));
}
Integer count = baseMapper.searchCount(sql, qw);
......@@ -349,13 +350,20 @@ public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> impl
@Value("${ibiz.syncBatchSize:500}")
private int processSize = 500;
@Override
public void processList(DataModel dataModel,Timestamp lastModify,LiteDataCallback callback)
public void processList(DataModel dataModel,Timestamp lastModify,LiteDataCallback callback){
this.processList(dataModel, "", lastModify, callback);
}
@Override
public void processList(DataModel dataModel,String extCond,Timestamp lastModify,LiteDataCallback callback)
{
EntityModel entityModel=dataModel.getFactEntityModel();
FieldModel lastModifyField=entityModel.getLastModifyField();
QueryFilter filter=new QueryFilter();
if(lastModifyField!=null)
filter.ge(lastModifyField.getColumnName(), lastModify);
if(StringUtils.hasLength(extCond)){
filter.cust(extCond);
}
//记录处理的总业务数据量
String sql=entityModel.getSqlSegment("CORE");
......@@ -376,9 +384,9 @@ public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> impl
DbEntitySearchContext context=new DbEntitySearchContext();
context.setFilter(filter);
QueryWrapper qw=context.getSelectCond();
QueryWrapper<EntityObj> qw=context.getSelectCond();
if(!StringUtils.isEmpty(filter.getCustSqlSegment()))
qw.apply(filter.getCustSqlSegment());
qw.and(wrapper -> wrapper.apply(filter.getCustSqlSegment()));
Map<String, Object> paramsMap = new HashMap<>();
paramsMap.put("sql", sql);
......
......@@ -107,11 +107,11 @@ public class LiteDataService {
getEntityService(dsName).cursorRead(dsName,entityModel,sql,filter,callback);
}
public void processDataModel(String metaModelId,Timestamp lastModify,LiteDataCallback callback)
public void processDataModel(String metaModelId,String extCond, Timestamp lastModify,LiteDataCallback callback)
{
DataModel dataModel= liteModelService.getDataModel(metaModelId);
EntityModel entityModel = dataModel.getFactEntityModel();
getEntityService(entityModel.getDsName()).processList(dataModel,lastModify,callback);
getEntityService(entityModel.getDsName()).processList(dataModel,extCond,lastModify,callback);
}
......
......@@ -61,6 +61,10 @@ public class MongoEntityService implements CommonEntityService{
@Override
public void processList(DataModel dataModel, Timestamp lastModify, LiteDataCallback callback) {
this.processList(dataModel, "", lastModify, callback);
}
@Override
public void processList(DataModel dataModel, String extCond, Timestamp lastModify, LiteDataCallback callback) {
}
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册