本文共 8962 字,大约阅读时间需要 29 分钟。
网上不少采用SpringBoot方式集成cassandra,然而,这些方式并非是基于官方原生的API直接调用,在经过spring boot体系进行二次封装后,对性能造成一定的损耗。接下来本文将以Cassandra 3.6.x版本为例,阐述如何在生产环境使用官方原生API进行Spring编程,以期最大限度达到最高性能。
com.datastax.cassandra cassandra-driver-core 3.6.x com.datastax.cassandra cassandra-driver-mapping 3.6.x com.google.guava guava 19.0 org.projectlombok lombok 1.18.12
cassandra.cluster-name=GCCLUSTERcassandra.contact-points=192.168.0.1,192.168.0.2,192.168.0.3cassandra.port=9042cassandra.keyspace=mallcassandra.username=libiaocassandra.password=libiao
/** * Copyright (c) 2020-2088 LEE POSSIBLE All Rights Reserved * Project: cassandra-bigdata * Package:com.leepossible.cassandra * Version 1.0 * cassandra config class * @author sessionListenableFuture(Cluster cluster){ return cluster.connectAsync(keyspace); } @Bean public MappingManager mappingManager(Session session) { final PropertyMapper propertyMapper = new DefaultPropertyMapper() .setNamingStrategy(new DefaultNamingStrategy(NamingConventions.LOWER_CAMEL_CASE, NamingConventions.LOWER_SNAKE_CASE)); final MappingConfiguration configuration = MappingConfiguration.builder() .withPropertyMapper(propertyMapper).build(); return new MappingManager(session, configuration); } private class CustomDateCodec extends TypeCodec{ private final TypeCodec innerCodec; CustomDateCodec(TypeCodec codec, Class javaClass){ super(codec.getCqlType(), javaClass); innerCodec = codec; } @Override public ByteBuffer serialize(Date value, ProtocolVersion protocolVersion) throws InvalidTypeException { if (value != null){ return innerCodec.serialize(LocalDate.fromMillisSinceEpoch(value.getTime()), protocolVersion); } return innerCodec.serialize(null, protocolVersion); } @Override public Date deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion) throws InvalidTypeException { if (bytes != null && bytes.hasArray()){ return new Date(innerCodec.deserialize(bytes, protocolVersion).getMillisSinceEpoch()); } return null; } @Override public Date parse(String value) throws InvalidTypeException { if (!StringUtils.isEmpty(value)){ return new Date(innerCodec.parse(value).getMillisSinceEpoch()); } return null; } @Override public String format(Date value) throws InvalidTypeException { if (value != null){ return value.toString(); } return null; } }}
@Configuration@EnableAsyncpublic class AsyncTaskConfig implements AsyncConfigurer { private static final int DEFAULT_QUEUE_SIZE = 1024; @Override public Executor getAsyncExecutor() { return asyncExecutor(); } @Bean("taskExecutor") public AsyncTaskExecutor asyncExecutor() { int processors = Runtime.getRuntime().availableProcessors(); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("execute-pool-%d").build(); ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setThreadFactory(threadFactory); taskExecutor.setCorePoolSize(2); taskExecutor.setMaxPoolSize(processors); taskExecutor.setQueueCapacity(DEFAULT_QUEUE_SIZE); taskExecutor.setKeepAliveSeconds(0); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); taskExecutor.setThreadNamePrefix("singleBatchPool_"); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (throwable, method, objects) -> { //打印异常处理 }; }}
以数据库表结构为例,说明多个分区键的配置
/** * 数据库的结构 primary key((trans_code,trans_date),id);* **/@Data@NoArgsConstructor@Table(name = "t_trans_test", writeConsistency = "LOCAL_QUORUM", readConsistency ="LOCAL_TWO")public class TransTest { //集群键 @PrimaryKeyColumn(type = PrimaryKeyType.CLUSTERED, ordinal = 2, name = "id") private Long id; //分区键 @PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED, ordinal = 0, name = "trans_code") private String transCode; @PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED, ordinal = 1, name = "trans_date") private String transDate; private BigDecimal txAmt; private String txCd;}
以下是展示了同步查询和异步查询,同步保存和异步保存,同步和异步更新类似,其中通过异步可提高查询的QPS,异步保存可提高TPS。
import com.cassandra.per.model.TransTest;import com.datastax.driver.core.ConsistencyLevel;import com.datastax.driver.core.ResultSet;import com.datastax.driver.core.ResultSetFuture;import com.datastax.driver.core.Session;import com.datastax.driver.core.SimpleStatement;import com.datastax.driver.core.Statement;import com.datastax.driver.mapping.Mapper;import com.datastax.driver.mapping.MappingManager;import com.google.common.util.concurrent.FutureCallback;import com.google.common.util.concurrent.Futures;import com.google.common.util.concurrent.ListenableFuture;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.core.task.TaskExecutor;import org.springframework.stereotype.Repository;/** * Copyright (c) 2020-2088 LEE POSSIBLE All Rights Reserved * Project:cassandra-bigdata * Package:com.leepossible.cassandra * Version 1.0 * cassandra dao层处理 * @author mapper; @Autowired private ListenableFuturesessionListenableFuture; @Qualifier("taskExecutor") @Autowired private TaskExecutor taskExecutor; public TransTestDao(MappingManager mappingManager){ this.session = mappingManager.getSession(); this.mapper = mappingManager.mapper(TransTest.class); } /** * 同步查询 * @param id * @return */ public TransTest findById(Long id){ String sql = String.format("select %s from where id=%d", TABLE, id); ResultSet rs = session.execute(sql); return mapper.map(rs).one(); } /** * 异步查询 * @param id * @return * @throws Exception */ public TransTest findSessionAsyncById(Long id) throws Exception{ String sql = String.format("select %s from where id=%d", TABLE, id); ResultSetFuture future = session.executeAsync(sql); return mapper.map(future.get()).one(); } /** * 异步查询 * @param id * @return * @throws Exception */ public TransTest findFutureSessionAsyncById(Long id) throws Exception{ String sql = String.format("select %s from where id=%d", TABLE, id); ListenableFuture future = Futures.transformAsync(sessionListenableFuture, session -> { assert session != null; return session.executeAsync(sql); },taskExecutor); return mapper.map(future.get()).one(); } /** * 同步保存 * @param transTest */ public void save(TransTest transTest){ mapper.save(transTest); } public void saveSession(TransTest trans) { String sql = String.format("insert into %s(id, trans_code,trans_date,tx_mt,tx_cd)" + "VALUES(%d,%s,%s,%f,%s", TABLE, trans.getId(), trans.getTransCode(), trans.getTransDate(), trans.getTxAmt(), trans.getTxCd()); Statement statement = new SimpleStatement(sql); statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); session.executeAsync(statement); } /** * 异步保存 * @param trans */ public void saveFutureSession(TransTest trans){ String sql = String.format("insert into %s(id, trans_code,trans_date,tx_mt,tx_cd)" + "VALUES(%d,%s,%s,%f,%s", TABLE, trans.getId(), trans.getTransCode(), trans.getTransDate(), trans.getTxAmt(), trans.getTxCd()); Statement statement = new SimpleStatement(sql); statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); ListenableFuture future = Futures.transformAsync(sessionListenableFuture, session -> { assert session != null; return session.executeAsync(statement); }); Futures.addCallback(future, new FutureCallback () { @Override public void onSuccess(ResultSet rows) { } @Override public void onFailure(Throwable throwable) { //打印失败 } }); }}
转载地址:http://accpi.baihongyu.com/