Flume详解(3)

Host Interceptor

主机拦截器,本质上不是拦截数据,而是在数据的headers中添加一个host字段,可以用于标记数据来源(被收集)的主机。

Host Interceptor可以配置的选项有:

表-22 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是host

preserveExisting

optional

如果host字段已经存在,是否替换。默认是false

useIP

optional

是否使用IP。如果为true,则显示为IP;如果为false,则显示为主机名。默认为true

hostHeader

optional

显示的字段,默认是host

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Host Interceptor

a1.sources.s1.interceptors.i1.type = host

# 是否使用IP

a1.sources.s1.interceptors.i1.useIP = false



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

Static Interceptor

静态拦截器,本质上不是拦截器,而是在数据的headers中添加一个指定的字段。

Static Interceptor可以配置的选项有:

表-23 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是static

preserveExisting

optional

如果指定字段已经存在,是否替换。默认是false

key

optional

指定的键。默认是key

value

optional

指定的值,默认是value

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Static Interceptor

a1.sources.s1.interceptors.i1.type = static

# 指定键

a1.sources.s1.interceptors.i1.key = kind

# 指定值

a1.sources.s1.interceptors.i1.value = test



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

UUID Interceptor

UUID拦截器,本质上也不是一个拦截器,而是在数据的headers中添加一个id字段,可以用于标记数据的唯一性。

UUID Interceptor可以配置的选项包含:

表-24 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

headerName

optional

headers中添加的字段名,默认是id

preserveExisting

optional

如果headers中已经存在id字段,是否替换。默认是true

prefix

optional

在生成的id之前添加前缀

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置UUID Interceptor

a1.sources.s1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

Search And Replace Interceptor

搜索和替换拦截器,在使用的时候需要指定正则表达式,会根据正则表达式指定的规则,对Event中body部分的数据进行替换。注意,只替换body部分的数据,而不会影响headers中的数据。正则表达式的使用规则和Java中正则表达式的规则是一致的。

Search And Replace Interceptor中可以配置的选项包含:

表-25 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是search_replace

searchPattern

optional

搜索的正则表达式形式

replaceString

optional

替换的形式

charset

optional

body部分的字符集编码,默认是UTF-8

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Search And Replace Interceptor

a1.sources.s1.interceptors.i1.type = search_replace

# 指定正则表达式

a1.sources.s1.interceptors.i1.searchPattern = [a-z]

# 指定替换后的形式

a1.sources.s1.interceptors.i1.replaceString = *



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

Regex Filtering Interceptor

正则过滤拦截器,在使用的时候需要指定一个正则表达式,然后根据属性excludeEvents的值来确定过滤方式。如果excludeEvents的值为true,则表示过滤掉符合正则表达式形式的数据,其他数据留下来;如果excludeEvents的值为false,则表示过滤掉不合符正则表达式形式的数据,符合形式的数据留下来。默认情况下,excludeEvents的值为false。

Regex Filtering Interceptor中可以配置的选项包含:

表-26 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是regex_filter

regex

optional

正则表达式

excludeEvents

optional

替换规则,默认为false

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Regex Filtering Interceptor

a1.sources.s1.interceptors.i1.type = regex_filter

# 指定正则表达式

a1.sources.s1.interceptors.i1.regex = .*[0-9].*

# 指定替换规则

a1.sources.s1.interceptors.i1.excludeEvents = true



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

Custom Interceptor

在实际开发过程中,如果Flume提供的拦截器无法满足业务需求,那么Flume同样支持自定义拦截器。但是不同于其他组件的地方在于,Flume中的拦截器在启动的时候,是通过内部类Builder来启动,所以拦截器中需要覆盖其内部类。

步骤:

1)定义类实现Interceptor接口,覆盖其中的intercept方法,其他方法可以忽略掉;同时需要定义内部类实现Interceptor.Builder接口,覆盖其中的build方法;如果需要获取配置,那么可以配置configure方法。

package com.flume.interceptor;



import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;



import java.util.ArrayList;

import java.util.List;

import java.util.Map;



public class AuthInterceptor implements Interceptor {

    @Override

    public void initialize() {

    }



    @Override

