Saturday, April 16, 2016

Near Real Time Analytics Store Updates with Spark Streaming on HBase Storage


AdTech customers analyze and redistribute their advertising budget to get the maximum return on investment. Sounds simple. But how do you do that for a customer with one million ads? Ten million ads? Within an acceptable time frame? With a distributed data ingestion and query system, all of a customer's adtech objects across social media can be served.

HBase is an extremely effective key-value store for handling insertions and exact row lookups; however, it is not an effective store for complex analytical reads such as joins and aggregations. The good news is that there are stores, such as Hive, optimized for exactly these use cases. Since the customer actions are written to HBase, one problem remains to be solved: how do we flow data from HBase to Hive? Answer: Spark Streaming.

Spark is quickly becoming a standard tool for classic Extract-Transform-Load (ETL) needs. While Spark itself is tailored to heavy batch loads from data dumps into a file system such as HDFS, Spark Streaming addresses the need to ingest data more frequently from streaming sources like Kafka or Flume. Spark provides a straightforward API for streaming data extraction, and SparkSQL with its user-defined functions is a powerful tool for data transformation. But the last step, load, is much murkier and usually requires deep knowledge of the target system. In the case of a Hive store on HDFS, we are presented with several pain points (a short extract-and-transform sketch follows the list):

1  HDFS stores each file in fixed-size blocks. It loves big files, ideally close to a multiple of the block size. Spark loves small files and would happily use as many executors and cores as possible to write files in parallel. Unfortunately, analytics-friendly columnar formats do not support parallel writing of a single HDFS file. Where is the sweet spot?

2  Columnar formats serialize data by column. Since updates arrive as rows, it is impossible to append such an update to a columnar file, and the store must be rewritten. How do we keep this store up to date when the data source is a stream? How do we prevent currently executing queries from failing during a rewrite?
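
To make the Extract and Transform steps concrete, here is a minimal sketch against the Spark 1.x APIs current at the time of writing. The broker address, topic name, message layout, and the extractCustomerId function are illustrative assumptions, not our production code:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object AdEventEtl {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ad-event-etl")
    val ssc = new StreamingContext(conf, Seconds(60))

    // Extract: pull raw ad events from Kafka (broker/topic are placeholders).
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("ad-events"))

    stream.foreachRDD { rdd =>
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._

      // Transform: parse raw messages into typed columns with a SparkSQL UDF.
      sqlContext.udf.register("extractCustomerId", (raw: String) => raw.split(',')(0))
      rdd.map(_._2).toDF("raw").registerTempTable("raw_events")
      val transformed = sqlContext.sql(
        "SELECT extractCustomerId(raw) AS customer_id, raw FROM raw_events")

      // Load: the partition-aware write is what the next sections address.
      transformed.write.mode("append").parquet("/warehouse/ad_events/staging")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}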


Optimizing Load Parallelism:
Naturally, it is advantageous to partition the analytical data in some way, so we can reduce the amount of data to work with per update. Spark does this via the HashPartitioner class, the default RDD partitioner implementation. Unfortunately, if a logical partition happens to be large, all of its data still lands in a single Spark partition and therefore a single file. If partition volume is non-uniform, such as a big customer versus a small customer, writing out the big customer's partition becomes a bottleneck.
If further partitioning of the data cannot provide a solution, the key is the ability to write our own partitioner. Spark also provides a very handy API for returning an approximate count of the records in an RDD, which we can leverage to estimate each partition's size. Knowing the approximate size of each partition, we can easily determine how many output partitions each key needs and the total across all keys. If we split every 100K rows, for instance, large partitions gain a substantial amount of load parallelism while each file keeps a reasonable size for HDFS. The split threshold should, of course, be tuned to the average size of a serialized row.
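
Here is a minimal sketch of such a size-aware partitioner, assuming records keyed by (customerId, rowKey) pairs and a 100K-row split threshold; countByKeyApprox is the approximate-count API mentioned above, while the class and method names are illustrative:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Routes each (customerId, rowKey) pair to one of the customer's splits.
class SplittingPartitioner(
    offsets: Map[String, (Int, Int)],  // customer -> (first partition index, split count)
    override val numPartitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val (customer, rowKey) = key.asInstanceOf[(String, String)]
    val (start, splits) = offsets(customer)
    start + math.abs(rowKey.hashCode % splits)
  }
}

object SplittingPartitioner {
  val RowsPerFile = 100000L

  def fromApproxCounts(rdd: RDD[((String, String), Array[Byte])],
                       timeoutMs: Long = 10000L): SplittingPartitioner = {
    // Approximate row count per customer; much cheaper than an exact count.
    val counts = rdd.map { case ((customer, _), _) => (customer, 1L) }
      .countByKeyApprox(timeoutMs, confidence = 0.95)
      .getFinalValue()

    // Give every customer ceil(count / RowsPerFile) consecutive partitions.
    var next = 0
    val offsets = counts.map { case (customer, bound) =>
      val splits = math.max(1, math.ceil(bound.mean / RowsPerFile).toInt)
      val entry = customer -> (next, splits)
      next += splits
      entry
    }.toMap
    new SplittingPartitioner(offsets, next)
  }
}

With this in hand, rdd.partitionBy(partitioner) followed by a per-partition write produces files of roughly uniform size regardless of customer skew.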

Frequent Updates of a Read-Only Store:
Streaming updates occur much more frequently and less predictably than scheduled batch loads. While appending to a Hive store is fairly straightforward, ETL jobs may need to update an existing store and deduplicate incoming data to maintain idempotence. Most existing solutions either append the data and apply a max-timestamp filter in the queries, or use a delete-and-copy approach to replace the currently used store with the updated one. Both have disadvantages: the former slows every incoming query, and the latter kills queries that are executing while the store is being replaced.

The Hive metastore allows partition locations to be hot-swapped while the system is running, and we can use this to our advantage. The problem is that Hive Query Language does not allow batch partition location changes within a single call, and making multiple calls can negatively impact SLA timing. So instead of using Spark's HiveContext to execute HQL, we update the Hive metastore through its Thrift API, which does support batch partition location updates; we can quickly make these changes when our streaming job has finished building the new data. Queries currently executing will complete their run on the old data, and new queries will use the new partition location.
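
A sketch of that batch repoint using the standard HiveMetaStoreClient wrapper around the Thrift API; the database and table names and the layout of the new location are assumptions for illustration:

import scala.collection.JavaConverters._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

object PartitionSwapper {
  def repoint(db: String, table: String, newRoot: String): Unit = {
    val client = new HiveMetaStoreClient(new HiveConf())
    try {
      // Fetch the table's current partitions.
      val parts = client.listPartitions(db, table, Short.MaxValue)

      // Point each partition at the freshly written dataset. Real code
      // would build proper key=value path segments from the partition keys.
      parts.asScala.foreach { p =>
        val suffix = p.getValues.asScala.mkString("/")
        p.getSd.setLocation(s"$newRoot/$suffix")
      }

      // One batch call instead of one ALTER TABLE statement per partition.
      client.alter_partitions(db, table, parts)
    } finally {
      client.close()
    }
  }
}

Because all the locations change in a single metastore call, the window in which queries could observe a mix of old and new partitions stays minimal.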

Conclusion:
Spark Streaming is the right choice for quickly pushing data into an analytics store. While out of the box it presents some challenges to overcome, its rapid development is easing the implementation with each version. The Extract and Transform steps are already very solid, and while the Load step is tricky, knowledge of the target system and commitment to playing to its strengths make Spark painless to use on top of a mature tech stack.

Credits: Filip Jaros