Enrich measures with a business rule

In this tutorial, you will learn how to implement business rules that enrich measures in the measure ingestion pipeline.

We will use both the Device Manager and the Rules Engine modules.

The following concepts are expected to be known:

In this example, we will develop a business rule that:

  • enriches an existing measure
  • creates a new measure

In order to enrich measurements, we will use the event triggered upstream of the measures ingestion pipeline: engine:{engine-index}:device-manager:measures:process:before

Events respecting the format engine:{engine-index}:... are events confined to a particular tenant. A tenant user cannot save a workflow on an event that does not respect this format.
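The naming convention above can be illustrated with a small sketch. The helper names `tenantEvent` and `isTenantScoped`, and the engine index `engine-kuzzle`, are hypothetical and only serve the example; they are not part of the platform API.

```typescript
// Hypothetical helpers illustrating the "engine:{engine-index}:..." format.
const MEASURES_BEFORE_SUFFIX = "device-manager:measures:process:before";

// Builds a tenant-scoped event name from an engine index and an event suffix.
function tenantEvent(engineIndex: string, suffix: string): string {
  return `engine:${engineIndex}:${suffix}`;
}

// Checks that an event name starts with "engine:", followed by a non-empty
// engine index without colons, then ":" and the rest of the event name.
function isTenantScoped(eventName: string): boolean {
  return /^engine:[^:]+:.+$/.test(eventName);
}

// "engine-kuzzle" is an assumed engine index used only for this example.
const eventName = tenantEvent("engine-kuzzle", MEASURES_BEFORE_SUFFIX);
```

An event name without the `engine:` prefix, such as `device-manager:measures:process:before` alone, does not pass this check.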

Our business rule comes in the form of a workflow that will listen to this event and enrich the measurements.

Rule Trigger

First, we declare our workflow along with its trigger.

The trigger is bound to the event of the measures ingestion pipeline.

Also, this workflow will only be triggered when receiving measurements for devices of the Abeeway model.

As a reminder, the payload of this event contains 3 objects:

  • device: the last state of the device associated with the measures
  • measures: array of measures
  • asset: (optional) the last state of the asset linked to the device

We can therefore filter on the devices of the Abeeway model by using the equals filter on the device._source.model field.

const enrichmentWorkflow: WorkflowContent = {
  name: "Measure Enrichment",
  description: "Enrich an existing measure and create a new one",
  payloadPath: ".",

  // We define a trigger of type "event" on our ingestion pipeline event
  trigger: {
    type: "event",
    event: "engine:{engine-index}:device-manager:measures:process:before",

    filters: {
      equals: { "device._source.model": "Abeeway" },
    },
  },

  actions: [],
  lifecycle: {},
};
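To make the effect of the trigger filter concrete, here is a simplified sketch of how an `equals` condition on a dotted path such as `device._source.model` can be evaluated against the event payload. The functions `getByPath` and `matchesEquals` are hypothetical stand-ins written for this example and are not the platform's actual filter engine; the sample payloads are invented.

```typescript
type Json = { [key: string]: unknown };

// Resolves a dotted path ("device._source.model") inside a nested object.
function getByPath(source: unknown, path: string): unknown {
  let current: unknown = source;
  for (const key of path.split(".")) {
    if (typeof current !== "object" || current === null) {
      return undefined;
    }
    current = (current as Json)[key];
  }
  return current;
}

// Returns true when every path in the condition strictly equals the expected value.
function matchesEquals(payload: unknown, equals: Record<string, unknown>): boolean {
  return Object.entries(equals).every(
    ([path, expected]) => getByPath(payload, path) === expected
  );
}

// Invented payloads shaped like the event payload described above.
const abeewayPayload = {
  device: { _id: "Abeeway-1", _source: { model: "Abeeway" } },
  measures: [],
};
const otherPayload = {
  device: { _id: "Other-1", _source: { model: "OtherModel" } },
  measures: [],
};

const condition = { "device._source.model": "Abeeway" };
const abeewayMatches = matchesEquals(abeewayPayload, condition);
const otherMatches = matchesEquals(otherPayload, condition);
```

With this condition, only the first payload would trigger the workflow.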

Action 1: enrich existing measure

We will now define our first action in the form of a Task.

Here is the measure definition of our Device:

const powerMeasure: MeasureDefinition = {
  valuesMappings: {
    volt: { type: "float" },
    ampere: { type: "float" },
    watt: { type: "float" },
  },
};

Our Abeeway device sends measures of voltage and amperage, so we will calculate the watts and add them to the measure.

class EnrichWatt extends Task {
  constructor() {
    super('enrich-watt');

    this.description = 'Enrich the measures with the watts computation';
  }

  async run(context: WorkflowContext) {
    const measures = context.payload.measures as MeasureContent[];

    for (const measure of measures) {
      measure.values.watt = measure.values.volt * measure.values.ampere;
    }

    return context;
  }
}

Our Task iterates over all the measurements in the payload (usually just one) and adds the watt field to the measurement.
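As a worked example of the computation, the sketch below applies the same volt × ampere formula to a sample measure outside of any Task. The `PowerMeasure` interface is a local stand-in for `MeasureContent`, and the measure type `"power"` and the sample values are assumptions made for illustration.

```typescript
// Local stand-in for the part of MeasureContent used by the computation.
interface PowerMeasure {
  type: string;
  values: { volt: number; ampere: number; watt?: number };
}

// Adds the computed watt value to each measure, mutating it in place
// as the Task does with the payload measures.
function enrichWatt(measures: PowerMeasure[]): PowerMeasure[] {
  for (const measure of measures) {
    measure.values.watt = measure.values.volt * measure.values.ampere;
  }
  return measures;
}

// Assumed sample: 230 V and 2 A.
const sample: PowerMeasure[] = [{ type: "power", values: { volt: 230, ampere: 2 } }];
const enriched = enrichWatt(sample);
// enriched[0].values.watt is now 460
```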

Let's add our Task to our workflow:

const enrichmentWorkflow: WorkflowContent = {
  name: "Measure Enrichment",
  description: "Enrich an existing measure and create a new one",
  payloadPath: ".",
  trigger: {
    type: "event",
    event: "engine:{engine-index}:device-manager:measures:process:before",
    filters: {
      equals: { "device._source.model": "Abeeway" },
    },
  },

  actions: [
    {
      // Type of the action
      type: "task",
      // Name of the task defined in the constructor
      name: "enrich-watt",
    },
  ],

  lifecycle: {},
};
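The link between an action's `name` and the name passed to the Task constructor can be pictured with the following sketch. It is not the Rules Engine implementation: `SketchTask`, `TaskRegistry` and `EnrichWattStub` are hypothetical, execution is synchronous for brevity, and running actions in their declared order is an assumption.

```typescript
type Payload = Record<string, unknown>;

// Minimal stand-in for a Task: a name and a run function.
interface SketchTask {
  name: string;
  run(payload: Payload): Payload;
}

// Hypothetical registry resolving action names to registered tasks.
class TaskRegistry {
  private readonly tasks = new Map<string, SketchTask>();

  register(task: SketchTask): void {
    this.tasks.set(task.name, task);
  }

  // Runs each action in order, looking up the task by its name.
  runActions(actions: { type: "task"; name: string }[], payload: Payload): Payload {
    let current = payload;
    for (const action of actions) {
      const task = this.tasks.get(action.name);
      if (task === undefined) {
        throw new Error(`Unknown task "${action.name}"`);
      }
      current = task.run(current);
    }
    return current;
  }
}

// Stub carrying the same name as the EnrichWatt Task above.
class EnrichWattStub implements SketchTask {
  name = "enrich-watt";

  run(payload: Payload): Payload {
    return { ...payload, enriched: true };
  }
}

const registry = new TaskRegistry();
registry.register(new EnrichWattStub());
const outcome = registry.runActions([{ type: "task", name: "enrich-watt" }], { measures: [] });
```

In this sketch, an action whose name matches no registered task fails immediately, which is why the name in the workflow must match the one given in the constructor.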

Action 2: create new measure

In this action, we will call an external API to find the price of the kilowatt-hour at the date of the measure.

This information will then be stored in a new powerPrice measure:

const powerPriceMeasure: MeasureDefinition = {
  valuesMappings: {
    price: { type: "float" },
  },
};

In our Task, we will add this new measure to the array of measures to be historized (in the form of a MeasureContent) as well as to the Device itself (in the form of an EmbeddedMeasure).

class FetchPowerPrice extends Task {
  constructor() {
    super('fetch-power-price');

    this.description = 'Fetch the current price of the kilowatt hour';
  }

  async run(
    context: WorkflowContext,
    initiator: DynamicObject,
    args: JSONObject
  ) {
    const measures = context.payload.measures as MeasureContent[];
    const device = context.payload.device as KDocument<DeviceContent>;

    // fetchCurrentPrice is a helper wrapping the call to the external API (not shown here)
    const currentPrice = await fetchCurrentPrice();

    // Compute the timestamp once so that both representations share the same date
    const measuredAt = Date.now();

    const powerPriceMeasure: MeasureContent = {
      type: 'powerPrice',
      measuredAt,
      values: {
        price: currentPrice,
      },
      origin: {
        type: 'computed',
        _id: 'fetch-power-price',
        measureName: 'powerPrice',
        payloadUuids: measures[0].origin.payloadUuids,
      },
    };

    // Add the measure to be historized
    measures.push(powerPriceMeasure);

    // Embed the measure into the device
    device._source.measures.powerPrice = {
      type: 'powerPrice',
      name: 'powerPrice',
      measuredAt,
      payloadUuids: measures[0].origin.payloadUuids,
      values: {
        price: currentPrice,
      },
    };

    return context;
  }
}
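The Task above writes the same information in two shapes. The sketch below isolates that step in a pure function so the relation between the two is easier to see. The interfaces are local stand-ins reduced to the fields used in this tutorial, and `buildPowerPrice`, the sample price and the payload UUID are hypothetical.

```typescript
// Local stand-ins limited to the fields used in this tutorial.
interface MeasureOrigin {
  type: "computed";
  _id: string;
  measureName: string;
  payloadUuids: string[];
}
interface HistorizedMeasure {
  type: string;
  measuredAt: number;
  values: { price: number };
  origin: MeasureOrigin;
}
interface EmbeddedPrice {
  type: string;
  name: string;
  measuredAt: number;
  payloadUuids: string[];
  values: { price: number };
}

// Builds both representations from the same price, timestamp and payload UUIDs.
function buildPowerPrice(
  price: number,
  measuredAt: number,
  payloadUuids: string[]
): { measure: HistorizedMeasure; embedded: EmbeddedPrice } {
  const measure: HistorizedMeasure = {
    type: "powerPrice",
    measuredAt,
    values: { price },
    origin: {
      type: "computed",
      _id: "fetch-power-price",
      measureName: "powerPrice",
      payloadUuids,
    },
  };
  const embedded: EmbeddedPrice = {
    type: "powerPrice",
    name: "powerPrice",
    measuredAt,
    payloadUuids,
    values: { price },
  };
  return { measure, embedded };
}

// Invented sample values for illustration.
const built = buildPowerPrice(0.25, 1700000000000, ["uuid-1"]);
```

The historized measure carries its provenance in `origin`, while the embedded form keeps `name` and `payloadUuids` at the top level, as in the Task.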

Then, add the Task to our workflow:

const enrichmentWorkflow: WorkflowContent = {
  name: "Measure Enrichment",
  description: "Enrich an existing measure and create a new one",
  payloadPath: ".",
  trigger: {
    type: "event",
    event: "engine:{engine-index}:device-manager:measures:process:before",
    filters: {
      equals: { "device._source.model": "Abeeway" },
    },
  },

  actions: [
    {
      type: "task",
      name: "enrich-watt",
    },
    {
      type: "task",
      name: "fetch-power-price",
    },
  ],

  lifecycle: {},
};

Registration on the framework

We will now register our workflow and our Tasks in the framework:

import { enrichmentWorkflow } from './enrichmentWorkflow';
import { FetchPowerPrice, EnrichWatt } from './tasks';

const workflows = app.plugin.get<WorkflowsPlugin>('workflows');

workflows.registerDefaultWorkflow(new Workflow(enrichmentWorkflow));
workflows.registerTask(new EnrichWatt());
workflows.registerTask(new FetchPowerPrice());

Now our workflow is functional and will be automatically added to the workflows list of newly created tenants.

To load this new workflow into existing tenants, use the multi-tenancy/tenant:update action.