[Flink Side Stories] 14. Example: Accessing External Data with Flink Async I/O

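Flink article series

I. Flink column

The Flink column covers one topic at a time systematically and illustrates it with concrete examples.

  • 1. Flink deployment series
    Basic material on deploying and configuring Flink.

  • 2. Flink fundamentals series
    The basics of Flink: terminology, architecture, programming model, programming guide, basic DataStream API usage, the four cornerstones, and so on.

  • 3. Flink Table API and SQL fundamentals series
    Basic usage of the Flink Table API and SQL: creating databases and tables, queries, window functions, catalogs, and so on.

  • 4. Flink Table API and SQL advanced and applied series
    Applied Table API and SQL material that is closer to real production use and somewhat harder to develop.

  • 5. Flink monitoring series
    Material related to day-to-day operations and monitoring.

II. Flink examples column

The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides ready-to-use examples. This column has no sub-sections, and each link title indicates what the article covers.

All articles in both columns are listed in the Flink series index (Flink 系列文章汇总索引).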



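This article shows how to access external data with Flink's async I/O, using an asynchronous read from Redis as the example.

For a more systematic treatment, see my Flink column.

The code relies only on Maven dependencies (including Lombok, which provides the annotations on the `User` class).

Running the example requires a Redis instance.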

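I. Example: reading user information asynchronously

This example simulates looking up a user's personal information in Redis by the user name carried in each incoming record.

The incoming data is a Flink collection source with records of the form `id,name`. The user data in Redis is stored in a hash, shown in the verification section below.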
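To make the lookup concrete, here is a minimal sketch in plain Java with a `HashMap` standing in for the Redis hash `AsyncReadUser_Redis`. The names `LookupSketch`, `lookupKey` and `lookup` are hypothetical and exist only for illustration; the record format and the stored value come from this article.

```java
import java.util.HashMap;
import java.util.Map;

class LookupSketch {

    // Extracts the hash field to query from an input record of the form "id,name".
    static String lookupKey(String record) {
        return record.split(",")[1];
    }

    // Returns the stored value for the record's name, or null if the field is missing.
    static String lookup(Map<String, String> hash, String record) {
        return hash.get(lookupKey(record));
    }

    public static void main(String[] args) {
        // In-memory stand-in for the Redis hash (assumption: real code would issue HGET)
        Map<String, String> hash = new HashMap<>();
        hash.put("alan", "1,alan,18,20,alan.chan.chn@163.com");

        System.out.println(lookup(hash, "1,alan"));
        System.out.println(lookup(hash, "9,nobody")); // missing field
    }
}
```

The second call returns `null`, which is also what a Redis `HGET` yields for a missing field, so code that consumes the result should be prepared for it.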

1. Maven dependencies

<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.bahir</groupId>
		<artifactId>flink-connector-redis_2.12</artifactId>
		<version>1.1.0</version>
		<exclusions>
			<exclusion>
				<artifactId>flink-streaming-java_2.12</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<artifactId>flink-runtime_2.12</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<artifactId>flink-core</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<artifactId>flink-java</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
		</exclusions>
	</dependency>
	<!-- Lombok provides @Data and the constructor annotations used by the User class.
	     The version here is an assumption; use whichever version your project standardizes on. -->
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.30</version>
		<scope>provided</scope>
	</dependency>

</dependencies>


2. Implementing asynchronous access to Redis

1) Emitting the Redis value as a String

package org.datastreamapi.source.custom.redis;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Looks up each input record in Redis asynchronously and emits the stored value as a String.
 *
 * @author alanchan
 */
public class CustomRedisSource extends RichAsyncFunction<String, String> {

	private static final String ADDR = "192.168.10.41";
	private static final int PORT = 6379;
	// Connection/socket timeout passed to JedisPool, in milliseconds
	private static final int TIMEOUT = 10000;

	// Created in open() on the task manager, so it is never serialized with the function
	private transient JedisPool jedisPool;

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		jedisPool = new JedisPool(new JedisPoolConfig(), ADDR, PORT, TIMEOUT);
	}

	@Override
	public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
		// One element of the input collection, in the form "id,name"
		// (the log prefix means "input parameter")
		System.out.println("输入参数input----:" + input);
		// Run the blocking Redis lookup on another thread
		CompletableFuture.supplyAsync(() -> {
			String name = input.split(",")[1];
			// A Jedis instance is not thread-safe, so borrow one from the pool per request
			try (Jedis jedis = jedisPool.getResource()) {
				String value = jedis.hget("AsyncReadUser_Redis", name);
				// The log prefix means "query result"
				System.out.println("查询结果output----:" + value);
				return value;
			}
		}).whenComplete((value, error) -> {
			// Always complete the ResultFuture, on failure as well as on success
			if (error != null) {
				resultFuture.completeExceptionally(error);
			} else if (value == null) {
				// No such field in the hash: emit nothing for this input
				resultFuture.complete(Collections.emptyList());
			} else {
				resultFuture.complete(Collections.singleton(value));
			}
		});
	}

	// Called when a request exceeds the timeout passed to AsyncDataStream.orderedWait
	@Override
	public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
		System.out.println("redis connect timeout!");
		// Complete with no result so the operator does not wait on this element forever
		resultFuture.complete(Collections.emptyList());
	}

	@Override
	public void close() throws Exception {
		super.close();
		if (jedisPool != null) {
			jedisPool.close();
		}
	}

	@Data
	@AllArgsConstructor
	@NoArgsConstructor
	static class User {

		private int id;
		private String name;
		private int age;
		private double balance;

		// Parses "id,name,age,balance,..."; columns after the fourth are ignored
		User(String value) {
			String[] str = value.split(",");
			this.setId(Integer.parseInt(str[0]));
			this.setName(str[1]);
			this.setAge(Integer.parseInt(str[2]));
			this.setBalance(Double.parseDouble(str[3]));
		}
	}

}
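
The callback pattern inside `asyncInvoke` can be tried outside Flink. The sketch below runs a blocking lookup on another thread and forwards either the value or the failure to a callback, so that exactly one outcome is always reported. `AsyncCallbackSketch`, `ResultSink` and `lookupAsync` are hypothetical names; `ResultSink` is only a stand-in for Flink's `ResultFuture`, reduced to the two methods used here.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class AsyncCallbackSketch {

    // Hypothetical stand-in for Flink's ResultFuture
    interface ResultSink<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    // Runs the blocking lookup on another thread and reports exactly one outcome to the sink.
    static <T> CompletableFuture<Void> lookupAsync(Supplier<T> blockingLookup, ResultSink<T> sink) {
        return CompletableFuture.supplyAsync(blockingLookup)
                .<Void>handle((value, error) -> {
                    if (error != null) {
                        sink.completeExceptionally(error);
                    } else {
                        sink.complete(Collections.singleton(value));
                    }
                    return null;
                });
    }

    public static void main(String[] args) {
        ResultSink<String> printer = new ResultSink<String>() {
            @Override
            public void complete(Collection<String> result) {
                System.out.println("result: " + result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                System.out.println("failed: " + error);
            }
        };
        // join() is used only so that main waits for the callbacks before exiting
        lookupAsync(() -> "1,alan,18,20,alan.chan.chn@163.com", printer).join();
        lookupAsync(() -> { throw new IllegalStateException("lookup failed"); }, printer).join();
    }
}
```

Reporting the failure matters in an async function: a request that never completes its result is only released when the timeout fires.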


2) Emitting the Redis value as a POJO

package org.datastreamapi.source.custom.redis;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Looks up each input record in Redis asynchronously and emits the stored value as a User.
 *
 * @author alanchan
 */
public class CustomRedisSource2 extends RichAsyncFunction<String, User> {

	private static final String ADDR = "192.168.10.41";
	private static final int PORT = 6379;
	// Connection/socket timeout passed to JedisPool, in milliseconds
	private static final int TIMEOUT = 10000;

	// Created in open() on the task manager, so it is never serialized with the function
	private transient JedisPool jedisPool;

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		jedisPool = new JedisPool(new JedisPoolConfig(), ADDR, PORT, TIMEOUT);
	}

	@Override
	public void asyncInvoke(String input, ResultFuture<User> resultFuture) throws Exception {
		// The log prefix means "query condition"
		System.out.println("输入查询条件:" + input);

		// Run the blocking Redis lookup on another thread
		CompletableFuture.supplyAsync(() -> {
			String name = input.split(",")[1];
			// A Jedis instance is not thread-safe, so borrow one from the pool per request
			try (Jedis jedis = jedisPool.getResource()) {
				String value = jedis.hget("AsyncReadUser_Redis", name);
				// The log prefix means "Redis query result"
				System.out.println("查询redis结果:" + value);
				return value == null ? null : new User(value);
			}
		}).whenComplete((user, error) -> {
			// Always complete the ResultFuture, on failure as well as on success
			if (error != null) {
				resultFuture.completeExceptionally(error);
			} else if (user == null) {
				// No such field in the hash: emit nothing for this input
				resultFuture.complete(Collections.emptyList());
			} else {
				resultFuture.complete(Collections.singleton(user));
			}
		});
	}

	// Called when a request exceeds the timeout passed to AsyncDataStream.orderedWait
	@Override
	public void timeout(String input, ResultFuture<User> resultFuture) throws Exception {
		System.out.println("redis connect timeout!");
		// Complete with no result so the operator does not wait on this element forever
		resultFuture.complete(Collections.emptyList());
	}

	@Override
	public void close() throws Exception {
		super.close();
		if (jedisPool != null) {
			jedisPool.close();
		}
	}

}
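
The mapping from the stored comma-separated value to a POJO can also be checked on its own, without Lombok or Flink. This is a minimal sketch under the assumption that the columns are `id,name,age,balance` followed by optional extra columns (the sample data has an e-mail address in fifth position). `UserRecordSketch` and `parse` are hypothetical names, not part of the article's code.

```java
class UserRecordSketch {

    final int id;
    final String name;
    final int age;
    final double balance;

    UserRecordSketch(int id, String name, int age, double balance) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.balance = balance;
    }

    // Parses "id,name,age,balance[,extra...]"; columns after the fourth are ignored.
    static UserRecordSketch parse(String value) {
        String[] cols = value.split(",");
        if (cols.length < 4) {
            throw new IllegalArgumentException("expected at least 4 columns: " + value);
        }
        return new UserRecordSketch(
                Integer.parseInt(cols[0]),
                cols[1],
                Integer.parseInt(cols[2]),
                Double.parseDouble(cols[3]));
    }

    @Override
    public String toString() {
        return "User(id=" + id + ", name=" + name + ", age=" + age + ", balance=" + balance + ")";
    }

    public static void main(String[] args) {
        System.out.println(parse("2,alanchan,19,25,alan.chan.chn@163.com"));
    }
}
```

Validating the column count up front turns a malformed value into a clear error instead of an `ArrayIndexOutOfBoundsException` deep inside the async callback.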


3. Usage example

package org.datastreamapi.source.custom.redis;

import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;

/**
 * @author alanchan
 *
 */
public class TestCustomRedisSourceDemo {
   

	public static void main(String[] args) throws Exception {
   
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

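		// Input records in the form "id,name"
		DataStreamSource<String> lines = env.fromElements("1,alan", "2,alanchan", "3,alanchanchn", "4,alan_chan", "5,alan_chan_chn");

		// orderedWait(input, function, timeout, time unit, capacity): results keep the input order;
		// each request may take up to 10 seconds, with at most 1 request in flight per operator subtask
		SingleOutputStreamOperator<String> result = AsyncDataStream.orderedWait(lines, new CustomRedisSource(), 10, TimeUnit.SECONDS, 1);
		SingleOutputStreamOperator<User> result2 = AsyncDataStream.orderedWait(lines, new CustomRedisSource2(), 10, TimeUnit.SECONDS, 1);

		// Print both streams through a single sink subtask each
		result.print("result-->").setParallelism(1);
		result2.print("result2-->").setParallelism(1);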

		env.execute();

	}
}
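
`AsyncDataStream.orderedWait` emits results in the order in which the inputs arrived at the operator, even when the lookups finish in a different order. The sketch below imitates that behaviour outside Flink with plain `CompletableFuture`s. It is not Flink's implementation, and `OrderedWaitSketch`, `slowLookup` and `lookupInInputOrder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class OrderedWaitSketch {

    // Simulated lookup whose latency shrinks as the id grows, so later inputs usually finish first.
    static String slowLookup(String record) {
        int id = Integer.parseInt(record.split(",")[0]);
        try {
            TimeUnit.MILLISECONDS.sleep(50L * (5 - id));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return record + ",found";
    }

    // Starts all lookups at once, then collects the results in input order.
    static List<String> lookupInInputOrder(List<String> records) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String record : records) {
            pending.add(CompletableFuture.supplyAsync(() -> slowLookup(record)));
        }
        List<String> ordered = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            // Waits for this element before moving on, regardless of which lookup finished first
            ordered.add(future.join());
        }
        return ordered;
    }

    public static void main(String[] args) {
        for (String line : lookupInInputOrder(Arrays.asList("1,alan", "2,alanchan", "3,alanchanchn"))) {
            System.out.println(line);
        }
    }
}
```

In the Flink job the ordering guarantee holds per operator subtask. With a parallelism greater than one, the printed lines from different subtasks can still interleave, which is why the console output below is not sorted by id.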


4. Verification

1) Prepare the data in Redis

hset AsyncReadUser_Redis alan '1,alan,18,20,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,alan.chan.chn@163.com'

127.0.0.1:6379> hset AsyncReadUser_Redis alan '1,alan,18,20,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hgetall AsyncReadUser_Redis
 1) "alan"
 2) "1,alan,18,20,alan.chan.chn@163.com"
 3) "alanchan"
 4) "2,alanchan,19,25,alan.chan.chn@163.com"
 5) "alanchanchn"
 6) "3,alanchanchn,20,30,alan.chan.chn@163.com"
 7) "alan_chan"
 8) "4,alan_chan,27,20,alan.chan.chn@163.com"
 9) "alan_chan_chn"
10) "5,alan_chan_chn,36,10,alan.chan.chn@163.com"

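2) Start the application and watch the console output

Lines starting with 输入参数input ("input parameter") and 查询结果output ("query result") are printed by CustomRedisSource. Lines starting with 输入查询条件 ("query condition") and 查询redis结果 ("Redis query result") are printed by CustomRedisSource2. The order of the lines varies between runs because the lookups run concurrently.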

输入查询条件:5,alan_chan_chn
输入参数input----:2,alanchan
输入参数input----:5,alan_chan_chn
输入查询条件:3,alanchanchn
输入查询条件:1,alan
输入参数input----:1,alan
输入查询条件:2,alanchan
输入查询条件:4,alan_chan
输入参数input----:4,alan_chan
输入参数input----:3,alanchanchn
查询结果output----:3,alanchanchn,20,30,alan.chan.chn@163.com
查询redis结果:1,alan,18,20,alan.chan.chn@163.com
查询结果output----:1,alan,18,20,alan.chan.chn@163.com
查询redis结果:4,alan_chan,27,20,alan.chan.chn@163.com
查询redis结果:2,alanchan,19,25,alan.chan.chn@163.com
查询结果output----:2,alanchan,19,25,alan.chan.chn@163.com
查询redis结果:3,alanchanchn,20,30,alan.chan.chn@163.com
查询结果output----:4,alan_chan,27,20,alan.chan.chn@163.com
查询结果output----:5,alan_chan_chn,36,10,alan.chan.chn@163.com
查询redis结果:5,alan_chan_chn,36,10,alan.chan.chn@163.com
result-->> 4,alan_chan,27,20,alan.chan.chn@163.com
result-->> 5,alan_chan_chn,36,10,alan.chan.chn@163.com
result-->> 3,alanchanchn,20,30,alan.chan.chn@163.com
result-->> 2,alanchan,19,25,alan.chan.chn@163.com
result-->> 1,alan,18,20,alan.chan.chn@163.com
result2-->> CustomRedisSource.User(id=4, name=alan_chan, age=27, balance=20.0)
result2-->> CustomRedisSource.User(id=1, name=alan, age=18, balance=20.0)
result2-->> CustomRedisSource.User(id=3, name=alanchanchn, age=20, balance=30.0)
result2-->> CustomRedisSource.User(id=5, name=alan_chan_chn, age=36, balance=10.0)
result2-->> CustomRedisSource.User(id=2, name=alanchan, age=19, balance=25.0)

That concludes this example of accessing external data with Flink's async I/O, using an asynchronous read from Redis.
