Importing MongoDB data into Hive with Spark

1. PySpark
1.1 pymongo+spark
Code
import json, sys
import time
import pymongo
import urllib.parse
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Expected arguments:
# host port mongo_db mongo_collection hive_db hive_table mongo_user mongo_password
# e.g. 127.0.0.1 27017 mongo_db mongo_collection hive_db hive_table mongo_user mongo_password
print('host:', sys.argv[1], 'port:', sys.argv[2], 'mongo_db:', sys.argv[3], 'mongo_collection:', sys.argv[4], 'hive_db:', sys.argv[5], 'hive_table:', sys.argv[6], 'mongo_user:', sys.argv[7], 'mongo_password:', sys.argv[8])
# MongoDB connection info
mongo_username = sys.argv[7]
mongo_password = sys.argv[8]
mongo_host = sys.argv[1]
mongo_database = sys.argv[3]
mongo_port = sys.argv[2]

# URL-escape the username and password
if mongo_username and mongo_password:
    escaped_username = urllib.parse.quote_plus(mongo_username)
    escaped_password = urllib.parse.quote_plus(mongo_password)
    # Build the MongoDB connection URL with credentials
    mongo_connection_url = "mongodb://{0}:{1}@{2}:{3}/{4}".format(escaped_username, escaped_password, mongo_host, mongo_port, mongo_database)
else:
    # Build the MongoDB connection URL without credentials
    mongo_connection_url = "mongodb://{0}:{1}/{2}".format(mongo_host, mongo_port, mongo_database)
# Connect to MongoDB
mongo_client = pymongo.MongoClient(mongo_connection_url)
mongo_db = mongo_client[sys.argv[3]]
mongo_collection = mongo_db[sys.argv[4]]
mongo_data = mongo_collection.find()
# Read the documents from MongoDB into the driver
values_batch = []
flag = 0
time_start = time.time()
for data in mongo_data:
    if flag == 0:
        # Take the field list from the first document
        field_list = list(data.keys())
        flag = 1
    res = {}
    for key in field_list:
        try:
            # Serialize every value as a JSON string so all Hive columns are strings
            res[key] = json.dumps(str(data[key]), ensure_ascii=False)
        except Exception:
            res[key] = None
    columns = list(res.values())
    values_batch.append(columns)

# Create the SparkSession (Hive support is needed so saveAsTable writes to the Hive metastore)
spark = SparkSession.builder.appName("CreateStringDataFrame").master('local[*]').enableHiveSupport().getOrCreate()
# Build the StructType schema: one string column per MongoDB field
schema = StructType([StructField(field_name, StringType(), True) for field_name in field_list])
# Create the DataFrame from the collected rows
df = spark.createDataFrame(values_batch, schema)
# Show the DataFrame schema
df.printSchema()
df.write.mode("overwrite").saveAsTable("{Hive_DB}.{Hive_Name}".format(Hive_DB=sys.argv[5], Hive_Name=sys.argv[6]))
spark.stop()
time_end = time.time()
print('elapsed seconds:', time_end - time_start)
spark-submit command
spark-submit --num-executors 1 --executor-memory 512M --executor-cores 1 --deploy-mode client --queue root.users.root  ./mongo.py 127.0.0.1 27017 test_db test_table tmp_can_delete_database mongo_test test1 test1
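Once the job finishes, a quick sanity check is to read the table back from Hive. A minimal sketch (the database and table names are the ones from the example command above):

from pyspark.sql import SparkSession

# Hive support is required so the session can see tables in the Hive metastore
spark = (SparkSession.builder
         .appName("VerifyMongoImport")
         .enableHiveSupport()
         .getOrCreate())

# Database and table names taken from the example spark-submit above
df = spark.table("tmp_can_delete_database.mongo_test")
print("row count:", df.count())
df.show(5, truncate=False)
spark.stop()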
1.2 mongo-spark-connector

This approach is awkward to use in a production environment; in my own tests it threw all kinds of errors.

from pyspark.sql import SparkSession

# For mongo-spark-connector 2.x the config keys are spark.mongodb.input.uri /
# spark.mongodb.output.uri (the read/write.connection.uri keys belong to the 10.x connector).
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/intca2.tweetsIntca2") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/intca2.tweetsIntca2") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.2.2") \
    .enableHiveSupport() \
    .getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
# Placeholder: substitute the target Hive database and table names
df.write.mode("overwrite").saveAsTable("{Hive_DB}.{Hive_Name}")
spark.stop()
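If you still want to try the connector route, it tends to be more reliable to supply the connector on the spark-submit command line than through spark.jars.packages inside the script, and the Scala suffix must match the Spark build. A sketch, where mongo_connector.py is a placeholder script name:

spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 --num-executors 1 --executor-memory 512M --executor-cores 1 --deploy-mode client ./mongo_connector.py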
2. Scala
2.1 pom.xml
<dependencies>
    <dependency>
        <groupId>com.thoughtworks.paranamer</groupId>
        <artifactId>paranamer</artifactId>
        <version>2.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- The Scala binary suffix must match the Spark artifacts above (_2.11 here) -->
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
</dependencies>
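The _2.11 suffix must match the Scala version your Spark distribution was built with. One quick way to check it:

spark-submit --version

The output includes the Spark version and the Scala version it was compiled against.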
2.2 Code
package org.spark

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object Main {

  private def insert(host: String, port: String, db: String, collection: String,
                     Hive_DB: String, Hive_Name: String,
                     username: Object = None, password: Object = None): Unit = {
    var url = ""
    // Check whether a username and password were supplied
    if (username == None || password == None) {
      url = s"mongodb://${host}:${port}/${db}.${collection}"
    } else {
      url = s"mongodb://${username}:${password}@${host}:${port}/${db}.${collection}"
    }
    val spark = SparkSession.builder()
      .appName("mongo")
      .config("spark.mongodb.input.uri", url)
      .enableHiveSupport() // needed so saveAsTable writes to the Hive metastore
      .getOrCreate()
    val df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
    df.show(5)
    // Cast every column to string so the Hive table has a uniform schema
    val string_df = df.select(df.columns.map(c => col(c).cast("string").alias(c)): _*)

    string_df.write.mode(SaveMode.Overwrite).saveAsTable(s"${Hive_DB}.${Hive_Name}")

    spark.stop()
  }


  def main(args: Array[String]): Unit = {
    val start_time = System.currentTimeMillis()
    println(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7))
    insert(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7))
    // insert("127.0.0.1", "27017", "mongo_db", "mongo_collection", "hive_db", "hive_table", "mongo_username", "mongo_password")
    val end_time = System.currentTimeMillis()
    val duration = (end_time - start_time) / 1000.0
    println(s"Runtime: $duration seconds")
  }
}
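After packaging the project, the job can be submitted much like the PySpark version. A sketch, assuming the build produces a jar named mongo-import.jar (placeholder name) and the connector is pulled in with --packages; the arguments mirror the commented example call above:

spark-submit --class org.spark.Main --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 --num-executors 1 --executor-memory 512M --executor-cores 1 --deploy-mode client ./mongo-import.jar 127.0.0.1 27017 mongo_db mongo_collection hive_db hive_table mongo_username mongo_password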
