提交 1e43c56f 编写于 作者: sq3536's avatar sq3536

提交

上级 d401e4d5
package cn.ibizlab.core.extensions.domain;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.springframework.util.StringUtils;
import java.io.Serializable;
@Data
public class LocalMsgEvent implements Serializable {
private String body;
public <T> void setMessage(T message)
{
body = JSON.toJSONString(message);
}
public <T> T getMessage(Class<T> clazz)
{
if(StringUtils.isEmpty(body))
return null;
return JSON.parseObject(body,clazz);
}
private String tags = "default";
public int getType()
{
return "default".equals(tags)?2:1;
}
}
......@@ -6,6 +6,7 @@ import cn.ibizlab.core.extensions.cql.ExecResult;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.extensions.util.DefaultMQProducerService;
import cn.ibizlab.core.extensions.util.MsgProducerService;
import cn.ibizlab.core.lite.extensions.domain.FieldObj;
import cn.ibizlab.core.lite.extensions.domain.ModelObj;
import cn.ibizlab.core.rule.domain.ExecLog;
......@@ -30,7 +31,7 @@ public class AnalyseEntityServiceImpl extends BaseEntityServiceImpl {
public static final String Tag_SaveResults = "SaveResults";
@Autowired
private DefaultMQProducerService defaultMQProducerService;
private MsgProducerService defaultMQProducerService;
@Autowired
@Lazy
......@@ -207,7 +208,7 @@ public class AnalyseEntityServiceImpl extends BaseEntityServiceImpl {
resultsMQMsg.setDatas((List<ExecResult>)saveResults);
// defaultMQProducerService.sendToMQ(resultsMQMsg, "default");
}
defaultMQProducerService.sendToMQ(resultsMQMsg, "default");
defaultMQProducerService.sendResultsMsg(resultsMQMsg);
}
catch (Exception e)
{
......
......@@ -8,7 +8,7 @@ import cn.ibizlab.core.analysis.service.impl.DABuildServiceImpl;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.util.DefaultMQProducerService;
import cn.ibizlab.core.extensions.util.MQConsumeMsgListenerProcessor;
import cn.ibizlab.core.extensions.util.MsgProducerService;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import cn.ibizlab.core.lite.extensions.model.DataModel;
import cn.ibizlab.core.lite.extensions.service.LiteDataCallback;
......@@ -60,7 +60,7 @@ public class DABuildExService extends DABuildServiceImpl {
private IRuleItemService ruleItemService;
@Autowired
private DefaultMQProducerService defaultMQProducerService;
private MsgProducerService defaultMQProducerService;
@Autowired
private LiteModelService liteModelService;
......@@ -151,7 +151,7 @@ public class DABuildExService extends DABuildServiceImpl {
engineMQMsg.setCount(datas.size());
engineMQMsg.setDatas(datas);
try {
defaultMQProducerService.sendToMQ(engineMQMsg, MQConsumeMsgListenerProcessor.Type_Build);
defaultMQProducerService.sendBuildMsg(engineMQMsg);
} catch (Exception ex) {
return false;
}
......
......@@ -2,8 +2,7 @@ package cn.ibizlab.core.extensions.service;
import cn.ibizlab.core.extensions.domain.BaseRequest;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.util.DefaultMQProducerService;
import cn.ibizlab.core.extensions.util.MQConsumeMsgListenerProcessor;
import cn.ibizlab.core.extensions.util.MsgProducerService;
import cn.ibizlab.core.lite.extensions.domain.EntityObj;
import cn.ibizlab.util.helper.Setting;
import cn.ibizlab.core.lite.extensions.model.DataModel;
......@@ -60,7 +59,7 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
private LiteModelService liteModelService;
@Autowired
private DefaultMQProducerService defaultMQProducerService;
private MsgProducerService defaultMQProducerService;
@Value("${ibiz.rulepath:/app/file/rules/}")
private String rulePath;
......@@ -112,7 +111,7 @@ public class RuleEngineExService extends RuleEngineServiceImpl {
engineMQMsg.setCount(datas.size());
engineMQMsg.setDatas(datas);
try {
defaultMQProducerService.sendToMQ(engineMQMsg, MQConsumeMsgListenerProcessor.Type_Engine);
defaultMQProducerService.sendEngineMsg(engineMQMsg);
} catch (Exception ex) {
return false;
}
......
......@@ -12,6 +12,7 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
......@@ -20,7 +21,8 @@ import org.springframework.stereotype.Service;
*/
@Slf4j
@Service
public class DefaultMQProducerService {
@ConditionalOnExpression("(${rocketmq.producer.enabled:false})||(${rocketmq.consumer.enabled:false})")
public class DefaultMQProducerService implements MsgProducerService{
@Value("${rocketmq.producer.ruleEngineTopic:DSTMSG}")
private String ruleEngineTopic;
......@@ -33,17 +35,28 @@ public class DefaultMQProducerService {
DefaultMQProducer defaultMQProducer;
public void sendToMQ(EngineMQMsg engineMQMsg, String tags) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
@Override
public void sendEngineMsg(EngineMQMsg engineMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(engineMQMsg);
Message sendMsg = new Message(ruleEngineTopic, "Engine", msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("引擎消息发送响应:" + sendResult.toString());
}
@Override
public void sendBuildMsg(EngineMQMsg engineMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(engineMQMsg);
Message sendMsg = new Message(ruleEngineTopic, tags, msg.getBytes());
Message sendMsg = new Message(ruleEngineTopic, "Build", msg.getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("引擎消息发送响应:" + sendResult.toString());
}
public void sendToMQ(ResultsMQMsg resultMQMsg, String tags) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(resultMQMsg);
Message sendMsg = new Message(resultsTopic, tags, msg.getBytes());
sendMsg.setKeys(resultMQMsg.getKeyValueField());
@Override
public void sendResultsMsg(ResultsMQMsg resultsMQMsg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
String msg = JSON.toJSONString(resultsMQMsg);
Message sendMsg = new Message(resultsTopic, "default", msg.getBytes());
sendMsg.setKeys(resultsMQMsg.getKeyValueField());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("构建消息发送响应:" + sendResult.toString());
}
......
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.cql.ExecResultRepository;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.LocalMsgEvent;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import cn.ibizlab.core.extensions.service.DABuildExService;
import cn.ibizlab.core.extensions.service.RuleEngineExService;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.util.StringUtils;
import java.util.concurrent.TimeUnit;
@Slf4j
public class LocalMsgEventHandler implements EventHandler<LocalMsgEvent> {
@Autowired
@Lazy
private RuleEngineExService ruleEngineExService;
@Autowired
@Lazy
private DABuildExService daBuildExService;
@Autowired(required = false)
@Lazy
private ExecResultRepository execResultRepository;
@Override
public void onEvent(LocalMsgEvent localMsgEvent, long sequence, boolean endOfBatch) throws Exception {
if (localMsgEvent.getType()==1) {
String tags = localMsgEvent.getTags();
if("Engine".equalsIgnoreCase(tags)) {
EngineMQMsg engineMQMsg = localMsgEvent.getMessage(EngineMQMsg.class);
if (log.isDebugEnabled()) {
log.debug("EngineMQMsg Event Handler: {}", engineMQMsg.getBatch());
}
ruleEngineExService.processData(engineMQMsg);
}else if("Build".equalsIgnoreCase(tags)){
EngineMQMsg engineMQMsg = localMsgEvent.getMessage(EngineMQMsg.class);
if (log.isDebugEnabled()) {
log.debug("BuildMQMsg Event Handler: {}", engineMQMsg.getBatch());
}
daBuildExService.processData(engineMQMsg);
}
}
else if (localMsgEvent.getType()==2) {
ResultsMQMsg resultsMQMsg = localMsgEvent.getMessage(ResultsMQMsg.class);
if (log.isDebugEnabled()) {
log.debug("ResultsMQMsg Event Handler: {}", resultsMQMsg.getKeyValueField());
}
if(!StringUtils.isEmpty(resultsMQMsg.getKeyValueField()))
execResultRepository.saveResultsMQMsg(resultsMQMsg);
}
}
}
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.LocalMsgEvent;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
import com.alibaba.fastjson.JSON;
import com.baomidou.jobs.model.JobsInfo;
import com.lmax.disruptor.dsl.Disruptor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@ConditionalOnExpression("(!${rocketmq.producer.enabled:false})&&(!${rocketmq.consumer.enabled:false})")
public class LocalMsgProducerService implements MsgProducerService{
@Autowired
protected Disruptor<LocalMsgEvent> disruptor;
@Override
public void sendEngineMsg(EngineMQMsg engineMQMsg) throws Exception {
LocalMsgEvent localMsgEvent = new LocalMsgEvent();
localMsgEvent.setMessage(engineMQMsg);
localMsgEvent.setTags("Engine");
if (log.isDebugEnabled()) {
log.debug("EngineMQMsg Event Send: {}", engineMQMsg.getBatch());
}
disruptor.publishEvent((event, sequence, bind) -> {event.setBody(bind.getBody());event.setTags(localMsgEvent.getTags());}, localMsgEvent);
}
@Override
public void sendBuildMsg(EngineMQMsg engineMQMsg) throws Exception {
LocalMsgEvent localMsgEvent = new LocalMsgEvent();
localMsgEvent.setMessage(engineMQMsg);
localMsgEvent.setTags("Build");
if (log.isDebugEnabled()) {
log.debug("BuildMQMsg Event Send: {}", engineMQMsg.getBatch());
}
disruptor.publishEvent((event, sequence, bind) -> {event.setBody(bind.getBody());event.setTags(localMsgEvent.getTags());}, localMsgEvent);
}
@Override
public void sendResultsMsg(ResultsMQMsg resultsMQMsg) throws Exception {
LocalMsgEvent localMsgEvent = new LocalMsgEvent();
localMsgEvent.setMessage(resultsMQMsg);
if (log.isDebugEnabled()) {
log.debug("ResultsMQMsg Event Send: {}", resultsMQMsg.getKeyValueField());
}
disruptor.publishEvent((event, sequence, bind) -> event.setBody(bind.getBody()), localMsgEvent);
}
}
......@@ -23,8 +23,7 @@ import java.util.List;
@Slf4j
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
public static final String Type_Engine = "Engine";
public static final String Type_Build = "Build";
private Object lockObject =new Object();
......@@ -75,9 +74,9 @@ public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
// Thread.sleep(80000);
// System.out.println("end sleep");
EngineMQMsg engineMQMsg = JSON.parseObject(body, new TypeReference<EngineMQMsg>() {});
if(Type_Engine.equalsIgnoreCase(tags)) {
if("Engine".equalsIgnoreCase(tags)) {
ruleEngineExService.processData(engineMQMsg);
}else if(Type_Build.equalsIgnoreCase(tags)){
}else if("Build".equalsIgnoreCase(tags)){
daBuildExService.processData(engineMQMsg);
}
} catch (Exception e) {
......
package cn.ibizlab.core.extensions.util;
import cn.ibizlab.core.extensions.domain.EngineMQMsg;
import cn.ibizlab.core.extensions.domain.ResultsMQMsg;
public interface MsgProducerService {
public void sendEngineMsg(EngineMQMsg engineMQMsg) throws Exception;
public void sendBuildMsg(EngineMQMsg engineMQMsg) throws Exception;
public void sendResultsMsg(ResultsMQMsg resultsMQMsg) throws Exception;
}
......@@ -57,7 +57,8 @@ public class LiteDataService {
}
public boolean saveBatch(EntityModel entityModel, String dsName, List<EntityObj> list) {
splitList(list, 500).forEach(objs->{
List<List<EntityObj>> splist=splitList(list, 500);
splist.forEach(objs->{
getEntityService(StringUtils.isEmpty(dsName)?entityModel.getDsName():dsName).saveBatch(dsName,entityModel,objs);
});
return true;
......@@ -65,7 +66,8 @@ public class LiteDataService {
public boolean saveBatch(String systemId, String entityName, String dsName, List<EntityObj> list) {
EntityModel entityModel = liteModelService.getEntityModel(systemId,entityName);
splitList(list, 500).forEach(objs->{
List<List<EntityObj>> splist=splitList(list, 500);
splist.forEach(objs->{
getEntityService(StringUtils.isEmpty(dsName)?entityModel.getDsName():dsName).saveBatch(dsName,entityModel,objs);
});
return true;
......@@ -150,8 +152,9 @@ public class LiteDataService {
EntityModel entityModel = dataModel.getFactEntityModel();
List<ModelObj> rt = new ArrayList<>();
List<EntityObj> factEntityList = new ArrayList<>();
List<List<EntityObj>> splist=splitList(kEntityObjs, 1000);
if (fillPropertys == null || fillPropertys.size() == 0 || fillPropertys.contains(dataModel.getFactPorperty().getPropertyName()))
splitList(kEntityObjs, 1000).forEach(list -> factEntityList.addAll(getEntityService(entityModel.getDsName()).selectBase(entityModel.getDsName(), entityModel, QueryFilter.createQuery().cust(dataModel.lookup(list)))));
splist.forEach(list -> factEntityList.addAll(getEntityService(entityModel.getDsName()).selectBase(entityModel.getDsName(), entityModel, QueryFilter.createQuery().cust(dataModel.lookup(list)))));
factEntityList.forEach(entityObj ->
rt.add(new ModelObj().setDataModel(dataModel).setFactEntity(entityObj.setProperty(dataModel.getFactPorperty()))));
fillEntityObj(dataModel, fillPropertys, rt);
......@@ -162,6 +165,7 @@ public class LiteDataService {
}
public void fillEntityObj(DataModel dataModel, Set<String> fillPropertys, List<ModelObj> modelObjs) {
List<List<ModelObj>> splist=splitList(modelObjs, 1000);
if (dataModel.getObjectProperties().size() > 1) {
for (Property property : dataModel.getObjectProperties()) {
if (property.getPropertyName().equals(dataModel.getFactPorperty().getPropertyName()))
......@@ -169,7 +173,7 @@ public class LiteDataService {
EntityModel entityModel = property.getEntityModel();
List<EntityObj> entityObjs = new ArrayList<>();
if (fillPropertys == null || fillPropertys.size() == 0 || fillPropertys.contains(property.getPropertyName()))
splitList(modelObjs, 1000).forEach(list -> entityObjs.addAll(getEntityService(entityModel.getDsName()).selectBase(entityModel.getDsName(), entityModel, QueryFilter.createQuery().cust(property.lookup(list)))));
splist.forEach(list -> entityObjs.addAll(getEntityService(entityModel.getDsName()).selectBase(entityModel.getDsName(), entityModel, QueryFilter.createQuery().cust(property.lookup(list)))));
entityObjs.forEach(entityObj -> {
entityObj.setProperty(property);
......@@ -180,19 +184,31 @@ public class LiteDataService {
}
private <T> List<List<T>> splitList(List<T> list, int groupSize) {
int length = list.size();
// 计算可以分成多少组
int num = (length + groupSize - 1) / groupSize; // TODO
List<List<T>> newList = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
// 开始位置
int fromIndex = i * groupSize;
// 结束位置
int toIndex = (i + 1) * groupSize < length ? (i + 1) * groupSize : length;
newList.add(list.subList(fromIndex, toIndex));
public static <T> List<List<T>> splitList(List<T> list, int count){
List<List<T>> lists = new ArrayList<List<T>>(); // 结果集
int listSize = list.size(); // 原集合长度
int listCount = listSize / count; // 可直接拆分集合数量
// 按量拆分
for (int i = 0; i < listCount ; i++) {
List<T> sub2=new ArrayList<>();
for(int j=i*count;j<(i + 1) * count;j++)
sub2.add(list.get(j));
lists.add(sub2);
}
return newList;
// 如果按量拆分后还有剩余, 收尾
int remainder = listSize % count;
if(remainder > 0) {
List<T> sub2=new ArrayList<>();
for(int j=listSize - remainder;j<listSize;j++)
sub2.add(list.get(j));
lists.add(sub2);
}
return lists;
}
......
package cn.ibizlab.core.util.config;
import cn.ibizlab.core.extensions.domain.LocalMsgEvent;
import cn.ibizlab.core.extensions.util.LocalMsgProducerService;
import cn.ibizlab.core.extensions.util.LocalMsgEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadFactory;
@Configuration
@ConditionalOnExpression("(!${rocketmq.producer.enabled:false})&&(!${rocketmq.consumer.enabled:false})")
public class LocalMsgConfiguration {
@Bean
@ConditionalOnMissingBean
public WaitStrategy waitStrategy() {
return new SleepingWaitStrategy(200,1000*1000*10);
}
@Bean
@ConditionalOnMissingBean
public ThreadFactory threadFactory() {
return DaemonThreadFactory.INSTANCE;
}
@Bean
@ConditionalOnMissingBean
public LocalMsgEventHandler localMsgEventHandler() {
return new LocalMsgEventHandler();
}
@Bean
@ConditionalOnClass({Disruptor.class})
public Disruptor<LocalMsgEvent> disruptor(WaitStrategy waitStrategy, ThreadFactory threadFactory,
LocalMsgEventHandler localMsgEventHandler) {
Disruptor<LocalMsgEvent> disruptor = new Disruptor<>(() -> new LocalMsgEvent(), 1024 * 1024,
threadFactory, ProducerType.SINGLE, waitStrategy);
disruptor.handleEventsWith(localMsgEventHandler);
// 启动
disruptor.start();
// WEB 容器关闭执行
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
// OK
disruptor.shutdown();
// wait up to 10 seconds for the ringbuffer to drain
RingBuffer<LocalMsgEvent> ringBuffer = disruptor.getRingBuffer();
for (int i = 0; i < 20; i++) {
if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
break;
}
try {
// give ringbuffer some time to drain...
Thread.sleep(500);
} catch (InterruptedException e) {
// ignored
}
}
disruptor.shutdown();
} catch (Exception e) {
// to do nothing
}
}));
return disruptor;
}
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册