提交 5f488c46 编写于 作者: zhouweidong's avatar zhouweidong

rocketMQ优化

上级 97778d93
<#ibiztemplate> <#ibiztemplate>
TARGET=PSSYSTEM TARGET=PSSYSTEM
</#ibiztemplate> </#ibiztemplate>
<#assign hasMQEntity=false> <#assign hasMQProducer=false>
<#assign hasMQConsumer=false>
<#list sys.getAllPSDataEntities() as dataEntity> <#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??> <#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync> <#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#assign hasMQEntity=true> <#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()??>
<#assign hasMQConsumer=true>
<#elseif dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??>
<#assign hasMQProducer=true>
</#if>
<#if hasMQConsumer && hasMQProducer>
<#break> <#break>
</#if>
</#list> </#list>
</#if> </#if>
</#list> </#list>
<#if hasMQEntity> <#if hasMQProducer || hasMQConsumer>
<#assign nameAddress="127.0.0.1:9876"> <#assign nameAddress="127.0.0.1:9876">
<#assign instanceName="rmq-instance"> <#assign instanceName="rmq-instance">
package ${pub.getPKGCodeName()}.core.util.config; package ${pub.getPKGCodeName()}.core.util.config;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.message.MessageExt;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import java.util.List;
@Slf4j @Slf4j
@Configuration @Configuration
public class RocketMQConfig { public class RocketMQConfig {
<#list sys.getAllPSDataEntities() as dataEntity> <#if hasMQProducer>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??> @Value("${r'${ibiz.rocketmq.producer.groupName: DEFAULT_PRODUCER}'}")
<#list dataEntity.getAllPSDEDataSyncs() as dataSync> private String groupName;
<#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??> @Value("${r'${ibiz.rocketmq.producer.namesrvAddr:}'}")
<#assign syncAgent=dataSync.getOutPSSysDataSyncAgent()> private String namesrvAddr;
<#assign producer=dataEntity.codeName+syncAgent.codeName+"producer"> // 消息最大值
<#if ((syncAgent.getAgentTag())!'')!=''> @Value("${r'${ibiz.rocketmq.producer.maxMessageSize: 409600}'}")
<#assign nameAddress=syncAgent.getAgentTag()> private Integer maxMessageSize;
</#if> // 消息发送超时时间
<#if ((syncAgent.getAgentTag2())!'')!=''> @Value("${r'${ibiz.rocketmq.producer.sendMsgTimeOut: 3000}'}")
<#assign instanceName=syncAgent.getAgentTag2()> private Integer sendMsgTimeOut;
</#if> // 失败重试次数
@Bean("${producer}") @Value("${r'${ibiz.rocketmq.producer.retryTimesWhenSendFailed: 2}'}")
public DefaultMQProducer ${srfmethodname(producer)}(){ private Integer retryTimesWhenSendFailed;
DefaultMQProducer producer = null;
try{ /**
producer= new DefaultMQProducer(); * mq 生成者配置
producer.setSendMsgTimeout(6000); *
producer.setNamesrvAddr("${nameAddress}"); * @return
producer.setInstanceName("${instanceName}"); * @throws MQClientException
} */
catch(Exception e){ @Bean
log.error("初始化消息发送对象异常!"); public DefaultMQProducer defaultProducer() throws MQClientException {
} log.info("rocketmq defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
producer.start();
log.info("rocketmq producer server 开启成功----------------------------------");
return producer; return producer;
} }
</#if> </#if>
</#list>
</#if>
</#list>
<#list sys.getAllPSDataEntities() as dataEntity> <#if hasMQConsumer>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??> // 消费者线程数据量
<#list dataEntity.getAllPSDEDataSyncs() as dataSync> @Value("${r'${ibiz.rocketmq.consumer.consumeThreadMin: 1}'}")
<#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()??> private Integer consumeThreadMin;
<#assign syncAgent=dataSync.getInPSSysDataSyncAgent()> @Value("${r'${ibiz.rocketmq.consumer.consumeThreadMax: 1}'}")
<#assign consumer=dataEntity.codeName+syncAgent.codeName+"consumer"> private Integer consumeThreadMax;
<#assign entityName=dataEntity.getCodeName()> @Value("${r'${ibiz.rocketmq.consumer.consumeMessageBatchMaxSize: 1}'}")
<#if ((syncAgent.getAgentTag())!'')!=''> private Integer consumeMessageBatchMaxSize;
<#assign nameAddress=syncAgent.getAgentTag()>
</#if>
<#if ((syncAgent.getAgentTag2())!'')!=''>
<#assign instanceName=syncAgent.getAgentTag2()>
</#if>
@Autowired @Autowired
@Lazy private RocketMQListenerProcessor listenerProcessor;
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.service.I${entityName}Service ${entityName}Service;
@Bean("${consumer}") /**
public DefaultMQPushConsumer ${srfmethodname(consumer)}(){ * mq 消费者配置
DefaultMQPushConsumer consumer = null; *
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQPushConsumer defaultConsumer() {
log.info("defaultConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
// 设置监听
consumer.registerMessageListener(listenerProcessor);
/**
* 设置consumer第一次启动是从队列头部开始还是队列尾部开始
* 如果不是第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
try { try {
consumer = new DefaultMQPushConsumer(); consumer.subscribe("default", "*");
consumer.setNamesrvAddr("${nameAddress}");
consumer.setInstanceName("${instanceName}");
consumer.subscribe("${dataSync.codeName}", "${dataSync.codeName}");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
<#if dataSync.getInTestPSDEAction()??>
<#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)>
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = JSON.parseObject(new String(msg.getBody()),${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}.class);
${entityName}Service.${inputAction}(domain);
<#else>
log.info("接收到[]消息,但未配置实体输入过滤行为,消息将被忽略。"+new String(msg.getBody()));
</#if>
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start(); consumer.start();
}catch (Exception e){ log.info("rocketmq consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", groupName, "default", namesrvAddr);
log.error("初始化消息接收对象异常!"); } catch (MQClientException e) {
log.error("rocketmq consumer 创建失败!" + e);
} }
return consumer; return consumer;
} }
</#if> </#if>
</#list>
</#if>
</#list>
} }
</#if> </#if>
\ No newline at end of file
<#ibiztemplate>
TARGET=PSSYSTEM
</#ibiztemplate>
<#assign hasMQConsumer=false>
<#comment>实体中配置输入数据同步</#comment>
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()??>
<#assign hasMQConsumer=true>
</#if>
</#list>
</#if>
</#list>
<#if hasMQConsumer>
package ${pub.getPKGCodeName()}.core.util.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* MQ订阅消息处理
*/
@Slf4j
@Component
public class RocketMQListenerProcessor implements MessageListenerOrderly {
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()?? &&dataSync.getInTestPSDEAction?? && dataSync.getInTestPSDEAction()??>
<#assign entityName=dataEntity.getCodeName()>
<#if !P.exists(entityName,"service")>
@Autowired
@Lazy
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.service.I${entityName}Service ${entityName}Service;
</#if>
</#if>
</#list>
</#if>
</#list>
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
if (CollectionUtils.isEmpty(list)) {
log.info("MQ接收消息为空,直接返回成功");
return ConsumeOrderlyStatus.SUCCESS;
}
for (MessageExt messageExt : list) {
log.info("MQ接收到的消息为:" + messageExt.toString());
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) {
log.error("获取MQ消息内容异常{}", e);
}
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()??>
<#assign dataSyncCodeName=dataSync.codeName?lower_case>
<#if dataSync.getInTestPSDEAction?? && dataSync.getInTestPSDEAction()??>
<#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)>
if ("${dataSyncCodeName}".equalsIgnoreCase(messageExt.getTopic())) {
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = JSON.parseObject(new String(msg.getBody()),${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}.class);
${entityName}Service.${inputAction}(domain);
}
<#else>
log.info("接收到[{}]消息,但未配置实体输入过滤行为,消息将被忽略。"+new String(msg.getBody()));
</#if>
</#if>
</#list>
</#if>
</#list>
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
</#if>
\ No newline at end of file
<#ibiztemplate> <#ibiztemplate>
TARGET=PSSYSTEM TARGET=PSSYSTEM
</#ibiztemplate> </#ibiztemplate>
<#assign hasMQEntity=false> <#assign hasMQProducer=false>
<#list sys.getAllPSDataEntities() as dataEntity> <#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??> <#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync> <#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??> <#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??>
<#assign hasMQEntity=true> <#assign hasMQProducer=true>
<#break> <#break>
</#if> </#if>
</#list> </#list>
</#if> </#if>
</#list> </#list>
<#if hasMQEntity> <#if hasMQProducer>
package ${pub.getPKGCodeName()}.util.aspect; package ${pub.getPKGCodeName()}.util.aspect;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.message.Message;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import ${pub.getPKGCodeName()}.util.domain.EntityBase; import ${pub.getPKGCodeName()}.util.domain.EntityBase;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.aspectj.lang.JoinPoint; import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.expression.EvaluationContext; import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression; import org.springframework.expression.Expression;
...@@ -35,7 +36,6 @@ import org.springframework.scheduling.annotation.Async; ...@@ -35,7 +36,6 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.List;
/** /**
* rocketMQ消息切面 * rocketMQ消息切面
...@@ -50,12 +50,10 @@ public class RocketMQAspect ...@@ -50,12 +50,10 @@ public class RocketMQAspect
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??> <#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync> <#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??> <#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??>
<#assign syncAgent=dataSync.getOutPSSysDataSyncAgent()> <#assign producer=dataEntity.codeName+dataSync.codeName>
<#assign producer=dataEntity.codeName+syncAgent.codeName+"producer">
@Autowired @Autowired
@Qualifier("${producer}")
@Lazy @Lazy
DefaultMQProducer ${producer}; DefaultMQProducer defaultMQProducer;
@AfterReturning(value = "(execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.create*(..))||execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.update*(..))||execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.save*(..)) ||execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.remove*(..))) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.create*(..)) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.update*(..)) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.save*(..)) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.remove*(..))") @AfterReturning(value = "(execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.create*(..))||execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.update*(..))||execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.save*(..)) ||execution(* ${pub.getPKGCodeName()}.core.*.service.*${dataEntity.codeName}*.remove*(..))) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.create*(..)) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.update*(..)) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.save*(..)) && !execution(* ${pub.getPKGCodeName()}.core.es.service.*.remove*(..))")
@Async @Async
...@@ -64,7 +62,7 @@ public class RocketMQAspect ...@@ -64,7 +62,7 @@ public class RocketMQAspect
<#assign actionName=srfmethodname(dataSync.getOutTestPSDEAction().codeName)> <#assign actionName=srfmethodname(dataSync.getOutTestPSDEAction().codeName)>
outputAction(point, "${actionName}"); outputAction(point, "${actionName}");
</#if> </#if>
sendMsg(${producer}, "${dataSync.codeName}", "${dataSync.codeName}", getEntity(point)); sendMsg("default", "${dataSync.codeName?lower_case}", getEntity(point));
} }
</#if> </#if>
</#list> </#list>
...@@ -118,16 +116,15 @@ public class RocketMQAspect ...@@ -118,16 +116,15 @@ public class RocketMQAspect
* @param tag * @param tag
* @param body * @param body
*/ */
private void sendMsg(DefaultMQProducer producer, String topic, String tag, Object body) { private void sendMsg(String topic, String tag, Object body) {
if(ObjectUtils.isEmpty(body)) { if(ObjectUtils.isEmpty(body)) {
log.error("发送消息失败,无法获取到要发送的消息内容!"); log.error("消息内容为空,[{}]消息将被忽略!",tag);
return; return;
} }
try { try {
producer.start();
Message message = new Message(topic, tag, JSON.toJSONString(body).getBytes()); Message message = new Message(topic, tag, JSON.toJSONString(body).getBytes());
producer.send(message); SendResult sendResult = defaultMQProducer.send(message);
producer.shutdown(); log.info("消息发送响应:" + sendResult.toString());
} catch (Exception e) { } catch (Exception e) {
log.error("消息发送异常,"+e); log.error("消息发送异常,"+e);
} }
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册