Hi! My name is Malcolm McFarland. I’m a programmer living in San Francisco and specializing in Python systems programming (using tools like Flask, ZeroMQ, SQLAlchemy, Django, CherryPy, etc), JavaScript-intensive frontend programming (using jQuery, Bootstrap, BackboneJS, etc), and general systems management (with Linux, AWS, Docker, Fabric, etc). You can view my full resume here.


Dec 06, 2016 06:18

Samza and Jruby, or Streaming Dynamic Typing to the Masses (Pt 1)

PREFACE (AND A SHORTCUT)

If you’re impatient, Elias Levy was (to my knowledge) the first to do this and make it publicly available. His port used version 0.9.1 of Samza and JRuby 1.7.23 (equivalent to Ruby 1.9.3). His work provided an invaluable starting point. However, if you’d like to work with a different version of Samza or JRuby, or just generally want to have a stronger understanding of how JRuby (and other JVM-based languages) can integrate into Samza’s build process, read on!

(I’m only going to go as back-to-basics as the official hello-samza repository from the Samza project. The actual core of the project is elsewhere, but we’re going to stick with adapting the officially-sanctioned project baseline.)

INTRO

Samza

Samza is a general-purpose distributed stream processing framework that uses queue-based message passing for communication (by default via Kafka) and guarantees at-least-once delivery of messages. It stubs out pluggable functionality for message serialization/deserialization, metrics aggregation, node-local key-value storage (by default, RocksDB), and more. It’s pretty open-ended in how it can be used – basically, so long as your message can be serialized, you can do whatever you want with it. The only limits on the content or structure of the message are those imposed by the JVM or the subsystems.

On the flip side, this freedom can make implementation an arduous process; since Samza is general by design, it doesn’t prescribe how a system should be designed, and thus there are no real tools for topology declaration. That onus lies with the developer; I’ve personally found it helpful to settle on a basic message-passing structure up front (ie action/payload pairs).
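As a rough illustration of the action/payload idea, here is a minimal plain-Ruby sketch. The field names ("action", "payload"), the JSON encoding, and the handler names are all my own assumptions for the example; Samza doesn't prescribe any of them.

```ruby
require 'json'

# Hypothetical message convention: every message is a JSON object with an
# "action" naming what to do and a "payload" carrying the data.
def build_message(action, payload)
  JSON.generate({ 'action' => action, 'payload' => payload })
end

# Illustrative handlers keyed by action name (the names are made up).
HANDLERS = {
  'upcase'  => ->(payload) { payload['text'].upcase },
  'reverse' => ->(payload) { payload['text'].reverse }
}.freeze

# Decode a message and route its payload to the matching handler.
def dispatch(raw_message)
  message = JSON.parse(raw_message)
  handler = HANDLERS.fetch(message['action']) do
    raise ArgumentError, "unknown action: #{message['action']}"
  end
  handler.call(message['payload'])
end

puts dispatch(build_message('upcase', { 'text' => 'hello samza' }))
```

Settling on one envelope shape like this means every task can share the same decode-and-route step, whatever the payload happens to contain.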

JRuby

JRuby is a project to port Ruby to the JVM. Version 0.9.0 was released in 2006, and it has continued since then to be an actively maintained project with a host of contributors. There are 2 currently maintained release branches: the 1.7.x releases track Ruby 1.9.x, and the 9.x.x.x releases track Ruby 2.2.x. We’re going with the latter (9.1.6.0) in this project.

FIRST STEPS

To start, let’s pull down the latest version of hello-samza from GitHub into the local directory jruby-hello-samza:

$ git clone https://github.com/apache/samza-hello-samza jruby-hello-samza

Open pom.xml, the file used to coordinate Maven builds. Let’s add the JRuby runtime (jruby-complete) as a dependency in the <dependencies></dependencies> section:

<dependency>
  <groupId>org.jruby</groupId>
  <artifactId>jruby-complete</artifactId>
  <version>9.1.6.0</version>
</dependency>

Let’s also add some plugins for JRuby-to-Java source transpilation and for downloading JRuby gems; add this within the <plugins></plugins> section:

<plugin>
  <groupId>de.saumya.mojo</groupId>
  <artifactId>jruby-maven-plugin</artifactId>
  <version>1.1.5</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>compile</goal>
      </goals>
      <configuration>
        <generateJava>true</generateJava>
        <generatedJavaDirectory>${jruby.generated.sources}</generatedJavaDirectory>
        <verbose>true</verbose>
      </configuration>
    </execution>
  </executions>
</plugin>
<plugin>
  <groupId>de.saumya.mojo</groupId>
  <artifactId>gem-maven-plugin</artifactId>
  <version>1.1.5</version>
  <configuration>
    <includeRubygemsInResources>true</includeRubygemsInResources>
  </configuration>
  <executions>
    <execution>
      <goals>
        <goal>initialize</goal>
      </goals>
    </execution>
  </executions>
</plugin>

While you’re there, remove the org.apache.rat configuration from the <plugins> section; it’s there to ensure that all source code files have a license attached, which is just going to be a bloody nuisance for our current project.

Let’s also make a directory to store the Ruby source code:

$ mkdir -p src/main/ruby

and remove the Java source code and configuration files:

$ rm -r src/main/java
$ rm -r src/main/config/*

At this point we can follow the instructions from the Samza project’s Hello Samza documentation:

$ ./bin/grid bootstrap
$ mvn clean package
$ mkdir -p deploy/samza
$ tar -zxf ./target/hello-samza-0.11.0-dist.tar.gz -C deploy/samza

You now have a Samza build system running the latest version of Samza with a recent version of JRuby. Those last three lines build your Samza source code and “deploy” it. You’ll need to run them every time you make changes to your source or configuration files.

THERE’S NO RUBY LIKE J-RUBY

Now that we have Samza running with JRuby in tow, let’s write some JRuby. We’re going to start by creating a very simple task, one which will mirror the very basic elements of a Samza stream task. The purpose of this task is ludicrously simple(-minded): get a message from the Kafka input stream and write it to a file. While not in the slightest useful, it will demonstrate the minimum we need to get the two systems cooperating.

Add the following source code to a new file src/main/ruby/SourceStreamTask.rb:

require 'java'

java_package 'hello.jruby.test'

java_import 'org.apache.samza.system.IncomingMessageEnvelope'
java_import 'org.apache.samza.task.MessageCollector'
java_import 'org.apache.samza.task.TaskCoordinator'
java_import 'org.apache.samza.task.StreamTask'

class SourceStreamTask
  java_implements StreamTask

  java_signature 'void process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)'
  def process(envelope, collector, coordinator)
    msg = envelope.getMessage
    File.open("/tmp/message-stream-output.txt", "a") {|f| f.write("#{msg}\n")}
  end
end

The important elements here are:

java_package 'hello.jruby.test'

This is the full package path (important for the properties file).

java_implements StreamTask

java_signature 'void process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)'

The basic Java interface stream tasks must implement to process data. There are others; we’ll get to those later.
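Because process only calls getMessage on the envelope, its body can be tried out in plain Ruby before involving Samza at all. The sketch below is a stand-in built on a few assumptions: FakeEnvelope and LocalSourceTask are hypothetical names, the Java annotations are dropped, and the output path is passed in rather than hardcoded so the example doesn’t touch /tmp/message-stream-output.txt.

```ruby
require 'tempfile'

# Hypothetical stand-in for IncomingMessageEnvelope: it only needs to answer
# getMessage, which is the one method the task calls.
FakeEnvelope = Struct.new(:getMessage)

# Plain-Ruby copy of the task body, minus the JRuby/Java annotations.
class LocalSourceTask
  def initialize(output_path)
    @output_path = output_path
  end

  # Same argument shape as StreamTask#process; the collector and
  # coordinator are unused by this task, so nil is fine here.
  def process(envelope, _collector, _coordinator)
    msg = envelope.getMessage
    File.open(@output_path, 'a') { |f| f.write("#{msg}\n") }
  end
end

out = Tempfile.new('message-stream-output')
task = LocalSourceTask.new(out.path)
task.process(FakeEnvelope.new('first message'), nil, nil)
task.process(FakeEnvelope.new('second message'), nil, nil)
puts File.read(out.path)
```

This only checks the Ruby logic; it says nothing about whether the generated Java class loads correctly inside a Samza container.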

CONFIGURATION

Each stream task needs a *.properties config file (in Java properties file format) that tells Samza where to find the class, what systems it works with, etc. We see from the docs that the only truly required attributes are job.factory.class, job.name, task.class, and task.inputs, but let’s fill out a few more items to demonstrate some of the basic configurability. Save the following to the file src/main/config/source-task.properties:

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=source-task

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=hello.jruby.test.SourceStreamTask
task.inputs=kafka.source-input

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092

# Job Coordinator
job.coordinator.system=kafka
job.coordinator.replication.factor=1

A quick overview:

  • job.factory.class – almost always going to be YarnJobFactory
  • job.name – a unique identifier for this task
  • task.class – this is the fully-qualified name of our class; for JRuby, it’s the contents of the java_package statement followed by the Ruby class name
  • task.inputs – the system (kafka) and queue name (source-input) this task will read messages from
  • serializers.registry.string.class – class used to serialize/deserialize data; serdes (serializer/deserializers) are instantiated via serde factories, and basically are the translator between the local environment and the queueing system
  • systems.kafka.samza.msg.serde – declares that the above defined serde (“string”) will be used to translate messages in and out of kafka; we can also describe a separate serde for translating the key data, and serdes can be described on a per-queue basis (more on this later)
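To make the serde idea a bit more concrete, here is a minimal plain-Ruby sketch of what a string serde does conceptually: turn a value into bytes on the way out, and bytes back into a value on the way in. The class name and the use of Ruby byte arrays are assumptions for illustration; the real StringSerdeFactory is a Java class inside Samza, and this is not its implementation.

```ruby
# Hypothetical, simplified model of a string serde.
class ToyStringSerde
  def initialize(encoding = 'UTF-8')
    @encoding = encoding
  end

  # Object -> bytes, as done before a message is written to the queue.
  def to_bytes(string)
    string.encode(@encoding).bytes
  end

  # Bytes -> object, as done after a message is read from the queue.
  def from_bytes(bytes)
    bytes.pack('C*').force_encoding(@encoding)
  end
end

serde = ToyStringSerde.new
bytes = serde.to_bytes('This is a great line')
puts bytes.length
puts serde.from_bytes(bytes)
```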

The rest of the options are system-level configuration options and can be left as-is. As you can probably see, cranking out more than a few of these properties files can be somewhat tiring; even if you reuse many of the options, this is still a lot of redundancy. Samza currently lacks a standard topology definition mechanism (a la Storm); this is by intent, as Samza aims to be a general stream processing framework (pass in anything, do anything, I don’t care).
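One way to cut down on that redundancy is to generate the properties files from a shared set of defaults. The following is only a sketch in plain Ruby: the helper name and the idea of generating the files at all are my own assumptions, not a Samza feature. The keys and values are the ones from source-task.properties.

```ruby
# Settings shared by every task (copied from source-task.properties).
SHARED = {
  'job.factory.class' => 'org.apache.samza.job.yarn.YarnJobFactory',
  'yarn.package.path' => 'file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz',
  'serializers.registry.string.class' => 'org.apache.samza.serializers.StringSerdeFactory',
  'systems.kafka.samza.factory' => 'org.apache.samza.system.kafka.KafkaSystemFactory',
  'systems.kafka.samza.msg.serde' => 'string',
  'systems.kafka.consumer.zookeeper.connect' => 'localhost:2181/',
  'systems.kafka.producer.bootstrap.servers' => 'localhost:9092',
  'job.coordinator.system' => 'kafka',
  'job.coordinator.replication.factor' => '1'
}.freeze

# The four attributes the docs list as truly required.
REQUIRED = %w[job.factory.class job.name task.class task.inputs].freeze

# Hypothetical helper: merge per-task values over the shared defaults and
# render the result in Java properties format (key=value, one per line).
def task_properties(name:, task_class:, inputs:)
  props = SHARED.merge({
    'job.name' => name,
    'task.class' => task_class,
    'task.inputs' => inputs
  })
  missing = REQUIRED.select { |key| props[key].to_s.empty? }
  raise ArgumentError, "missing: #{missing.join(', ')}" unless missing.empty?
  props.map { |key, value| "#{key}=#{value}" }.join("\n") + "\n"
end

puts task_properties(
  name: 'source-task',
  task_class: 'hello.jruby.test.SourceStreamTask',
  inputs: 'kafka.source-input'
)
```

Writing the returned string to src/main/config/<name>.properties before running Maven would keep the ${...} placeholders intact for the assembly step to fill in.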

We now need to update our assembly instructions to include this properties file in the build. Open up the file at src/main/assembly/src.xml, and find <files> within the <assembly> section. You’ll see several entries for the deleted Wikipedia files; remove all of these <file> entries. Add the following to the <fileSets> option group:

<fileSet>
  <directory>${basedir}/src/main/config</directory>
  <includes>
    <include>**/*.properties</include>
  </includes>
  <outputDirectory>config</outputDirectory>
  <filtered>true</filtered>
</fileSet>

This tells the Maven assembly step to read and interpolate every *.properties file in that directory. The corollary here is that every properties file in that directory will need to be valid – ie, all of the Wikipedia *.properties files (if you haven’t removed them) will fail, since we’ve removed all of their corresponding Java classes.

Quick overview of the other properties for this option:

  • <outputDirectory>config</outputDirectory> means this file will be copied to the config/ directory in the Samza deployment package
  • <filtered>true</filtered> means that the variable placeholders will be substituted with real values (ie ${basedir} -> /complete/path/to/basedir); a complete list of interpolatable variables can be found here
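As a rough mental model of what filtering does, the plain-Ruby sketch below replaces ${name} placeholders with values from a hash. The function name is made up, the sample values are inferred from the tarball name used in this post, and Maven’s real filtering handles more cases than this; it’s only meant to show the substitution idea.

```ruby
# Replace each ${key} with its value; unknown placeholders are left as-is.
def filter_placeholders(text, values)
  text.gsub(/\$\{([^}]+)\}/) do
    key = Regexp.last_match(1)
    values.fetch(key) { "${#{key}}" }
  end
end

line = 'yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz'

# Sample values only; Maven supplies the real ones at build time.
values = {
  'basedir' => '/complete/path/to/basedir',
  'project.artifactId' => 'hello-samza',
  'pom.version' => '0.11.0'
}

puts filter_placeholders(line, values)
```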

COMPILING/RUNNING

We now have a valid, albeit silly, stream task that simply waits for something to come in on its input and writes that message to a file. Let’s go ahead and compile it:

$ ./bin/grid stop all  ## just in case
$ mvn clean package
$ tar -zxf ./target/hello-samza-0.11.0-dist.tar.gz -C deploy/samza
$ ./bin/grid start all

Now we’ll run it using Samza’s run-job.sh script:

$ ./deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file:///Users/user-account/hello-samza/deploy/samza/config/source-task.properties

Notice how we’re using a complete path to the assembled version of the properties file, not the one we’re editing (ie not in src/main/config); the variables in this one have been interpolated by Maven.
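If you’d rather not hardcode the absolute path, something along these lines can build the command from the project directory. This is a plain-Ruby sketch; the helper name is hypothetical, and it uses only the two flags from the run-job.sh command in this post.

```ruby
# Build the run-job.sh invocation for a properties file in the deployed
# (Maven-filtered) config directory. Returns an argument array suitable
# for Kernel#system.
def run_job_command(project_dir, properties_file)
  deploy_dir = File.join(File.expand_path(project_dir), 'deploy', 'samza')
  config_path = File.join(deploy_dir, 'config', properties_file)
  [
    File.join(deploy_dir, 'bin', 'run-job.sh'),
    '--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory',
    "--config-path=file://#{config_path}"
  ]
end

command = run_job_command('.', 'source-task.properties')
puts command.join(' ')
# To actually launch the job, run: system(*command)
```

The sketch only prints the command, so it’s safe to run before the grid is up.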

Give the task a few (maybe 10) seconds to get running. You can see the input queue for this task listed as one of the available Kafka queues:

$ ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list

Let’s go ahead and throw some data at it (using one of the scripts available for interaction with Kafka):

$ echo "This is a great line" | ./deploy/kafka/bin/kafka-console-producer.sh --topic source-input --broker-list localhost:9092

We can see that this was written to our output file:

$ cat /tmp/message-stream-output.txt

Hooray! A totally pointless, bare-basics demonstration of writing a Samza task using JRuby. Next, we’ll actually do something useful.
