Rust消费kafka

use futures::stream::StreamExt; // 引入 StreamExt 以使用 next() 方法
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::message::{Message};

async fn run_consumer() -> KafkaResult<()> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "test_group")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.auto.commit", "true")
        .set("session.timeout.ms", "6000")
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("Consumer creation failed");

    consumer.subscribe(&["test_topic"]).expect("Can't subscribe to specified topics");

    let mut message_stream = consumer.stream();

    while let Some(message) = message_stream.next().await {
        match message {
            Ok(m) => {
                match m.payload_view::<str>() {
                    Some(Ok(payload)) => {
                        println!("Key: '{:?}', Payload: '{}'", m.key(), payload);
                    }
                    Some(Err(e)) => {
                        eprintln!("Error while deserializing message payload: {:?}", e);
                    }
                    None => {
                        println!("Key: '{:?}', Payload: <empty>", m.key());
                    }
                }
                consumer.commit_message(&m, CommitMode::Async)?;
            }
            Err(e) => eprintln!("Kafka error: {}", e),
        }
    }
    Ok(())
}

fn main() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(run_consumer()).unwrap();
}

[dependencies]

rdkafka = "0.36.2"

tokio = { version = "1.36.0", features = ["full"] }

futures = "0.3.30"

[target.x86_64-unknown-linux-musl]

linker = "x86_64-linux-musl-gcc"

相关推荐

  1. Rust消费kafka

    2024-02-06 07:16:06       49 阅读
  2. Kafka之【消费消息

    2024-02-06 07:16:06       28 阅读
  3. kafka无法消费数据

    2024-02-06 07:16:06       49 阅读
  4. Kafka批量消费

    2024-02-06 07:16:06       40 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-02-06 07:16:06       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-02-06 07:16:06       106 阅读
  3. 在Django里面运行非项目文件

    2024-02-06 07:16:06       87 阅读
  4. Python语言-面向对象

    2024-02-06 07:16:06       96 阅读

热门阅读

  1. Spring boot 集成redis

    2024-02-06 07:16:06       48 阅读
  2. 【案例】--分布式”雪花算法案例

    2024-02-06 07:16:06       48 阅读
  3. Springboot使用kafka的两种方式

    2024-02-06 07:16:06       57 阅读
  4. 百度语音合成API

    2024-02-06 07:16:06       49 阅读
  5. Vue 本地存储

    2024-02-06 07:16:06       56 阅读
  6. C语言:公式求和

    2024-02-06 07:16:06       57 阅读
  7. go c 通过内存原始二进制内容直接传递结构体

    2024-02-06 07:16:06       54 阅读
  8. 12118 - Inspector‘s Dilemma (UVA)

    2024-02-06 07:16:06       55 阅读