Create an alert with a business rule #

In this tutorial you will learn how to implement business rules to create an alert in the measure ingestion pipeline.

We will use both the Device Manager and the Rules Engine modules.

The following concepts are expected to be known:

To create alerts, we will use the event triggered downstream of the measures ingestion pipeline: engine:{engine-index}:device-manager:measures:process:after

Events that follow the format engine:{engine-index}:... are confined to a particular tenant. A tenant user cannot save a workflow on an event that does not follow this format.
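To illustrate this naming convention, here is a minimal sketch that builds a tenant-scoped event name and checks whether an event name is tenant-scoped. The helpers `tenantEvent` and `isTenantScoped` and the index `tenant-acme` are hypothetical and not part of the platform API. The rule that an engine index contains no ":" character is also an assumption.

```typescript
// Hypothetical helpers illustrating the "engine:{engine-index}:..." convention.
// They are not part of the IoT Platform API.

const MEASURES_PROCESS_AFTER = "device-manager:measures:process:after";

/** Builds the tenant-scoped event name for a given engine index. */
function tenantEvent(engineIndex: string, suffix: string): string {
  return `engine:${engineIndex}:${suffix}`;
}

/**
 * Returns true when the event name has the "engine:<index>:<rest>" shape.
 * Assumption: an engine index contains no ":" character.
 */
function isTenantScoped(event: string): boolean {
  return /^engine:[^:]+:.+$/.test(event);
}

// "tenant-acme" is a made-up engine index used only for this example.
console.log(tenantEvent("tenant-acme", MEASURES_PROCESS_AFTER));
console.log(isTenantScoped("device-manager:measures:process:after"));
```

The first call prints the full event name for the example tenant. The second call shows that an event name without the engine prefix is not considered tenant-scoped.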

Our business rule comes in the form of a workflow that will listen to this event and create alerts if needed.

Rule Trigger #

First, we declare our workflow, starting with its trigger.

The trigger is connected to the event of the measures ingestion pipeline.

The workflow will only be triggered when receiving measurements for assets of the Container model.

Containers are assets used for transportation of refrigerated goods. They have a temperature measure and a metadata state indicating if they are moving or not. We want to trigger an alert if the temperature of a container is too high while it is moving.

As a reminder, the payload of this event contains 3 objects:

  • device: the new state of the device associated with the measures
  • measures: array of measures
  • asset: (optional) the new state of the asset linked to the device

We can therefore filter on the assets of the Container model by using the equals filter on the asset._source.model field. We will also use the filters to trigger our rule only when the state metadata is moving.
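The way these two filters select a payload can be sketched with a small evaluator. This is a simplified model written for illustration, not the filtering engine used by the platform: it only supports `and` and `equals`, and the names `matches` and `getPath`, the sample payloads, and the "stopped" state value are assumptions.

```typescript
// Simplified model of how "and" / "equals" filters select a payload.
// Illustration only: the real filtering engine supports many more operators.

type Json = string | number | boolean | null | Json[] | { [key: string]: Json };
type EqualsFilter = { equals: Record<string, Json> };
type AndFilter = { and: Filter[] };
type Filter = EqualsFilter | AndFilter;

/** Reads a nested value using a dotted path such as "asset._source.model". */
function getPath(value: Json | undefined, path: string): Json | undefined {
  let current: Json | undefined = value;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object" || Array.isArray(current)) {
      return undefined; // The path goes through a missing or non-object value
    }
    current = current[key];
  }
  return current;
}

/** Returns true when the payload satisfies the filter. */
function matches(filter: Filter, payload: Json): boolean {
  if ("and" in filter) {
    return filter.and.every((sub) => matches(sub, payload));
  }
  return Object.entries(filter.equals).every(
    ([path, expected]) => getPath(payload, path) === expected
  );
}

const filters: Filter = {
  and: [
    { equals: { "asset._source.model": "Container" } },
    { equals: { "asset._source.metadata.state": "moving" } },
  ],
};

// Hypothetical payloads reduced to the fields the filters read.
const movingContainer: Json = {
  asset: { _source: { model: "Container", metadata: { state: "moving" } } },
};
const parkedContainer: Json = {
  asset: { _source: { model: "Container", metadata: { state: "stopped" } } },
};

console.log(matches(filters, movingContainer)); // true
console.log(matches(filters, parkedContainer)); // false
```

Because the asset object is optional in the payload, a payload without it never matches in this model: both paths resolve to `undefined`, so the rule is skipped for devices that are not linked to an asset.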

const coldChainAlertWorkflow: WorkflowContent = {
  name: "Cold Chain Alert",
  description:
    "Create an alert when a container is moving and the temperature is too high",
  payloadPath: ".",

  // We define a trigger of type "event" on our ingestion pipeline event
  trigger: {
    type: "event",
    event: "engine:{engine-index}:device-manager:measures:process:after",

    filters: {
      and: [
        // Check on the asset model
        { equals: { "asset._source.model": "Container" } },
        // Ensure the asset is moving
        { equals: { "asset._source.metadata.state": "moving" } },
      ],
    },
  },

  actions: [],
  lifecycle: {},
};

Action: create an alert document #

We will now define our action in the form of a Task.

This task will be responsible for creating the alert document with the appropriate content so it can be manipulated in the IoT Console.

We also want to acknowledge the existing alert if the temperature is back to normal.

For this, our task takes one external argument:

  • alertName: name of the alert

class CreateAssetAlertTask extends Task {
  constructor() {
    super('create-asset-alert');

    this.description = 'Create an alert on an asset';

    this.argumentsDescription = {
      alertName: {
        description: 'Name of the alert',
        type: 'string',
      },
    };
  }

  async run(
    context: WorkflowContext,
    initiator: DynamicObject,
    args: JSONObject
  ) {
    const asset = context.payload.asset as KDocument<AssetContent>;
    const tenantIndex = context.engineIndex;

    const temperatureTooHigh =
      asset._source.measures.temperature.values.temperature > 0;

    const pendingAlertExists = await this.pendingExists(
      tenantIndex,
      asset,
      args.alertName
    );

    if (temperatureTooHigh && !pendingAlertExists) {
      await this.create(tenantIndex, asset, args.alertName);
    } else if (!temperatureTooHigh && pendingAlertExists) {
      await this.acknowledge(tenantIndex, asset, args.alertName);
    }

    return context;
  }

  /**
   * Searches for an existing pending alert on the asset
   */
  private async pendingExists(
    tenantIndex: string,
    asset: KDocument<AssetContent>,
    alertName: string
  ) {
    const count = await this.context.accessors.sdk.document.count(
      tenantIndex,
      'alerts',
      {
        query: {
          and: [
            { equals: { 'alertRule._source.name': alertName } },
            { equals: { 'document._id': asset._id } },
            // Only count alerts that are still pending
            { equals: { status: 'pending' } },
          ],
        },
      },
      { lang: 'koncorde' }
    );

    return count > 0;
  }

  /**
   * Creates an alert using the standard alert format of the IoT Backend so
   * the alert can be displayed and manipulated in the IoT Console
   */
  private async create(
    tenantIndex: string,
    asset: KDocument<AssetContent>,
    alertName: string
  ) {
    const alert = {
      status: 'pending',
      alertRule: {
        _id: alertName,
        _source: {
          name: alertName,
        },
      },
      document: {
        _id: asset._id,
        _source: asset._source,
        collection: 'assets',
      },
    };
    await this.context.accessors.sdk.document.create(
      tenantIndex,
      'alerts',
      alert
    );
  }

  /**
   * Changes the pending alert status to acknowledged
   */
  private async acknowledge(
    tenantIndex: string,
    asset: KDocument<AssetContent>,
    alertName: string
  ) {
    const result = await this.context.accessors.sdk.document.search(
      tenantIndex,
      'alerts',
      {
        query: {
          and: [
            { equals: { 'alertRule._source.name': alertName } },
            { equals: { 'document._id': asset._id } },
            // Only acknowledge an alert that is still pending
            { equals: { status: 'pending' } },
          ],
        },
      },
      { lang: 'koncorde', size: 1 }
    );

    await this.context.accessors.sdk.document.update<AlertContent>(
      tenantIndex,
      'alerts',
      result.hits[0]._id,
      { status: 'acknowledged' }
    );
  }
}
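The decision made in the run method can be isolated as a pure function, which makes the alert lifecycle easier to reason about and to test. The sketch below is an illustration only: `decideAlertAction` and `simulate` are hypothetical names, the pending state is kept in a local variable instead of the alerts collection, and the threshold of 0 mirrors the comparison used in the task.

```typescript
// Hypothetical pure-function version of the decision taken in the task.

type AlertAction = "create" | "acknowledge" | "none";

/** Maps the two conditions checked by the task to the action to perform. */
function decideAlertAction(
  temperatureTooHigh: boolean,
  pendingAlertExists: boolean
): AlertAction {
  if (temperatureTooHigh && !pendingAlertExists) {
    return "create";
  }
  if (!temperatureTooHigh && pendingAlertExists) {
    return "acknowledge";
  }
  return "none";
}

/**
 * Replays a series of temperature readings for one asset.
 * The pending state lives in memory here; the task reads it from the
 * alerts collection instead.
 */
function simulate(temperatures: number[], threshold = 0): AlertAction[] {
  let pending = false;
  return temperatures.map((temperature) => {
    const action = decideAlertAction(temperature > threshold, pending);
    if (action === "create") pending = true;
    if (action === "acknowledge") pending = false;
    return action;
  });
}

console.log(simulate([-5, 3, 4, -2, 1]));
// [ 'none', 'create', 'none', 'acknowledge', 'create' ]
```

The last reading creates a second alert only because the acknowledged one no longer counts as pending, which is why the lookup for an existing alert has to take the alert status into account.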

Let's add our Task to our workflow:

const coldChainAlertWorkflow: WorkflowContent = {
  name: "Cold Chain Alert",
  description:
    "Create an alert when a container is moving and the temperature is too high",
  payloadPath: ".",

  // We define a trigger of type "event" on our ingestion pipeline event
  trigger: {
    type: "event",
    event: "engine:{engine-index}:device-manager:measures:process:after",

    filters: {
      and: [
        // Check on the asset model
        { equals: { "asset._source.model": "Container" } },
        // Ensure the asset is moving
        { equals: { "asset._source.metadata.state": "moving" } },
      ],
    },
  },

  actions: [
    {
      type: "task",
      name: "create-asset-alert",
      args: {
        alertName: "cold-chain",
      },
    },
  ],
  lifecycle: {},
};

Registration on the framework #

We will now register our workflow and our Task in the framework:

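import { coldChainAlertWorkflow } from './coldChainWorkflow';
import { CreateAssetAlertTask } from './tasks';

// Retrieve the Workflows plugin instance from the application
const workflows = app.plugin.get<WorkflowsPlugin>('workflows');

// Default workflows are added to every newly created tenant
workflows.registerDefaultWorkflow(new Workflow(coldChainAlertWorkflow));
// The task must be registered so the "create-asset-alert" action can be resolved
workflows.registerTask(new CreateAssetAlertTask());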

Now our workflow is functional and will be automatically added to the workflows list of newly created tenants.

To load this new workflow into existing tenants, use the multi-tenancy/tenant:update action.