This article briefly describes how we moved externally generated customer events into our internal Kafka cluster and how we built a reliable failover system with Flume-NG.
We have worked with Apache Flume since its 0.9 release, because it fit our need to land internet events in a first, dumb bucket before they are processed.
Flume and the old structure
In brief, the old Flume architecture centralizes the configuration of all logical agents in a Flume Master server, which stays in contact with the agents and pushes changes to them. A new Flume agent therefore has to be initialized against this master, and the master/agent connection must stay open at all times.
It is easy to see that such an architecture does not scale well in general and becomes a bottleneck once you think about autoscaling or a multi-region setup.
So, when we needed to launch in new countries, we started upgrading to Flume-NG, attracted by its masterless architecture.
From the start, Flume-NG was easier to set up than its predecessor, also from a provisioning perspective. The source/sink configuration now lives on each agent independently.
In our case, and as the example for this article, we run Flume-NG 1.6 with Thrift as the source and Kafka as the remote sink.
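As an illustration, an agent of this shape could be declared roughly as follows. This is a minimal sketch using the stock Flume-NG 1.6 property names, not our production file: the names agent1, ch1 and kafka are the ones used later in this article, while the source name thrift, the bind address, the port, the memory channel with its sizes, and the broker list are assumptions.

```properties
# Components of the agent ("thrift" is a hypothetical source name;
# any failover sink added later must be listed in agent1.sinks as well)
agent1.sources = thrift
agent1.channels = ch1
agent1.sinks = kafka

# Thrift source receiving the external events (address and port are placeholders)
agent1.sources.thrift.type = thrift
agent1.sources.thrift.bind = 0.0.0.0
agent1.sources.thrift.port = 4141
agent1.sources.thrift.channels = ch1

# Channel buffering events between source and sink (memory channel assumed)
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 1000

# Kafka sink, Flume 1.6 style (broker list is a placeholder)
agent1.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka.brokerList = kafka1.example.com:9092,kafka2.example.com:9092
agent1.sinks.kafka.topic = MyTopic
agent1.sinks.kafka.requiredAcks = 1
agent1.sinks.kafka.channel = ch1
```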
Flume 0.x and Disk Failover
However, one very useful out-of-the-box feature of Flume 0.x seemed to be missing from Flume-NG: “diskFailover”, which simply stores events on the local disk while a remote sink is down.
It took me some time to understand that the sink-to-disk feature was not actually missing; rather, Flume-NG offers more options for automatic failover. Instead of a single sink-to-disk option, we can now configure a pool of real sinks that take part in the failover according to a specific priority.
Despite the many choices, keeping unsent events on disk was still the most suitable option for us. The following sections show how we built this feature with Flume-NG and which problems we had to solve along the way.
Analysis of the new sink pool
To get roughly the same behaviour as “diskFailover”, we configured what Flume-NG calls a sink group with a failover processor. In our case it contains only the Kafka sink (main) and a File Roll sink (failover), which writes to disk when Kafka is unavailable.
The sink group is named g1 here, since an agent can have as many groups as it needs. The last line sets the processor type of the group, in our case failover (it could also be load_balance, for instance):
# FAILOVER GROUP
agent1.sinkgroups.g1.sinks = kafka fileroll
agent1.sinkgroups.g1.processor.type = failover
Now we assign a priority to each sink (the higher value wins):
agent1.sinkgroups.g1.processor.priority.kafka = 10
agent1.sinkgroups.g1.processor.priority.fileroll = 5
If the highest-priority sink is down, the lower-priority sink takes over. When the highest-priority sink becomes available again, the group switches back to it.
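To make this failover and failback behaviour concrete, here is a minimal sketch of the remaining pieces of the sink group. The property names are those of stock Flume-NG 1.6; the absolute path (inferred from the failover/Events directory used in this article), the roll interval and the penalty value are assumptions, not our production values.

```properties
# Both sinks and the group itself must be declared on the agent
agent1.sinks = kafka fileroll
agent1.sinkgroups = g1

# File Roll sink: writes events to local files while Kafka is unreachable.
# It drains the same channel as the Kafka sink (ch1 here).
agent1.sinks.fileroll.type = file_roll
agent1.sinks.fileroll.channel = ch1
# Assumed absolute path of the failover/Events directory
agent1.sinks.fileroll.sink.directory = /opt/flume-ng/failover/Events
# Start a new file every 60 seconds (illustrative value)
agent1.sinks.fileroll.sink.rollInterval = 60

# Upper bound, in milliseconds, of the backoff applied to a failed sink
# before the processor tries it again (illustrative value)
agent1.sinkgroups.g1.processor.maxpenalty = 10000
```

With a configuration like this, a failed Kafka sink is put aside for a cool-down period that grows with consecutive failures, up to maxpenalty, and it rejoins the live pool once it sends an event successfully.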
A spool directory
After a failover episode, we end up with events collected locally that we still want to send through the main sink within a reasonable time. In our specific case, the order in which events arrive in Kafka does not matter, so they can be replayed later.
To do this, we set up what Flume-NG calls a Spooling Directory source (spooldir), which also feeds the Kafka cluster:
# SPOOLDIR (source)
# Each event moved here (failover/spool) from the file-roll sink directory
# (failover/Events), will be pushed automatically to the Kafka sink.
agent1.sources.spool.type = spooldir
agent1.sources.spool.channels = ch1
agent1.sources.spool.spoolDir = /opt/flume-ng/failover/spool
agent1.sources.spool.fileHeader = true
So, as soon as an event is moved into this directory, it is sent to Kafka automatically.
Issues using SpoolDir source
With Kafka as the sink, SpoolDir works by design as long as you hard-code a Kafka topic name in the Flume-NG agent configuration file:
agent1.sinks.kafka.topic = MyTopic
Unfortunately, this fixed configuration is not suitable if you’re collecting various events destined for different topics.
In theory, SpoolDir should read the topic name from metadata stored with the event itself. In practice, this never worked for us.
Once a developer from our core team did the magic (fixing the deserialization procedure; thanks, Sebastian Alfers!), we were able to make SpoolDir send each event to the proper destination topic.
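The header-based routing itself can be illustrated with stock Flume 1.6 pieces: the Kafka sink publishes an event to the topic named in its topic header, if one is present, instead of the configured topic. The sketch below is not the deserializer fix mentioned above. It is a simpler alternative that assumes one spool directory per topic, and the source name spoolOrders, its directory and the topic name OrdersTopic are hypothetical.

```properties
# Hypothetical extra spooling source dedicated to a single topic
# (spoolOrders must also be listed in agent1.sources)
agent1.sources.spoolOrders.type = spooldir
agent1.sources.spoolOrders.channels = ch1
agent1.sources.spoolOrders.spoolDir = /opt/flume-ng/failover/spool-orders

# Static interceptor: stamps every event from this source with a "topic"
# header, which the Kafka sink uses in place of agent1.sinks.kafka.topic
agent1.sources.spoolOrders.interceptors = i1
agent1.sources.spoolOrders.interceptors.i1.type = static
agent1.sources.spoolOrders.interceptors.i1.key = topic
agent1.sources.spoolOrders.interceptors.i1.value = OrdersTopic
```

This only helps when the unsent files can be sorted by topic before they reach the spool directories, which is why a per-event solution was the better fit for us.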
Processing SpoolDir regularly
The last step of this implementation was to add a trivial cron job which periodically:
- Checks for rolled files of unsent events in Flume-NG/failover/Events
- Checks whether those events are still “useful” for our data processing (in our case, only events younger than n days; older events are simply archived somewhere for further analysis, you never know)
- Checks once again whether the remote sink is available
- Moves the events to the SpoolDir