提交 f6058d7b 编写于 作者: zhouweidong's avatar zhouweidong

实体数据同步逻辑优化(remove)

上级 19beda15
...@@ -43,6 +43,9 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -43,6 +43,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List; import java.util.List;
/** /**
...@@ -79,8 +82,7 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly { ...@@ -79,8 +82,7 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly {
try { try {
String topic = messageExt.getTopic(); String topic = messageExt.getTopic();
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8"); Object body = null;
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
<#list sys.getAllPSDataEntities() as dataEntity> <#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??> <#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync> <#list dataEntity.getAllPSDEDataSyncs() as dataSync>
...@@ -88,16 +90,25 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly { ...@@ -88,16 +90,25 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly {
<#assign dataSyncCodeName=dataSync.codeName?lower_case> <#assign dataSyncCodeName=dataSync.codeName?lower_case>
<#if dataSync.getInTestPSDEAction?? && dataSync.getInTestPSDEAction()??> <#if dataSync.getInTestPSDEAction?? && dataSync.getInTestPSDEAction()??>
<#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)> <#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)>
<#assign columnname = dataEntity.getKeyPSDEField().getName()?lower_case>
if ("${dataSyncCodeName}".equalsIgnoreCase(tags)) { if ("${dataSyncCodeName}".equalsIgnoreCase(tags)) {
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = JSON.parseObject(new String(body),${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}.class);
<#if inputAction?lower_case == 'get' || inputAction?lower_case == 'remove'> <#if inputAction?lower_case == 'get' || inputAction?lower_case == 'remove'>
body = toObject(messageExt.getBody());
if (!ObjectUtils.isEmpty(body)) {
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = new ${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName};
domain.set("${columnname}",body);
${entityName}Service.${inputAction}(domain.get${srfcaseformat(dataEntity.getKeyPSDEField().codeName,'l_u2lC')?cap_first}()); ${entityName}Service.${inputAction}(domain.get${srfcaseformat(dataEntity.getKeyPSDEField().codeName,'l_u2lC')?cap_first}());
}
<#else> <#else>
String strBody = new String(messageExt.getBody(), "utf-8");
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = JSON.parseObject(strBody,${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}.class);
${entityName}Service.${inputAction}(domain); ${entityName}Service.${inputAction}(domain);
body = strBody;
</#if> </#if>
} }
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
<#else> <#else>
log.info("接收到[{}]消息,但未配置实体输入过滤行为,消息将被忽略。"+new String(msg.getBody())); log.info("接收到[{}]消息,但未配置实体输入过滤行为,消息将被忽略。"+new String(messageExt.getBody()));
</#if> </#if>
</#if> </#if>
</#list> </#list>
...@@ -110,5 +121,34 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly { ...@@ -110,5 +121,34 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly {
} }
return ConsumeOrderlyStatus.SUCCESS; return ConsumeOrderlyStatus.SUCCESS;
} }
/**
* 数组转对象
*
* @param bytes
* @return
*/
public Object toObject(byte[] bytes) {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
try {
bis = new ByteArrayInputStream(bytes);
ois = new ObjectInputStream(bis);
obj = ois.readObject();
} catch (Exception ex) {
} finally {
try {
if (ois != null) {
ois.close();
}
if (bis != null) {
bis.close();
}
} catch (Exception e) {
}
}
return obj;
}
} }
</#if> </#if>
\ No newline at end of file
...@@ -32,11 +32,6 @@ TARGET=PSSYSTEM ...@@ -32,11 +32,6 @@ TARGET=PSSYSTEM
package ${pub.getPKGCodeName()}.util.aspect; package ${pub.getPKGCodeName()}.util.aspect;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import ${pub.getPKGCodeName()}.util.domain.EntityBase;
import ${pub.getPKGCodeName()}.util.errors.BadRequestAlertException;
import ${pub.getPKGCodeName()}.util.helper.DEFieldCacheMap;
import java.lang.reflect.Method;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
...@@ -45,18 +40,20 @@ import org.aspectj.lang.annotation.AfterReturning; ...@@ -45,18 +40,20 @@ import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.expression.EvaluationContext; import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression; import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser; import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.List;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
/** /**
* rocketMQ消息切面 * rocketMQ消息切面
...@@ -116,75 +113,68 @@ public class RocketMQAspect ...@@ -116,75 +113,68 @@ public class RocketMQAspect
/** /**
* 获取实体对象 * 获取实体对象
*
* @param point * @param point
* @return * @return
*/ */
private Object getEntity(JoinPoint point) { private Object getEntity(JoinPoint point) {
String action=point.getSignature().getName(); String action = point.getSignature().getName();
Object [] args = point.getArgs(); Object[] args = point.getArgs();
if(ObjectUtils.isEmpty(args) || args.length==0 || StringUtils.isEmpty(action)) { if (ObjectUtils.isEmpty(args) || args.length == 0 || StringUtils.isEmpty(action)) {
return null; return null;
} }
Object arg=args[0]; return args[0];
if(arg instanceof EntityBase || arg instanceof List) {
return arg;
}
else {
Object service = point.getTarget();
EntityBase entity = getEntity(service.getClass());
String id = DEFieldCacheMap.getDEKeyField(entity.getClass());
if(StringUtils.isEmpty(id)) {
log.debug("无法获取实体主键属性[{}]",entity.getClass().getSimpleName());
return null;
}
entity.set(id, arg);
return entity;
}
}
/**
* 获取实体
*
* @param service
* @return
*/
private EntityBase getEntity(Class service) {
Method[] methods = service.getDeclaredMethods();
for (Method method : methods) {
for (Class cls : method.getParameterTypes()) {
try {
Object arg = cls.newInstance();
if (arg instanceof EntityBase) {
return (EntityBase) arg;
}
} catch (Exception e) {
}
}
}
if(!ObjectUtils.isEmpty(service.getSuperclass()) && !service.getSuperclass().getName().equals(Object.class.getName())) {
return getEntity(service.getSuperclass());
}
throw new BadRequestAlertException("获取实体信息失败", "RocketMQAspect", "getEntity");
} }
/** /**
* 发送消息 * 发送消息
*
* @param topic * @param topic
* @param tag * @param tag
* @param body * @param body
*/ */
private void sendMsg(String topic, String tag, Object body) { private void sendMsg(String topic, String tag, Object body) {
if(ObjectUtils.isEmpty(body)) { if (ObjectUtils.isEmpty(body)) {
log.error("消息内容为空,[{}]消息将被忽略!",tag); log.error("消息内容为空,[{}]消息将被忽略!", tag);
return; return;
} }
try { try {
Message message = new Message(topic, tag, JSON.toJSONString(body).getBytes()); Message message = new Message(topic, tag, toByte(body));
SendResult sendResult = defaultMQProducer.send(message); SendResult sendResult = defaultMQProducer.send(message);
log.info("消息发送响应:" + sendResult.toString()); log.info("消息发送响应:" + sendResult.toString());
} catch (Exception e) { } catch (Exception e) {
log.error("消息发送异常,"+e); log.error("消息发送异常," + e);
}
}
/**
* 转byte数组
*
* @param object
* @return
*/
public byte[] toByte(Object object) {
byte[] bytes = null;
ObjectOutputStream oos = null;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
oos = new ObjectOutputStream(bos);
oos.writeObject(object);
oos.flush();
bytes = bos.toByteArray();
} catch (Exception ex) {
} finally {
try {
if (oos != null) {
oos.close();
}
if (bos != null) {
bos.close();
}
} catch (Exception e) {
}
} }
return bytes;
} }
} }
</#if> </#if>
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册