提交 dfb9fab1 编写于 作者: zc's avatar zc

update:批量新增(更新)、删除cassandra中的数据

上级 d58ac22f
......@@ -6,6 +6,7 @@ import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Repository;
......@@ -29,6 +30,12 @@ public class ExecResultRepository {
private static final String TABLE = "ru_execresult";
private static final String TABLE2 = "ru_execresult2";
@Value("${ibiz.batchSize:100}")
private int batchSize;
@Value("{ibiz.filterResultsData:false}")
private boolean filterResultsData;
public ExecResultRepository(MappingManager mappingManager) {
createTable(mappingManager.getSession());
......@@ -99,6 +106,10 @@ public class ExecResultRepository {
}
public void saveResultsMQMsg(ResultsMQMsg resultsMQMsg) {
if (filterResultsData) {
this.saveResultsMQMsg2(resultsMQMsg);
return;
}
final ResultSet result = session.execute(select().all().from(TABLE2).where(in("ruleid",resultsMQMsg.getRules())).and(eq("keyvaluefield",resultsMQMsg.getKeyValueField())));
List<ExecResult2> list = mapper2.map(result).all();
for(ExecResult2 execResult2:list) {
......@@ -124,6 +135,68 @@ public class ExecResultRepository {
}
}
public void saveResultsMQMsg2(ResultsMQMsg resultsMQMsg){
try {
final ResultSet result = session.execute(select().all().from(TABLE2).where(in("ruleid", resultsMQMsg.getRules())).and(eq("keyvaluefield", resultsMQMsg.getKeyValueField())));
List<ExecResult2> list = mapper2.map(result).all();
List<ExecResult> results = resultsMQMsg.getDatas();
//过滤需要删除的对象
if (results != null && results.size() > 0) {
for (ExecResult result1 : results) {
for (ExecResult2 result2 : list) {
if (result2.getRuleid().equals(result1.getRuleid()) && result2.getDimfield().equals(result1.getDimfield())
&& result2.getDomainsfield().equals(result1.getDomainsfield()) && result2.getKeyvaluefield().equals(result1.getKeyvaluefield())
&& result2.getTimefield().equals(result1.getTimefield())) {
list.remove(result2);
break;
}
}
}
}
//删除数据
PreparedStatement ps = this.getDeleteStatement();
PreparedStatement ps2 = this.getDeleteStatement2();
BatchStatement deleteBatch = new BatchStatement();
deleteBatch.setConsistencyLevel(ConsistencyLevel.ANY);// 设置一致性
for (ExecResult2 result2 : list) {
BoundStatement bs = ps.bind(result2.getRuleid(), 1, result2.getDimfield(), result2.getTimefield(), result2.getDomainsfield(), result2.getKeyvaluefield());
BoundStatement bs2 = ps2.bind(result2.getRuleid(), result2.getKeyvaluefield(), result2.getDimfield(), result2.getDomainsfield(), result2.getTimefield());
deleteBatch.add(bs);
deleteBatch.add(bs2);
if ( deleteBatch.size() > batchSize) {
session.execute(deleteBatch);
deleteBatch.clear();
}
}
if (deleteBatch.size() > 0) {
session.execute(deleteBatch);
deleteBatch.clear();
}
//保存数据
BatchStatement batch = new BatchStatement();
batch.setConsistencyLevel(ConsistencyLevel.ANY);// 设置一致性
PreparedStatement savePs1 = this.getInsertStatement();
PreparedStatement savePs2 = this.getInsertStatement2();
for (ExecResult item : results) {
BoundStatement bs1 = savePs1.bind(item.getRuleid(), item.getRetvalue(), item.getDimfield(), item.getTimefield(), item.getDomainsfield(), item.getKeyvaluefield(),
item.getBusinesscat(), item.getExt1field(), item.getExt2field(), item.getMetricfield(), item.getRuexecresultname(), item.getRulename(), item.getUpdatedate());
BoundStatement bs2 = savePs2.bind(item.getRuleid(), item.getKeyvaluefield(), item.getDimfield(),item.getDomainsfield(), item.getTimefield());
batch.add(bs1);
batch.add(bs2);
if ( batch.size() > batchSize) {
session.execute(batch);
batch.clear();
}
}
if (batch.size() > 0) {
session.execute(batch);
batch.clear();
}
} catch (Exception e){
log.error("cassandra保存错误原因:" + e.getMessage());
}
}
public List<ExecResult> sum(Collection<String> ruleids, Integer retValue, List<String> dims, Timestamp start, Timestamp end) {
......@@ -314,4 +387,36 @@ public class ExecResultRepository {
return mapper.map(result).all();
}
private PreparedStatement deleteStatement = null;
public PreparedStatement getDeleteStatement(){
if(deleteStatement == null){
deleteStatement = session.prepare("delete from ru_execresult where ruleid = ? and retvalue = ? and dimfield = ? and timefield=? and domainsfield=? and keyvaluefield=?");
}
return deleteStatement;
}
private PreparedStatement deleteStatement2 = null;
public PreparedStatement getDeleteStatement2(){
if(deleteStatement2 == null){
deleteStatement2 = session.prepare("delete from ru_execresult2 where ruleid = ? and keyvaluefield=? and dimfield = ? and domainsfield=? and timefield=? ");
}
return deleteStatement2;
}
private PreparedStatement insertStatement = null;
public PreparedStatement getInsertStatement(){
if(insertStatement == null){
insertStatement = session.prepare("insert into ru_execresult(ruleid, retvalue, dimfield, timefield, domainsfield, keyvaluefield, " +
"businesscat, ext_1field, ext_2field, metricfield, ruexecresultname, rulename, updatedate) values(?,?,?,?,?,?,?,?,?,?,?,?,?)");
}
return insertStatement;
}
private PreparedStatement insertStatement2 = null;
public PreparedStatement getInsertStatement2(){
if(insertStatement2 == null){
insertStatement2 = session.prepare("insert into ru_execresult2(ruleid, keyvaluefield, dimfield, domainsfield, timefield) values(?,?,?,?,?)");
}
return insertStatement2;
}
}
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册