flink stream数据 动态写入多个topic
flink1.15之前
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
object DynamicKafkaProducer {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 数据流中的元素类型为 (topic: String, message: String)
val stream: DataStream[(String, String)] = ...
// 定义 Kafka 序列化器
val kafkaSerializationSchema = new KafkaSerializationSchema[(String, String)] {
override def serialize(element: (String, String), timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord(element._1, element._2.getBytes("UTF-8"))
}
}
// 创建 FlinkKafkaProducer 实例
val kafkaProducer = new FlinkKafkaProducer[(String, String)](
"localh