Browse Month

August 2019

Alert engine using Azure Stream Analytics and SQL Azure as Reference data

This article is an extension of some great work link and link .

As part of any IoT solution or workflow you would need an alerting engine.Luckily Azure provides us Azure Stream Analytics which can be used to create an alert engine.

There are quite a few articles available on how to set Azure Stream Analytics job and configure Input and output.I am making an assumption that readers know how to set Input and Output in Azure Stream Analytics.We won’t be going in detail on how to set them. The area which we would look into is how to set an alert when telemetry or metric which devices transmit is not constant and can change.Say “device1” transmit (temperature and humidity) while “device2” sends us details about voltage and current.We can’t have separate job for all these devices with separate rules defined.

The best thing in Azure Stream Analytics to handle these kind of cases is to define reference data.In this article we would see how we can use SQL Azure as our Reference data and create alert engine for devices .

We would start by creating a reference data in SQL Azure table.Lets say we have a rule defined in JSON format like this:

[ { “PropertyName”: “Temperature”, “threshold”: 40.0, “comparisonoperator”: “>” }, {“PropertyName”: “Temperature”, “threshold”: 20.0, “comparisonoperator”: “<” } ]

Here we have defined two conditions for temperature. Threshold is set to 40 and 20 respectively for operator “>” and “<“.It is pretty much clear that we want alert to fire once Temperature crosses 40 and falls below 20.

Now lets see the how we are going to achieve it now.

CREATE TABLE [dbo].[RulesContainer](
[RuleId] [bigint] IDENTITY(1,1) NOT NULL,
[DeviceId] [nvarchar] (125)NULL,
[SensorRules] [nvarchar] (1000) NOT NULL,
[ValidFrom] [datetime2] GENERATED ALWAYS AS ROW START NOT NULL,
[ValidTo] [datetime2] GENERATED ALWAYS AS ROW END NOT NULL,
PRIMARY KEY CLUSTERED
(
[RuleId] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY],
PERIOD FOR SYSTEM_TIME ([ValidFrom], [ValidTo])
) ON [PRIMARY]
WITH
(
SYSTEM_VERSIONING = ON ( HISTORY_TABLE = [dbo].[RulesContainerHistory] )
)
GO

ALTER TABLE [dbo].[RulesContainer] WITH CHECK ADD CONSTRAINT [SensorRules_JSON] CHECK ((isjson([SensorRules])>(0)))
GO

ALTER TABLE [dbo].[RulesContainer] CHECK CONSTRAINT [SensorRules_JSON]
GO

We would first start by creating a temporal table in SQL Azure database.Note that the mandatory elements of every temporal table are the PERIOD definition and the SYSTEM_VERSIONING clause with a reference to another user table that will store historical row versions.

We would then go to Azure portal and create a Stream Analytics job.Under input please add SQL Database as Reference Input.While adding SQL Database as reference data please enter a query that would be used to create snapshot.In our case it would be

select RuleId,DeviceId,TenantId,SensorRules
from dbo.RulesContainer
FOR SYSTEM_TIME AS OF @snapshotTime

In any IoT solution the rules would needed to be updated.For example we would begin with setting temperature threshold to 40 for some devices. Later we may add some more devices in our factory or site and would need to update rules with these devices. Here is the setting which helps to refresh updated rules in ASA job periodically. Please select the option to refresh it “ON” and enter the duration.

Now the most important part of our solution which is the query.

CREATE TABLE sqlref
(
RuleId bigint ,
DeviceId nvarchar(max),
SensorRules array
)

WITH TelemetryAndRules AS
(
SELECT
ref.RuleId as RuleId,
T.DeviceId AS DeviceId,
  sensorRules.ArrayValue.PropertyName as measurementname,
sensorRules.ArrayValue.threshold as Threshold,
sensorRules.ArrayValue.comparisonoperator as Comparisonoperator,
GetRecordPropertyValue(T, sensorRules.ArrayValue.PropertyName) as MeasurementValue,
System.Timestamp() as ProcessedTime
FROM
[zainhubinpout] T PARTITION BY PartitionId
TIMESTAMP BY T.EventEnqueuedUtcTime
JOIN [sqlref] ref ON T.DeviceId = ref.DeviceId
CROSS APPLY GetArrayElements(ref.sensorRules) AS sensorRules
),
AggregateWindow AS (
SELECT
TR.ruleid as ruleid,
TR.deviceid as deviceid,
TR.MeasurementName as measurementname,
TR.Threshold as threshold,
TR.comparisonoperator as comparisonoperator,
AVG(TR.MeasurementValue) as avgmeasurementvalue
FROM
TelemetryAndRules TR PARTITION BY PartitionId
GROUP BY
TR.ruleid,
TR.deviceid,
TR.measurementname,
TR.Threshold,
TR.comparisonoperator,
TumblingWindow(minute, 15)
)
select
ruleid as RuleId,
deviceid as DeviceId,
measurementname as PropertyName,
threshold as Threshold,
comparisonoperator as ComparisionOperator,
avgmeasurementvalue as MeasurementValue,
System.Timestamp as ProcessedTime
INTo sqloutput
FROM AggregateWindow PARTITION BY PartitionId
WHERE
NOT(threshold IS NULL) AND
(
(comparisonoperator = ‘>’ AND
(avgmeasurementvalue) > threshold) OR
(comparisonoperator = ‘<‘ AND
(avgmeasurementvalue)< threshold) OR
(comparisonoperator = ‘=’ AND
(avgmeasurementvalue) =threshold)
)

What this query is all about.At first we declare a table which would tell ASA the datatype of columns which is used in our reference table.One thing to note is SensorRules is defined as array datatype because we are using GetArrayElements function which would return dataset with array values.The usage of this function is documented here .

Once we have the values then next part of query deals with Windowing Function.We set the Tumbling Window to be 15 minutes.You can choose the window as per your convenience. At last we compare the value with our ruleset.

So how our table in which we save our Alerts looks like.

CREATE TABLE [dbo].[TelemetryAlerts](
[AlertId] [bigint] IDENTITY(1,1) NOT NULL,
[RuleId] [bigint] NOT NULL,
[DeviceId] [nvarchar] (125) NOT NULL,
[PropertyName] [nvarchar] (200) NOT NULL,
[Threshold] [float] NOT NULL,
[ComparisionOperator] [varchar] (10) NOT NULL,
[MeasurementValue] [float] NOT NULL,
[ ProcessedTime ] [datetime] NOT NULL
CONSTRAINT [PK_dbo.PK_TelemetryAlerts] PRIMARY KEY CLUSTERED
(
[AlertId] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO

A better option would be to store these Alerts in CosmosDB or Table storage but its as per design of application.We selected SQL table for the sake of simplicity.