提交 ba2b6c04 编写于 作者: zhouweidong's avatar zhouweidong

规则执行结果动态存储

上级 a156fddd
......@@ -12,8 +12,9 @@ public interface DbEntityMapper extends BaseMapper<EntityObj>{
List<EntityObj> search(@Param("sql") String sql, @Param("ew") Wrapper<EntityObj> wrapper);
Integer searchCount(@Param("sql") String sql, @Param("ew") Wrapper<EntityObj> wrapper);
int replaceBatch(List<ExecResult> var1);
int clearBatch(List<ExecResult> var1);
int replaceBatch(@Param("list") List<ExecResult> var1 , @Param("resultTableName") String tableName);
int replaceBatchByOracle(@Param("list") List<ExecResult> var1 , @Param("resultTableName") String tableName);
int clearBatch(@Param("list")List<ExecResult> var1, @Param("resultTableName") String tableName);
}
\ No newline at end of file
package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.lite.extensions.service.DbEntityService;
import cn.ibizlab.core.rule.domain.ExecResult;
import cn.ibizlab.core.rule.service.impl.ExecResultServiceImpl;
import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import lombok.extern.slf4j.Slf4j;
import cn.ibizlab.core.rule.domain.ExecResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.context.annotation.Primary;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sql.DataSource;
import java.util.*;
/**
......@@ -18,16 +26,23 @@ import java.util.*;
@Service("ExecResultExService")
public class ExecResultExService extends ExecResultServiceImpl {
@Autowired
private DstDataSourceExService dstDataSourceService;
@Autowired
private DbEntityService dbEntityService;
private ArrayList<ExecResult> resultsSaveAsync=new ArrayList<ExecResult>();
private Object objResultSaveAsyncLock=new Object();
private Timer timerSaveAsync=null;
@Autowired
@Lazy
private DynamicRoutingDataSource dynamicRoutingDataSource;
private ArrayList<ExecResult> resultsDeleteAsync=new ArrayList<ExecResult>();
private Object objResultDeleteAsyncLock=new Object();
private Timer timerDeleteAsync=null;
private Map<String, List<ExecResult>> resultsSaveAsync = new HashMap<>();
private Object objResultSaveAsyncLock = new Object();
private Timer timerSaveAsync = null;
private Map<String, List<ExecResult>> resultsDeleteAsync = new HashMap<>();
private Object objResultDeleteAsyncLock = new Object();
private Timer timerDeleteAsync = null;
@Override
protected Class currentModelClass() {
......@@ -36,19 +51,18 @@ public class ExecResultExService extends ExecResultServiceImpl {
/**
* [SaveAsync:SaveAsync] 行为扩展
*
* @param et
* @return
*/
@Override
@Transactional
public ExecResult saveAsync(ExecResult et) {
synchronized(objResultSaveAsyncLock)
{
this.resultsSaveAsync.add(et);
synchronized (objResultSaveAsyncLock) {
addExecResult(resultsSaveAsync, et);
}
//创建一个定时器
if(timerSaveAsync==null)
{
if (timerSaveAsync == null) {
timerSaveAsync = new Timer();
//schedule方法是执行时间定时任务的方法
timerSaveAsync.schedule(new TimerTask() {
......@@ -56,42 +70,39 @@ public class ExecResultExService extends ExecResultServiceImpl {
//run方法就是具体需要定时执行的任务
@Override
public void run() {
try
{
synchronized(objResultSaveAsyncLock)
{
if(resultsSaveAsync.size()>0)
{
List<ExecResult> args=new ArrayList<>();
for(ExecResult arg:resultsSaveAsync)
{
args.add(arg);
if(args.size()>=500)
{
// TreatEarlyWarningThread task = new TreatEarlyWarningThread(args);
// task.start();
dbEntityService.saveResultBatch(args);
args.clear();
try {
synchronized (objResultSaveAsyncLock) {
if (resultsSaveAsync.size() > 0) {
for (Map.Entry<String, List<ExecResult>> entry : resultsSaveAsync.entrySet()) {
String tabName = "";
String dsName = entry.getKey();
List<ExecResult> execResults = entry.getValue();
List<ExecResult> tempExecResults = new ArrayList<>();
if (execResults.size() > 0) {
dstDataSourceService.initDataSource(dsName);
DynamicDataSourceContextHolder.push(dsName);
for (ExecResult execResult : execResults) {
tempExecResults.add(execResult);
Object objTabName = execResult.get(RuleEngineExService.Setting_ResultTableName);
if (StringUtils.isEmpty(tabName) && !ObjectUtils.isEmpty(objTabName)) {
tabName = String.valueOf(objTabName);
}
if (tempExecResults.size() >= 500) {
saveResultBatch(tempExecResults, dsName, tabName);
}
}
if (tempExecResults.size() > 0) {
saveResultBatch(tempExecResults, dsName, tabName);
}
resultsSaveAsync.remove(dsName);
}
}
if(args.size()>0)
{
// TreatEarlyWarningThread task = new TreatEarlyWarningThread(args);
// task.start();
dbEntityService.saveResultBatch(args);
args.clear();
}
resultsSaveAsync.clear();
}
}
}
catch (Exception ex)
{
} catch (Exception ex) {
log.error(ex.getMessage());
} finally {
DynamicDataSourceContextHolder.poll();
}
}
......@@ -102,18 +113,17 @@ public class ExecResultExService extends ExecResultServiceImpl {
/**
* [DeleteAsync:DeleteAsync] 行为扩展
*
* @param et
* @return
*/
@Override
@Transactional
public ExecResult deleteAsync(ExecResult et) {
synchronized(objResultDeleteAsyncLock)
{
this.resultsDeleteAsync.add(et);
synchronized (objResultDeleteAsyncLock) {
addExecResult(resultsDeleteAsync, et);
}
if(timerDeleteAsync==null)
{
if (timerDeleteAsync == null) {
timerDeleteAsync = new Timer();
//schedule方法是执行时间定时任务的方法
timerDeleteAsync.schedule(new TimerTask() {
......@@ -121,42 +131,39 @@ public class ExecResultExService extends ExecResultServiceImpl {
//run方法就是具体需要定时执行的任务
@Override
public void run() {
try
{
synchronized(objResultDeleteAsyncLock)
{
if(resultsDeleteAsync.size()>0)
{
List<ExecResult> args=new ArrayList<>();
for(ExecResult arg:resultsDeleteAsync)
{
args.add(arg);
if(args.size()>=500)
{
// UpdateTreatEarlyWarningThread updateTreatEarlyWarningThread = new UpdateTreatEarlyWarningThread(args);
// updateTreatEarlyWarningThread.start();
dbEntityService.clearResultBatch(args);
args.clear();
try {
synchronized (objResultDeleteAsyncLock) {
if (resultsDeleteAsync.size() > 0) {
for (Map.Entry<String, List<ExecResult>> entry : resultsDeleteAsync.entrySet()) {
String tabName = "";
String dsName = entry.getKey();
List<ExecResult> execResults = entry.getValue();
List<ExecResult> tempExecResults = new ArrayList<>();
if (execResults.size() > 0) {
dstDataSourceService.initDataSource(dsName);
DynamicDataSourceContextHolder.push(dsName);
for (ExecResult execResult : execResults) {
tempExecResults.add(execResult);
Object objTabName = execResult.get(RuleEngineExService.Setting_ResultTableName);
if (StringUtils.isEmpty(tabName) && !ObjectUtils.isEmpty(objTabName)) {
tabName = String.valueOf(objTabName);
}
if (tempExecResults.size() >= 500) {
dbEntityService.clearResultBatch(tempExecResults, tabName);
}
}
if (tempExecResults.size() > 0) {
dbEntityService.clearResultBatch(tempExecResults, tabName);
}
resultsDeleteAsync.remove(dsName);
}
}
if(args.size()>=0)
{
// UpdateTreatEarlyWarningThread updateTreatEarlyWarningThread = new UpdateTreatEarlyWarningThread(args);
// updateTreatEarlyWarningThread.start();
dbEntityService.clearResultBatch(args);
args.clear();
}
resultsDeleteAsync.clear();
}
}
}
catch (Exception ex)
{
} catch (Exception ex) {
log.error(ex.getMessage());
} finally {
DynamicDataSourceContextHolder.poll();
}
}
......@@ -164,5 +171,51 @@ public class ExecResultExService extends ExecResultServiceImpl {
}
return et;
}
/**
* 将规则执行结果添加到待保存集合中
* @param execResultMap
* @param execResult
*/
private void addExecResult(Map<String, List<ExecResult>> execResultMap, ExecResult execResult) {
Object dsName = execResult.get(RuleEngineExService.Setting_ResultDataSource);
Object tabName = execResult.get(RuleEngineExService.Setting_ResultTableName);
if (ObjectUtils.isEmpty(tabName)) {
execResult.set(RuleEngineExService.Setting_ResultTableName, RuleEngineExService.Default_ResultTableName);
}
if (ObjectUtils.isEmpty(dsName)) {
dsName = RuleEngineExService.Default_ResultDataSource;
}
String dataSource = String.valueOf(dsName);
if (execResultMap.containsKey(dataSource)) {
execResultMap.get(dataSource).add(execResult);
} else {
execResultMap.put(dataSource, new ArrayList() {{
add(execResult);
}});
}
}
/**
* 根据数据源类型保存结果
* @param args
* @param
*/
private void saveResultBatch(List<ExecResult> args, String dsName, String tabName) {
Map<String, DataSource> dynamicDSMap = dynamicRoutingDataSource.getCurrentDataSources();
if (!ObjectUtils.isEmpty(dynamicDSMap) && dynamicDSMap.containsKey(dsName)) {
DataSource ds = dynamicDSMap.get(dsName);
if (ds instanceof DruidDataSource) {
DruidDataSource druidDataSource = (DruidDataSource) dynamicDSMap.get(dsName);
if ("oracle".equals(druidDataSource.getDbType())) {
dbEntityService.saveResultBatchByOracle(args, tabName);
args.clear();
return;
}
}
}
dbEntityService.saveResultBatch(args, tabName);
args.clear();
}
}
......@@ -60,6 +60,8 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
public static final String Setting_ResultDataSource = "resultDataSource";
public static final String Setting_ResultTableName = "resultTableName";
public static final String Default_ResultDataSource = "default";
public static final String Default_ResultTableName = "IBZRULERESULT";
@Autowired
private DbEntityService dbEntityService;
......
......@@ -166,20 +166,31 @@ public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> impl
}
}
@Override
public int saveResultBatch(List<ExecResult> var1) {
return 0;
}
@Override
public int clearResultBatch(List<ExecResult> var1) {
return 0;
}
@Autowired
private DbEntityMapper dbEntityMapper;
@Override
public int saveResultBatch(List<ExecResult> var1)
public int saveResultBatch(List<ExecResult> var1,String tableName)
{
return dbEntityMapper.replaceBatch(var1);
return dbEntityMapper.replaceBatch(var1,tableName);
}
@Override
public int clearResultBatch(List<ExecResult> var1)
public int saveResultBatchByOracle(List<ExecResult> var1,String tableName) {
return dbEntityMapper.replaceBatchByOracle(var1,tableName);
}
public int clearResultBatch(List<ExecResult> var1,String tableName)
{
return dbEntityMapper.clearBatch(var1);
return dbEntityMapper.clearBatch(var1,tableName);
}
@Autowired
......
......@@ -15,9 +15,9 @@
<if test="ew!=null and ew.sqlSegment!=null and ew.emptyOfWhere">${ew.sqlSegment}</if>
</select>
<insert id="replaceBatch" parameterType="java.util.List" databaseId="mysql">
<insert id="replaceBatch" parameterType="java.util.List">
<foreach collection="list" item="item" index="index" separator=";">
INSERT INTO IBZRULERESULT
INSERT INTO ${resultTableName}
(ru_execresultid,ru_execresultname, createdate, updatedate, ruleid, rulename, retvalue, keyvaluefield,dimfield, metricfield,domainsfield,timefield,ext1field,ext2field, businesscat )
VALUES
(#{item.id}, #{item.name}, now(), now(), #{item.ruleId}, #{item.ruleName}, #{item.retValue}, #{item.keyValueField}, #{item.dimField}, #{item.metricField},
......@@ -30,8 +30,8 @@
</insert>
<!--t1.BUSINESSCAT, t1.CREATEDATE, t1.DIMFIELD, t1.DOMAINSFIELD, t1.EXT1FIELD, t1.EXT2FIELD, t1.KEYVALUEFIELD, t1.METRICFIELD,
t1.RETVALUE, t1.RULEID, t1.RULENAME, t1.RU_EXECRESULTID, t1.RU_EXECRESULTNAME, t1.SYSTEMID, t1.TIMEFIELD, t1.UPDATEDATE-->
<insert id="replaceBatch" parameterType="java.util.List" databaseId="oracle" >
MERGE INTO IBZRULERESULT A USING (
<insert id="replaceBatchByOracle" parameterType="java.util.List">
MERGE INTO ${resultTableName} A USING (
<foreach collection='list' item='item' index='index' separator='UNION ALL'>
select #{item.id} as ru_execresultid, #{item.name} as ru_execresultname, sysdate as createdate, sysdate as updatedate, #{item.ruleId} as ruleid, #{item.ruleName} as rulename,#{item.retValue} as retvalue, #{item.keyValueField} as keyvaluefield, #{item.dimField} as dimfield,
#{item.metricField} as metricfield,#{item.domainsField} as domainsfield,#{item.timeField} as timefield, #{item.ext1Field} as ext1field,#{item.ext2Field} as ext2field,#{item.businessCat} as businesscat from dual
......@@ -45,7 +45,7 @@
</insert>
<delete id="clearBatch" parameterType="java.util.List" >
delete from IBZRULERESULT where (ru_execresultid) in
delete from ${resultTableName} where (ru_execresultid) in
<foreach collection="list" item="item" index="index" open="(" close=")" separator=",">
(#{item.id})
</foreach>
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册