Apache NiFi automates dataflows by receiving data from any source (such as Twitter, Kafka, or databases), routing it to data processing systems such as Hadoop or Spark, and finally delivering it to data storage systems such as HBase, Cassandra, and other databases. Problems can arise at any of these three layers: systems can go down, or data production and consumption rates can fall out of sync. Apache NiFi addresses these dataflow challenges with key features such as guaranteed delivery, data buffering with backpressure, prioritized queuing, and data provenance.
Hortonworks supports Apache NiFi in their product Hortonworks DataFlow (HDF).
Hortonworks' latest sandbox VM ships with a pre-configured NiFi setup; the NiFi service only needs to be added from Ambari. Go to ipaddressofsandbox:8080 and log in with admin/admin credentials. On the bottom left of the Ambari page, navigate to Actions | Add Service, check the NiFi service, then configure and deploy it. During the configuration stage, set the port number and other properties in nifi.properties as needed. Once the service is running, log in to the NiFi UI at ipaddressofsandbox:9090/nifi.
If you want to download HDF, instructions for downloading, installing, and running the service can be found at http://docs.hortonworks.com/HDPDocuments/HDF1/HDF-1.2.0.1/bk_HDF_GettingStarted/content/ch_HDF_GettingStarted.html.
There are prebuilt NiFi templates that can be downloaded and tested in your NiFi environment. Templates are available at https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates. Download the necessary templates to your machine.
In the NiFi window, click on the Templates link in the upper right corner. Click on Browse, select the template file, click on Open, and import it. Drag the template icon from the upper left-hand menu onto the main canvas and select the imported template to create the workflow from the XML document. Follow the instructions given for each template at the preceding link. Right-click on each processor and verify its configuration. Click on the play button at the top to start the workflow.
More information on how to create and manage dataflows is available at https://nifi.apache.org/docs.html.
Let's create a simple dataflow in NiFi that gets data from Kafka and feeds it to a Spark Streaming application and HDFS. The goal of this application is to analyze Ambari logs and count the number of INFO, WARN, and ERROR messages in a given timeframe. Run the following commands to create a Kafka topic and start sending Ambari agent log data to the topic for analysis in the Spark Streaming application:
cd /usr/hdp/current/kafka-broker/bin/
./kafka-topics.sh --zookeeper localhost:2181 --topic ambarilogs --create --partitions 2 --replication-factor 1
tail -f /var/log/ambari-agent/ambari-agent.log | ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic ambarilogs
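Before building the Spark job, the counting logic itself can be sanity-checked offline. The following shell sketch (hypothetical, not part of the book's flow, and using a small inline sample instead of the real agent log so it runs anywhere) tallies the first token of each log line, which is the log level in Ambari agent logs:

```shell
# Hypothetical offline sketch: count log levels the same way the Spark job will,
# by tallying the first token of each line.
printf '%s\n' \
  'INFO 2016-08-13 12:00:01 Controller.py:110 - Heartbeat response received' \
  'WARN 2016-08-13 12:00:05 NetUtil.py:62 - Server connection timed out' \
  'INFO 2016-08-13 12:00:09 Controller.py:110 - Heartbeat response received' |
awk '{count[$1]++} END {for (level in count) print "(" level "," count[level] ")"}' |
sort
# Prints:
# (INFO,2)
# (WARN,1)
```

The same pairs-then-reduce shape reappears in the Spark Streaming code later in this section.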
Download the spark-receiver and site-to-site client JARs. Check the NiFi version and download compatible versions; this example downloads the 0.5.1 JARs:
mkdir /opt/spark-receiver
cd /opt/spark-receiver
wget http://central.maven.org/maven2/org/apache/nifi/nifi-site-to-site-client/0.5.1/nifi-site-to-site-client-0.5.1.jar
wget http://central.maven.org/maven2/org/apache/nifi/nifi-spark-receiver/0.5.1/nifi-spark-receiver-0.5.1.jar
In Ambari, go to the Spark service configurations, add the following two properties under custom spark-defaults, and then restart the Spark service:
spark.driver.allowMultipleContexts true
spark.driver.extraClassPath /opt/spark-receiver/nifi-spark-receiver-0.5.1.jar:/opt/spark-receiver/nifi-site-to-site-client-0.5.1.jar:/opt/nifi-0.5.1.1.1.2.0-32/lib/nifi-api-0.5.1.1.1.2.0-32.jar:/opt/nifi-0.5.1.1.1.2.0-32/lib/bootstrap/nifi-utils-0.5.1.1.1.2.0-32.jar:/opt/nifi-0.5.1.1.1.2.0-32/work/nar/framework/nifi-framework-nar-0.5.1.1.1.2.0-32.nar-unpacked/META-INF/bundled-dependencies/nifi-client-dto-0.5.1.1.1.2.0-32.jar
In Ambari NiFi configurations, change the following site-to-site configurations and restart the service:
nifi.remote.input.socket.host=
nifi.remote.input.socket.port=8055
nifi.remote.input.secure=false
On the NiFi UI (ipaddressofsandbox:9090/nifi), drag a processor onto the canvas and add a GetKafka processor. Right-click on the processor and click on Configure. Enter the ZooKeeper connection string as ipaddressofsandbox:2181 (for example, 192.168.139.167:2181) and the topic as ambarilogs. Add a PutHDFS processor, configure it to write to the /tmp/kafka directory, and specify the configuration resources as /etc/hadoop/conf/core-site.xml and /etc/hadoop/conf/hdfs-site.xml. Set auto-terminate relationships on the PutHDFS processor. Also, drag an output port onto the canvas and name it Data for Spark. Connect the processors and start the dataflow. Make sure to resolve all warnings shown at the top of each processor.
Now, let's create a Scala Spark Streaming script, ambari-logs.sh, with the following code to pull data from the NiFi workflow:
cd /opt/spark-receiver
vi ambari-logs.sh
The code is as follows:
import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.spark.{NiFiDataPacket, NiFiReceiver}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkNiFiAmbari {
  def main(args: Array[String]) {
    // Site-to-site configuration pointing at the "Data for Spark" output port
    val conf = new SiteToSiteClient.Builder()
      .url("http://localhost:9090/nifi")
      .portName("Data for Spark")
      .buildConfig()
    val config = new SparkConf().setAppName("Ambari Log Analyzer")
    val ssc = new StreamingContext(config, Seconds(30))
    // Receive flow files from NiFi as a stream of data packets
    val packetStream: ReceiverInputDStream[NiFiDataPacket] =
      ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))
    // Split each packet's content into individual log lines
    val lines: DStream[String] =
      packetStream.flatMap(packet => new String(packet.getContent).split("\n"))
    // The first token of each log line is its level (INFO, WARN, ERROR)
    val pairs: DStream[(String, Int)] = lines.map(line => (line.split(" ")(0), 1))
    val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
  }
}
SparkNiFiAmbari.main(Array())
Start the Spark shell by passing the script to it:
spark-shell -i ambari-logs.sh
The output will appear on the screen as follows:
-------------------------------------------
Time: 1471091460000 ms
-------------------------------------------
(INFO,46)
(WARN,4)
(ERROR,1)
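One caveat with counting first tokens: the first word of a continuation line (for example, a stack trace in the agent log) is not a log level, so the raw counts can include noise tokens. A hypothetical shell refinement of the same tally (not from the book) keeps only the three levels of interest:

```shell
# Hypothetical refinement: keep only real log levels, discarding tokens
# produced by stack traces and other continuation lines.
printf '%s\n' \
  'ERROR 2016-08-13 12:01:00 Failure talking to server' \
  'Traceback (most recent call last):' \
  '  File "NetUtil.py", line 62, in checkURL' \
  'INFO 2016-08-13 12:01:30 Reconnected' |
awk '$1 ~ /^(INFO|WARN|ERROR)$/ {count[$1]++}
     END {for (level in count) print "(" level "," count[level] ")"}' |
sort
# Prints:
# (ERROR,1)
# (INFO,1)
```

The equivalent guard in the Spark job would be a filter on the extracted first token before counting.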
Check the HDFS directory (/tmp/kafka) to see whether the data is being written from Kafka. The NiFi dataflow looks similar to Figure 6.9. Note that if you stop the Spark Streaming application, NiFi buffers the incoming data, so no data is lost:
We are now ready to play with the various notebooks and dataflow tools. Happy coding!