========================
Real-time Streams & Logs
========================

Andrew Montalenti, CTO

.. rst-class:: logo

.. image:: ./_static/parsely.png
   :width: 40%
   :align: right

Agenda
======

* Parse.ly problem space
* Architecture evolution
* Organizing around logs (Kafka)
* Aggregating the stream (Storm)
* Real-time vs Batch tensions

======================
Parse.ly problem space
======================

What is Parse.ly?
=================

Analytics for digital storytellers.

.. image:: ./_static/banner_01.png
   :align: center

.. image:: ./_static/banner_02.png
   :align: center

.. image:: ./_static/banner_03.png
   :align: center

.. image:: ./_static/banner_04.png
   :align: center

Variety
=======

Audience data:

* visits
* sessions

Engagement data:

* views / time spent
* social shares

Crawl data:

* keywords / topics
* author / section / tag

Velocity
========

* average post has a **<48-hour shelf life**
* many posts get **most of their traffic in the first few hours**
* major news events can cause **bursty traffic**

.. image:: ./_static/pulse.png
   :width: 60%
   :align: center

Volume
======

* top publishers write **1000's of posts per day**
* a huge **long tail of posts** gets traffic forever
* Parse.ly currently tracks **8 billion pageviews per month**
* ... from **over 250 million monthly unique browsers**

Time series data
================

.. image:: ./_static/sparklines_multiple.png
   :align: center

.. image:: ./_static/sparklines_stacked.png
   :align: center

Summary data
============

.. rst-class:: spaced

.. image:: ./_static/summary_viz.png
   :align: center

Ranked data
===========

.. rst-class:: spaced

.. image:: ./_static/comparative.png
   :align: center

Benchmark data
==============

.. rst-class:: spaced

.. image:: ./_static/benchmarked_viz.png
   :align: center

Information radiators
=====================

.. rst-class:: spaced

.. image:: ./_static/glimpse.png
   :width: 100%
   :align: center

======================
Architecture evolution
======================

Stack Overview
==============

.. rst-class:: spaced

.. image:: ./_static/oss_logos.png
   :width: 90%
   :align: center

Queues and workers
==================

.. rst-class:: spaced

.. image:: /_static/queues_and_workers.png
   :width: 90%
   :align: center

**Queues**: RabbitMQ => Redis => ZeroMQ

**Workers**: Cron Jobs => Celery

Queue problems
==============

Traditional queues (e.g. RabbitMQ / Redis):

* not distributed / highly available at their core
* not persistent ("overflow" easily)
* more consumers mean more queue server load

(Hint: ZeroMQ trades these problems for another one: unreliability.)

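For a concrete taste of the problem: with Redis as a queue, a consumer pops
a message and it is simply gone, so a second consumer can never replay the
stream. A minimal sketch using the ``redis`` client (illustrative only, not
our production code):

.. sourcecode:: python

    import redis

    r = redis.Redis()

    # producer side: push a raw pixel event onto a list acting as a queue
    r.lpush('pixel_data', '{"url": "http://example.com/post"}')

    # consumer side: BRPOP destructively removes the message
    key, msg = r.brpop('pixel_data')

    # the message is now gone from the server: there is no offset and no
    # replay, and a second consumer blocking on the same list would wait
    # forever for a message that was already consumed
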
Lots of moving parts
====================

.. rst-class:: spaced

.. image:: /_static/tech_stack.png
   :width: 90%
   :align: center

To add more features...
=======================

... we had to add more workers and queues!

Got harder and harder to develop on "the entire stack".

More code devoted to ops, rather than business logic.

And, it had big hardware demands
================================

**Scaling Out**: From 2010-2012, went from 3 to 80 nodes running in Rackspace Cloud.

**Scaling Up**: From 2012-2013, ran a custom data center with 1 terabyte of RAM.

**Scaling In**: From 2013-2014, started building support for more nuanced metrics.

And, data management challenges
===============================

Running multiple redundant data centers.

Need to ship real-time data everywhere.

Including data-identical production, staging, beta.

New schema designs and new DB technologies, too.

In short: it started to get messy
=================================

.. rst-class:: spaced

.. image:: ./_static/monitors.jpg
   :width: 90%
   :align: center

======================
Organizing around logs
======================

LinkedIn's lattice problem
==========================

.. rst-class:: spaced

.. image:: ./_static/lattice.png
   :width: 100%
   :align: center

Enter the unified log
=====================

.. rst-class:: spaced

.. image:: ./_static/unified_log.png
   :width: 100%
   :align: center

Log-centric is simpler
======================

.. rst-class:: spaced

.. image:: ./_static/log_centric.png
   :width: 65%
   :align: center

Parse.ly is log-centric, too
============================

.. rst-class:: spaced

.. image:: ./_static/parsely_log_arch.png
   :width: 80%
   :align: center

Introducing Kafka
=================

=============== ==================================================================
Feature         Description
=============== ==================================================================
Speed           100's of megabytes of reads/writes per sec from 1000's of clients
Durability      Can use your entire disk to create a massive message backlog
Scalability     Cluster-oriented design allows for horizontal machine scaling
Availability    Cluster-oriented design allows for node failures without data loss (in 0.8+)
Multi-consumer  Many clients can read the same stream with no penalty
=============== ==================================================================

Kafka concepts
==============

=============== ==================================================================
Concept         Description
=============== ==================================================================
Topic           A group of related messages (a stream)
Producer        Procs that publish msgs to stream
Consumer        Procs that subscribe to msgs from stream
Broker          An individual node in the Cluster
Cluster         An arrangement of Brokers & Zookeeper nodes
Offset          Coordinated state between Consumers and Brokers (in Zookeeper)
=============== ==================================================================

Kafka layout
============

.. rst-class:: spaced

.. image:: ./_static/kafka_topology.png
   :width: 80%
   :align: center

Kafka is a "distributed log"
============================

Topics are **logs**, not queues.

Consumers **read into offsets of the log**.

Consumers **do not "eat" messages**.

Logs are **maintained for a configurable period of time**.

Messages can be **"replayed"**.

Consumers can **share identical logs easily**.

Multi-consumer
==============

.. rst-class:: spaced

.. image:: ./_static/multiconsumer.png
   :width: 60%
   :align: center

Even if Kafka's availability and scalability story isn't interesting to you,
the **multi-consumer story should be**.

Queue problems, revisited
=========================

Traditional queues (e.g. RabbitMQ / Redis):

* not distributed / highly available at their core
* not persistent ("overflow" easily)
* more consumers mean more queue server load

**Kafka solves all of these problems.**

Kafka in Python (1)
===================

.. sourcecode:: python

    import logging

    # generic Zookeeper library
    from kazoo.client import KazooClient

    # Parse.ly's open source Kafka client library
    from samsa.cluster import Cluster

    log = logging.getLogger('test_capture_pageviews')

    def _connect_kafka():
        zk = KazooClient()
        zk.start()
        cluster = Cluster(zk)
        queue = cluster\
                    .topics['pixel_data']\
                    .subscribe('test_capture_pageviews')
        return queue

Kafka in Python (2)
===================

.. sourcecode:: python

    def pageview_stream():
        queue = _connect_kafka()
        count = 0
        for msg in queue:
            count += 1
            if count % 1000 == 0:
                # in this example, offsets are committed to
                # Zookeeper every 1000 messages
                queue.commit_offsets()
            urlref, url, ts = parse_msg(msg)
            yield urlref, url, ts

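``parse_msg`` is left undefined above. A minimal sketch of what it might
look like, assuming each message payload is a JSON record of the shape
shown later in the ``urls.json`` sample (a hypothetical helper, not part
of samsa):

.. sourcecode:: python

    import json

    def parse_msg(msg):
        # decode one Kafka message into an (urlref, url, ts) triple
        record = json.loads(msg)
        return record['urlref'], record['url'], record['ts']
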
======================
Aggregating the stream
======================

So, what about Workers?
=======================

Kafka solves my Queue problem, but what about Workers?

How do I transform streams with **streaming computation**?

Worker data transforms
======================

Even with a unified log, workers will proliferate data transformations.

These transformations often have complex dependencies (a sketch follows
the list):

* pixel request is cleaned
* referenced URL is crawled
* crawled URL's text is analyzed by topic extractor
* repeated requests at identical URL rolled up by topic
* top-performing topics are snapshotted for rankings

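Written as plain Python, the hidden dependency graph becomes explicit.
Every function name below is illustrative, not actual Parse.ly code:

.. sourcecode:: python

    # hypothetical stand-ins for the real worker stages
    def clean(raw):
        return {"url": raw.strip()}

    def crawl(url):
        return "<html>...</html>"     # fetch the referenced page

    def extract_topics(page):
        return ["politics"]           # NLP topic extraction

    def rollup_by_topic(event, topics):
        pass                          # aggregate repeat views by topic

    def snapshot_rankings():
        pass                          # snapshot top topics for rankings

    def handle_pixel(raw_request):
        # an implicit DAG: each stage depends on the one before it,
        # but with plain queues and workers there is no guaranteed
        # processing, fault tolerance, or tuneable parallelism per stage
        event = clean(raw_request)
        page = crawl(event["url"])
        topics = extract_topics(page)
        rollup_by_topic(event, topics)
        snapshot_rankings()
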
Workers and databases
=====================

.. rst-class:: spaced

.. image:: ./_static/queue_storage.png
   :width: 80%
   :align: center

Worker problems
===============

* no control over parallelism and load distribution
* no guaranteed processing for multi-stage pipelines
* no fault tolerance for individual stages
* difficult to run local / beta / staging environments
* dependencies between worker stages are unclear

Meanwhile, in Batch land...
===========================

... everything is **peachy**!

When I have all my data available, I can just run Map/Reduce jobs.

**Problem solved.**

We use Apache Pig, and I can get all the guarantees I need, and scale up
on EMR.

... but there is no ability to do this in real-time on the stream! :(

Introducing Storm
=================

Storm is a **distributed real-time computation system**.

Hadoop provides a set of general primitives for doing batch processing.

Storm provides a set of **general primitives** for doing **real-time
computation**.

Hadoop primitives
=================

**Durable** Data Set, typically from **S3**.

**HDFS** used for inter-process communication.

**Mappers** & **Reducers**; Pig's **JobFlow** is a **DAG**.

**JobTracker** & **TaskTracker** manage execution.

**Tuneable parallelism** + built-in **fault tolerance**.

Storm primitives
================

**Streaming** Data Set, typically from **Kafka**.

**ZeroMQ** used for inter-process communication.

**Bolts** & **Spouts**; Storm's **Topology** is a **DAG**.

**Nimbus** & **Workers** manage execution.

**Tuneable parallelism** + built-in **fault tolerance**.

Storm features
==============

=============== ====================================================================
Feature         Description
=============== ====================================================================
Speed           1,000,000 tuples per second per node, using Kryo and ZeroMQ
Fault Tolerance Workers and Storm management daemons self-heal in face of failure
Parallelism     Tasks run on cluster w/ tuneable parallelism
Guaranteed Msgs Tracks lineage of data tuples, providing an at-least-once guarantee
Easy Code Mgmt  Several versions of code in a cluster; multiple languages supported
Local Dev       Entire system can run in "local mode" for end-to-end testing
=============== ====================================================================

Storm core concepts
===================

=============== =======================================================================
Concept         Description
=============== =======================================================================
Stream          Unbounded sequence of data tuples with named fields
Spout           A source of a Stream of tuples; typically reading from Kafka
Bolt            A computation step that consumes Streams and emits new Streams
Grouping        Way of partitioning data fed to a Bolt; for example: by field, shuffle
Topology        Directed Acyclic Graph (DAG) describing Spouts, Bolts, & Groupings
=============== =======================================================================

Wired Topology
==============

.. rst-class:: spaced

.. image:: ./_static/topology.png
   :width: 80%
   :align: center

Storm cluster concepts
======================

=============== =======================================================================
Concept         Description
=============== =======================================================================
Tasks           The process/thread corresponding to a running Bolt/Spout in a cluster
Workers         The JVM process managing work for a given physical node in the cluster
Supervisor      The process monitoring the Worker processes on a single machine
Nimbus          Coordinates work among Workers/Supervisors; maintains cluster stats
=============== =======================================================================

Running Cluster
===============

.. rst-class:: spaced

.. image:: ./_static/cluster.png
   :width: 80%
   :align: center

Tuple Tree
==========

Tuple tree, anchoring, and retries.

.. rst-class:: spaced

.. image:: ./_static/wordcount.png
   :width: 70%
   :align: center

==================
Real-time vs Batch
==================

Queries over data
=================

.. rst-class:: spaced

.. image:: ./_static/all_data.png
   :width: 70%
   :align: center

Sample data file
================

A slice of Twitter clickstream (``urls.json``):

.. sourcecode:: json

    {"urlref": "http://t.co/1234", "url": "http://theatlantic.com/1234", "ts": "2014-01-01T08:01:000Z"}
    {"urlref": "http://t.co/1234", "url": "http://theatlantic.com/1234", "ts": "2014-01-01T08:02:000Z"}
    {"urlref": "http://t.co/1234", "url": "http://theatlantic.com/1234", "ts": "2014-01-01T08:03:000Z"}
    {"urlref": "http://t.co/1234", "url": "http://theatlantic.com/1234", "ts": "2014-01-01T08:04:000Z"}

Pig example
===========

Several billion such records (with much more variety) can be processed to
find the tweets driving high amounts of traffic to news publishers.

.. sourcecode:: sql

    urls = LOAD 'urls.json'
           USING JsonLoader(
              'url:chararray, urlref:chararray, ts:chararray');

    url_group = GROUP urls BY url;

    url_count = FOREACH url_group
                GENERATE group, COUNT_STAR(urls) as clicks;

    DUMP url_count;
    --> (http://t.co/1234, 4)

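Before paying for an EMR run, the same rollup can be sanity-checked locally
in plain Python; a small sketch over the four-line ``urls.json`` sample
above (illustrative, not from the production codebase):

.. sourcecode:: python

    import json
    from collections import Counter

    clicks = Counter()
    with open('urls.json') as f:
        for line in f:
            record = json.loads(line)
            clicks[record['urlref']] += 1  # count clicks per tweet link

    print(clicks.most_common(5))
    # with the sample above: [('http://t.co/1234', 4)]
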
EMR cluster (lemur)
===================

.. sourcecode:: clojure

    (defcluster pig-cluster
        :master-instance-type "m1.large"
        :slave-instance-type "m1.large"
        :num-instances 2
        :keypair "emr_jobs"
        :enable-debugging? false
        :bootstrap-action.1 [
            "install-pig"
            (s3-libs "/pig/pig-script")
            ["--base-path" (s3-libs "/pig/")
             "--install-pig" "--pig-versions" "latest"]
        ]
        :runtime-jar (s3-libs "/script-runner/script-runner.jar")
    )

EMR Pig steps (lemur)
=====================

.. sourcecode:: clojure

    (defstep twitter-count-step
        :args.positional [
            (s3-libs "/pig/pig-script")
            "--base-path" (s3-libs "/pig/")
            "--pig-versions" "latest"
            "--run-pig-script" "--args" "-f"
            "s3://pystorm/url_counts.pig"
        ]
    )

    (fire! pig-cluster twitter-count-step)

Precomputed views
=================

.. rst-class:: spaced

.. image:: ./_static/precomputed_view.png
   :width: 90%
   :align: center

Twitter Click Spout (Storm)
===========================

.. sourcecode:: clojure

    {"twitter-click-spout"
        (shell-spout-spec
            ;; Python Spout implementation:
            ;; - fetches tweets (e.g. from Kafka)
            ;; - emits (urlref, url, ts) tuples
            ["python" "spouts_twitter_click.py"]
            ;; Stream declaration:
            ["urlref" "url" "ts"]
        )
    }

Mock Spout in Python
====================

.. sourcecode:: python

    import time

    import storm

    class TwitterClickSpout(storm.Spout):

        def nextTuple(self):
            urlref = "http://t.co/1234"
            url = "http://theatlantic.com/1234"
            ts = "2014-03-10T08:00:000Z"
            storm.emit([urlref, url, ts])
            time.sleep(0.1)

    TwitterClickSpout().run()

Twitter Count Bolt (Storm)
==========================

.. sourcecode:: clojure

    {"twitter-count-bolt"
        (shell-bolt-spec
            ;; Bolt input: Spout and field grouping on urlref
            {"twitter-click-spout" ["urlref"]}
            ;; Python Bolt implementation:
            ;; - maintains a Counter of urlref
            ;; - increments as new clicks arrive
            ["python" "bolts_twitter_count.py"]
            ;; Emits latest click count for each tweet as new Stream
            ["twitter_link" "clicks"]
            :p 4
        )
    }

Mock Bolt in Python
===================

.. sourcecode:: python

    from collections import Counter

    import storm

    class TwitterCountBolt(storm.BasicBolt):

        def initialize(self, conf, context):
            self.counter = Counter()

        def process(self, tup):
            urlref, url, ts = tup.values
            self.counter[urlref] += 1
            # new count emitted to stream upon increment
            storm.emit([urlref, self.counter[urlref]])

    TwitterCountBolt().run()

Running a local cluster
=======================

.. sourcecode:: clojure

    (defn run-local! []
        (let [cluster (LocalCluster.)]
            ;; submit the topology configured above
            (.submitTopology cluster
                             ;; topology name
                             "test-topology"
                             ;; topology settings
                             {TOPOLOGY-DEBUG true}
                             ;; topology configuration
                             (mk-topology))
            ;; sleep for 5 seconds before...
            (Thread/sleep 5000)
            ;; shutting down the cluster
            (.shutdown cluster)
        )
    )

Combining Batch & Real-Time
===========================

.. rst-class:: spaced

.. image:: ./_static/storm_and_hadoop.png
   :width: 90%
   :align: center

Marz's Lambda Architecture
==========================

.. rst-class:: spaced

.. image:: ./_static/lambda_architecture.png
   :width: 90%
   :align: center

Eventual Accuracy
=================

.. rst-class:: spaced

.. image:: ./_static/absorb_data.png
   :width: 90%
   :align: center

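In code, the lambda idea reduces to serving queries from two stores at
once: an exact batch view that lags, plus a real-time count that covers
the gap. A toy sketch with hypothetical ``batch_view`` and
``realtime_counts`` mappings (not Parse.ly's actual serving layer):

.. sourcecode:: python

    batch_view = {"http://t.co/1234": 98000}     # rebuilt nightly by Pig
    realtime_counts = {"http://t.co/1234": 450}  # from Storm, since last rebuild

    def clicks(urlref):
        # batch absorbs the stream over time; the real-time term shrinks
        # back toward zero after every batch rebuild
        return batch_view.get(urlref, 0) + realtime_counts.get(urlref, 0)

    print(clicks("http://t.co/1234"))  # => 98450
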
Parse.ly's Stream Architecture
==============================

.. rst-class:: spaced

.. image:: ./_static/parsely_architecture.png
   :width: 90%
   :align: center

Where are we today? (1)
=======================

============= ==========================================
Tool          Usage
============= ==========================================
ELB + nginx   scalable data collection across web
S3            cheap, redundant storage of logs
Scrapy        customizable crawling & scraping
MongoDB       sharded, replicated historical data
Redis         real-time data; past 24h, minutely
SolrCloud     content indexing & trends
Storm\*       **real-time** distributed task queue
Kafka\*       **multi-consumer** data integration
Pig\*         **batch** network data analysis
============= ==========================================

Where are we today? (2)
=======================

================ ======================= =====================
Component        Current                 Ideal
================ ======================= =====================
Real-time        Storm + Redis           Storm + Mongo
Historical       Pig/Storm + Mongo       Evolved Mongo Schema
Visitor          Pig only                Pig/Storm + Cassandra
================ ======================= =====================

Where are we today? (3)
=======================

================== ======================= =====================
Component          Current                 Ideal
================== ======================= =====================
Recommendations    Queues + Workers        Storm + Solr?
Crawling           Queues + Workers        Storm + Scrapy?
Pig Mgmt           Pig + boto              lemur?
Storm Mgmt         petrel                  pystorm?
================== ======================= =====================

Other Log-Centric Companies
===========================

============= ========= ========
Company       Logs      Workers
============= ========= ========
LinkedIn      Kafka\*   Samza
Twitter       Kafka     Storm\*
Spotify       Kafka     Storm
Wikipedia     Kafka     Storm
Outbrain      Kafka     Storm
LivePerson    Kafka     Storm
Netflix       Kafka     ???
============= ========= ========

Alternative Approaches
======================

============= ========= ===========
Company       Logs      Workers
============= ========= ===========
Yahoo         S4        S4
Amazon        Kinesis   ???
Google        ???       Millwheel\*
Facebook      Scribe\*  ???
UC Berkeley   RDDs\*    Spark\*
============= ========= ===========

Python + Clojure
================

Opportunity for **Python & Clojure** to work together.

**Python**: core computations & DB persistence.

**fabric**: deployment & remote server management.

**Clojure**: interop with JVM infrastructure: Storm & Hadoop.

**lein**: manage Java's classpath & packaging nightmare.

Python and JVM interop
======================

.. rst-class:: spaced

.. image:: ./_static/python_and_data.png
   :width: 90%
   :align: center

==========
Conclusion
==========

How times change...
===================

Two years ago, EC2's biggest memory box had 68GB of RAM & spinning disks.

In early 2014, Amazon launched its ``i2`` instance types:

=============== ======== ======== =========
Instance        RAM      SSD (!)  Cores
=============== ======== ======== =========
``i2.8xlarge``  244 GB   6.4 TB   32
``i2.4xlarge``  122 GB   3.2 TB   16
``i2.2xlarge``  61 GB    1.6 TB   8
=============== ======== ======== =========

* Each costs <$20 per GB of RAM per month, on-demand
* Big memory, performant CPU, and fast I/O: all three!

**It's the golden age of analytics.**

What we've learned
==================

.. rst-class:: build

* There is no **silver bullet** database technology.
* Especially for data problems with "the three V's".
* Log storage is very cheap, and getting cheaper.
* "Timestamped facts" are the rawest form of data available.
* Organizing around logs is a wise decision.

What we're learning
===================

.. rst-class:: build

* Maybe databases aren't really databases, but just **indexes**.
* A database isn't the endpoint for data, but a **transformation**.
* Duplicating data across databases isn't evil...
* ... especially for query flexibility and latency ...
* ... but only if the **master data set makes rebuilds easy**!

What is becoming clear
======================

.. rst-class:: build

* There is a gap between Batch and Real-Time processing.
* But it may not be there for long.
* Lots of active research is going into narrowing that gap.
* Pig + Storm work today, and offer powerful abstractions.
* Log-centric design (Kafka) will prep you for tomorrow.

Ideal data architecture
=======================

.. rst-class:: spaced

.. image:: ./_static/ideal_architecture.png
   :width: 90%
   :align: center

Questions?
==========

Go forth and stream!

Parse.ly:

* http://parse.ly
* http://twitter.com/parsely

Me:

* http://pixelmonkey.org
* http://twitter.com/amontalenti