提交 3ce7c9ba 编写于 作者: zhouweidong's avatar zhouweidong

rocketmq区分生产者与消费者配置

上级 bbef8674
<#ibiztemplate>
TARGET=PSSYSTEM
</#ibiztemplate>
<#assign hasMQProducer=false>
<#assign hasMQConsumer=false>
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()??>
<#assign hasMQConsumer=true>
<#elseif dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??>
<#assign hasMQProducer=true>
</#if>
<#if hasMQConsumer && hasMQProducer>
<#break>
<#break >
</#if>
</#list>
</#if>
</#list>
<#if hasMQProducer || hasMQConsumer>
<#if hasMQConsumer>
<#comment>服务接口微服务平台配置</#comment>
<#assign mqServerAddress="">
<#if sys.getAllPSDevSlnMSDepAPIs()??>
<#list sys.getAllPSDevSlnMSDepAPIs() as depSysApi>
<#if depSysApi.getPSDCMSPlatform()?? >
<#if depSysApi.getUserParam("ibiz.rocketmq.producer.namesrvAddr","")??>
<#assign mqServerAddress = depSysApi.getUserParam("ibiz.rocketmq.producer.namesrvAddr","")>
<#if depSysApi.getUserParam("rocketmq.producer.namesrvAddr","")??>
<#assign mqServerAddress = depSysApi.getUserParam("rocketmq.producer.namesrvAddr","")>
</#if>
</#if>
</#list>
......@@ -50,55 +45,26 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@Slf4j
@Configuration
@ConditionalOnExpression("${r'${ibiz.rocketmq.producer.isOnOff:'+"'"+mqServer+"'"+"}.equals('on')"}")
public class RocketMQConfig {
@ConditionalOnExpression("${r'${rocketmq.isOnOff:'+"'"+mqServer+"'"+"}.equals('on')"}")
public class RocketMQConsumer {
@Value("${r'${ibiz.rocketmq.producer.groupName:default}'}")
@Value("${r'${rocketmq.producer.groupName:default}'}")
private String groupName;
@Value("${r'${ibiz.rocketmq.producer.namesrvAddr:127.0.0.1:9876}'}")
@Value("${r'${rocketmq.producer.namesrvAddr:127.0.0.1:9876}'}")
private String namesrvAddr;
<#if hasMQProducer>
// 消息最大值
@Value("${r'${ibiz.rocketmq.producer.maxMessageSize:409600}'}")
private Integer maxMessageSize;
// 消息发送超时时间
@Value("${r'${ibiz.rocketmq.producer.sendMsgTimeOut:3000}'}")
private Integer sendMsgTimeOut;
// 失败重试次数
@Value("${r'${ibiz.rocketmq.producer.retryTimesWhenSendFailed:2}'}")
private Integer retryTimesWhenSendFailed;
/**
* mq 生成者配置
*
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQProducer defaultProducer() throws MQClientException {
log.info("rocketmq defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
producer.start();
log.info("rocketmq producer server 开启成功----------------------------------");
return producer;
}
</#if>
<#if hasMQConsumer>
// 消费者线程数据量
@Value("${r'${ibiz.rocketmq.consumer.consumeThreadMin:1}'}")
@Value("${r'${rocketmq.consumer.consumeThreadMin:1}'}")
private Integer consumeThreadMin;
@Value("${r'${ibiz.rocketmq.consumer.consumeThreadMax:1}'}")
@Value("${r'${rocketmq.consumer.consumeThreadMax:1}'}")
private Integer consumeThreadMax;
@Value("${r'${ibiz.rocketmq.consumer.consumeMessageBatchMaxSize:1}'}")
@Value("${r'${rocketmq.consumer.consumeMessageBatchMaxSize:1}'}")
private Integer consumeMessageBatchMaxSize;
@Value("${r'${ibiz.rocketmq.topic:default}'}")
@Value("${r'${rocketmq.topic:default}'}")
private String topic;
@Autowired
......@@ -139,7 +105,6 @@ public class RocketMQConfig {
}
return consumer;
}
</#if>
}
</#if>
......
......@@ -49,7 +49,7 @@ import java.util.List;
*/
@Slf4j
@Component
@ConditionalOnExpression("${r'${ibiz.rocketmq.producer.isOnOff:'+"'"+mqServer+"'"+"}.equals('on')"}")
@ConditionalOnExpression("${r'${rocketmq.producer.isOnOff:'+"'"+mqServer+"'"+"}.equals('on')"}")
public class RocketMQListenerProcessor implements MessageListenerOrderly {
<#list sys.getAllPSDataEntities() as dataEntity>
......
<#ibiztemplate>
TARGET=PSSYSTEM
</#ibiztemplate>
<#assign hasMQProducer=false>
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getOutPSSysDataSyncAgent?? && dataSync.getOutPSSysDataSyncAgent()??>
<#assign hasMQProducer=true>
<#break >
</#if>
</#list>
</#if>
</#list>
<#if hasMQProducer>
<#comment>服务接口微服务平台配置</#comment>
<#assign mqServerAddress="">
<#if sys.getAllPSDevSlnMSDepAPIs()??>
<#list sys.getAllPSDevSlnMSDepAPIs() as depSysApi>
<#if depSysApi.getPSDCMSPlatform()?? >
<#if depSysApi.getUserParam("rocketmq.producer.namesrvAddr","")??>
<#assign mqServerAddress = depSysApi.getUserParam("rocketmq.producer.namesrvAddr","")>
</#if>
</#if>
</#list>
</#if>
<#if mqServerAddress!=''>
<#assign mqServer="on">
<#else>
<#assign mqServer="off">
</#if>
package ${pub.getPKGCodeName()}.core.util.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@Slf4j
@Configuration
@ConditionalOnExpression("${r'${rocketmq.isOnOff:'+"'"+mqServer+"'"+"}.equals('on')"}")
public class RocketMQProducer {
@Value("${r'${rocketmq.producer.groupName:default}'}")
private String groupName;
@Value("${r'${rocketmq.producer.namesrvAddr:127.0.0.1:9876}'}")
private String namesrvAddr;
// 消息最大值
@Value("${r'${rocketmq.producer.maxMessageSize:409600}'}")
private Integer maxMessageSize;
// 消息发送超时时间
@Value("${r'${rocketmq.producer.sendMsgTimeOut:3000}'}")
private Integer sendMsgTimeOut;
// 失败重试次数
@Value("${r'${rocketmq.producer.retryTimesWhenSendFailed:2}'}")
private Integer retryTimesWhenSendFailed;
/**
* mq 生成者配置
*
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQProducer defaultProducer() throws MQClientException {
log.info("rocketmq defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
producer.start();
log.info("rocketmq producer server 开启成功----------------------------------");
return producer;
}
}
</#if>
<#function getSubscribeTags>
<#assign result="">
<#list sys.getAllPSDataEntities() as dataEntity>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()??>
<#if result!="">
<#assign result=result+" || ">
</#if>
<#assign result=result+dataSync.codeName?lower_case>
</#if>
</#list>
</#if>
</#list>
<#return result>
</#function>
\ No newline at end of file
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册