Friday, May 20, 2016

AdTech Storage on Apache HBase and Apache Phoenix


In this blog, let's talk about our current data platform, which uses Apache Phoenix over Apache HBase as persistent storage for adtech objects.


Why HBase and Phoenix?

Apache HBase is well known for its extremely good horizontal scalability and strong consistency for atomic row-level updates. In addition, HBase provides very fast (a few ms to hundreds of ms) query performance over very large data sets (e.g. more than 10 billion rows in a table) with proper row key design.

Even though HBase provides wonderful features, it still introduces considerable challenges and overhead for our developers when used as raw data storage. The low-level, verbose HBase API and byte-array-oriented schema design are difficult to master and use efficiently. With these challenges in mind, we evaluated a few open source solutions and chose Apache Phoenix.


Apache Phoenix strikes a good balance between supporting complex use cases and providing consistent horizontal scalability for our data sets. It provides a JDBC abstraction layer on top of HBase. In addition, it offers richer schema support, secondary indexes (both global and local), a SQL-compliant query language, atomic auto-increment sequences, and many other features we are still exploring.
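To make the JDBC abstraction concrete, here is a minimal sketch of connecting to Phoenix, creating a salted table, and running an upsert and a range-scan query. The ZooKeeper host, table name, and columns are illustrative assumptions, not our actual schema.

```scala
import java.sql.DriverManager

object PhoenixJdbcExample {
  def main(args: Array[String]): Unit = {
    // The JDBC URL points at the HBase ZooKeeper quorum; the host name is a placeholder.
    val conn = DriverManager.getConnection("jdbc:phoenix:zk-host:2181:/hbase")
    try {
      val stmt = conn.createStatement()
      // Phoenix exposes HBase tables through standard SQL DDL; SALT_BUCKETS is a table option.
      stmt.execute(
        """CREATE TABLE IF NOT EXISTS AD_GROUP (
          |  CUSTOMER_ID BIGINT NOT NULL,
          |  AD_GROUP_ID BIGINT NOT NULL,
          |  NAME        VARCHAR,
          |  STATUS      VARCHAR,
          |  CONSTRAINT PK PRIMARY KEY (CUSTOMER_ID, AD_GROUP_ID)
          |) SALT_BUCKETS = 16""".stripMargin)

      // Writes go through UPSERT; Phoenix buffers them until the connection commits.
      val upsert = conn.prepareStatement(
        "UPSERT INTO AD_GROUP (CUSTOMER_ID, AD_GROUP_ID, NAME, STATUS) VALUES (?, ?, ?, ?)")
      upsert.setLong(1, 42L)
      upsert.setLong(2, 1001L)
      upsert.setString(3, "spring-sale")
      upsert.setString(4, "ACTIVE")
      upsert.executeUpdate()
      conn.commit()

      // Filtering on the leading primary-key column becomes an HBase range scan.
      val rs = stmt.executeQuery("SELECT AD_GROUP_ID, NAME FROM AD_GROUP WHERE CUSTOMER_ID = 42")
      while (rs.next()) println(s"${rs.getLong(1)} -> ${rs.getString(2)}")
    } finally conn.close()
  }
}
```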


With Apache Phoenix and HBase chosen as the storage layer, we can provide a managed REST API layer, Data Services, to support a variety of API clients, such as Python, PHP, Angular, and JavaScript/Node.js. In addition to providing a REST layer over Phoenix, Data Services also streams changes to Apache Kafka topics (for audit and streaming analytics insights, etc.), provides an authentication and authorization system, and offers metadata discovery and caching facilities.

System Data Flow
[Figure: marin-oltp.png — high-level system data flow]
In this blog, we discuss only the persistent storage system.
The persistent storage is layered on top of the open source Apache Phoenix/HBase stack. The REST service API layer exposes the underlying high-performance storage to various services and the UI. For complex online analytics queries, we built a custom Presto-OLTP connector that streams the underlying Apache Phoenix data into Presto.


Presto is a very high-performing interactive query engine that Facebook open sourced in 2013. For more details, please refer to the Presto documentation.


To address some of the performance challenges we had with Apache Phoenix for complex queries, we built the Presto-Phoenix connector to leverage the Presto query engine.

Conclusion
Lessons and practices we learned while migrating our data platform to Apache Phoenix over HBase for storage:
  • Denormalize your Apache Phoenix data schema to avoid:
    • Joins and foreign-key links between tables on the Phoenix side
    • Group By and other aggregate calculations (e.g. Sum, Avg) on the Phoenix side; instead perform these queries from an analytics engine (e.g. Presto)
    • Subqueries
  • Design the schema around the primary keys, as Phoenix/HBase prefers range scans and skip scans over the underlying KV stores for performance
  • Design the schema with the immutability of the PK in mind. Once a data row key is generated and inserted into HBase, the row key should always be treated as immutable
  • Choose a reasonable number of salt buckets based on table cardinality. For high-cardinality tables, use a salt bucket count appropriate for the underlying HBase cluster size (e.g. 20%~100% of the HBase RegionServer count). However, avoid too many salt buckets for small tables (e.g. tables with fewer than a couple of million records)
  • Avoid foreign key references in the primary table. Instead, embed related objects (denormalized) into the primary table
  • Avoid filters on non-PK columns. Instead, query solely on the leading PK column values
  • Don't build too many secondary Phoenix indexes over your data table. This will slow down your update throughput, which is critical for writes
  • If possible, use external indexing systems (e.g. SolrCloud, Elasticsearch) in conjunction with Phoenix/HBase for indexing non-PK columns in a table
  • Use Spark or other asynchronous job flows to periodically update these external index stores outside the HBase coprocessor, keeping the HBase process unblocked for low-latency writes and reads
  • Do large joins outside the Phoenix context (e.g. use Spark or Presto for joins)
  • Do aggregates cautiously, and do complex aggregates within an analytics service framework outside of Phoenix (e.g. Spark, Presto)
  • Write through the Phoenix-Spark connector if you are using Spark to write to Phoenix
  • Write in batches of a few thousand rows (e.g. < 20,000 rows per batch) if you need to go through JDBC to write to Phoenix
  • If your batch writes are insertions instead of updates, cache and batch sequence ID generation (see the sketch after this list)
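
A minimal sketch of the batched-write and sequence-caching advice, reusing the hypothetical AD_GROUP table from the earlier sketch; the batch size, sequence name, and sample data are illustrative.

```scala
import java.sql.DriverManager

object PhoenixBatchWriter {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:phoenix:zk-host:2181:/hbase")
    try {
      // CACHE hands out sequence IDs to each client in blocks of 1000,
      // avoiding a round trip to SYSTEM.SEQUENCE on every insert.
      conn.createStatement().execute("CREATE SEQUENCE IF NOT EXISTS AD_GROUP_SEQ CACHE 1000")

      val upsert = conn.prepareStatement(
        "UPSERT INTO AD_GROUP (CUSTOMER_ID, AD_GROUP_ID, NAME, STATUS) " +
          "VALUES (?, NEXT VALUE FOR AD_GROUP_SEQ, ?, ?)")

      val names = Seq("spring-sale", "summer-sale", "fall-sale") // stand-in for a real input feed
      val batchSize = 5000 // keep each commit well below ~20,000 rows
      var pending = 0
      for (name <- names) {
        upsert.setLong(1, 42L)
        upsert.setString(2, name)
        upsert.setString(3, "ACTIVE")
        upsert.executeUpdate()
        pending += 1
        if (pending == batchSize) { conn.commit(); pending = 0 } // flush a full batch
      }
      if (pending > 0) conn.commit() // flush the remainder
    } finally conn.close()
  }
}
```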



Saturday, April 16, 2016

Near Real Time Analytics Store Updates with Spark Streaming on HBase Storage


AdTech customers analyze and redistribute their advertising budget to get the maximum return on investment. Sounds simple. But how do you do that for a customer with one million ads? Ten million ads? Within an acceptable time frame? With a distributed data ingestion and query system, all the adtech objects across social media can be serviced.

HBase is an extremely effective key-value store for handling insertions and exact row lookups; however, it is not an effective store for complex analytical reads such as data joins and aggregate functions. The good news is that there are optimized stores, such as Hive, that are tailored toward these use cases. Since the customer actions are written to HBase, one problem remains to be solved: how do we flow data from HBase to Hive? Answer: Spark Streaming.

Spark is quickly becoming a standard tool for classic Extract-Transform-Load needs. While Spark itself is tailored to support heavy batch loads from data dumps on a file system such as HDFS, Spark Streaming addresses the need to ingest data more frequently from ad hoc data sources like Kafka or Flume. Spark provides a straightforward API for streaming data extraction. SparkSQL and its user-defined functions are a powerful tool for data transformation. But the last step, load, is much murkier and usually requires very deep knowledge of the target system. In the case of a Hive store on HDFS, we are presented with several pain points:
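As a rough illustration of the Extract and Transform steps, the sketch below reads change records from a Kafka topic with Spark Streaming and appends them to a partitioned Parquet store. The topic name, record layout, broker address, and HDFS path are assumptions for illustration, not our production pipeline.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object HBaseChangeStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("hbase-to-hive-etl")
    val ssc = new StreamingContext(conf, Seconds(60)) // one micro-batch per minute

    // Kafka topic carrying HBase change events; broker address and topic are placeholders.
    val kafkaParams = Map("metadata.broker.list" -> "kafka-broker:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("hbase-changes"))

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Extract: each message value is assumed to be a CSV line "customerId,adId,spend".
        val rows = rdd.map(_._2.split(","))
          .map(f => (f(0).toLong, f(1).toLong, f(2).toDouble))

        // Transform/Load: convert to a DataFrame and append Parquet files per customer.
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        rows.toDF("customer_id", "ad_id", "spend")
          .write.mode("append")
          .partitionBy("customer_id")
          .parquet("hdfs:///warehouse/ad_spend") // path is illustrative
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```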

1. HDFS stores each file in fixed-size blocks. It loves big files that grow as close to a multiple of the block size as possible. Spark loves small files and wants to use as many executors and cores as possible to write out files in parallel. Analytics-friendly columnar formats unfortunately do not support parallel writing of a single HDFS file. Where is the sweet spot?

2. Columnar formats serialize data in columns. Since updates come as rows, it is impossible to append such an update to a columnar file, and the store must be rewritten. How do we keep this store up to date when the data source is a stream? How do we prevent failures of executing queries during a rewrite?


Optimizing Load Parallelism:
Naturally, it is advantageous to partition the analytical data in some way. This way we can reduce the amount of data to work with per update. Spark does this via the HashPartitioner class, which is the default RDD partitioner implementation. Unfortunately, if a logical partition happens to be large, all of its data will be put into a single Spark partition and therefore a single file. If partition volumes are non-uniform, such as a big customer vs. a small customer, writing out a big customer's partition becomes a bottleneck.
If further partitioning cannot provide a solution, the key is the ability to write our own partitioner. Spark also provides a very nice API for returning an approximate count of records in an RDD, which we can leverage to estimate each partition's size. Knowing the approximate size of each partition, we can easily determine the total number of Spark partitions and the number needed for each logical partition. If we split every 100K rows, for instance, large partitions gain a substantial amount of load parallelism while each file still keeps a reasonable size for HDFS. The split size should, of course, account for the average size of a serialized row. A minimal sketch of such a partitioner is shown below.
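
The sketch assumes the RDD is keyed by (customerId, rowId) pairs and that the per-key counts come from an approximate count such as countByKeyApprox; both the key layout and the count source are illustrative.

```scala
import org.apache.spark.Partitioner

// Splits each logical key (e.g. a customer ID) into ceil(count / rowsPerSplit) Spark
// partitions, so one large customer no longer collapses into a single task and a
// single output file.
class SplittingPartitioner(counts: Map[Long, Long], rowsPerSplit: Long) extends Partitioner {

  // Pre-compute, for every customer, the first Spark partition index and the number
  // of splits that customer gets.
  private val splits: Map[Long, (Int, Int)] = {
    var offset = 0
    counts.toSeq.sortBy(_._1).map { case (customer, count) =>
      val n = math.max(1, math.ceil(count.toDouble / rowsPerSplit).toInt)
      val entry = customer -> (offset, n)
      offset += n
      entry
    }.toMap
  }

  override val numPartitions: Int =
    splits.values.map { case (start, n) => start + n }.foldLeft(1)(math.max)

  // Keys are assumed to be (customerId, rowId) pairs with non-negative row ids.
  override def getPartition(key: Any): Int = key match {
    case (customer: Long, rowId: Long) =>
      val (start, n) = splits.getOrElse(customer, (0, 1))
      start + (rowId % n).toInt // spread one customer's rows across its n splits
    case _ => 0
  }
}
```

An RDD keyed this way could then be repartitioned with rdd.partitionBy(new SplittingPartitioner(counts, 100000)) before writing one file per Spark partition.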

Frequent Updates of a Read-Only Store:
Streaming updates occur much more frequently and randomly than scheduled batch loads. While appending to a Hive store is somewhat straightforward, ETL jobs may need to update an existing store and deduplicate incoming data to maintain idempotence. A lot of existing solutions either append data and then use a max-timestamp filter in queries, or do a delete-and-copy to replace the currently used store with the updated one. Both have disadvantages: they either slow down incoming queries or kill currently executing queries while the store is being replaced.

The Hive metastore allows hot-changing a partition's location while the system is running, and we can use this to our advantage. The problem is that Hive Query Language does not allow batch partition location changes within a single call, and making multiple calls can negatively impact SLA timing. Instead of using Spark's HiveContext to execute HQL, we leverage Thrift to update the Hive metastore via its API. Thrift supports batch partition location updates, so we can quickly make these changes when our streaming job has finished building the new data. Queries currently executing will complete their run on the old data, and new queries will use the new partition location.
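
A sketch of this batch location swap through the metastore client; the database name, table name, and new location layout are placeholders, not our actual warehouse layout.

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

object SwapPartitionLocations {
  def main(args: Array[String]): Unit = {
    // HiveConf picks up hive-site.xml from the classpath.
    val client = new HiveMetaStoreClient(new HiveConf())
    try {
      val parts = client.listPartitions("analytics", "ad_spend", Short.MaxValue).asScala

      // Point every partition at the directory the streaming job just finished writing.
      parts.foreach { p =>
        val values = p.getValues.asScala.mkString("/")
        p.getSd.setLocation(s"hdfs:///warehouse/ad_spend_v2/$values")
      }

      // One Thrift call updates all partitions; running queries keep reading the old files.
      client.alter_partitions("analytics", "ad_spend", parts.asJava)
    } finally client.close()
  }
}
```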

Conclusion:
Spark Streaming is the right choice for quickly pushing data into an analytics store. While out of the box it presents some challenges to overcome, its rapid development is easing the implementation with each version. The Extract and Transform steps are already very solid, and while the Load step is tricky, knowledge of the target system and a commitment to using its advantages make it painless to use Spark on top of a mature tech stack.

Credits: Filip Jaros