Create an alert with a business rule #
In this tutorial you will learn how to implement business rules to create an alert in the measure ingestion pipeline.
We will use both the Device Manager and the Rules Engine modules.
The following concepts are expected to be known:
To create alerts, we will use the event triggered downstream of the measures ingestion pipeline: engine:{engine-index}:device-manager:measures:process:after
Events respecting the format engine:{engine-index}:... are confined to a particular tenant. A tenant user cannot save a workflow on an event that does not respect this format.
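As an illustration of this naming convention, the sketch below checks whether an event name is tenant-scoped and substitutes the placeholder with a concrete engine index. The helper names (`isTenantScopedEvent`, `resolveTenantEvent`) and the sample index `tenant-acme` are hypothetical; the framework performs its own validation and substitution, and its actual implementation may differ.

```typescript
// Hypothetical helpers, not part of the Rules Engine API.
const TENANT_PREFIX = "engine:{engine-index}:";

// A tenant-scoped event template starts with the "engine:{engine-index}:" prefix
// and has something after it.
function isTenantScopedEvent(event: string): boolean {
  return event.startsWith(TENANT_PREFIX) && event.length > TENANT_PREFIX.length;
}

// Replace the placeholder with a concrete engine index (assumed behavior).
function resolveTenantEvent(event: string, engineIndex: string): string {
  if (!isTenantScopedEvent(event)) {
    throw new Error(`Event "${event}" is not tenant-scoped`);
  }
  // A replacer function avoids special "$" patterns in the replacement string.
  return event.replace("{engine-index}", () => engineIndex);
}

const template = "engine:{engine-index}:device-manager:measures:process:after";
const unscoped = "device-manager:measures:process:after";

console.log(isTenantScopedEvent(template));
console.log(isTenantScopedEvent(unscoped));
console.log(resolveTenantEvent(template, "tenant-acme"));
```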
Our business rule comes in the form of a workflow that will listen to this event and create alerts if needed.
Rule Trigger #
First, we'll declare our workflow with its trigger, which is connected to the event of the measures ingestion pipeline.
This workflow will only be triggered when receiving measurements for assets of the Container model.
Containers are assets used to transport refrigerated goods. They have a temperature measure and a state metadata indicating whether they are moving. We want to trigger an alert if the temperature of a container is too high while it is moving.
As a reminder, the payload of this event contains 3 objects:
device: the new state of the device associated with the measures
measures: array of measures
asset: (optional) the new state of the asset linked to the device
We can therefore filter on the assets of the Container model by using the equals filter on the asset._source.model field. We will also use filters to trigger our rule only when the state metadata is moving.
const coldChainAlertWorkflow: WorkflowContent = {
  name: "Cold Chain Alert",
  description:
    "Create an alert when a container is moving and the temperature is too high",
  payloadPath: ".",
  // We define a trigger of type "event" on our ingestion pipeline event
  trigger: {
    type: "event",
    event: "engine:{engine-index}:device-manager:measures:process:after",
    filters: {
      and: [
        // Check on the asset model
        { equals: { "asset._source.model": "Container" } },
        // Ensure the asset is moving
        { equals: { "asset._source.metadata.state": "moving" } },
      ],
    },
  },
  actions: [],
  lifecycle: {},
};
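To make the filter semantics concrete, here is a minimal sketch of how an and/equals filter like the one above could be evaluated against an event payload. It is a simplified stand-in written for illustration, not the filter engine used by the platform: it only supports `and` and `equals`, and the sample payloads contain made-up values.

```typescript
// Simplified stand-in for the filter engine: supports only "and" and "equals".
type EqualsFilter = { equals: Record<string, unknown> };
type AndFilter = { and: Filter[] };
type Filter = EqualsFilter | AndFilter;

// Read a nested value from a dotted path such as "asset._source.model".
// Returns undefined as soon as a segment is missing.
function getPath(obj: unknown, path: string): unknown {
  return path.split(".").reduce<unknown>((current, key) => {
    if (current !== null && typeof current === "object") {
      return (current as Record<string, unknown>)[key];
    }
    return undefined;
  }, obj);
}

function matches(filter: Filter, payload: unknown): boolean {
  if ("and" in filter) {
    // Every sub-filter must match
    return filter.and.every((sub) => matches(sub, payload));
  }
  // Every field listed under "equals" must be strictly equal to the expected value
  return Object.entries(filter.equals).every(
    ([path, expected]) => getPath(payload, path) === expected
  );
}

const filters: Filter = {
  and: [
    { equals: { "asset._source.model": "Container" } },
    { equals: { "asset._source.metadata.state": "moving" } },
  ],
};

// Sample payloads (made-up identifiers and values)
const movingContainer = {
  device: { _id: "device-1", _source: {} },
  measures: [],
  asset: {
    _id: "container-1",
    _source: { model: "Container", metadata: { state: "moving" } },
  },
};
const stoppedContainer = {
  device: { _id: "device-2", _source: {} },
  measures: [],
  asset: {
    _id: "container-2",
    _source: { model: "Container", metadata: { state: "stopped" } },
  },
};
// The asset is optional: a device that is not linked to an asset never matches
const deviceOnly = { device: { _id: "device-3", _source: {} }, measures: [] };

console.log(matches(filters, movingContainer));
console.log(matches(filters, stoppedContainer));
console.log(matches(filters, deviceOnly));
```

Only the first payload satisfies both conditions, so only that one would trigger the workflow.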
Action: create an alert document #
We will now define our action in the form of a Task.
This task will be responsible for creating the alert document with the appropriate content so it can be manipulated in the IoT Console.
We also want to acknowledge the existing alert if the temperature is back to normal.
For this, our task takes one external argument:
alertName: name of the alert
class CreateAssetAlertTask extends Task {
  constructor() {
    super('create-asset-alert');

    this.description = 'Create an alert on an asset';
    this.argumentsDescription = {
      alertName: {
        description: 'Name of the alert',
        type: 'string',
      },
    };
  }

  async run(
    context: WorkflowContext,
    initiator: DynamicObject,
    args: JSONObject
  ) {
    const asset = context.payload.asset as KDocument<AssetContent>;
    const tenantIndex = context.engineIndex;

    // The temperature is considered too high above 0 degrees
    const temperatureTooHigh =
      asset._source.measures.temperature.values.temperature > 0;
    const pendingAlertExists = await this.pendingExists(
      tenantIndex,
      asset,
      args.alertName
    );

    if (temperatureTooHigh && !pendingAlertExists) {
      await this.create(tenantIndex, asset, args.alertName);
    } else if (!temperatureTooHigh && pendingAlertExists) {
      await this.acknowledge(tenantIndex, asset, args.alertName);
    }

    return context;
  }

  /**
   * Searches for an existing pending alert on the asset
   */
  private async pendingExists(
    tenantIndex: string,
    asset: KDocument<AssetContent>,
    alertName: string
  ) {
    const count = await this.context.accessors.sdk.document.count(
      tenantIndex,
      'alerts',
      {
        query: {
          and: [
            { equals: { 'alertRule._source.name': alertName } },
            { equals: { 'document._id': asset._id } },
            // Ignore alerts that have already been acknowledged
            { equals: { status: 'pending' } },
          ],
        },
      },
      { lang: 'koncorde' }
    );

    return count > 0;
  }

  /**
   * Creates an alert using the standard alert format of the IoT Backend so
   * the alert can be displayed and manipulated in the IoT Console
   */
  private async create(
    tenantIndex: string,
    asset: KDocument<AssetContent>,
    alertName: string
  ) {
    const alert = {
      status: 'pending',
      alertRule: {
        _id: alertName,
        _source: {
          name: alertName,
        },
      },
      document: {
        _id: asset._id,
        _source: asset._source,
        collection: 'assets',
      },
    };

    await this.context.accessors.sdk.document.create(
      tenantIndex,
      'alerts',
      alert
    );
  }

  /**
   * Changes the pending alert status to acknowledged
   */
  private async acknowledge(
    tenantIndex: string,
    asset: KDocument<AssetContent>,
    alertName: string
  ) {
    const result = await this.context.accessors.sdk.document.search(
      tenantIndex,
      'alerts',
      {
        query: {
          and: [
            { equals: { 'alertRule._source.name': alertName } },
            { equals: { 'document._id': asset._id } },
            // Only acknowledge an alert that is still pending
            { equals: { status: 'pending' } },
          ],
        },
      },
      { lang: 'koncorde', size: 1 }
    );

    // Nothing to acknowledge if the alert disappeared in the meantime
    if (result.hits.length === 0) {
      return;
    }

    await this.context.accessors.sdk.document.update<AlertContent>(
      tenantIndex,
      'alerts',
      result.hits[0]._id,
      { status: 'acknowledged' }
    );
  }
}
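The branching in run() boils down to a small decision table over two booleans. The sketch below isolates it as a pure function so each combination can be checked without a database; `decideAlertAction` and the `AlertAction` type are names introduced here for illustration and are not part of the framework.

```typescript
// Hypothetical helper isolating the decision taken in run()
type AlertAction = "create" | "acknowledge" | "none";

function decideAlertAction(
  temperatureTooHigh: boolean,
  pendingAlertExists: boolean
): AlertAction {
  if (temperatureTooHigh && !pendingAlertExists) {
    // Problem detected and not yet reported
    return "create";
  }
  if (!temperatureTooHigh && pendingAlertExists) {
    // Temperature is back to normal: close the pending alert
    return "acknowledge";
  }
  // Either nothing is wrong, or the problem is already reported
  return "none";
}

// Print the full decision table
for (const tooHigh of [true, false]) {
  for (const pending of [true, false]) {
    console.log(
      `tooHigh=${tooHigh} pending=${pending} -> ${decideAlertAction(tooHigh, pending)}`
    );
  }
}
```

Because an alert is only created when no pending one exists, repeated measures above the threshold do not produce duplicate alerts.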
Let's add our Task to our workflow:
const coldChainAlertWorkflow: WorkflowContent = {
  name: "Cold Chain Alert",
  description:
    "Create an alert when a container is moving and the temperature is too high",
  payloadPath: ".",
  // We define a trigger of type "event" on our ingestion pipeline event
  trigger: {
    type: "event",
    event: "engine:{engine-index}:device-manager:measures:process:after",
    filters: {
      and: [
        // Check on the asset model
        { equals: { "asset._source.model": "Container" } },
        // Ensure the asset is moving
        { equals: { "asset._source.metadata.state": "moving" } },
      ],
    },
  },
  // The action name must match the name the Task was declared with
  actions: [
    {
      type: "task",
      name: "create-asset-alert",
      args: {
        alertName: "cold-chain",
      },
    },
  ],
  lifecycle: {},
};
Registration on the framework #
We will now register our workflow and our Task in the framework:
import { coldChainAlertWorkflow } from './coldChainWorkflow';
import { CreateAssetAlertTask } from './tasks';

const workflows = app.plugin.get<WorkflowsPlugin>('workflows');

workflows.registerDefaultWorkflow(new Workflow(coldChainAlertWorkflow));
workflows.registerTask(new CreateAssetAlertTask());
Now our workflow is functional and will be automatically added to the workflows list of newly created tenants.
To load this new workflow into existing tenants, use the multi-tenancy/tenant:update action.
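The link between the name given in a workflow action and the registered Task can be pictured as a name-based lookup. The sketch below is a hypothetical, heavily simplified model of that idea: `TaskRegistry`, `SimpleTask` and `TaskAction` are names invented for this example, the task runs synchronously for brevity, and none of this reflects the actual internals of the workflows plugin.

```typescript
// Hypothetical, simplified model of name-based task lookup.
interface SimpleTask {
  name: string;
  run(args: Record<string, unknown>): string;
}

interface TaskAction {
  type: "task";
  name: string;
  args: Record<string, unknown>;
}

class TaskRegistry {
  private readonly tasks = new Map<string, SimpleTask>();

  register(task: SimpleTask): void {
    if (this.tasks.has(task.name)) {
      throw new Error(`Task "${task.name}" is already registered`);
    }
    this.tasks.set(task.name, task);
  }

  // Find the task by the name used in the workflow action and run it
  execute(action: TaskAction): string {
    const task = this.tasks.get(action.name);
    if (task === undefined) {
      throw new Error(`Unknown task "${action.name}"`);
    }
    return task.run(action.args);
  }
}

const registry = new TaskRegistry();
registry.register({
  name: "create-asset-alert",
  // Stand-in for the real task: only reports which alert it would handle
  run: (args) => `alert:${String(args["alertName"])}`,
});

const action: TaskAction = {
  type: "task",
  name: "create-asset-alert",
  args: { alertName: "cold-chain" },
};

console.log(registry.execute(action));
```

In this model, a workflow whose action name does not match any registered task fails at execution time, which is why the name in the actions list and the name the Task is declared with need to stay in sync.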