An enterprise as large as Target generates a lot of data, and on my Big Data platform team we want to make it as easy as possible for our users to get it into Hadoop in real time. I want to discuss how we are starting to approach this problem, what we’ve done so far, and what is still to come.

Our requirements

We wanted to build a system with flexible open source components. Our experience with proprietary products on Hadoop is that they tend to be inflexible and only work with a narrow set of use cases. That can also be true of open source products, but we have found them to be easier to adapt to our needs. Indeed, that ended up being the case as we went through this journey and made several contributions to Apache projects.

More specifically, we wanted to create a system that:

  • was highly resilient to the failure of any individual component
  • supported a variety of message formats
  • delivered data to Hadoop with very low latency
  • made the data immediately usable
  • streamed data from Apache Kafka sources

A Streaming Framework

There are many excellent comparisons of streaming frameworks available, and I won’t attempt to recreate them here. The first criterion we considered was what tooling was needed to monitor and administer the streaming framework, with a strong preference for using our primary Hadoop administration tool, Apache Ambari. Apache Storm fit that bill and was also a proven solution for stream processing. If Storm could meet our other requirements, it would be our first choice.

To test its resiliency we ran a simple scenario: start a data stream into Hadoop, disable HDFS, and then reenable it. Streaming would obviously fail while HDFS was disabled, but we needed the system to recover gracefully when HDFS came back online. Unfortunately our first test of this scenario left our Storm topology in an unrecoverable state, which required a manual restart. That’s not something we could live with.
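
To make the desired behavior concrete, here is a minimal Python sketch of a writer that survives a sink outage. It is not the code contributed to Storm and it does not use any Storm or HDFS API; `ResilientWriter` and `open_sink` are hypothetical names, and an `OSError` stands in for whatever error an unavailable HDFS would raise. The idea is simply that a failed write keeps its records and drops the broken connection, so the next attempt reconnects and delivers everything.

```python
class ResilientWriter:
    """Keeps undelivered records and reconnects instead of dying on failure."""

    def __init__(self, open_sink):
        # open_sink is a hypothetical factory: it returns a callable that
        # accepts a list of records, or raises OSError if the sink is down.
        self.open_sink = open_sink
        self.sink = None
        self.pending = []

    def write(self, record):
        """Queue a record and try to deliver everything queued so far."""
        self.pending.append(record)
        return self.try_flush()

    def try_flush(self):
        try:
            if self.sink is None:
                self.sink = self.open_sink()
            self.sink(list(self.pending))
        except OSError:
            # Forget the broken connection; the next call reopens it.
            self.sink = None
            return False
        self.pending.clear()
        return True
```

While the sink is down, `write` returns `False` and the records accumulate in `pending`; the first successful call after recovery delivers them in their original order. A real implementation would also need to bound the size of `pending`.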

We also needed very fine control over the latency of arriving data. In general Storm processes messages one at a time and not in batches, which leads to very low latency. However, almost all Storm “bolts” that write data to a destination system will batch their writes for efficiency. If the batch size was set too large, final latency could be quite high because no data was written until the batch filled up. We observed that behavior when we set the batch size to several minutes’ worth of data in a test scenario.
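
One common way to bound that latency is to flush a batch when it is full or when its oldest record has waited too long, whichever comes first. The sketch below illustrates the pattern in plain Python. It is an assumption-laden illustration rather than Storm’s implementation: `BatchingWriter`, `tick`, and the injectable `clock` are hypothetical names.

```python
import time


class BatchingWriter:
    """Buffers records and flushes on size OR age, whichever comes first."""

    def __init__(self, sink, max_records, max_age_seconds, clock=time.monotonic):
        self.sink = sink                    # callable that accepts a list of records
        self.max_records = max_records
        self.max_age_seconds = max_age_seconds
        self.clock = clock                  # injectable so tests can control time
        self.buffer = []
        self.oldest = None                  # arrival time of the oldest buffered record

    def write(self, record):
        if not self.buffer:
            self.oldest = self.clock()
        self.buffer.append(record)
        if len(self.buffer) >= self.max_records:
            self.flush()

    def tick(self):
        """Call periodically (for example from a timer) to enforce the age limit."""
        if self.buffer and self.clock() - self.oldest >= self.max_age_seconds:
            self.flush()

    def flush(self):
        if self.buffer:
            self.sink(list(self.buffer))
            self.buffer.clear()
            self.oldest = None
```

With only a size limit, a slow stream could leave records sitting in the buffer indefinitely. The periodic `tick` puts an upper bound on how long any record waits, at the cost of some smaller writes.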

However, examination of the Storm source code led us to conclude that both of these problems were solvable. We contributed fixes for both of these issues back to the Storm project as part of STORM-969. With those issues resolved we were comfortable that Storm would meet our initial requirements to get data into HDFS.

Making the data immediately usable

Now that raw data was landing in HDFS, we wanted to make it immediately consumable via external Hive tables. In general that requires knowing the schema of the data when you define the pipeline. You must also be aware of schema changes, especially incompatible schema changes, made by the upstream systems producing the data you’re consuming. This proved to be a challenge for several teams because they could not guarantee a source of truth or restrict schema changes. Additionally, many teams were trying to consume JSON formatted messages. JSON, like XML, is a popular message format, but not one particularly well-suited to Hadoop because it is not splittable. However, one team took an approach that solved almost all of these problems at once.

This team adopted Apache Avro as a message encoding scheme. Avro also defines schemas and the rules for evolving them. Together with a centralized schema registry, this creates a source of truth for the schema and can, if enabled, allow only forward and backward compatible schema changes. Avro is also robustly supported by Hadoop and Hive and allows true schema on read.
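
To illustrate what forward and backward compatibility mean for record schemas, here is a deliberately simplified check written in plain Python over Avro-style schema dictionaries. It covers only one rule from Avro schema resolution (a field the reader expects but the writer did not write must have a default) and ignores type promotion, aliases, unions, and nested records. The function names and the example schemas are hypothetical, and a real schema registry performs a much more thorough check.

```python
def can_read(reader_schema, writer_schema):
    """True if every field the reader expects was either written by the
    writer or has a default value in the reader's schema."""
    writer_fields = {field["name"] for field in writer_schema["fields"]}
    return all(
        field["name"] in writer_fields or "default" in field
        for field in reader_schema["fields"]
    )


def is_fully_compatible(old_schema, new_schema):
    """Backward: the new schema can read old data.
    Forward: the old schema can read new data."""
    return can_read(new_schema, old_schema) and can_read(old_schema, new_schema)


# Hypothetical example schemas.
v1 = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "long"},
    {"name": "item", "type": "string"},
]}

# Adds an optional field with a default: old and new readers both still work.
v2 = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "long"},
    {"name": "item", "type": "string"},
    {"name": "coupon", "type": ["null", "string"], "default": None},
]}

# Adds a field with no default: a v3 reader cannot read data written with v1.
v3 = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "long"},
    {"name": "item", "type": "string"},
    {"name": "region", "type": "string"},
]}
```

Under this simplified rule, `is_fully_compatible(v1, v2)` holds, while `is_fully_compatible(v1, v3)` does not, because a reader using `v3` has no value to supply for `region` when reading older data.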

By streaming binary Avro data directly into HDFS and creating a Hive table referencing the enterprise definition of the schema, we could make the data instantly usable and robust to schema changes. Our next obstacle was that Storm did not support writing Avro data to HDFS!

Storm and Avro

Our first attempt at adding an Avro output option to Storm required a tremendous amount of code duplication in order to keep the enhancements we had contributed in STORM-969. Because duplicated code is hard to maintain, we instead looked at how we could refactor the HDFS facilities in Storm and make them more extensible, which led to STORM-1073. This enhancement made it much easier to extend Storm to write new data formats to HDFS while keeping all of the improvements we made for resiliency and throughput. With these changes in place we opened STORM-1074 to add support for writing Avro data. The very minimal amount of code needed to add this feature was a validation of our refactoring approach.

Finally, we ran into a small issue with Hive’s handling of Avro data. Storm would sometimes leave behind zero-length files where Hive was expecting to see Avro data. A zero-length file is not valid Avro, and any Hive query on that directory would fail, even if other perfectly good Avro files were present. That led us to open and contribute a fix for HIVE-11977.
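
The failure condition is easy to picture: one empty file in a directory of otherwise valid files. The Python sketch below shows the general idea of skipping zero-length files when listing a directory's contents. It is only an illustration against a local filesystem, not the change made in HIVE-11977 and not HDFS code; `non_empty_files` is a hypothetical helper.

```python
import os


def non_empty_files(directory):
    """Return the paths of regular files in `directory` that contain data,
    skipping zero-length files that a reader could not parse."""
    return sorted(
        entry.path
        for entry in os.scandir(directory)
        if entry.is_file() and entry.stat().st_size > 0
    )
```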

A Commitment to Open Source

Submitting all of these changes back to Apache projects definitely added work for us; it would have been much simpler to keep them to ourselves. But Target is committed to Open Source Software and we understand the virtuous cycle behind our contributions. By making products like Storm better, others are more likely to adopt them and make contributions of their own that will directly benefit us.

What’s next? (Spoiler: A Lot!!)

While we can immediately work with the raw Avro data in Hive, it does not lead to fast queries and performance will degrade as our data sets grow. We need to partition the data and ultimately fit it into a columnar format like Apache Parquet or Apache ORC. We’ll also likely have to refresh these transformations as the schema evolves over time.

We also have designs on ingesting the data into Apache Solr and Apache Phoenix.

The lessons we’re learning through this process are going back to the data creation teams as well, so that they can create message schemas that facilitate these transformations and still keep the benefits we talked about above.

Finally we want to store metadata about these processes in a tool like Apache Atlas so that users can reason about dataflows on our Hadoop platform.

If you want to help us solve these problems and more, we are hiring! You can reach me at aaron.dossett@target.com.


Aaron Dossett is Senior Lead Data Engineer at Target and a contributor to several Apache Software Foundation projects.