python 读写kafka

    1. 安装pykafka

pip install pykafka

    2. 生产者

from pykafka import KafkaClient
 
def get_kafka_producer(hosts, topics):
    client = KafkaClient(hosts=hosts)
    print(client.topics)
    topic = client.topics[topics]
    producer = topic.get_producer()
    return producer

测试

hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
producer = get_kafka_producer(hosts, topics)   
for i in range(10):
    msg = "test message " + str(i)
    # msg = bytes(msg, encoding='utf-8')    
    # producer.produce(msg)
    producer.produce(msg.encode())
producer.stop()

3. 消费者

def get_kafka_consumer(hosts, topics):
    client = KafkaClient(hosts=hosts)
    topic=client.topics[topics]
    consumer = topic.get_balanced_consumer(consumer_group='test_kafka_topic', auto_commit_enable=True,
zookeeper_connect='192.168.20.201:2181,192.168.20.202:2181,192.168.20.203:2181',
managed=True, consumer_timeout_ms=1000)
    # managed=True,即使用新式reblance分区方法,不需zk;managed=False则需通过zk来实现reblance      
    return consumer

测试

hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
consumer = get_kafka_consumer(hosts, topics)   
for msg in consumer:
    print(msg)
    if msg is not None:
        print(msg.offset)
        print(msg.value)

相关推荐

  1. python kafka

    2024-01-18 21:42:02       50 阅读
  2. PythonKafka队列

    2024-01-18 21:42:02       44 阅读
  3. python文件

    2024-01-18 21:42:02       42 阅读
  4. Python:文件

    2024-01-18 21:42:02       35 阅读
  5. Python 文件

    2024-01-18 21:42:02       24 阅读
  6. Python--文件

    2024-01-18 21:42:02       27 阅读
  7. kafka为什么不支持分离?

    2024-01-18 21:42:02       50 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-18 21:42:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-18 21:42:02       101 阅读
  3. 在Django里面运行非项目文件

    2024-01-18 21:42:02       82 阅读
  4. Python语言-面向对象

    2024-01-18 21:42:02       91 阅读

热门阅读

  1. python 常用功能积累

    2024-01-18 21:42:02       45 阅读
  2. 机器学习之协同过滤算法

    2024-01-18 21:42:02       73 阅读
  3. 机器学习在表面缺陷检测中的技术与实践

    2024-01-18 21:42:02       63 阅读
  4. MySQL-索引的介绍和使用

    2024-01-18 21:42:02       51 阅读
  5. Day6、指针的基本学习

    2024-01-18 21:42:02       46 阅读
  6. [贪心算法] 国王游戏

    2024-01-18 21:42:02       47 阅读
  7. 软件设计师考试---计算机硬件基础

    2024-01-18 21:42:02       55 阅读
  8. 服务器——配置免密登录

    2024-01-18 21:42:02       48 阅读
  9. “魔音智能去水印”隐私政策

    2024-01-18 21:42:02       54 阅读
  10. GoLang刷题之leetcode

    2024-01-18 21:42:02       52 阅读
  11. MySQL 8.0中移除的功能(一)

    2024-01-18 21:42:02       47 阅读