下载并且解压Flink
启动Flink.
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host harrydeMacBook-Pro.local.
Starting taskexecutor daemon on host harrydeMacBook-Pro.local.
访问localhost:8081
Flink 的版本附带了许多示例作业。您可以快速将这些应用程序之一部署到正在运行的集群。
$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)
Stop Flink
$ ./bin/stop-cluster.sh
自己编写java 代码运行第一个flink hello world.
pom.xml
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<flink.version>1.17.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.example.HelloWorld</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
java 代码
package org.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class HelloWorld {
public static void main(String[] args) throws Exception {
// Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create a stream of data
DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");
// Apply transformation: split each word by space
DataStream<Tuple2<String, Integer>> wordCounts = dataStream
.flatMap(new Splitter())
.keyBy(0)
.sum(1);
// Print the result
wordCounts.print();
// Execute the Flink job
env.execute("Hello World Example");
}
// Custom FlatMapFunction to split each sentence into words
public static final class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
// Split the sentence into words
for (String word : sentence.split(" ")) {
// Emit the word with a count of 1
out.collect(new Tuple2<>(word, 1));
}
}
}
}
maven打包成jar
mvn package
利用flink CLI 运行jar
./bin/flink run -c your.package.FlinkJob /path/to/your/jar
执行后你能在localhost:8081上发现你的job相关信息。
参考