class: center, middle # Exactly-Once Streaming from Kafka --- ## Kafka is a ~~message queue~~ circular buffer * Split into topic/partition * Fixed size, based on disk space or time * Oldest messages deleted to maintain size * Messages are otherwise immutable * Indexed only by offset * Client tracks read offset, not server ###Delivery semantics are _your responsibility_ From Kafka, through a transformation, to results in your data store --- ##At-most-once 1. Save offsets 2. !! Possible failure !! 3. Save results ###On failure, restart at last saved offset, messages are lost --- ##At-least-once 1. Save results 2. !! Possible failure !! 3. Save offsets ###On failure, messages are repeated No possible magic config option to do better than this --- ## Idempotent exactly-once 1. Save results with a natural unique key 2. !! Possible failure !! 3. Save offsets ###On failure, messages are repeated, but we don't care Immutable messages and a pure transformation yield the same results --- ## Idempotent pros / cons Pro: * Simple * Works well for shape-preserving transformations (map) Con: * May be hard to identify natural unique key * Especially hard for aggregate transformations (fold) * Won't work for destructive updates Note: * Results and offsets may be in different data stores --- ## Transactional exactly-once 1. Begin transaction 2. Save results 3. Save offsets 4. Ensure offsets are ok (increasing without gaps) 5. Commit transaction ### On failure, rollback, results and offsets remain in sync --- ## Transactional pros / cons Pro: * Works easily for any transformation * Destructive updates ok Con: * More complex * Requires a transactional data store Note: * Results and offsets must be in same data store --- ![](slides/kafka-old.png) --- ## Receiver-based stream pros / cons Pro: * WAL design could work with non-Kafka data sources Con: * Long running receivers make parallelism awkward and costly * Duplication of write operations * Dependent on HDFS * Must use idempotence for exactly-once * No access to offsets, can't use transactional approach --- ![](slides/kafka-new.png) --- ## Direct stream pros / cons Pro: * Spark partition 1:1 Kafka topic/partition, easy cheap parallelism * No duplicate writes * No dependency on HDFS * Access to offsets, can use idempotent or transactional Con: * Specific to Kafka * Need adequate Kafka retention (OffsetOutOfRange is _your fault_) --- ## Don't care about semantics? ![](slides/spark-kafka-change-cpu-utilization.png) ### How about server cost? --- ## Basic direct stream API ```scala val stream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( streamingContext, Map( "" -> "localhost:9092,anotherhost:9092"), "auto.offset.reset" -> "largest" ), Set("sometopic", "anothertopic") ) ``` --- ## Basic direct stream API delivery semantics auto.offset.reset -> largest: * Starts at latest offset, thus losing data * Not at-most-once (need to set maxFailures as well) auto.offset.reset -> smallest: * Starts at earliest offset * At-least-once, but replays whole log ### If you want finer grained control, must store offsets somewhere --- ## Where to store offsets Spark Checkpoint: * Easy * No need to access offsets, will be used on restart * Must use idempotent, not transactional * Checkpoints may not be recoverable Your own data store: * Complex * Need to access offsets, save them, and provide them on (re)start * Idempotent or transactional * Offsets are just as recoverable as your results --- ## Spark checkpoint Same as any other Spark checkpoint: ```scala def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val stream = KafkaUtils.createDirectStream(...) // setup DStream ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate( checkpointDirectory, functionToCreateContext _) ``` Keep in mind you still need idempotent storage of results. Other than that, you're done. --- ## Providing offsets on (re)start ```scala // begin from the the offsets committed to the database val fromOffsets = DB.readOnly { implicit session => sql"select topic, part, off from txn_offsets". map { resultSet => TopicAndPartition(resultSet.string(1), -> resultSet.long(3) }.list.apply().toMap } val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message.length val stream: InputDStream[Int] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Int]( streamingContext, Map("" -> "localhost:9092,anotherhost:9092"), fromOffsets, messageHandler ) ``` This is the more advanced API. Instead of a set of topics, it takes: * a map of TopicAndPartition -> the offset to start from * a function to extract the desired value from each message and metadata. --- ## Accessing offsets, per message ```scala val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.partition, mmd.offset, mmd.key, mmd.message) ``` Your message handler has full access to all of the metadata. This may not be the most efficient way, though. --- ## Accessing offsets, per batch ```scala stream.foreachRDD { rdd => // Cast the rdd to an interface that lets us get an array of OffsetRange val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val results = rdd.someTransformationUsingSparkMethods ... // Your save method. Note that this runs on the driver mySaveBatch(offsetRanges, results) } ``` Each OffsetRange in the array has the following fields * topic: Kafka topic name * partition: Kafka partition id * fromOffset: inclusive starting offset * untilOffset: exclusive ending offset --- ## Accessing offsets, per partition This is safe because each Spark partition is 1:1 with a Kafka topic/partition ```scala stream.foreachRDD { rdd => // Cast the rdd to an interface that lets us get an array of OffsetRange val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition { iter => // index to get the correct offset range for the rdd partition we're working on val offsetRange: OffsetRange = offsetRanges(TaskContext.get.partitionId) val perPartitionResult = iter.someTransformationUsingScalaMethods // Your save method. Note this runs on the executors. mySavePartition(offsetRange, perPartitionResult) } } ``` This is **not safe** because there is a shuffle, so no longer 1:1 ```scala rdd.reduceByKey.foreachPartition { ... ``` --- ## Storing offsets transactionally Make sure that start of this offset range matches end of last saved offset range ```scala // localTx is transactional DB.localTx { implicit session => // store results ... // store offsets val offsetRows = sql""" update txn_offsets set off = ${osr.untilOffset} where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset} """.update.apply() if (offsetRows != 1) { // rollback and/or throw exception } } ``` --- class: center, middle # Questions?