提交 e4635431 编写于 作者: zc's avatar zc

update:代码完善

上级 b33ee060
...@@ -104,11 +104,11 @@ public class DefaultMQProducerService implements MsgProducerService{ ...@@ -104,11 +104,11 @@ public class DefaultMQProducerService implements MsgProducerService{
@Override @Override
public void sendSqlResultsMsg(SqlResultsMQMsg resultsMQMsg) throws Exception { public void sendSqlResultsMsg(SqlResultsMQMsg resultsMQMsg) throws Exception {
String msg = JSON.toJSONString(resultsMQMsg); String msg = JSON.toJSONString(resultsMQMsg);
log.trace(String.format("sendResultsMsg:%1$s", msg)); log.trace(String.format("sendSqlResultsMsg:%1$s", msg));
dstRocketMQTemplate.asyncSendOrderly(resultsTopic+":SqlResult",msg, DigestUtils.md5DigestAsHex(msg.getBytes()),new SendCallback(){ dstRocketMQTemplate.asyncSendOrderly(resultsTopic+":SqlResult",msg, DigestUtils.md5DigestAsHex(msg.getBytes()),new SendCallback(){
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("结果消息发送响应:" + sendResult.toString()); log.info("SqlResultsMsg结果消息发送响应:" + sendResult.toString());
} }
@Override @Override
......
...@@ -40,9 +40,6 @@ public class ResultMQMsgListener implements RocketMQListener<MessageExt> { ...@@ -40,9 +40,6 @@ public class ResultMQMsgListener implements RocketMQListener<MessageExt> {
@Lazy @Lazy
private ExecResultRepository execResultRepository; private ExecResultRepository execResultRepository;
@Value("${rocketmq.producer.resultsTopic:DSTRESULTSMSG}")
private String resultsTopic;
@Value("${defaultResultDataSource:default}") @Value("${defaultResultDataSource:default}")
private String Default_ResultDataSource; private String Default_ResultDataSource;
...@@ -74,43 +71,42 @@ public class ResultMQMsgListener implements RocketMQListener<MessageExt> { ...@@ -74,43 +71,42 @@ public class ResultMQMsgListener implements RocketMQListener<MessageExt> {
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String topic = messageExt.getTopic(); String topic = messageExt.getTopic();
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, messageExt.getMsgId()); log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, messageExt.getMsgId());
if (resultsTopic.equalsIgnoreCase(topic)) { if (StringUtils.isEmpty(cassandraHost)) {
if (StringUtils.isEmpty(cassandraHost)) { SqlResultsMQMsg resultsMQMsg = JSON.parseObject(body, new TypeReference<SqlResultsMQMsg>() {
SqlResultsMQMsg resultsMQMsg = JSON.parseObject(body, new TypeReference<SqlResultsMQMsg>() { });
if (log.isDebugEnabled()) {
log.debug("SqlResultMsgProcess Event Handler: {}", resultsMQMsg.getMsgId());
}
String resultDataSource = DataObject.getStringValue(resultsMQMsg.getResultDataSource(), Default_ResultDataSource);
String resultTableName = DataObject.getStringValue(resultsMQMsg.getResultTableName(), Default_ResultTableName);
List<ExecResult> saveDatas = resultsMQMsg.getSaveDatas();
if (saveDatas != null && saveDatas.size() > 0) {
List<List<ExecResult>> splist = LiteDataService.splitList(saveDatas, saveResultBatchSize);
splist.forEach(array -> {
execResultExService.saveResultBatch(array, resultDataSource, resultTableName);
}); });
if (log.isDebugEnabled()) { }
log.debug("SqlResultMsgProcess Event Handler: {}", resultsMQMsg.getMsgId()); List<ExecResult> deleteDatas = resultsMQMsg.getDeleteDatas();
} if (deleteDatas != null && deleteDatas.size() > 0) {
String resultDataSource = DataObject.getStringValue(resultsMQMsg.getResultDataSource(), Default_ResultDataSource); List<List<ExecResult>> splist = LiteDataService.splitList(deleteDatas, deleteResultBatchSize);
String resultTableName = DataObject.getStringValue(resultsMQMsg.getResultTableName(), Default_ResultTableName); splist.forEach(array -> {
List<ExecResult> saveDatas = resultsMQMsg.getSaveDatas(); execResultExService.clearResultBatch(array, resultDataSource, resultTableName);
if (saveDatas != null && saveDatas.size() > 0) { });
List<List<ExecResult>> splist = LiteDataService.splitList(saveDatas, saveResultBatchSize); }
splist.forEach(array -> { //临时处理批次处理结果为10,表示数据插入完成
execResultExService.saveResultBatch(array, resultDataSource, resultTableName); ExecLog execlog = new ExecLog();
}); execlog.setId(resultsMQMsg.getMsgId());
} execlog.setRetCode(10);
List<ExecResult> deleteDatas = resultsMQMsg.getDeleteDatas(); ruExecLogService.updateById(execlog);
if (deleteDatas != null && deleteDatas.size() > 0) { } else {
List<List<ExecResult>> splist = LiteDataService.splitList(deleteDatas, deleteResultBatchSize); ResultsMQMsg resultsMQMsg = JSON.parseObject(body, new TypeReference<ResultsMQMsg>() {
splist.forEach(array -> { });
execResultExService.clearResultBatch(array, resultDataSource, resultTableName); if (!StringUtils.isEmpty(resultsMQMsg.getKeyValueField())) {
}); execResultRepository.saveResultsMQMsg(resultsMQMsg);
}
//临时处理批次处理结果为10,表示数据插入完成
ExecLog execlog = new ExecLog();
execlog.setId(resultsMQMsg.getMsgId());
execlog.setRetCode(10);
ruExecLogService.updateById(execlog);
} else {
ResultsMQMsg resultsMQMsg = JSON.parseObject(body, new TypeReference<ResultsMQMsg>() {});
if(!StringUtils.isEmpty(resultsMQMsg.getKeyValueField())){
execResultRepository.saveResultsMQMsg(resultsMQMsg);
}
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.error("获取MQ消息内容异常{}",e); log.error("获取MQ消息内容异常{}", e);
} }
} }
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册