提交 361bb9ce 编写于 作者: sq3536's avatar sq3536

etl

上级 7ba7c4d3
package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.util.MQConsumeMsgListenerProcessor;
import cn.ibizlab.core.lite.extensions.domain.EntityModel;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import cn.ibizlab.core.lite.extensions.service.LiteDataCallback;
import cn.ibizlab.core.lite.extensions.service.LiteDataService;
import cn.ibizlab.core.lite.extensions.service.LiteModelService;
import cn.ibizlab.core.lite.mapper.TableSyncMapper;
import cn.ibizlab.core.lite.service.impl.TableSyncServiceImpl;
import cn.ibizlab.util.filter.QueryFilter;
import cn.ibizlab.util.helper.DataObject;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import cn.ibizlab.core.lite.domain.TableSync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.context.annotation.Primary;
import org.springframework.util.StringUtils;
import java.sql.Timestamp;
import java.sql.Wrapper;
import java.util.*;
/**
......@@ -28,7 +47,134 @@ public class TableSyncExService extends TableSyncServiceImpl {
*/
@Override
public TableSync sync(TableSync et) {
List<TableSync> list = null;
String params= DataObject.getStringValue(et.get("param"),"");
LambdaQueryWrapper<TableSync> wrapper= Wrappers.<TableSync>lambdaQuery();
if(!StringUtils.isEmpty(params))
wrapper.like(TableSync::getGroup,params);
if(!StringUtils.isEmpty(et.getId()))
wrapper.eq(TableSync::getId,et.getId());
wrapper.eq(TableSync::getIsEnable,1);
list=this.list(wrapper);
sync(list);
return super.sync(et);
}
@Autowired
private LiteModelService liteModelService;
@Autowired
private LiteDataService liteDataService;
public void sync(List<TableSync> list)
{
for(TableSync sync:list)
{
try
{
if(sync.getIsEnable()!=null&&sync.getIsEnable()==0)
continue;
String sourceSystemId=sync.getSourceSystemId();
String sourceDsId=sync.getSourceDsId();
String sourceEntityName=sync.getSourceEntityName();
String systemId=sync.getSystemId();
String dsId=sync.getDsId();
String entityName=sync.getEntityName();
if(StringUtils.isEmpty(entityName)||StringUtils.isEmpty(systemId))
continue;
if(StringUtils.isEmpty(sourceEntityName))
sourceEntityName=entityName;
if(StringUtils.isEmpty(sourceSystemId))
sourceSystemId=systemId;
EntityModel entityModel=liteModelService.getEntityModel(systemId,entityName);
if(StringUtils.isEmpty(dsId))
dsId=entityModel.getDsName();
EntityModel sourceModel=liteModelService.getEntityModel(sourceSystemId,sourceEntityName);
if(StringUtils.isEmpty(sourceDsId))
sourceDsId=sourceModel.getDsName();
if(sourceDsId.equals(dsId)&&sourceEntityName.equals(entityName))
continue;
Timestamp lastTimestamp=DataObject.getTimestampValue(sync.getEtlTimestamp(),DataObject.getBeginDate());
QueryFilter filter=QueryFilter.createQuery();
if(entityModel.getLastModifyField()!=null)
filter.ge(entityModel.getLastModifyField().getColumnName(),lastTimestamp);
Timestamp runTimestamp=new Timestamp(System.currentTimeMillis());
final String dsName=dsId;
liteDataService.searchCursor(sourceModel,sync.getSourceExpression(),sourceDsId,filter, new LiteDataCallback<List<EntityObj>>() {
@Override
public void total(Integer total) {
if(total > 0)
{
String updateSql = "UPDATE ibztablesync SET lastread=#{et.lastread},lastwrite=0," +
"lastruntime=#{et.lastruntime}," +
"lastrunresult='RUNNING' WHERE syncid = #{et.syncid}";
HashMap<String, Object> param = new HashMap<>();
param.put("lastread", total);
param.put("lastruntime", runTimestamp);
param.put("syncid", sync.getId());
updateBySQL(updateSql,param);
}
else {
String updateSql = "UPDATE ibztablesync SET lastread=0,lastwrite=0," +
"lastruntime=#{et.lastruntime}," +
"lastendtime=#{et.lastendtime}," +
"etltimestamp=#{et.etltimestamp}," +
"lastrunresult='FINISH' WHERE syncid = #{et.syncid}";
HashMap<String, Object> param = new HashMap<>();
param.put("lastendtime", new Timestamp(System.currentTimeMillis()));
param.put("lastruntime", runTimestamp);
param.put("etltimestamp", runTimestamp);
param.put("syncid", sync.getId());
updateBySQL(updateSql,param);
}
}
@Override
public boolean processData(List<EntityObj> datas) {
liteDataService.saveBatch(systemId,entityName,dsName,datas);
String updateSql = "UPDATE ibztablesync SET lastwrite=(CASE WHEN lastwrite is null then 0 else lastwrite end+#{et.count})," +
"lastendtime=CASE WHEN lastread <= lastwrite THEN #{et.lastendtime} ELSE lastendtime END," +
"etltimestamp=CASE WHEN lastread <= lastwrite THEN #{et.etltimestamp} ELSE etltimestamp END," +
"lastrunresult=CASE WHEN lastread <= lastwrite THEN 'FINISH' ELSE lastrunresult END WHERE syncid = #{et.syncid}";
HashMap<String, Object> param = new HashMap<>();
param.put("count", datas.size());
param.put("lastendtime", new Timestamp(System.currentTimeMillis()));
param.put("etltimestamp", runTimestamp);
param.put("syncid", sync.getId());
updateBySQL(updateSql,param);
return true;
}
});
}
catch (Exception ex)
{
log.error("sync同步失败:"+sync.getEntityName()+":"+ex.getMessage());
ex.printStackTrace();
}
}
}
private void updateBySQL(String updateSql , Map param)
{
try {
DynamicDataSourceContextHolder.push("master");
this.getBaseMapper().updateBySQL(updateSql, param);
} catch (Exception ex) {
log.error("详细错误信息:" + ex.getMessage() );
} finally {
DynamicDataSourceContextHolder.poll();
}
}
}
......@@ -187,7 +187,7 @@ public class DbEntityService extends ServiceImpl<DbEntityMapper, EntityObj> impl
EntityObj rowdata;
while ((rowdata = (EntityObj) myMyBatisCursorItemReader.read()) != null) {
datas.add(rowdata);
if(datas.size() >= processSize){
if(datas.size() >= 5){
boolean rt=callback.processData(datas);
datas.clear();
if(!rt)
......
......@@ -15,6 +15,7 @@ import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.sql.Timestamp;
import java.util.*;
......@@ -59,19 +60,23 @@ public class LiteDataService {
public boolean saveBatch(String systemId, String entityName, String dsName, List<EntityObj> list) {
EntityModel entityModel = liteModelService.getEntityModel(systemId,entityName);
splitList(list, 500).forEach(objs->{
getEntityService(entityModel.getDsName()).saveBatch(dsName,entityModel,objs);
getEntityService(StringUtils.isEmpty(dsName)?entityModel.getDsName():dsName).saveBatch(dsName,entityModel,objs);
});
return true;
}
public boolean save(String systemId, String entityName, String dsName, EntityObj entityObj) {
EntityModel entityModel = liteModelService.getEntityModel(systemId,entityName);
return getEntityService(entityModel.getDsName()).save(dsName,entityModel,entityObj);
if(StringUtils.isEmpty(dsName))
dsName=entityModel.getDsName();
return getEntityService(dsName).save(dsName,entityModel,entityObj);
}
public EntityObj get(String systemId, String entityName, String dsName, EntityObj entityObj) {
EntityModel entityModel = liteModelService.getEntityModel(systemId,entityName);
return getEntityService(entityModel.getDsName()).get(dsName,entityModel,entityObj);
if(StringUtils.isEmpty(dsName))
dsName=entityModel.getDsName();
return getEntityService(dsName).get(dsName,entityModel,entityObj);
}
public List<EntityObj> search(String systemId, String entityName, String dsName, QueryFilter filter) {
......@@ -80,9 +85,23 @@ public class LiteDataService {
public List<EntityObj> search(String systemId, String entityName, String dataset, String dsName, QueryFilter filter) {
EntityModel entityModel = liteModelService.getEntityModel(systemId,entityName);
return getEntityService(entityModel.getDsName()).search(dsName,dataset,entityModel,filter);
if(StringUtils.isEmpty(dsName))
dsName=entityModel.getDsName();
return getEntityService(dsName).search(dsName,dataset,entityModel,filter);
}
public void searchCursor(String systemId, String entityName, String sql, String dsName, QueryFilter filter,LiteDataCallback callback) {
EntityModel entityModel = liteModelService.getEntityModel(systemId,entityName);
if(StringUtils.isEmpty(dsName))
dsName=entityModel.getDsName();
searchCursor(entityModel,sql,dsName,filter,callback);
}
public void searchCursor(EntityModel entityModel, String sql, String dsName, QueryFilter filter,LiteDataCallback callback) {
if(StringUtils.isEmpty(dsName))
dsName=entityModel.getDsName();
getEntityService(dsName).cursorRead(dsName,entityModel,sql,filter,callback);
}
public void processDataModel(String metaModelId,Timestamp lastModify,LiteDataCallback callback)
{
......
......@@ -14,6 +14,7 @@
<column name="SRCENTITYID" type="VARCHAR(100)"/>
<column name="SRCENTITYNAME" type="VARCHAR(100)"/>
<column name="SRCDSID" type="VARCHAR(100)"/>
<column name="SRCSYSTEMID" type="VARCHAR(100)"/>
<column name="SRCEXP" type="CLOB"/>
<column name="ENTITYID" type="VARCHAR(100)"/>
<column name="ENTITYNAME" type="VARCHAR(100)"/>
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册