提交 d7d828c6 编写于 作者: sq3536's avatar sq3536

换装rocketmq template 异步顺序发送生产消息

上级 012b3c12
......@@ -2,12 +2,15 @@ package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.message.DstRocketMQTemplate;
import cn.ibizlab.core.rule.domain.ExecResult;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.implementation.bytecode.Throw;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -16,6 +19,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;
import java.util.List;
......@@ -35,54 +39,84 @@ public class DefaultMQProducerService implements MsgProducerService{
@Autowired
@Lazy
DefaultMQProducer defaultMQProducer;
DstRocketMQTemplate dstRocketMQTemplate;
@Override
public void sendEngineMsg(EngineMQMsg engineMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
public void sendEngineMsg(EngineMQMsg engineMQMsg) {
String msg = JSON.toJSONString(engineMQMsg);
Message sendMsg = new Message(ruleEngineTopic, "Engine", msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("引擎消息发送响应:" + sendResult.toString());
dstRocketMQTemplate.asyncSendOrderly(ruleEngineTopic+":Engine",msg, DigestUtils.md5DigestAsHex(msg.getBytes()),new SendCallback(){
@Override
public void onSuccess(SendResult sendResult) {
log.info("引擎消息发送响应:" + sendResult.toString());
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
@Override
public void sendBuildMsg(EngineMQMsg engineMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
public void sendBuildMsg(EngineMQMsg engineMQMsg) {
String msg = JSON.toJSONString(engineMQMsg);
Message sendMsg = new Message(ruleEngineTopic, "Build", msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("引擎消息发送响应:" + sendResult.toString());
dstRocketMQTemplate.asyncSendOrderly(ruleEngineTopic+":Build",msg, DigestUtils.md5DigestAsHex(msg.getBytes()),new SendCallback(){
@Override
public void onSuccess(SendResult sendResult) {
log.info("构建消息发送响应:" + sendResult.toString());
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
@Override
public void sendResultsMsg(ResultsMQMsg resultsMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(resultsMQMsg);
Message sendMsg = new Message(resultsTopic, "default", msg.getBytes());
sendMsg.setKeys(resultsMQMsg.getKeyValueField());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("构建消息发送响应:" + sendResult.toString());
dstRocketMQTemplate.asyncSendOrderly(resultsTopic+":default",msg, DigestUtils.md5DigestAsHex(msg.getBytes()),new SendCallback(){
@Override
public void onSuccess(SendResult sendResult) {
log.info("结果消息发送响应:" + sendResult.toString());
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
@Override
public void sendRuleResultsMsg(String topic, String tags, List<ExecResult> listExecResultMsg){
try {
String msg = JSON.toJSONString(listExecResultMsg);
Message sendMsg = new Message(topic, tags, msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("规则结果MQ主题:topic: {}, tags: {},消息发送响应 {}:", topic,tags,sendResult.toString());
} catch (MQClientException e) {
dstRocketMQTemplate.asyncSendOrderly(topic+":"+tags,msg, DigestUtils.md5DigestAsHex(msg.getBytes()),new SendCallback(){
@Override
public void onSuccess(SendResult sendResult) {
log.info("规则结果MQ主题:topic: {}, tags: {},消息发送响应 {}:", topic,tags,sendResult.toString());
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
log.error("规则结果MQ主题:topic: {},tags: {},消息发送异常MQClientException: {}:", topic,tags,e);
} catch (RemotingException e) {
e.printStackTrace();
log.error("规则结果MQ主题:topic: {},tags: {},消息发送异常RemotingException: {}:", topic,tags,e);
} catch (MQBrokerException e) {
e.printStackTrace();
log.error("规则结果MQ主题:topic: {},tags: {},消息发送异常MQBrokerException: {}:", topic,tags,e);
} catch (InterruptedException e) {
e.printStackTrace();
log.error("规则结果MQ主题:topic: {},tags: {},消息发送异常InterruptedException: {}:", topic,tags,e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.ibizlab.core.message;
import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@ConditionalOnExpression("${rocketmq.producer.enabled:false}")
@ExtRocketMQTemplateConfiguration(nameServer = "${rocketmq.producer.namesrvAddr}",
group = "${rocketmq.producer.groupName:dstproducer}",
sendMessageTimeout = 3000,
retryTimesWhenSendFailed = 2
)
public class DstRocketMQTemplate extends RocketMQTemplate {
}
\ No newline at end of file
......@@ -11,8 +11,8 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@ConditionalOnExpression("${rocketmq.producer.enabled:false}")
//@Configuration
//@ConditionalOnExpression("${rocketmq.producer.enabled:false}")
public class MQProducerConfigure {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);
......
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册