Brewing Beer With Raspberry Pi: Stream Analytics

The beer brewing exercise continues, using Azure's IoT Hub and SQL database to perform stream analytics on the temperature of the beer.

When cooling beer we want to store the history of temperatures for two reasons. First, it gives us valuable historical data for the next cooling sessions. Second, we can query past measurements after we have temporarily lost the connection to IoT Hub. In this post we'll do some analysis and then build a database for our beer cooling solution.

Creating a SQL Azure Database

I’m sure I want to brew eisbock more than once, so I have more than one cooling session coming. As all these sessions produce data I want to store, I need a way to find out which measurements belong to which cooling session. At this point I want to introduce a new term: batch. In brewing, a batch is one specific brew in a container.

So, in total we need two tables:

  • Batch – Batch key, measuring state, cooling rate, and the device used for measuring.
  • Measurement – Batch key, timestamp, beer temperature, ambient temperature.

Here is the database diagram. Although we currently expect the ambient temperature to be constant, this doesn’t always hold, and in the future I also want to consider situations where the ambient temperature changes (this matters in spring and autumn, when cooling takes longer because there is less frost).

Beer cooling solution database diagram

The SQL script to create these two tables is below. It works on Azure SQL too, so just create a database there, copy and paste this script, and run it:

CREATE TABLE [dbo].[Batch] (
    [BatchKey]    NVARCHAR (15) NOT NULL,
    [DeviceId]    NVARCHAR (15) NOT NULL,
    [CoolingRate] FLOAT (53)    CONSTRAINT [DF_Batch_CoolingRate] DEFAULT ((0)) NOT NULL,
    [IsActive]    BIT           CONSTRAINT [DF_Batch_IsActive] DEFAULT ((0)) NOT NULL,
    -- BatchKey must be a primary (or unique) key so that Measurement can reference it
    CONSTRAINT [PK_Batch] PRIMARY KEY CLUSTERED ([BatchKey] ASC)
);

CREATE TABLE [dbo].[Measurement] (
    [Id]          INT           IDENTITY (1, 1) NOT NULL,
    [BatchKey]    NVARCHAR (15) NOT NULL,
    [Time]        DATETIME      NOT NULL,
    [BeerTemp]    FLOAT (53)    NOT NULL,
    [AmbientTemp] FLOAT (53)    NOT NULL,
    CONSTRAINT [PK_Measurement] PRIMARY KEY CLUSTERED ([Id] ASC),
    -- Every measurement belongs to exactly one batch
    CONSTRAINT [FK_Measurement_Batch] FOREIGN KEY ([BatchKey]) REFERENCES [dbo].[Batch] ([BatchKey])
);

Now we have a simple SQL Azure database where we can save measurement data.

Getting Data From Azure IoT Hub to SQL Database

Now comes the most complex part of this show. We have to get the data reported to IoT Hub into the SQL Azure database, and we don’t want to write any code for this. We also don’t want to write all the data to the database: temperature changes are not rapid, so taking averages over a small time window keeps the database smaller without losing any important information.

Here’s the full path for measurements to get from the device to the SQL database.

How data moves from device to SQL database

Stream Analytics is an Azure service for real-time data processing and aggregation. Aggregating queries like ours run over a time window of the data that is flowing in. Stream Analytics takes data from inputs, processes it, and then sends it to outputs. In our case, Azure IoT Hub is the input, and our SQL database is the output.

Creating a Stream Analytics Job

Now go to the Azure portal and create a new Stream Analytics job:

Creating new Stream Analytics job

Adding Input

When a new Stream Analytics job is created, you are redirected to the main page of the Azure portal. Browse to your newly created Stream Analytics job, open it and click on the Inputs box. This opens an inputs list on the right. Click on the Add button.

Stream Analytics: Adding new input

Give the input a name, and select “Data stream” as its type. For source, select “IoT Hub”.

In the “IoT Hub” field, insert the first part of the host name of your IoT Hub address. If your IoT Hub address is something.azure-devices.net, then the value of the “IoT Hub” field must be “something”.
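If you only have the full address at hand, the part to enter can be derived mechanically. The following is a minimal sketch, not part of the original solution; the function name iot_hub_name is made up for illustration, and it assumes the standard azure-devices.net suffix mentioned above.

```python
# Hypothetical helper: derive the value for the "IoT Hub" field
# from a full IoT Hub address such as something.azure-devices.net.
SUFFIX = ".azure-devices.net"


def iot_hub_name(hostname: str) -> str:
    """Return the part of the host name that precedes .azure-devices.net."""
    host = hostname.strip().lower()
    if not host.endswith(SUFFIX):
        raise ValueError("not an IoT Hub address: " + hostname)
    name = host[: -len(SUFFIX)]
    if not name:
        raise ValueError("missing hub name in: " + hostname)
    return name


hub = iot_hub_name("something.azure-devices.net")
print(hub)
```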

For the shared access policy, I took “service”, and in the shared access key field, you have to insert the key from this policy. You can find the policy key in the IoT Hub settings. Just open the settings in IoT Hub, select “Shared access policies”, and then “service”. From the policy properties window, copy the value of “Primary key” and paste it here.

After you enter the data and save it, the Azure portal checks whether it can connect to the new input source. Just wait a few moments after clicking the Save button to see if the input source is okay.

Adding Output

Stream Analytics: Adding new output

Now click on the Outputs box on the main page of the Stream Analytics job and add a new output. For us, the output will be a SQL database.

For the “Database” field, insert the name of the database you created before.

For the “Server name” field, insert your database server address. For SQL Azure, the address looks like something.database.windows.net. If you host the database yourself, or if you are running SQL Server on an Azure virtual machine, then insert the IP address of your server, because you don’t have any access to the DNS servers used by Azure.

The user name and password fields should be obvious, so I'll skip them. The last field is Table; for this field, enter "Measurement".

Now we have all the data inserted, and it’s time to save. Click “Create” to save the database as a new output.

The names you gave to the input and output sources are the ones you will use later when writing Stream Analytics queries. So choose these names carefully and make sure they make sense. It makes it easier to understand queries later.

Creating a Stream Analytics Query

As the last step, we have to add the query that is run on the incoming data flow. As Azure's new portal doesn’t support testing of Stream Analytics queries yet, we have to switch to the old management portal and insert our query there.

Stream Analytics: Query editor

Now things get a little bit tricky because the query must know input and output formats. On the input side, we have data that is structured like our measurement objects we are sending out from the device. On the output side we have the measurements table we created above. Additionally, we have to define aggregates on all numbers in our query.

Now we have one additional problem. The Stream Analytics query is running on data that is coming in from a data source. Currently the data we are sending to Azure IoT Hub doesn’t have information about what batch it is, and there’s no way on the Stream Analytics side to make the decision. We have to add a batch key to the data transfer object we are using to send data to Azure IoT Hub.

We add a new batchKey attribute to the anonymous DTO we are using in the ReportMeasurement method.

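// Namespaces used: System, System.Text, Newtonsoft.Json, Microsoft.Azure.Devices.Client
private void ReportMeasurement(DateTime time, double beerTemp, double ambientTemp, double estimate)
{
    // batchKey tells Stream Analytics which batch the measurement belongs to
    var beerMeasurement = new
    {
        deviceId = "MyDevice",
        batchKey = "Eisbock-1",
        timeStamp = time,
        beerTemp = beerTemp,
        ambientTemp = ambientTemp,
        estimate = estimate
    };
    var messageString = JsonConvert.SerializeObject(beerMeasurement);
    var message = new Message(Encoding.ASCII.GetBytes(messageString));
    // ... send the message to IoT Hub as before (not shown in this excerpt)
}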
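To make it easier to picture what Stream Analytics receives, here is a rough Python sketch of the same payload. It is only an illustration of the JSON shape, not the code running on the device. The build_measurement helper and the sample values are made up, and the timestamp is written as an ISO 8601 string on the assumption that the serializer uses that format.

```python
import json
from datetime import datetime


def build_measurement(time, beer_temp, ambient_temp, estimate,
                      batch_key="Eisbock-1", device_id="MyDevice"):
    """Build a dictionary with the same fields as the anonymous DTO."""
    return {
        "deviceId": device_id,
        "batchKey": batch_key,          # lets Stream Analytics group by batch
        "timeStamp": time.isoformat(),  # assumption: ISO 8601 text
        "beerTemp": beer_temp,
        "ambientTemp": ambient_temp,
        "estimate": estimate,
    }


# Sample values are invented for the example.
measurement = build_measurement(datetime(2016, 3, 1, 12, 0, 0), 18.5, -10.0, 3.2)
payload = json.dumps(measurement)
body = payload.encode("ascii")  # the bytes that would go into the message
print(payload)
```

The field names in the JSON (batchKey, timeStamp, beerTemp, ambientTemp) are the ones the Stream Analytics query refers to.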

Now we can go on and write a query that saves data to the SQL Azure database. Here is the Stream Analytics query:

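SELECT
    batchKey as BatchKey,
    MAX(CAST([timeStamp] AS datetime)) as Time,
    AVG(beerTemp) as BeerTemp,
    AVG(ambientTemp) as AmbientTemp
INTO [YourOutputAlias]  -- placeholder: the name you gave to the SQL database output
FROM [YourInputAlias]   -- placeholder: the name you gave to the IoT Hub input
GROUP BY batchKey,
    TumblingWindow(minute, 5)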

Some notes. We cast the timestamp field to datetime because otherwise Stream Analytics treats it as something that should be cast to float. I'm not sure why this is so. As we are interested in temperatures over five-minute time windows (a tumbling window is a fixed-size window that doesn't overlap with the next one), we take the averages of the temperatures reported in each window.
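To get a feeling for what the query produces, the aggregation can be imitated offline. The sketch below is a simplified Python model, not how Stream Analytics is implemented. It assigns each measurement to a five-minute window, treating the window end as inclusive and the start as exclusive, then outputs the latest timestamp and the average temperatures per batch and window. It windows on the timestamp in the payload, which is an assumption; the service itself may window on the arrival time of the event. The sample measurements are invented.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)


def window_end(ts):
    """Return the end of the tumbling window that contains ts.

    A timestamp exactly on a boundary belongs to the window ending there.
    """
    remainder = (ts - EPOCH) % WINDOW
    if remainder == timedelta(0):
        return ts
    return ts - remainder + WINDOW


def aggregate(measurements):
    """Group by (batchKey, window) and compute MAX(time) and AVG(temps)."""
    groups = defaultdict(list)
    for m in measurements:
        groups[(m["batchKey"], window_end(m["timeStamp"]))].append(m)
    rows = []
    for key in sorted(groups):
        items = groups[key]
        rows.append({
            "BatchKey": key[0],
            "Time": max(i["timeStamp"] for i in items),
            "BeerTemp": sum(i["beerTemp"] for i in items) / len(items),
            "AmbientTemp": sum(i["ambientTemp"] for i in items) / len(items),
        })
    return rows


# Invented sample data: three measurements falling into two windows.
sample = [
    {"batchKey": "Eisbock-1", "timeStamp": datetime(2016, 3, 1, 12, 1),
     "beerTemp": 20.0, "ambientTemp": -10.0},
    {"batchKey": "Eisbock-1", "timeStamp": datetime(2016, 3, 1, 12, 4),
     "beerTemp": 19.0, "ambientTemp": -10.0},
    {"batchKey": "Eisbock-1", "timeStamp": datetime(2016, 3, 1, 12, 6),
     "beerTemp": 18.0, "ambientTemp": -12.0},
]
rows = aggregate(sample)
for row in rows:
    print(row)
```

Three incoming measurements become two rows here, which is the kind of reduction we are after in the database.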

While a Stream Analytics job is running, we can’t make changes to it. To change something, we have to stop the job and then make our modifications.

Adding First Batch to Database

Before we can start gathering data, we need at least one batch to be available in the Batch table. Add a new batch with the following data:

BatchKey: Eisbock-1

DeviceId: MyDevice

IsActive: 1

After adding this row to the Batch table, we are ready to test whether data gets from IoT Hub to the SQL Azure database.
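The batch row has to exist first because the Measurement table references Batch through a foreign key. The effect can be tried out locally with the sketch below. It uses an in-memory SQLite database from Python as a stand-in, not Azure SQL, so the column types are simplified and the measurement values are invented. Only the table and column names follow the script above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite checks foreign keys only when asked
conn.executescript("""
CREATE TABLE Batch (
    BatchKey    TEXT PRIMARY KEY,
    DeviceId    TEXT NOT NULL,
    CoolingRate REAL NOT NULL DEFAULT 0,
    IsActive    INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE Measurement (
    Id          INTEGER PRIMARY KEY AUTOINCREMENT,
    BatchKey    TEXT NOT NULL REFERENCES Batch (BatchKey),
    Time        TEXT NOT NULL,
    BeerTemp    REAL NOT NULL,
    AmbientTemp REAL NOT NULL
);
""")


def insert_measurement(batch_key):
    """Insert one invented measurement row for the given batch."""
    conn.execute(
        "INSERT INTO Measurement (BatchKey, Time, BeerTemp, AmbientTemp) "
        "VALUES (?, ?, ?, ?)",
        (batch_key, "2016-03-01T12:05:00", 19.5, -10.0),
    )


# Without a batch row the measurement is rejected by the foreign key.
try:
    insert_measurement("Eisbock-1")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

# After adding the batch, the same insert succeeds.
conn.execute(
    "INSERT INTO Batch (BatchKey, DeviceId, IsActive) VALUES (?, ?, ?)",
    ("Eisbock-1", "MyDevice", 1),
)
insert_measurement("Eisbock-1")
count = conn.execute("SELECT COUNT(*) FROM Measurement").fetchone()[0]
print(rejected, count)
```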

Testing Stream Analytics Job

Now run the Stream Analytics job and open your IoT Hub so you can see its dashboard. Also open your database in SQL Server Management Studio, and be ready to run a SELECT query against the Measurement table. Run the beer cooling solution and see if data starts coming in. In SQL Azure, the data arrives with some delay. The delay depends on how wide the time window is that we are aggregating over.

Data in SQL database inserted by Stream Analytics

If everything is okay, then soon you should see new data in your Measurement table. If the time window is five minutes, then you should wait at least five minutes before you see any data.

Wrapping Up

This was a long post full of analysis and service configuration, but we made it, and now our data is flowing from IoT Hub to a SQL Azure database. As we are aggregating measurement results over time windows of five minutes, we store less data than is coming in, but we still don’t lose much, and for the next batches we have useful historical data to draw on.

raspberry pi, azure, sql

Published at DZone with permission of Gunnar Peipman, DZone MVB. See the original article here.
