提交 8b14fbfc 编写于 作者: zhouweidong's avatar zhouweidong

适配rocketmq

上级 ecb508f8
<#ibiztemplate>
TARGET=PSSYSTEM
</#ibiztemplate>
<#assign hasMQEntity=false>
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#assign hasMQEntity=true>
<#break>
</#list>
</#if>
</#list>
<#if hasMQEntity>
<#assign nameAddress="127.0.0.1:9876">
<#assign instanceName="rmq-instance">
package ${dataEntity.codeName}.core.util.config;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import lombok.extern.slf4j.Slf4j;
import ${dataEntity.codeName}.core.valuerule.domain.City;
import ${dataEntity.codeName}.core.valuerule.service.ICityService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Slf4j
@Configuration
public class RocketMQConfig {
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??>
<#assign syncAgent=dataSync.getOutPSSysDataSyncAgent()>
<#assign producer=dataEntity.codeName+syncAgent.codeName+"producer">
<#if ((syncAgent.getAgentTag())!'')!=''>
<#assign nameAddress=syncAgent.getAgentTag()>
</#if>
<#if ((syncAgent.getAgentTag2())!'')!=''>
<#assign instanceName=syncAgent.getAgentTag2()>
</#if>
@Bean("${producer}")
public DefaultMQProducer ${srfmethodname(producer)}(){
DefaultMQProducer producer = null;
try{
producer= new DefaultMQProducer();
producer.setSendMsgTimeout(6000);
producer.setNamesrvAddr("${nameAddress}");
producer.setInstanceName("${instanceName}");
}
catch(Exception e){
log.error("初始化消息发送对象异常!");
}
return producer;
}
</#if>
</#list>
</#if>
</#list>
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()??>
<#assign syncAgent=dataSync.getInPSSysDataSyncAgent()>
<#assign consumer=dataEntity.codeName+syncAgent.codeName+"consumer">
<#if ((syncAgent.getAgentTag())!'')!=''>
<#assign nameAddress=syncAgent.getAgentTag()>
</#if>
<#if ((syncAgent.getAgentTag2())!'')!=''>
<#assign instanceName=syncAgent.getAgentTag2()>
</#if>
@Autowired
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.service.I${dataEntity.codeName}Service ${dataEntity.codeName}Service;
@Bean("${consumer}")
public DefaultMQPushConsumer ${srfmethodname(consumer)}(){
DefaultMQPushConsumer consumer = null;
try {
log.info("消息消费者");
consumer = new DefaultMQPushConsumer();
consumer.setNamesrvAddr("${nameAddress}");
consumer.setInstanceName("${instanceName}");
consumer.subscribe("${dataSync.codeName}", "${dataSync.codeName}");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
<#if dataSync.getInTestPSDEAction()??>
<#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)>
${dataEntity.codeName}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${dataEntity.getCodeName()} domain = JSON.parseObject(new String(msg.getBody()),${dataEntity.codeName}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${dataEntity.getCodeName()}.class);
${dataEntity.codeName}Service.${inputAction}(domain);
<#else>
log.info("接收到[]消息,但未配置实体输入过滤行为,消息将被忽略。"+new String(msg.getBody()));
</#if>
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}catch (Exception e){
log.error("初始化消息接收对象异常!");
}
return consumer;
}
</#if>
</#list>
</#if>
</#list>
}
</#if>
\ No newline at end of file
......@@ -15,6 +15,15 @@ TARGET=PSSYSTEM
<#break>
</#if>
</#list>
<#assign hasMQEntity=false>
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#assign hasMQEntity=true>
<#break>
</#list>
</#if>
</#list>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
......@@ -114,6 +123,9 @@ TARGET=PSSYSTEM
</#if>
</#list>
</#if>
<#if hasMQEntity>
<rocketmq.version>4.7.0</rocketmq.version>
</#if>
</properties>
......@@ -327,7 +339,13 @@ TARGET=PSSYSTEM
</exclusions>
</dependency>
</#if>
<#if hasMQEntity>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${r'${rocketmq.version}'}</version>
</dependency>
</#if>
</dependencies>
</dependencyManagement>
......
<#ibiztemplate>
TARGET=PSSYSTEM
</#ibiztemplate>
<#assign hasMQEntity=false>
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??>
<#assign hasMQEntity=true>
<#break>
</#if>
</#list>
</#if>
</#list>
<#if hasMQEntity>
package ${pub.getPKGCodeName()}.util.aspect;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import lombok.extern.slf4j.Slf4j;
import ${pub.getPKGCodeName()}.util.domain.EntityBase;
import ${pub.getPKGCodeName()}.util.errors.BadRequestAlertException;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.List;
/**
* rocketMQ消息切面
*/
@Aspect
@Component
@Slf4j
public class RocketMQAspect
{
private final ExpressionParser parser = new SpelExpressionParser();
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??>
<#assign syncAgent=dataSync.getOutPSSysDataSyncAgent()>
<#assign producer=dataEntity.codeName+syncAgent.codeName+"producer">
@Autowired
@Qualifier("${producer}")
DefaultMQProducer ${producer};
@AfterReturning(value = "(execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.create*(..))||execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.update*(..))||execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.save*(..)) ||execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.remove*(..))) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.create*(..)) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.update*(..)) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.save*(..)) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.remove*(..))")
@Async
public void ${srfmethodname(producer)}(JoinPoint point){
<#if dataSync.getOutTestPSDEAction?? && dataSync.getOutTestPSDEAction()??>
outputAction(point,"${dataSync.getOutTestPSDEAction().codeName}");
</#if>
sendMsg(${producer},"${dataSync.codeName}","${dataSync.codeName}",getEntity(point));
}
</#if>
</#list>
</#if>
</#list>
/**
* 输出过滤行为
* @param point
* @param actionName
*/
private void outputAction(JoinPoint point,String actionName){
Object [] args = point.getArgs();
if(ObjectUtils.isEmpty(args) || args.length==0)
return ;
Object arg=args[0];
Object service=point.getTarget();
EvaluationContext serviceCtx = new StandardEvaluationContext();
serviceCtx.setVariable("service",service);
serviceCtx.setVariable("arg",arg);
Expression serviceExp = parser.parseExpression(String.format("#service.%s(#arg)",actionName));
serviceExp.getValue(serviceCtx);
}
/**
* 获取实体对象
* @param point
* @return
*/
private Object getEntity(JoinPoint point){
Object entity=null;
String action=point.getSignature().getName();
Object [] args = point.getArgs();
if(ObjectUtils.isEmpty(args) || args.length==0 || StringUtils.isEmpty(action))
return entity;
Object arg=args[0];
if(arg instanceof EntityBase || arg instanceof List)
return arg;
else
return null;
}
/**
* 发送消息
* @param producer
* @param topic
* @param tag
* @param body
*/
private void sendMsg(DefaultMQProducer producer ,String topic ,String tag ,Object body){
if(ObjectUtils.isEmpty(body)){
log.error("发送消息失败,无法获取到要发送的消息内容!");
return ;
}
try {
producer.start();
Message message = new Message(topic, tag, JSON.toJSONString(body).getBytes());
producer.send(message);
producer.shutdown();
} catch (Exception e) {
log.error("消息发送异常,"+e);
}
}
}
</#if>
\ No newline at end of file
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册