Monitoring Flume Agents and Collectors
When using Flume to collect logs in real time, even though the transaction mechanism guarantees that no data is lost, you still need to keep an eye on whether messages are flowing normally between Source, Channel, and Sink: for example, how many messages have been transferred from Source to Channel, how many from Channel to Sink, and whether the two counts diverge too much.
Flume provides a Monitor mechanism for this: http://flume.apache.org/FlumeUserGuide.html#monitoring It reports the counters collected along the way. There are four reporting methods: JMX Reporting, Ganglia Reporting, JSON Reporting, and Custom Reporting. Here the simplest one, JSON Reporting, is used as the example.
When starting the Flume Agent, add two parameters:
flume-ng agent -n agent_lxw1234 --conf . -f agent_lxw1234_file_2_kafka.properties -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
flume.monitoring.type=http specifies HTTP as the reporting method, and flume.monitoring.port specifies the port number of the HTTP service.
After startup, an HTTP service is started on the machine where the Flume Agent runs. Opening http://<hostname>:34545/metrics returns a JSON document:
{ "SINK.sink_lxw1234":{ "ConnectionCreatedCount":"0", "BatchCompleteCount":"0", "BatchEmptyCount":"72", "EventDrainAttemptCount":"0", "StartTime":"1518400034824", "BatchUnderflowCount":"43", "ConnectionFailedCount":"0", "ConnectionClosedCount":"0", "Type":"SINK", "RollbackCount":"0", "EventDrainSuccessCount":"244", "KafkaEventSendTimer":"531", "StopTime":"0" }, "CHANNEL.file_channel_lxw1234":{ "Unhealthy":"0", "ChannelSize":"0", "EventTakeAttemptCount":"359", "StartTime":"1518400034141", "Open":"true", "CheckpointWriteErrorCount":"0", "ChannelCapacity":"10000", "ChannelFillPercentage":"0.0", "EventTakeErrorCount":"0", "Type":"CHANNEL", "EventTakeSuccessCount":"244", "Closed":"0", "CheckpointBackupWriteErrorCount":"0", "EventPutAttemptCount":"244", "EventPutSuccessCount":"244", "EventPutErrorCount":"0", "StopTime":"0" }, "SOURCE.source_lxw1234":{ "EventReceivedCount":"244", "AppendBatchAcceptedCount":"45", "Type":"SOURCE", "AppendReceivedCount":"0", "EventAcceptedCount":"244", "StartTime":"1518400034767", "AppendAcceptedCount":"0", "OpenConnectionCount":"0", "AppendBatchReceivedCount":"45", "StopTime":"0" } } |
In my example, the Source is TAILDIR, the Channel is FileChannel, and the Sink is a Kafka Sink. The three JSON objects print the counter information of these three components respectively.
For example, "EventReceivedCount":"244" in SOURCE means the Source has read 244 messages from the files;
"EventPutSuccessCount":"244" in CHANNEL means 244 messages were successfully put into the Channel;
"EventDrainSuccessCount":"244" in SINK means 244 messages were successfully sent to Kafka.
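Building on these counters, a simple health check is to poll the endpoint periodically and compare the Source, Channel, and Sink counts, which addresses exactly the concern raised at the beginning of this section. The following Python sketch is one possible way to do this; the host, polling interval, and lag threshold are assumptions, and the component names are taken from the example configuration above:

import json
import time
import urllib.request

METRICS_URL = "http://localhost:34545/metrics"  # assumed host; port from the example above
MAX_LAG = 1000                # assumed threshold: allowed gap between accepted and drained events
POLL_INTERVAL_SECONDS = 60    # assumed polling interval

def fetch_metrics(url):
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))

while True:
    m = fetch_metrics(METRICS_URL)
    # Component names match the example configuration above.
    accepted = int(m["SOURCE.source_lxw1234"]["EventAcceptedCount"])
    put_ok = int(m["CHANNEL.file_channel_lxw1234"]["EventPutSuccessCount"])
    drained = int(m["SINK.sink_lxw1234"]["EventDrainSuccessCount"])
    channel_size = int(m["CHANNEL.file_channel_lxw1234"]["ChannelSize"])

    lag = accepted - drained
    print("accepted={} put={} drained={} channel_size={} lag={}".format(
        accepted, put_ok, drained, channel_size, lag))
    if lag > MAX_LAG:
        print("WARNING: sink is lagging behind the source by {} events".format(lag))

    time.sleep(POLL_INTERVAL_SECONDS)

In practice the same comparison can of course be wired into whatever alerting system you already use; the point is simply that the JSON Reporting counters give you enough information to detect a stalled Sink or a filling Channel without logging into the machine.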