提交 9a560a19 编写于 作者: xuhui961310148's avatar xuhui961310148

update:引擎规则结果添加发送mq队列功能,老版相关报表数据查询等功能调整

上级 c1bce0fc
......@@ -121,6 +121,7 @@ public class DevBootSecurityConfig extends WebSecurityConfigurerAdapter {
.antMatchers("/ibizutil/**").permitAll()
.antMatchers("/dictionaries/**").permitAll()
.antMatchers("/dictionarys/**").permitAll()
.antMatchers("/ruleengines/{ruleengine_id}/syncrun").permitAll()
.antMatchers("/"+previewpath+"/**").permitAll();
......
......@@ -138,6 +138,28 @@ public class ExecResultRepository {
return mapper.map(result).all();
}
/**
* 增加组织部门的搜索的条件
* @param ruleids
* @param retValue
* @param dims
* @param start
* @param end
* @param ext_1field 组织部门
* @return
*/
public List<ExecResult> sum(Collection<String> ruleids, Integer retValue, List<String> dims, Timestamp start, Timestamp end, String ext_1field) {
LocalDate st = ExecResult.time2LocalDate(start);
LocalDate ed = ExecResult.time2LocalDate(end);
final ResultSet result = session.execute(select().column("ruleid").column("retvalue").column("dimfield").sum("metricfield").as("metricfield").
from(TABLE).
where(in("ruleid", ruleids)).
and(eq("retvalue", retValue)).
and(in("dimfield", dims)).
and(gte("timefield", st)).and(lte("timefield", ed)).and(eq("ext_1field", ext_1field)).groupBy("ruleid", "retvalue", "dimfield").allowFiltering().limit(5000).setReadTimeoutMillis(200000));
return mapper.map(result).all();
}
public List<ExecResult> avg(Collection<String> ruleids, Integer retValue, List<String> dims, Timestamp start, Timestamp end) {
LocalDate st = ExecResult.time2LocalDate(start);
LocalDate ed = ExecResult.time2LocalDate(end);
......@@ -150,6 +172,28 @@ public class ExecResultRepository {
return mapper.map(result).all();
}
/**
* 增加组织部门的搜索的条件
* @param ruleids
* @param retValue
* @param dims
* @param start
* @param end
* @param ext_1field 组织部门
* @return
*/
public List<ExecResult> avg(Collection<String> ruleids, Integer retValue, List<String> dims, Timestamp start, Timestamp end, String ext_1field) {
LocalDate st = ExecResult.time2LocalDate(start);
LocalDate ed = ExecResult.time2LocalDate(end);
final ResultSet result = session.execute(select().column("ruleid").column("retvalue").column("dimfield").avg("metricfield").as("metricfield").
from(TABLE).
where(in("ruleid", ruleids)).
and(eq("retvalue", retValue)).
and(in("dimfield", dims)).
and(gte("timefield", st)).and(lte("timefield", ed)).and(eq("ext_1field",ext_1field)).groupBy("ruleid", "retvalue", "dimfield").allowFiltering().limit(5000).setReadTimeoutMillis(200000));
return mapper.map(result).all();
}
public List<ExecResult> group(Collection<String> ruleids, Integer retValue, List<String> dims, String type, Timestamp start, Timestamp end) {
if (type.equalsIgnoreCase("avg"))
return avg(ruleids, retValue, dims, start, end);
......@@ -177,6 +221,22 @@ public class ExecResultRepository {
return getPageData(ruleid, pagingState, RESULTS_PER_PAGE, retValue, dims, start, end);
}
/**
* 根据规则ID、单位和时间查询相应的规则结果数据
* @param ruleid
* @param pagingState
* @param retValue
* @param dims
* @param start
* @param end
* @param ext_1field
* @return
*/
public ResultSet getPageData(String ruleid, PagingState pagingState, Integer retValue, String dims, Timestamp start, Timestamp end, String ext_1field) {
final int RESULTS_PER_PAGE = 1000;
return getPageData(ruleid, pagingState, RESULTS_PER_PAGE, retValue, dims, start, end, ext_1field);
}
/**
* 根据规则ID、单位和时间查询相应的规则结果数据
*
......@@ -204,6 +264,34 @@ public class ExecResultRepository {
return result;
}
/**
* 根据规则ID、单位和时间查询相应的规则结果数据
* @param ruleid
* @param pagingState
* @param pageSize
* @param retValue
* @param dims
* @param start
* @param end
* @return
*/
public ResultSet getPageData(String ruleid, PagingState pagingState, int pageSize, Integer retValue, String dims, Timestamp start, Timestamp end, String ext_1field) {
LocalDate st = ExecResult.time2LocalDate(start);
LocalDate ed = ExecResult.time2LocalDate(end);
Statement statement = select().column("keyvaluefield").
from(TABLE).
where(eq("ruleid", ruleid)).
and(eq("retvalue", retValue)).
and(eq("dimfield", dims)).
and(gte("timefield", st)).and(lte("timefield", ed)).and(eq("ext_1field",ext_1field)).allowFiltering().setReadTimeoutMillis(200000);
statement.setFetchSize(pageSize);
if (pagingState != null) {
statement.setPagingState(pagingState);
}
final ResultSet result = session.execute(statement);
return result;
}
/**
* 计算绩效数据的信访件数
*
......
......@@ -53,6 +53,7 @@ public class BaseRequest
private String engineId;
private String resultDataSource;
private String resultTableName;
private String resultTopic;
private String batch;
private List<String> rules;
private List<String> ruleIds;
......
......@@ -12,5 +12,6 @@ public class EngineMQMsg {
private String batch;
private Timestamp runTime;
private Integer count;
private String ruleIds;
private List<EntityObj> datas;
}
......@@ -24,4 +24,5 @@ public class FetchMetricDatasParam {
private String keyvaluefield;
private String businesscat;
private long pagingCurrent;
private String ext_1field;
}
......@@ -52,6 +52,7 @@ public class BaseEntityServiceImpl implements BaseEntityService
{
public static final String Tag_EngineId = "engineId";
public static final String Tag_Batch_SyncRun = "SYNCRUN";
public BaseEntityServiceImpl()
{
......@@ -139,6 +140,16 @@ public class BaseEntityServiceImpl implements BaseEntityService
if(param.get("resultTableName")!=null){
resultTableName = param.get("resultTableName");
}
boolean bSyncRun = false;
if(param.containsKey("BATCH") && param.getStringValue("BATCH","").startsWith(Tag_Batch_SyncRun)){
bSyncRun = true;
}
if(!StringUtils.isEmpty(param.get("resultTopic"))){
Object resultTopic = param.get("resultTopic");
if (!StringUtils.isEmpty(resultTopic)){
result.set("resultTopic", resultTopic);
}
}
result.set(Tag_EngineId,param.get(Tag_EngineId));
result.set("resultDataSource", resultDataSource);
result.set("resultTableName", resultTableName);
......@@ -218,15 +229,23 @@ public class BaseEntityServiceImpl implements BaseEntityService
result2.setId(result2.getDefaultKey(true).toString());
result2.setRuleId(RULECODE);
if (bSyncRun){
ruExecResultService.save(result2);
}else {
ruExecResultService.saveAsync(result2);
}
}
else
{
result2.setId(result2.getDefaultKey(true).toString());
if (bSyncRun){
ruExecResultService.remove(result2.getId());
}else {
ruExecResultService.deleteAsync(result2);
}
}
}
}
......@@ -490,6 +509,9 @@ public class BaseEntityServiceImpl implements BaseEntityService
if(!StringUtils.isEmpty(msg.getResultTableName())){
modelObj.set("resultTableName", msg.getResultTableName());
}
if(!StringUtils.isEmpty(msg.getResultTopic())){
modelObj.set("resultTopic", msg.getResultTopic());
}
processRule(modelObj, modelObj.getRowKey(),strResId);
}
......
......@@ -81,7 +81,7 @@ public class DACoreService {
String strDIMCodeList = param.getDimcodelist();
Timestamp timeStartTime = param.getStarttime();
Timestamp timeEndTime = param.getEndtime();
execResults = this.getResultIds(ruleidList, strDIMCodeList, strDimval, isIncludeChild, timeStartTime, timeEndTime);
execResults = this.getResultIds(ruleidList, strDIMCodeList, strDimval, isIncludeChild, timeStartTime, timeEndTime,"");
} catch (Exception ex) {
log.error("获取查询报表错误, 错误原因:"+ex.getMessage());
}
......@@ -111,7 +111,7 @@ public class DACoreService {
String reportname = daReport != null ? daReport.getReportName() : "";
String strTableDisplayType = param.getTableDisplayType();
//获取表格数据
JSONObject jsonObject = this.getERPortData(daReport, param.getDimval(), param.getDimcodelist(), param.getStarttime(), param.getEndtime(), strTableDisplayType);
JSONObject jsonObject = this.getERPortData(daReport, param.getDimval(), param.getDimcodelist(), param.getStarttime(), param.getEndtime(), strTableDisplayType,param.getExt_1field());
resultMap.put("vmcfg",vmcfg);
resultMap.put("reportname",reportname);
resultMap.put("resportdata",jsonObject);
......@@ -140,10 +140,11 @@ public class DACoreService {
String strDimval = param.getDimval();
Timestamp timeStart = param.getStarttime() == null ? this.getDefaultStartTimestamp() : param.getStarttime();
Timestamp timeEnd = param.getEndtime() == null ? this.getDefaultEndTimestamp() : param.getEndtime();
String ext_1field = param.getExt_1field();
if (StringUtils.isEmpty(strRuleid)) {
return resultMap;
}
resultMap = this.getKeyvaluefields(lPagingCurrent, strRuleid, strDimval, timeStart, timeEnd);
resultMap = this.getKeyvaluefields(lPagingCurrent, strRuleid, strDimval, timeStart, timeEnd, ext_1field);
} catch (Exception ex) {
log.error("获取查询报表错误, 错误原因:"+ex.getMessage());
}
......@@ -156,7 +157,7 @@ public class DACoreService {
* @param strDST_CBID
* @return
*/
public JSONObject getERPortData(DAReport report, String strLoadDimval, String strDict, Timestamp timeLoadStartTime, Timestamp timeLoadEndTime, String strTableDisplayType) {
public JSONObject getERPortData(DAReport report, String strLoadDimval, String strDict, Timestamp timeLoadStartTime, Timestamp timeLoadEndTime, String strTableDisplayType, String ext_1field) {
JSONObject jsonObject = new JSONObject();
if (report != null) {
DAReport daReport = report;
......@@ -170,7 +171,7 @@ public class DACoreService {
Timestamp startTime = timeLoadStartTime == null ? this.getDefaultStartTimestamp() : timeLoadStartTime;
Timestamp endTime = timeLoadEndTime == null ? this.getDefaultEndTimestamp() : timeLoadEndTime;
JSONArray jsonArray = new JSONArray();
List<HashMap<String, Object>> execResults = this.getResultIds(ruleidList, strDict, strDimval, startTime, endTime);
List<HashMap<String, Object>> execResults = this.getResultIds(ruleidList, strDict, strDimval, startTime, endTime, ext_1field);
jsonArray.add(execResults);
if (!StringUtils.isEmpty(strTableDisplayType) && "LIST_BOX".equals(strTableDisplayType)) {
JSONArray rowJson = new JSONArray();
......@@ -193,20 +194,21 @@ public class DACoreService {
* @param dimval
* @param start
* @param end
* @param ext_1field
* @return
*/
// @Cacheable( value="reportinspec",keyGenerator="keyGenerator")
@Cacheable( value="reportinspec",key="'reportinspec:' + #p0 + '||' + #p1 + '||' + #p2 + '||' + #p3 + '||' + #p4 + '||' + #p5")
public Map<String, String> getKeyvaluefields(long lPagingCurrent, String ruleid, String dimval, Timestamp start, Timestamp end)
public Map<String, String> getKeyvaluefields(long lPagingCurrent, String ruleid, String dimval, Timestamp start, Timestamp end, String ext_1field)
{
Map<String, String> result = null;
PagingState pagingState = null;
for (long i=1L;i<=lPagingCurrent;i++) {
if (lPagingCurrent == i) {
result = this.page(pagingState, ruleid, 1, dimval, start, end, true);
result = this.page(pagingState, ruleid, 1, dimval, start, end, true, ext_1field);
break;
} else {
result = this.page(pagingState, ruleid, 1, dimval, start, end, false);
result = this.page(pagingState, ruleid, 1, dimval, start, end, false, ext_1field);
}
String strPagingState = result.get("pagingState");
......@@ -219,21 +221,21 @@ public class DACoreService {
// @Cacheable( value="reportshow",keyGenerator="keyGenerator")
@Cacheable( value="reportshow",key="'reportshow:' + #p0 + '||' + #p1 + '||' + #p2 + '||' + #p3 + '||' + #p4 + '||' + #p5")
public List<HashMap<String,Object>> getResultIds(List<String> metricList, String dimcodelist,String dimval, Timestamp start, Timestamp end)
public List<HashMap<String,Object>> getResultIds(List<String> metricList, String dimcodelist,String dimval, Timestamp start, Timestamp end, String ext_1field)
{
return this.getResultIds(metricList, dimcodelist, dimval, true, start, end);
return this.getResultIds(metricList, dimcodelist, dimval, true, start, end, ext_1field);
}
// @Cacheable( value="reportshow", keyGenerator="keyGenerator")
@Cacheable( value="reportshow",key="'reportshow:' + #p0 + '||' + #p1 + '||' + #p2 + '||' + #p3 + '||' + #p4 + '||' + #p5 + '||' + #p6")
public List<HashMap<String,Object>> getResultIds(List<String> metricList, String dimcodelist, String dimval, boolean includeChild, Timestamp start, Timestamp end)
public List<HashMap<String,Object>> getResultIds(List<String> metricList, String dimcodelist, String dimval, boolean includeChild, Timestamp start, Timestamp end, String ext_1field)
{
QueryWrapper<DAMetric> sf=new QueryWrapper<>();
sf.in("da_metricid", metricList);
return this.getResult(this.daMetricExService.list(sf), dimcodelist, dimval, includeChild, start, end);
return this.getResult(this.daMetricExService.list(sf), dimcodelist, dimval, includeChild, start, end, ext_1field);
}
public List<HashMap<String,Object>> getResult(List<DAMetric> metricList, String dimcodelist,String dimval,boolean includeChild, Timestamp start, Timestamp end)
public List<HashMap<String,Object>> getResult(List<DAMetric> metricList, String dimcodelist, String dimval, boolean includeChild, Timestamp start, Timestamp end, String ext_1field)
{
List<String> sum=new ArrayList<String>();
......@@ -332,12 +334,24 @@ public class DACoreService {
if(sum.size()>0)
{
List<ExecResult> listsum = execResultRepository.sum(sum, 1, dims, start, end);
// List<ExecResult> listsum = execResultRepository.sum(sum, 1, dims, start, end);
List<ExecResult> listsum = new ArrayList<>();
if (!StringUtils.isEmpty(ext_1field)) {
listsum = execResultRepository.sum(sum, 1, dims, start, end, ext_1field);
} else {
listsum = execResultRepository.sum(sum, 1, dims, start, end);
}
list.addAll(listsum);
}
if(avg.size()>0)
{
List<ExecResult> listavg = execResultRepository.avg(avg, 1, dims, start, end);
// List<ExecResult> listavg = execResultRepository.avg(avg, 1, dims, start, end);
List<ExecResult> listavg = new ArrayList<>();
if (!StringUtils.isEmpty(ext_1field)) {
listavg = execResultRepository.avg(avg, 1, dims, start, end, ext_1field);
} else {
listavg = execResultRepository.avg(avg, 1, dims, start, end);
}
list.addAll(listavg);
}
......@@ -369,12 +383,24 @@ public class DACoreService {
List<ExecResult> list_yoy=new ArrayList<>();
if(sum_yoy.size()>0)
{
List<ExecResult> listsum=execResultRepository.sum(sum_yoy, 1, dims, getLastYear(start), getLastYear(end));
// List<ExecResult> listsum=execResultRepository.sum(sum_yoy, 1, dims, getLastYear(start), getLastYear(end));
List<ExecResult> listsum = new ArrayList<>();
if (!StringUtils.isEmpty(ext_1field)) {
listsum = execResultRepository.sum(sum_yoy, 1, dims, getLastYear(start), getLastYear(end), ext_1field);
} else {
listsum = execResultRepository.sum(sum_yoy, 1, dims, getLastYear(start), getLastYear(end));
}
list_yoy.addAll(listsum);
}
if(avg_yoy.size()>0)
{
List<ExecResult> listavg=execResultRepository.avg(avg_yoy, 1, dims, getLastYear(start), getLastYear(end));
// List<ExecResult> listavg=execResultRepository.avg(avg_yoy, 1, dims, getLastYear(start), getLastYear(end));
List<ExecResult> listavg = new ArrayList<>();
if (!StringUtils.isEmpty(ext_1field)) {
listavg = execResultRepository.avg(avg_yoy, 1, dims, getLastYear(start), getLastYear(end), ext_1field);
} else {
listavg = execResultRepository.avg(avg_yoy, 1, dims, getLastYear(start), getLastYear(end));
}
list_yoy.addAll(listavg);
}
......@@ -449,12 +475,24 @@ public class DACoreService {
List<ExecResult> list_mom=new ArrayList<>();
if(sum_mom.size()>0)
{
List<ExecResult> listsum=execResultRepository.sum(sum_mom, 1, dims, getLastMonth(start), getLastMonth(end));
// List<ExecResult> listsum=execResultRepository.sum(sum_mom, 1, dims, getLastMonth(start), getLastMonth(end));
List<ExecResult> listsum = new ArrayList<>();
if (!StringUtils.isEmpty(ext_1field)) {
listsum = execResultRepository.sum(sum_mom, 1, dims, getLastMonth(start), getLastMonth(end), ext_1field);
} else {
listsum = execResultRepository.sum(sum_mom, 1, dims, getLastMonth(start), getLastMonth(end));
}
list_mom.addAll(listsum);
}
if(avg_mom.size()>0)
{
List<ExecResult> listavg=execResultRepository.avg(avg_mom, 1, dims, getLastMonth(start), getLastMonth(end));
// List<ExecResult> listavg=execResultRepository.avg(avg_mom, 1, dims, getLastMonth(start), getLastMonth(end));
List<ExecResult> listavg = new ArrayList<>();
if (!StringUtils.isEmpty(ext_1field)) {
listavg = execResultRepository.avg(avg_mom, 1, dims, getLastMonth(start), getLastMonth(end), ext_1field);
} else {
listavg = execResultRepository.avg(avg_mom, 1, dims, getLastMonth(start), getLastMonth(end));
}
list_mom.addAll(listavg);
}
......@@ -562,7 +600,8 @@ public class DACoreService {
* 清除和绩效考核有关的缓存
*/
@Caching(evict={@CacheEvict(value = "getPerformanceData", allEntries = true),
@CacheEvict(value = "sumPerformancePiece", allEntries = true)})
@CacheEvict(value = "sumPerformancePiece", allEntries = true),
@CacheEvict(value = "getPerformanceReportData", allEntries = true)})
public void resetPerformance()
{
......@@ -570,7 +609,9 @@ public class DACoreService {
@Caching(evict={@CacheEvict(value = "onFetchMetricDatas", allEntries = true),
@CacheEvict(value = "onFetchReportDatas", allEntries = true),
@CacheEvict(value = "onFetchKeyValueField", allEntries = true)})
@CacheEvict(value = "onFetchKeyValueField", allEntries = true),
@CacheEvict(value = "reportshow", allEntries = true),
@CacheEvict(value = "reportinspec", allEntries = true)})
public void resetsnap()
{
......@@ -579,7 +620,10 @@ public class DACoreService {
/**
* 清除获取指标的缓存
*/
@CacheEvict(value = "onFetchMetricDatas", allEntries = true)
// @CacheEvict(value = "onFetchMetricDatas", allEntries = true)
@Caching(evict={@CacheEvict(value = "onFetchMetricDatas", allEntries = true),
@CacheEvict(value = "reportshow", allEntries = true),
@CacheEvict(value = "reportinspec", allEntries = true)})
public void resetsnapSingle()
{
......@@ -589,7 +633,9 @@ public class DACoreService {
* 清除报表的缓存
*/
@Caching(evict={@CacheEvict(value = "onFetchReportDatas", allEntries = true)})
@Caching(evict={@CacheEvict(value = "onFetchReportDatas", allEntries = true),
@CacheEvict(value = "reportshow", allEntries = true),
@CacheEvict(value = "reportinspec", allEntries = true)})
public void resetsnapReport()
{
......@@ -1078,12 +1124,17 @@ public class DACoreService {
return resultJson;
}
public Map<String, String> page(PagingState pagingState, String ruleid, Integer retValue, String dims, Timestamp start, Timestamp end, boolean bResolve) {
public Map<String, String> page(PagingState pagingState, String ruleid, Integer retValue, String dims, Timestamp start, Timestamp end, boolean bResolve, String ext_1field) {
Map<String, String> result = new HashMap<String, String>();
List<String> keyvaluefield = new ArrayList<String>();
// 查询数据
ResultSet rs = execResultRepository.getPageData(ruleid, pagingState, retValue, dims, start, end);
ResultSet rs = null;
if (!StringUtils.isEmpty(ext_1field)){
rs = execResultRepository.getPageData(ruleid, pagingState, retValue, dims, start, end, ext_1field);
}else {
rs = execResultRepository.getPageData(ruleid, pagingState, retValue, dims, start, end);
}
// 分页信息处理
PagingState resultpagingState = rs.getExecutionInfo().getPagingState();
String strPagingState = "";
......
package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.extensions.mapper.ExecResultExMapper;
import cn.ibizlab.core.extensions.util.MsgProducerService;
import cn.ibizlab.core.rule.domain.ExecResult;
import cn.ibizlab.core.rule.service.IRuleEngineService;
import cn.ibizlab.core.rule.service.impl.ExecResultServiceImpl;
......@@ -44,6 +45,8 @@ public class ExecResultExService extends ExecResultServiceImpl {
private int saveBatchSize;
@Value("${ibiz.execResult.deleteBatchSize:500}")
private int deleteBatchSize;
@Autowired
private MsgProducerService defaultMQProducerService;
@Autowired
@Lazy
......@@ -230,6 +233,11 @@ public class ExecResultExService extends ExecResultServiceImpl {
}
}
}
Object resultTopic = execResults.get(0).get("resultTopic");
if (!StringUtils.isEmpty(resultTopic)){
// 发送mq消息
defaultMQProducerService.sendRuleResultsMsg(resultTopic.toString(), "save",execResults);
}
result = execResultExMapper.replaceBatch(execResults, tableName);
} catch (Exception ex) {
log.error("存储规则结果发生异常,详细错误信息:" + ex.getMessage());
......@@ -248,6 +256,11 @@ public class ExecResultExService extends ExecResultServiceImpl {
dstDataSourceService.initDataSource(dsName);
DynamicDataSourceContextHolder.push(dsName);
}
Object resultTopic = execResults.get(0).get("resultTopic");
if (!StringUtils.isEmpty(resultTopic)){
// 发送mq消息
defaultMQProducerService.sendRuleResultsMsg(resultTopic.toString(),"delete",execResults);
}
result = execResultExMapper.clearBatch(execResults, tableName);
} catch (Exception ex) {
log.error("存储规则结果发生异常,详细错误信息:" + ex.getMessage());
......
......@@ -4,6 +4,7 @@ import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.util.MsgProducerService;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import cn.ibizlab.core.rule.domain.ExecResult;
import cn.ibizlab.util.helper.Setting;
import cn.ibizlab.core.lite.extensions.model.DataModel;
import cn.ibizlab.core.lite.extensions.service.LiteDataCallback;
......@@ -18,6 +19,8 @@ import cn.ibizlab.core.rule.service.impl.RuleEngineServiceImpl;
import cn.ibizlab.util.errors.BadRequestAlertException;
import cn.ibizlab.util.helper.CachedBeanCopier;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
......@@ -32,6 +35,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.*;
/**
......@@ -72,6 +76,10 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
@Autowired
@Lazy
private LiteDataService liteDataService;
@Autowired
private ExecResultExService execResultExService;
/**
* [Run:运行] 行为扩展
* @param et
......@@ -129,7 +137,59 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
return super.run(et);
}
@Override
public RuleEngine syncRun(RuleEngine et) {
log.info("引擎同步校验入参:" + JSON.toJSONString(et));
if(!StringUtils.isEmpty(et.getEngineId())) {
if (StringUtils.isEmpty(et.get("datakeys"))){
return super.syncRun(et);
}
String dataKeys = et.get("datakeys").toString();
boolean flag = false;
String ruleIds = null;
if (!StringUtils.isEmpty(et.get("ruleids"))){
flag = true;
ruleIds = et.get("ruleids").toString();
}
CachedBeanCopier.copy(get(et.getEngineId()), et);
String batch = BaseEntityServiceImpl.Tag_Batch_SyncRun + "_" + new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date())+"["+et.getModelName()+"]";
List<EntityObj> lists = JSON.parseArray(dataKeys,EntityObj.class);
BaseRequest msg = new BaseRequest();
if (!StringUtils.isEmpty(ruleIds)){
msg = proxy.getRequest2(et.getEngineId(),batch,ruleIds).copy(true);
}else {
msg = proxy.getRequest(et.getEngineId(),batch).copy(true);
}
msg.setId(IdWorker.getIdStr());
ExecLog execlog=new ExecLog();
execlog.setId(msg.getId());
execlog.setName(msg.getBatch());
execlog.setKeyValueField(msg.getModel());
execlog.setSystemId(msg.getSystemid());
ruExecLogService.create(execlog);
msg.setDatas(liteDataService.getModelObjs(msg.getModelId(),msg.getFillpropertys(),lists));
baseEntityService.processAll(msg);
if (flag){
ArrayList<String> ruleIdsList = new ArrayList<>();
String[] array = ruleIds.split(";|,");
for (int i = 0; i < array.length; i++) {
ruleIdsList.add(array[i]);
}
ArrayList<String> dataKeysList = new ArrayList<>();
for (EntityObj entityObj : lists) {
for (Map.Entry<String, Object> entries : entityObj.entrySet()){
dataKeysList.add((String) entries.getValue());
}
}
QueryWrapper<ExecResult> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("systemid",et.getSystemId()).in("ruleid",ruleIdsList).in("keyvaluefield",dataKeysList);
List<ExecResult> execResults = execResultExService.list(queryWrapper);
et.set("result",execResults);
return et;
}
}
return super.syncRun(et);
}
@Autowired
protected IExecLogService ruExecLogService;
......@@ -178,7 +238,12 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
RuleEngineExService proxy;
@Cacheable(value ="ruleengine", key = "'batch:'+#p0+#p1")
public BaseRequest getRequest(String id,String batch)
public BaseRequest getRequest(String id,String batch){
return getRequest2(id,batch,null);
}
@Cacheable(value ="ruleengine", key = "'batch:'+#p0+#p1+#p2")
public BaseRequest getRequest2(String id,String batch,String ruleIds)
{
RuleEngine et = get(id);
BaseRequest msg=new BaseRequest();
......@@ -189,20 +254,31 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
msg.setEngineId(id);
String resultDataSource = Setting.getValue(et.getExtParams(), "resultDataSource");
String resultTableName = Setting.getValue(et.getExtParams(), "resultTableName");
String resultTopic = Setting.getValue(et.getExtParams(), "resultTopic");
if(StringUtils.isEmpty(resultDataSource)){
resultDataSource=Default_ResultDataSource;
}
if(StringUtils.isEmpty(resultTableName)){
resultTableName=Default_ResultTableName;
}
if(!StringUtils.isEmpty(resultTopic)){
msg.setResultTopic(resultTopic);
}
msg.setResultDataSource(resultDataSource);
msg.setResultTableName(resultTableName);
List<String> rules = new ArrayList<>();
DataModel dataModel=liteModelService.getDataModel(et.getModelId());
HashSet<String> fillpropertys=new HashSet<>();
ruleItemService.list(Wrappers.<RuleItem>lambdaQuery()
.eq(RuleItem::getModelId,et.getModelId()).ne(RuleItem::getGroup,"REP").like(RuleItem::getGroup,et.getGroup()))
.forEach(ruleItem -> {
Wrapper<RuleItem> wrappers = Wrappers.<RuleItem>lambdaQuery().eq(RuleItem::getModelId,et.getModelId()).ne(RuleItem::getGroup,"REP").like(RuleItem::getGroup,et.getGroup());
if (StringUtils.hasLength(ruleIds)){
Collection<String> collection = new ArrayList<>();
String array[] = ruleIds.split(";|,");
for (int i = 0; i < array.length; i++) {
collection.add(array[i]);
}
wrappers = Wrappers.<RuleItem>lambdaQuery().in(RuleItem::getRuleId,collection);
}
ruleItemService.list(wrappers).forEach(ruleItem -> {
String path=rulePath + et.getGroup() + File.separator + ruleItem.getRuleId() + ".drl";
File file=new File(path);
if(!file.exists())
......
......@@ -2,6 +2,7 @@ package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.rule.domain.ExecResult;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
......@@ -16,6 +17,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* MQ默认发送消息服务
*/
......@@ -60,4 +63,26 @@ public class DefaultMQProducerService implements MsgProducerService{
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("构建消息发送响应:" + sendResult.toString());
}
@Override
public void sendRuleResultsMsg(String topic, String tags, List<ExecResult> listExecResultMsg){
try {
String msg = JSON.toJSONString(listExecResultMsg);
Message sendMsg = new Message(topic, tags, msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("规则结果MQ主题:topic: {}, tags: {},消息发送响应 {}:", topic,tags,sendResult.toString());
} catch (MQClientException e) {
e.printStackTrace();
log.error("规则结果MQ主题:topic: {},tags: {},消息发送异常MQClientException: {}:", topic,tags,e);
} catch (RemotingException e) {
e.printStackTrace();
log.error("规则结果MQ主题:topic: {},tags: {},消息发送异常RemotingException: {}:", topic,tags,e);
} catch (MQBrokerException e) {
e.printStackTrace();
log.error("规则结果MQ主题:topic: {},tags: {},消息发送异常MQBrokerException: {}:", topic,tags,e);
} catch (InterruptedException e) {
e.printStackTrace();
log.error("规则结果MQ主题:topic: {},tags: {},消息发送异常InterruptedException: {}:", topic,tags,e);
}
}
}
......@@ -3,6 +3,7 @@ package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.LocalMsgEvent;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.rule.domain.ExecResult;
import com.alibaba.fastjson.JSON;
import com.baomidou.jobs.model.JobsInfo;
import com.lmax.disruptor.dsl.Disruptor;
......@@ -11,6 +12,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
@ConditionalOnExpression("(!${rocketmq.producer.enabled:false})&&(!${rocketmq.consumer.enabled:false})")
......@@ -49,4 +52,9 @@ public class LocalMsgProducerService implements MsgProducerService{
}
disruptor.publishEvent((event, sequence, bind) -> event.setBody(bind.getBody()), localMsgEvent);
}
@Override
public void sendRuleResultsMsg(String topic, String tags,List<ExecResult> listExecResultMsg) throws Exception {
log.error("RuleResultsMQMsg 不支持非MQ模式发送规则结果至消息队列!数据集合:{} 主题:topic: {}, tags:{}",JSON.toJSONString(listExecResultMsg),topic,tags);
}
}
......@@ -2,9 +2,20 @@ package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.rule.domain.ExecResult;
import java.util.List;
public interface MsgProducerService {
public void sendEngineMsg(EngineMQMsg engineMQMsg) throws Exception;
public void sendBuildMsg(EngineMQMsg engineMQMsg) throws Exception;
public void sendResultsMsg(ResultsMQMsg resultsMQMsg) throws Exception;
/**
* 规则结果发送至MQ消息队列
* @param topic MQ消息主题
* @param tags 主题标识
* @param listExecResultMsg 消息内容
* @throws Exception
*/
public void sendRuleResultsMsg(String topic, String tags, List<ExecResult> listExecResultMsg) throws Exception;
}
......@@ -126,6 +126,7 @@ public class apiSecurityConfig extends WebSecurityConfigurerAdapter {
.antMatchers("/ibizutil/**").permitAll()
.antMatchers("/dictionaries/**").permitAll()
.antMatchers("/dictionarys/**").permitAll()
.antMatchers("/ruleengines/{ruleengine_id}/syncrun").permitAll()
.antMatchers("/lite/**").permitAll();
for (String excludePattern : excludesPattern) {
......
......@@ -5,6 +5,8 @@ import cn.ibizlab.core.analysis.service.impl.DAReportServiceImpl;
import cn.ibizlab.core.extensions.domain.FetchMetricDatasParam;
import cn.ibizlab.core.extensions.service.DACoreService;
import cn.ibizlab.util.errors.BadRequestAlertException;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -14,6 +16,8 @@ import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -68,7 +72,7 @@ public class DACoreResource {
* @param
* @return
*/
@RequestMapping(method = RequestMethod.GET, value="dst/analyse/reportdata/resetperformancedata/{resettype}")
@RequestMapping(method = RequestMethod.GET, value="/dst/analyse/reportdata/resetperformancedata/{resettype}")
public ResponseEntity<Map<String, Object>> resetPerformanceData(@PathVariable(name="resettype",required = false) String resettype) {
Map<String, Object> resultMap = new HashMap<>();
if (StringUtils.isEmpty(resettype)) {
......@@ -86,7 +90,7 @@ public class DACoreResource {
daCoreService.resetPerformance();
daCoreService.resetsnap();
break;
case "reloadReportData":// 消除绩效报表的缓存
case "RELOADREPORTDATA":// 消除绩效报表的缓存
daCoreService.resetsnapReport();
break;
default:
......@@ -124,4 +128,35 @@ public class DACoreResource {
return resultMap;
}
/**
* 刷新报表缓存
* @param param
* @return
*/
@PostMapping(value="/dst/analyse/reportdata/reloadreportcache")
public Map<String, Object> reloadReportCache(@Validated @RequestBody FetchMetricDatasParam param) {
Map<String, Object> resultMap = new HashMap<>();
if (param == null) {
throw new BadRequestAlertException("刷新报表缓存错误,错误原因:请求参数为空", "", "");
}
String id = param.getReportid();
if (StringUtils.isEmpty(id)) {
return new HashMap<String,Object>();
}
DAReport daReport = daReportService.getById(id);
if (daReport == null) {
throw new BadRequestAlertException("报表不存在", "", "");
}
String strDimval = StringUtils.isEmpty(param.getDimval()) ? "%" : param.getDimval();
String strDict = StringUtils.isEmpty(param.getDimcodelist()) ? daCoreService.getDict(daReport) : param.getDimcodelist();
Timestamp startTime = param.getStarttime() == null ? daCoreService.getDefaultStartTimestamp() : param.getStarttime();
Timestamp endTime = param.getEndtime() == null ? daCoreService.getDefaultEndTimestamp() : param.getEndtime();
daCoreService.resetsnapReport();
param.setDimcodelist(strDict);
param.setDimval(strDimval);
param.setStarttime(startTime);
param.setEndtime(endTime);
this.fetchReportDatas(param);
return resultMap;
}
}
......@@ -67,7 +67,7 @@ public class DAGridCoreResource {
strDict = daReport.getDict();
}
//获取表格数据
JSONObject jsonObject = daCoreService.getERPortData(daReport, strLoadDimval, strDict, strLoadStartTime, strLoadEndTime, "default");
JSONObject jsonObject = daCoreService.getERPortData(daReport, strLoadDimval, strDict, strLoadStartTime, strLoadEndTime, "default", "");
responseJson.put("vmcfg",vmcfg);
responseJson.put("reportname",reportname);
responseJson.put("resportdata",jsonObject);
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册