提交 e526753d 编写于 作者: hebao@lab.ibiz5.com's avatar hebao@lab.ibiz5.com

支持构建报表基础数据功能

上级 31aaef16
......@@ -23,18 +23,22 @@ spring:
caffeine:
spec: initialCapacity=5,maximumSize=50000,expireAfterWrite=3600s
cassandra:
host: 172.16.100.243
cluster:
name: cluster
port: 9042
keyspace: ibzdst
class: SimpleStrategy
replication_factor: 3
rocketmq:
producer:
isOnOff: on
groupName: ${spring.application.name}
namesrvAddr: 172.16.170.163:9876
maxMessageSize: 409600
sendMsgTimeOut: 3000
retryTimesWhenSendFailed: 2
ruleEngineTopic: DSTMSG
namesrvAddr: 172.16.180.243:9876
consumer:
isOnOff: on
groupName: ${spring.application.name}
namesrvAddr: 172.16.170.163:9876
topics: DSTMSG~*
consumeMessageBatchMaxSize: 1
namesrvAddr: 172.16.180.243:9876
logging:
config: classpath:logback-spring.xml
\ No newline at end of file
......@@ -119,6 +119,11 @@
<version>7.6.0.165</version>
</dependency>
<!-- cassandra -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
</dependency>
</dependencies>
<properties>
......
package cn.ibizlab.core.extensions.cql;
import com.alibaba.fastjson.annotation.JSONField;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import com.datastax.driver.mapping.annotations.Transient;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cglib.beans.BeanCopier;
import org.springframework.util.StringUtils;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Date;
/**
* 实体[RUExecResult] 数据对象
*/
@Table(name = "ru_execresult")
@Data
@Slf4j
public class ExecResult implements Serializable {
@PartitionKey
private String ruleid;
@PartitionKey(1)
private Integer retvalue;
@ClusteringColumn
private String dimfield;
@ClusteringColumn(1)
@JsonIgnore
@JSONField(serialize = false)
private LocalDate timefield;
@ClusteringColumn(2)
private String domainsfield;
@ClusteringColumn(3)
private String keyvaluefield;
private String ruexecresultname;
@JsonIgnore
@JSONField(serialize = false)
private LocalDate updatedate;
private String rulename;
private Double metricfield;
private String ext1field;
private String ext2field;
private String businesscat;
@Transient
public long getTimefieldLV() {
return timefield.getMillisSinceEpoch();
}
public void setTimefieldLV(long timefield) {
this.timefield = LocalDate.fromMillisSinceEpoch(timefield);
}
@Transient
public long getUpdatedateLV() {
return updatedate.getMillisSinceEpoch();
}
public void setUpdatedateLV(long updatedate) {
this.updatedate = LocalDate.fromMillisSinceEpoch(updatedate);
}
public static LocalDate nowLocalDate() {
Date time = new Date();
return LocalDate.fromYearMonthDay(time.getYear() + 1900, time.getMonth() + 1, time.getDate());
}
public void stringTimeField(String timeDate) {
if (StringUtils.isEmpty(timeDate) && timeDate.length() != 8)
return;
try {
this.timefield = LocalDate.fromYearMonthDay(Integer.valueOf(timeDate.substring(0, 4)), Integer.valueOf(timeDate.substring(5, 7)), Integer.valueOf(timeDate.substring(8, 10)));
} catch (Exception ex) {
log.error(String.format("setTimeField Error[%1$s]:%2$s", timeDate, ex.getMessage()));
}
}
public static LocalDate time2LocalDate(Timestamp time) {
return LocalDate.fromYearMonthDay(time.getYear() + 1900, time.getMonth() + 1, time.getDate());
}
/**
* 复制当前对象数据到目标对象(粘贴重置)
*
* @throws Exception
*/
public ExecResult copyTo(ExecResult targetEntity) {
BeanCopier copier = BeanCopier.create(ExecResult.class, ExecResult.class, false);
copier.copy(this, targetEntity, null);
return targetEntity;
}
public String debugInfo() {
return String.format("%1$s,%2$s,%3$s,%4$s,%5$s,%6$s", keyvaluefield, ruleid, retvalue, dimfield, timefield, domainsfield);
}
}
package cn.ibizlab.core.extensions.cql;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import lombok.Data;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Date;
/**
* 实体[RUExecResult] 数据对象
*/
@Table(name = "ru_execresult2")
@Data
public class ExecResult2 implements Serializable {
@PartitionKey
private String ruleid;
@PartitionKey(1)
private String keyvaluefield;
@ClusteringColumn
private String dimfield;
@ClusteringColumn(1)
private String domainsfield;
@ClusteringColumn(2)
private LocalDate timefield;
public String debugInfo(){
return String.format("%1$s,%2$s,%3$s,%4$s,%5$s", keyvaluefield,ruleid,dimfield,timefield,domainsfield);
}
}
\ No newline at end of file
package cn.ibizlab.core.extensions.cql;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import com.datastax.driver.core.*;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;
import java.sql.Timestamp;
import java.util.List;
import java.util.UUID;
import static com.datastax.driver.core.DataType.*;
import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
@Slf4j
@Repository
public class ExecResultRepository {
private Mapper<ExecResult> mapper;
private Mapper<ExecResult2> mapper2;
private Session session;
private static final String TABLE = "ru_execresult";
private static final String TABLE2 = "ru_execresult2";
public ExecResultRepository(MappingManager mappingManager) {
createTable(mappingManager.getSession());
this.mapper = mappingManager.mapper(ExecResult.class);
this.mapper2 = mappingManager.mapper(ExecResult2.class);
this.session = mappingManager.getSession();
}
private void createTable(Session session) {
session.execute(
SchemaBuilder.createTable(TABLE)
.ifNotExists()
.addPartitionKey("ruleid", text())
.addPartitionKey("retvalue", cint())
.addClusteringColumn("dimfield", text())
.addClusteringColumn("timefield", date())
.addClusteringColumn("domainsfield", text())
.addClusteringColumn("keyvaluefield", text())
.addColumn("ruexecresultname", text())
.addColumn("updatedate", date())
.addColumn("rulename", text())
.addColumn("metricfield", cdouble())
.addColumn("ext_1field", text())
.addColumn("ext_2field", text())
.addColumn("businesscat", text())
.withOptions().clusteringOrder("dimfield", SchemaBuilder.Direction.ASC).clusteringOrder("timefield", SchemaBuilder.Direction.DESC).clusteringOrder("domainsfield", SchemaBuilder.Direction.ASC).clusteringOrder("keyvaluefield", SchemaBuilder.Direction.ASC)
);
session.execute(
SchemaBuilder.createTable(TABLE2)
.ifNotExists()
.addPartitionKey("ruleid", text())
.addPartitionKey("keyvaluefield", text())
.addClusteringColumn("dimfield", text())
.addClusteringColumn("domainsfield", text())
.addClusteringColumn("timefield", date())
.withOptions().clusteringOrder("dimfield", SchemaBuilder.Direction.ASC).clusteringOrder("domainsfield", SchemaBuilder.Direction.ASC).clusteringOrder("timefield", SchemaBuilder.Direction.DESC)
);
}
public ExecResult find(String country, String firstName, String secondName, UUID id) {
return mapper.get(country, firstName, secondName, id);
}
public List<ExecResult> findAll() {
final ResultSet result = session.execute(select().all().from(TABLE));
return mapper.map(result).all();
}
public ExecResult save(ExecResult execResult) {
ExecResult2 execResult2=new ExecResult2();
execResult2.setRuleid(execResult.getRuleid());
execResult2.setKeyvaluefield(execResult.getKeyvaluefield());
execResult2.setDimfield(execResult.getDimfield());
execResult2.setDomainsfield(execResult.getDomainsfield());
execResult2.setTimefield(execResult.getTimefield());
try{
mapper.save(execResult);
}catch(Exception ex){
ex.printStackTrace();
}
try{
mapper2.save(execResult2);
}catch(Exception ex){
ex.printStackTrace();
}
return execResult;
}
public void saveResultsMQMsg(ResultsMQMsg resultsMQMsg) {
final ResultSet result = session.execute(select().all().from(TABLE2).where(in("ruleid",resultsMQMsg.getRules())).and(eq("keyvaluefield",resultsMQMsg.getKeyValueField())));
List<ExecResult2> list = mapper2.map(result).all();
for(ExecResult2 execResult2:list) {
ExecResult execResult=new ExecResult();
execResult.setRuleid(execResult2.getRuleid());
execResult.setRetvalue(1);
execResult.setDimfield(execResult2.getDimfield());
execResult.setTimefield(execResult2.getTimefield());
execResult.setDomainsfield(execResult2.getDomainsfield());
execResult.setKeyvaluefield(execResult2.getKeyvaluefield());
try{
mapper.delete(execResult);
}catch(Exception ex){}
try{
mapper2.delete(execResult2);
}catch(Exception ex){}
}
List<ExecResult> results = resultsMQMsg.getDatas();
if(results != null && results.size() > 0){
for(ExecResult saveResult : results){
save(saveResult);
}
}
}
public List<ExecResult> sum(List<String> ruleids, Integer retValue, List<String> dims, Timestamp start, Timestamp end) {
LocalDate st = ExecResult.time2LocalDate(start);
LocalDate ed = ExecResult.time2LocalDate(end);
final ResultSet result = session.execute(select().column("ruleid").column("retvalue").column("dimfield").sum("metricfield").as("metricfield").
from(TABLE).
where(in("ruleid", ruleids)).
and(eq("retvalue", retValue)).
and(in("dimfield", dims)).
and(gte("timefield", st)).and(lte("timefield", ed)).groupBy("ruleid", "retvalue", "dimfield").limit(5000).setReadTimeoutMillis(200000));
return mapper.map(result).all();
}
public List<ExecResult> avg(List<String> ruleids, Integer retValue, List<String> dims, Timestamp start, Timestamp end) {
LocalDate st = ExecResult.time2LocalDate(start);
LocalDate ed = ExecResult.time2LocalDate(end);
final ResultSet result = session.execute(select().column("ruleid").column("retvalue").column("dimfield").avg("metricfield").as("metricfield").
from(TABLE).
where(in("ruleid", ruleids)).
and(eq("retvalue", retValue)).
and(in("dimfield", dims)).
and(gte("timefield", st)).and(lte("timefield", ed)).groupBy("ruleid", "retvalue", "dimfield").limit(5000).setReadTimeoutMillis(200000));
return mapper.map(result).all();
}
public List<ExecResult> group(List<String> ruleids, Integer retValue, List<String> dims, String type, Timestamp start, Timestamp end) {
if (type.equalsIgnoreCase("avg"))
return avg(ruleids, retValue, dims, start, end);
else
return sum(ruleids, retValue, dims, start, end);
}
public List<ExecResult> group(List<String> ruleids, Integer retValue, List<String> dims, Timestamp start, Timestamp end) {
return group(ruleids, retValue, dims, "sum", start, end);
}
/**
* 根据规则ID、单位和时间查询相应的规则结果数据
*
* @param ruleid
* @param retValue
* @param dims
* @param start
* @param end
* @return
*/
public ResultSet getPageData(String ruleid, PagingState pagingState, Integer retValue, String dims, Timestamp start, Timestamp end) {
final int RESULTS_PER_PAGE = 1000;
LocalDate st = ExecResult.time2LocalDate(start);
LocalDate ed = ExecResult.time2LocalDate(end);
Statement statement = select().column("keyvaluefield").
from(TABLE).
where(eq("ruleid", ruleid)).
and(eq("retvalue", retValue)).
and(eq("dimfield", dims)).
and(gte("timefield", st)).and(lte("timefield", ed)).setReadTimeoutMillis(200000);
statement.setFetchSize(RESULTS_PER_PAGE);
if (pagingState != null) {
statement.setPagingState(pagingState);
}
final ResultSet result = session.execute(statement);
return result;
}
/**
* 计算绩效数据的信访件数
*
* @param ruleids
* @param retValue
* @param dims
* @param start
* @param end
* @return
*/
public List<ExecResult> sumPerformancePiece(List<String> ruleids, Integer retValue, List<String> dims, Timestamp start, Timestamp end) {
LocalDate st = ExecResult.time2LocalDate(start);
LocalDate ed = ExecResult.time2LocalDate(end);
final ResultSet result = session.execute(select().column("dimfield").column("keyvaluefield").
from(TABLE).
where(in("ruleid", ruleids)).
and(eq("retvalue", retValue)).
and(in("dimfield", dims)).
and(gte("timefield", st)).and(lte("timefield", ed)).limit(5000).setReadTimeoutMillis(200000));
return mapper.map(result).all();
}
}
......@@ -48,6 +48,7 @@ public class BaseRequest
private String resultTableName;
private String batch;
private List<String> rules;
private List<String> ruleIds;
private SimpleDateFormat format =new SimpleDateFormat("yyyyMMddHHmmss");
public String getBatch()
......
package cn.ibizlab.core.extensions.domain;
import cn.ibizlab.core.extensions.cql.ExecResult;
import lombok.Data;
import java.util.List;
@Data
public class ResultsMQMsg {
private String keyValueField;
private List<String> rules;
private List<ExecResult> datas;
}
package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.analysis.domain.DADimension;
import cn.ibizlab.core.analysis.service.IDABuildService;
import cn.ibizlab.core.extensions.cql.ExecResult;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.extensions.util.DefaultMQProducerService;
import cn.ibizlab.core.lite.extensions.domain.ModelObj;
import cn.ibizlab.core.rule.domain.ExecLog;
import cn.ibizlab.core.rule.service.IExecLogService;
import cn.ibizlab.util.dict.CodeItem;
import cn.ibizlab.util.helper.CachedBeanCopier;
import cn.ibizlab.util.helper.DataObject;
import lombok.extern.slf4j.Slf4j;
import org.kie.api.runtime.KieSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
@Slf4j
@Service("AnalyseEntityServiceImpl")
public class AnalyseEntityServiceImpl extends BaseEntityServiceImpl {
public static final String Tag_SaveResults = "SaveResults";
@Autowired
private DefaultMQProducerService defaultMQProducerService;
@Autowired
@Lazy
private cn.ibizlab.util.client.IBZDictFeignClient ibzDictFeignClient;
@Override
public void saveResult(ModelObj param, String RULEID, String RULECODE, String RULENAME, String RU_EXECRESULTNAME, Object BUSINESSCAT, Integer RETVALUE,
Object KEYVALUEFIELD, Object DOMAINSFIELD, Object DIMFIELD, Object METRICFIELD, Object TIMEFIELD, Object EXT1FIELD, Object EXT2FIELD)
{
if(0==RETVALUE)
return;
List<ExecResult> saveResults = new ArrayList<>();
ExecResult result=new ExecResult();
result.setRuleid(RULEID);
result.setRulename(RULENAME);
result.setRuexecresultname(RU_EXECRESULTNAME);
result.setRetvalue(RETVALUE);
result.setKeyvaluefield(DataObject.getStringValue(valueOf(KEYVALUEFIELD),param.getRowKey()));
//result.setSystemId(DataObject.getStringValue(param.get("systemid"),null));
result.setBusinesscat(DataObject.getStringValue(valueOf(BUSINESSCAT),null));
if(METRICFIELD!=null) {
Object val = METRICFIELD;
if ( (val instanceof BigDecimal))
result.setMetricfield(((BigDecimal) val).doubleValue());
else if ( (val instanceof Double))
result.setMetricfield(new BigDecimal((Double) val).doubleValue());
else if ( (val instanceof Float))
result.setMetricfield(new BigDecimal(((Float) val).doubleValue()).doubleValue());
else if ( (val instanceof Integer))
result.setMetricfield(new BigDecimal((int)val).doubleValue());
else
result.setMetricfield(1d);
}
else
result.setMetricfield(1d);
Object timefield=valueOf(TIMEFIELD);
if(timefield!=null&&timefield instanceof Timestamp)
result.setTimefield(ExecResult.time2LocalDate(((Timestamp)TIMEFIELD)));
if(EXT1FIELD!=null) {
String ext1=valuesOf(EXT1FIELD);
if(ext1!=null&&ext1.length()>330)
ext1=ext1.substring(0,330)+"...";
result.setExt1field(ext1);
}
if(EXT2FIELD!=null) {
String ext2=valuesOf(EXT2FIELD);
if(ext2!=null&&ext2.length()>330)
ext2=ext2.substring(0,330)+"...";
result.setExt2field(ext2);
}
if(DOMAINSFIELD!=null)
{
result.setDomainsfield(DataObject.getStringValue(valueOf(DOMAINSFIELD), ""));
}
if((!StringUtils.isEmpty(result.getRuleid()))&&(!StringUtils.isEmpty(result.getDomainsfield()))&&(!StringUtils.isEmpty(result.getKeyvaluefield()))&&(!ObjectUtils.isEmpty(result.getTimefield()))) {
result.setUpdatedate(ExecResult.nowLocalDate());
List<DADimension> dims=this.getDims(result.getBusinesscat());
if(dims!=null) {
ArrayList<ExecResult> results = new ArrayList<>();
for (DADimension dim : dims) {
Object codevalue=param.$(dim.getField());
if(!ObjectUtils.isEmpty(codevalue))
{
String dictname=dim.getDict();
Object val=codevalue;
if(!StringUtils.isEmpty(dictname))
{
//维度指定代码表时,向上同时为每一个父节点添加一条数据
while(true)
{
CodeItem code=ibzDictFeignClient.getCodeList(dictname).findCodeItem(val);
if(code!=null)
{
ExecResult result2 = new ExecResult();
CachedBeanCopier.copy(result, result2);
result2.setDimfield(valuesOf(code.getValue()));
saveResults.add(result2);
if(StringUtils.isEmpty(code.getParent()))
{
break;
}
else
{
val=code.getParent();
}
}
else
break;
}
}else{
//维度未指定代码表时,只保存一条数据
result.setDimfield(valuesOf(codevalue));
saveResults.add(result);
}
}
}
}
if(saveResults.size() > 0){
param.set(Tag_SaveResults, saveResults);
}
}
}
private Hashtable<String, List<DADimension>> dimset=new Hashtable<>();
private Object dimslock=new Object();
@Autowired
private IDABuildService daBuildExService;
private List<DADimension> getDims(String id)
{
synchronized(dimslock)
{
if(!dimset.containsKey(id))
dimset.put(id, daBuildExService.get(id).getDadimension());
return dimset.get(id);
}
}
@Autowired
protected IExecLogService ruExecLogService;
public ExecLog processAll(BaseRequest msg)
{
ExecLog execlog=new ExecLog();
execlog.setId(msg.getId());
execlog.setRunBody(msg.toString());
StringBuilder strErrorInfo=new StringBuilder();
int count = 0;
int errorcount = 0;
String strResId=initRule(msg.getRules(),msg.getBatch());
if (StringUtils.isEmpty(strResId))
{
strErrorInfo.append("准备rule文件错误");
strErrorInfo.append("\r\n");
execlog.setRetCode(2);
execlog.setCnt(count);
execlog.setSucc(errorcount);
execlog.setRunResult(strErrorInfo.toString());
ruExecLogService.update(execlog);
return execlog;
}
try
{
for (ModelObj modelObj : msg.getDatas())
{
try
{
count++;
modelObj.set("BATCH",msg.getBatch());
modelObj.set(Tag_EngineId, msg.getEngineId());
modelObj.set("systemid",msg.getSystemid());
if(!StringUtils.isEmpty(msg.getResultDataSource())){
modelObj.set(RuleEngineExService.Setting_ResultDataSource, msg.getResultDataSource());
}
if(!StringUtils.isEmpty(msg.getResultTableName())){
modelObj.set(RuleEngineExService.Setting_ResultTableName, msg.getResultTableName());
}
processRule(modelObj, modelObj.getRowKey(),strResId);
ResultsMQMsg resultsMQMsg = new ResultsMQMsg();
resultsMQMsg.setKeyValueField(modelObj.getRowKey());
resultsMQMsg.setRules(msg.getRuleIds());
Object saveResults = modelObj.get(Tag_SaveResults);
if(saveResults != null && saveResults instanceof List){
resultsMQMsg.setDatas((List<ExecResult>)saveResults);
// defaultMQProducerService.sendToMQ(resultsMQMsg, "default");
}
defaultMQProducerService.sendToMQ(resultsMQMsg, "default");
}
catch (Exception e)
{
errorcount++;
log.error("Service错误,加载数据详细信息:" + modelObj.getRowKey() + "," + e.getMessage());
if (strErrorInfo.toString().length() <= 5000)
{
strErrorInfo.append("加载数据详细信息:" + modelObj.getRowKey() + ",错误:" + e.getMessage());
strErrorInfo.append("\r\n");
if (e.getCause() != null && e.getCause().getMessage() != null)
{
strErrorInfo.append(e.getCause().getMessage());
strErrorInfo.append("\r\n");
}
}
}
}
}
catch(Exception e)
{
log.error("Service读取BaseRequest错误:" + e.toString());
strErrorInfo.append("读取BaseRequest错误:" + e.getMessage());
strErrorInfo.append("\r\n");
if (e.getCause() != null && e.getCause().getMessage() != null)
{
strErrorInfo.append(e.getCause().getMessage());
strErrorInfo.append("\r\n");
}
execlog.setRetCode(1);
execlog.setCnt(count);
if (errorcount == 0)
execlog.setSucc(0);
else
execlog.setSucc(count-errorcount);
execlog.setRunResult(strErrorInfo.toString());
ruExecLogService.update(execlog);
return execlog;
}
finally
{
if(setkieSession.containsKey(strResId))
{
KieSession kieSession=setkieSession.get(strResId);
try
{
kieSession.dispose();
}
catch (Exception ex){}
setkieSession.remove(strResId);
setRuleCount.remove(strResId);
}
}
log.info(new StringBuilder().append("Service成功,process successfully:").append(msg.toString()).toString());
strErrorInfo.append("Service成功,process successfully\r\n");
execlog.setCnt(count);
execlog.setSucc(count-errorcount);
if(execlog.getCnt()==0)
execlog.setRetCode(3);
else
execlog.setRetCode(0);
execlog.setRunResult(strErrorInfo.toString());
this.ruExecLogService.update(execlog);
return execlog;
}
}
package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.analysis.domain.DAMetric;
import cn.ibizlab.core.analysis.service.IDAMetricService;
import cn.ibizlab.core.analysis.service.impl.DABuildServiceImpl;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.util.DefaultMQProducerService;
import cn.ibizlab.core.extensions.util.MQConsumeMsgListenerProcessor;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import cn.ibizlab.core.lite.extensions.domain.Setting;
import cn.ibizlab.core.lite.extensions.model.DataModel;
import cn.ibizlab.core.lite.extensions.service.LiteDataCallback;
import cn.ibizlab.core.lite.extensions.service.LiteDataService;
import cn.ibizlab.core.lite.extensions.service.LiteModelService;
import cn.ibizlab.core.rule.domain.ExecLog;
import cn.ibizlab.core.rule.domain.RuleEngine;
import cn.ibizlab.core.rule.domain.RuleItem;
import cn.ibizlab.core.rule.service.IExecLogService;
import cn.ibizlab.core.rule.service.IRuleItemService;
import cn.ibizlab.util.errors.BadRequestAlertException;
import cn.ibizlab.util.helper.CachedBeanCopier;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import cn.ibizlab.core.analysis.domain.DABuild;
import org.kie.api.definition.rule.Rule;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.context.annotation.Primary;
import org.springframework.util.StringUtils;
import java.io.File;
import java.sql.Wrapper;
import java.util.*;
/**
* 实体[分析] 自定义服务对象
*/
......@@ -16,6 +48,32 @@ import java.util.*;
@Service("DABuildExService")
public class DABuildExService extends DABuildServiceImpl {
@Value("${ibiz.rulepath:/app/file/rules/}")
private String rulePath;
@Autowired
protected IExecLogService ruExecLogService;
@Autowired
private IDAMetricService idaMetricService;
@Autowired
private IRuleItemService ruleItemService;
@Autowired
private DefaultMQProducerService defaultMQProducerService;
@Autowired
private LiteModelService liteModelService;
@Autowired
@Lazy
private LiteDataService liteDataService;
@Autowired
@Qualifier("AnalyseEntityServiceImpl")
private BaseEntityService analyseEntityServiceImpl;
@Override
protected Class currentModelClass() {
return com.baomidou.mybatisplus.core.toolkit.ReflectionKit.getSuperClassGenericType(this.getClass().getSuperclass(), 1);
......@@ -29,6 +87,16 @@ public class DABuildExService extends DABuildServiceImpl {
@Override
@Transactional
public DABuild check(DABuild et) {
if(!StringUtils.isEmpty(et.getBuildId())) {
CachedBeanCopier.copy(get(et.getBuildId()), et);
idaMetricService.list(Wrappers.<DAMetric>lambdaQuery()
.eq(DAMetric::getBuildId,et.getBuildId()))
.forEach(dametric -> {
idaMetricService.update(dametric);
});
}
return super.check(et);
}
/**
......@@ -48,7 +116,138 @@ public class DABuildExService extends DABuildServiceImpl {
*/
@Override
public DABuild run(DABuild et) {
if(!StringUtils.isEmpty(et.getBuildId()))
{
CachedBeanCopier.copy(get(et.getBuildId()), et);
if("RUNNING".equalsIgnoreCase(et.getState()))
throw new BadRequestAlertException("构建正在执行中,不能重复执行","DABuild",et.getBuildId());
BaseRequest msg=new BaseRequest();
msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName());
msg.setEngineId(et.getBuildId());
java.sql.Timestamp starttime = new java.sql.Timestamp(System.currentTimeMillis());
liteDataService.processDataModel(et.getModelId(), et.getLastRuntime(), new LiteDataCallback<List<EntityObj>>() {
@Override
public void total(Integer total) {
et.setTotal(total);
et.setProcessed(0);
et.setState("RUNNING");
update(et);
}
@Override
public boolean processData(List<EntityObj> datas) {
EngineMQMsg engineMQMsg = new EngineMQMsg();
engineMQMsg.setEngineId(et.getBuildId());
engineMQMsg.setBatch(msg.getBatch());
engineMQMsg.setRunTime(starttime);
engineMQMsg.setCount(datas.size());
engineMQMsg.setDatas(datas);
try {
defaultMQProducerService.sendToMQ(engineMQMsg, MQConsumeMsgListenerProcessor.Type_Build);
} catch (Exception ex) {
return false;
}
return true;
}
});
}
return super.run(et);
}
public void processData(EngineMQMsg engineMQMsg){
DABuild et = get(engineMQMsg.getEngineId());
try{
BaseRequest msg=new BaseRequest();
msg.setId(IdWorker.getIdStr());
msg.setModel(et.getModelName());
msg.setBatch(engineMQMsg.getBatch());
msg.setSystemid(et.getSystemId());
msg.setEngineId(engineMQMsg.getEngineId());
ExecLog execlog=new ExecLog();
execlog.setId(msg.getId());
execlog.setName(msg.getBatch());
execlog.setKeyValueField(msg.getModel());
execlog.setSystemId(msg.getSystemid());
ruExecLogService.create(execlog);
List<String> ruleIds = new ArrayList<>();
List<String> rulePaths = new ArrayList<>();
DataModel dataModel=liteModelService.getDataModel(et.getModelId());
HashSet<String> fillpropertys=new HashSet<>();
List<String> metrics = new ArrayList<>();
idaMetricService.list(Wrappers.<DAMetric>lambdaQuery().eq(DAMetric::getBuildId,et.getBuildId()))
.forEach(daMetric -> {
metrics.add(daMetric.getMetricId());
});
if(metrics.size() == 0){
log.error(String.format("构建数据失败:未配置指标"));
return;
}
ruleItemService.list(Wrappers.<RuleItem>lambdaQuery().in(RuleItem::getRuleId, metrics))
.forEach(ruleItem -> {
String path=rulePath + ruleItem.getGroup() + File.separator + ruleItem.getRuleId() + ".drl";
File file=new File(path);
if(!file.exists())
{
ruleItemService.buildRuleFile(ruleItem);
}
if(file.exists())
{
rulePaths.add(path);
ruleIds.add(ruleItem.getRuleId());
}
if((!StringUtils.isEmpty(ruleItem.getCond()))&&ruleItem.getCond().startsWith("["))
fillpropertys.addAll(JSON.toJavaObject(JSON.parseArray(ruleItem.getCond()), LinkedHashSet.class));
});
if(ruleIds.size() == 0){
log.error(String.format("构建数据失败:无有效规则"));
return;
}
dataModel.getAllProperty().forEach(prop->{
if(fillpropertys.contains(prop.getPropertyName()))
{
DataModel p=prop.getOwnerDataModel().getParentDataModel();
while (p!=null)
{
fillpropertys.add(p.getFactPorperty().getPropertyName());
p=p.getParentDataModel();
}
}
});
msg.setRuleIds(ruleIds);
msg.setRules(rulePaths);
msg.setDatas(liteDataService.getModelObjs(et.getModelId(),fillpropertys,engineMQMsg.getDatas()));
analyseEntityServiceImpl.processAll(msg);
}catch (Exception ex){
log.error(String.format("构建数据错误:%1$s", ex.getMessage()));
ex.printStackTrace();
}
String updateSql = "UPDATE ibzdabuild SET processed=(CASE WHEN processed is null then 0 else processed end+#{et.count}),lastruntime=CASE WHEN total <= processed THEN #{et.runtime} ELSE lastruntime END," +
"BUILDSTATE=CASE WHEN total <= processed THEN 'FINISH' ELSE BUILDSTATE END WHERE buildid = #{et.engineId}";
HashMap<String, Object> param = new HashMap<>();
param.put("count", engineMQMsg.getCount());
param.put("runtime", engineMQMsg.getRunTime());
param.put("engineId", engineMQMsg.getEngineId());
this.execute(updateSql, param);
}
}
......@@ -53,6 +53,7 @@ public class DAMetricExService extends DAMetricServiceImpl {
@Transactional
public DAMetric syncRule(DAMetric et) {
RuleItem rule =new RuleItem();
rule.setRuleId(et.getMetricId());
rule.setRuleCode(et.getMetricId());
rule.setRuleName(et.getMetricName());
rule.setModelId(et.getModelId());
......@@ -60,7 +61,8 @@ public class DAMetricExService extends DAMetricServiceImpl {
rule.setCfg(et.getCfg());
rule.setFieldSet(et.getFieldSet());
rule.setGroup("REP");
rule.setBusinessCat(et.getMetricType());
//rule.setBusinessCat(et.getMetricType());
rule.setBusinessCat(et.getBuildId());
rule.setMemo(et.getModelName()+"."+et.getMetricName());
ruleService.save(rule);
return super.syncRule(et);
......
......@@ -2,6 +2,8 @@ package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.util.DefaultMQProducerService;
import cn.ibizlab.core.extensions.util.MQConsumeMsgListenerProcessor;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import cn.ibizlab.core.lite.extensions.domain.Setting;
import cn.ibizlab.core.lite.extensions.model.DataModel;
......@@ -21,12 +23,6 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
......@@ -66,15 +62,8 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
@Autowired
private LiteModelService liteModelService;
@Autowired
@Lazy
DefaultMQProducer defaultMQProducer;
@Value("${rocketmq.producer.ruleEngineTopic: DSTMSG}")
private String ruleEngineTopic;
private DefaultMQProducerService defaultMQProducerService;
@Value("${ibiz.rulepath:/app/file/rules/}")
private String rulePath;
......@@ -88,7 +77,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
* @return
*/
@Override
//@Transactional
public RuleEngine run(RuleEngine et) {
if(!StringUtils.isEmpty(et.getEngineId()))
......@@ -124,7 +112,7 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
engineMQMsg.setCount(datas.size());
engineMQMsg.setDatas(datas);
try {
sendToMQ(engineMQMsg);
defaultMQProducerService.sendToMQ(engineMQMsg, MQConsumeMsgListenerProcessor.Type_Engine);
} catch (Exception ex) {
return false;
}
......@@ -137,12 +125,6 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
return super.run(et);
}
public void sendToMQ(EngineMQMsg engineMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(engineMQMsg);
Message sendMsg = new Message(ruleEngineTopic, "default", msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("消息发送响应:" + sendResult.toString());
}
@Autowired
......@@ -241,7 +223,7 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
});
}
return super.run(et);
return super.check(et);
}
}
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
/**
* MQ默认发送消息服务
*/
@Slf4j
@Service
public class DefaultMQProducerService {
@Value("${rocketmq.producer.ruleEngineTopic:DSTMSG}")
private String ruleEngineTopic;
@Value("${rocketmq.producer.resultsTopic:DSTRESULTSMSG}")
private String resultsTopic;
@Autowired
@Lazy
DefaultMQProducer defaultMQProducer;
public void sendToMQ(EngineMQMsg engineMQMsg, String tags) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(engineMQMsg);
Message sendMsg = new Message(ruleEngineTopic, tags, msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("引擎消息发送响应:" + sendResult.toString());
}
public void sendToMQ(ResultsMQMsg resultMQMsg, String tags) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(resultMQMsg);
Message sendMsg = new Message(resultsTopic, tags, msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("构建消息发送响应:" + sendResult.toString());
}
}
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.service.DABuildExService;
import cn.ibizlab.core.extensions.service.RuleEngineExService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
......@@ -9,8 +10,10 @@ import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
......@@ -20,15 +23,22 @@ import java.util.List;
@Slf4j
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
public static final String Type_Engine = "Engine";
public static final String Type_Build = "Build";
private Object lockObject =new Object();
@Value("${rocketmq.producer.ruleEngineTopic: DSTMSG}")
@Value("${rocketmq.producer.ruleEngineTopic:DSTMSG}")
private String ruleEngineTopic;
@Autowired
@Lazy
private RuleEngineExService ruleEngineExService;
@Autowired
@Lazy
private DABuildExService daBuildExService;
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
if (CollectionUtils.isEmpty(list)) {
......@@ -59,12 +69,17 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
protected void processRuleEngineData(MessageExt messageExt){
try {
String body = new String(messageExt.getBody(), "utf-8");
String tags = messageExt.getTags();
// System.out.println("start sleep");
// Thread.sleep(80000);
// System.out.println("end sleep");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {});
if(Type_Engine.equalsIgnoreCase(tags)) {
ruleEngineExService.processData(engineMQMsg);
}else if(Type_Build.equalsIgnoreCase(tags)){
daBuildExService.processData(engineMQMsg);
}
} catch (Exception e) {
log.error("获取MQ消息内容异常{}",e);
}
......
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.cql.ExecResultRepository;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
@Slf4j
@Component
public class ResultsMQMsgConsumeListener implements MessageListenerConcurrently {
@Value("${rocketmq.producer.resultsTopic:DSTRESULTSMSG}")
private String resultsTopic;
@Autowired
private ExecResultRepository execResultRepository;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty(list)) {
log.info("MQ接收消息为空,直接返回成功");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//System.out.println(System.currentTimeMillis());
MessageExt messageExt = list.get(0);
log.info("MQ接收到的消息为:" + messageExt.toString());
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
log.trace("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
if(resultsTopic.equalsIgnoreCase(topic)){
ResultsMQMsg resultsMQMsg = JSON.parseObject(body, new TypeReference<ResultsMQMsg>() {});
if(!StringUtils.isEmpty(resultsMQMsg.getKeyValueField()))
execResultRepository.saveResultsMQMsg(resultsMQMsg);
}
} catch (Exception e) {
log.error("获取MQ消息内容异常{}", e);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
package cn.ibizlab.core.util.config;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.mapping.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import static com.datastax.driver.core.schemabuilder.SchemaBuilder.createKeyspace;
import static com.datastax.driver.mapping.NamingConventions.*;
@Configuration
public class CassandraConfig {
@Bean
public Cluster cluster(
@Value("${cassandra.host:172.16.100.243}") String host,
@Value("${cassandra.cluster.name:cluster}") String clusterName,
@Value("${cassandra.options.maxrequestsperconnection:64}") int maxRequestsPerConnection,
@Value("${cassandra.options.newconnectionthreshold:100}") int newConnectionThreshold,
@Value("${cassandra.options.coreconnectionsperhost:2}") int coreConnectionsPerHost,
@Value("${cassandra.options.maxconnectionsperhost:6}") int maxConnectionsPerHost,
@Value("${cassandra.port:9042}") int port) {
if(StringUtils.isEmpty(host))
return null;
// InetSocketAddress addr=new InetSocketAddress("172.16.100.243",9042);
// InetSocketAddress addr2=new InetSocketAddress("172.16.100.77",9042);
// InetSocketAddress addr3=new InetSocketAddress("172.16.100.77",19042);
// LoadBalancingPolicy lbp = new TokenAwarePolicy(
// DCAwareRoundRobinPolicy.builder().withLocalDc("myDC").build()
// );
//读超时或连接超时设置
SocketOptions so = new SocketOptions().setReadTimeoutMillis(200000).setConnectTimeoutMillis(3000);
//连接池配置
//PoolingOptions poolingOptions = new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 2, 3);
//集群在同一个机房用HostDistance.LOCAL 不同的机房用HostDistance.REMOTE 忽略用HostDistance.IGNORED
PoolingOptions poolingOptions= new PoolingOptions()
.setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection)//每个连接最多允许64个并发请求
.setNewConnectionThreshold(HostDistance.LOCAL, newConnectionThreshold)
.setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnectionsPerHost)//和集群里的每个机器都至少有2个连接
.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnectionsPerHost);//和集群里的每个机器都最多有6个连接
//查询配置
//设置一致性级别ANY(0),ONE(1),TWO(2),THREE(3),QUORUM(4),ALL(5),LOCAL_QUORUM(6),EACH_QUORUM(7),SERIAL(8),LOCAL_SERIAL(9),LOCAL_ONE(10);
//可以在每次生成查询statement的时候设置,也可以像这样全局设置
QueryOptions queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
//重试策略
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
return Cluster.builder()
.addContactPoint(host)
.withPort(port) .withClusterName(clusterName)
//.withLoadBalancingPolicy(lbp)
.withSocketOptions(so)
.withPoolingOptions(poolingOptions)
.withQueryOptions(queryOptions)
.withRetryPolicy(retryPolicy)
.build();
}
@Bean
public Session session(Cluster cluster, @Value("${cassandra.keyspace:dst}") String keyspace)
throws IOException {
// final Session session = cluster.connect(keyspace);
if(StringUtils.isEmpty(keyspace))
return null;
final Session session = cluster.connect();
setupKeyspace(session, keyspace);
return session;
}
@Value("${cassandra.class:SimpleStrategy}")
private String classStrategy;
@Value("${cassandra.replication_factor:1}")
private Integer replication_factor;
private void setupKeyspace(Session session, String keyspace) throws IOException {
final Map<String, Object> replication = new HashMap<>();
replication.put("class", classStrategy);
replication.put("replication_factor", replication_factor);
session.execute(createKeyspace(keyspace).ifNotExists().with().replication(replication));
session.execute("USE " + keyspace);
// String[] statements =
// split(IOUtils.toString(getClass().getResourceAsStream("/cql/setup.cql")), ";");
// Arrays.stream(statements).map(statement -> normalizeSpace(statement) +
// ";").forEach(session::execute);
}
@Bean
public MappingManager mappingManager(Session session) {
if(session==null)
return null;
final PropertyMapper propertyMapper =
new DefaultPropertyMapper()
.setNamingStrategy(new DefaultNamingStrategy(LOWER_CAMEL_CASE, LOWER_SNAKE_CASE));
final MappingConfiguration configuration =
MappingConfiguration.builder().withPropertyMapper(propertyMapper).build();
return new MappingManager(session, configuration);
}
}
package cn.ibizlab.core.util.config;
import cn.ibizlab.core.extensions.util.MQConsumeMsgListenerProcessor;
import cn.ibizlab.core.extensions.util.ResultsMQMsgConsumeListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
......@@ -18,22 +20,29 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConsumerConfigure {
@Value("${rocketmq.consumer.groupName: DEFAULT_CONSUMER}")
@Value("${rocketmq.consumer.groupName:}")
private String groupName;
@Value("${rocketmq.consumer.namesrvAddr:}")
private String namesrvAddr;
@Value("${rocketmq.consumer.topics:}")
private String topics;
// 消费者线程数据量
@Value("${rocketmq.consumer.consumeThreadMin: 1}")
@Value("${rocketmq.consumer.consumeThreadMin:10}")
private Integer consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax: 1}")
@Value("${rocketmq.consumer.consumeThreadMax:20}")
private Integer consumeThreadMax;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize: 1}")
private Integer consumeMessageBatchMaxSize;
@Value("${rocketmq.producer.ruleEngineTopic:DSTMSG}")
private String ruleEngineTopic;
@Value("${rocketmq.producer.resultsTopic:DSTRESULTSMSG}")
private String resultsTopic;
@Autowired
private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;
@Autowired
private ResultsMQMsgConsumeListener resultsMQMsgConsumeListener;
/**
* mq 消费者配置
* @return
......@@ -43,31 +52,41 @@ public class MQConsumerConfigure {
//@ConditionalOnExpression("${rocketmq.consumer.isOnOff:off}.equals('on')")
public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
log.info("defaultConsumer 正在创建---------------------------------------");
String groupName = "dstCG";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1);
// 设置监听
consumer.registerMessageListener(consumeMsgListenerProcessor);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setInstanceName("dstMQMsgConsumer");
try {
consumer.subscribe(ruleEngineTopic, "*");
consumer.start();
log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",groupName,topics,namesrvAddr);
} catch (MQClientException e) {
log.error("consumer 创建失败!");
}
return consumer;
}
/**
* 设置consumer第一次启动是从队列头部开始还是队列尾部开始
* 如果不是第一次启动,那么按照上次消费的位置继续消费
*/
@Bean
public DefaultMQPushConsumer resultsMQMsgConsumer() throws MQClientException {
log.info("resultsMQMsgConsumer 正在创建---------------------------------------");
String groupName = "resultsCG";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20);
consumer.registerMessageListener(resultsMQMsgConsumeListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setInstanceName("resultsMQMsgConsumer");
try {
// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
String[] topicArr = topics.split(";");
for (String tag : topicArr) {
String[] tagArr = tag.split("~");
consumer.subscribe(tagArr[0], tagArr[1]);
}
consumer.subscribe(resultsTopic, "*");
consumer.start();
log.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",groupName,topics,namesrvAddr);
} catch (MQClientException e) {
......@@ -75,4 +94,5 @@ public class MQConsumerConfigure {
}
return consumer;
}
}
......@@ -15,18 +15,18 @@ import org.springframework.context.annotation.Configuration;
public class MQProducerConfigure {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);
@Value("${rocketmq.producer.groupName: DEFAULT_PRODUCER}")
@Value("${rocketmq.producer.groupName:dstproducer}")
private String groupName;
@Value("${rocketmq.producer.namesrvAddr:}")
private String namesrvAddr;
// 消息最大值
@Value("${rocketmq.producer.maxMessageSize: 409600}")
@Value("${rocketmq.producer.maxMessageSize:409600}")
private Integer maxMessageSize;
// 消息发送超时时间
@Value("${rocketmq.producer.sendMsgTimeOut: 3000}")
@Value("${rocketmq.producer.sendMsgTimeOut:3000}")
private Integer sendMsgTimeOut;
// 失败重试次数
@Value("${rocketmq.producer.retryTimesWhenSendFailed: 2}")
@Value("${rocketmq.producer.retryTimesWhenSendFailed:2}")
private Integer retryTimesWhenSendFailed;
/**
......
......@@ -85,6 +85,7 @@
<oracle.version>19.8.0.0</oracle.version>
<postgresql.version>42.2.6</postgresql.version>
<metrics.version>3.2.6</metrics.version>
</properties>
......@@ -261,6 +262,11 @@
<version>${baomidou-jobs.version}</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
......
......@@ -35,4 +35,11 @@ public class CodeList
@JsonProperty("items")
private List<CodeItem> options = new ArrayList<>();
public CodeItem findCodeItem(Object value){
for(CodeItem codeItem : options){
if(value != null && codeItem.getValue() != null && value.equals(codeItem.getValue()))
return codeItem;
}
return null;
}
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册