Spark SQL百万级数据批量读写入MySQL

Spark SQL还包括一个可以使用JDBC从其他数据库读取数据的数据源。与使用JdbcRDD相比,应优先使用此功能。这是因为结果作为DataFrame返回,它们可以在Spark SQL中轻松处理或与其他数据源连接。JDBC数据源也更易于使用Java或Python,因为它不需要用户提供ClassTag。

可以使用Data Sources API将远程数据库中的表加载为DataFrame或Spark SQL临时视图。用户可以在数据源选项中指定JDBC连接属性。 user和password通常作为用于登录数据源的连接属性。除连接属性外,Spark还支持以下不区分大小写的选项:

属性名称 解释
url 要连接的JDBC URL
dbtable 读取或写入的JDBC表
query 指定查询语句
driver 用于连接到该URL的JDBC驱动类名
partitionColumn, lowerBound, upperBound 如果指定了这些选项,则必须全部指定。另外, numPartitions必须指定
numPartitions 表读写中可用于并行处理的最大分区数。这也确定了并发JDBC连接的最大数量。如果要写入的分区数超过此限制,我们可以通过coalesce(numPartitions)在写入之前进行调用将其降低到此限制
queryTimeout 默认为0,查询超时时间
fetchsize JDBC的获取大小,它确定每次要获取多少行。这可以帮助提高JDBC驱动程序的性能
batchsize 默认为1000,JDBC批处理大小,这可以帮助提高JDBC驱动程序的性能。
isolationLevel 事务隔离级别,适用于当前连接。它可以是一个NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ,或SERIALIZABLE,对应于由JDBC的连接对象定义,缺省值为标准事务隔离级别READ_UNCOMMITTED。此选项仅适用于写作。
sessionInitStatement 在向远程数据库打开每个数据库会话之后,在开始读取数据之前,此选项将执行自定义SQL语句,使用它来实现会话初始化代码。
truncate 这是与JDBC writer相关的选项。当SaveMode.Overwrite启用时,就会清空目标表的内容,而不是删除和重建其现有的表。默认为false
pushDownPredicate 用于启用或禁用谓词下推到JDBC数据源的选项。默认值为true,在这种情况下,Spark将尽可能将过滤器下推到JDBC数据源。

值得注意的是,Spark不指定分区的话,默认会使用一个分区读取数据,这样在数据量特别大的情况下,会出现OOM。在读取数据之后,调用DataFrameDF.rdd.partitions.size方法可以查看分区数。

1.Spark SQL批量写入MySQL
代码示例如下:

object BatchInsertMySQL {
   
  case class Person(name: String, age: Int)
  def main(args: Array[String]): Unit = {
   

    // 创建sparkSession对象
    val conf = new SparkConf()
      .setAppName("BatchInsertMySQL")
    val spark: SparkSession =  SparkSession.builder()
      .config(conf)
      .getOrCreate()
    import spark.implicits._
    // MySQL连接参数
    val url = JDBCUtils.url
    val user = JDBCUtils.user
    val pwd = JDBCUtils.password

    // 创建Properties对象,设置连接mysql的用户名和密码
    val properties: Properties = new Properties()
    properties.setProperty("user", user) // 用户名
    properties.setProperty("password", pwd) // 密码
    properties.setProperty("driver", "com.mysql.jdbc.Driver")
    properties.setProperty("numPartitions","10")
    // 读取mysql中的表数据
    val testDF: DataFrame = spark.read.jdbc(url, "test", properties)
     println("testDF的分区数:  " + testDF.rdd.partitions.size)
   testDF.createOrReplaceTempView("test")
   testDF.persist(StorageLevel.MEMORY_AND_DISK)
   testDF.printSchema()
    val result =
      s"""-- SQL代码
               """.stripMargin

    val resultBatch = spark.sql(result).as[Person]
    println("resultBatch的分区数: " + resultBatch.rdd.partitions.size)
    // 批量写入MySQL
    // 此处最好对处理的结果进行一次重分区
    // 由于数据量特别大,会造成每个分区数据特别多
    resultBatch.repartition(500).foreachPartition(record => {
   

      val list = new ListBuffer[Person]
      record.foreach(person => {
   
        val name = Person.name
        val age = Person.age
        list.append(Person(name,age))
      })
      upsertDateMatch(list) //执行批量插入数据
    })
    // 批量插入MySQL的方法
    def upsertPerson(list: ListBuffer[Person]): Unit = {
   
      var connect: Connection = null
      var pstmt: PreparedStatement = null
      try {
   
        connect = JDBCUtils.getConnection()
        // 禁用自动提交
        connect.setAutoCommit(false)

        val sql = "REPLACE INTO `person`(name, age)" +
          " VALUES(?, ?)"
        pstmt = connect.prepareStatement(sql)
        var batchIndex = 0
        for (person <- list) {
   
          pstmt.setString(1, person.name)
          pstmt.setString(2, person.age)
          // 加入批次
          pstmt.addBatch()
          batchIndex +=1
          // 控制提交的数量,
          // MySQL的批量写入尽量限制提交批次的数据量,否则会把MySQL写挂!!!
          if(batchIndex % 1000 == 0 && batchIndex !=0){
   
            pstmt.executeBatch()
            pstmt.clearBatch()
          }
        }
        // 提交批次
        pstmt.executeBatch()
        connect.commit()
      } catch {
   
        case e: Exception =>
          e.printStackTrace()
      } finally {
   
        JDBCUtils.closeConnection(connect, pstmt)
      }
    }
    spark.close()
  }
}

