博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【数据平台】之Cassandra大数据利器-代码实战干货I
阅读量:4119 次
发布时间:2019-05-25

本文共 8962 字,大约阅读时间需要 29 分钟。

网上不少采用SpringBoot方式集成cassandra,然而,这些方式并非是基于官方原生的API直接调用,在经过spring boot体系进行二次封装后,对性能造成一定的损耗。接下来本文将以Cassandra 3.6.x版本为例,阐述如何在生产环境使用官方原生API进行Spring编程,以期最大限度达到最高性能。

1、pom.xml依赖

com.datastax.cassandra
cassandra-driver-core
3.6.x
com.datastax.cassandra
cassandra-driver-mapping
3.6.x
com.google.guava
guava
19.0
org.projectlombok
lombok
1.18.12

2、application.properties配置

cassandra.cluster-name=GCCLUSTERcassandra.contact-points=192.168.0.1,192.168.0.2,192.168.0.3cassandra.port=9042cassandra.keyspace=mallcassandra.username=libiaocassandra.password=libiao

3、CassandraConfig 配置类

/** * Copyright (c) 2020-2088 LEE POSSIBLE All Rights Reserved * Project: cassandra-bigdata * Package:com.leepossible.cassandra * Version 1.0 * cassandra config class * @author  sessionListenableFuture(Cluster cluster){        return cluster.connectAsync(keyspace);    }    @Bean    public MappingManager mappingManager(Session session) {        final PropertyMapper propertyMapper = new DefaultPropertyMapper()                .setNamingStrategy(new DefaultNamingStrategy(NamingConventions.LOWER_CAMEL_CASE, NamingConventions.LOWER_SNAKE_CASE));        final MappingConfiguration configuration = MappingConfiguration.builder()                .withPropertyMapper(propertyMapper).build();        return new MappingManager(session, configuration);    }    private class CustomDateCodec extends TypeCodec
{ private final TypeCodec
innerCodec; CustomDateCodec(TypeCodec
codec, Class
javaClass){ super(codec.getCqlType(), javaClass); innerCodec = codec; } @Override public ByteBuffer serialize(Date value, ProtocolVersion protocolVersion) throws InvalidTypeException { if (value != null){ return innerCodec.serialize(LocalDate.fromMillisSinceEpoch(value.getTime()), protocolVersion); } return innerCodec.serialize(null, protocolVersion); } @Override public Date deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion) throws InvalidTypeException { if (bytes != null && bytes.hasArray()){ return new Date(innerCodec.deserialize(bytes, protocolVersion).getMillisSinceEpoch()); } return null; } @Override public Date parse(String value) throws InvalidTypeException { if (!StringUtils.isEmpty(value)){ return new Date(innerCodec.parse(value).getMillisSinceEpoch()); } return null; } @Override public String format(Date value) throws InvalidTypeException { if (value != null){ return value.toString(); } return null; } }}

3.1、配置异步加载

@Configuration@EnableAsyncpublic class AsyncTaskConfig implements AsyncConfigurer {    private static final int DEFAULT_QUEUE_SIZE = 1024;    @Override    public Executor getAsyncExecutor() {        return asyncExecutor();    }    @Bean("taskExecutor")    public AsyncTaskExecutor asyncExecutor() {        int processors = Runtime.getRuntime().availableProcessors();        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("execute-pool-%d").build();        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();        taskExecutor.setThreadFactory(threadFactory);        taskExecutor.setCorePoolSize(2);        taskExecutor.setMaxPoolSize(processors);        taskExecutor.setQueueCapacity(DEFAULT_QUEUE_SIZE);        taskExecutor.setKeepAliveSeconds(0);        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);        taskExecutor.setAwaitTerminationSeconds(60);        taskExecutor.setThreadNamePrefix("singleBatchPool_");        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        taskExecutor.initialize();        return taskExecutor;    }        @Override    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {        return (throwable, method, objects) -> {            //打印异常处理        };    }}

4、Model类定义

以数据库表结构为例,说明多个分区键的配置

/** * 数据库的结构 primary key((trans_code,trans_date),id);* **/@Data@NoArgsConstructor@Table(name = "t_trans_test", writeConsistency = "LOCAL_QUORUM", readConsistency ="LOCAL_TWO")public class TransTest {    //集群键    @PrimaryKeyColumn(type = PrimaryKeyType.CLUSTERED, ordinal = 2, name = "id")    private Long id;    //分区键    @PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED, ordinal = 0, name = "trans_code")    private String transCode;        @PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED, ordinal = 1, name = "trans_date")    private String transDate;        private BigDecimal txAmt;        private String txCd;}

5、Dao层访问

以下是展示了同步查询和异步查询,同步保存和异步保存,同步和异步更新类似,其中通过异步可提高查询的QPS,异步保存可提高TPS。

import com.cassandra.per.model.TransTest;import com.datastax.driver.core.ConsistencyLevel;import com.datastax.driver.core.ResultSet;import com.datastax.driver.core.ResultSetFuture;import com.datastax.driver.core.Session;import com.datastax.driver.core.SimpleStatement;import com.datastax.driver.core.Statement;import com.datastax.driver.mapping.Mapper;import com.datastax.driver.mapping.MappingManager;import com.google.common.util.concurrent.FutureCallback;import com.google.common.util.concurrent.Futures;import com.google.common.util.concurrent.ListenableFuture;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.core.task.TaskExecutor;import org.springframework.stereotype.Repository;/** * Copyright (c) 2020-2088 LEE POSSIBLE All Rights Reserved * Project:cassandra-bigdata * Package:com.leepossible.cassandra * Version 1.0 * cassandra dao层处理  * @author  mapper;    @Autowired    private ListenableFuture
sessionListenableFuture; @Qualifier("taskExecutor") @Autowired private TaskExecutor taskExecutor; public TransTestDao(MappingManager mappingManager){ this.session = mappingManager.getSession(); this.mapper = mappingManager.mapper(TransTest.class); } /** * 同步查询 * @param id * @return */ public TransTest findById(Long id){ String sql = String.format("select %s from where id=%d", TABLE, id); ResultSet rs = session.execute(sql); return mapper.map(rs).one(); } /** * 异步查询 * @param id * @return * @throws Exception */ public TransTest findSessionAsyncById(Long id) throws Exception{ String sql = String.format("select %s from where id=%d", TABLE, id); ResultSetFuture future = session.executeAsync(sql); return mapper.map(future.get()).one(); } /** * 异步查询 * @param id * @return * @throws Exception */ public TransTest findFutureSessionAsyncById(Long id) throws Exception{ String sql = String.format("select %s from where id=%d", TABLE, id); ListenableFuture
future = Futures.transformAsync(sessionListenableFuture, session -> { assert session != null; return session.executeAsync(sql); },taskExecutor); return mapper.map(future.get()).one(); } /** * 同步保存 * @param transTest */ public void save(TransTest transTest){ mapper.save(transTest); } public void saveSession(TransTest trans) { String sql = String.format("insert into %s(id, trans_code,trans_date,tx_mt,tx_cd)" + "VALUES(%d,%s,%s,%f,%s", TABLE, trans.getId(), trans.getTransCode(), trans.getTransDate(), trans.getTxAmt(), trans.getTxCd()); Statement statement = new SimpleStatement(sql); statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); session.executeAsync(statement); } /** * 异步保存 * @param trans */ public void saveFutureSession(TransTest trans){ String sql = String.format("insert into %s(id, trans_code,trans_date,tx_mt,tx_cd)" + "VALUES(%d,%s,%s,%f,%s", TABLE, trans.getId(), trans.getTransCode(), trans.getTransDate(), trans.getTxAmt(), trans.getTxCd()); Statement statement = new SimpleStatement(sql); statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); ListenableFuture
future = Futures.transformAsync(sessionListenableFuture, session -> { assert session != null; return session.executeAsync(statement); }); Futures.addCallback(future, new FutureCallback
() { @Override public void onSuccess(ResultSet rows) { } @Override public void onFailure(Throwable throwable) { //打印失败 } }); }}

转载地址:http://accpi.baihongyu.com/

你可能感兴趣的文章
深入浅出PHP(Exploring PHP)
查看>>
深入理解PHP原理之Opcodes
查看>>
深入理解PHP原理之变量(Variables inside PHP)
查看>>
深入理解PHP原理之变量作用域(Scope in PHP)
查看>>
深入理解PHP原理之变量分离/引用(Variables Separation)
查看>>
深入理解PHP内存管理之谁动了我的内存
查看>>
ubuntu 14.04 lamp 安装
查看>>
php和javascript中Json操作总结
查看>>
Linux 下安装 Memcached 和 PHP 开启 Memcached 扩展
查看>>
php模块memcache和memcached区别分析
查看>>
PHP模块 Memcached功能多于Memcache
查看>>
Memcached命令使用
查看>>
Firebug教程
查看>>
透视WebMVC
查看>>
ubuntu14.04 安装redis
查看>>
互联网协议入门(一)
查看>>
互联网协议入门(二)
查看>>
http协议学习系列
查看>>
redis 命令
查看>>
Redis配置文件redis.conf参数说明
查看>>