提交 ba29dd00 编写于 作者: zhouweidong's avatar zhouweidong

规则执行结果动态存储

上级 2b214386
......@@ -2,13 +2,20 @@ package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.lite.extensions.service.DbEntityService;
import cn.ibizlab.core.rule.domain.ExecResult;
import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.rule.service.IRuleEngineService;
import cn.ibizlab.core.rule.service.impl.ExecResultServiceImpl;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.*;
/**
......@@ -20,7 +27,11 @@ import java.util.*;
public class ExecResultExService extends ExecResultServiceImpl {
@Autowired
private DbEntityService dbEntityService;
public DbEntityService dbEntityService;
@Autowired
@Lazy
public IRuleEngineService engineService;
private Map<String, List<ExecResult>> resultsSaveAsync = new HashMap<>();
private Object objResultSaveAsyncLock = new Object();
......@@ -59,22 +70,23 @@ public class ExecResultExService extends ExecResultServiceImpl {
synchronized (objResultSaveAsyncLock) {
if (resultsSaveAsync.size() > 0) {
for (Map.Entry<String, List<ExecResult>> entry : resultsSaveAsync.entrySet()) {
String dsName = entry.getKey();
String engineId = entry.getKey();
String dsName = getDSNameByEngineId(engineId);
List<ExecResult> execResults = entry.getValue();
List<ExecResult> tempExecResults = new ArrayList<>();
if (execResults.size() > 0) {
for (ExecResult execResult : execResults) {
tempExecResults.add(execResult);
if (tempExecResults.size() >= 500) {
dbEntityService.saveResultBatch(tempExecResults,dsName);
dbEntityService.saveResultBatch(tempExecResults, dsName);
tempExecResults.clear();
}
}
if (tempExecResults.size() > 0) {
dbEntityService.saveResultBatch(tempExecResults,dsName);
dbEntityService.saveResultBatch(tempExecResults, dsName);
tempExecResults.clear();
}
resultsSaveAsync.remove(dsName);
resultsSaveAsync.remove(engineId);
}
}
}
......@@ -112,21 +124,21 @@ public class ExecResultExService extends ExecResultServiceImpl {
synchronized (objResultDeleteAsyncLock) {
if (resultsDeleteAsync.size() > 0) {
for (Map.Entry<String, List<ExecResult>> entry : resultsDeleteAsync.entrySet()) {
String dsName = entry.getKey();
String engineId = entry.getKey();
String dsName = getDSNameByEngineId(engineId);
List<ExecResult> execResults = entry.getValue();
List<ExecResult> tempExecResults = new ArrayList<>();
if (execResults.size() > 0) {
for (ExecResult execResult : execResults) {
tempExecResults.add(execResult);
if (tempExecResults.size() >= 500) {
dbEntityService.clearResultBatch(tempExecResults,dsName);
tempExecResults.clear();
dbEntityService.clearResultBatch(tempExecResults, dsName);
}
}
if (tempExecResults.size() > 0) {
dbEntityService.clearResultBatch(tempExecResults,dsName);
dbEntityService.clearResultBatch(tempExecResults, dsName);
}
resultsDeleteAsync.remove(dsName);
resultsDeleteAsync.remove(engineId);
}
}
}
......@@ -140,28 +152,51 @@ public class ExecResultExService extends ExecResultServiceImpl {
return et;
}
/**
* 通过引擎获取数据源信息
*
* @param engineId
* @return
*/
private String getDSNameByEngineId(String engineId) {
String dsName = "";
RuleEngine engine = engineService.get(engineId);
if (!ObjectUtils.isEmpty(engine.getExtParams())) {
JSONArray array = JSONArray.parseArray(engine.getExtParams());
for (int a = 0; a < array.size(); a++) {
JSONObject engineParam = array.getJSONObject(a);
String property = engineParam.getString("property");
String value = engineParam.getString("value");
if (!StringUtils.isEmpty(property) && RuleEngineExService.Setting_ResultDataSource.equalsIgnoreCase(property) && !StringUtils.isEmpty(value)) {
return value;
}
}
}
return dsName;
}
/**
* 将规则执行结果添加到待保存集合中
*
* @param execResultMap
* @param execResult
*/
private void addExecResult(Map<String, List<ExecResult>> execResultMap, ExecResult execResult) {
Object dsName = execResult.get(RuleEngineExService.Setting_ResultDataSource);
Object tabName = execResult.get(RuleEngineExService.Setting_ResultTableName);
if (ObjectUtils.isEmpty(tabName)) {
execResult.set(RuleEngineExService.Setting_ResultTableName, RuleEngineExService.Default_ResultTableName);
}
if (ObjectUtils.isEmpty(dsName)) {
dsName = RuleEngineExService.Default_ResultDataSource;
}
String dataSource = String.valueOf(dsName);
if (execResultMap.containsKey(dataSource)) {
execResultMap.get(dataSource).add(execResult);
Object engineId = execResult.get(BaseEntityServiceImpl.Tag_EngineId);
if (ObjectUtils.isEmpty(engineId)) {
log.error("存储规则结果失败,无法数据[{}]的引擎标识!", execResult.getId());
return;
}
String strEngine = String.valueOf(engineId);
if (execResultMap.containsKey(engineId)) {
execResultMap.get(engineId).add(execResult);
} else {
execResultMap.put(dataSource, new ArrayList() {{
execResultMap.put(strEngine, new ArrayList() {{
add(execResult);
}});
}
}
}
......@@ -59,8 +59,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
public static final String Setting_ResultDataSource = "resultDataSource";
public static final String Setting_ResultTableName = "resultTableName";
public static final String Default_ResultDataSource = "default";
public static final String Default_ResultTableName = "IBZRULERESULT";
@Autowired
private DbEntityService dbEntityService;
......
......@@ -40,6 +40,8 @@ import java.util.Set;
@Slf4j
public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> implements CommonEntityService {
public static final String Default_ResultTableName = "IBZRULERESULT";
@Autowired
private DstDataSourceExService dstDataSourceService;
......@@ -171,10 +173,12 @@ public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> impl
public int saveResultBatch(List<ExecResult> execResults, String dsName) {
int result = 0;
try {
String tableName = getTableName(execResults);
Map<String, DataSource> dynamicDSMap = dynamicRoutingDataSource.getCurrentDataSources();
if(!ObjectUtils.isEmpty(dsName)){
dstDataSourceService.initDataSource(dsName);
DynamicDataSourceContextHolder.push(dsName);
}
Map<String, DataSource> dynamicDSMap = dynamicRoutingDataSource.getCurrentDataSources();
String tableName = getTableName(execResults);
if (!ObjectUtils.isEmpty(dynamicDSMap) && dynamicDSMap.containsKey(dsName)) {
DataSource ds = dynamicDSMap.get(dsName);
if (ds instanceof DruidDataSource) {
......@@ -199,7 +203,7 @@ public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> impl
* @return
*/
private String getTableName(List<ExecResult> execResults) {
String tableName= RuleEngineExService.Default_ResultTableName;
String tableName = Default_ResultTableName;
for(ExecResult execResult : execResults){
if(!ObjectUtils.isEmpty(execResult.get(RuleEngineExService.Setting_ResultTableName))){
return String.valueOf(execResult.get(RuleEngineExService.Setting_ResultTableName));
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册