As I was looking into a few open, unassigned Jira issues in the Apache Ignite project, I came across an issue about an Apache Flink integration: a data streaming connector. It was a new feature request from the community for an Ignite Flink Streamer that consumes messages from Apache Flink and streams them into the Ignite cache. I started contributing to the Apache Ignite project in early 2016. I initially started with a few small issues and then wanted to contribute new features. I went through the coding guidelines, made my changes accordingly, and shared my pull request. I was surprised to get initial code review feedback soon after on a few changes, and once those were addressed, the pull request was merged to master.

Stream Processing Topology

Apache IgniteSink offers a streaming connector that injects Flink data into the Ignite cache: the sink emits its input data to a named Ignite cache. The key feature to note is the performance and scale that both Apache Flink and Apache Ignite offer. Apache Flink can process unbounded and bounded data sets and has been designed to run stateful streaming applications at scale: application computation is distributed and concurrently executed across the cluster, and Flink is optimized for local state access within tasks and checkpoints that local state for durability. Apache Ignite, in turn, provides streaming capabilities that allow data ingestion at high scale into its in-memory data grid.
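
For reference, here is a minimal sketch of configuring the connector in Kotlin; the cache name ("testCache", used throughout this article) and the Ignite configuration file path ("example-ignite.xml") are placeholders for your own setup:

import org.apache.ignite.sink.flink.IgniteSink

// a minimal IgniteSink configuration sketch; cache name and config path are placeholders
val igniteSink = IgniteSink<Map<String, Int>>("testCache", "example-ignite.xml")
igniteSink.setAllowOverwrite(true)   // let the underlying data streamer overwrite existing cache keys
igniteSink.setAutoFlushFrequency(10) // flush buffered entries every 10 ms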

In this article, we will discuss how to build a data streaming application using Apache Flink and Apache Ignite. Building a data streaming application offers the benefit of ingesting large bounded and unbounded volumes of data into the Ignite cluster in an optimized and fault-tolerant way, at ingestion rates that can scale up to millions of events per second.

Download Flink

Download a binary from the downloads page (https://flink.apache.org/downloads.html). You can pick the "Apache Flink only" binary built for Scala 2.11.

Unpack the Downloaded Archive and Start a Local Cluster

$ cd ~/Downloads        # Go to download directory
$ tar xzf flink-*.tgz   # Unpack the downloaded archive
$ cd flink-1.6.0
$ ./bin/start-cluster.sh  # Start Flink

Check the Dispatcher’s web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

You can also verify that the system is running by checking the log files in the logs directory:

$ tail log/flink-*-standalonesession-*.log

Download Kafka

Download a binary from the downloads page (https://kafka.apache.org/downloads). You can pick the Apache Kafka 0.10.2.2 release built for Scala 2.11.

Unpack the Downloaded Archive

$ cd ~/Downloads        # Go to download directory
$ tar xzf kafka_2.11-0.10.2.2.tgz   # Unpack the downloaded archive
$ cd kafka_2.11-0.10.2.2

Start the ZooKeeper Server

$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties

Start Broker

$ ./bin/kafka-server-start.sh ./config/server.properties 

Create Topic “mytopic”

$ ./bin/kafka-topics.sh --create --topic mytopic --zookeeper localhost:2181 --partitions 1 --replication-factor 1

Describe Topic “mytopic”

$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic

Produce Something Into the Topic

$ ./bin/kafka-console-producer.sh --topic mytopic --broker-list localhost:9092

Consume From the Topic Using the Console Consumer

$ ./bin/kafka-console-consumer.sh --topic mytopic --zookeeper localhost:2181

Clone Apache Ignite

As of this writing, the IgniteSink support for data streaming applications in a Flink cluster is available in the master branch.

$ git clone https://github.com/apache/ignite

Build Apache Ignite

$ mvn clean install -DskipTests
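
To use the connector from your own application, add the locally installed ignite-flink module as a dependency. The version below is a placeholder: use whatever version the master build installed into your local Maven repository:

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-flink</artifactId>
    <version>${ignite.version}</version> <!-- placeholder for the locally built version -->
</dependency>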

Here we use a word count example, a simple data streaming application. The application splits lines arriving on a Kafka topic into words, groups the words in time windows of 10 seconds, and counts the frequency of each word per window. Finally, the words and their frequency counts are published to Ignite using IgniteSink. Splitter and Formatter are small helper classes sketched after the listing, and the Kafka properties and Ignite configuration path shown are placeholder values for this walkthrough.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.ignite.sink.flink.IgniteSink

fun main() {
    // get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    // Kafka consumer configuration (assumed values matching the local setup above)
    val appProperties = Properties().apply {
        setProperty("bootstrap.servers", "localhost:9092")
        setProperty("group.id", "flink-ignite-demo")
        setProperty("topic", "mytopic")
    }

    // get input data by connecting to Kafka
    val text = env.addSource(FlinkKafkaConsumer010<String>(
            appProperties.getProperty("topic"), SimpleStringSchema(), appProperties))

    // parse the data, group it by word, window it, and aggregate the counts
    val windowCounts = text
            .flatMap(Splitter())            // split each line into (word, 1) pairs
            .keyBy(0)                       // group by the word
            .timeWindow(Time.seconds(10))   // 10-second tumbling windows
            .sum(1)                         // sum the per-word counts
            .map(Formatter())               // convert to the Map form IgniteSink expects

    windowCounts.print()                    // counts also land in the TaskManager .out file
    // "testCache" matches the cache queried later; the config path is a placeholder
    val igniteSink = IgniteSink<Map<String, Int>>("testCache", "example-ignite.xml")
    windowCounts.addSink(igniteSink)
    env.execute("Stream Window Count")
}
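
Splitter and Formatter are not part of the connector; here is a minimal sketch of both, assuming Splitter emits (word, 1) pairs and Formatter wraps each aggregated pair in the single-entry Map that IgniteSink expects as its input:

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.util.Collector

// splits each incoming line into (word, 1) pairs
class Splitter : FlatMapFunction<String, Tuple2<String, Int>> {
    override fun flatMap(line: String, out: Collector<Tuple2<String, Int>>) {
        line.split(" ").filter { it.isNotBlank() }.forEach { out.collect(Tuple2(it, 1)) }
    }
}

// IgniteSink streams Map entries, so wrap each (word, count) pair in a single-entry map
class Formatter : MapFunction<Tuple2<String, Int>, Map<String, Int>> {
    override fun map(pair: Tuple2<String, Int>): Map<String, Int> = mapOf(pair.f0 to pair.f1)
}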

Package the application and submit the jar to the running Flink cluster:

$ ./bin/flink run myapp-1.0-SNAPSHOT.jar

Produce Something Into the Topic

$ ./bin/kafka-console-producer.sh --topic mytopic --broker-list localhost:9092

The .out file will print the counts at the end of each time window as long as words are being entered, e.g.:

$ tail -f log/flink-*-taskexecutor-*.out
lorem : 1
bye : 1
ipsum : 4

Ignite REST Service

As the above application continues to stream words and their frequency counts, we can check the cache key-value pairs to monitor the published data. We can use the Ignite REST service to validate the data stream that was published.

$ curl -X GET "http://localhost:8080/ignite?cmd=getall&k1=key1&cacheName=testCache"

Scan Cache

To check all the keys in an Ignite cache, the following REST call can be used:

$ curl -X GET "http://localhost:8080/ignite?cmd=qryscanexe&pageSize=10&cacheName=testCache"
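
As an alternative to the REST calls, the cache can also be read directly from a small Kotlin client node; this is just a sketch, assuming the same placeholder configuration file used by the sink:

import org.apache.ignite.Ignition

fun main() {
    Ignition.setClientMode(true)                       // join the cluster as a client node
    val ignite = Ignition.start("example-ignite.xml")  // placeholder config path
    val counts = ignite.getOrCreateCache<String, Int>("testCache")
    println("count for 'lorem' = " + counts.get("lorem"))
    ignite.close()
}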

Stop the Flink Cluster

$ ./bin/stop-cluster.sh

Summary

We covered how to build a simple data streaming application using Apache Flink and Apache Ignite, and how to create a stream processing topology that allows data streaming in a distributed, scalable, and fault-tolerant way and can process unbounded data sets consisting of millions of events.


About the Author

Saikat Maitra is a lead engineer at Target and a committer on Apache Ignite.