做个笔记：常用的 Spark JDBC 连接方式
1、MySQL 的连接
def query_mysql(database, sqlstr, *, host="", port=3306, user="", password=""):
    """Read a MySQL table (or subquery) into a Spark DataFrame over JDBC.

    Args:
        database: MySQL database (schema) name placed in the JDBC URL path.
        sqlstr: Value for Spark's JDBC ``dbtable`` option: a table name, or a
            parenthesised subquery with an alias such as
            ``"(SELECT id FROM t) AS sub"`` (MySQL requires the alias).
        host: MySQL server hostname.
        port: MySQL server port.
        user: Database user name.
        password: Database password.

    Returns:
        The DataFrame produced by ``spark.read ... .load()``.

    Note:
        ``spark`` is not defined in this function; it must be a SparkSession
        available as a global (e.g. in a notebook).
    """
    # Session time zone is pinned to Asia/Shanghai so DATETIME/TIMESTAMP values
    # are interpreted consistently regardless of the JVM's default zone.
    url = (
        "jdbc:mysql://{0}:{1}/{2}?useUnicode=true"
        "&useJDBCCompliantTimezoneShift=true"
        "&useLegacyDatetimeCode=false"
        "&serverTimezone=Asia/Shanghai"
        "&rewriteBatchedStatements=true"
    ).format(host, port, database)

    return (
        spark.read.format("jdbc")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("url", url)
        .option("dbtable", sqlstr)
        .option("user", user)
        .option("password", password)
        .load()
    )
def save__mysql(jdbcDF, database, action_text, *, host="", port=3306, user="",
                password="", mode=None):
    """Write a Spark DataFrame into a MySQL table over JDBC.

    Args:
        jdbcDF: The DataFrame to write.
        database: MySQL database (schema) name placed in the JDBC URL path.
        action_text: Target table name, passed as Spark's ``dbtable`` option.
        host: MySQL server hostname.
        port: MySQL server port.
        user: Database user name.
        password: Database password.
        mode: Optional Spark save mode (e.g. ``"append"``, ``"overwrite"``).
            When ``None``, Spark's default save mode is used, matching the
            previous behavior of this helper.
    """
    # rewriteBatchedStatements lets Connector/J collapse Spark's batched
    # INSERTs into multi-row statements, which mainly benefits writes.
    url = (
        "jdbc:mysql://{0}:{1}/{2}?useUnicode=true"
        "&useJDBCCompliantTimezoneShift=true"
        "&useLegacyDatetimeCode=false"
        "&serverTimezone=Asia/Shanghai"
        "&rewriteBatchedStatements=true"
    ).format(host, port, database)

    writer = (
        jdbcDF.write.format("jdbc")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("url", url)
        .option("dbtable", action_text)
        .option("user", user)
        .option("password", password)
    )
    if mode is not None:
        writer = writer.mode(mode)
    writer.save()
2、Oracle 的连接
def query_oracle(database, sql_str, *, host="", port=1521, user="", password="",
                 num_partitions=5, fetch_size=2000):
    """Read an Oracle table (or subquery) into a Spark DataFrame over JDBC.

    Args:
        database: Value placed after the ``/`` in the thin-driver URL
            (``jdbc:oracle:thin:@//host:port/<database>``); presumably the
            Oracle service name — confirm for your environment.
        sql_str: Value for Spark's JDBC ``dbtable`` option: a table name or a
            parenthesised subquery (Oracle aliases take no ``AS`` keyword).
        host: Oracle server hostname.
        port: Listener port.
        user: Database user name.
        password: Database password.
        num_partitions: Value for Spark's ``numpartitions`` option.
        fetch_size: Rows fetched per round trip (``fetchsize`` option).

    Returns:
        The DataFrame produced by ``spark.read ... .load()``.

    Note:
        ``spark`` is not defined in this function; it must be a SparkSession
        available as a global.
    """
    url = "jdbc:oracle:thin:@//{0}:{1}/{2}".format(host, port, database)

    return (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", sql_str)
        .option("user", user)
        .option("password", password)
        .option("driver", "oracle.jdbc.driver.OracleDriver")
        .option("numpartitions", num_partitions)
        .option("fetchsize", fetch_size)
        .load()
    )
3、SQL Server 的连接
def query_sqlservice(jdbcdatabase, sql_str, *, host="", port=1433, user="",
                     password="", num_partitions=5, fetch_size=2000):
    """Read a SQL Server table (or subquery) into a cached Spark DataFrame.

    Uses the ``com.microsoft.sqlserver.jdbc.spark`` data source, which
    presumably requires Microsoft's Spark connector for SQL Server on the
    classpath — confirm it is installed on the cluster.

    Args:
        jdbcdatabase: Database name placed in the ``database=`` URL property.
        sql_str: Value for the ``dbtable`` option: a table name or a
            parenthesised subquery with an alias.
        host: SQL Server hostname.
        port: SQL Server port.
        user: Database user name.
        password: Database password.
        num_partitions: Value for the ``numPartitions`` option.
        fetch_size: Rows fetched per round trip (``fetchsize`` option).

    Returns:
        The loaded DataFrame with ``.cache()`` applied.

    Note:
        ``spark`` is not defined in this function; it must be a SparkSession
        available as a global.
    """
    url = "jdbc:sqlserver://{0}:{1};database={2}".format(host, port, jdbcdatabase)

    df = (
        spark.read.format("com.microsoft.sqlserver.jdbc.spark")
        .option("url", url)
        .option("dbtable", sql_str)
        .option("user", user)
        .option("password", password)
        .option("numPartitions", num_partitions)
        .option("fetchsize", fetch_size)
        .load()
    )
    return df.cache()
4、PostgreSQL 的连接
def query_postgresql(database, sql_str, *, host="", port=5432, user="",
                     password="", num_partitions=5, fetch_size=3000):
    """Read a PostgreSQL table (or subquery) into a Spark DataFrame over JDBC.

    Fixes relative to the earlier version: the ``dbtable`` option now uses
    ``sql_str`` (previously an undefined ``action_text``), the URL is built
    from ``database`` and ``host`` instead of undefined globals, and the
    default port is PostgreSQL's 5432 rather than SQL Server's 1433.

    Args:
        database: PostgreSQL database name placed in the JDBC URL path.
        sql_str: Value for Spark's JDBC ``dbtable`` option: a table name or a
            parenthesised subquery with an alias.
        host: PostgreSQL server hostname.
        port: PostgreSQL server port.
        user: Database user name.
        password: Database password.
        num_partitions: Value for Spark's ``numpartitions`` option.
        fetch_size: Rows fetched per round trip (``fetchsize`` option).

    Returns:
        The DataFrame produced by ``spark.read ... .load()``.

    Note:
        ``spark`` is not defined in this function; it must be a SparkSession
        available as a global.
    """
    url = "jdbc:postgresql://{0}:{1}/{2}".format(host, port, database)

    return (
        spark.read.format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", url)
        .option("dbtable", sql_str)
        .option("user", user)
        .option("password", password)
        .option("numpartitions", num_partitions)
        .option("fetchsize", fetch_size)
        .load()
    )