    public Event intercept(Event event) {

        // 获取headers部分

        Map<String, String> headers = event.getHeaders();

        // 判断headers是否包含time或者timestamp字段

        if (headers.containsKey("time") || headers.containsKey("timestamp")) return event;

        // 如果没有,则添加当前的时间戳

        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));

        return event;

    }



    @Override

    public List<Event> intercept(List<Event> events) {

        // 定义新的集合

        List<Event> es = new ArrayList<>();

        // 遍历

        for (Event event : events) {

            es.add(intercept(event));

        }

        return es;

    }



    @Override

    public void close() {

    }



    public static class Builder implements Interceptor.Builder {



        @Override

        public Interceptor build() {

            return new AuthInterceptor();

        }



        @Override

        public void configure(Context context) {

        }

    }

}

2)将定义好的Interceptor打成jar包。

3)将jar包上传到Flume安装目录的lib目录下:

# 进入Flume的lib目录

cd /opt/software/flume-1.11.0/lib/

# 选择jar包,上传

rz

4)回到数据目录,编辑文件:

# 回到数据目录

cd ../data

# 编辑文件

vim authin.properties

在文件中添加:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Custom Interceptor

a1.sources.s1.interceptors.i1.type = com.flume.interceptor.AuthInterceptor$Builder



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

5)启动Flume:

flume-ng agent -n a1 -c $FLUME_HOME/conf -f authin.properties -Dflume.root.logger=INFO,console

其他

事务

在Flume中,也存在事务的问题:

图-7 Flume事务

流程如下:

1)Source进行doPut将数据写到临时的缓冲区PutList中;

2)PutList会推送数据给Channel,如果Channel中有足够的位置,则数据推送成功(doCommit),如果Channel中没有位置,则推送失败,进行回滚(doRollback);

3)Channel进行doTake操作将数据写到临时缓冲区TakeList中;

4)将TakeList中的数据通过Sink批量写往目的地;

5)如果写成功了,则执行doCommit操作;如果写失败了,则执行doRollback操作。

执行流程

Flume执行流程如下图所示:

图-8 Flume执行流程

流程如下:

1)Source会先采集数据,然后将数据发送给ChannelProcessor进行处理;

2)ChannelProcessor收到数据处理之后,会将数据交给Interceptor来处理,注意,在Flume允许存在多个Interceptor来构成拦截器链;

3)Interceptor处理完成之后,会交给Channel Selector处理,Selector存在三种模式:replicating、multiplexing和load_balancing。Selector收到数据之后会根据对应的模式将数据交给对应的Channel来处理;

4)Channel处理之后会交给SinkProcessor。SinkProcessor本质上是一个Sinkgroup,包含了三种方式:default,failover和load_balance。SinkProcessor收到数据之后会根据对应的方式将数据交给Sink来处理;

5)Sink收到数据之后,会将数据写到指定的目的地。

扩展:Flume监控

Ganglia概述

Flume的数据流可以通过Ganglia来进行监控。Ganglia是UC Berkeley发起的一个开源的集群监控项目,可以用于监测数以千计的节点性能。

Ganglia的核心主要包含了三个模块:

1)gmond(Ganglia Monitoring Daemon):轻量级的服务,需要安装在每一个需要收集指标数据的主机上。gmond可以用于收集系统指标数据,包含CPU、内存、磁盘、网络以及活跃的进程数量等。

2)gmetad(Ganglia Meta Daemon):用于整合所有的信息,并将这些信息以RRD格式来存储到磁盘上。

3)gweb(Ganglia Web):Ganglia提供的一个可视化工具,本身是使用PHP开发的,提供了WEB页面,在WEB界面中以图标形式来显示集群的运行状态,以及所收集到的不同的指标数据。

Ganglia安装

1)所有节点安装httpd和php服务:

yum -y install httpd php

2)所有节点安装关联依赖:

yum -y install rrdtool perl-rrdtool rrdtool-devel apr-devel

3)安装Epel:

yum -y install epel-release

4)在第一个节点上安装gmetad,gmond和gweb:

yum -y install ganglia-gmetad

yum -y install ganglia-gmond

yum -y install ganglia-web

5)在其他节点上安装gmond:

yum -y install ganglia-gmond

6)第一个节点上修改ganglia.conf:

# 编辑文件

vim /etc/httpd/conf.d/ganglia.conf

# 修改内容如下

<Location /ganglia>

  # Require local

  # Require ip 10.1.2.3

  # Require host example.org

  Require all granted

</Location>

7)第一个节点上修改gmetad.conf:

# 编辑文件

vim /etc/ganglia/gmetad.conf

# 修改data_source属性

data_source "flume_cluster" hadoop01

8)所有节点修改gmond.conf文件:

