提交 d56a320d 编写于 作者: xuhui961310148's avatar xuhui961310148

update:ETL表同步数据去重临时提交

上级 67ab443d
...@@ -169,6 +169,7 @@ ...@@ -169,6 +169,7 @@
<!-- SQL日志 --> <!-- SQL日志 -->
<logger name="com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor" level="DEBUG" /> <logger name="com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor" level="DEBUG" />
<logger name="cn.ibizlab.core.extensions.util.DefaultMQProducerService" level="TRACE" /> <logger name="cn.ibizlab.core.extensions.util.DefaultMQProducerService" level="TRACE" />
<logger name="cn.ibizlab.core.extensions.service.TableSyncExService" level="TRACE" />
<!-- 日志输出级别 --> <!-- 日志输出级别 -->
<root level="INFO"> <root level="INFO">
<appender-ref ref="Console" /> <appender-ref ref="Console" />
......
...@@ -4,6 +4,7 @@ import cn.ibizlab.core.extensions.domain.EngineMQMsg; ...@@ -4,6 +4,7 @@ import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.util.MQConsumeMsgListenerProcessor; import cn.ibizlab.core.extensions.util.MQConsumeMsgListenerProcessor;
import cn.ibizlab.core.lite.extensions.domain.EntityModel; import cn.ibizlab.core.lite.extensions.domain.EntityModel;
import cn.ibizlab.core.lite.extensions.domain.EntityObj; import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import cn.ibizlab.core.lite.extensions.domain.FieldModel;
import cn.ibizlab.core.lite.extensions.service.LiteDataCallback; import cn.ibizlab.core.lite.extensions.service.LiteDataCallback;
import cn.ibizlab.core.lite.extensions.service.LiteDataService; import cn.ibizlab.core.lite.extensions.service.LiteDataService;
import cn.ibizlab.core.lite.extensions.service.LiteModelService; import cn.ibizlab.core.lite.extensions.service.LiteModelService;
...@@ -20,6 +21,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; ...@@ -20,6 +21,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import cn.ibizlab.core.lite.domain.TableSync; import cn.ibizlab.core.lite.domain.TableSync;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
...@@ -39,6 +41,9 @@ import java.util.*; ...@@ -39,6 +41,9 @@ import java.util.*;
@Service("TableSyncExService") @Service("TableSyncExService")
public class TableSyncExService extends TableSyncServiceImpl { public class TableSyncExService extends TableSyncServiceImpl {
@Value("${ibiz.isDistinct:false}")
private boolean isDistinct;
@Override @Override
protected Class currentModelClass() { protected Class currentModelClass() {
return com.baomidou.mybatisplus.core.toolkit.ReflectionKit.getSuperClassGenericType(this.getClass().getSuperclass(), 1); return com.baomidou.mybatisplus.core.toolkit.ReflectionKit.getSuperClassGenericType(this.getClass().getSuperclass(), 1);
...@@ -166,13 +171,62 @@ public class TableSyncExService extends TableSyncServiceImpl { ...@@ -166,13 +171,62 @@ public class TableSyncExService extends TableSyncServiceImpl {
@Override @Override
public boolean processData(List<EntityObj> datas) { public boolean processData(List<EntityObj> datas) {
List<EntityObj> datas2 = new ArrayList<>();
int beforeSize = datas.size();
boolean flag = false;
if (isDistinct){
// 重复数据集合
ArrayList<String> arrayList = new ArrayList<>();
LinkedHashMap<Object,EntityObj> params = new LinkedHashMap<>();
List<FieldModel> keyFields = entityModel.getKeyFields();
for (EntityObj entityObj: datas) {
Object strKey = null;
if (keyFields != null && keyFields.size() > 0){
for (FieldModel fieldModel : keyFields){
Object value = entityObj.get(fieldModel.getColumnName());
if (StringUtils.isEmpty(strKey)){
strKey = value ;
}else {
strKey = strKey + "||" + value;
}
}
}
if (!StringUtils.isEmpty(strKey)){
if (params.containsKey(strKey)){
try{
arrayList.add(JSONObject.toJSONString(entityObj));
}catch (Exception e){
e.printStackTrace();
}
}
params.put(strKey,entityObj);
}
}
for (EntityObj entityObj : params.values()) {
datas2.add(entityObj);
}
int afterSize = datas2.size();
if (afterSize != beforeSize){
try{
flag = true;
String beforeData = JSONObject.toJSONString(datas);
String afterData = JSONObject.toJSONString(datas2);
log.trace(String.format("--ETL表同步--表名称:%1$s ,去重前数据量:%2$s,去重后数据量:%3$s,\r\n 去重前数据内容:%4$s,\r\n 去重后数据内容:%5$s,\r\n 重复数据:%6$s",entityModel.getTableName(),beforeSize,afterSize,beforeData,afterData,arrayList));
}catch (Exception e){
e.printStackTrace();
}
}
if (afterSize > 0){
datas = datas2;
}
}
boolean rt=liteDataService.saveBatch(entityModel,dsName,datas); boolean rt=liteDataService.saveBatch(entityModel,dsName,datas);
if(rt) if(rt)
{ {
String updateSql = "UPDATE ibztablesync SET lastwrite=(CASE WHEN lastwrite is null then 0 else lastwrite end+#{et.count}) WHERE syncid = #{et.syncid}"; String updateSql = "UPDATE ibztablesync SET lastwrite=(CASE WHEN lastwrite is null then 0 else lastwrite end+#{et.count}) WHERE syncid = #{et.syncid}";
HashMap<String, Object> param = new HashMap<>(); HashMap<String, Object> param = new HashMap<>();
param.put("count", datas.size()); param.put("count", beforeSize);
param.put("syncid", sync.getId()); param.put("syncid", sync.getId());
updateBySQL(updateSql,param); updateBySQL(updateSql,param);
...@@ -189,6 +243,10 @@ public class TableSyncExService extends TableSyncServiceImpl { ...@@ -189,6 +243,10 @@ public class TableSyncExService extends TableSyncServiceImpl {
try { try {
DynamicDataSourceContextHolder.push("master"); DynamicDataSourceContextHolder.push("master");
execlog.setSucc(execlog.getSucc() + datas.size()); execlog.setSucc(execlog.getSucc() + datas.size());
if (flag){
// 去重后,写入数据库成功
execlog.setRetCode(5);
}
execLogService.update(execlog); execLogService.update(execlog);
} catch (Exception ex) { } catch (Exception ex) {
log.error("详细错误信息:" + ex.getMessage()); log.error("详细错误信息:" + ex.getMessage());
...@@ -214,7 +272,10 @@ public class TableSyncExService extends TableSyncServiceImpl { ...@@ -214,7 +272,10 @@ public class TableSyncExService extends TableSyncServiceImpl {
DynamicDataSourceContextHolder.push("master"); DynamicDataSourceContextHolder.push("master");
String runBody = JSONObject.toJSONString(datas); String runBody = JSONObject.toJSONString(datas);
execlog.setRunBody(runBody); execlog.setRunBody(runBody);
execlog.setRetCode(1); if (flag){
// 去重后,写入数据库失败
execlog.setRetCode(execlog.getRetCode() + 1);
}
execlog.setRunResult("sync同步失败:" + sync.getEntityName()); execlog.setRunResult("sync同步失败:" + sync.getEntityName());
execLogService.update(execlog); execLogService.update(execlog);
} catch (Exception ex) { } catch (Exception ex) {
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册