Flume 之自定义Sink

1、简介

        前文我们介绍了 Flume 如何自定义 Source, 并进行案例演示,本文将接着前文,自定义Sink,在这篇文章中,将使用自定义 Source 和 自定义的 Sink 实现数据传输,让大家快速掌握Flume这门技术。

2、自定义Source

        自定义Source参考前文:https://blog.csdn.net/zwl2220943286/article/details/135633120

3、自定义Sink

        本文将Sink定义为mysql。

3.1、引入依赖
<dependency>
	<groupId>org.apache.flume</groupId>
	<artifactId>flume-ng-core</artifactId>
	<version>1.11.0</version>
</dependency>

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.33</version>
</dependency>
3.2、自定义Sink
3.2.1、Sink代码
import com.weilong.flumeselfdefinition.util.MysqlConfig;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    private final static Logger log = LoggerFactory.getLogger(MySink.class);
    private String url;
    private String username;
    private String password;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        // channel 支持事务
        Transaction thx = channel.getTransaction();
        thx.begin();
        try {
            Event event = channel.take();
            String name = new String(event.getBody());
            int i = MysqlConfig.insertData(this.url, this.username, this.password, name);
            if (i > 0){
                log.info("==插入数据库成功==");
            }
            thx.commit();
            status = Status.READY;
        } catch (Exception ex){
            ex.printStackTrace();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        String url = context.getString("url");
        String username = context.getString("username");
        String password = context.getString("password");
        this.url = url;
        this.username = username;
        this.password = password;
    }
}
 3.2.2、数据库连接配置:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlConfig {

    private MysqlConfig(){
    }

    static {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }

    public static Connection getConnection(String url, String username, String password) throws SQLException {
        Connection connection = DriverManager.getConnection(url, username, password);
        return connection;
    }

    public static int insertData(String url, String username,String password, String name){
        Connection connection = null;
        try{
            connection = getConnection(url, username, password);
            PreparedStatement preparedStatement = connection.prepareStatement("insert into test(`name`) values( '" + name + "')");
            boolean res = preparedStatement.execute();
            if (res){
                return 1;
            }
            return 0;
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            if (connection != null){
                try {
                    connection.close();
                }catch (Exception ex){
                    ex.printStackTrace();
                }
            }
        }
        return 0;
    }
}
 3.3、Flume 配置文件

        vim flume-self-source-sink.conf

a1.sources = r1
a1.channels = c1
a1.sinks=k1
# source
a1.sources.r1.type = com.weilong.flumeselfdefinition.MySource 
# 自定义 Source 的全限定类名
a1.sources.r1.path = http://192.168.30.3:8088/hello 
# 自定义参数
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 自定义Sink
a1.sinks.k1.type = com.weilong.flumeselfdefinition.MySink
a1.sinks.k1.url = jdbc:mysql://192.168.30.3:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
a1.sinks.k1.username = root
a1.sinks.k1.password = 146815
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、将jar包放入lib目录 
 4.1、将自定义jar包放入lib目录

 

4.2、将数据库驱动jar包放入lib目录

        驱动jar包下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java

注:mysql 驱动jar包不放进lib,会出现驱动类找不到。 

5、启动 Flume
bin/flume-ng agent -c conf/ -n a1 -f testconf/flume-self-source-sink.conf -Dflume.root.logger=INFO,console

注:启动Flume 之前,自定义 web 服务也要启动。

 6、结果

成功保存进数据库。

7、总结 

         本文结合前文完成 Flume 的 Source 和 Sink 的自定义,帮助大家能够完成各种场景下的Flume的使用。关于更高级Flume的知识,关注下面公众号。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

相关推荐

  1. 定义sink

    2024-01-17 13:46:04       36 阅读
  2. Flume配置案例@Source:文件,Channel+Sink:Kafka

    2024-01-17 13:46:04       35 阅读
  3. Flume配置案例@Source:Kafka,Channel:File,Sink:HDFS

    2024-01-17 13:46:04       32 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-17 13:46:04       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-17 13:46:04       106 阅读
  3. 在Django里面运行非项目文件

    2024-01-17 13:46:04       87 阅读
  4. Python语言-面向对象

    2024-01-17 13:46:04       96 阅读

热门阅读

  1. C Primer Plus(第六版)11.13 编程练习 第6题

    2024-01-17 13:46:04       45 阅读
  2. elasticsearch查询

    2024-01-17 13:46:04       57 阅读
  3. Kotlin Async

    2024-01-17 13:46:04       52 阅读
  4. Python 发微信:实现自动化沟通的利器

    2024-01-17 13:46:04       53 阅读
  5. sqlserver2012 跨服务器查询

    2024-01-17 13:46:04       67 阅读
  6. ARCGIS PRO SDK 地图图层单一符号化_____面图层

    2024-01-17 13:46:04       58 阅读
  7. Flutter开发 键盘弹起导致底部溢出问题

    2024-01-17 13:46:04       58 阅读
  8. C#学习教程

    2024-01-17 13:46:04       58 阅读
  9. 黑洞数(C语言)

    2024-01-17 13:46:04       52 阅读
  10. 快速了解STM32的ADC功能,从入门到精通

    2024-01-17 13:46:04       55 阅读
  11. Github Copilot 的使用方法和快捷键*

    2024-01-17 13:46:04       77 阅读
  12. Nue.js 是什么?

    2024-01-17 13:46:04       47 阅读
  13. What is `HttpServletRequestWrapper` does?

    2024-01-17 13:46:04       64 阅读