class: center, middle

# Build and Deploy a Spark Cassandra App

cody@koeninger.org

---

# Me

* Professionally using Scala since 2.7, Spark since 0.7
* Contributed features & bugfixes to Spark and Spark-Cassandra
* Don't claim to be an expert, just a guy who gets stuff done
* Ask me questions, feel free to challenge my opinions

---

# Agenda

Some Cassandra knowledge => use Spark to solve your problems

1. Installation
2. Build & Deploy
3. Performance Tips

---

# Installation

1. JDK
2. sbt
3. This example app
4. Cassandra
5. Spark

---

## JDK

* http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html
* Oracle JDK 1.7
* **Not** OpenJDK
* **Not** Java 8

---

## sbt

* https://github.com/paulp/sbt-extras
* Download, chmod +x, put it on your path
* No need to "install" Scala or mess with the sbt launch jar
* If you want an IDE, use IntelliJ with sbt
* Yes, it's another build tool. It's the best build tool for Scala, and Scala is the best tool for writing Spark jobs.

---

## This example app

```shell
git clone https://github.com/koeninger/spark-cassandra-example.git
```

Make sure it works

```shell
cd spark-cassandra-example
sbt test
```

---

## Cassandra

* version 2.0.x, should work with 2.1.x as well
* Don't use 2.0.5 through 2.0.9 (see [CASSANDRA-7535](https://issues.apache.org/jira/browse/CASSANDRA-7535))
* follow the datastax [install recommendations](http://www.datastax.com/documentation/cassandra/2.0/cassandra/install/installRecommendSettings.html)
* if co-locating spark and cassandra on the same nodes, set MAX_HEAP_SIZE in cassandra-env.sh (more on this later)

---

## Spark Install (tl;dr version)

* http://spark.apache.org/docs/1.1.0/spark-standalone.html
* Set up [passwordless ssh](http://lmgtfy.com/?q=passwordless+ssh) from master to worker nodes as the user you want to run spark
* [Download pre-built spark](http://spark.apache.org/downloads.html) version 1.1.0; the hadoop version doesn't matter unless you're running hdfs
* Unpack spark in the **same location** on every node (rsync is your friend)
* Edit /path/to/spark/conf/slaves, one worker hostname per line
* On the master:

```shell
/path/to/spark/sbin/start-all.sh
```

---

## Make sure Spark is working

* Look at the master web ui, make sure it shows workers as ALIVE: http://whatever-master-node:8080
* Run a simple task in the spark shell

```shell
/path/to/spark/bin/spark-shell
```

```scala
sc.parallelize(1 to 50).
  map(i => java.net.InetAddress.getLocalHost).
  collect.foreach(println)
```

* You should see each worker hostname in the printout

---

## Which nodes to run Spark processes on?

* driver: one per running job, low resources unless you collect() a bunch of data back to the driver
* master: one per cluster, low resources
* worker: many per cluster, usually max 1 per node, low resources
* executor: spawned by the worker on the same node, high resources

In short, as long as everything's on the same network, it doesn't matter where you put the driver and master. They can be on some of the worker nodes, or on their own small node.

---

## Spark Configuration

* Follow the general [tuning guidelines](http://spark.apache.org/docs/1.1.0/tuning.html)
* Spark can write a lot of files, so make sure ulimit is set high, at least number of map tasks x number of reduce tasks
* Setting spark.shuffle.consolidateFiles = true or spark.shuffle.manager = SORT can help with the number of files, but test carefully
* The [hardware guidelines](http://spark.apache.org/docs/1.1.0/hardware-provisioning.html) are aggressive. If your budget doesn't support that, start with e.g. several m3.xlarge instances and see if you need more.
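---

## Spark Configuration

A quick way to confirm these settings actually took effect (a small sketch, not part of the example app) is to print the resolved configuration from a spark-shell connected to the cluster:

```scala
// Run inside spark-shell: dump every setting Spark actually resolved,
// so you can confirm values such as spark.shuffle.consolidateFiles
// made it through spark-defaults.conf / --properties-file
sc.getConf.getAll.sortBy(_._1).foreach { case (key, value) =>
  println(s"$key = $value")
}
```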
---

## Spark and Cassandra cohabitation

* Running spark workers on cassandra nodes will improve i/o performance due to data locality, but they will compete for system resources
* in spark-env.sh set SPARK_WORKER_MEMORY, in cassandra-env.sh set MAX_HEAP_SIZE; divide ram amongst them
* Leave enough ram for the off-heap cache (if applicable) and os buffers
* in spark-env.sh set SPARK_WORKER_CORES if you want
* Cluster scans are rough on cassandra, so limit spark jobs to off-peak hours if you can
* Another option is setting up replication to a separate Cassandra DC for analytics

---

## What about other deploy modes?

* Local mode: OK for testing, but you can also just run standalone mode with one or two workers on your laptop.
* EC2 scripts: A convenience for frequently bringing up / shutting down spark, which is probably not what you're doing if you're co-installing spark with cassandra.
* YARN: Don't do this unless you have a YARN cluster already.
* Mesos: Don't do this unless:
  1. you have a Mesos cluster already, AND
  2. you need fine-grained sharing of CPU resources, AND
  3. you like looking through twice as many developer mailing lists when something doesn't work the way you expect.

---

# Build

```shell
cd spark-cassandra-example
```

Edit to set cassandra connection info for your environment

```shell
vi cassandra-example.conf
```

Load the demo schema into cassandra

```shell
cqlsh --file cassandra-example.cql
```

Build an assembly jar, aka uber jar, with all dependencies included:

```shell
sbt assembly
```

---

## Deploy

```shell
scp target/scala-2.10/cassandra-example-assembly-0.1-SNAPSHOT.jar driver-node:~/
scp cassandra-example.conf driver-node:~/
```

On the driver node (either in a GNU screen session or nohup'ed):

```shell
/path/to/spark/bin/spark-submit --properties-file cassandra-example.conf \
  --class org.koeninger.HelloWorldExample \
  cassandra-example-assembly-0.1-SNAPSHOT.jar
```

If you're writing some shell script to configure spark or do classpath manipulation, you're probably doing it wrong.

**Just use spark-submit**

---

## Per-job configuration

Don't do this:

```scala
val conf = new SparkConf().
  setMaster("some hardcoded master").
  setJars("some hard to maintain jar name")...
```

That requires a recompile whenever you want to change config. Instead, just do this:

```scala
val conf = new SparkConf()
```

then specify all configuration via spark-submit --properties-file somejob.conf

Yes, this means any common settings from the master's spark-defaults.conf need to be duplicated in the per-job properties file.

If you want to piggyback your own settings onto the spark conf, prefix them with spark.*, e.g. spark.my.com.fizzrate

---

## sbt survival

* Useful sbt commands to know:
  * compile
  * test
  * test-only com.foo.SomeFailingTest
  * console
  * console-quick
  * assembly
* Prefix any command with ~ to make sbt watch for changes and auto-rerun, e.g. ~compile
* If anything goes wrong, make sure you're in the **base directory** of your checkout and run sbt clean update

---

## Key points about assembly

Don't include spark in the assembly; it will already be on the classpath, so mark it as **provided**

```scala
"org.apache.spark" %% "spark-catalyst" % "1.1.0" % "provided"
```

If there's any transitive dependency that may conflict with spark, **exclude** it

```scala
("com.someorg" % "some-package" % "6.6.6").
  exclude("org.slf4j", "slf4j-api").
  exclude("com.google.guava", "guava")
```
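---

## Key points about assembly

Putting those two rules together, a build.sbt might look roughly like this. This is a sketch only: names, versions, and exclusions are illustrative rather than copied from the example repo, and it assumes the sbt-assembly plugin is already enabled in project/plugins.sbt.

```scala
// Sketch of a build.sbt for an assembly jar -- illustrative, not the example repo's actual build

name := "cassandra-example"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // spark itself is on the executor classpath at runtime, so mark it provided
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  // exclude transitive dependencies that clash with what spark already ships
  ("com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0").
    exclude("org.slf4j", "slf4j-api")
)
```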
exclude("com.google.guava", "guava") ``` For more complex assembly issues, see mergeStrategy https://github.com/sbt/sbt-assembly If you have trouble finding out where dependencies came from, https://github.com/jrudolph/sbt-dependency-graph --- # Performance Tips 1. Use the UI 1. Understand closures 1. Understand partitions 1. Understand shuffle --- ## Use the web UI * Tweaking performance without measuring is pointless, and the UI is the easiest way to measure * Master UI is on port 8080, driver UI is on 4040 * Configure spark.eventLog.enabled and spark.eventLog.dir (in the master's spark-defaults.conf as well) * That way, driver UI is also viewable from master after a job ends * What to look at: 1. duration 2. number of partitions (tasks) per stage 3. shuffle write --- ![](pictures/slow.png) --- Same goal as in previous slide, but much less shuffle, so over twice as fast: ![](pictures/fast.png) --- ## Understand closures (what not to do) Don't do this ```scala // this runs in the driver val foo = new SomeExpensiveNotSerializableThing someRdd.map { x => // this runs in the executor, so... // the attempt to close over foo will throw NotSerializableException foo.mangle(x) } ``` Don't do this either ```scala someRdd.map { x => // this runs in the executor, ok... // but gets constructed for every element in the RDD, so will be slow val foo = new SomeExpensiveNotSerializableThing foo.mangle(x) } ``` --- ## Understand closures (what to do) Do this instead: ```scala someRdd.mapPartitions { part => // this runs in the executor, constructed only once per group of elements val foo = new SomeExpensiveNotSerializableThing part.map { x => // then used for each element foo.mangle(x) } } ``` --- ## Understand closures (other issues) If you encounter other closure-related issues, make sure that * all captured references are serializable * all captured references are immutable * all captured references are either local vals, or in a static object As a specific example, don't extend App: ```scala object MyApp extends App { // start running ``` Use an explicit main method instead ```scala object MyApp { def main(args: Array[String]) { // start running ``` --- ## Understand partitions * If data is partitioned into fewer tasks than there are executor cores, some cores may be idle * If there are many more tasks than cores, scheduler delay may be larger than work (delay is visible in the per-stage details link) * General rule of thumb, at least as many tasks as 2 * total number of executor cores * Set spark.default.parallelism, pass # of partitions to methods that accept it * Increase # partitions with rdd.repartition (which triggers a shuffle, so is costly), decrease with rdd.coalesce * If partitions are uneven in size, one executor may get overloaded * Common cause of unevenness is doing a grouping operation on an RDD where many keys are None; in this case, filter them out first * As always, MEASURE. There may be good reasons for a small or large # of partitions. --- ## Understand shuffle * Whenever data needs to be redistributed among executors, a shuffle occurs * This typically occurs for by-key operations on PairRDD: groupBy, reduceByKey, etc. * Shuffle involves writing data to disk, then reading off of other executors' disks over the network. 
---

## Trigger fewer shuffles

If an rdd does not have a partitioner, and you do a transformation that requires it to be partitioned a certain way, it will trigger a shuffle.

Cassandra RDDs do not have partitioners, and grouping operations on them (even when grouping on the rowkey) trigger a shuffle:

```scala
scala> val visits = sc.cassandraTable[(String, Long)]("test", "user_visits").
  select("user", "utc_millis")

scala> visits.partitioner
res0: Option[org.apache.spark.Partitioner] = None

scala> visits.groupByKey.partitioner
res3: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@1)
```

Some transformations lose the partitioner, so avoid them after a shuffle:

```scala
scala> visits.groupByKey.map(x => x).partitioner
res4: Option[org.apache.spark.Partitioner] = None

scala> visits.groupByKey.mapValues(x => x).partitioner
res5: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@1)
```

---

## Trigger fewer shuffles

Even though spark has no way of knowing it (yet), Cassandra RDDs are partitioned such that the values for a given rowkey will all be in the same partition, in order.

So instead of doing

```scala
visits.reduceByKey { (a, b) =>
  SomePerItemAggregationFunction(a, b)
}
```

You can do

```scala
visits.mapPartitions { lotsOfValues =>
  SomePerPartitionAggregationFunction(lotsOfValues)
}
```

which will result in a map-only stage, instead of a shuffle.

See GroupingExample.scala vs PartitionGroupingExample.scala for details

---

## Trigger fewer shuffles

Joins require a shuffle, so they are not particularly efficient. This is true even if both RDDs are from cassandra tables with the same rowkey, and the join is on the rowkey.

If one of the RDDs is significantly smaller than the other, you may be better off collecting it to the driver, converting it to a hashmap, then broadcasting it to every executor. Then the join can be performed as a map-only stage against the larger RDD (a rough sketch is included at the end of this deck).

See JoinExample.scala vs BroadcastJoinExample.scala for details

---

# Resources

* [Scala for the Impatient](http://www.horstmann.com/scala/index.html), the best book for picking up Scala quickly and accurately. First half is [free online](https://typesafe.com/resources/e-books)
* [Austin Scala Meetup](http://www.meetup.com/Austin-Scala-Enthusiasts/)
* [Learning Spark](http://shop.oreilly.com/product/0636920028512.do), awesome book on Spark by some of the main committers. In Early Access but already well worth the money
* [AMP Camp](http://ampcamp.berkeley.edu/), free online Spark training camps
* [Spark Internals](https://www.youtube.com/watch?v=49Hr5xZyTEA)
* [A Deeper Understanding of Spark Internals](https://www.youtube.com/watch?v=dmL0N3qfSc8)
* [Spark Cassandra Connector](https://github.com/datastax/spark-cassandra-connector)
* [Spark Project](https://spark.apache.org/)
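---

## Broadcast join sketch

A rough illustration of the broadcast join idea from the join slide. The user_names table and its columns are hypothetical, and it reuses the visits RDD (and the cassandraTable imports) from the earlier slides; see BroadcastJoinExample.scala for the real thing.

```scala
// small hypothetical lookup table of (userId, name): collect it to the driver as a map...
val userNames: Map[String, String] = sc.cassandraTable[(String, String)]("test", "user_names").
  select("user", "name").
  collect.toMap

// ...and broadcast it once to every executor
val namesBroadcast = sc.broadcast(userNames)

// the join against the large visits RDD is now a map-only stage, no shuffle
val visitsWithNames = visits.map { case (user, millis) =>
  (user, namesBroadcast.value.getOrElse(user, "unknown"), millis)
}
```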