
A basic example of how to read and write data to Azure Event Hubs from an Apache Storm topology running on HDInsight.

services: hdinsight
platforms: java
author: blackmist

Process events from Azure Event Hubs with Storm on HDInsight (Java)

Learn how to use Azure Event Hubs with Storm on HDInsight. This example uses Java-based components to read and write data in Azure Event Hubs. It also demonstrates how to write data to the default storage for your cluster, and how to send data to Power BI using the Power BI real-time streaming API.

Note: This example was created and tested on HDInsight. It may work on other Hadoop distributions, but you will need to change things like the file system scheme used to store data to HDFS.

Prerequisites

How it works

The resources/writer.yaml topology writes random data to an event hub. The data is generated by the DeviceSpout component and is a random device ID and device value, so it's simulating some hardware that emits a string ID and a numeric value.
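
For illustration, here is a minimal sketch of what such a spout could look like. The class and field names are hypothetical, and the sketch assumes Storm's Java API (org.apache.storm); see the DeviceSpout source in this repository for the component writer.yaml actually uses.

    import java.util.Map;
    import java.util.Random;
    import java.util.UUID;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    // Hypothetical spout that emits a random device ID and a random device value.
    public class RandomDeviceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private Random random;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.random = new Random();
        }

        @Override
        public void nextTuple() {
            // Simulate hardware that emits a string ID plus a numeric reading.
            String deviceId = UUID.randomUUID().toString();
            int deviceValue = random.nextInt(100);
            collector.emit(new Values(deviceId, deviceValue));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("deviceId", "deviceValue"));
        }
    }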

The resources/reader.yaml topology reads data from Event Hub (the data written by EventHubWriter), parses the JSON data, and emits the values read from Event Hub to Storm logs.

The resources/readtofile.yaml topology is the same as the reader.yaml topology, but it uses the HDFS-bolt component to write data to the HDFS-compatible file system used by HDInsight.

The resources/readtopowerbi.yaml topology is the same as the reader.yaml topology, but it uses a custom bolt component to write data to Microsoft Power BI using the Power BI real-time streaming API.

The data format in Event Hub is a JSON document with the following format:

{ "temperature": integer, "humidity": integer, "co2Level": integer }

The reason it's stored as JSON is compatibility. I ran into someone who wasn't formatting data sent to Event Hub as JSON (from a Java application) and was reading it into another Java app. That worked fine, but when they wanted to replace the reading component with a C# application that expected JSON, it became a problem. Always store data in a well-defined format that is future-proofed in case your components change.

The parser component adds a timestamp value when it processes the JSON data read from Event Hub. This demonstrates that you can add new values to the stream; you aren't limited to just what you read in.
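
As a rough sketch of that idea, the hypothetical bolt below parses the JSON string read from Event Hub and emits the values plus a timestamp that was not part of the original message. It assumes the Event Hub spout emits the message body as a single string field and that a JSON library such as org.json is available; the actual parser component in this repository may differ.

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.json.JSONObject;

    // Hypothetical bolt that parses the JSON message and adds a timestamp field.
    public class ParserBoltSketch extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // Assumes the spout emits the raw message body as a single string.
            JSONObject json = new JSONObject(tuple.getString(0));
            int temperature = json.getInt("temperature");
            int humidity = json.getInt("humidity");
            int co2Level = json.getInt("co2Level");
            long timestamp = System.currentTimeMillis(); // value added by the parser
            collector.emit(new Values(temperature, humidity, co2Level, timestamp));
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("temperature", "humidity", "co2level", "timestamp"));
        }
    }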

Required information

  • An Azure event hub with two shared access policies:

    • The writer policy must have write permission to the event hub.

    • The reader policy must have listen permission to the event hub.

  • To connect to the event hub from Storm, you need the following information:

    • The connection string for the writer policy (an example of the format is shown after this list).

    • The policy key for the reader policy.

    • The name of your Event Hub.

    • The Service Bus namespace that your Event Hub was created in.

    • The number of partitions available with your Event Hub configuration.

    For information on creating an event hub, see the Create Event Hubs document.
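
    For reference, the connection string for a shared access policy (such as the writer policy above) typically has the following form, where NAMESPACE, HUBNAME, and the key are placeholders for your own values:

        Endpoint=sb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=writer;SharedAccessKey=<policy key>;EntityPath=HUBNAME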

Configure and build

  1. Fork & clone the repository so you have a local copy.

  2. Add the Event Hub configuration to the dev.properties file. This is used to configure the spout that reads from Event Hub and the bolt that writes to it.

  3. Use mvn package to build everything.

    Once the build completes, the target directory will contain a file named EventHubExample-1.0-SNAPSHOT.jar.

Test locally

Since these topologies just read and write to Event Hubs, you can test them locally if you have a Storm development environment. Use the following steps to run locally in the dev environment:

  1. Run the writer:

    storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /writer.yaml --filter dev.properties
  2. Run the reader:

    storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /reader.yaml --filter dev.properties

Output is logged to the console when running locally. Use Ctrl+C to stop the topology.

Deploy

  1. Use SCP to copy the jar package to your HDInsight cluster. Replace USERNAME with the SSH user for your cluster. Replace CLUSTERNAME with the name of your HDInsight cluster:

     scp ./target/EventHubExample-1.0-SNAPSHOT.jar USERNAME@CLUSTERNAME-ssh.azurehdinsight.net:EventHubExample-1.0-SNAPSHOT.jar
    
    
    For more information on using `scp` and `ssh`, see [Use SSH with HDInsight](https://docs.microsoft.com/azure/hdinsight/hdinsight-hadoop-linux-use-ssh-unix).
    
    
  2. Use SCP to copy the dev.properties file to the server:

    scp dev.properties USERNAME@CLUSTERNAME-ssh.azurehdinsight.net:dev.properties
  3. Once the file has finished uploading, use SSH to connect to the HDInsight cluster. Replace USERNAME with the name of your SSH login. Replace CLUSTERNAME with your HDInsight cluster name:
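
    ssh USERNAME@CLUSTERNAME-ssh.azurehdinsight.net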

  4. Use the following commands to start the topologies:

    storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /writer.yaml --filter dev.properties
    storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /reader.yaml --filter dev.properties

    This will start the topologies and give them the friendly names "eventhubwriter" and "eventhubreader".

  5. To view the logged data, go to https://CLUSTERNAME.azurehdinsight.net/stormui, where CLUSTERNAME is the name of your HDInsight cluster. Select the topologies and drill down to the components. Select the port entry for an instance of a component to view logged information.

  6. Use the following command to stop the reader:

    storm kill eventhubreader

Write output to HDFS

By default, the components needed to write to WASB or ADL (the file schemes used by HDInsight's HDFS-compatible storage) are not in Storm's classpath. To add them and write output to a file, use the following steps:

  1. From the Azure Portal, select your HDInsight cluster.

  2. Select the Script actions entry, and then select + Submit new.

  3. Use the following values to fill in the Submit script action form:

    • Select a script: Select Custom
    • Name: Enter a name for this script. This is how it will appear in the script history.
    • Bash script URI: https://hdiconfigactions.blob.core.windows.net/linuxstormextlibv01/stormextlib.sh
    • Node type(s): Select Nimbus and Supervisor node types.
    • Parameters: Leave this field blank.
    • Persist: Check this field.
  4. Select Create to run the script action.

  5. IMPORTANT! The dev.properties file assumes that your cluster is using Azure Storage as the default storage. If you are using Azure Data Lake Store instead, change the hdfs.url value in dev.properties to adl:///.

  6. Once the script completes, use the following command to start the topology that writes data to file:

    storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /readtofile.yaml --filter dev.properties
  7. To view the files generated by the topology, use the following command:

    hdfs dfs -ls /stormdata

    This command returns results similar to the following:

     Found 1 items
     -rw-r--r--   1 storm supergroup    5123 2017-10-05 17:25 /stormdata/hdfs-bolt-5-0-1507224331637.txt
    

    To view the contents of a file, use the following command:

    hdfs dfs -text /stormdata/filename

    Replace filename with the name of one of the files.

  8. To stop the topologies, use the following command:

    storm kill eventhubwriter

Write data to Power BI

There isn't a pre-built Storm bolt for communicating with Power BI. However, Power BI provides a real-time streaming REST API that is easy to use. This project includes a PowerBIBolt.java component that demonstrates the basics of using the Power BI real-time streaming API.
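
To give a sense of what that API call looks like, here is a hypothetical, self-contained sketch of pushing one row to a streaming dataset with plain HttpURLConnection (the URL would be the powerbi.push.url value configured in dev.properties). The class and method names are illustrative only; see PowerBIBolt.java in this repository for the component the topology actually uses.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    // Hypothetical helper that pushes one row to a Power BI streaming dataset.
    public class PowerBIPushSketch {
        public static void push(String pushUrl, int temperature, int humidity,
                                int co2Level, String timestamp) throws Exception {
            // Power BI expects an HTTP POST containing a JSON array of rows
            // whose fields match the streaming dataset schema.
            String body = String.format(
                "[{\"temperature\": %d, \"humidity\": %d, \"co2level\": %d, \"timestamp\": \"%s\"}]",
                temperature, humidity, co2Level, timestamp);

            HttpURLConnection conn = (HttpURLConnection) new URL(pushUrl).openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }
            // A 200 response means the row was accepted by the dataset.
            System.out.println("Power BI response code: " + conn.getResponseCode());
            conn.disconnect();
        }
    }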

  1. Review the Power BI real-time streaming documentation at https://powerbi.microsoft.com/en-us/documentation/powerbi-service-real-time-streaming/ to learn how to work with real-time streaming in Power BI.

  2. Create a new Custom streaming dataset in Power BI. In the Values from stream section, add the following values:

    • temperature with a data type of Number
    • humidity with a data type of Number
    • co2level with a data type of Number
    • timestamp with a data type of Datetime
  3. Select Create, and then save the Push URL value returned. Select Done to finish configuring the real-time streaming API.

  4. On the Storm cluster, modify the dev.properties file and set powerbi.push.url to the push URL returned in step 3. This URL is used by the PowerBIBolt.java component when posting to Power BI.

  5. Use the following command to start the topology:

    storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /readtopowerbi.yaml --filter dev.properties
  6. In Power BI, add some tiles to the dashboard and set the real-time streaming API as the source. Note that the values update as data is read by the topology.

  7. Use the following commands to stop the reader and writer topologies:

    storm kill eventhubreader
    storm kill eventhubwriter

Project code of conduct

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact [email protected] with any additional questions or comments.
