提交 a153429a 编写于 作者: lab_xuhui's avatar lab_xuhui

合并分支 'master-xuhui' 到 'master'

update:新增报表构建整合cql和sql模式,规则引擎构建结果取消异步存储,使用本地消息模式存储结果

查看合并请求 !3
......@@ -14,7 +14,9 @@ import org.springframework.util.StringUtils;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* 实体[RUExecResult] 数据对象
......@@ -111,4 +113,30 @@ public class ExecResult implements Serializable {
public String debugInfo() {
return String.format("%1$s,%2$s,%3$s,%4$s,%5$s,%6$s", keyvaluefield, ruleid, retvalue, dimfield, timefield, domainsfield);
}
public static ExecResult fromSqlExecResult(cn.ibizlab.core.rule.domain.ExecResult sqlExecResult){
ExecResult cqlExecResult = new ExecResult();
cqlExecResult.setRuleid(sqlExecResult.getRuleId());
cqlExecResult.setRetvalue(sqlExecResult.getRetValue());
cqlExecResult.setDimfield(sqlExecResult.getDimField());
cqlExecResult.setTimefield(time2LocalDate(sqlExecResult.getTimeField()));
cqlExecResult.setDomainsfield(sqlExecResult.getDomainsField());
cqlExecResult.setKeyvaluefield(sqlExecResult.getKeyValueField());
cqlExecResult.setBusinesscat(sqlExecResult.getBusinessCat());
cqlExecResult.setExt1field(sqlExecResult.getExt1Field());
cqlExecResult.setExt2field(sqlExecResult.getExt2Field());
cqlExecResult.setMetricfield(sqlExecResult.getMetricField().doubleValue());
cqlExecResult.setRuexecresultname(sqlExecResult.getName());
cqlExecResult.setRulename(sqlExecResult.getRuleName());
cqlExecResult.setUpdatedate(nowLocalDate());
return cqlExecResult;
}
public static List<ExecResult> fromSqlExecResultList(List<cn.ibizlab.core.rule.domain.ExecResult> listSqlExecResults){
List<ExecResult> list = new ArrayList<>();
for (cn.ibizlab.core.rule.domain.ExecResult execResult : listSqlExecResults) {
list.add(fromSqlExecResult(execResult));
}
return list;
}
}
......@@ -12,4 +12,10 @@ public class SqlResultsMQMsg {
private String resultTableName;
private List<ExecResult> saveDatas;
private List<ExecResult> deleteDatas;
// cassandra数据库保存数据使用(报表构建)
private String keyValueField;
// cassandra数据库保存数据使用(报表构建)
private List<String> rules;
// 删除规则结果数据id(报表构建)
private List<String> deleteIds;
}
......@@ -17,6 +17,7 @@ public interface ExecResultExMapper extends BaseMapper<EntityObj>{
int replaceBatchByDameng(@Param("list") List<ExecResult> var1 , @Param("resultTableName") String tableName);
int replaceBatchByPG(@Param("list") List<ExecResult> var1 , @Param("resultTableName") String tableName);
int clearBatch(@Param("list")List<ExecResult> var1, @Param("resultTableName") String tableName);
int clearBatchByIds(@Param("list")List<String> var1, @Param("resultTableName") String tableName);
List<ExecResult> sumResult(@Param("resultTableName") String tableName, @Param("ruleids")List<String> ruleids, @Param("dimfields")List<String> dimfields, @Param("domainsfields")List<String> domainsfields, @Param("from")Timestamp from,@Param("to")Timestamp to);
List<ExecResult> avgResult(@Param("resultTableName") String tableName, @Param("ruleids")List<String> ruleids, @Param("dimfields")List<String> dimfields, @Param("domainsfields")List<String> domainsfields, @Param("from")Timestamp from,@Param("to")Timestamp to);
......
package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.analysis.domain.DADimension;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.SqlResultsMQMsg;
import cn.ibizlab.core.extensions.util.ExpiryMap;
import cn.ibizlab.core.extensions.util.LocalMsgResultProducerService;
import cn.ibizlab.core.lite.extensions.domain.FieldObj;
import cn.ibizlab.core.lite.extensions.domain.ModelObj;
import cn.ibizlab.core.rule.domain.ExecLog;
import cn.ibizlab.core.rule.domain.ExecResult;
import cn.ibizlab.core.rule.service.IExecLogService;
import cn.ibizlab.util.dict.CodeItem;
import cn.ibizlab.util.dict.CodeList;
import cn.ibizlab.util.helper.CachedBeanCopier;
import cn.ibizlab.util.helper.DataObject;
import lombok.extern.slf4j.Slf4j;
import org.kie.api.runtime.KieSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
/**
* cassandra数据库和mysql等关系数据库
* 报表数据构建结果分析处理
*/
@Slf4j
@Service("AnalyseEntityServiceImpl")
@ConditionalOnExpression("'all'.equals('${ibiz.analyseMode:}')")
public class AnalyseAllServiceImpl extends BaseEntityServiceImpl {
@Autowired
@Lazy
protected IExecLogService ruExecLogService;
@Autowired
@Lazy
private LocalMsgResultProducerService localMsgResultProducerService;
@Autowired
@Lazy
private DABuildExService daBuildExService;
@Value("${cassandra.host:}")
private String cassandraHost;
@Value("${ibiz.resultsBatchSend:true}")
protected boolean resultsBatchSend;
private ExpiryMap<String,List<DADimension>> dimlistMap = new ExpiryMap<>();
@Override
public void saveResult(ModelObj param, String RULEID,String RULECODE, String RULENAME, String RU_EXECRESULTNAME, FieldObj BUSINESSCAT,
Integer RETVALUE, FieldObj KEYVALUEFIELD, FieldObj DOMAINSFIELD,FieldObj DIMFIELD,
FieldObj METRICFIELD, FieldObj TIMEFIELD, FieldObj EXT1FIELD, FieldObj EXT2FIELD)
{
if (!StringUtils.isEmpty(cassandraHost) && 0 == RETVALUE){
// 使用cassandra数据库时且业务数据不满足当前规则时返回,cassandra数据库删除结果数据,依赖业务数据主键和规则集合
return;
}
ExecResult result = new ExecResult();
result.setRuleId(RULEID);
result.setRuleName(RULENAME);
result.setName(RU_EXECRESULTNAME);
result.setRetValue(RETVALUE);
if(DOMAINSFIELD == null){
DOMAINSFIELD = param.getEmpty();
}
if(BUSINESSCAT == null){
BUSINESSCAT = param.getEmpty();
}
if(METRICFIELD == null){
METRICFIELD = param.getEmpty();
}
if(TIMEFIELD == null){
TIMEFIELD = param.getEmpty();
}
if(EXT1FIELD == null){
EXT1FIELD = param.getEmpty();
}
if(EXT2FIELD == null){
EXT2FIELD = param.getEmpty();
}
result.setKeyValueField(param.getRowKey());
result.setBusinessCat(BUSINESSCAT.getValue(String.class,null));
result.setMetricField(METRICFIELD.getDecimal());
Timestamp timefield = TIMEFIELD.getValue(Timestamp.class,null);
if(timefield != null) {
result.setTimeField(timefield);
}
String ext1 = EXT1FIELD.getValues();
if(ext1.length()> 330){
ext1 = ext1.substring(0,330)+"...";
}
if(!StringUtils.isEmpty(ext1)){
result.setExt1Field(ext1);
}
String ext2 = EXT2FIELD.getValues();
if(ext2.length()> 330){
ext2 = ext2.substring(0,330)+"...";
}
if(!StringUtils.isEmpty(ext2)){
result.setExt2Field(ext2);
}
result.setDomainsField(DOMAINSFIELD.getValue(String.class, ""));
result.setSystemId(DataObject.getStringValue(param.get("systemid"),null));
if((!StringUtils.isEmpty(result.getRuleId()))&&(!StringUtils.isEmpty(result.getKeyValueField()))) {
List<DADimension> dims = getDims(result.getBusinessCat());
if(dims!=null) {
for (DADimension dim : dims) {
String codevalue = param.$(dim.getField()).getValue(String.class,"");
if(!StringUtils.isEmpty(codevalue)) {
String dictname = dim.getDict();
Object val = codevalue;
if(!StringUtils.isEmpty(dictname)) {
CodeList codeList = this.getCodeListCatalog(dictname);
//维度指定代码表时,向上同时为每一个父节点添加一条数据
while(true) {
CodeItem code = codeList.findCodeItem(val);
if(code != null) {
ExecResult result2 = new ExecResult();
CachedBeanCopier.copy(result, result2);
result2.setDimField(code.getValue().toString());
result2.setId(result2.getDefaultKey(true).toString());
if(result2.getRetValue() == 1) {
addResults(param,result2);
}else {
deleteResults(param,result2.getId());
}
//非逐层核算或无父代码项退出
if(dim.getRecursive() == 0 || StringUtils.isEmpty(code.getParent())) {
break;
}else{
val = code.getParent();
}
}else {
break;
}
}
}else{
//维度未指定代码表时,只保存一条数据
result.setDimField(codevalue);
ExecResult result2 = new ExecResult();
CachedBeanCopier.copy(result, result2);
result2.setId(result2.getDefaultKey(true).toString());
if(result2.getRetValue() == 1) {
addResults(param,result2);
}else {
deleteResults(param,result2.getId());
}
}
}
}
}
}
}
/**
* 添加到新增结果队列
* @param param
* @param result
*/
protected void addResults(ModelObj param, ExecResult result){
List<ExecResult> results;
if(param.containsKey(Tag_SaveResults)){
results = (List<ExecResult>)param.get(Tag_SaveResults);
}else{
results = new ArrayList<>();
param.set(Tag_SaveResults, results);
}
results.add(result);
}
/**
* 添加到删除结果队列
* @param param
* @param ru_execresultid
*/
protected void deleteResults(ModelObj param, String ru_execresultid){
List<String> results;
if(param.containsKey(Tag_DeleteResults)){
results = (List<String>)param.get(Tag_DeleteResults);
}else{
results = new ArrayList<>();
param.set(Tag_DeleteResults, results);
}
results.add(ru_execresultid);
}
/**
* 获取维度集合
* @param id
* @return
*/
public List<DADimension> getDims(String id) {
List<DADimension> dims = dimlistMap.get("dims" + id);
if(dims == null){
dims = daBuildExService.get(id).getDadimension();
if(dims != null) {
dimlistMap.put("dims" + id, dims);
}
}
return dims;
}
@Override
public ExecLog processAll(BaseRequest msg) {
ExecLog execlog=new ExecLog();
execlog.setId(msg.getId());
execlog.setRunBody(msg.toString());
StringBuilder strErrorInfo=new StringBuilder();
int count = 0;
int errorcount = 0;
String strResId=initRule(msg.getRules(),msg.getBatch());
if (StringUtils.isEmpty(strResId)) {
strErrorInfo.append("准备rule文件错误");
strErrorInfo.append("\r\n");
execlog.setRetCode(2);
execlog.setCnt(count);
execlog.setSucc(errorcount);
execlog.setRunResult(strErrorInfo.toString());
ruExecLogService.update(execlog);
return execlog;
}
List<SqlResultsMQMsg> sqlResultsMQMsgList = new ArrayList<>();
try {
for (ModelObj modelObj : msg.getDatas()){
try{
count++;
modelObj.set("BATCH",msg.getBatch());
modelObj.set(Tag_EngineId, msg.getEngineId());
modelObj.set("systemid",msg.getSystemid());
if(!StringUtils.isEmpty(msg.getResultDataSource())){
modelObj.set("resultDataSource", msg.getResultDataSource());
}
if(!StringUtils.isEmpty(msg.getResultTableName())){
modelObj.set("resultTableName", msg.getResultTableName());
}
processRule(modelObj, modelObj.getRowKey(),strResId);
//构建结果消息对象
SqlResultsMQMsg resultsMQMsg = new SqlResultsMQMsg();
resultsMQMsg.setMsgId(msg.getId());
resultsMQMsg.setKeyValueField(modelObj.getRowKey());
resultsMQMsg.setRules(msg.getRuleIds());
Object saveResults = modelObj.get(Tag_SaveResults);
if(saveResults != null && saveResults instanceof List){
resultsMQMsg.setSaveDatas((List<ExecResult>)saveResults);
}
if (StringUtils.isEmpty(cassandraHost)){
resultsMQMsg.setResultDataSource(msg.getResultDataSource());
resultsMQMsg.setResultTableName(msg.getResultTableName());
// 不存储至cassandra时,删除主键集合添加(cassandra删除数据不依靠规则结果数据主键)
Object deleteResults = modelObj.get(Tag_DeleteResults);
if(deleteResults != null && deleteResults instanceof List){
resultsMQMsg.setDeleteIds((List<String>)deleteResults);
}
}
if (resultsBatchSend){
sqlResultsMQMsgList.add(resultsMQMsg);
}else {
localMsgResultProducerService.sendBuildResultsMsg(resultsMQMsg);
}
} catch (Exception e) {
errorcount++;
log.error("Service错误,加载数据详细信息:" + modelObj.getRowKey() + "," + e.getMessage());
if (strErrorInfo.toString().length() <= 5000) {
strErrorInfo.append("加载数据详细信息:" + modelObj.getRowKey() + ",错误:" + e.getMessage());
strErrorInfo.append("\r\n");
if (e.getCause() != null && e.getCause().getMessage() != null) {
strErrorInfo.append(e.getCause().getMessage());
strErrorInfo.append("\r\n");
}
}
}
}
if (resultsBatchSend && sqlResultsMQMsgList.size() > 0){
localMsgResultProducerService.sendBuildResultsMsgList(sqlResultsMQMsgList);
}
}catch(Exception e) {
log.error("Service读取BaseRequest错误:" + e.toString());
strErrorInfo.append("读取BaseRequest错误:" + e.getMessage());
strErrorInfo.append("\r\n");
if (e.getCause() != null && e.getCause().getMessage() != null) {
strErrorInfo.append(e.getCause().getMessage());
strErrorInfo.append("\r\n");
}
execlog.setRetCode(1);
execlog.setCnt(count);
if (errorcount == 0) {
execlog.setSucc(0);
}else {
execlog.setSucc(count - errorcount);
}
execlog.setRunResult(strErrorInfo.toString());
ruExecLogService.update(execlog);
return execlog;
}finally{
sqlResultsMQMsgList.clear();
if(setkieSession.containsKey(strResId)){
KieSession kieSession = setkieSession.get(strResId);
try{
kieSession.dispose();
} catch (Exception ex){}
setkieSession.remove(strResId);
setRuleCount.remove(strResId);
}
}
log.info(new StringBuilder().append("Service成功,process successfully:").append(msg.toString()).toString());
strErrorInfo.append("Service成功,process successfully\r\n");
execlog.setCnt(count);
execlog.setSucc(count-errorcount);
if(execlog.getCnt() == 0) {
execlog.setRetCode(3);
}else{
execlog.setRetCode(0);
}
execlog.setRunResult(strErrorInfo.toString());
this.ruExecLogService.update(execlog);
return execlog;
}
}
......@@ -30,7 +30,7 @@ import java.util.List;
@Slf4j
@Service("AnalyseEntityServiceImpl")
@ConditionalOnExpression("!''.equals('${cassandra.host:}')")
@ConditionalOnExpression("!''.equals('${cassandra.host:}') && 'cql'.equals('${ibiz.analyseMode:cql}')")
public class AnalyseEntityServiceImpl extends BaseEntityServiceImpl {
@Autowired
......
......@@ -34,7 +34,7 @@ import java.util.List;
@Slf4j
@Service("AnalyseEntityServiceImpl")
@ConditionalOnExpression("''.equals('${cassandra.host:}')")
@ConditionalOnExpression("''.equals('${cassandra.host:}') && 'sql'.equals('${ibiz.analyseMode:sql}')")
public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
@Autowired
......
......@@ -2,7 +2,9 @@ package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.dict.extensions.service.DictDstService;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.SqlResultsMQMsg;
import cn.ibizlab.core.extensions.util.ExpiryMap;
import cn.ibizlab.core.extensions.util.LocalMsgResultProducerService;
import cn.ibizlab.core.lite.extensions.domain.FieldObj;
import cn.ibizlab.core.lite.extensions.domain.ModelObj;
import cn.ibizlab.core.rule.domain.ExecLog;
......@@ -63,6 +65,13 @@ public class BaseEntityServiceImpl implements BaseEntityService
@Value("${defaultResultDataSource:default}")
protected String Default_ResultDataSource;
@Autowired
@Lazy
private LocalMsgResultProducerService localMsgResultProducerService;
@Value("${ibiz.resultsBatchSend:true}")
protected boolean resultsBatchSend;
@Value("${defaultResultTableName:IBZRULERESULT}")
protected String Default_ResultTableName;
@Value("${ibiz.expirationDays:7}")
......@@ -77,11 +86,6 @@ public class BaseEntityServiceImpl implements BaseEntityService
result.setName(RU_EXECRESULTNAME);
result.setRetValue(RETVALUE);
List<ExecResult> saveResults = null;
if(param.containsKey(Tag_SaveResults)){
saveResults = (List<ExecResult>) param.get(Tag_SaveResults);
}
if(KEYVALUEFIELD==null)
KEYVALUEFIELD=param.getEmpty();
if(DIMFIELD==null)
......@@ -150,10 +154,6 @@ public class BaseEntityServiceImpl implements BaseEntityService
if(param.get("resultTableName")!=null){
resultTableName = param.get("resultTableName");
}
boolean bSyncRun = false;
if(param.containsKey("BATCH") && param.getStringValue("BATCH","").startsWith(Tag_Batch_SyncRun)){
bSyncRun = true;
}
if(!StringUtils.isEmpty(param.get("resultTopic"))){
Object resultTopic = param.get("resultTopic");
if (!StringUtils.isEmpty(resultTopic)){
......@@ -238,14 +238,7 @@ public class BaseEntityServiceImpl implements BaseEntityService
result2.setId(result2.getDefaultKey(true).toString());
result2.setRuleId(RULECODE);
if(saveResults != null) {
saveResults.add(result2);
}
if (bSyncRun){
ruExecResultService.save(result2);
}else {
ruExecResultService.saveAsync(result2);
}
this.addResults(param,Tag_SaveResults,result2);
}
else
{
......@@ -253,17 +246,30 @@ public class BaseEntityServiceImpl implements BaseEntityService
result2.setBusinessCat(BUSINESSCAT.getItemValue(i,String.class,null));
result2.setId(result2.getDefaultKey(true).toString());
result2.setRuleId(RULECODE);
if (bSyncRun){
ruExecResultService.remove(result2.getId());
}else {
ruExecResultService.deleteAsync(result2);
}
this.addResults(param,Tag_DeleteResults,result2);
}
}
}
/**
* 添加到新增或删除结果队列
* @param param
* @param tag
* @param result
*/
protected void addResults(ModelObj param, String tag, ExecResult result){
List<ExecResult> results;
if(param.containsKey(tag)){
results = (List<ExecResult>)param.get(tag);
}else{
results = new ArrayList<>();
param.set(tag, results);
}
results.add(result);
}
@Autowired
private IExecResultService ruExecResultService;
......@@ -505,6 +511,8 @@ public class BaseEntityServiceImpl implements BaseEntityService
return execlog;
}
List<ExecResult> saveDatas = new ArrayList<>();
List<ExecResult> deleteDatas = new ArrayList<>();
try
{
......@@ -528,11 +536,37 @@ public class BaseEntityServiceImpl implements BaseEntityService
modelObj.set("resultTopic", msg.getResultTopic());
}
//设置符合规则条件的数据集合
if(msg.getBatch().startsWith(Tag_Batch_SyncRun)){
modelObj.set(Tag_SaveResults, msg.getResults());
}
processRule(modelObj, modelObj.getRowKey(),strResId);
List<ExecResult> saveExecResultList = (List<ExecResult>) modelObj.get(Tag_SaveResults);
List<ExecResult> deleteExecResultList = (List<ExecResult>) modelObj.get(Tag_DeleteResults);
// 设置符合规则条件的数据集合
if(msg.getBatch().startsWith(Tag_Batch_SyncRun) && saveExecResultList != null && saveExecResultList.size() > 0){
msg.getResults().addAll(saveExecResultList);
}
if (resultsBatchSend){
if (saveExecResultList != null && saveExecResultList.size()>0){
saveDatas.addAll(saveExecResultList);
}
if (deleteExecResultList != null && deleteExecResultList.size()>0){
deleteDatas.addAll(deleteExecResultList);
}
}else {
SqlResultsMQMsg sqlResultsMQMsg = new SqlResultsMQMsg();
sqlResultsMQMsg.setMsgId(msg.getId());
if (saveExecResultList != null && saveExecResultList.size()>0){
sqlResultsMQMsg.setSaveDatas(saveExecResultList);
}
if (deleteExecResultList != null && deleteExecResultList.size()>0){
sqlResultsMQMsg.setDeleteDatas(deleteExecResultList);
}
if(!StringUtils.isEmpty(msg.getResultDataSource())){
sqlResultsMQMsg.setResultDataSource(msg.getResultDataSource());
}
if(!StringUtils.isEmpty(msg.getResultTableName())){
sqlResultsMQMsg.setResultTableName(msg.getResultTableName());
}
localMsgResultProducerService.sendEngineResultsMsg(sqlResultsMQMsg);
}
}
catch (Exception e)
{
......@@ -553,6 +587,19 @@ public class BaseEntityServiceImpl implements BaseEntityService
}
}
if (resultsBatchSend && (saveDatas.size() > 0 || deleteDatas.size() > 0)){
SqlResultsMQMsg sqlResultsMQMsg = new SqlResultsMQMsg();
sqlResultsMQMsg.setMsgId(msg.getId());
if(!StringUtils.isEmpty(msg.getResultDataSource())){
sqlResultsMQMsg.setResultDataSource(msg.getResultDataSource());
}
if(!StringUtils.isEmpty(msg.getResultTableName())){
sqlResultsMQMsg.setResultTableName(msg.getResultTableName());
}
sqlResultsMQMsg.setSaveDatas(saveDatas);
sqlResultsMQMsg.setDeleteDatas(deleteDatas);
localMsgResultProducerService.sendEngineResultsMsg(sqlResultsMQMsg);
}
......@@ -584,6 +631,8 @@ public class BaseEntityServiceImpl implements BaseEntityService
}
finally
{
saveDatas.clear();
deleteDatas.clear();
if(setkieSession.containsKey(strResId))
{
KieSession kieSession=setkieSession.get(strResId);
......
......@@ -283,6 +283,24 @@ public class ExecResultExService extends ExecResultServiceImpl {
return result;
}
public int clearResultBatchByIds(List<String> execResultIds, String dsName ,String tableName) {
int result = 0;
try {
if(!Default_ResultDataSource.equalsIgnoreCase(dsName)){
dstDataSourceService.initDataSource(dsName);
DynamicDataSourceContextHolder.push(dsName);
}
result = execResultExMapper.clearBatchByIds(execResultIds, tableName);
} catch (Exception ex) {
log.error("存储构建结果发生异常,详细错误信息:" + ex.getMessage());
} finally {
if(!Default_ResultDataSource.equalsIgnoreCase(dsName)) {
DynamicDataSourceContextHolder.poll();
}
}
return result;
}
public List<ExecResult> sumResult(String dsName ,String tableName, List<String> ruleids, List<String> dimfields, List<String> domainsfields, Timestamp from, Timestamp to)
{
......
......@@ -12,6 +12,8 @@ import cn.ibizlab.core.rule.domain.ExecLog;
import cn.ibizlab.core.rule.domain.ExecResult;
import cn.ibizlab.core.rule.service.IExecLogService;
import cn.ibizlab.util.helper.DataObject;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -19,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
@Slf4j
......@@ -54,6 +57,10 @@ public class LocalMsgEventHandler implements EventHandler<LocalMsgEvent> {
private int saveResultBatchSize;
@Value("${ibiz.deleteResultBatchBatchSize:20000}")
private int deleteResultBatchSize;
@Value("${ibiz.resultsBatchSend:true}")
protected boolean resultsBatchSend;
@Value("${cassandra.host:}")
private String cassandraHost;
@Override
public void onEvent(LocalMsgEvent localMsgEvent, long sequence, boolean endOfBatch) throws Exception {
......@@ -74,36 +81,61 @@ public class LocalMsgEventHandler implements EventHandler<LocalMsgEvent> {
}
daBuildExService.processData(engineMQMsg);
}else if("SqlResult".equalsIgnoreCase(tags)){
// 报表sql模式,结果存储至非cassandra数据库
SqlResultsMQMsg resultsMQMsg = localMsgEvent.getMessage(SqlResultsMQMsg.class);
localMsgEvent.setBody(null);
if (log.isDebugEnabled()) {
log.debug("SqlResultMsgProcess Event Handler: {}", resultsMQMsg.getMsgId());
}
String resultDataSource = DataObject.getStringValue(resultsMQMsg.getResultDataSource(),Default_ResultDataSource);
String resultTableName = DataObject.getStringValue(resultsMQMsg.getResultTableName(),Default_ResultTableName);
List<ExecResult> saveDatas = resultsMQMsg.getSaveDatas();
if(saveDatas != null && saveDatas.size() > 0){
List<List<ExecResult>> splist = LiteDataService.splitList(saveDatas, saveResultBatchSize);
splist.forEach(array ->{
execResultExService.saveResultBatch(array, resultDataSource, resultTableName);
});
}
List<ExecResult> deleteDatas = resultsMQMsg.getDeleteDatas();
if(deleteDatas != null && deleteDatas.size() > 0){
List<List<ExecResult>> splist = LiteDataService.splitList(deleteDatas, deleteResultBatchSize);
splist.forEach(array ->{
execResultExService.clearResultBatch(array, resultDataSource, resultTableName);
});
}
saveSqlData(resultsMQMsg);
//临时处理批次处理结果为10,表示数据插入完成
ExecLog execlog=new ExecLog();
execlog.setId(resultsMQMsg.getMsgId());
execlog.setRetCode(10);
ruExecLogService.updateById(execlog);
}else if ("EngineResult".equalsIgnoreCase(tags)){
// 规则结果存储至非cassandra数据库
SqlResultsMQMsg sqlResultsMQMsg = localMsgEvent.getMessage(SqlResultsMQMsg.class);
localMsgEvent.setBody(null);
if (log.isDebugEnabled()) {
log.debug("EngineResultMsgProcess Event Handler: {}", sqlResultsMQMsg.getMsgId());
}
saveSqlData(sqlResultsMQMsg);
if (resultsBatchSend){
//临时处理批次处理结果为10,表示数据插入完成
ExecLog execlog = new ExecLog();
execlog.setId(sqlResultsMQMsg.getMsgId());
execlog.setRetCode(10);
ruExecLogService.updateById(execlog);
}
}else if ("BuildResult".equalsIgnoreCase(tags)){
// 报表all构建模式(cql和sql二合一),构建结果处理
SqlResultsMQMsg sqlResultsMQMsg = localMsgEvent.getMessage(SqlResultsMQMsg.class);
localMsgEvent.setBody(null);
if (log.isDebugEnabled()) {
log.debug("BuildResultsMsgProcess Event Handler: {}", sqlResultsMQMsg.getMsgId());
}
List<SqlResultsMQMsg> lists = new ArrayList<>();
lists.add(sqlResultsMQMsg);
saveAllData(lists);
}else if ("BuildResultList".equalsIgnoreCase(tags)){
// 报表all构建模式(cql和sql二合一),构建结果批量处理
String boby = localMsgEvent.getBody();
localMsgEvent.setBody(null);
List<SqlResultsMQMsg> lists = JSONObject.parseObject(boby, new TypeReference<List<SqlResultsMQMsg>>() {});
if (log.isDebugEnabled()) {
log.debug("BuildResultsMsgListProcess Event Handler: {}", lists.get(0).getMsgId());
}
saveAllData(lists);
//临时处理批次处理结果为10,表示数据插入完成
ExecLog execlog = new ExecLog();
execlog.setId(lists.get(0).getMsgId());
execlog.setRetCode(10);
ruExecLogService.updateById(execlog);
}
}
else if (localMsgEvent.getType()==2) {
// 报表结果存储至cassandra数据库
ResultsMQMsg resultsMQMsg = localMsgEvent.getMessage(ResultsMQMsg.class);
localMsgEvent.setBody(null);
......@@ -115,4 +147,66 @@ public class LocalMsgEventHandler implements EventHandler<LocalMsgEvent> {
execResultRepository.saveResultsMQMsg(resultsMQMsg);
}
}
/**
* 报表构建sql模式、规则引擎构建,构建结果保存至非cassandra数据库
* @param sqlResultsMQMsg
*/
private void saveSqlData(SqlResultsMQMsg sqlResultsMQMsg){
String resultDataSource = DataObject.getStringValue(sqlResultsMQMsg.getResultDataSource(),Default_ResultDataSource);
String resultTableName = DataObject.getStringValue(sqlResultsMQMsg.getResultTableName(),Default_ResultTableName);
List<ExecResult> saveDatas = sqlResultsMQMsg.getSaveDatas();
if(saveDatas != null && saveDatas.size() > 0){
List<List<ExecResult>> splist = LiteDataService.splitList(saveDatas, saveResultBatchSize);
splist.forEach(array ->{
execResultExService.saveResultBatch(array, resultDataSource, resultTableName);
});
}
List<ExecResult> deleteDatas = sqlResultsMQMsg.getDeleteDatas();
if(deleteDatas != null && deleteDatas.size() > 0){
List<List<ExecResult>> splist = LiteDataService.splitList(deleteDatas, deleteResultBatchSize);
splist.forEach(array ->{
execResultExService.clearResultBatch(array, resultDataSource, resultTableName);
});
}
}
/**
* 报表all构建模式(cql和sql二合一),构建结果存储
* @param lists
*/
private void saveAllData(List<SqlResultsMQMsg> lists){
if (!StringUtils.isEmpty(cassandraHost)){
// 使用cassandra数据库
ResultsMQMsg resultsMQMsg = null;
for (SqlResultsMQMsg sqlResultsMQMsg : lists) {
if(!StringUtils.isEmpty(sqlResultsMQMsg.getKeyValueField())) {
resultsMQMsg = new ResultsMQMsg();
resultsMQMsg.setDatas(cn.ibizlab.core.extensions.cql.ExecResult.fromSqlExecResultList(sqlResultsMQMsg.getSaveDatas()));
resultsMQMsg.setRules(sqlResultsMQMsg.getRules());
resultsMQMsg.setKeyValueField(sqlResultsMQMsg.getKeyValueField());
execResultRepository.saveResultsMQMsg(resultsMQMsg);
}
}
}else {
String resultDataSource = DataObject.getStringValue(lists.get(0).getResultDataSource(),Default_ResultDataSource);
String resultTableName = DataObject.getStringValue(lists.get(0).getResultTableName(),Default_ResultTableName);
for (SqlResultsMQMsg sqlResultsMQMsg : lists) {
List<ExecResult> saveDatas = sqlResultsMQMsg.getSaveDatas();
if(saveDatas != null && saveDatas.size() > 0){
List<List<ExecResult>> splist = LiteDataService.splitList(saveDatas, saveResultBatchSize);
splist.forEach(array ->{
execResultExService.saveResultBatch(array, resultDataSource, resultTableName);
});
}
List<String> deleteDatas = sqlResultsMQMsg.getDeleteIds();
if(deleteDatas != null && deleteDatas.size() > 0){
List<List<String>> splist = LiteDataService.splitList(deleteDatas, deleteResultBatchSize);
splist.forEach(array ->{
execResultExService.clearResultBatchByIds(array, resultDataSource, resultTableName);
});
}
}
}
}
}
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.LocalMsgEvent;
import cn.ibizlab.core.extensions.domain.SqlResultsMQMsg;
import com.alibaba.fastjson.JSON;
import com.lmax.disruptor.dsl.Disruptor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 构建结果(报表构建、规则构建)强制使用本地消息队列模式
*/
@Slf4j
@Service
public class LocalMsgResultProducerService {
@Autowired
protected Disruptor<LocalMsgEvent> resultDisruptor;
/**
* 规则构建结果发送
* @param sqlResultsMQMsg
* @throws Exception
*/
public void sendEngineResultsMsg(SqlResultsMQMsg sqlResultsMQMsg) throws Exception {
LocalMsgEvent localMsgEvent = new LocalMsgEvent();
String msg = JSON.toJSONString(sqlResultsMQMsg);
localMsgEvent.setBody(msg);
localMsgEvent.setTags("EngineResult");
if (log.isDebugEnabled()) {
log.debug("EngineResultsMsg Event Send: {}", sqlResultsMQMsg.getMsgId());
}
resultDisruptor.publishEvent((event, sequence, bind) -> {event.setBody(bind.getBody());event.setTags(localMsgEvent.getTags());}, localMsgEvent);
}
/**
* 报表构建结果单条发送
* @param sqlResultsMQMsg
* @throws Exception
*/
public void sendBuildResultsMsg(SqlResultsMQMsg sqlResultsMQMsg) throws Exception {
LocalMsgEvent localMsgEvent = new LocalMsgEvent();
String msg = JSON.toJSONString(sqlResultsMQMsg);
localMsgEvent.setBody(msg);
localMsgEvent.setTags("BuildResult");
if (log.isDebugEnabled()) {
log.debug("BuildResultsMsg Event Send: {}", sqlResultsMQMsg.getMsgId());
}
resultDisruptor.publishEvent((event, sequence, bind) -> {event.setBody(bind.getBody());event.setTags(localMsgEvent.getTags());}, localMsgEvent);
}
/**
* 报表构建结果批量发送
* @param sqlResultsMQMsgList
* @throws Exception
*/
public void sendBuildResultsMsgList(List<SqlResultsMQMsg> sqlResultsMQMsgList) throws Exception {
LocalMsgEvent localMsgEvent = new LocalMsgEvent();
String msg = JSON.toJSONString(sqlResultsMQMsgList);
localMsgEvent.setBody(msg);
localMsgEvent.setTags("BuildResultList");
if (log.isDebugEnabled()) {
log.debug("BuildResultsMsgList Event Send: {}", sqlResultsMQMsgList.get(0).getMsgId());
}
resultDisruptor.publishEvent((event, sequence, bind) -> {event.setBody(bind.getBody());event.setTags(localMsgEvent.getTags());}, localMsgEvent);
}
}
......@@ -8,6 +8,7 @@ import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
......@@ -19,7 +20,7 @@ import java.util.concurrent.ThreadFactory;
@Configuration
@ConditionalOnExpression("(!${rocketmq.producer.enabled:false})&&(!${rocketmq.consumer.enabled:false})")
//@ConditionalOnExpression("(!${rocketmq.producer.enabled:false})&&(!${rocketmq.consumer.enabled:false})")
public class LocalMsgConfiguration {
@Bean
......@@ -46,12 +47,14 @@ public class LocalMsgConfiguration {
return new LocalMsgEventHandler();
}
@Value("${ibiz.disruptorSize:1024}")
private int disruptorSize;
@Bean
@ConditionalOnClass({Disruptor.class})
public Disruptor<LocalMsgEvent> disruptor(WaitStrategy waitStrategy, ThreadFactory threadFactory,
LocalMsgEventHandler localMsgEventHandler) {
Disruptor<LocalMsgEvent> disruptor = new Disruptor<>(() -> new LocalMsgEvent(), 1024 * 1024,
Disruptor<LocalMsgEvent> disruptor = new Disruptor<>(() -> new LocalMsgEvent(), disruptorSize,
Executors.newCachedThreadPool(), ProducerType.MULTI, waitStrategy);
disruptor.handleEventsWith(localMsgEventHandler);
......@@ -89,7 +92,7 @@ public class LocalMsgConfiguration {
@ConditionalOnClass({Disruptor.class})
public Disruptor<LocalMsgEvent> resultDisruptor(WaitStrategy waitStrategy, ThreadFactory threadFactory,
LocalMsgEventHandler resultLocalMsgEventHandler) {
Disruptor<LocalMsgEvent> disruptor = new Disruptor<>(() -> new LocalMsgEvent(), 1024 * 1024,
Disruptor<LocalMsgEvent> disruptor = new Disruptor<>(() -> new LocalMsgEvent(), disruptorSize,
Executors.newCachedThreadPool(), ProducerType.MULTI, waitStrategy);
disruptor.handleEventsWith(resultLocalMsgEventHandler);
......
......@@ -112,6 +112,15 @@
</delete>
<delete id="clearBatchByIds" parameterType="java.util.List" >
delete from ${resultTableName} where (ru_execresultid) in
<foreach collection="list" item="item" index="index" open="(" close=")" separator=",">
(#{item})
</foreach>
</delete>
<select id="sumResult" resultType="cn.ibizlab.core.rule.domain.ExecResult" >
select ruleid, 1 as retvalue,dimfield,sum(metricfield) as metricfield from ${resultTableName} where ruleid in
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册