提交 44816124 编写于 作者: zhouweidong's avatar zhouweidong

rocketMQ优化

上级 4d9f772f
...@@ -79,6 +79,8 @@ public class RocketMQConfig { ...@@ -79,6 +79,8 @@ public class RocketMQConfig {
private Integer consumeThreadMax; private Integer consumeThreadMax;
@Value("${r'${ibiz.rocketmq.consumer.consumeMessageBatchMaxSize:1}'}") @Value("${r'${ibiz.rocketmq.consumer.consumeMessageBatchMaxSize:1}'}")
private Integer consumeMessageBatchMaxSize; private Integer consumeMessageBatchMaxSize;
@Value("${r'${ibiz.rocketmq.topic:default}'}")
private String topic;
@Autowired @Autowired
@Lazy @Lazy
...@@ -110,9 +112,9 @@ public class RocketMQConfig { ...@@ -110,9 +112,9 @@ public class RocketMQConfig {
*/ */
// consumer.setMessageModel(MessageModel.CLUSTERING); // consumer.setMessageModel(MessageModel.CLUSTERING);
try { try {
consumer.subscribe("default", "${getSubscribeTags()}"); consumer.subscribe(topic, "${getSubscribeTags()}");
consumer.start(); consumer.start();
log.info("rocketmq consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", groupName, "default", namesrvAddr); log.info("rocketmq consumer 创建成功 groupName={}, topics={}, namesrvAddr={}", groupName, topic, namesrvAddr);
} catch (MQClientException e) { } catch (MQClientException e) {
log.error("rocketmq consumer 创建失败!" + e); log.error("rocketmq consumer 创建失败!" + e);
} }
......
...@@ -69,12 +69,12 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly { ...@@ -69,12 +69,12 @@ public class RocketMQListenerProcessor implements MessageListenerOrderly {
<#assign dataSyncCodeName=dataSync.codeName?lower_case> <#assign dataSyncCodeName=dataSync.codeName?lower_case>
<#if dataSync.getInTestPSDEAction?? && dataSync.getInTestPSDEAction()??> <#if dataSync.getInTestPSDEAction?? && dataSync.getInTestPSDEAction()??>
<#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)> <#assign inputAction=srfmethodname(dataSync.getInTestPSDEAction().codeName)>
if ("${dataSyncCodeName}".equalsIgnoreCase(tags)) { if ("${dataSyncCodeName}".equalsIgnoreCase(tags)) {
${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = JSON.parseObject(new String(body),${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}.class); ${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName} domain = JSON.parseObject(new String(body),${pub.getPKGCodeName()}.core.${dataEntity.getPSSystemModule().getCodeName()?lower_case}.domain.${entityName}.class);
${entityName}Service.${inputAction}(domain); ${entityName}Service.${inputAction}(domain);
} }
<#else> <#else>
log.info("接收到[{}]消息,但未配置实体输入过滤行为,消息将被忽略。"+new String(msg.getBody())); log.info("接收到[{}]消息,但未配置实体输入过滤行为,消息将被忽略。"+new String(msg.getBody()));
</#if> </#if>
</#if> </#if>
</#list> </#list>
......
...@@ -25,6 +25,7 @@ import org.aspectj.lang.JoinPoint; ...@@ -25,6 +25,7 @@ import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.expression.EvaluationContext; import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression; import org.springframework.expression.Expression;
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册