12月13日总结

实验8
Flink初级编程实践

1.实验目的
(1)通过实验掌握基本的Flink编程方法。
(2)掌握用IntelliJ IDEA工具编写Flink程序的方法。
2.实验平台
(1)Ubuntu18.04(或Ubuntu16.04)。
(2)IntelliJ IDEA。
(3)Flink1.9.1。
3.实验步骤
(1)使用IntelliJ IDEA工具开发WordCount程序
在Linux系统中安装IntelliJ IDEA,然后使用IntelliJ IDEA工具开发WordCount程序,并打包成JAR文件,提交到Flink中运行。
(2)数据流词频统计
使用Linux系统自带的NC程序模拟生成数据流,不断产生单词并发送出去。编写Flink程序对NC程序发来的单词进行实时处理,计算词频,并把词频统计结果输出。要求首先在IntelliJ IDEA中开发和调试程序,然后,再打成JAR包部署到Flink中运行。

4.实验报告
题目: Flink初级编程实践 姓名 刘梦阳 日期2023.12.08
实验环境:(1)centOS8。(2)IntelliJ IDEA。(3)Flink1.9.1。
实验内容与完成情况:
(1)使用IntelliJ IDEA工具开发WordCount程序
在Linux系统中安装IntelliJ IDEA,然后使用IntelliJ IDEA工具开发WordCount程序,并打包成JAR文件,提交到Flink中运行。
在linux启动flink集群

创建idea项目添加maven依赖

Pom.xml


4.0.0

org.example
FinkWordCount
1.0-SNAPSHOT

8 8 org.apache.maven.plugins maven-assembly-plugin false jar-with-dependencies WordCount.main make-assembly package assembly org.apache.flink flink-java 1.9.1 org.apache.flink flink-streaming-java_2.12 1.9.1 provided org.apache.flink flink-clients_2.12 1.9.1 junit junit 4.13.2 test 使用maven打包jar包

启动hadoop和flink

将WordCount.jar提交到flink运行

(2)数据流词频统计
使用Linux系统自带的NC程序模拟生成数据流,不断产生单词并发送出去。编写Flink程序对NC程序发来的单词进行实时处理,计算词频,并把词频统计结果输出。要求首先在IntelliJ IDEA中开发和调试程序,然后,再打成JAR包部署到Flink中运行。
Java代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
//定义socket的端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
} catch (Exception e) {
System.err.println("指定port参数,默认值为9000");
port = 9000;
}
//获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入的数据
DataStreamSource text = env.socketTextStream("node1", port, "\n");
//计算数据
DataStream windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector out) throws Exception {
String[] splits = value.split("\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1L));
}
}
})//打平操作,把每行的单词转为<word,count>类型的数据
.keyBy("word")//针对相同的word数据进行分组
.timeWindow(Time.seconds(2), Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
.sum("count");
//把数据打印到控制台
windowCount.print()
.setParallelism(1);//使用一个并行度
//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
env.execute("streaming word count");
}
/**
* 主要为了存储单词以及单词出现的次数
*/
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + ''' +
", count=" + count +
'}';
}
}
}

在IntelliJ IDEA中调试程序

打包jar包部署到flink
在linux运行nc程序

将jar包提交运行

在nc中输入单词

进入flink的web页面

查看输出内容

出现的问题:
安装过程中的依赖问题:在安装Flink或Idea的过程中,可能会遇到一些缺少依赖库或软件包的问题。
环境变量配置问题:可能会因为环境变量没有正确配置而导致Flink或Idea无法找到必需的路径。
Idea配置问题:在Idea中编写代码时,可能会遇到无法找到依赖库或配置不正确的问题。
网络或端口问题:Flink作业无法连接到集群或无法使用指定的端口。
解决方案(列出遇到的问题和解决办法,列出没有解决的问题):
安装过程中的依赖问题解决办法: 确保按照相关文档的指导正确安装了所有必需的依赖项。可以使用包管理工具(如apt、yum等)来安装缺失的库或软件包。
环境变量配置问题解决办法: 检查并确保PATH和其他必需的环境变量已经正确设置,以便系统能够找到相关的执行文件和库。
Idea配置问题解决办法: 确保你在Idea中正确配置了Flink的相关设置,包括项目的依赖项和运行配置。检查类路径和库的设置,确保它们指向正确的位置。
网络或端口问题解决办法: 检查网络连接,确保集群可访问。确保端口没有被防火墙阻止,并且Flink配置中使用的端口没有被其他程序占用。

相关推荐

  1. 1213总结

    2023-12-13 10:08:19       53 阅读
  2. 1211总结

    2023-12-13 10:08:19       43 阅读
  3. 1212总结

    2023-12-13 10:08:19       41 阅读
  4. 128总结

    2023-12-13 10:08:19       32 阅读
  5. 1210总结

    2023-12-13 10:08:19       42 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-13 10:08:19       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-13 10:08:19       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-13 10:08:19       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-13 10:08:19       18 阅读

热门阅读

  1. 12月12日总结

    2023-12-13 10:08:19       41 阅读
  2. 每日总结

    2023-12-13 10:08:19       46 阅读
  3. Linux部署mosquitto及其配置

    2023-12-13 10:08:19       41 阅读
  4. pandas 遍历

    2023-12-13 10:08:19       40 阅读
  5. Qt打包

    2023-12-13 10:08:19       50 阅读
  6. 12.12总结

    2023-12-13 10:08:19       44 阅读
  7. get请求数组参数,格式转换

    2023-12-13 10:08:19       59 阅读
  8. 【异步】CompletableFuture

    2023-12-13 10:08:19       44 阅读
  9. 深入了解RPM包管理与Nginx源码包管理

    2023-12-13 10:08:19       49 阅读
  10. clickhouse sql优化笔记

    2023-12-13 10:08:19       41 阅读
  11. 逃逸分析案例

    2023-12-13 10:08:19       54 阅读
  12. 每日总结

    2023-12-13 10:08:19       39 阅读
  13. ferry前端项目部署

    2023-12-13 10:08:19       49 阅读
  14. selenium

    2023-12-13 10:08:19       58 阅读
  15. openresty动态解析域名

    2023-12-13 10:08:19       44 阅读
  16. Linux的bash脚本

    2023-12-13 10:08:19       55 阅读
  17. AtCoder Grand Contest 001

    2023-12-13 10:08:19       55 阅读
  18. TCP和UDP的区别

    2023-12-13 10:08:19       36 阅读
  19. Git合并代码(rebase)

    2023-12-13 10:08:19       39 阅读
  20. android重启app

    2023-12-13 10:08:19       42 阅读