提交 eff245df 编写于 作者: ibizdev's avatar ibizdev

zhouweidong 发布系统代码 [ibiz-uaa,UAA鉴权]

上级 25983ea2
package cn.ibizlab.core.util.config; package cn.ibizlab.core.util.config;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.message.MessageExt;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import java.util.List; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@Slf4j @Slf4j
@Configuration @Configuration
@ConditionalOnExpression("${ibiz.rocketmq.producer.isOnOff:'off'}.equals('on')")
public class RocketMQConfig { public class RocketMQConfig {
@Autowired // 消费者线程数据量
@Lazy @Value("${ibiz.rocketmq.consumer.consumeThreadMin:1}")
cn.ibizlab.core.uaa.service.ISysUserService SysUserService; private Integer consumeThreadMin;
@Value("${ibiz.rocketmq.consumer.consumeThreadMax:1}")
private Integer consumeThreadMax;
@Value("${ibiz.rocketmq.consumer.consumeMessageBatchMaxSize:1}")
private Integer consumeMessageBatchMaxSize;
@Value("${ibiz.rocketmq.topic:default}")
private String topic;
@Bean("SysUserdeleteSysUserconsumer")
public DefaultMQPushConsumer sysUserdeleteSysUserconsumer(){
DefaultMQPushConsumer consumer = null;
try {
consumer = new DefaultMQPushConsumer();
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("rmq-instance");
consumer.subscribe("deleteSysUser", "deleteSysUser");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
cn.ibizlab.core.uaa.domain.SysUser domain = JSON.parseObject(new String(msg.getBody()),cn.ibizlab.core.uaa.domain.SysUser.class);
SysUserService.remove(domain);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}catch (Exception e){
log.error("初始化消息接收对象异常!");
}
return consumer;
}
@Autowired @Autowired
@Lazy @Lazy
cn.ibizlab.core.uaa.service.ISysUserService SysUserService; private RocketMQListenerProcessor listenerProcessor;
@Bean("SysUsersaveSysUserconsumer") /**
public DefaultMQPushConsumer sysUsersaveSysUserconsumer(){ * mq 消费者配置
DefaultMQPushConsumer consumer = null; *
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQPushConsumer defaultConsumer() {
log.info("defaultConsumer 正在创建---------------------------------------");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
// 设置监听
consumer.registerMessageListener(listenerProcessor);
/**
* 设置consumer第一次启动是从队列头部开始还是队列尾部开始
* 如果不是第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
try { try {
consumer = new DefaultMQPushConsumer(); consumer.subscribe(topic, "deletesysuser || savesysuser");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("rmq-instance");
consumer.subscribe("saveSysUser", "saveSysUser");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
cn.ibizlab.core.uaa.domain.SysUser domain = JSON.parseObject(new String(msg.getBody()),cn.ibizlab.core.uaa.domain.SysUser.class);
SysUserService.saveSysUser(domain);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start(); consumer.start();
}catch (Exception e){ log.info("rocketmq consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", groupName, topic, namesrvAddr);
log.error("初始化消息接收对象异常!"); } catch (MQClientException e) {
log.error("rocketmq consumer 创建失败!" + e);
} }
return consumer; return consumer;
} }
} }
package cn.ibizlab.core.util.config;
import lombok.extern.slf4j.Slf4j;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* MQ订阅消息处理
*/
@Slf4j
@Component
@ConditionalOnExpression("${ibiz.rocketmq.producer.isOnOff:'off'}.equals('on')")
public class RocketMQListenerProcessor implements MessageListenerOrderly {
@Autowired
@Lazy
cn.ibizlab.core.uaa.service.ISysUserService SysUserService;
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
if (CollectionUtils.isEmpty(list)) {
log.info("MQ接收消息为空,直接返回成功");
return ConsumeOrderlyStatus.SUCCESS;
}
for (MessageExt messageExt : list) {
log.info("MQ接收到的消息为:" + messageExt.toString());
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
if ("deletesysuser".equalsIgnoreCase(tags)) {
cn.ibizlab.core.uaa.domain.SysUser domain = JSON.parseObject(new String(body),cn.ibizlab.core.uaa.domain.SysUser.class);
SysUserService.remove(domain);
}
if ("savesysuser".equalsIgnoreCase(tags)) {
cn.ibizlab.core.uaa.domain.SysUser domain = JSON.parseObject(new String(body),cn.ibizlab.core.uaa.domain.SysUser.class);
SysUserService.saveSysUser(domain);
}
} catch (Exception e) {
log.error("获取MQ消息内容异常{}", e);
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
...@@ -262,11 +262,7 @@ ...@@ -262,11 +262,7 @@
<version>${baomidou-jobs.version}</version> <version>${baomidou-jobs.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
...@@ -350,13 +346,16 @@ ...@@ -350,13 +346,16 @@
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
</dependency> </dependency>
<!-- mp动态数据源 --> <!-- mp动态数据源 -->
<dependency> <dependency>
<groupId>com.baomidou</groupId> <groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies> </dependencies>
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册