JDBC连接工具类:

object JDBCUtils {
   
  val user = "root"
  val password = "root"
  val url = "jdbc:mysql://localhost:3306/mydb"
  Class.forName("com.mysql.jdbc.Driver")
  // 获取连接
  def getConnection() = {
   
    DriverManager.getConnection(url,user,password)
  }
// 释放连接
  def closeConnection(connection: Connection, pstmt: PreparedStatement): Unit = {
   
    try {
   
      if (pstmt != null) {
   
        pstmt.close()
      }
    } catch {
   
      case e: Exception => e.printStackTrace()
    } finally {
   
      if (connection != null) {
   
        connection.close()
      }
    }
  }
}

Spark写入大量数据到MySQL时,在写入之前尽量对写入的DF进行重分区处理,避免分区内数据过多。在写入时,要注意使用foreachPartition来进行写入,这样可以为每一个分区获取一个连接,在分区内部设定批次提交,提交的批次不易过大,以免将数据库写挂。

相关推荐

  1. Spark SQL数据批量写入MySQL

    2023-12-07 19:34:04       62 阅读
  2. redis数据量预热方案

    2023-12-07 19:34:04       47 阅读
  3. mysql数据深分页问题

    2023-12-07 19:34:04       41 阅读
  4. 批量写入数据到Elasticsearch

    2023-12-07 19:34:04       54 阅读
  5. 如何实现数据从Excel导入到数据库

    2023-12-07 19:34:04       35 阅读
  6. 并发分布式锁

    2023-12-07 19:34:04       52 阅读
  7. easyExcel导出数据

    2023-12-07 19:34:04       25 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-07 19:34:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-07 19:34:04       100 阅读
  3. 在Django里面运行非项目文件

    2023-12-07 19:34:04       82 阅读
  4. Python语言-面向对象

    2023-12-07 19:34:04       91 阅读

热门阅读

  1. 什么问题适合使用卡方检验?

    2023-12-07 19:34:04       44 阅读
  2. qt 链表QList,QLinkedList的常见使用

    2023-12-07 19:34:04       59 阅读
  3. 英伟达显卡系列与架构、代表产品

    2023-12-07 19:34:04       60 阅读
  4. Ubuntu 配置打开文件限制

    2023-12-07 19:34:04       64 阅读
  5. Python批量图像处理--图片重命名、图片旋转

    2023-12-07 19:34:04       64 阅读
  6. CG 函数

    2023-12-07 19:34:04       56 阅读
  7. 解决分布式React前端在本地开发环境的跨域问题

    2023-12-07 19:34:04       60 阅读
  8. 关于业界大语言模型(LLM)开源的一些看法

    2023-12-07 19:34:04       51 阅读
  9. 供应链产品经理常用的ChatGPT通用提示词模板

    2023-12-07 19:34:04       56 阅读
  10. MyBatis

    MyBatis

    2023-12-07 19:34:04      58 阅读
  11. 冒泡排序详解

    2023-12-07 19:34:04       63 阅读
  12. 【ASP.NET CORE】EntityFrameworkCore 数据迁移

    2023-12-07 19:34:04       57 阅读
  13. 如何在Go中构建For循环

    2023-12-07 19:34:04       57 阅读