spark3.0.1版本查询Hbase数据库例子

需求背景

现有需求,需要采用spark查询hbase数据库的数据同步到中间分析库,记录spark集成hbase的简单例子代码

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object ReadHBaseData {
  def main(args: Array[String]): Unit = {
    // 创建SparkSession
    val spark = SparkSession.builder()
      .appName("ReadHBaseData")
      .master("local")
      .getOrCreate()
      
    // 创建HBase配置
    val conf = HBaseConfiguration.create()

    // 设置HBase连接参数
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // 创建HBase连接
    val connection = ConnectionFactory.createConnection(conf)

    // 创建HBase表
    val tableName = "my_table"
    val table = connection.getTable(TableName.valueOf(tableName))

    // 创建HBase扫描对象
    val scan = new Scan()

    // 设置要读取的列族和列
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column1"))
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column2"))

    // 执行HBase扫描
    val scanner = table.getScanner(scan)

    // 遍历扫描结果并将结果转换为RDD
    val rdd = spark.sparkContext.parallelize(scanner.iterator().asScala.map(result => {
      val rowKey = Bytes.toString(result.getRow)
      val value1 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1")))
      val value2 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column2")))
      (rowKey, value1, value2)
    }).toList)

    // 将RDD转换为DataFrame
    val df = spark.createDataFrame(rdd).toDF("rowKey", "value1", "value2")

    // 显示DataFrame内容
    df.show()

    // 关闭HBase连接
    scanner.close()
    table.close()
    connection.close()

    // 关闭SparkSession
    spark.stop()
  }
}
 

相关推荐

  1. spark3.0.1版本查询Hbase数据库例子

    2024-06-06 18:52:01       32 阅读
  2. Spark例子

    2024-06-06 18:52:01       51 阅读
  3. Spark—shell,Hbase—shell

    2024-06-06 18:52:01       48 阅读
  4. spark+phoenix读取hbase

    2024-06-06 18:52:01       49 阅读
  5. Spark面试整理-Spark集成HBase

    2024-06-06 18:52:01       34 阅读
  6. mysql数据库查询语句配例题(一)

    2024-06-06 18:52:01       48 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-06 18:52:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-06 18:52:01       100 阅读
  3. 在Django里面运行非项目文件

    2024-06-06 18:52:01       82 阅读
  4. Python语言-面向对象

    2024-06-06 18:52:01       91 阅读

热门阅读

  1. Visual Studio的使用教程

    2024-06-06 18:52:01       28 阅读
  2. rabbitmq的交换机类型以及他们的区别

    2024-06-06 18:52:01       31 阅读
  3. 【无标题】2024.6.6

    2024-06-06 18:52:01       36 阅读
  4. linux c 求取MD5 转char 输出

    2024-06-06 18:52:01       33 阅读
  5. 每天一个数据分析题(三百五十一)-树状体系图

    2024-06-06 18:52:01       32 阅读
  6. Redis安装教程

    2024-06-06 18:52:01       34 阅读
  7. 比较PWM调光和无极调光

    2024-06-06 18:52:01       36 阅读