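I. Background
I've always been curious about how Flink actually runs the jobs we write as Java classes, so today I'm starting by writing down a complete example.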
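II. Steps
1. Add the Maven dependencies to the project
Starting with Flink 1.15, the Java-only modules (flink-streaming-java, flink-clients, and the Kafka and JDBC connectors) are published without a Scala suffix such as _2.12. For that reason the artifact IDs below carry no suffix.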
<dependencies>
    <!-- Flink core APIs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.15.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- Needed to run the job locally, e.g. from the IDE -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- Flink JDBC connector (used here to write to MySQL) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.26</version>
    </dependency>
</dependencies>
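2. Write the Flink job
Create a Flink job class that reads data from Kafka, processes it, and writes the results to MySQL. The example uses the FlinkKafkaConsumer source, which still works in 1.15 but is deprecated in favor of the newer KafkaSource.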
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class KafkaToMySQLJob {
    public static void main(String[] args) throws Exception {
        // Create the execution environment (local in the IDE, the cluster's when submitted with flink run)
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Kafka consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "kafka-topic",
                new SimpleStringSchema(),
                properties
        );

        // Read data from Kafka
        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Process the data (here, simply convert each string to upper case)
        DataStream<String> processedStream = stream.map(String::toUpperCase);

        // Write the results to MySQL.
        // COLUMN is a reserved word in MySQL, so the placeholder column is called your_column.
        processedStream.addSink(JdbcSink.sink(
                "INSERT INTO your_table (your_column) VALUES (?)",
                new JdbcStatementBuilder<String>() {
                    @Override
                    public void accept(PreparedStatement ps, String str) throws SQLException {
                        ps.setString(1, str); // JDBC parameter indexes start at 1
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)       // flush once 100 rows are buffered...
                        .withBatchIntervalMs(200) // ...or every 200 ms
                        .withMaxRetries(5)        // retry a failed write up to 5 times
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/your_database")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("your_username")
                        .withPassword("your_password")
                        .build()
        ));

        // Build the job graph and run it; nothing is read from Kafka before this call
        env.execute("Kafka to MySQL Flink Job");
    }
}
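The JdbcExecutionOptions above decide when buffered rows are actually sent to MySQL. A row is written either when the batch is full or when the flush interval fires. The sketch below is a simplified, single-threaded model of that size-or-interval batching. It makes several assumptions: timestamps are simulated rather than driven by a real timer, retries are not modeled, and BatchingSketch with its methods is a hypothetical class, not Flink's JdbcSink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of size-or-interval batching (not Flink's implementation). */
public class BatchingSketch {
    private final int batchSize;
    private final long batchIntervalMs;
    private long nextTimerMs;
    private final List<String> buffer = new ArrayList<>();
    private final List<Integer> flushedBatchSizes = new ArrayList<>();

    public BatchingSketch(int batchSize, long batchIntervalMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.nextTimerMs = batchIntervalMs; // first periodic flush one interval after t=0
    }

    /** Fires every periodic flush timer that is due at or before nowMs (simulated clock). */
    public void advanceTo(long nowMs) {
        while (nextTimerMs <= nowMs) {
            flush();
            nextTimerMs += batchIntervalMs;
        }
    }

    /** Buffers one record and flushes immediately once the batch is full. */
    public void write(String record, long nowMs) {
        advanceTo(nowMs);
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Stand-in for one JDBC executeBatch() call; an empty buffer is not flushed. */
    private void flush() {
        if (!buffer.isEmpty()) {
            flushedBatchSizes.add(buffer.size());
            buffer.clear();
        }
    }

    public List<Integer> flushedBatchSizes() { return flushedBatchSizes; }

    public int pending() { return buffer.size(); }

    public static void main(String[] args) {
        // Same values as withBatchSize(100) and withBatchIntervalMs(200) in the job
        BatchingSketch sink = new BatchingSketch(100, 200);
        for (int i = 0; i < 250; i++) {
            sink.write("ROW-" + i, i); // one record per simulated millisecond
        }
        System.out.println(sink.flushedBatchSizes() + " pending=" + sink.pending());
        sink.advanceTo(400); // the timer at t=400 flushes the remaining rows
        System.out.println(sink.flushedBatchSizes() + " pending=" + sink.pending());
    }
}
```

With the job's settings, 250 records arriving one per millisecond produce two size-triggered batches of 100 rows each. The last 50 rows wait in the buffer until the next timer tick. A larger batch size means fewer round trips to MySQL, at the cost of rows sitting in memory longer.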
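3. Package and deploy
Package the project with Maven (or Gradle) to produce a JAR file, then submit that JAR to a Flink cluster. The Kafka connector, the JDBC connector, and the MySQL driver are not part of the Flink distribution. They must either be bundled into the JAR (for example with the maven-shade-plugin) or copied into Flink's lib/ directory, because a plain mvn clean package builds a thin JAR without them. In the flink run command, -c takes the fully qualified name of the class that contains main(). Replace your.package and the JAR path with your project's actual values.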
mvn clean package
./bin/flink run -c your.package.KafkaToMySQLJob target/your-project-1.0-SNAPSHOT.jar
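The -c flag tells the Flink CLI whose main() method to invoke. If -c is omitted, the CLI falls back to an entry class declared in the JAR manifest, such as the standard Main-Class attribute. The sketch below models only that resolution order, using the JDK's java.util.jar API. EntryClassSketch and com.example.KafkaToMySQLJob are hypothetical names. Flink's real logic is more involved and may also consult other manifest attributes.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

/** Hypothetical sketch: an explicit -c class wins, otherwise fall back to the manifest's Main-Class. */
public class EntryClassSketch {
    public static String resolveEntryClass(String explicitClass, File jar) throws IOException {
        if (explicitClass != null && !explicitClass.isEmpty()) {
            return explicitClass; // like passing -c on the command line
        }
        try (JarFile jarFile = new JarFile(jar)) {
            Manifest manifest = jarFile.getManifest();
            String mainClass = (manifest == null) ? null
                    : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
            if (mainClass == null) {
                throw new IllegalArgumentException("No entry class: pass -c or set Main-Class in " + jar);
            }
            return mainClass;
        }
    }

    /** Writes an otherwise empty JAR whose manifest optionally declares a Main-Class (demo helper). */
    public static File writeJarWithMainClass(String mainClass) throws IOException {
        File jar = File.createTempFile("job", ".jar");
        jar.deleteOnExit();
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        if (mainClass != null) {
            manifest.getMainAttributes().put(Attributes.Name.MAIN_CLASS, mainClass);
        }
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar), manifest)) {
            // No class entries are needed; the manifest alone is enough for this sketch
        }
        return jar;
    }

    public static void main(String[] args) throws IOException {
        File jar = writeJarWithMainClass("com.example.KafkaToMySQLJob");
        System.out.println(resolveEntryClass(null, jar));                      // from the manifest
        System.out.println(resolveEntryClass("your.pkg.KafkaToMySQLJob", jar)); // explicit -c wins
    }
}
```

In practice, setting Main-Class in the JAR (for example through the shade plugin's manifest configuration) lets you drop -c. Passing -c explicitly is the safer choice when one JAR contains several jobs.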
With these steps you have a Flink job that reads data from Kafka and writes it to MySQL. Adjust the Kafka and MySQL settings, as well as the processing logic, to fit your actual needs.
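This also partly answers the question from the background. When flink run calls the job's main(), the addSource, map, and addSink calls only describe a dataflow. Nothing is read from Kafka until env.execute() turns that description into a job graph and runs it. The toy model below records each stage first and only pushes records through the stages inside execute(). LazyPipelineSketch is a hypothetical class, not Flink's API, and it uses a finite list where Flink would have an unbounded stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical toy model of lazy pipeline construction (not Flink's API). */
public class LazyPipelineSketch {
    private final List<String> plan = new ArrayList<>(); // readable stand-in for the job graph
    private final List<Function<String, String>> maps = new ArrayList<>();
    private Supplier<List<String>> source;
    private Consumer<String> sink;

    public LazyPipelineSketch addSource(String name, Supplier<List<String>> source) {
        plan.add("source:" + name); // only recorded, nothing is read yet
        this.source = source;
        return this;
    }

    public LazyPipelineSketch map(String name, Function<String, String> fn) {
        plan.add("map:" + name);
        maps.add(fn);
        return this;
    }

    public LazyPipelineSketch addSink(String name, Consumer<String> sink) {
        plan.add("sink:" + name);
        this.sink = sink;
        return this;
    }

    public List<String> plan() { return plan; }

    /** Only here do records actually flow through the recorded stages. */
    public void execute() {
        if (source == null || sink == null) {
            throw new IllegalStateException("pipeline needs a source and a sink");
        }
        for (String record : source.get()) {
            String value = record;
            for (Function<String, String> fn : maps) {
                value = fn.apply(value);
            }
            sink.accept(value);
        }
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>(); // stands in for the MySQL table
        LazyPipelineSketch env = new LazyPipelineSketch()
                .addSource("kafka-topic", () -> Arrays.asList("hello", "flink"))
                .map("toUpperCase", s -> s.toUpperCase(Locale.ROOT))
                .addSink("mysql", table::add);
        System.out.println(env.plan() + " rows=" + table); // table is still empty here
        env.execute();
        System.out.println(table);
    }
}
```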