Flume 01 - Flume Installation and Source Examples

Summary: Flume is a distributed, reliable, and highly available system for aggregating massive volumes of log data. It supports plugging in custom data senders for collecting data, performs simple processing on that data, and writes it out to a variety of (customizable) data receivers.

1. Flume

Flume is a distributed, reliable, and highly available system for aggregating massive volumes of log data. It supports plugging in custom data senders for collecting data, performs simple processing on that data, and writes it out to a variety of (customizable) data receivers. Flume has the following design goals:

  1. Reliability

When a node fails, logs can be delivered to other nodes without loss. Flume offers three levels of reliability guarantee, from strongest to weakest: end-to-end (the receiving agent first writes the event to disk and deletes it only after the data has been delivered successfully; if delivery fails, the data can be resent), store-on-failure (the strategy Scribe also uses: when the receiver crashes, data is written locally and sent on once the receiver recovers), and best-effort (data sent to the receiver is not acknowledged at all).

  2. Scalability

Flume uses a three-tier architecture of agents, collectors, and storage, and each tier scales horizontally. All agents and collectors are managed centrally by a master, which makes the system easy to monitor and maintain; multiple masters are allowed (managed and load-balanced via ZooKeeper), which avoids a single point of failure.

  3. Manageability

All agents and collectors are managed centrally by a master, which makes the system easy to maintain. With multiple masters, Flume uses ZooKeeper and gossip to keep dynamic configuration data consistent. Users can inspect each data source and data flow on the master, and can configure or dynamically load individual sources. Flume provides both a web interface and shell script commands for managing data flows.

  4. Functional extensibility

Users can add their own agents, collectors, or storage as needed. Flume also ships with many components, including various agents (file, syslog, etc.), collectors, and storage backends (file, HDFS, etc.).

1.1. Installing Flume

Installing Flume is simple: just unpack the distribution and configure the environment variables:

  • ubuntu@s100:~/software$ tar zxf apache-flume-1.7.0-bin.tar.gz -C /soft/
  • ubuntu@s100:~/software$ cd /soft/
  • ubuntu@s100:/soft$ ln -s apache-flume-1.7.0-bin flume

Environment variables:

  • FLUME_HOME=/soft/flume
  • PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/soft/jdk/bin:/soft/hadoop/bin:/soft/hadoop/sbin:/soft/eclipse:/soft/maven/bin:/soft/hive/bin:/soft/hbase/bin:/soft/zk/bin:/soft/pig/bin:/soft/sqoop/bin:/soft/flume/bin"
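
The PATH above is the full PATH of the author's environment. One way to set these variables (an assumption, since the original does not say which file it edits) is to append exports to ~/.bashrc:

  • export FLUME_HOME=/soft/flume
  • export PATH=$PATH:$FLUME_HOME/bin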

After installing, run flume-ng version to verify that the installation succeeded:

  • ubuntu@s100:/soft$ flume-ng version
  • Flume 1.7.0
  • Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
  • Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
  • Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
  • From source with checksum 0d21b3ffdc55a07e1d08875872c00523

1.2. Configuring Flume

The ${FLUME_HOME}/conf directory ships several configuration file templates; we need to copy two of them:

  • ubuntu@s100:/soft/flume/conf$ cp flume-env.sh.template flume-env.sh
  • ubuntu@s100:/soft/flume/conf$ cp flume-conf.properties.template flume-conf.properties

You also need to edit flume-env.sh and set its JAVA_HOME variable to point at your JDK directory.
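
A minimal sketch of that change (the /soft/jdk path is an assumption, inferred from the PATH shown earlier):

  • export JAVA_HOME=/soft/jdk

We can then look at flume-conf.properties; its contents are explained below: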

  • # declare all of the agent's sources
  • agent.sources = seqGenSrc
  • # declare all of the agent's channels
  • agent.channels = memoryChannel
  • # declare all of the agent's sinks
  • agent.sinks = loggerSink
  • # source
  • agent.sources.seqGenSrc.type = seq
  • agent.sources.seqGenSrc.channels = memoryChannel
  • # sink
  • agent.sinks.loggerSink.type = logger
  • agent.sinks.loggerSink.channel = memoryChannel
  • # channel
  • agent.channels.memoryChannel.type = memory
  • # Other config values specific to each type of channel(sink or source)
  • # can be defined as well
  • # In this case, it specifies the capacity of the memory channel
  • # agent.channels.memoryChannel.capacity = 100

Note: agent here is just an agent name; it can be replaced by an agentName of our own choosing.

Start the agent with the following command, where -f specifies the configuration file and -n the agent name (it must match the name used in the properties file):

  • ubuntu@s100:/soft/flume/conf$ flume-ng agent -f /soft/flume/conf/flume-conf.properties -n agent

1.3. nc Example

We can create the following configuration file, example.conf:

  • # example.conf: A single-node Flume configuration
  • # Name the components on this agent
  • a1.sources = r1
  • a1.sinks = k1
  • a1.channels = c1
  • # Describe/configure the source
  • a1.sources.r1.type = netcat
  • a1.sources.r1.bind = 0.0.0.0
  • a1.sources.r1.port = 44444
  • # Describe the sink
  • a1.sinks.k1.type = logger
  • # Use a channel which buffers events in memory
  • a1.channels.c1.type = memory
  • a1.channels.c1.capacity = 1000
  • a1.channels.c1.transactionCapacity = 100
  • # Bind the source and sink to the channel
  • a1.sources.r1.channels = c1
  • a1.sinks.k1.channel = c1

Then start the Flume agent:

  • ubuntu@s100:/soft/flume$ bin/flume-ng agent --conf conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=INFO,console
  • Info: Sourcing environment configuration script /soft/flume/conf/flume-env.sh
  • Info: Including Hadoop libraries found via (/soft/hadoop/bin/hadoop) for HDFS access
  • Info: Excluding /soft/hadoop-2.7.2/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath
  • Info: Excluding /soft/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath
  • ...
  • 2017-07-20 08:30:01,645 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
  • 2017-07-20 08:30:01,646 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting
  • 2017-07-20 08:30:01,659 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:44444]

As the log shows, the source is now listening on local port 44444, and we can send a message to that port:

  • ubuntu@s100:/soft/flume/conf$ nc localhost 44444
  • hello flume
  • OK

Flume receives it:

  • 2017-07-20 08:33:29,151 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 hello flume }

Note 1: when starting Flume, the SLF4J jars on its classpath can conflict with those used by Hive and HBase (hence the exclusions in the startup log above);
Note 2: setting a1.sources.r1.bind = localhost in the Flume configuration restricts the source to connections from the local machine only; to accept external connections, bind to 0.0.0.0 instead.

2. Source Tests

2.1. Avro Source Test

Avro can be configured as a Flume source, feeding data files into the source in Avro-serialized form. We create the following avro-source.conf file under ${FLUME_HOME}/conf:

  • #component
  • avro-agent.sources = r1
  • avro-agent.channels = c1
  • avro-agent.sinks = s1
  • #r1
  • avro-agent.sources.r1.type = avro
  • avro-agent.sources.r1.channels = c1
  • avro-agent.sources.r1.bind = 0.0.0.0
  • avro-agent.sources.r1.port = 4141
  • #s1
  • avro-agent.sinks.s1.type = logger
  • #c1
  • avro-agent.channels.c1.type = memory
  • #bind
  • avro-agent.sources.r1.channels = c1
  • avro-agent.sinks.s1.channel = c1

Then start the Flume agent:

  • ubuntu@s100:/soft/flume/conf$ flume-ng agent -f avro-source.conf -n avro-agent -Dflume.root.logger=INFO,console
  • ...
  • 17/07/22 01:29:26 INFO source.AvroSource: Avro source r1 started.

In another terminal, use the following command to send a file to Flume:

  • ubuntu@s100:~$ flume-ng avro-client -F /home/ubuntu/customer.data -H localhost -p 4141

The first terminal prints the events, confirming that Flume received the data:

  • 17/07/22 01:30:53 INFO ipc.NettyServer: [id: 0x1de0540b, /127.0.0.1:47395 => /127.0.0.1:4141] OPEN
  • 17/07/22 01:30:53 INFO ipc.NettyServer: [id: 0x1de0540b, /127.0.0.1:47395 => /127.0.0.1:4141] BOUND: /127.0.0.1:4141
  • 17/07/22 01:30:53 INFO ipc.NettyServer: [id: 0x1de0540b, /127.0.0.1:47395 => /127.0.0.1:4141] CONNECTED: /127.0.0.1:47395
  • 17/07/22 01:30:54 INFO ipc.NettyServer: [id: 0x1de0540b, /127.0.0.1:47395 :> /127.0.0.1:4141] DISCONNECTED
  • 17/07/22 01:30:54 INFO ipc.NettyServer: [id: 0x1de0540b, /127.0.0.1:47395 :> /127.0.0.1:4141] UNBOUND
  • 17/07/22 01:30:54 INFO ipc.NettyServer: [id: 0x1de0540b, /127.0.0.1:47395 :> /127.0.0.1:4141] CLOSED
  • 17/07/22 01:30:54 INFO ipc.NettyServer: Connection to /127.0.0.1:47395 disconnected.
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 31 2C 54 6F 6D 2C 31 38 2C 43 41 1,Tom,18,CA }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 32 2C 4A 6F 68 6E 2C 32 30 2C 46 4C 2,John,20,FL }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 33 2C 4A 61 63 6B 2C 32 32 2C 4F 48 3,Jack,22,OH }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 35 2C 52 6F 73 65 2C 32 31 2C 46 4C 5,Rose,21,FL }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 36 2C 4C 65 65 2C 32 31 2C 43 4F 6,Lee,21,CO }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 37 2C 54 69 6D 2C 32 31 2C 43 41 7,Tim,21,CA }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 38 2C 4D 69 63 6B 2C 32 31 2C 46 4C 8,Mick,21,FL }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 39 2C 4C 69 6C 79 2C 32 31 2C 4F 48 9,Lily,21,OH }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 31 30 2C 6C 75 63 79 2C 32 31 2C 50 41 10,lucy,21,PA }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 2C 54 65 64 2C 32 31 2C 43 4F 11,Ted,21,CO }
  • 17/07/22 01:30:58 INFO sink.LoggerSink: Event: { headers:{} body: 31 32 2C 4A 61 6D 65 73 2C 32 31 2C 43 41 12,James,21,CA }

A Flume configuration file can also be stored in ZooKeeper. The following Java program writes the contents of the avro-source.conf file from above to the /flume/avro-agent node in ZooKeeper:

  • package com.coderap.flume;
  • 
  • import java.io.FileInputStream;
  • 
  • import org.apache.zookeeper.CreateMode;
  • import org.apache.zookeeper.ZooDefs;
  • import org.apache.zookeeper.ZooKeeper;
  • 
  • public class App {
  •     public static void main(String[] args) throws Exception {
  •         // connect to ZooKeeper with a 5-second session timeout and no watcher
  •         ZooKeeper zk = new ZooKeeper("s101:2181", 5000, null);
  •         // read the whole configuration file into a byte array
  •         FileInputStream fis = new FileInputStream("d:/_Software/avro-source.conf");
  •         byte[] bytes = new byte[fis.available()];
  •         fis.read(bytes);
  •         fis.close();
  •         // store the file as a persistent node and print the created path
  •         String path = zk.create("/flume/avro-agent", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  •         System.out.println(path);
  •     }
  • }
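
Running this program prints the path of the node it created, /flume/avro-agent. Note that the parent node /flume must already exist; it is created below with zkCli's create command.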

Inspect the node content written to ZooKeeper:

  • [zk: localhost:2181(CONNECTED) 7] create /flume ""
  • ...
  • [zk: localhost:2181(CONNECTED) 15] get /flume/avro-agent
  • #component
  • avro-agent.sources = r1
  • avro-agent.channels = c1
  • avro-agent.sinks = s1
  • #r1
  • avro-agent.sources.r1.type = avro
  • avro-agent.sources.r1.channels = c1
  • avro-agent.sources.r1.bind = 0.0.0.0
  • avro-agent.sources.r1.port = 4141
  • #s1
  • avro-agent.sinks.s1.type = logger
  • #c1
  • avro-agent.channels.c1.type = memory
  • #bind
  • avro-agent.sources.r1.channels = c1
  • avro-agent.sinks.s1.channel = c1
  • cZxid = 0xe0000001b
  • ctime = Sat Jul 22 01:46:56 PDT 2017
  • mZxid = 0xe0000001b
  • mtime = Sat Jul 22 01:46:56 PDT 2017
  • pZxid = 0xe0000001b
  • cversion = 0
  • dataVersion = 0
  • aclVersion = 0
  • ephemeralOwner = 0x0
  • dataLength = 403
  • numChildren = 0

When starting the Flume agent, pull the agent definition from ZooKeeper; -z specifies the ZooKeeper connect string and -p the base path under which the agent configurations live:

  • ubuntu@s100:/soft/flume$ flume-ng agent --conf conf -z s101:2181 -p /flume --name avro-agent -Dflume.root.logger=INFO,console
  • ...
  • 2017-07-22 01:51:11,067 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
  • 2017-07-22 01:51:11,073 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source r1 started.
  • ...

Open another terminal and send the file data again:

  • ubuntu@s100:~$ flume-ng avro-client -F /home/ubuntu/customer.data -H localhost -p 4141

The same received-data output appears:

  • 2017-07-22 01:51:27,662 (New I/O server boss #1 ([id: 0x38fd7c7f, /0:0:0:0:0:0:0:0:4141])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x8bb3bdd5, /127.0.0.1:47397 => /127.0.0.1:4141] OPEN
  • 2017-07-22 01:51:27,665 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x8bb3bdd5, /127.0.0.1:47397 => /127.0.0.1:4141] BOUND: /127.0.0.1:4141
  • 2017-07-22 01:51:27,667 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x8bb3bdd5, /127.0.0.1:47397 => /127.0.0.1:4141] CONNECTED: /127.0.0.1:47397
  • 2017-07-22 01:51:28,044 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 2C 54 6F 6D 2C 31 38 2C 43 41 1,Tom,18,CA }
  • 2017-07-22 01:51:28,045 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 32 2C 4A 6F 68 6E 2C 32 30 2C 46 4C 2,John,20,FL }
  • 2017-07-22 01:51:28,045 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 2C 4A 61 63 6B 2C 32 32 2C 4F 48 3,Jack,22,OH }
  • 2017-07-22 01:51:28,045 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 35 2C 52 6F 73 65 2C 32 31 2C 46 4C 5,Rose,21,FL }
  • 2017-07-22 01:51:28,045 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 36 2C 4C 65 65 2C 32 31 2C 43 4F 6,Lee,21,CO }
  • 2017-07-22 01:51:28,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 37 2C 54 69 6D 2C 32 31 2C 43 41 7,Tim,21,CA }
  • 2017-07-22 01:51:28,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 38 2C 4D 69 63 6B 2C 32 31 2C 46 4C 8,Mick,21,FL }
  • 2017-07-22 01:51:28,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 39 2C 4C 69 6C 79 2C 32 31 2C 4F 48 9,Lily,21,OH }
  • 2017-07-22 01:51:28,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 30 2C 6C 75 63 79 2C 32 31 2C 50 41 10,lucy,21,PA }
  • 2017-07-22 01:51:28,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 2C 54 65 64 2C 32 31 2C 43 4F 11,Ted,21,CO }
  • 2017-07-22 01:51:28,080 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 2C 4A 61 6D 65 73 2C 32 31 2C 43 41 12,James,21,CA }
  • 2017-07-22 01:51:28,085 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x8bb3bdd5, /127.0.0.1:47397 :> /127.0.0.1:4141] DISCONNECTED
  • 2017-07-22 01:51:28,086 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x8bb3bdd5, /127.0.0.1:47397 :> /127.0.0.1:4141] UNBOUND
  • 2017-07-22 01:51:28,086 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x8bb3bdd5, /127.0.0.1:47397 :> /127.0.0.1:4141] CLOSED
  • 2017-07-22 01:51:28,086 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /127.0.0.1:47397 disconnected.

2.2. Exec Source Test

Flume can use the output of a shell command as a source. As before, create an exec-source.conf configuration file under ${FLUME_HOME}/conf with the following content:

  • #component
  • exec-agent.sources = r1
  • exec-agent.channels = c1
  • exec-agent.sinks = s1
  • #r1
  • exec-agent.sources.r1.type = exec
  • exec-agent.sources.r1.command=tail -F /home/ubuntu/exec.data
  • #s1
  • exec-agent.sinks.s1.type = logger
  • #c1
  • exec-agent.channels.c1.type = memory
  • #bind
  • exec-agent.sources.r1.channels = c1
  • exec-agent.sinks.s1.channel = c1
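
Because the tailed command can die, the exec source can optionally restart it. A sketch using the restart properties from the Flume documentation (the values are assumptions, not part of the original config):

  • exec-agent.sources.r1.restart = true
  • exec-agent.sources.r1.restartThrottle = 10000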

Then start the Flume agent:

  • ubuntu@s100:/soft/flume$ flume-ng agent --conf conf -f conf/exec-source.conf -n exec-agent -Dflume.root.logger=INFO,console
  • ...
  • 2017-07-22 02:36:11,782 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started

Append some content to the ~/exec.data file:

  • ubuntu@s100:~$ echo hello world >> exec.data

The Flume agent terminal then prints:

  • 2017-07-22 02:36:44,434 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }

2.3. Spool Source Test

Flume can watch a spooling directory for new files, a feature commonly used to pick up web server log files. Create a spool-source.conf file under ${FLUME_HOME}/conf with the following content:

  • #component
  • spool-agent.sources = r1
  • spool-agent.channels = c1
  • spool-agent.sinks = s1
  • #r1
  • spool-agent.sources.r1.type = spooldir
  • spool-agent.sources.r1.spoolDir=/home/ubuntu/spoolDir
  • #s1
  • spool-agent.sinks.s1.type = logger
  • #c1
  • spool-agent.channels.c1.type = memory
  • #bind
  • spool-agent.sources.r1.channels = c1
  • spool-agent.sinks.s1.channel = c1
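
Optionally, the source can record the name of the originating file in an event header; a sketch using the fileHeader property from the Flume documentation (not part of the original config):

  • spool-agent.sources.r1.fileHeader = true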

Then start the Flume agent:

  • ubuntu@s100:/soft/flume$ flume-ng agent --conf conf --conf-file conf/spool-source.conf --name spool-agent -Dflume.root.logger=INFO,console
  • ...
  • 2017-07-22 04:46:34,520 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:78)] SpoolDirectorySource source starting with directory: /home/ubuntu/spoolDir
  • 2017-07-22 04:46:34,554 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
  • 2017-07-22 04:46:34,556 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started

Now create a file in the /home/ubuntu/spoolDir directory and the Flume agent receives its contents:

  • ubuntu@s100:~$ echo hello world > spoolDir/1.txt

Flume logs the following; note that the consumed file is renamed with a .COMPLETED suffix:

  • 2017-07-22 04:48:29,090 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
  • 2017-07-22 04:48:29,091 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /home/ubuntu/spoolDir/1.txt to /home/ubuntu/spoolDir/1.txt.COMPLETED
  • 2017-07-22 04:48:32,563 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }

2.4. Seq Source Test

Flume can generate a number sequence as a source. Create a seq-source.conf file under ${FLUME_HOME}/conf with the following configuration:

  • #component
  • seq-agent.sources = r1
  • seq-agent.channels = c1
  • seq-agent.sinks = s1
  • #r1
  • seq-agent.sources.r1.type = seq
  • #s1
  • seq-agent.sinks.s1.type = logger
  • #c1
  • seq-agent.channels.c1.type = memory
  • #bind
  • seq-agent.sources.r1.channels = c1
  • seq-agent.sinks.s1.channel = c1
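
By default the sequence generator never stops. Newer Flume releases document a totalEvents cap on this source; a sketch (an assumption for 1.7.0, with an arbitrary value):

  • seq-agent.sources.r1.totalEvents = 100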

Start the Flume agent and it prints an endless stream of events:

  • ubuntu@s100:/soft/flume$ flume-ng agent --conf conf --conf-file conf/seq-source.conf --name seq-agent -Dflume.root.logger=INFO,console
  • ...
  • 2017-07-22 04:53:47,616 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
  • 2017-07-22 04:53:47,623 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 30 0 }
  • 2017-07-22 04:53:47,624 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 1 }
  • 2017-07-22 04:53:47,625 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 32 2 }
  • 2017-07-22 04:53:47,625 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 3 }
  • 2017-07-22 04:53:47,625 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 34 4 }
  • 2017-07-22 04:53:47,625 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 35 5 }
  • 2017-07-22 04:53:47,626 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 36 6 }
  • 2017-07-22 04:53:47,626 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 37 7 }
  • 2017-07-22 04:53:47,627 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 38 8 }
  • 2017-07-22 04:53:47,627 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 39 9 }
  • 2017-07-22 04:53:47,628 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 30 10 }
  • ...

2.5. Syslog TCP Source Test

Flume can receive source data as syslog messages over TCP. Create a syslogtcp-source.conf file under ${FLUME_HOME}/conf with the following configuration:

  • #component
  • syslogtcp-agent.sources = r1
  • syslogtcp-agent.channels = c1
  • syslogtcp-agent.sinks = s1
  • #r1
  • syslogtcp-agent.sources.r1.type = syslogtcp
  • syslogtcp-agent.sources.r1.host = localhost
  • syslogtcp-agent.sources.r1.port = 8888
  • #s1
  • syslogtcp-agent.sinks.s1.type = logger
  • #c1
  • syslogtcp-agent.channels.c1.type = memory
  • #bind
  • syslogtcp-agent.sources.r1.channels = c1
  • syslogtcp-agent.sinks.s1.channel = c1

Then start the Flume agent:

  • ubuntu@s100:/soft/flume$ flume-ng agent --conf conf --conf-file conf/syslogtcp-source.conf --name syslogtcp-agent -Dflume.root.logger=INFO,console
  • ...
  • 2017-07-22 04:57:27,581 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
  • 2017-07-22 04:57:27,582 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink s1
  • 2017-07-22 04:57:27,583 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
  • 2017-07-22 04:57:27,630 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.SyslogTcpSource.start(SyslogTcpSource.java:119)] Syslog TCP Source starting...

Now use the nc command to send a message to port 8888:

  • ubuntu@s100:~$ nc localhost 8888
  • hello world

The Flume terminal prints the received data; the warning appears because the plain-text message is not a well-formed syslog frame:

  • 2017-07-22 04:59:48,290 (New I/O worker #1) [WARN - org.apache.flume.source.SyslogUtils.buildEvent(SyslogUtils.java:316)] Event created from Invalid Syslog data.
  • 2017-07-22 04:59:49,641 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }

2.6. Syslog UDP Source Test

Flume can likewise receive source data over UDP. Create a syslogudp-source.conf file under ${FLUME_HOME}/conf with the following configuration:

  • #component
  • syslogudp-agent.sources = r1
  • syslogudp-agent.channels = c1
  • syslogudp-agent.sinks = s1
  • #r1
  • syslogudp-agent.sources.r1.type = syslogudp
  • syslogudp-agent.sources.r1.host = localhost
  • syslogudp-agent.sources.r1.port = 8888
  • #s1
  • syslogudp-agent.sinks.s1.type = logger
  • #c1
  • syslogudp-agent.channels.c1.type = memory
  • #bind
  • syslogudp-agent.sources.r1.channels = c1
  • syslogudp-agent.sinks.s1.channel = c1

Then start the Flume agent:

  • ubuntu@s100:/soft/flume$ flume-ng agent --conf conf --conf-file conf/syslogudp-source.conf --name syslogudp-agent -Dflume.root.logger=INFO,console
  • ...
  • 2017-07-22 05:04:53,320 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
  • 2017-07-22 05:04:53,323 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink s1
  • 2017-07-22 05:04:53,323 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1

We use a small JUnit test in Java to send the UDP datagram:

  • // requires JUnit's @Test plus java.net.DatagramSocket,
  • // java.net.DatagramPacket and java.net.InetAddress
  • @Test
  • public void testSyslogudp() throws Exception {
  •     DatagramSocket socket = new DatagramSocket();
  •     byte[] data = "hello world, udp".getBytes();
  •     DatagramPacket packet = new DatagramPacket(data, data.length);
  •     // 192.168.127.255 is the broadcast address of the 192.168.127.0/24
  •     // subnet, so the datagram reaches s100's port 8888
  •     packet.setAddress(InetAddress.getByName("192.168.127.255"));
  •     packet.setPort(8888);
  •     socket.send(packet);
  •     socket.close();
  •     System.out.println("send over");
  • }
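
Assuming a netcat build with UDP support, the same datagram can also be sent from the shell:

  • ubuntu@s100:~$ echo "hello world, udp" | nc -u -w1 localhost 8888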

The Flume agent terminal prints the received message:

  • 2017-07-22 06:47:52,045 (Old I/O datagram worker ([id: 0x68fedd2f, 0.0.0.0/0.0.0.0:8888])) [WARN - org.apache.flume.source.SyslogUtils.buildEvent(SyslogUtils.java:316)] Event created from Invalid Syslog data.
  • 2017-07-22 06:47:56,397 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 2C 20 75 64 70 hello world, udp }

2.7. HTTP Source Test

Flume can receive source data via HTTP requests; note that with the default JSONHandler, the request body must be a JSON array of events. Create an http-source.conf file under ${FLUME_HOME}/conf with the following configuration:

  • #component
  • http-agent.sources = r1
  • http-agent.channels = c1
  • http-agent.sinks = s1
  • #r1
  • http-agent.sources.r1.type = http
  • http-agent.sources.r1.host = 0.0.0.0
  • http-agent.sources.r1.port = 8888
  • #s1
  • http-agent.sinks.s1.type = logger
  • #c1
  • http-agent.channels.c1.type = memory
  • #bind
  • http-agent.sources.r1.channels = c1
  • http-agent.sinks.s1.channel = c1

Start the Flume agent:

  • ubuntu@s100:/soft/flume$ flume-ng agent --conf conf --conf-file conf/http-source.conf --name http-agent -Dflume.root.logger=INFO,console
  • ...
  • 2017-07-22 07:06:55,617 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started

Then use Postman to POST the following JSON to http://s100:8888:

  • [{
  •     "headers" : {
  •       "timestamp" : "434324343",
  •       "host" : "random_host.example.com"
  •     },
  •     "body" : "random_body"
  •   },
  •   {
  •     "headers" : {
  •       "namenode" : "namenode.example.com",
  •       "datanode" : "random_datanode.example.com"
  •     },
  •     "body" : "really_random_body"
  •   }]
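
Postman is only one option; assuming the JSON above is saved to a file named events.json (a hypothetical name), curl can send the same request:

  • ubuntu@s100:~$ curl -X POST -H "Content-Type: application/json" --data @events.json http://s100:8888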

The Flume terminal prints the following; the logger sink truncates long event bodies, hence really_random_bo:

  • 2017-07-22 07:14:53,544 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{host=random_host.example.com, timestamp=434324343} body: 72 61 6E 64 6F 6D 5F 62 6F 64 79 random_body }
  • 2017-07-22 07:14:53,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{namenode=namenode.example.com, datanode=random_datanode.example.com} body: 72 65 61 6C 6C 79 5F 72 61 6E 64 6F 6D 5F 62 6F really_random_bo }

Note: the Maven option package -DskipTests skips the test phase when packaging; this becomes relevant when building the custom source jar below.

2.8. Stress Source Test
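
Flume ships a stress source, org.apache.flume.source.StressSource, intended for internal load testing. A minimal configuration sketch following the pattern of the other tests (the size and maxTotalEvents values are assumptions; save it as, say, conf/stress-source.conf and start it with flume-ng like the agents above):

  • #component
  • stress-agent.sources = r1
  • stress-agent.channels = c1
  • stress-agent.sinks = s1
  • #r1 - generate 1 KB events, stopping after 10000 in total
  • stress-agent.sources.r1.type = org.apache.flume.source.StressSource
  • stress-agent.sources.r1.size = 1024
  • stress-agent.sources.r1.maxTotalEvents = 10000
  • #s1
  • stress-agent.sinks.s1.type = logger
  • #c1
  • stress-agent.channels.c1.type = memory
  • #bind
  • stress-agent.sources.r1.channels = c1
  • stress-agent.sinks.s1.channel = c1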

2.9. Custom Source Test

We can also implement a Flume source of our own in Java, as the following code shows.

Note: FQCN stands for Fully Qualified Class Name; the type property in the configuration refers to a custom source by its FQCN.

  • package com.coderap.flume.source;
  • 
  • import java.util.Date;
  • import java.util.HashMap;
  • 
  • import org.apache.flume.Context;
  • import org.apache.flume.Event;
  • import org.apache.flume.FlumeException;
  • import org.apache.flume.channel.ChannelProcessor;
  • import org.apache.flume.event.SimpleEvent;
  • import org.apache.flume.source.AbstractEventDrivenSource;
  • 
  • public class MySource extends AbstractEventDrivenSource {
  • 
  •     @Override
  •     protected void doConfigure(Context context) throws FlumeException {
  •         // this example takes no configuration properties
  •     }
  • 
  •     @Override
  •     protected void doStart() throws FlumeException {
  •         // on startup, push 1000 synthetic events into the channel
  •         ChannelProcessor cp = this.getChannelProcessor();
  •         HashMap<String, String> map = new HashMap<String, String>();
  •         map.put("owner", "me");
  •         map.put("date", new Date().toString());
  •         Event e = null;
  •         for (int i = 0; i < 1000; i++) {
  •             e = new SimpleEvent();
  •             e.setBody(("Tom" + i).getBytes());
  •             e.setHeaders(map);
  •             cp.processEvent(e);
  •         }
  •     }
  • 
  •     @Override
  •     protected void doStop() throws FlumeException {
  •     }
  • }
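
Before the agent can load this class, package it into a jar (this is where the package -DskipTests note above applies) and place the jar under ${FLUME_HOME}/lib, the usual way to put custom components on Flume's classpath.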

Then add a custom-source.conf configuration file under ${FLUME_HOME}/conf:

  • #component
  • custom-agent.sources = r1
  • custom-agent.channels = c1
  • custom-agent.sinks = s1
  • #r1
  • custom-agent.sources.r1.type = com.coderap.flume.source.MySource
  • #s1
  • custom-agent.sinks.s1.type = logger
  • #c1
  • custom-agent.channels.c1.type = memory
  • #bind
  • custom-agent.sources.r1.channels = c1
  • custom-agent.sinks.s1.channel = c1

The configuration above references the custom source by its fully qualified class name; now just start the Flume agent to see the corresponding output:

  • ubuntu@s100:/soft/flume$ flume-ng agent --conf conf --conf-file conf/custom-source.conf --name custom-agent -Dflume.root.logger=INFO,console
  • ...
  • 2017-07-22 23:25:43,701 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
  • 2017-07-22 23:25:44,073 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink s1
  • 2017-07-22 23:25:44,084 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
  • 2017-07-22 23:25:44,121 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 30 Tom0 }
  • 2017-07-22 23:25:44,132 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 31 Tom1 }
  • 2017-07-22 23:25:44,132 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 32 Tom2 }
  • 2017-07-22 23:25:44,134 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 33 Tom3 }
  • 2017-07-22 23:25:44,134 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 34 Tom4 }
  • 2017-07-22 23:25:44,136 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 35 Tom5 }
  • 2017-07-22 23:25:44,136 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 36 Tom6 }
  • 2017-07-22 23:25:44,137 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 37 Tom7 }
  • 2017-07-22 23:25:44,137 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 38 Tom8 }
  • 2017-07-22 23:25:44,138 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{owner=me, date=Sat Jul 22 23:25:44 PDT 2017} body: 54 6F 6D 39 Tom9 }
  • ...