Flume
10 Apr 2019
flume [c] an inclined channel for conveying water (as for power); a ravine or gorge with a stream running through it.
Apache Flume is a distributed system for collecting, aggregating and moving log data from many different sources to a centralized data store.
- Flume requires a Java Runtime Environment (JRE), version 1.8 or later.
- A Flume event (also called Flume data) is a byte payload with optional string attributes.
A Flume agent is a JVM process consisting of three components: sources, channels, and sinks.
When a source receives an event, it stores it into one or more channels. The channel keeps the event until it is consumed by a Flume sink.
Channels are passive buffers between sources and sinks: they neither push nor pull events on their own. Sources put events into a channel, and sinks take events out of it.
Multiple agents can be chained to form a multi-hop event flow. For example, an avro source in one agent can receive events over RPC from an avro sink in the previous agent.
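In configuration terms, a two-hop flow of this kind might look roughly like the sketch below. The agent names (agent_a, agent_b), the component names, the host name and the port are all made up for illustration, and the two agents would normally live in separate configuration files on separate hosts. Each agent would also still need its remaining sources, channels and sinks defined.

```properties
# Upstream agent: its avro sink sends events to the next hop over Avro RPC.
# (agent_a, k1, c1, collector.example.com and 4545 are hypothetical.)
agent_a.sinks = k1
agent_a.sinks.k1.type = avro
agent_a.sinks.k1.channel = c1
agent_a.sinks.k1.hostname = collector.example.com
agent_a.sinks.k1.port = 4545

# Downstream agent: its avro source listens on that port and receives the events.
# (agent_b, r1 and c1 are hypothetical as well.)
agent_b.sources = r1
agent_b.sources.r1.type = avro
agent_b.sources.r1.channels = c1
agent_b.sources.r1.bind = 0.0.0.0
agent_b.sources.r1.port = 4545
```

The only coupling between the two hops is the address: the sink's hostname and port must point at the address the source binds to.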
By convention, Flume configuration files end with the .properties suffix.
Keep in mind that an agent may have multiple sources, multiple channels, and multiple sinks. How these components are wired together matters.
Each source, channel and sink has a type property.
For example, a source type may be exec; a channel type may be memory; a sink type may be avro.
The remaining properties of a component depend on its type.
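As a rough sketch of how this fits together, an agent first lists its components by name and then sets a type (plus type-specific properties) on each. The names my_agent, r12a, c12 and k12a are the ones used elsewhere in this post. The channel sizes, host name and port are placeholder assumptions, not recommendations.

```properties
# Declare the components that belong to the agent named my_agent.
my_agent.sources = r12a
my_agent.channels = c12
my_agent.sinks = k12a

# Source: run a command and turn its output lines into events.
my_agent.sources.r12a.type = exec
my_agent.sources.r12a.command = tail -F /var/log/server.log
my_agent.sources.r12a.channels = c12

# Channel: in-memory buffer. Placeholder sizes: total events held,
# and events per transaction.
my_agent.channels.c12.type = memory
my_agent.channels.c12.capacity = 10000
my_agent.channels.c12.transactionCapacity = 1000

# Sink: forward events to the next hop over Avro RPC (placeholder address).
my_agent.sinks.k12a.type = avro
my_agent.sinks.k12a.channel = c12
my_agent.sinks.k12a.hostname = collector.example.com
my_agent.sinks.k12a.port = 4545
```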
A Flume source can have an interceptors property. Interceptors are used to transform received events before they are put onto a channel.
For example, an interceptor might translate raw event data into JSON.
    my_agent.sources.r12a.type = exec
    my_agent.sources.r12a.channels = c12
    my_agent.sources.r12a.command = tail -F /var/log/server.log
    my_agent.sources.r12a.logStdErr = true
    my_agent.sources.r12a.batchSize = 100
    my_agent.sources.r12a.interceptors = log2Json
    my_agent.sources.r12a.interceptors.log2Json.type = Flume.log2Json$Builder
Here, Flume.log2Json$Builder is an external Java class: the nested Builder of a custom interceptor, which must be available on the agent's classpath.
Multiple sinks can be combined into a sink group, whose sink processor either load-balances events across the sinks or fails over from one sink to another.
    my_agent.sinkgroups = g12a
    my_agent.sinkgroups.g12a.sinks = k12a k12b k12c k12d k12e k12f k12g k12h
    my_agent.sinkgroups.g12a.processor.type = load_balance
    my_agent.sinkgroups.g12a.processor.backoff = true
    my_agent.sinkgroups.g12a.processor.selector = round_robin
    my_agent.sinkgroups.g12a.processor.selector.maxTimeOut = 3000
If backoff is enabled, the sink processor blacklists sinks that fail, removing them from selection for a given timeout. If a sink is still unresponsive when its timeout ends, the timeout is increased exponentially, up to processor.selector.maxTimeOut (in milliseconds).
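For comparison, a failover group could be configured roughly as follows. This is only a sketch: the group name g12b, the priorities and the penalty value are illustrative assumptions, and the group is meant as an alternative to the load-balancing group, not something to put in the same file alongside it. With the failover processor, events go to the available sink with the highest priority, and the next sink takes over when that one fails.

```properties
# g12b is a hypothetical group name; k12a and k12b are sinks defined elsewhere.
my_agent.sinkgroups = g12b
my_agent.sinkgroups.g12b.sinks = k12a k12b
my_agent.sinkgroups.g12b.processor.type = failover
# A larger number means higher priority: k12a is preferred, k12b is the standby.
my_agent.sinkgroups.g12b.processor.priority.k12a = 10
my_agent.sinkgroups.g12b.processor.priority.k12b = 5
# Upper bound, in milliseconds, on the backoff applied to a failed sink
# (illustrative value).
my_agent.sinkgroups.g12b.processor.maxpenalty = 10000
```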
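Each source must be bound to at least one channel (the channels property, plural), and each sink to exactly one channel (the channel property, singular).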
    my_agent.sources.r12a.channels = c12
    my_agent.sources.r12b.channels = c12
    my_agent.sinks.k12a.channel = c12
    my_agent.sinks.k12b.channel = c12
    my_agent.sinks.k12c.channel = c12
    my_agent.sinks.k12d.channel = c12
    my_agent.sinks.k12e.channel = c12
    my_agent.sinks.k12f.channel = c12
    my_agent.sinks.k12g.channel = c12
    my_agent.sinks.k12h.channel = c12
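Because a source takes a list of channels, it can also fan out to several of them. A minimal sketch, assuming a second, hypothetical channel c13: with the replicating channel selector (Flume's default), every event from the source is copied to each listed channel.

```properties
# c13 is a hypothetical second channel, used only for illustration.
my_agent.channels = c12 c13
my_agent.sources.r12a.channels = c12 c13
# replicating is the default selector; it is spelled out here for clarity.
my_agent.sources.r12a.selector.type = replicating
```

Each channel would then be drained by its own sink or sink group.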
Starting an Agent
The Flume distribution ships a shell script, flume-ng, in its bin directory. When starting an agent, pass the agent name defined in the configuration file.
    ~ # bin/flume-ng --help
    ~ # bin/flume-ng agent -c conf -f conf/flume-conf.properties -n $agent_name