提交 cd359f06 编写于 作者: fengmin's avatar fengmin

mq消息订阅实现2

上级 9442f52e
......@@ -11,6 +11,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
......@@ -34,11 +35,13 @@ public class GeneralConsumeMsgListenerProcessor implements MessageListenerOrderl
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {
});
engineMQMsg.setEngineId(key);
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
ruleEngineExService.processData(engineMQMsg);
if(!StringUtils.isEmpty(body)) {
EngineMQMsg engineMQMsg = JSON.parseObject((String) JSON.parse(body), new TypeReference<EngineMQMsg>() {
});
engineMQMsg.setEngineId(key);
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
ruleEngineExService.processData(engineMQMsg);
}
log.info("消费成功");
} catch (Exception e) {
log.error("获取MQ消息内容异常{}", e);
......
package cn.ibizlab.core.rule.mapper;
import cn.ibizlab.core.rule.domain.RuleEngine;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import java.util.List;
public interface RuleEngineExtendMapper extends BaseMapper<RuleEngine> {
/**
*
* 查询所有消费者信息
*/
List<RuleEngine> selectCustomerList();
}
......@@ -61,12 +61,7 @@ public interface RuleEngineMapper extends BaseMapper<RuleEngine> {
List<RuleEngine> selectByModelId(@Param("id") Serializable id);
/**
*
* 查询所有消费者信息
*/
List<RuleEngine> selectCustomerList();
}
......@@ -17,6 +17,7 @@ import org.springframework.context.annotation.Configuration;
*/
@Slf4j
@Configuration
@ConditionalOnExpression("'${rocketmq.consumer.namesrvAddr:}'!=''")
public class MQConsumerConfigure {
@Value("${rocketmq.consumer.ruleEngineGroupName:dstCG}")
......
......@@ -12,6 +12,7 @@ import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@ConditionalOnExpression("${rocketmq.producer.enabled}&&'${rocketmq.producer.namesrvAddr:}'!=''")
public class MQProducerConfigure {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);
......
package cn.ibizlab.core.util.config;
import com.alibaba.cloud.nacos.registry.NacosAutoServiceRegistration;
import cn.ibizlab.util.errors.BadRequestAlertException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.Query;
import java.lang.management.ManagementFactory;
import java.util.Set;
/**
* 外部容器启动服务时,自动注册服务到nacos
*/
@Component
@Slf4j
public class NacosRegisterConfig implements ApplicationRunner {
@Autowired(required = false)
private NacosAutoServiceRegistration registration;
@Value("${server.port:8080}")
Integer port;
@Override
public void run(ApplicationArguments args) {
log.info("正在尝试将应用程序注册到nacos");
if (registration != null && port != null) {
try {
String containerPort = getContainerPort();
if(!StringUtils.isEmpty(containerPort)){
registration.setPort(new Integer(containerPort));
}
else{
registration.setPort(port);
log.info("无法获取外部容器端口,将使用程序默认端口{}",port);
}
registration.start();
} catch (Exception e) {
throw new BadRequestAlertException("应用程序注册到nacos失败,"+e,"","");
}
log.info("已将应用程序成功注册到nacos");
}
else{
log.info("无法获取应用程序端口,将应用程序注册到nacos请求被忽略。");
}
}
/**
* 获取外部容器端口
*/
public String getContainerPort(){
String port = null;
try {
MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
Set<ObjectName> objectNames = beanServer.queryNames(new ObjectName("*:type=Connector,*"), Query.match(Query.attr("protocol"), Query.value("HTTP/1.1")));
if(!ObjectUtils.isEmpty(objectNames)){
port = objectNames.iterator().next().getKeyProperty("port");
}
}
catch (Exception e) {
log.error("获取外部容器端口失败!"+e);
}
return port;
}
}
......@@ -2,7 +2,9 @@ package cn.ibizlab.core.util.config;
import cn.ibizlab.core.extensions.util.GeneralConsumeMsgListenerProcessor;
import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.rule.mapper.RuleEngineMapper;
import cn.ibizlab.core.rule.mapper.RuleEngineExtendMapper;
import cn.ibizlab.util.errors.BadRequestAlertException;
import com.alibaba.cloud.nacos.registry.NacosAutoServiceRegistration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
......@@ -13,19 +15,27 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.Query;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
@ConditionalOnExpression("${rocketmq.consumer.enabled}&&'${rocketmq.consumer.namesrvAddr:}'!=''")
public class RocktcermqCustmerRunner implements ApplicationRunner {
@Autowired
private RuleEngineMapper ruleEngineMapper;
private RuleEngineExtendMapper ruleEngineMapper;
@Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr;
......@@ -90,4 +100,60 @@ public class RocktcermqCustmerRunner implements ApplicationRunner {
}
return flat;
}
/**
* 外部容器启动服务时,自动注册服务到nacos
*/
@Component
@Slf4j
public static class NacosRegisterConfig implements ApplicationRunner {
@Autowired(required = false)
private NacosAutoServiceRegistration registration;
@Value("${server.port:8080}")
Integer port;
@Override
public void run(ApplicationArguments args) {
log.info("正在尝试将应用程序注册到nacos");
if (registration != null && port != null) {
try {
String containerPort = getContainerPort();
if(!StringUtils.isEmpty(containerPort)){
registration.setPort(new Integer(containerPort));
}
else{
registration.setPort(port);
log.info("无法获取外部容器端口,将使用程序默认端口{}",port);
}
registration.start();
} catch (Exception e) {
throw new BadRequestAlertException("应用程序注册到nacos失败,"+e,"","");
}
log.info("已将应用程序成功注册到nacos");
}
else{
log.info("无法获取应用程序端口,将应用程序注册到nacos请求被忽略。");
}
}
/**
* 获取外部容器端口
*/
public String getContainerPort(){
String port = null;
try {
MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
Set<ObjectName> objectNames = beanServer.queryNames(new ObjectName("*:type=Connector,*"), Query.match(Query.attr("protocol"), Query.value("HTTP/1.1")));
if(!ObjectUtils.isEmpty(objectNames)){
port = objectNames.iterator().next().getKeyProperty("port");
}
}
catch (Exception e) {
log.error("获取外部容器端口失败!"+e);
}
return port;
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.ibizlab.core.rule.mapper.RuleEngineExtendMapper">
<resultMap id="RuleEngineResultMap" type="cn.ibizlab.core.rule.domain.RuleEngine" autoMapping="true">
<id property="engineId" column="engineid" /><!--主键字段映射-->
<result property="group" column="rulegroup" />
<result property="modelId" column="modelid" />
<result property="state" column="execstate" />
<!--通过mybatis自动注入关系属性[主实体],fetchType="lazy"为懒加载配置 -->
<association property="model" javaType="cn.ibizlab.core.lite.domain.MetaModel" column="modelid" select="cn.ibizlab.core.lite.mapper.MetaModelMapper.selectById" fetchType="lazy"></association>
</resultMap>
<select id="selectCustomerList" resultType="cn.ibizlab.core.rule.domain.RuleEngine">
SELECT ENGINEID ,(SELECT CODENAME FROM IBZMODEL WHERE E.MODELID=MODELID) AS MODELID,EXTPARAMS FROM IBZRULEENGINE E WHERE MODELID IS NOT NULL
</select>
</mapper>
\ No newline at end of file
......@@ -41,9 +41,6 @@
<where><if test="ew!=null and ew.sqlSegment!=null and !ew.emptyOfWhere">${ew.sqlSegment}</if></where>
<if test="ew!=null and ew.sqlSegment!=null and ew.emptyOfWhere">${ew.sqlSegment}</if>
</select>
<select id="selectCustomerList" resultType="cn.ibizlab.core.rule.domain.RuleEngine">
SELECT ENGINEID ,(SELECT CODENAME FROM IBZMODEL WHERE E.MODELID=MODELID) AS MODELID,EXTPARAMS FROM IBZRULEENGINE E WHERE MODELID IS NOT NULL
</select>
<!--数据查询[Default]-->
<sql id="Default" databaseId="mysql">
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册