Cassandra入门试用

1. 安装

1.1 官方文档

http://cassandra.apache.org/doc/latest/getting_started/index.html

1.2 安装前提
  • 安装Java 8
$java -version
java version "1.8.0_102"
Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
  • 安装Python 2.7
$python -V
Python 2.7.17
1.3 安装cassandra
  • 下载
wget http://mirrors.tuna.tsinghua.edu.cn/apache/cassandra/3.11.5/apache-cassandra-3.11.5-bin.tar.gz
  • 解压即安装
tar -xzvf apache-cassandra-3.11.5-bin.tar.gz
  • 启动脚本在cassandra的根目录的bin下
./bin/cassandra -f

2. 集群配置

  • 案例: 配置了3节点集群

    • 位于tdatanode1、tdatanode2、tdatanode3
    • tdatanode1、tdatanode2做为种子节点
    • 3个安装过程同上
  • 配置文件修改, $CASSANDRA_HOME/conf/cassandra.yaml

    • 所有机器使用相同的集群名称
        cluster_name: 'utrackRealTime'
    
    • 所有机器设置seed,均为tdatanode1、tdatanode2
        seed_provider:
            - class_name: org.apache.cassandra.locator.SimpleSeedProvider
              parameters:
                  - seeds: "tdatanode1,tdatanode2"
    
    • 监听地址设置为本机IP或者host名称
        listen_address: tdatanode1
        rpc_address: tdatanode1
    
  • 启动集群

# -f的意思的前台运行,测试时为了更方便的查看问题
./bin/cassandra -f
  • 交互式终端cqlsh连接
# 如果不是连本机,通过环境变量$CQLSH_HOST、$CQLSH_PORT指定主机和端口
cqlsh --request-timeout=600
# 指定机器
cqlsh tdatanode1 --request-timeout=60000

3. Java客户端

3.1 Maven依赖
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.1.0</version>
</dependency>
3.2 客户端代码
  • 创建Session,应该是线程安全的,可以重用
PoolingOptions options = new PoolingOptions();
options.setNewConnectionThreshold(HostDistance.LOCAL,1800);
options.setCoreConnectionsPerHost(HostDistance.LOCAL,5);
options.setMaxConnectionsPerHost(HostDistance.LOCAL,5);

Cluster.Builder bulder = Cluster.builder().withPoolingOptions(options).addContactPoint("192.168.36.174").withPort(9042);
Cluster cluster = builder.build();
Session session = cluster.connect();
  • 执行操作
ResultSet rs = session.execute("select * FROM utrack.odl_user_hj");
for (Row r : rs.all()) {
    int size = r.getColumnDefinitions().size();
    for (int i = 0; i < size; i++) {
        r.getObject(i).toString();
    }
}
  • 进程退出时的清理
session.close();
cluster.close();

4. 性能测试

4.1 压测结论
连接 并发 QPS
1 1 1452
1 5 8182
1 10 14806
1 20 23584
1 30 32341
1 40 35945
1 50 33647

单连接、3节点、普通HDD、单JVM分配8G内存的情况下,并发30~40基本到最高性能, 35K QPS

连接数 并发数 QPS 备注
1 40 35945
L -5 40 36179 - 设置LOCAL最大连接数没有用,估计是
L -5 40
NewConnectionThreshold=0 25994
NewConnectionThreshold=1 29620
NewConnectionThreshold=10 32840
NewConnectionThreshold=800 37481
NotSet 37407
NewConnectionThreshold=1800 37000 - 阈值到800后再增长,对qps没有明显影响
R 5-10 40 35842
L 5-10 40 24113 - LOCAL coreConnect设置为5的情况下,性能反而差了。

不确定系统瓶颈的情况下,不要随意调优,Connection和新建Connection的Threshold调整可能不但不提升性能,反而降低

4.2 压测代码
  • 主类
public class ReadPerformance {

    private static ExecutorService es = Executors.newFixedThreadPool(40);

    public static void main(String[] args) throws InterruptedException {
        Session s = CassandraConnectionManager.getSession();
        PreparedStatement preparedStatement = s.prepare("select * from utrack.simple_test where domain = ?");

        long start = System.currentTimeMillis();

        for (int i = 0; i < 10_0000; i++) {
            final int index = i;
            es.submit(() -> {
                ResultSet rs = s.execute(preparedStatement.bind((int) (Math.random() * 200_0000)));
                rs.one().getObject(0);
                if (index % 1000 == 999) System.out.println("index: " + index + ", completed");
            });
        }

        System.out.println("await termination....");

        es.shutdown();

        if (!es.awaitTermination(1, TimeUnit.HOURS)) {
            System.out.println("await timeout....");
            es.shutdownNow();
        }

        long last = System.currentTimeMillis() - start;

        System.out.println("time used: " + last + ", qps:" + (10_0000_000 / last));

        CassandraConnectionManager.destory();

    }
}
  • 工具类
public class CassandraConnectionManager {

    public static Session getSession() {
        return Holder.session;
    }

    public static Cluster getCluster() {
        return Holder.cluster;
    }

    public static void destory() {
        Holder.destroy();
    }

    private static class Holder {
        private static Cluster cluster;
        private static Session session;

        static {
            PoolingOptions options = new PoolingOptions();
            options.setNewConnectionThreshold(HostDistance.LOCAL,1800);
//            options.setCoreConnectionsPerHost(HostDistance.LOCAL,5);
            options.setMaxConnectionsPerHost(HostDistance.LOCAL,5);

            Cluster.Builder bulder = Cluster.builder().withPoolingOptions(options).addContactPoint("192.168.36.174").withPort(9042);
            cluster = bulder.build();
            session = cluster.connect();
        }

        private static void destroy() {
            if (session != null) session.close();
            if (cluster != null) cluster.close();
        }
    }

}

相关推荐

  1. Cassandra入门试用

    2023-12-26 14:20:03       56 阅读
  2. docker+cassandra

    2023-12-26 14:20:03       62 阅读
  3. DynamoDB和Cassandra、MongoDB的比较

    2023-12-26 14:20:03       52 阅读
  4. 使用scyllaDb 或者cassandra存储聊天记录

    2023-12-26 14:20:03       53 阅读
  5. docker compose部署cassandra集群

    2023-12-26 14:20:03       43 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-26 14:20:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-26 14:20:03       100 阅读
  3. 在Django里面运行非项目文件

    2023-12-26 14:20:03       82 阅读
  4. Python语言-面向对象

    2023-12-26 14:20:03       91 阅读

热门阅读

  1. 《微信小程序开发从入门到实战》学习六十三

    2023-12-26 14:20:03       57 阅读
  2. redis相关问题

    2023-12-26 14:20:03       50 阅读
  3. 访问者模式(Visitor)

    2023-12-26 14:20:03       56 阅读
  4. LeetCode //C - 1004. Max Consecutive Ones III

    2023-12-26 14:20:03       57 阅读