提交 c5745d1f 编写于 作者: zhouweidong's avatar zhouweidong

clean

上级 d26ba15a
package cn.ibizlab.core.util.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@Slf4j
@Configuration
@ConditionalOnExpression("${ibiz.rocketmq.producer.isOnOff:'off'}.equals('on')")
public class RocketMQConfig {
@Value("${ibiz.rocketmq.producer.groupName:default}")
private String groupName;
@Value("${ibiz.rocketmq.producer.namesrvAddr:127.0.0.1:9876}")
private String namesrvAddr;
// 消费者线程数据量
@Value("${ibiz.rocketmq.consumer.consumeThreadMin:1}")
private Integer consumeThreadMin;
@Value("${ibiz.rocketmq.consumer.consumeThreadMax:1}")
private Integer consumeThreadMax;
@Value("${ibiz.rocketmq.consumer.consumeMessageBatchMaxSize:1}")
private Integer consumeMessageBatchMaxSize;
@Value("${ibiz.rocketmq.topic:default}")
private String topic;
@Autowired
@Lazy
private RocketMQListenerProcessor listenerProcessor;
/**
* mq 消费者配置
*
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQPushConsumer defaultConsumer() {
log.info("defaultConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
// 设置监听
consumer.registerMessageListener(listenerProcessor);
/**
* 设置consumer第一次启动是从队列头部开始还是队列尾部开始
* 如果不是第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
try {
consumer.subscribe(topic, "deletesysuser || savesysuser");
consumer.start();
log.info("rocketmq consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", groupName, topic, namesrvAddr);
} catch (MQClientException e) {
log.error("rocketmq consumer 创建失败!" + e);
}
return consumer;
}
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册