Spark写入kafka(批数据和流式)

Spark写入(批数据和流式处理)

Spark写入kafka批处理

写入kafka基础

在这里插入图片描述

# spark写入数据到kafka
from pyspark.sql import SparkSession,functions as F

ss = SparkSession.builder.getOrCreate()

# 创建df数据
df = ss.createDataFrame([[9, '王五', 21, '男'], [10, '大乔', 20, '女'], [11, '小乔', 22, '女']],
                        schema='id int,name string,age int,gender string')

df.show()
# todo 注意一:需要拼接一个value
# 在写入kafka时需要拼接一个value
df_kafka = df.select(F.concat_ws(',',df.id.cast('string'),df.name,df.age.cast('string'),df.gender).alias('value'))
df_kafka.show()

# 将df写入kafka
# todo 注意二:这个和读取kafka时的配置是一样,不过这里应该是没有读取起始量和读取结束量
options = {
    # 指定kafka的连接的broker服务节点信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定写入主题
    'topic': 'user'
}
df_kafka.write.save(format='kafka', mode='append', **options)

kafka写入策略

在这里插入图片描述

# kafka数据写入策略
from pyspark.sql import SparkSession,functions as F

ss = SparkSession.builder.getOrCreate()


# 创建df数据
df = ss.createDataFrame([[200, '王五22222', 21, '男'], [201, '大乔22222', 20, '女'], [202, '小乔2222', 22, '女']],
                        schema='id int,name string,age int,gender string')

df.show()

# 在写入kakfa时需要拼接一个value
# # df_kafka = df.select(F.concat_ws(',',df.id.cast('string'),df.name,df.age.cast('string'),df.gender).alias('value'),F.lit(1).alias('partition'))
# # df_kafka.show()

# 指定分区 增加一个分区字段
options = {
    # 指定kafka的连接的broker服务节点信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定写入主题
    'topic': 'user',
}
# df_kafka.write.save(format='kafka', mode='append', **options)



# 指定key  会key进行hash计算,相同key的数据会写入同一分区
# hash(key)%分区数  =
# df_kafka = df.select(F.concat_ws(',',df.id.cast('string'),df.name,df.age.cast('string'),df.gender).alias('value'),df.gender.alias('key'))
# df_kafka.show()

# 同时指定key和partition  按照分区写入
df_kafka = df.select(F.concat_ws(',',df.id.cast('string'),df.name,df.age.cast('string'),df.gender).alias('value'),df.gender.alias('key'),F.lit(2).alias('partition'))
df_kafka.show()

df_kafka.write.save(format='kafka', mode='append', **options)


写入kafka应答响应级别

# spark写入数据到kafka
# 指定ack应答级别
from pyspark.sql import SparkSession, functions as F

ss = SparkSession.builder.getOrCreate()

# 创建df数据
df = ss.createDataFrame([[9, '王五', 21, '男'], [10, '大乔', 20, '女'], [11, '小乔', 22, '女']],
                        schema='id int,name string,age int,gender string')

df.show()

# 在写入kakfa时需要拼接一个value
df_kafka = df.select(F.concat_ws(',', df.id.cast('string'), df.name, df.age.cast('string'), df.gender).alias('value'))
df_kafka.show()

# 将df写入kafka
options = {
    # 指定kafka的连接的broker服务节点信息
    'kafka.bootstrap.servers': 'node1:9092',
    # 指定写入主题
    'topic': 'user',
    # 指定级别
    'acks':'all'
}
df_kafka.write.save(format='kafka', mode='append', **options)

Sprak写入kafka流式处理

相关推荐

  1. spark写入数据报错

    2024-01-22 10:36:03       36 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-22 10:36:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-22 10:36:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-22 10:36:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-22 10:36:03       20 阅读

热门阅读

  1. ECMAScript日常总结--ES2020(ES11)

    2024-01-22 10:36:03       33 阅读
  2. 嵌入式驱动开发需要会哪些技能?

    2024-01-22 10:36:03       34 阅读
  3. 常用的gpt-4 prompt words收集3

    2024-01-22 10:36:03       27 阅读
  4. 架设一台NFS服务器,并按照以下要求配置

    2024-01-22 10:36:03       34 阅读
  5. Solon v2.6.5 发布(助力信创)

    2024-01-22 10:36:03       38 阅读
  6. gitlab-ci相关部署踩坑及要点记录

    2024-01-22 10:36:03       25 阅读
  7. 如何让 Websocket兼容低版本浏览器

    2024-01-22 10:36:03       28 阅读
  8. Spring AOP 技术实现原理

    2024-01-22 10:36:03       35 阅读
  9. 安装Office Web Apps 2013

    2024-01-22 10:36:03       33 阅读