提交 e640ff99 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

MQ消费这个支持groupname配置

上级 6f2fbffd
...@@ -20,8 +20,10 @@ import org.springframework.context.annotation.Configuration; ...@@ -20,8 +20,10 @@ import org.springframework.context.annotation.Configuration;
@Configuration @Configuration
public class MQConsumerConfigure { public class MQConsumerConfigure {
@Value("${rocketmq.consumer.groupName:}") @Value("${rocketmq.consumer.ruleEngineGroupName:dstCG}")
private String groupName; private String ruleEngineGroupName;
@Value("${rocketmq.consumer.resultsGroupName:resultsCG}")
private String resultsGroupName;
@Value("${rocketmq.consumer.namesrvAddr:}") @Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr; private String namesrvAddr;
@Value("${rocketmq.consumer.topics:}") @Value("${rocketmq.consumer.topics:}")
...@@ -52,8 +54,7 @@ public class MQConsumerConfigure { ...@@ -52,8 +54,7 @@ public class MQConsumerConfigure {
//@ConditionalOnExpression("${rocketmq.consumer.isOnOff:off}.equals('on')") //@ConditionalOnExpression("${rocketmq.consumer.isOnOff:off}.equals('on')")
public DefaultMQPushConsumer defaultConsumer() throws MQClientException { public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
log.info("defaultConsumer 正在创建---------------------------------------"); log.info("defaultConsumer 正在创建---------------------------------------");
String groupName = "dstCG"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ruleEngineGroupName);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr); consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1); consumer.setConsumeThreadMax(1);
...@@ -65,7 +66,7 @@ public class MQConsumerConfigure { ...@@ -65,7 +66,7 @@ public class MQConsumerConfigure {
try { try {
consumer.subscribe(ruleEngineTopic, "*"); consumer.subscribe(ruleEngineTopic, "*");
consumer.start(); consumer.start();
log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",groupName,topics,namesrvAddr); log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",ruleEngineGroupName,topics,namesrvAddr);
} catch (MQClientException e) { } catch (MQClientException e) {
log.error("consumer 创建失败!"); log.error("consumer 创建失败!");
} }
...@@ -75,8 +76,7 @@ public class MQConsumerConfigure { ...@@ -75,8 +76,7 @@ public class MQConsumerConfigure {
@Bean @Bean
public DefaultMQPushConsumer resultsMQMsgConsumer() throws MQClientException { public DefaultMQPushConsumer resultsMQMsgConsumer() throws MQClientException {
log.info("resultsMQMsgConsumer 正在创建---------------------------------------"); log.info("resultsMQMsgConsumer 正在创建---------------------------------------");
String groupName = "resultsCG"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(resultsGroupName);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr); consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(10); consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20); consumer.setConsumeThreadMax(20);
...@@ -88,7 +88,7 @@ public class MQConsumerConfigure { ...@@ -88,7 +88,7 @@ public class MQConsumerConfigure {
// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*, // 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
consumer.subscribe(resultsTopic, "*"); consumer.subscribe(resultsTopic, "*");
consumer.start(); consumer.start();
log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",groupName,topics,namesrvAddr); log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",resultsGroupName,topics,namesrvAddr);
} catch (MQClientException e) { } catch (MQClientException e) {
log.error("consumer 创建失败!"); log.error("consumer 创建失败!");
} }
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册