提交 d58ac22f 编写于 作者: zc's avatar zc

update:自定义功能实现

上级 832ab477
......@@ -21,6 +21,8 @@ import cn.ibizlab.util.errors.BadRequestAlertException;
import cn.ibizlab.util.helper.CachedBeanCopier;
import cn.ibizlab.util.helper.Setting;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
......@@ -38,7 +40,6 @@ import org.springframework.util.StringUtils;
import java.io.File;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.*;
......@@ -192,11 +193,107 @@ public class DABuildExService extends DABuildServiceImpl {
return super.run(et);
}
public DABuild filterData(DABuild et) {
if(!StringUtils.isEmpty(et.getBuildId()))
{
CachedBeanCopier.copy(get(et.getBuildId()), et);
String metricIds = Setting.getValue(et.getExtParams(), "metricIds");
String primaryKey = Setting.getValue(et.getExtParams(), "primaryKeys");
if("RUNNING".equalsIgnoreCase(et.getState()))
throw new BadRequestAlertException("构建正在执行中,不能重复执行","DABuild",et.getBuildId());
if (StringUtils.isEmpty(metricIds)) {
throw new BadRequestAlertException("扩展参数指标参数为空","DABuild",et.getBuildId());
}
BaseRequest msg=new BaseRequest();
msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName());
msg.setEngineId(et.getBuildId());
String extCond = Setting.getValue(et.getExtParams(), "extCond");
if (StringUtils.isEmpty(primaryKey)) {
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
liteDataService.processDataModel(et.getModelId(), extCond, et.getLastRuntime(), new LiteDataCallback<List<EntityObj>>() {
@Override
public void total(Integer total) {
String state = "FINISH";
if(total > 0) {
state = "RUNNING";
msg.setBatch(msg.getBatch()+"-cnt-"+total);
}
et.setTotal(total);
et.setProcessed(0);
et.setState(state);
update(et);
}
@Override
public boolean processData(List<EntityObj> datas) {
EngineMQMsg engineMQMsg = new EngineMQMsg();
engineMQMsg.setEngineId(et.getBuildId());
engineMQMsg.setBatch(msg.getBatch());
engineMQMsg.setRunTime(starttime);
engineMQMsg.setRuleIds(metricIds);
engineMQMsg.setCount(datas.size());
engineMQMsg.setDatas(datas);
try {
defaultMQProducerService.sendBuildMsg(engineMQMsg);
} catch (Exception ex) {
log.error(String.format("指标构建发送业务数据到MQ失败,error:%1$s\n data:%2$s",ex.getMessage(),JSON.toJSONString(engineMQMsg)));
return false;
}
return true;
}
});
} else {
String state = "FINISH";
JSONArray jsonArray = JSONArray.parseArray(primaryKey);
int total = jsonArray.size();
if(total > 0) {
state = "RUNNING";
msg.setBatch(msg.getBatch()+"-cnt-"+total);
}
et.setTotal(total);
et.setProcessed(0);
et.setState(state);
update(et);
List<EntityObj> datas = new ArrayList<>();
for (int i = 0; i < jsonArray.size(); i++) {
EntityObj entityObj = new EntityObj();
JSONObject jsonObject = JSONObject.parseObject(jsonArray.get(i).toString());
for (String s : jsonObject.keySet()) {
entityObj.set(s,jsonObject.get(s));
}
datas.add(entityObj);
}
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
EngineMQMsg engineMQMsg = new EngineMQMsg();
engineMQMsg.setEngineId(et.getBuildId());
engineMQMsg.setBatch(msg.getBatch());
engineMQMsg.setRunTime(starttime);
engineMQMsg.setRuleIds(metricIds);
engineMQMsg.setCount(total);
engineMQMsg.setDatas(datas);
try {
defaultMQProducerService.sendBuildMsg(engineMQMsg);
} catch (Exception ex) {
log.error(String.format("指标构建发送业务数据到MQ失败,error:%1$s\n data:%2$s",ex.getMessage(),JSON.toJSONString(engineMQMsg)));
}
}
}
return super.filterData(et);
}
public void processData(EngineMQMsg engineMQMsg){
try{
BaseRequest msg=proxy.getRequest(engineMQMsg.getEngineId(),engineMQMsg.getBatch()).copy(true);
BaseRequest msg = null;
if (StringUtils.isEmpty(engineMQMsg.getRuleIds())){
msg=proxy.getRequest(engineMQMsg.getEngineId(),engineMQMsg.getBatch()).copy(true);
} else {
msg=proxy.getRequest2(engineMQMsg.getEngineId(),engineMQMsg.getBatch(),engineMQMsg.getRuleIds()).copy(true);
}
msg.setId(IdWorker.getIdStr());
ExecLog execlog=new ExecLog();
......@@ -238,6 +335,12 @@ public class DABuildExService extends DABuildServiceImpl {
@Cacheable(value ="dabuild", key = "'batch:'+#p0+#p1")
public BaseRequest getRequest(String id,String batch)
{
return this.getRequest2(id,batch,null);
}
@Cacheable(value ="dabuild", key = "'batch:'+#p0+#p1+#p2")
public BaseRequest getRequest2(String id,String batch,String metricIds)
{
DABuild et = get(id);
BaseRequest msg=new BaseRequest();
......@@ -264,10 +367,19 @@ public class DABuildExService extends DABuildServiceImpl {
HashSet<String> fillpropertys=new HashSet<>();
List<String> metrics = new ArrayList<>();
idaMetricService.list(Wrappers.<DAMetric>lambdaQuery().eq(DAMetric::getBuildId,et.getBuildId()))
.forEach(daMetric -> {
if (StringUtils.isEmpty(metricIds)) {
// idaMetricService.list(Wrappers.<DAMetric>lambdaQuery().eq(DAMetric::getBuildId,et.getBuildId()))
// .forEach(daMetric -> {
// metrics.add(daMetric.getMetricId());
// });
List<DAMetric> list = idaMetricService.list(Wrappers.<DAMetric>lambdaQuery().eq(DAMetric::getBuildId, et.getBuildId()));
for (DAMetric daMetric : list) {
metrics.add(daMetric.getMetricId());
});
}
} else {
metrics = Arrays.asList(metricIds.split(";"));
}
Assert.notEmpty(metrics,"构建数据失败:未配置指标");
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册