Friday, May 20, 2016

AdTech Storage on Apache HBase and Apache Phoenix


In this blog, let's talk about our current data platform, which uses Apache Phoenix over Apache HBase as persistent storage for adtech objects.


Why HBase and Phoenix?

Apache HBase is well known for its horizontal scalability and strong consistency for atomic row-level updates. In addition, HBase provides very fast query performance (from a few milliseconds to hundreds of milliseconds) over very large data sets (e.g., more than 10 billion rows in a table) given a proper row key design.

Even though HBase provides wonderful features, it still introduces considerable challenges and overhead for our developers when used as raw data storage. The low-level, verbose HBase API and byte-array-oriented schema design are difficult to master and use efficiently. With these challenges in mind, we evaluated a few open source solutions and chose Apache Phoenix.


Apache Phoenix strikes a good balance between supporting complex use cases and providing consistent horizontal scaling for unified data sets. It provides a JDBC abstraction layer on top of HBase. In addition, it offers richer schema support, secondary indexes (both global and local), a SQL-compliant query language, atomic auto-increment sequences, and many other features we are still exploring.
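To illustrate the JDBC layer, here is a minimal sketch of querying Phoenix from Scala. The table, columns, and ZooKeeper quorum below are illustrative placeholders, not our production schema:

    import java.sql.DriverManager

    object PhoenixQueryExample {
      def main(args: Array[String]): Unit = {
        // Phoenix exposes HBase behind a standard JDBC URL that points at
        // the ZooKeeper quorum; "zk-host:2181" is a placeholder.
        val conn = DriverManager.getConnection("jdbc:phoenix:zk-host:2181")
        try {
          val rs = conn.createStatement().executeQuery(
            "SELECT ad_id, campaign_id, status FROM ADS WHERE customer_id = 42 LIMIT 10")
          while (rs.next())
            println(s"${rs.getLong("ad_id")} ${rs.getLong("campaign_id")} ${rs.getString("status")}")
        } finally conn.close()
      }
    }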


With Apache Phoenix and HBase chosen as the storage layer, we can provide a managed REST API layer, Data Services, to support a variety of API clients such as Python, PHP, Angular, and JavaScript/Node.js. In addition to providing a REST layer over Phoenix, Data Services also streams changes to Apache Kafka topics (for audit, streaming analytics insights, etc.) and provides an authentication and authorization system, along with metadata discovery and caching facilities.

System Data Flow
[Figure: marin-oltp.png, a diagram illustrating the high-level system data flow]
In this blog, we discuss only the persistent storage system.
The persistent storage is layered on top of the open source Apache Phoenix/HBase stack. The REST service API layer exposes the underlying high-performance storage to various services and the UI. For complex online analytics queries, we built a custom Presto-OLTP connector that streams the underlying Apache Phoenix data into Presto.


Presto is a very high-performing interactive query engine that Facebook open sourced in 2013. For more details, please refer to the Presto project documentation.


To address some of the performance challenges we had with Apache Phoenix for complex queries, we built the Presto-Phoenix connector to leverage the Presto query engine.

Conclusion
Here are lessons and practices learned while migrating our data platform to Apache Phoenix over HBase for storage:
  • Denormalize your Apache Phoenix data schema to avoid:
    • Joins and foreign-key links between tables on the Phoenix side
    • GROUP BY and other aggregate calculations (e.g., SUM, AVG) on the Phoenix side; perform these queries from an analytics engine (e.g., Presto) instead
    • Subqueries
  • Design the schema around the primary key, since Phoenix/HBase relies on range scans and skip scans over the underlying KV store for performance
  • Design the schema with the immutability of the PK in mind. Once a row key is generated and inserted into HBase, it should always be treated as immutable
  • Choose a reasonable number of salt buckets based on table cardinality. For high-cardinality tables, use a salt bucket count appropriate for the underlying HBase cluster size (e.g., 20% to 100% of the region server count). However, avoid over-salting small tables (e.g., tables with fewer than a couple million records)
  • Avoid foreign key references in the primary table. Instead, embed related objects (denormalized) into the primary table
  • Avoid filters on non-PK columns. Instead, issue queries based solely on the leading PK column values
  • Don't build too many secondary Phoenix indexes over your data table; every extra index slows down update throughput, which is critical for writes
  • If possible, use external indexing systems (e.g., SolrCloud, Elasticsearch) in conjunction with Phoenix/HBase for indexing non-PK columns in a table.
  • Use Spark or other asynchronous job flows to periodically update these external index stores outside the HBase coprocessor path, keeping HBase unblocked for low-latency writes and reads.
  • Do large joins outside the Phoenix context (e.g., use Spark or Presto for joins)
  • Do aggregates cautiously, and run complex aggregates within an analytics framework outside of Phoenix (e.g., Spark or Presto)
  • If you are using Spark to write to Phoenix, do your writes through the Phoenix-Spark connector
  • If you need to write to Phoenix through JDBC, write in batches of a few thousand rows (e.g., fewer than 20,000 rows per batch)
  • If your batch writes are insertions rather than updates, try to cache and batch sequence ID generation (the sketch after this list illustrates this, along with salting and batched upserts).
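To make the salting, batching, and sequence-caching points concrete, here is a rough sketch. The table, salt bucket count, sequence cache size, and batch size are all illustrative assumptions; tune them to your own cluster:

    import java.sql.DriverManager

    object PhoenixWriteExample {
      def main(args: Array[String]): Unit = {
        val conn = DriverManager.getConnection("jdbc:phoenix:zk-host:2181")
        val stmt = conn.createStatement()
        // Denormalized, salted table: related objects are embedded as
        // columns instead of joined via FKs; SALT_BUCKETS is sized to the
        // region server count (16 is a placeholder).
        stmt.execute(
          """CREATE TABLE IF NOT EXISTS ADS (
            |  customer_id   BIGINT NOT NULL,
            |  ad_id         BIGINT NOT NULL,
            |  campaign_name VARCHAR,  -- embedded, not a FK to a CAMPAIGN table
            |  status        VARCHAR,
            |  CONSTRAINT pk PRIMARY KEY (customer_id, ad_id)
            |) SALT_BUCKETS = 16""".stripMargin)
        // Cached sequence: each client reserves 1,000 IDs at a time so
        // insert-heavy batches avoid a round trip per row.
        stmt.execute("CREATE SEQUENCE IF NOT EXISTS ad_id_seq CACHE 1000")

        conn.setAutoCommit(false) // Phoenix buffers upserts until commit
        val ps = conn.prepareStatement(
          "UPSERT INTO ADS VALUES (?, NEXT VALUE FOR ad_id_seq, ?, ?)")
        for (i <- 1 to 10000) {
          ps.setLong(1, i % 50) // fake customer ids
          ps.setString(2, s"campaign-$i")
          ps.setString(3, "ACTIVE")
          ps.executeUpdate()
          if (i % 5000 == 0) conn.commit() // batches well under ~20,000 rows
        }
        conn.commit()
        conn.close()
      }
    }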



Saturday, April 16, 2016

Near Real Time Analytics Store Updates with Spark Streaming on HBase Storage


AdTech customers analyze and redistribute their advertising budget to get the maximum return on investment. Sounds simple. But how do you do that for a customer with one million ads? Ten million ads? Within an acceptable time frame? With a distributed data ingestion and query system, all the adtech objects across social media can be served.

HBase is an extremely effective key-value store for handling insertions and exact row lookups; however, it is not an effective store for complex, analytical reads such as data joins and aggregate functions. The good news is that optimized stores such as Hive exist, tailored toward exactly these use cases. Since the customer actions are written to HBase, one problem remains to be solved: how do we flow data from HBase to Hive? Answer: Spark streaming.

Spark is quickly becoming a standard tool for classic Extract-Transform-Load needs. While Spark itself is tailored to support heavy batch loads from data dumps to a file system such as HDFS, Spark streaming addresses the need to ingest data more frequently from data sources like Kafka or Flume. Spark provides a straightforward API for streaming data extraction. SparkSQL and its user-defined functions are a powerful tool for data transformation. But the last step, load, is much murkier and usually requires very deep knowledge of the target system. In the case of a Hive store on HDFS, we are presented with several pain points:

1. HDFS stores each file in fixed-size blocks. It loves big files that grow as close to a multiple of the block size as possible. Spark loves small files and would love to use as many executors and cores as possible to write out files in parallel. Analytics-friendly columnar formats unfortunately do not support parallel writing of a single HDFS file. Where is the sweet spot?

2. Columnar formats serialize data in columns. Since updates come as rows, it is impossible to append such an update to a columnar file, so the store must be rewritten. How do we keep the store up to date when the data source is a stream? How do we prevent executing queries from failing during a rewrite?
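Before digging into those pain points, it is worth noting that the extract step is the easy part. A minimal sketch of pulling the HBase change records from a Kafka topic with Spark streaming might look like the following, assuming the 0.8-era spark-streaming-kafka direct stream; the broker, topic, and batch interval are illustrative:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object HBaseChangeStream {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("hbase-to-hive"), Seconds(60))
        // Direct stream: each Kafka partition maps to one Spark partition.
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("hbase-changes"))
        stream.foreachRDD { rdd =>
          // Transform with SparkSQL here, then run the load step.
          println(s"batch contains ~${rdd.count()} change records")
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }

Everything below concerns what happens inside that foreachRDD.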


Optimizing Load Parallelism:
Naturally, it is advantageous to partition the analytical data in some way so that we can reduce the amount of data to work with per update. Spark does this via the HashPartitioner class, the default RDD partitioner implementation. Unfortunately, if a partition happens to be large, all of its data will be put into a single Spark partition and therefore a single file. If partition volume is non-uniform, as with a big customer versus a small customer, writing out a big customer's partition becomes a bottleneck.
If further partitioning cannot provide a solution, the key is the ability to write our own partitioner. Spark also provides a very nice API for returning an approximate count of records in an RDD, which we can leverage to collect information about partition sizes. Knowing the approximate size of each partition, we can easily determine the total number of Spark partitions and the number needed for each logical partition. If we split every 100K rows, for instance, large partitions gain a substantial amount of load parallelism while the resulting files still keep a reasonable size for HDFS. The split size should of course be tuned to the average size of a serialized row.
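A sketch of such a partitioner follows. It assumes rows are keyed as (logicalKey, rowIndex) pairs and that approximate per-key counts were gathered up front (e.g., via countByKeyApprox on the pair RDD); the 100K split size and all names are illustrative:

    import org.apache.spark.Partitioner

    // Splits each logical partition (e.g., a customer) into
    // ceil(count / rowsPerSplit) Spark partitions so one big customer no
    // longer lands in a single output file.
    class SplittingPartitioner(counts: Seq[(String, Long)], rowsPerSplit: Long = 100000L)
        extends Partitioner {

      // Number of splits per logical key, at least one each.
      private val splits: Seq[(String, Int)] = counts.map { case (k, c) =>
        k -> math.max(1, math.ceil(c.toDouble / rowsPerSplit).toInt)
      }
      private val splitCount: Map[String, Int] = splits.toMap

      // First Spark-partition index assigned to each logical key.
      private val offsets: Map[String, Int] =
        splits.scanLeft(0)(_ + _._2).zip(splits)
          .map { case (offset, (k, _)) => k -> offset }.toMap

      override val numPartitions: Int = splits.map(_._2).sum

      // Keys must be (logicalKey, rowIndex) pairs; the row index fans a
      // large logical partition out across its splits.
      override def getPartition(key: Any): Int = key match {
        case (k: String, rowIndex: Long) =>
          // Clamp because the pre-computed counts are only approximate.
          offsets(k) + math.min((rowIndex / rowsPerSplit).toInt, splitCount(k) - 1)
        case _ => 0
      }
    }

An RDD keyed this way can then be routed with rdd.partitionBy(new SplittingPartitioner(approxCounts)) before writing, so each split becomes its own reasonably sized file.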

Frequent Updates of a Read-Only Store:
Streaming updates occur much more frequently and randomly than scheduled batch loads. While appending to a Hive store is somewhat straightforward, ETL jobs may need to update an existing store and deduplicate incoming data to maintain idempotence. A lot of existing solutions float around either appending data and then using a max-timestamp filter in the queries, or doing a delete-and-copy to replace the currently used store with the updated one. Both have disadvantages: they either slow down incoming queries or kill currently executing queries while the store is being replaced.

The Hive metastore allows hot-changing a partition's location while the system is running, and we can use this to our advantage. The problem is that Hive Query Language does not allow batch partition-location changes within a single call, and making multiple calls can negatively impact SLA timing. So instead of using Spark's HiveContext to execute HQL, we leverage Thrift to update the Hive metastore via its API. Thrift supports batch partition-location updates, so we can quickly make these changes once our streaming job has finished building the new data. Queries currently executing will complete their run on the old data, and new queries will use the new partition location.
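Here is a sketch of that swap going through HiveMetaStoreClient, which talks Thrift to the metastore; the database name, table name, and partition values are illustrative:

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
    import scala.collection.JavaConverters._

    object PartitionSwap {
      // newLocations maps a partition's values (e.g., Seq("42") for a
      // table partitioned by customer_id) to its freshly written HDFS path.
      def swapPartitionLocations(newLocations: Map[Seq[String], String]): Unit = {
        val client = new HiveMetaStoreClient(new HiveConf())
        try {
          val updated = newLocations.map { case (partitionValues, newPath) =>
            val p = client.getPartition("analytics_db", "customer_facts", partitionValues.asJava)
            p.getSd.setLocation(newPath) // point the partition at the new data
            p
          }.toList
          // One Thrift call updates every partition location in the batch.
          client.alter_partitions("analytics_db", "customer_facts", updated.asJava)
        } finally client.close()
      }
    }

Queries already running keep reading the old files; the old directories can be cleaned up once in-flight queries drain.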

Conclusion:
Spark streaming is the right choice for quickly pushing data into an analytics store. While out of the box it presents some challenges to overcome, its rapid development is easing the implementation with each version. The Extract and Transform steps are already very solid, and while the Load part is tricky, knowledge of the target system and a commitment to playing to its strengths make it painless to use Spark on top of a mature tech stack.

Credits: Filip Jaros



Friday, February 21, 2014

Kili Summit Day - Barafu Hut to Stella Point and Uhuru Peak (Feb 14, 2014)

Jambo,

We roused out of bed at 10:30 pm and it was dark and freezing cold. I put on every layer of clothing I had (6 layers) and my heavy gloves with liners. We drank some hot tea and John, our guide, put us in the order he wanted us in for going to the top: Rahm, Palani, then me, followed by Narayanan (between Narayanan and me, he didn't care about the order). In addition to Rahm, Palani, Narayanan, and me, we had John and Husseini (the assistant guide).

Our group was fully assembled just before midnight, and at 11:55 pm, with our headlamps on, we began the ascent to the top of Kilimanjaro. It is hard to put into words what it was like to begin this final ascent in the dark and freezing cold, other than to say it was surreal. We had been walking for five days, pushing ourselves to the limit to reach the summit, and now we were actually doing it.

As we climbed up the mountain over the prior several days, it was always in the back of my mind: will I make it? Can I really do this? Do I really want to do anything that is this friggin hard? Couple those doubts and negative feelings with the almost giddy excitement of having made it this far and actually being within striking distance of the top, and you get some idea of the myriad of emotions the moment held for me. Hopefully you get the point that there is a lot of stuff going through your head as you line up and wait to start the final ascent of mythical, magical, and majestic Mount Kilimanjaro.

It was cold. I do not mean the kind of cold that requires putting on a heavy coat and mittens. No, this was the kind of cold that required every piece of clothing you had brought with you to have a chance of fighting it off. It was a cold that cut through you like a knife, with a wind of at least 20 mph that pushed it into the marrow. So cold was one of the enemies that had to be defeated to conquer the mountain.

Another enemy to be defeated was altitude. Because of the low atmospheric pressure, we were only able to take in about 50 percent of the oxygen available at sea level. While the drugs we were all taking helped (except Palani), it was still hard to breathe.

Yet another enemy was fatigue. We had been climbing for five days and had only slept a few hours since making it to camp. We were all tired and had sore legs.

To combat these enemies John mapped out a strategy of keeping us moving. He modified the "pole pole" strategy of the past five days just a little and moved us more quickly than I anticipated.

We walked single file in silence up the slope, primarily on a path of crushed quartzite that glittered when struck by the lights of our headlamps. The steep path of quartzite was punctuated by stretches of scrambling over large boulders that took even more energy to climb over than the steep path.

Although there was a half moon, it was low on the horizon and it was pretty dark. You could see the outline of the path ahead by the headlamps of groups of climbers who had left even earlier than we had. The scene was ghostlike but beautiful, the lights of those farther up the trail extending for what seemed like miles; miles that we were going to have to climb.

In the night sky, the planet Mars was plainly visible and as red and striking as I have ever seen it. It shone like a beacon in front of us, beckoning us toward the summit. The Milky Way, plainly visible above us, looked like a picture from the Hubble Telescope.

Behind us and to our left we began to see flashes of lightning as a storm apparently raged over the lower slopes of the mountain. If it reached us before we reached the summit, we would have to contend not only with the cold and wind, but with snow. We pushed on through the night.

The very steep path cut back and forth across the face of the mountain in a series of switchbacks as we climbed steadily toward the peak that was not yet visible in the dark, but which drew each hiker to it.

At this point all I could do was put one foot in front of the other, counting out my steps and breathing as best I could. I stayed immediately behind Palani, sometimes just a few inches off his left shoulder, using him as a partial shield against the biting headwind. My heavy gloves proved to be lightweights, and my fingers were freezing inside them. From the substantial litter of discarded hand warmers along the trail, I realized there was one more item I should have put on inside my gloves.

Our pace was faster than that of those who had left before us, and we came upon a group of six or seven hikers who were going very slowly. I thought that we would stay behind them and continue at their speed, but that is not what John had in mind. He cut a path off to the side of the trail to overtake them. This meant walking even faster than before for several hundred feet on a "virgin" surface of quartzite that gave way with each step, making walking twice as hard. By the time we overtook them, I was literally gasping for breath. Although I had hoped that after passing them we would slow the pace, or take a rest, this was not part of John's plan of attack... we kept on going.

After walking continuously for nearly two hours, we did stop to take a rest and have some hot tea. While the tea tasted good, and helped warm and hydrate us, stopping for even a little while allowed the cold to gain the upper hand in our struggle to make it to the top. As my hands went numb and the exposed portion of my face burned raw, I understood what John was doing; we had to keep moving or the cold would likely win the battle.

It turned out to be our last stop. After that we walked on and on, going up the slope at the newly defined pace of "pole, pole" and a little more. It was as if I were in a self-induced trance. I tried to make each step the same as the last, to put one foot in front of the other, to breathe with a steady rhythm of three deep breaths in followed by a long exhale to remove the carbon dioxide, and to keep up with Rahm and Palani. We did this for hours. It was strange that at times it seemed I could not take another step, and at others it was not so bad and I knew I could do this. The key was just to keep your feet moving, one in front of the other.

About an hour after our tea break we came upon a very large group of hikers. There were at least 20 of them, and they were going very slowly. As before, John determined that what you cannot go through, you can go around. Once again we went off the trail at a quickened pace designed to overtake the larger but slower group. The group was so big and spread out that it took several minutes to overtake them. By the time we did, my legs were feeling rubbery, my heart was pounding, and I was gasping for air.

Once again there was no stop to rest. The cold by this point was mind-numbing, and despite our pace and the exertion of energy, my legs and arms started getting very cold. I could no longer feel the index finger on my right hand, which was facing the wind that just kept getting stronger.

After passing the large group we resumed our prior pace and fell into the rhythm of step-breathe, step-breathe, step-breathe. It was at about this point that the hoses on my CamelBak began to freeze, so that before long I was essentially out of water for the last portion of the ascent. Luckily I had a bottle of water inside my backpack. Time essentially ceased to exist, and I purposefully did not ask Narayanan about the time, fearing it would indicate we still had hours more to go in the estimated six-hour climb to the top. If that were the case, I just did not want to know. The only thing that mattered was to follow behind Rahm and Palani and keep taking steps. As long as they could do it, I knew I could do it.



Just about the time I thought there was no way we could keep this up, John informed us that we were about 15 minutes from Stella Point. It was then that I realized we had been climbing for just over four and a half hours, and it was at that point that I really believed I could make it to the top. We pushed on, and about 20 minutes later came to Stella Point. Stella Point is so named because the first climber to reach it, Stella, thought it was the peak of Mt. Kilimanjaro. When Stella got there in the late 1800s, he claimed victory and went home, only to learn later that the highest point of Kilimanjaro was Uhuru Peak. Stella came back later and made it all the way to Uhuru, which is approximately 500 feet higher than Stella Point.



Infused with new energy by knowing we had all but made the summit, we were determined not to repeat Mr. Stella's error and pressed on to Uhuru Peak, the true summit of Kili. The trek from Stella to Uhuru takes about an hour. Although the guidebooks indicated that this would be the hardest part of the ascent, I did not find that to be the case. Knowing the top was in reach seemed to spur us on, and the last hour seemed the shortest.



We continued on, again at the same pace. It was still dark, as the moon had disappeared behind the summit at least an hour earlier. As the first barely perceptible glow of the rising sun began to show behind us, I saw in the distance the very faint outline of the sign marking Uhuru Peak. It was only a few hundred yards in front of us, and it was unbelievable.... it was a moment of pure joy that words really cannot express.

The last hundred or so yards to the peak passed in a daze. There was hugging and shouting for joy, and high fives and fist bumps, and general euphoria just knowing that we had made it. Officially it was 6:25 am. The sun was just beginning to rise behind us, and the eastern sky was bathed in reddish-golden light. It was incredibly, stunningly beautiful. John's planned assault on the summit had worked to perfection: we arrived six hours after we started, precisely at sunrise. It was much better live than it can ever be told. I was literally dancing in a free-flow style.


We took photos of our group, documenting the achievement. It was too cold to take very many. John estimated the temperature to be minus 15 Celsius. So, with frozen fingers and elated hearts, after about 20 minutes at the peak we turned around, faced the rising sun, and began our descent from the highest point on the continent of Africa and our journey home. The descent turned out to be very harsh due to adverse weather conditions.



It was one long day with 15+ hours of walking up and down. Of course, it was an amazing feeling, though. The same day we descended from Barafu hut (4,640 m) to Mweka hut (3,080 m).