提交 b33ee060 编写于 作者: zc's avatar zc

update:增加使用mq+数据库时的构建的场景

上级 8bf9f954
...@@ -17,9 +17,11 @@ import cn.ibizlab.util.dict.CodeItem; ...@@ -17,9 +17,11 @@ import cn.ibizlab.util.dict.CodeItem;
import cn.ibizlab.util.dict.CodeList; import cn.ibizlab.util.dict.CodeList;
import cn.ibizlab.util.helper.CachedBeanCopier; import cn.ibizlab.util.helper.CachedBeanCopier;
import cn.ibizlab.util.helper.DataObject; import cn.ibizlab.util.helper.DataObject;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.kie.api.runtime.KieSession; import org.kie.api.runtime.KieSession;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -42,6 +44,9 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl { ...@@ -42,6 +44,9 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
@Lazy @Lazy
private DABuildExService daBuildExService; private DABuildExService daBuildExService;
@Value("${rocketmq.sendResultsBatchSize:1000}")
private Integer sendResultsBatchSize;
@Override @Override
public void saveResult(ModelObj param, String RULEID,String RULECODE, String RULENAME, String RU_EXECRESULTNAME, FieldObj BUSINESSCAT, public void saveResult(ModelObj param, String RULEID,String RULECODE, String RULENAME, String RU_EXECRESULTNAME, FieldObj BUSINESSCAT,
Integer RETVALUE, FieldObj KEYVALUEFIELD, FieldObj DOMAINSFIELD,FieldObj DIMFIELD, Integer RETVALUE, FieldObj KEYVALUEFIELD, FieldObj DOMAINSFIELD,FieldObj DIMFIELD,
...@@ -254,6 +259,14 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl { ...@@ -254,6 +259,14 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
if(deleteResults != null && deleteResults instanceof List){ if(deleteResults != null && deleteResults instanceof List){
deleteDatas.addAll((List<ExecResult>)deleteResults); deleteDatas.addAll((List<ExecResult>)deleteResults);
} }
if (saveDatas.size() > sendResultsBatchSize || deleteDatas.size() > sendResultsBatchSize) {
resultsMQMsg.setSaveDatas(saveDatas);
resultsMQMsg.setDeleteDatas(deleteDatas);
defaultMQProducerService.sendSqlResultsMsg(resultsMQMsg);
resultsMQMsg.setMsgId(IdWorker.getIdStr());
saveDatas.clear();
deleteDatas.clear();
}
} }
catch (Exception e) catch (Exception e)
{ {
...@@ -273,13 +286,11 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl { ...@@ -273,13 +286,11 @@ public class AnalyseSqlServiceImpl extends BaseEntityServiceImpl {
} }
//发送构建结果消息 //发送构建结果消息
if(saveDatas.size() > 0) { if(saveDatas.size() > 0 || deleteDatas.size() > 0) {
resultsMQMsg.setSaveDatas(saveDatas); resultsMQMsg.setSaveDatas(saveDatas);
}
if(deleteDatas.size() > 0)
resultsMQMsg.setDeleteDatas(deleteDatas); resultsMQMsg.setDeleteDatas(deleteDatas);
defaultMQProducerService.sendSqlResultsMsg(resultsMQMsg); defaultMQProducerService.sendSqlResultsMsg(resultsMQMsg);
}
} }
catch(Exception e) catch(Exception e)
{ {
......
...@@ -103,6 +103,20 @@ public class DefaultMQProducerService implements MsgProducerService{ ...@@ -103,6 +103,20 @@ public class DefaultMQProducerService implements MsgProducerService{
@Override @Override
public void sendSqlResultsMsg(SqlResultsMQMsg resultsMQMsg) throws Exception { public void sendSqlResultsMsg(SqlResultsMQMsg resultsMQMsg) throws Exception {
String msg = JSON.toJSONString(resultsMQMsg);
log.trace(String.format("sendResultsMsg:%1$s", msg));
dstRocketMQTemplate.asyncSendOrderly(resultsTopic+":SqlResult",msg, DigestUtils.md5DigestAsHex(msg.getBytes()),new SendCallback(){
@Override
public void onSuccess(SendResult sendResult) {
log.info("结果消息发送响应:" + sendResult.toString());
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
} }
......
...@@ -2,6 +2,12 @@ package cn.ibizlab.core.message; ...@@ -2,6 +2,12 @@ package cn.ibizlab.core.message;
import cn.ibizlab.core.extensions.cql.ExecResultRepository; import cn.ibizlab.core.extensions.cql.ExecResultRepository;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg; import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.extensions.domain.SqlResultsMQMsg;
import cn.ibizlab.core.lite.extensions.service.LiteDataService;
import cn.ibizlab.core.rule.domain.ExecLog;
import cn.ibizlab.core.rule.domain.ExecResult;
import cn.ibizlab.core.rule.service.IExecLogService;
import cn.ibizlab.util.helper.DataObject;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -10,11 +16,14 @@ import org.apache.rocketmq.spring.annotation.ConsumeMode; ...@@ -10,11 +16,14 @@ import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.List;
@Slf4j @Slf4j
@Component @Component
@ConditionalOnExpression("'${rocketmq.consumer.namesrvAddr:}'!='' && ${rocketmq.consumer.enabled:false}") @ConditionalOnExpression("'${rocketmq.consumer.namesrvAddr:}'!='' && ${rocketmq.consumer.enabled:false}")
...@@ -31,18 +40,75 @@ public class ResultMQMsgListener implements RocketMQListener<MessageExt> { ...@@ -31,18 +40,75 @@ public class ResultMQMsgListener implements RocketMQListener<MessageExt> {
@Lazy @Lazy
private ExecResultRepository execResultRepository; private ExecResultRepository execResultRepository;
@Value("${rocketmq.producer.resultsTopic:DSTRESULTSMSG}")
private String resultsTopic;
@Value("${defaultResultDataSource:default}")
private String Default_ResultDataSource;
@Value("${defaultResultTableName:IBZRULERESULT}")
private String Default_ResultTableName;
@Value("${ibiz.saveResultBatchSize:2000}")
private int saveResultBatchSize;
@Value("${ibiz.deleteResultBatchBatchSize:20000}")
private int deleteResultBatchSize;
@Autowired(required = false)
@Lazy
private cn.ibizlab.core.extensions.service.ExecResultExService execResultExService;
@Value("${cassandra.host:}")
private String cassandraHost;
@Autowired
@Lazy
protected IExecLogService ruExecLogService;
@Override @Override
public void onMessage(MessageExt messageExt) { public void onMessage(MessageExt messageExt) {
try { try {
log.info("MQ消息topic={}, tags={}, 消息内容={}", messageExt.getTopic(), messageExt.getTags(), messageExt.getMsgId());
String body = new String(messageExt.getBody(), "utf-8"); String body = new String(messageExt.getBody(), "utf-8");
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String topic = messageExt.getTopic();
ResultsMQMsg resultsMQMsg = JSON.parseObject(body, new TypeReference<ResultsMQMsg>() {}); log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, messageExt.getMsgId());
if(!StringUtils.isEmpty(resultsMQMsg.getKeyValueField())) if (resultsTopic.equalsIgnoreCase(topic)) {
execResultRepository.saveResultsMQMsg(resultsMQMsg); if (StringUtils.isEmpty(cassandraHost)) {
SqlResultsMQMsg resultsMQMsg = JSON.parseObject(body, new TypeReference<SqlResultsMQMsg>() {
});
if (log.isDebugEnabled()) {
log.debug("SqlResultMsgProcess Event Handler: {}", resultsMQMsg.getMsgId());
}
String resultDataSource = DataObject.getStringValue(resultsMQMsg.getResultDataSource(), Default_ResultDataSource);
String resultTableName = DataObject.getStringValue(resultsMQMsg.getResultTableName(), Default_ResultTableName);
List<ExecResult> saveDatas = resultsMQMsg.getSaveDatas();
if (saveDatas != null && saveDatas.size() > 0) {
List<List<ExecResult>> splist = LiteDataService.splitList(saveDatas, saveResultBatchSize);
splist.forEach(array -> {
execResultExService.saveResultBatch(array, resultDataSource, resultTableName);
});
}
List<ExecResult> deleteDatas = resultsMQMsg.getDeleteDatas();
if (deleteDatas != null && deleteDatas.size() > 0) {
List<List<ExecResult>> splist = LiteDataService.splitList(deleteDatas, deleteResultBatchSize);
splist.forEach(array -> {
execResultExService.clearResultBatch(array, resultDataSource, resultTableName);
});
}
//临时处理批次处理结果为10,表示数据插入完成
ExecLog execlog = new ExecLog();
execlog.setId(resultsMQMsg.getMsgId());
execlog.setRetCode(10);
ruExecLogService.updateById(execlog);
} else {
ResultsMQMsg resultsMQMsg = JSON.parseObject(body, new TypeReference<ResultsMQMsg>() {});
if(!StringUtils.isEmpty(resultsMQMsg.getKeyValueField())){
execResultRepository.saveResultsMQMsg(resultsMQMsg);
}
}
}
} catch (Exception e) { } catch (Exception e) {
log.error("获取MQ消息内容异常{}",e); log.error("获取MQ消息内容异常{}",e);
} }
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册