# 编辑文件

vim /etc/ganglia/gmond.conf

# 修改cluster中的属性值

cluster {

  name = "flume_cluster"

  owner = "unspecified"

  latlong = "unspecified"

  url = "unspecified"

}

# 修改udp_send_channel中的属性值

udp_send_channel {

  #bind_hostname = yes # Highly recommended, soon to be default.

                       # This option tells gmond to use a source address

                       # that resolves to the machine's hostname.  Without

                       # this, the metrics may appear to come from any

                       # interface and the DNS names associated with

                       # those IPs will be used to create the RRDs.

  #mcast_join = 239.2.11.71

  # 监控数据发送给hadoop01

  host = hadoop01

  port = 8649

  ttl = 1

}

# 修改udp_recv_channel中的属性值

udp_recv_channel {

  # mcast_join = 239.2.11.71

  port = 8649

  # 接收来自任意连接的数据

  bind = 0.0.0.0

  retry_bind = true

  # Size of the UDP buffer. If you are handling lots of metrics you really

  # should bump it up to e.g. 10MB or even higher.

  # buffer = 10485760

}

9)所有节点关闭selinux:

# 编辑文件

vim /etc/selinux/config

# 修改SELINUX属性的值为disabled

SELINUX=disabled

# 保存退出,重启服务器

reboot

10)所有节点启动gmond:

systemctl start gmond

11)在第一个节点上启动gmetad和httpd:

systemctl start gmetad

systemctl start httpd

12)在浏览器输入http://IP/ganglia,查看Ganglia页面。

图-9 Ganglia界面

监控Flume

1)修改Flume的配置文件:

# 进入Flume的配置目录

cd /opt/software/flume-1.11.0/conf/

# 复制文件

cp flume-env.sh.template flume-env.sh

# 编辑文件

vim flume-env.sh

# 在文件尾部添加

export JAVA_HOME=/opt/software/jdk1.8

export JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.root.monitoring.hosts=hadoop01:8649 -Xms100m -Xmx200m"

# 保存退出,生效

source flume-env.sh

2)启动Flume:

# 回到数据目录

cd ../data

# 启动Flume

flume-ng agent -n a1 -c $FLUME_HOME/conf -f basic.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=hadoop01:8649

3)属性解释:

表-27 属性解释

属性

解释

ChannelCapacity

Channel的容量

ChannelFillPercentage

Channel的利用率

ChannelSize

Channel的大小

EventPutAttemptCount

Source试图放入Channel的次数

EventPutSuccessCount

Source向Channel成功放入数据的次数

EventTakeAttemptCount

Channel试图向Sink发送数据的次数

EventTakeSuccessCount

Channel向Sink成功发送数据的次数

startTime

起始时间

stopTime

结束时间

相关推荐

  1. Flume详解

    2024-03-31 03:38:02       39 阅读
  2. Flume工具详解

    2024-03-31 03:38:02       24 阅读
  3. <span style='color:red;'>Flume</span>

    Flume

    2024-03-31 03:38:02      58 阅读
  4. flume

    2024-03-31 03:38:02       39 阅读
  5. <span style='color:red;'>Flume</span>

    Flume

    2024-03-31 03:38:02      142 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-31 03:38:02       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-31 03:38:02       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-31 03:38:02       87 阅读
  4. Python语言-面向对象

    2024-03-31 03:38:02       96 阅读

热门阅读

  1. 智慧工地整体解决方案(1)

    2024-03-31 03:38:02       43 阅读
  2. sort和priority_queue的自定义比较函数

    2024-03-31 03:38:02       43 阅读
  3. 智能合约测试例子

    2024-03-31 03:38:02       50 阅读
  4. vector快速入门

    2024-03-31 03:38:02       46 阅读
  5. 【MySQL】MySQL中SQL语句的索引分析

    2024-03-31 03:38:02       36 阅读
  6. QT5.14.2 码上热浪,用Qt5狂暴轰入多媒体狂潮

    2024-03-31 03:38:02       39 阅读
  7. IMBoy项目的缓存机制:高效数据处理的秘诀

    2024-03-31 03:38:02       40 阅读
  8. 韦东山TCP/UDP编程

    2024-03-31 03:38:02       38 阅读
  9. MySQL5.7源码分析--连接

    2024-03-31 03:38:02       47 阅读
  10. 花钱的艺术:消费和投资如何分配

    2024-03-31 03:38:02       38 阅读