Developing Apache Kafka Producers and Consumers

01.27.2014

I recently gave a presentation on real-time streaming and data pipelines with Apache Kafka.


A correction to the talk (around the 22-minute mark): I said that you have to have all of your topic data fit on one server. That is not true. What you cannot do is span a log across servers, so all of the data for a single partition must fit on one server. Kafka will spread the partitions of a topic across brokers for you.
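
As a concrete example, the partition count is set when a topic is created. A sketch using the kafka-topics.sh script that ships with the Kafka distribution (script names and flags vary by version; replication factor 1 assumes the single-broker Vagrant setup below), creating a topic whose three partitions can land on different brokers:

bin/kafka-topics.sh --create --zookeeper 192.168.86.5:2181 \
  --topic my-topic --partitions 3 --replication-factor 1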

For that presentation I put together sample code for producing and consuming with an Apache Kafka broker using Scala.

To get up and running, use Vagrant:

1) Install Vagrant http://www.vagrantup.com/
2) Install VirtualBox https://www.virtualbox.org/

git clone https://github.com/stealthly/scala-kafka
cd scala-kafka
vagrant up
./sbt test
Your entry point is the test file:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
package ly.stealth.testing
 
import org.specs2.mutable._
import java.util.UUID
import kafka.consumer._
import kafka.producer._
import kafka.utils._
import kafka.akka._
import akka.actor.{Actor, Props, ActorSystem}
import akka.routing.RoundRobinRouter
 
class KafkaSpec extends Specification with Logging {
 
  "Simple Producer and Consumer" should {
    "send string to broker and consume that string back" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
 
      var testStatus = false
 
      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)
 
      val consumer = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")
 
      def exec(binaryObject: Array[Byte]) = {
        val message = new String(binaryObject)
        info("testMessage = " + testMessage + " and consumed message = " + message)
        testMessage must_== message
        consumer.close()
        testStatus = true
      }
 
      info("KafkaSpec is waiting some seconds")
      consumer.read(exec)
      info("KafkaSpec consumed")
 
      testStatus must beTrue // we need to get to this point but a failure in exec will fail the test
    }
 
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString
 
      var testStatus1 = false
      var testStatus2 = false
 
      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)
 
      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")
 
      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }
 
      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")
 
      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")
 
      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }
 
      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")
 
      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }
 
  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString
 
      var testStatus1 = false
      var testStatus2 = false
 
      info("starting akka producertesting")
      val system = ActorSystem("testing")
 
      val actorCount = 1
 
      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")
 
      (1 to actorCount).foreach { _ =>
        producer ! (testTopic, "192.168.86.10:9092")
      }
 
      producer ! testMessage
 
      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")
 
      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }
 
      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")
 
      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")
 
      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }
 
      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")
 
      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }
}
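
The KafkaProducer and KafkaConsumer classes used above are thin wrappers from the scala-kafka repository. Note that the producer connects directly to a broker (port 9092) while the high-level consumer connects to ZooKeeper (port 2181), which is how Kafka 0.8's consumer API tracks group membership and offsets. Here is a rough sketch of what such wrappers can look like on top of Kafka 0.8's Scala API; the real classes in the repository may differ in detail:

import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Sends strings to a single topic through one broker.
class KafkaProducer(topic: String, brokerList: String) {
  private val props = new Properties()
  props.put("metadata.broker.list", brokerList)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  private val producer = new Producer[String, String](new ProducerConfig(props))

  def sendString(message: String): Unit =
    producer.send(new KeyedMessage[String, String](topic, message))
}

// Reads messages from a topic as part of a consumer group, via ZooKeeper.
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {
  private val props = new Properties()
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("group.id", groupId)
  props.put("auto.offset.reset", "smallest") // start from the earliest message
  private val connector = Consumer.create(new ConsumerConfig(props))

  // Applies write to each message payload; blocks until the connector shuts down.
  def read(write: Array[Byte] => Unit): Unit = {
    val stream = connector.createMessageStreams(Map(topic -> 1))(topic).head
    for (messageAndMetadata <- stream) write(messageAndMetadata.message())
  }

  def close(): Unit = connector.shutdown()
}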

On the producer side I have started to look more into using Akka. The prototype for this implementation is the "Akka Producer and Consumer" test case above, broken out here.

  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString
 
      var testStatus1 = false
      var testStatus2 = false
 
      info("starting akka producertesting")
      val system = ActorSystem("testing")
 
      val actorCount = 1
 
      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")
 
      (1 to actorCount).foreach { _ =>
        producer ! (testTopic, "192.168.86.10:9092")
      }
 
      producer ! testMessage
 
      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")
 
      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }
 
      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")
 
      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")
 
      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }
 
      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")
 
      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }
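
The protocol of the KafkaAkkaProducer actor is visible in the test: the first message each routee receives is a (topic, brokerList) tuple that configures it, and every subsequent string is published to Kafka. Here is a minimal sketch of what such an actor might look like against the Kafka 0.8 Scala producer API; the class name and message protocol follow the test above, but the actual implementation in scala-kafka may differ:

import java.util.Properties

import akka.actor.Actor
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

class KafkaAkkaProducer extends Actor {
  private var topic: String = _
  private var producer: Option[Producer[String, String]] = None

  def receive = {
    // First message: a (topic, brokerList) tuple that configures this routee.
    case (t: String, brokerList: String) =>
      topic = t
      val props = new Properties()
      props.put("metadata.broker.list", brokerList)
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      producer = Some(new Producer[String, String](new ProducerConfig(props)))

    // Every subsequent string is published to the configured topic.
    case message: String =>
      producer.foreach(_.send(new KeyedMessage[String, String](topic, message)))
  }
}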

What is really nice is that you can increase actorCount and the router will spread the work across a pool of producer actors, which makes it easy to generate real test volume to analyze.
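
Scaling the pool up is just a matter of changing that one value. A sketch reusing the names from the test above (the message count here is arbitrary):

val actorCount = 10

val producer = system.actorOf(
  Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

// Configure every routee; round-robin routing delivers exactly one
// configuration message to each of the actorCount actors...
(1 to actorCount).foreach { _ =>
  producer ! (testTopic, "192.168.86.10:9092")
}

// ...then fan the load out across the pool.
(1 to 100000).foreach { i =>
  producer ! ("message-" + i)
}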

/*******************************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
********************************************/
