提交 da44e70c 编写于 作者: zhouweidong's avatar zhouweidong

rocketMQ优化

上级 81e57986
...@@ -18,8 +18,6 @@ TARGET=PSSYSTEM ...@@ -18,8 +18,6 @@ TARGET=PSSYSTEM
</#if> </#if>
</#list> </#list>
<#if hasMQProducer || hasMQConsumer> <#if hasMQProducer || hasMQConsumer>
<#assign nameAddress="127.0.0.1:9876">
<#assign instanceName="rmq-instance">
package ${pub.getPKGCodeName()}.core.util.config; package ${pub.getPKGCodeName()}.core.util.config;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -37,18 +35,18 @@ import org.springframework.context.annotation.Configuration; ...@@ -37,18 +35,18 @@ import org.springframework.context.annotation.Configuration;
public class RocketMQConfig { public class RocketMQConfig {
<#if hasMQProducer> <#if hasMQProducer>
@Value("${r'${ibiz.rocketmq.producer.groupName: DEFAULT_PRODUCER}'}") @Value("${r'${ibiz.rocketmq.producer.groupName:default}'}")
private String groupName; private String groupName;
@Value("${r'${ibiz.rocketmq.producer.namesrvAddr:}'}") @Value("${r'${ibiz.rocketmq.producer.namesrvAddr:127.0.0.1:9876}'}")
private String namesrvAddr; private String namesrvAddr;
// 消息最大值 // 消息最大值
@Value("${r'${ibiz.rocketmq.producer.maxMessageSize: 409600}'}") @Value("${r'${ibiz.rocketmq.producer.maxMessageSize:409600}'}")
private Integer maxMessageSize; private Integer maxMessageSize;
// 消息发送超时时间 // 消息发送超时时间
@Value("${r'${ibiz.rocketmq.producer.sendMsgTimeOut: 3000}'}") @Value("${r'${ibiz.rocketmq.producer.sendMsgTimeOut:3000}'}")
private Integer sendMsgTimeOut; private Integer sendMsgTimeOut;
// 失败重试次数 // 失败重试次数
@Value("${r'${ibiz.rocketmq.producer.retryTimesWhenSendFailed: 2}'}") @Value("${r'${ibiz.rocketmq.producer.retryTimesWhenSendFailed:2}'}")
private Integer retryTimesWhenSendFailed; private Integer retryTimesWhenSendFailed;
/** /**
...@@ -74,14 +72,15 @@ public class RocketMQConfig { ...@@ -74,14 +72,15 @@ public class RocketMQConfig {
<#if hasMQConsumer> <#if hasMQConsumer>
// 消费者线程数据量 // 消费者线程数据量
@Value("${r'${ibiz.rocketmq.consumer.consumeThreadMin: 1}'}") @Value("${r'${ibiz.rocketmq.consumer.consumeThreadMin:1}'}")
private Integer consumeThreadMin; private Integer consumeThreadMin;
@Value("${r'${ibiz.rocketmq.consumer.consumeThreadMax: 1}'}") @Value("${r'${ibiz.rocketmq.consumer.consumeThreadMax:1}'}")
private Integer consumeThreadMax; private Integer consumeThreadMax;
@Value("${r'${ibiz.rocketmq.consumer.consumeMessageBatchMaxSize: 1}'}") @Value("${r'${ibiz.rocketmq.consumer.consumeMessageBatchMaxSize:1}'}")
private Integer consumeMessageBatchMaxSize; private Integer consumeMessageBatchMaxSize;
@Autowired @Autowired
@Lazy
private RocketMQListenerProcessor listenerProcessor; private RocketMQListenerProcessor listenerProcessor;
/** /**
......
...@@ -16,6 +16,7 @@ TARGET=PSSYSTEM ...@@ -16,6 +16,7 @@ TARGET=PSSYSTEM
package ${pub.getPKGCodeName()}.core.util.config; package ${pub.getPKGCodeName()}.core.util.config;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
...@@ -61,27 +62,28 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly { ...@@ -61,27 +62,28 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly {
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8"); String body = new String(messageExt.getBody(), "utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body); log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) { <#list sys.getAllPSDataEntities() as dataEntity>
log.error("获取MQ消息内容异常{}", e); <#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??>
} <#list dataEntity.getAllPSDEDataSyncs() as dataSync>
<#list sys.getAllPSDataEntities() as dataEntity> <#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()??>
<#if dataEntity.getAllPSDEDataSyncs?? && dataEntity.getAllPSDEDataSyncs()??> <#assign dataSyncCodeName=dataSync.codeName?lower_case>
<#list dataEntity.getAllPSDEDataSyncs() as dataSync> <#if dataSync.getInTestPSDEAction?? && dataSync.getInTestPSDEAction()??>
<#if dataSync.getInPSSysDataSyncAgent?? && dataSync.getInPSSysDataSyncAgent()??> <#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)>
<#assign dataSyncCodeName=dataSync.codeName?lower_case>
<#if dataSync.getInTestPSDEAction?? && dataSync.getInTestPSDEAction()??>
<#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)>
if ("${dataSyncCodeName}".equalsIgnoreCase(messageExt.getTopic())) { if ("${dataSyncCodeName}".equalsIgnoreCase(messageExt.getTopic())) {
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = JSON.parseObject(new String(msg.getBody()),${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}.class); ${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = JSON.parseObject(new String(body),${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}.class);
${entityName}Service.${inputAction}(domain); ${entityName}Service.${inputAction}(domain);
} }
<#else> <#else>
log.info("接收到[{}]消息,但未配置实体输入过滤行为,消息将被忽略。"+new String(msg.getBody())); log.info("接收到[{}]消息,但未配置实体输入过滤行为,消息将被忽略。"+new String(msg.getBody()));
</#if> </#if>
</#if> </#if>
</#list> </#list>
</#if> </#if>
</#list> </#list>
} catch (Exception e) {
log.error("获取MQ消息内容异常{}", e);
}
} }
return ConsumeOrderlyStatus.SUCCESS; return ConsumeOrderlyStatus.SUCCESS;
} }
......
...@@ -18,7 +18,6 @@ package ${pub.getPKGCodeName()}.util.aspect; ...@@ -18,7 +18,6 @@ package ${pub.getPKGCodeName()}.util.aspect;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import ${pub.getPKGCodeName()}.util.domain.EntityBase; import ${pub.getPKGCodeName()}.util.domain.EntityBase;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
...@@ -36,6 +35,7 @@ import org.springframework.scheduling.annotation.Async; ...@@ -36,6 +35,7 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.List;
/** /**
* rocketMQ消息切面 * rocketMQ消息切面
...@@ -111,7 +111,6 @@ public class RocketMQAspect ...@@ -111,7 +111,6 @@ public class RocketMQAspect
/** /**
* 发送消息 * 发送消息
* @param producer
* @param topic * @param topic
* @param tag * @param tag
* @param body * @param body
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册