提交 8ce8cab2 编写于 作者: fengmin's avatar fengmin

Merge remote-tracking branch 'origin/master'

......@@ -25,6 +25,7 @@ import cn.ibizlab.util.filter.SearchContextBase;
@Slf4j
@Data
public class DADimensionSearchContext extends SearchContextBase {
private String n_dimname_like;//[维度名称]
private String n_buildid_eq;//[分析标识]
......
......@@ -208,7 +208,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
if(!file.exists())
{
ruleItemService.buildRuleFile(ruleItem);
}
if(file.exists())
{
......
......@@ -6,14 +6,15 @@ import cn.ibizlab.core.extensions.service.RuleEngineExService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
......@@ -25,8 +26,6 @@ import java.util.List;
public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
private Object lockObject =new Object();
@Value("${rocketmq.producer.ruleEngineTopic:DSTMSG}")
private String ruleEngineTopic;
......@@ -51,9 +50,7 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
log.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) {
log.error("获取MQ消息内容异常{}", e);
}
......@@ -74,6 +71,7 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
// Thread.sleep(80000);
// System.out.println("end sleep");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {});
System.out.println(engineMQMsg.toString());
if("Engine".equalsIgnoreCase(tags)) {
ruleEngineExService.processData(engineMQMsg);
}else if("Build".equalsIgnoreCase(tags)){
......
......@@ -7,7 +7,6 @@ import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
......@@ -61,6 +60,7 @@ public class MQConsumerConfigure {
consumer.registerMessageListener(consumeMsgListenerProcessor);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("dstMQMsgConsumer");
try {
consumer.subscribe(ruleEngineTopic, "*");
consumer.start();
......@@ -82,7 +82,6 @@ public class MQConsumerConfigure {
consumer.registerMessageListener(resultsMQMsgConsumeListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("resultsMQMsgConsumer");
try {
// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
consumer.subscribe(resultsTopic, "*");
......
spring:
profiles:
include: sys ,nacos, api-prod
include: sys ,nacos, api-prod,dev
application:
name: ibzdst-api
\ No newline at end of file
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册