AdTech customers analyze and redistribute their advertising budget to get the maximum return on investment. Sounds simple. But how do you do that for a customer with one million ads? Ten million ads? Within an acceptable time frame? With a distributed data ingestion and query system, all the adtech objects on a social network can be served.
HBase is an extremely effective key-value store for handling insertions and exact row lookups; however, it is not an effective store for complex analytical reads such as joins and aggregate functions. The good news is that there are stores optimized for exactly these use cases, such as Hive. Since customer actions are written to HBase, one problem remains to be solved: how do we flow data from HBase to Hive? Answer: Spark Streaming.
Spark is quickly becoming a standard tool for classic Extract-Transform-Load (ETL) needs. While Spark itself is tailored to heavy batch loads from data dumps on a file system such as HDFS, Spark Streaming addresses the need to ingest data more frequently from streaming sources like Kafka or Flume. Spark provides a straightforward API for streaming data extraction, and SparkSQL with its user-defined functions is a powerful tool for data transformation. But the last step, load, is much murkier and usually requires deep knowledge of the target system. In the case of a Hive store on HDFS, we are presented with two pain points:
1. HDFS stores each file in fixed-size blocks. It loves big files that grow as close to a multiple of the block size as possible. Spark loves small files and wants to use as many executors and cores as possible to write files in parallel. Analytics-friendly columnar formats, unfortunately, do not support parallel writing of a single HDFS file. Where is the sweet spot?
2. A columnar format serializes data by column. Since updates arrive as rows, it is impossible to append such an update to a columnar file; the store must be rewritten. How do we keep the store up to date when the data source is a stream? How do we prevent currently executing queries from failing during a rewrite?
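Before digging into those pain points, here is a minimal sketch of the Extract and Transform steps using the classic Spark Streaming DStream API. The Kafka topic, the JSON event schema, and the staging path are all hypothetical placeholders; the load step it ends with is exactly the part the rest of this post is about.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object AdEventIngest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("adtech-etl")
    val ssc = new StreamingContext(conf, Seconds(60))

    // Extract: consume the change stream from a hypothetical Kafka topic.
    val events = KafkaUtils
      .createStream(ssc, "zk-host:2181", "adtech-etl", Map("ad-events" -> 4))
      .map(_._2)

    // Transform: parse each micro-batch and reshape it with SparkSQL + a UDF.
    events.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        sqlContext.udf.register("toUsd", (micros: Long) => micros / 1e6)
        sqlContext.read.json(rdd).registerTempTable("raw_events")
        val cleaned = sqlContext.sql(
          "SELECT customer_id, ad_id, toUsd(spend_micros) AS spend_usd " +
          "FROM raw_events")
        // Load: the tricky part, covered below; a naive append to a
        // staging directory stands in for it here.
        cleaned.write.mode("append").parquet("/warehouse/staging/ad_events")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}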
Optimizing Load Parallelism:
Naturally, it is advantageous to partition the analytical data in some way, so we can reduce the amount of data to work with per update. Spark does this via the HashPartitioner class, the default RDD partitioner implementation. Unfortunately, all the data for a given partition key ends up in a single Spark partition and therefore a single file. If partition volume is non-uniform, say a big customer vs. a small customer, writing out the big customer's partition becomes a bottleneck.
If further partitioning cannot provide a solution, the key is the ability to write our own partitioner. Spark also provides a very nice API feature for returning an approximate count of records in an RDD, which we can leverage to collect information about each partition's size. Knowing the approximate size of each partition, we can easily determine the total number of Spark partitions and the number needed for each key. If we split at every 100K rows, for instance, large partitions gain a substantial amount of load parallelism while each file still keeps a reasonable size for HDFS. The split size should, of course, be tuned to the average size of a serialized row.
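As a sketch of how these pieces could fit together (the 100K-row slice size, the customer-ID keys, and the salting scheme are assumptions for illustration, not our exact production code), countByKeyApprox supplies the per-key estimates, and a small custom Partitioner maps each customer onto a contiguous range of Spark partitions:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

object SlicePlan {
  val RowsPerSlice = 100000 // hypothetical target; tune to serialized row size

  // Use Spark's approximate counting to size each customer's slice range.
  // Returns (customer -> (first slice index, slice count), total slices).
  def plan(rows: RDD[(String, String)]): (Map[String, (Int, Int)], Int) = {
    val counts = rows.countByKeyApprox(timeout = 10000L).initialValue
    var next = 0
    val ranges = counts.map { case (customer, bound) =>
      val slices = math.max(1, math.ceil(bound.mean / RowsPerSlice).toInt)
      val r = customer -> (next, slices)
      next += slices
      r
    }.toMap
    (ranges, next)
  }
}

// Custom partitioner: a big customer spans many Spark partitions (parallel
// writes), a small one maps to exactly one (no tiny HDFS files).
class SlicedPartitioner(ranges: Map[String, (Int, Int)], total: Int)
    extends Partitioner {
  override def numPartitions: Int = total
  override def getPartition(key: Any): Int = {
    val (customer, salt) = key.asInstanceOf[(String, Int)]
    val (offset, slices) = ranges(customer)
    offset + (salt % slices)
  }
}

// Hypothetical usage: salt each key, then repartition before writing files.
// val (ranges, total) = SlicePlan.plan(rows)
// rows.map { case (c, row) => ((c, scala.util.Random.nextInt(1 << 20)), row) }
//     .partitionBy(new SlicedPartitioner(ranges, total))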
Frequent Updates of a Read-Only Store:
Streaming updates occur much more frequently and randomly than scheduled batch loads. While appending to a Hive store is fairly straightforward, ETL jobs may need to update an existing store and deduplicate incoming data to maintain idempotence. Many existing solutions either append the data and then apply a max-timestamp filter in the queries, or use a delete-and-copy method to replace the currently used store with the updated one. Both have disadvantages: they either slow down incoming queries or kill currently executing queries while the store is being replaced.
The Hive metastore allows partition locations to be hot-changed while the system is running, and we can use this to our advantage. The problem is that Hive Query Language (HQL) does not allow batch partition location changes within a single call, and making multiple calls can negatively impact SLA timing. So instead of using Spark's HiveContext to execute HQL, we leverage Thrift to update the Hive metastore via its API. Thrift supports batch partition location updates, and we can apply these changes quickly once our streaming job has finished building the new data. Currently executing queries complete their run on the old data, and new queries use the new partition locations.
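Here is a hedged sketch of what that batched swap could look like through the standard Thrift-backed HiveMetaStoreClient; the database, table, and the map of partition names (in Hive's key=value form) to freshly built directories are hypothetical.

import scala.collection.JavaConverters._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, Warehouse}

def swapPartitionLocations(db: String, table: String,
                           newLocations: Map[String, String]): Unit = {
  val client = new HiveMetaStoreClient(new HiveConf())
  try {
    val partKeys = client.getTable(db, table).getPartitionKeys
    // Fetch current metadata for the partitions we are about to move.
    val parts = client
      .getPartitionsByNames(db, table, newLocations.keys.toList.asJava)
      .asScala
    parts.foreach { p =>
      val name = Warehouse.makePartName(partKeys, p.getValues)
      p.getSd.setLocation(newLocations(name))
    }
    // One batched metastore call; in-flight queries finish against the old
    // files, new queries resolve to the new locations.
    client.alter_partitions(db, table, parts.asJava)
  } finally {
    client.close()
  }
}

// Hypothetical usage after a streaming batch finishes building new data:
// swapPartitionLocations("analytics", "ad_events",
//   Map("customer=acme" -> "/warehouse/ad_events/customer=acme/v42"))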
Conclusion:
Spark Streaming is the right choice for quickly pushing data into an analytics store. While out of the box it presents some challenges to overcome, its rapid development eases the implementation with each version. The Extract and Transform steps are already very solid, and while the Load step is tricky, knowledge of the target system and a commitment to using its advantages make it painless to run Spark on top of a mature tech stack.
Credits: Filip Jaros