Apache NiFi And Kafka Docker Example

May 12, 2019

Big Data: Managing The Flow Of Data With Apache NiFi And Apache Kafka

In the Hadoop ecosystem, Apache NiFi is commonly used for the ingestion phase. Apache NiFi offers a scalable way of managing the flow of data between systems. When you’re trying to get information from point A to B, numerous issues can occur. For instance, networks can fail, software crashes, people make mistakes, the data can be too big, too fast, or in the wrong format. NiFi will handle these issues under the hood so developers can focus on the applications themselves.

Some of NiFi’s features include:

Guaranteed Delivery

NiFi guarantees the delivery of data. This is achieved through effective use of a purpose-built persistent write-ahead log and content repository.

Data Buffering / Pressure Release

Nifi can buffer data when a given data source outpaces some part of the processing or delivery chain. NiFi also supports the ability to delete queued data after a specified amount of time has elapsed.

Prioritized Queuing

NiFi allows the setting of one or more prioritization schemes for how data is retrieved from a queue. The default is oldest first, but there are times when data should be pulled newest first, largest first, or some other custom scheme.

Quality of Service

There are times when the data must be processed and delivered within seconds to be of any value. NiFi enables the administrator to prioritize latency over throughput or loss tolerance, etc.

Terminology

Apache NiFi revolves around the idea of processors. A processor is a node in the graph that does work. This typically consists of performing some kind of operation on the data, loading the data into NiFi or sending the data out to some external system. Some example of processors are:

  • GetFile: Loads the content of a file
  • UpdateAttribute: Updates FlowFile attributes (i.e. schema.name) which can then be accessed by other processors
  • PublishKafka: Sends the contents of a FlowFile as a message to Apache Kafka

On the other hand, a process group is a collection of processors and their connections. Controller services are available to all of the processors in the process group in which they were created.

Some examples of controller services include:

  • AvroSchemaRegistry: Stores Avro schemas in the registry which can then be retrieved by other controller services
  • AvroRecordSetWriter: Write and codes data in Avro format

Another key concept in NiFi is the FlowFile. A FlowFile is data at a given position in the graph and some additional metadata. We can view FlowFiles by clicking on the ‘List queue’ dropdown menu option of a connection.

For example, the following Flowfile has a unique id and file name.

By clicking on view, we can see the actual data moving from one processor to another.

Architecture

NiFi executes within a JVM on a host operating system.

Web Server

Unlike most software, the administration of Apache NiFi is done through a user interface.

Flow Controller

Handles all of the logic related to processors.

FlowFile Repository

The FlowFile Repository is where NiFi stores the metadata for a FlowFile that is presently active in the flow.

Content Repository

The Content Repository is where the actual content of a given FlowFile live. More than one file system storage location can be specified so as to reduce contention.

Provenance Repository

The Provenance Repository is where all provenance event data is stored. In essence, provenance event data tells you what occurred and when.

NiFi is also able to operate within a cluster enabling it to scale indefinitely.

Every node in the NiFi cluster performs the same tasks on the data, but each operates on a different set of data. Apache ZooKeeper is used to elect the Cluster Coordinator and handle failover automatically. The administrator can interact with the NiFi cluster through the user interface of any node and any change are replicated to all nodes in the cluster.

Code

In this example, we’ll take a CSV file and publish it to Kafka. We’ll be using docker to setup our environment. Copy the following content into docker-compose.yml and run docker-compose up -d.

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines: 
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup. 
    # If the latter is true, you will need to change the value 'localhost' in 
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those 
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  nifi:
    image: apache/nifi:latest
    ports:
      - 8080:8080

Next, open a browser on your host machine and go to localhost:8080/nifi.

In the top left configure, drag a processor onto the main panel. Select and configure the GenerateFlowFile processor to run every 60 seconds, so that we don’t constantly generate data.

We’ll be using a simple dataset that contains the years of experience and salary of 30 individuals.

YearsExperience,Salary  
1.1,39343.00  
1.3,46205.00  
1.5,37731.00  
2.0,43525.00  
2.2,39891.00  
2.9,56642.00  
3.0,60150.00  
3.2,54445.00  
3.2,64445.00  
3.7,57189.00  
3.9,63218.00  
4.0,55794.00  
4.0,56957.00  
4.1,57081.00  
4.5,61111.00  
4.9,67938.00  
5.1,66029.00  
5.3,83088.00  
5.9,81363.00  
6.0,93940.00  
6.8,91738.00  
7.1,98273.00  
7.9,101302.00  
8.2,113812.00  
8.7,109431.00  
9.0,105582.00  
9.5,116969.00  
9.6,112635.00  
10.3,122391.00  
10.5,121872.00

Again, configure the GenerateFlowFile by pasting the data into the custom text property.

Note: any property in bold is mandatory, the rest are optional.

Drag a processor onto the main panel and select UpdateAttribute. For the UpdateAttribute processor, under properties, click on the plus sign in the top right corner. Then, create a schema.name property with a value of test.schema.

Now, we’ll create our controller services. Right click on the main panel and select Configure.

From here we can create all the controller services for our processor group.

Click the plus sign in the top right corner and select AvroSchemaRegistry. Under the properties tab, create a new property called test-schema and paste the following schema into the value field.

{  
   "type" : "record",  
   "namespace" : "Test",  
   "name" : "Employee",  
   "fields" : [  
      { "name" : "YearsExperience" , "type" : "float" },  
      { "name" : "Salary" , "type" : "float" }  
   ]  
}

We’re going to need CSVReader as well. Under the properties tab, configure it to use the schema name property, the AvroSchemaRegistry and treat the first line as the column headers.

Finally, create a AvroRecordSetWriter and configure it to use the AvroSchemaRegistry.

Before continuing, make sure that you enable all the controller services by clicking on the lightning bolt on the far right.

Now that we finished setting up the controller services, we’ll create a Kafka topic by running the following command.

docker-compose exec kafka  \  
kafka-topics --create --topic test --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181

Verify it worked correctly.

docker-compose exec kafka  \  
kafka-topics --describe --topic test --zookeeper zookeeper:2181

Create PublishKafkaRecord and configure it as follows.

Be sure to check the Automatically Terminate Relationships since this is the last processor in our graph.

To create connections processors, drag arrows between them.

Finally, to start the flow, right click on each processor and select start. If everything is working as expected, each of the original rows should be written back out.

docker-compose exec kafka  \  
  kafka-console-consumer --bootstrap-server localhost:29092 --topic test --from-beginning --max-messages 30

Profile picture

Written by Cory Maklin Genius is making complex ideas simple, not making simple ideas complex - Albert Einstein You should follow them on Twitter