提交 07a4e20c 编写于 作者: zhouweidong's avatar zhouweidong

实体数据同步逻辑优化(remove)

上级 279993ca
...@@ -32,21 +32,21 @@ TARGET=PSSYSTEM ...@@ -32,21 +32,21 @@ TARGET=PSSYSTEM
</#if> </#if>
package ${pub.getPKGCodeName()}.core.util.config; package ${pub.getPKGCodeName()}.core.util.config;
import lombok.extern.slf4j.Slf4j;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* MQ订阅消息处理 * MQ订阅消息处理
...@@ -82,7 +82,8 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly { ...@@ -82,7 +82,8 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly {
try { try {
String topic = messageExt.getTopic(); String topic = messageExt.getTopic();
String tags = messageExt.getTags(); String tags = messageExt.getTags();
Object body = null; String body = new String(messageExt.getBody(),"utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
<#list sys.getAllPSDataEntities() as dataEntity> <#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??> <#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync> <#list dataEntity.getAllPSDEDataSyncs() as dataSync>
...@@ -92,18 +93,15 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly { ...@@ -92,18 +93,15 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly {
<#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)> <#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)>
<#assign columnname = dataEntity.getKeyPSDEField().getName()?lower_case> <#assign columnname = dataEntity.getKeyPSDEField().getName()?lower_case>
if ("${dataSyncCodeName}".equalsIgnoreCase(tags)) { if ("${dataSyncCodeName}".equalsIgnoreCase(tags)) {
<#if inputAction?lower_case == 'get' || inputAction?lower_case == 'remove'> JSONObject obj = JSONObject.parseObject(body);
body = toObject(messageExt.getBody());
if (!ObjectUtils.isEmpty(body)) {
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = new ${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}(); ${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = new ${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}();
domain.set("${columnname}",body); for(Map.Entry<String, Object> entry : obj.entrySet()){
${entityName}Service.${inputAction}(domain.get${srfcaseformat(dataEntity.getKeyPSDEField().codeName,'l_u2lC')?cap_first}()); domain.set(entry.getKey(),entry.getValue());
} }
<#if inputAction?lower_case == 'get' || inputAction?lower_case == 'remove'>
${entityName}Service.${inputAction}(domain.get${srfcaseformat(dataEntity.getKeyPSDEField().codeName,'l_u2lC')?cap_first}());
<#else> <#else>
String strBody = new String(messageExt.getBody(), "utf-8");
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = JSON.parseObject(strBody,${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}.class);
${entityName}Service.${inputAction}(domain); ${entityName}Service.${inputAction}(domain);
body = strBody;
</#if> </#if>
} }
<#else> <#else>
...@@ -113,7 +111,6 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly { ...@@ -113,7 +111,6 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly {
</#list> </#list>
</#if> </#if>
</#list> </#list>
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) { } catch (Exception e) {
log.error("获取MQ消息内容异常{}", e); log.error("获取MQ消息内容异常{}", e);
} }
...@@ -121,34 +118,5 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly { ...@@ -121,34 +118,5 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly {
} }
return ConsumeOrderlyStatus.SUCCESS; return ConsumeOrderlyStatus.SUCCESS;
} }
/**
* 数组转对象
*
* @param bytes
* @return
*/
public Object toObject(byte[] bytes) {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
try {
bis = new ByteArrayInputStream(bytes);
ois = new ObjectInputStream(bis);
obj = ois.readObject();
} catch (Exception ex) {
} finally {
try {
if (ois != null) {
ois.close();
}
if (bis != null) {
bis.close();
}
} catch (Exception e) {
}
}
return obj;
}
} }
</#if> </#if>
\ No newline at end of file
...@@ -31,12 +31,15 @@ TARGET=PSSYSTEM ...@@ -31,12 +31,15 @@ TARGET=PSSYSTEM
</#if> </#if>
package ${pub.getPKGCodeName()}.util.aspect; package ${pub.getPKGCodeName()}.util.aspect;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import ${pub.getPKGCodeName()}.util.domain.EntityBase;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.aspectj.lang.JoinPoint; import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -47,14 +50,10 @@ import org.springframework.expression.Expression; ...@@ -47,14 +50,10 @@ import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser; import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
/** /**
* rocketMQ消息切面 * rocketMQ消息切面
*/ */
...@@ -78,14 +77,15 @@ public class RocketMQAspect ...@@ -78,14 +77,15 @@ public class RocketMQAspect
<#list dataEntity.getAllPSDEDataSyncs() as dataSync> <#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??> <#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??>
<#assign producer=dataEntity.codeName+dataSync.codeName> <#assign producer=dataEntity.codeName+dataSync.codeName>
@AfterReturning(value = "${getPointCut(dataEntity,dataSync)}") @Around(value = "${getPointCut(dataEntity,dataSync)}")
@Async public void ${srfmethodname(producer)}(ProceedingJoinPoint point) {
public void ${srfmethodname(producer)}(JoinPoint point) { Object entity = getEntity(point);
Object result = point.proceed();
<#if dataSync.getOutTestPSDEAction?? && dataSync.getOutTestPSDEAction()??> <#if dataSync.getOutTestPSDEAction?? && dataSync.getOutTestPSDEAction()??>
<#assign actionName=srfmethodname(dataSync.getOutTestPSDEAction().codeName)> <#assign actionName=srfmethodname(dataSync.getOutTestPSDEAction().codeName)>
outputAction(point, "${actionName}"); outputAction(point, "${actionName}", entity);
</#if> </#if>
sendMsg(topic, "${dataSync.codeName?lower_case}", getEntity(point)); sendMsg(topic, "${dataSync.codeName?lower_case}", entity);
} }
</#if> </#if>
</#list> </#list>
...@@ -96,17 +96,17 @@ public class RocketMQAspect ...@@ -96,17 +96,17 @@ public class RocketMQAspect
* 输出过滤行为 * 输出过滤行为
* @param point * @param point
* @param actionName * @param actionName
* @param entity
*/ */
private void outputAction(JoinPoint point,String actionName) { private void outputAction(JoinPoint point, String actionName, Object entity) {
Object [] args = point.getArgs(); Object[] args = point.getArgs();
if(ObjectUtils.isEmpty(args) || args.length==0) { if (ObjectUtils.isEmpty(args) || args.length == 0 || ObjectUtils.isEmpty(entity)) {
return; return;
} }
Object arg = args[0];
Object service = point.getTarget(); Object service = point.getTarget();
EvaluationContext serviceCtx = new StandardEvaluationContext(); EvaluationContext serviceCtx = new StandardEvaluationContext();
serviceCtx.setVariable("service", service); serviceCtx.setVariable("service", service);
serviceCtx.setVariable("arg", arg); serviceCtx.setVariable("arg", entity);
Expression serviceExp = parser.parseExpression(String.format("#service.%s(#arg)", actionName)); Expression serviceExp = parser.parseExpression(String.format("#service.%s(#arg)", actionName));
serviceExp.getValue(serviceCtx); serviceExp.getValue(serviceCtx);
} }
...@@ -117,13 +117,31 @@ public class RocketMQAspect ...@@ -117,13 +117,31 @@ public class RocketMQAspect
* @param point * @param point
* @return * @return
*/ */
private Object getEntity(JoinPoint point) { private Object getEntity(ProceedingJoinPoint point) {
Object entity = null;
try {
String action = point.getSignature().getName(); String action = point.getSignature().getName();
Object[] args = point.getArgs(); Object[] args = point.getArgs();
Object serviceObj = point.getTarget();
if (ObjectUtils.isEmpty(args) || args.length == 0 || StringUtils.isEmpty(action)) { if (ObjectUtils.isEmpty(args) || args.length == 0 || StringUtils.isEmpty(action)) {
return null; return entity;
}
if ("remove".equalsIgnoreCase(action) || "get".equalsIgnoreCase(action)) {
Object idValue = args[0];
if (!ObjectUtils.isEmpty(serviceObj)) {
EvaluationContext oldContext = new StandardEvaluationContext();
oldContext.setVariable("service", serviceObj);
oldContext.setVariable("id", idValue);
Expression oldExp = parser.parseExpression("#service.get(#id)");
entity = oldExp.getValue(oldContext, EntityBase.class);
}
} else {
entity = args[0];
} }
return args[0]; } catch (Exception e) {
log.error("发送消息失败,无法获取实体对象" + e);
}
return entity;
} }
/** /**
...@@ -139,43 +157,13 @@ public class RocketMQAspect ...@@ -139,43 +157,13 @@ public class RocketMQAspect
return; return;
} }
try { try {
Message message = new Message(topic, tag, toByte(body)); Message message = new Message(topic, tag, JSON.toJSONString(body).getBytes());
SendResult sendResult = defaultMQProducer.send(message); SendResult sendResult = defaultMQProducer.send(message);
log.info("消息发送响应:" + sendResult.toString()); log.info("消息发送响应:" + sendResult.toString());
} catch (Exception e) { } catch (Exception e) {
log.error("消息发送异常," + e); log.error("消息发送异常," + e);
} }
} }
/**
* 转byte数组
*
* @param object
* @return
*/
public byte[] toByte(Object object) {
byte[] bytes = null;
ObjectOutputStream oos = null;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
oos = new ObjectOutputStream(bos);
oos.writeObject(object);
oos.flush();
bytes = bos.toByteArray();
} catch (Exception ex) {
} finally {
try {
if (oos != null) {
oos.close();
}
if (bos != null) {
bos.close();
}
} catch (Exception e) {
}
}
return bytes;
}
} }
</#if> </#if>
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册