提交 1d43bb29 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

构建结果保存到关系数据库性能优化

上级 51cd7ac1
package cn.ibizlab.core.extensions.domain;
import cn.ibizlab.core.rule.domain.ExecResult;
import lombok.Data;
import java.util.List;
@Data
public class SqlResultsMQMsg {
private String msgId;
private String resultDataSource;
private String resultTableName;
private List<ExecResult> saveDatas;
private List<ExecResult> deleteDatas;
}
......@@ -2,7 +2,10 @@ package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.analysis.domain.DADimension;
import cn.ibizlab.core.dict.extensions.service.DictDstService;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.extensions.domain.SqlResultsMQMsg;
import cn.ibizlab.core.extensions.util.ExpiryMap;
import cn.ibizlab.core.extensions.util.MsgProducerService;
import cn.ibizlab.core.rule.domain.ExecResult;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.lite.extensions.domain.FieldObj;
......@@ -32,19 +35,13 @@ import java.util.List;
@ConditionalOnExpression("''.equals('${cassandra.host:}')")
public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
@Autowired
private MsgProducerService defaultMQProducerService;
@Autowired
@Lazy
private DABuildExService daBuildExService;
@Autowired
private IExecResultService ruExecResultService;
@Override
public void saveResult(ModelObj param, String RULEID,String RULECODE, String RULENAME, String RU_EXECRESULTNAME, FieldObj BUSINESSCAT,
Integer RETVALUE, FieldObj KEYVALUEFIELD, FieldObj DOMAINSFIELD,FieldObj DIMFIELD,
......@@ -56,7 +53,6 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
result.setName(RU_EXECRESULTNAME);
result.setRetValue(RETVALUE);
if(DOMAINSFIELD==null)
DOMAINSFIELD=param.getEmpty();
if(BUSINESSCAT==null)
......@@ -137,9 +133,11 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
result2.setId(result2.getDefaultKey(true).toString());
if(result2.getRetValue()==1)
ruExecResultService.saveAsync(result2);
//ruExecResultService.saveAsync(result2);
this.addResults(param, Tag_SaveResults, result2);
else
ruExecResultService.deleteAsync(result2);
// ruExecResultService.deleteAsync(result2);
this.addResults(param, Tag_DeleteResults, result2);
//非逐层核算或无父代码项退出
if(dim.getRecursive() == 0 || StringUtils.isEmpty(code.getParent()))
{
......@@ -160,18 +158,38 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
CachedBeanCopier.copy(result, result2);
result2.setId(result2.getDefaultKey(true).toString());
if(result2.getRetValue()==1)
ruExecResultService.saveAsync(result2);
// ruExecResultService.saveAsync(result2);
this.addResults(param, Tag_SaveResults, result2);
else
ruExecResultService.deleteAsync(result2);
// ruExecResultService.deleteAsync(result2);
this.addResults(param, Tag_DeleteResults, result2);
}
}
}
}
}
// log.debug("saveResult:{},{} cost:{}", param.getRowKey(), RULEID, System.currentTimeMillis() - start);
}
/**
* 添加到新增或删除结果队列
* @param param
* @param tag
* @param result
*/
protected void addResults(ModelObj param, String tag, ExecResult result){
List<ExecResult> results;
if(param.containsKey(tag)){
results = (List<ExecResult>)param.get(tag);
}else{
results = new ArrayList<>();
param.set(tag, results);
}
results.add(result);
}
@Autowired
protected IExecLogService ruExecLogService;
public ExecLog processAll(BaseRequest msg)
......@@ -198,15 +216,23 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
return execlog;
}
//构建结果消息对象
SqlResultsMQMsg resultsMQMsg = new SqlResultsMQMsg();
resultsMQMsg.setMsgId(msg.getId());
if(!StringUtils.isEmpty(msg.getResultDataSource())){
resultsMQMsg.setResultDataSource(msg.getResultDataSource());
}
if(!StringUtils.isEmpty(msg.getResultTableName())){
resultsMQMsg.setResultTableName(msg.getResultTableName());
}
List<ExecResult> saveDatas = new ArrayList<>();
List<ExecResult> deleteDatas = new ArrayList<>();
try
{
for (ModelObj modelObj : msg.getDatas())
{
try
{
count++;
modelObj.set("BATCH",msg.getBatch());
modelObj.set(Tag_EngineId, msg.getEngineId());
......@@ -219,6 +245,15 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
}
processRule(modelObj, modelObj.getRowKey(),strResId);
//当前业务数据构建结果添加到保存和删除队列
Object saveResults = modelObj.get(Tag_SaveResults);
if(saveResults != null && saveResults instanceof List){
saveDatas.addAll((List<ExecResult>)saveResults);
}
Object deleteResults = modelObj.get(Tag_DeleteResults);
if(deleteResults != null && deleteResults instanceof List){
deleteDatas.addAll((List<ExecResult>)deleteResults);
}
}
catch (Exception e)
{
......@@ -233,17 +268,17 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
strErrorInfo.append(e.getCause().getMessage());
strErrorInfo.append("\r\n");
}
}
}
}
//发送构建结果消息
if(saveDatas.size() > 0) {
resultsMQMsg.setSaveDatas(saveDatas);
}
if(deleteDatas.size() > 0)
resultsMQMsg.setDeleteDatas(deleteDatas);
defaultMQProducerService.sendSqlResultsMsg(resultsMQMsg);
}
catch(Exception e)
......
......@@ -49,6 +49,7 @@ import java.util.*;
public class BaseEntityServiceImpl implements BaseEntityService
{
public static final String Tag_SaveResults = "SAVERESULTS";
public static final String Tag_DeleteResults = "DELETEESULTS";
public static final String Tag_EngineId = "engineId";
public static final String Tag_Batch_SyncRun = "SYNCRUN";
......
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.LocalMsgEvent;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.extensions.domain.SqlResultsMQMsg;
import cn.ibizlab.core.message.DstRocketMQTemplate;
import cn.ibizlab.core.rule.domain.ExecResult;
import com.alibaba.fastjson.JSON;
......@@ -99,6 +101,11 @@ public class DefaultMQProducerService implements MsgProducerService{
}
@Override
public void sendSqlResultsMsg(SqlResultsMQMsg resultsMQMsg) throws Exception {
}
@Override
public void sendRuleResultsMsg(String topic, String tags, List<ExecResult> listExecResultMsg){
try {
......
......@@ -4,15 +4,22 @@ import cn.ibizlab.core.extensions.cql.ExecResultRepository;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.LocalMsgEvent;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.extensions.domain.SqlResultsMQMsg;
import cn.ibizlab.core.extensions.service.DABuildExService;
import cn.ibizlab.core.extensions.service.RuleEngineExService;
import cn.ibizlab.core.lite.extensions.service.LiteDataService;
import cn.ibizlab.core.rule.domain.ExecLog;
import cn.ibizlab.core.rule.domain.ExecResult;
import cn.ibizlab.core.rule.service.IExecLogService;
import cn.ibizlab.util.helper.DataObject;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.util.StringUtils;
import java.util.concurrent.TimeUnit;
import java.util.List;
@Slf4j
public class LocalMsgEventHandler implements EventHandler<LocalMsgEvent> {
......@@ -29,6 +36,25 @@ public class LocalMsgEventHandler implements EventHandler<LocalMsgEvent> {
@Lazy
private ExecResultRepository execResultRepository;
@Autowired(required = false)
@Lazy
private cn.ibizlab.core.extensions.service.ExecResultExService execResultExService;
@Autowired
@Lazy
protected IExecLogService ruExecLogService;
@Value("${defaultResultDataSource:default}")
private String Default_ResultDataSource;
@Value("${defaultResultTableName:IBZRULERESULT}")
private String Default_ResultTableName;
@Value("${ibiz.saveResultBatchSize:2000}")
private int saveResultBatchSize;
@Value("${ibiz.deleteResultBatchBatchSize:20000}")
private int deleteResultBatchSize;
@Override
public void onEvent(LocalMsgEvent localMsgEvent, long sequence, boolean endOfBatch) throws Exception {
if (localMsgEvent.getType()==1) {
......@@ -45,6 +71,33 @@ public class LocalMsgEventHandler implements EventHandler<LocalMsgEvent> {
log.debug("BuildMQMsg Event Handler: {}", engineMQMsg.getBatch());
}
daBuildExService.processData(engineMQMsg);
}else if("SqlResult".equalsIgnoreCase(tags)){
SqlResultsMQMsg resultsMQMsg = localMsgEvent.getMessage(SqlResultsMQMsg.class);
if (log.isDebugEnabled()) {
log.debug("SqlResultMsgProcess Event Handler: {}", resultsMQMsg.getMsgId());
}
String resultDataSource = DataObject.getStringValue(resultsMQMsg.getResultDataSource(),Default_ResultDataSource);
String resultTableName = DataObject.getStringValue(resultsMQMsg.getResultTableName(),Default_ResultTableName);
List<ExecResult> saveDatas = resultsMQMsg.getSaveDatas();
if(saveDatas != null && saveDatas.size() > 0){
List<List<ExecResult>> splist = LiteDataService.splitList(saveDatas, saveResultBatchSize);
splist.forEach(array ->{
execResultExService.saveResultBatch(array, resultDataSource, resultTableName);
});
}
List<ExecResult> deleteDatas = resultsMQMsg.getDeleteDatas();
if(deleteDatas != null && deleteDatas.size() > 0){
List<List<ExecResult>> splist = LiteDataService.splitList(deleteDatas, deleteResultBatchSize);
splist.forEach(array ->{
execResultExService.clearResultBatch(array, resultDataSource, resultTableName);
});
}
//临时处理批次处理结果为10,表示数据插入完成
ExecLog execlog=new ExecLog();
execlog.setId(resultsMQMsg.getMsgId());
execlog.setRetCode(10);
ruExecLogService.updateById(execlog);
}
}
else if (localMsgEvent.getType()==2) {
......
......@@ -3,6 +3,7 @@ package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.LocalMsgEvent;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.extensions.domain.SqlResultsMQMsg;
import cn.ibizlab.core.rule.domain.ExecResult;
import com.alibaba.fastjson.JSON;
import com.baomidou.jobs.model.JobsInfo;
......@@ -21,6 +22,9 @@ public class LocalMsgProducerService implements MsgProducerService{
@Autowired
protected Disruptor<LocalMsgEvent> disruptor;
@Autowired
protected Disruptor<LocalMsgEvent> resultDisruptor;
@Override
public void sendEngineMsg(EngineMQMsg engineMQMsg) throws Exception {
LocalMsgEvent localMsgEvent = new LocalMsgEvent();
......@@ -50,7 +54,18 @@ public class LocalMsgProducerService implements MsgProducerService{
if (log.isDebugEnabled()) {
log.debug("ResultsMQMsg Event Send: {}", resultsMQMsg.getKeyValueField());
}
disruptor.publishEvent((event, sequence, bind) -> event.setBody(bind.getBody()), localMsgEvent);
resultDisruptor.publishEvent((event, sequence, bind) -> event.setBody(bind.getBody()), localMsgEvent);
}
@Override
public void sendSqlResultsMsg(SqlResultsMQMsg resultsMQMsg) throws Exception {
LocalMsgEvent localMsgEvent = new LocalMsgEvent();
localMsgEvent.setMessage(resultsMQMsg);
localMsgEvent.setTags("SqlResult");
if (log.isDebugEnabled()) {
log.debug("SqlResultsMQMsg Event Send: {}", resultsMQMsg.getMsgId());
}
resultDisruptor.publishEvent((event, sequence, bind) -> {event.setBody(bind.getBody());event.setTags(localMsgEvent.getTags());}, localMsgEvent);
}
@Override
......
......@@ -2,6 +2,7 @@ package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.extensions.domain.SqlResultsMQMsg;
import cn.ibizlab.core.rule.domain.ExecResult;
import java.util.List;
......@@ -10,6 +11,7 @@ public interface MsgProducerService {
public void sendEngineMsg(EngineMQMsg engineMQMsg) throws Exception;
public void sendBuildMsg(EngineMQMsg engineMQMsg) throws Exception;
public void sendResultsMsg(ResultsMQMsg resultsMQMsg) throws Exception;
public void sendSqlResultsMsg(SqlResultsMQMsg resultsMQMsg) throws Exception;
/**
* 规则结果发送至MQ消息队列
* @param topic MQ消息主题
......
package cn.ibizlab.core.util.config;
import cn.ibizlab.core.extensions.domain.LocalMsgEvent;
import cn.ibizlab.core.extensions.util.LocalMsgProducerService;
import cn.ibizlab.core.extensions.util.LocalMsgEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
......@@ -15,6 +14,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
......@@ -25,7 +25,7 @@ public class LocalMsgConfiguration {
@Bean
@ConditionalOnMissingBean
public WaitStrategy waitStrategy() {
return new SleepingWaitStrategy(200,1000*1000*10);
return new YieldingWaitStrategy();
}
@Bean
......@@ -40,13 +40,19 @@ public class LocalMsgConfiguration {
return new LocalMsgEventHandler();
}
@Bean
@ConditionalOnMissingBean
public LocalMsgEventHandler resultLocalMsgEventHandler() {
return new LocalMsgEventHandler();
}
@Bean
@ConditionalOnClass({Disruptor.class})
public Disruptor<LocalMsgEvent> disruptor(WaitStrategy waitStrategy, ThreadFactory threadFactory,
LocalMsgEventHandler localMsgEventHandler) {
Disruptor<LocalMsgEvent> disruptor = new Disruptor<>(() -> new LocalMsgEvent(), 1024 * 1024,
threadFactory, ProducerType.SINGLE, waitStrategy);
Executors.newCachedThreadPool(), ProducerType.MULTI, waitStrategy);
disruptor.handleEventsWith(localMsgEventHandler);
// 启动
......@@ -79,4 +85,41 @@ public class LocalMsgConfiguration {
return disruptor;
}
@Bean
@ConditionalOnClass({Disruptor.class})
public Disruptor<LocalMsgEvent> resultDisruptor(WaitStrategy waitStrategy, ThreadFactory threadFactory,
LocalMsgEventHandler resultLocalMsgEventHandler) {
Disruptor<LocalMsgEvent> disruptor = new Disruptor<>(() -> new LocalMsgEvent(), 1024 * 1024,
Executors.newCachedThreadPool(), ProducerType.MULTI, waitStrategy);
disruptor.handleEventsWith(resultLocalMsgEventHandler);
// 启动
disruptor.start();
// WEB 容器关闭执行
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
// OK
disruptor.shutdown();
// wait up to 10 seconds for the ringbuffer to drain
RingBuffer<LocalMsgEvent> ringBuffer = disruptor.getRingBuffer();
for (int i = 0; i < 20; i++) {
if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
break;
}
try {
// give ringbuffer some time to drain...
Thread.sleep(500);
} catch (InterruptedException e) {
// ignored
}
}
disruptor.shutdown();
} catch (Exception e) {
// to do nothing
}
}));
return disruptor;
}
}
......@@ -3,7 +3,7 @@
<mapper namespace="cn.ibizlab.core.extensions.mapper.ExecResultExMapper">
<insert id="replaceBatch" parameterType="java.util.List" databaseId="mysql">
<!-- <insert id="replaceBatch" parameterType="java.util.List" databaseId="mysql">
<foreach collection="list" item="item" index="index" separator=";">
INSERT INTO ${resultTableName}
(ru_execresultid,ru_execresultname, createdate, updatedate, ruleid, rulename, retvalue, keyvaluefield,dimfield, metricfield,domainsfield,timefield,ext1field,ext2field, businesscat , systemid )
......@@ -11,10 +11,21 @@
(#{item.id}, #{item.name}, now(), now(), #{item.ruleId}, #{item.ruleName}, #{item.retValue}, #{item.keyValueField}, #{item.dimField}, #{item.metricField},
#{item.domainsField},#{item.timeField}, #{item.ext1Field}, #{item.ext2Field},#{item.businessCat},#{item.systemId} )
ON DUPLICATE KEY UPDATE
ru_execresultname=#{item.name}, createdate=now(), updatedate=now(), ruleid=#{item.ruleId}, rulename=#{item.ruleName},
retvalue=#{item.retValue}, keyvaluefield=#{item.keyValueField}, dimfield=#{item.dimField}, metricfield=#{item.metricField},domainsfield=#{item.domainsField},timefield=#{item.timeField},
ext1field=#{item.ext1Field},ext2field=#{item.ext2Field}, businesscat=#{item.businessCat},systemid=#{item.systemId}
updatedate=now(), retvalue=#{item.retValue}, metricfield=#{item.metricField},domainsfield=#{item.domainsField},timefield=#{item.timeField},
ext1field=#{item.ext1Field},ext2field=#{item.ext2Field}, businesscat=#{item.businessCat}
</foreach>
</insert>-->
<insert id="replaceBatch" parameterType="java.util.List" databaseId="mysql">
INSERT INTO ${resultTableName}
(ru_execresultid,ru_execresultname, createdate, updatedate, ruleid, rulename, retvalue, keyvaluefield,dimfield, metricfield,domainsfield,timefield,ext1field,ext2field, businesscat , systemid) values
<foreach collection="list" item="item" index="index" separator=",">
(#{item.id}, #{item.name}, now(), now(), #{item.ruleId}, #{item.ruleName}, #{item.retValue}, #{item.keyValueField}, #{item.dimField}, #{item.metricField},
#{item.domainsField},#{item.timeField}, #{item.ext1Field}, #{item.ext2Field},#{item.businessCat},#{item.systemId} )
</foreach>
ON DUPLICATE KEY UPDATE
updatedate=now(),retvalue=values(retValue), metricField=values(metricField),domainsField=values(domainsField),domainsField=values(domainsField),
ext1Field=values(ext1Field),ext2Field=values(ext2Field), businessCat=values(businessCat)
</insert>
<!--t1.BUSINESSCAT, t1.CREATEDATE, t1.DIMFIELD, t1.DOMAINSFIELD, t1.EXT1FIELD, t1.EXT2FIELD, t1.KEYVALUEFIELD, t1.METRICFIELD,
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册