AWS IoT Pipelines in Action: IoT Core, Lambda, Kinesis, Kinesis Analytics


Today we will show you two end-to-end pipelines using AWS IoT Core and other AWS services.
  1. Devices publish their status to the cloud; the cloud processes the events and writes abnormal statuses to a NoSQL database.
  2. Devices publish their status to the cloud, and we do real-time stream analytics on the events using Kinesis Analytics.

Pipeline 1: Process data using Lambda

    +-----------+      +----------+     +------------+    +------------+
    |           |      |  Message |     |            |    |            |
    | IoT Device|      |  Broker  |     |  Rules     |    |  Kinesis   |
    |           +----> |          +---> |  Engine    +--> |            |
    |           |      |          |     |            |    |            |
    +-----------+      +----------+     +------------+    +-----+------+
                                                                |
                                                                v
                                        +------------+    +------------+
                                        |  DynamoDB  |    |            |
                                        |            | <--+  Lambda    |
                                        |            |    |            |
                                        +------------+    +------------+
IoT devices publish their status to the Message Broker, one component of AWS IoT Core, using MQTT. The Rules Engine (another component of AWS IoT Core) is set up to channel the messages into a Kinesis stream, which in turn is configured as the trigger event for the Lambda function. The Lambda function is where the processing, or business logic if you like, happens.
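For reference, such a rule can be created roughly like this. This is only a sketch, not the exact rule used here: the rule name, role ARN, and stream name are placeholders you would replace with your own, and the SQL simply selects everything published under the sensors/ topics. The payload lives in a separate rule.json file to avoid shell quoting issues.
#rule.json
{
  "sql": "SELECT * FROM 'sensors/#'",
  "actions": [{
    "kinesis": {
      "streamName": "us-east-sensors-kinesis",
      "partitionKey": "${topic()}",
      "roleArn": "arn:aws:iam::123456789012:role/iot-kinesis-role"
    }
  }]
}
#create_rule.sh
aws iot create-topic-rule \
    --rule-name sensors_to_kinesis \
    --topic-rule-payload file://rule.json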

Lambda

The Lambda function takes records (Kinesis terminology) from the Kinesis stream, i.e. its trigger event, then filters, processes, stores, and passes them on to other stages.
Check the code section below for details.

Pipeline 2: Real-time analytics using Kinesis Analytics

    +-----------+      +----------+     +------------+    +------------+
    |           |      |  Message |     |            |    |            |
    | IoT Device|      |  Broker  |     |  Rules     |    |  Kinesis   |
    |           +----> |          +---> |  Engine    +--> |            |
    |           |      |          |     |            |    |            |
    +-----------+      +----------+     +------------+    +-----+------+
                                                                |
                                                                v
                                        +------------+    +------------+
                                        |  Output    |    |  Kinesis   |
                                        |            |<---+  Analytics |
                                        |            |    |            |
                                        +------------+    +------------+
The first four stages of the pipeline are the same as in pipeline 1. But here we channel the Kinesis stream into Kinesis Analytics to do real-time analysis. If you know the Hadoop/Spark ecosystem, Kinesis Analytics is roughly equivalent to Spark Streaming.

Kinesis Analytics

You can use a SQL-like syntax to analyze the stream data over a time window (see also the windowed aggregation sketched after the result listing below).
  • An example: filter data
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TEMP INT, EVENTID INT);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM TEMP, EVENTID
FROM "SOURCE_SQL_STREAM_001"
WHERE EVENTID = 2;
  • result
You'll see new events being added to the result as time goes by. Note that the timestamp is added automatically by Kinesis Analytics.
2017-10-01 03:24:37.978        50      2
2017-10-01 03:24:57.904        50      2
2017-10-01 03:25:05.914        50      2
2017-10-01 03:25:25.978        50      2
2017-10-01 03:25:44.001        50      2
2017-10-01 03:26:02.005        50      2
2017-10-01 03:26:11.898        50      2
2017-10-01 03:26:19.947        50      2
2017-10-01 03:26:29.922        50      2
2017-10-01 03:26:39.973        50      2
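  • Another example: windowed aggregation (a sketch)
Beyond simple filters, you can aggregate over a time window, which is where stream analytics really pays off. The sketch below computes the maximum temperature and the event count per one-minute tumbling window over the same source stream. The destination stream and pump names are arbitrary, and the column names assume the same input schema (TEMP) as the filter example above.
CREATE OR REPLACE STREAM "AGG_SQL_STREAM" (MAX_TEMP INT, EVENT_COUNT INT);
CREATE OR REPLACE PUMP "AGG_PUMP" AS INSERT INTO "AGG_SQL_STREAM"
SELECT STREAM MAX(TEMP) AS MAX_TEMP, COUNT(*) AS EVENT_COUNT
FROM "SOURCE_SQL_STREAM_001"
GROUP BY FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);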

Code

I'm not going to show you each and every step of creating an IoT device, setting up the Rules Engine, creating the Kinesis stream, and connecting those components into a pipeline.
What I will show you:
  1. A device simulator that can be used to drive the whole pipeline. You can easily spin up multiple devices and send messages to simulate real use cases.
  2. The complete Lambda handler that parses the Kinesis records, filters the data, and writes to a DynamoDB table.
  3. A simple Kinesis Analytics SQL query that filters out abnormal events and generates a live report.

IoT device simulator

Launch one or more simulated devices that push data to a topic; this drives the whole pipeline.
Basic usage: ./pub.sh deviceId. It starts a device as ${deviceId} and publishes the events declared in the file simulated_events.json in a loop. Quit with Ctrl+C.
Or use ./start_ants.sh to launch several devices in the background that continuously publish events (a sketch of that script follows the pub.sh listing below).
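The simulated_events.json file itself is not listed elsewhere, so here is a minimal illustrative example. The deviceId and temperature fields are what the Lambda handler below expects; eventId is an assumption matching the EVENTID column used in the Kinesis Analytics example (deviceId is overridden by the simulator anyway).
[
  {"deviceId": "000", "temperature": 25, "eventId": 1},
  {"deviceId": "000", "temperature": 55, "eventId": 2},
  {"deviceId": "000", "temperature": 28, "eventId": 3}
]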
#deviceSimulator.py
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import argparse
import json
import logging
import time
import signal

AllowedActions = ['publish', 'subscribe', 'both']

def subscribe_callback(client, userdata, message):
    print("[<< Receive]: ", "topic", message.topic, message.payload)


def args_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", "--endpoint", action="store", required=True,
                        dest="host", help="Your AWS IoT custom endpoint")
    parser.add_argument("-r", "--rootCA", action="store", required=True,
                        dest="rootCAPath", help="Root CA file path")
    parser.add_argument("-c", "--cert", action="store", dest="certificatePath",
                        help="Certificate file path")
    parser.add_argument("-k", "--key", action="store", dest="privateKeyPath",
                        help="Private key file path")
    parser.add_argument("-w", "--websocket", action="store_true",
                        dest="useWebsocket", default=False,
                        help="Use MQTT over WebSocket")
    parser.add_argument("-id", "--clientId", action="store", dest="clientId",
                        default="basicPubSub",
                        help="Targeted client id")
    parser.add_argument("-t", "--topic", action="store", dest="topic",
                        default="sensors", help="topic prefix")
    parser.add_argument("-d", "--deviceId", action="store", dest="deviceId",
                        required=True,
                        help="device serial number, used as last part of "
                             "topic")
    parser.add_argument("-m", "--mode", action="store", dest="mode",
                        default="publish",
                        help="Operation modes: %s" % str(AllowedActions))

    args = parser.parse_args()

    return parser, args

def config_logger():
    logger = logging.getLogger("AWSIoTPythonSDK.core")
    logger.setLevel(logging.ERROR)
    streamHandler = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    streamHandler.setFormatter(formatter)
    logger.addHandler(streamHandler)


def main():
    parser, args = args_parser()

    config_logger()
    mqtt_client = create_mqtt_client(args)
    mqtt_client.connect()

    topic = args.topic + "/" + args.deviceId
    print("topic:", topic)

    if args.mode == 'subscribe' or args.mode == 'both':
        mqtt_client.subscribe(topic, 1, subscribe_callback)

    if args.mode == 'subscribe':
        print("endless wait.. ctrl+c to finish");
        signal.pause()

    # wait a while to make sure the subscription takes effect
    time.sleep(2)

    if args.mode == 'publish' or args.mode == 'both':
        publish_events(topic, args.deviceId, mqtt_client)


def publish_events(topic, deviceId, mqtt_client):
    if len(topic) == 0:
        print("topic can't be empty")
        exit(2)

    with open('simulated_events.json') as f:
        events = json.load(f)

    while True:
        for event in events:
            # override the deviceId with the passed-in value
            event['deviceId'] = deviceId
            message = json.dumps(event)
            mqtt_client.publish(topic, message, 1)
            print('[>> publish]', 'topic', topic, message)
            time.sleep(2)

def create_mqtt_client(args):
    host = args.host
    rootCAPath = args.rootCAPath
    certificatePath = args.certificatePath
    privateKeyPath = args.privateKeyPath
    useWebsocket = args.useWebsocket
    clientId = args.clientId

    mqttclient = None
    if useWebsocket:
        mqttclient = AWSIoTMQTTClient(clientId, useWebsocket=True)
        mqttclient.configureEndpoint(host, 443)
        mqttclient.configureCredentials(rootCAPath)
    else:
        mqttclient = AWSIoTMQTTClient(clientId)
        mqttclient.configureEndpoint(host, 8883)
        mqttclient.configureCredentials(rootCAPath, privateKeyPath,
                                        certificatePath)

    # AWSIoTMQTTClient connection configuration
    mqttclient.configureAutoReconnectBackoffTime(1, 32, 20)
    mqttclient.configureOfflinePublishQueueing(-1)
    mqttclient.configureDrainingFrequency(2)
    mqttclient.configureConnectDisconnectTimeout(10)
    mqttclient.configureMQTTOperationTimeout(5)

    return mqttclient


if __name__ == "__main__":
    main()
#pub.sh
endpoint=your_things_endpoint

# give each device its own client id, so multiple simulators don't disconnect each other
python deviceSimulator.py \
    -e ${endpoint} \
    -r root-CA.crt \
    -c device.cert.pem \
    -k device.private.key \
    --clientId "device_$1" \
    --topic 'sensors' \
    --deviceId $1
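start_ants.sh is mentioned above but not listed; here is a minimal sketch, assuming pub.sh as shown and that all simulated devices can share the same certificate files:
#start_ants.sh
for id in 001 002 003; do
    ./pub.sh ${id} > /dev/null 2>&1 &
done
echo "simulators started; stop them with: pkill -f deviceSimulator.py"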

Lambda Handler

In the handler, we filter out all the records whose temperature exceeds a threshold (30 in the code below) and write them to the DynamoDB table warning_events.
import base64
import boto3
import json

def lambda_handler(event, context):
    for record in event['Records']:
        sensor_event = decode_kinesis_data(record["kinesis"]["data"])
        if sensor_event is None:
            # skip malformed records instead of failing the whole batch
            continue
        if filter_event(sensor_event):
            print("warning: temperature too high, write it to DynamoDB")
            add_event(process_event(sensor_event))

    return "ok"


def filter_event(event):
    return event['temperature'] > 30


def process_event(event):
    # placeholder for real business logic (enrichment, normalization, ...)
    return event


def decode_kinesis_data(data):
    """create event from base64 encoded string"""
    try:
        return json.loads(base64.b64decode(data.encode()).decode())
    except Exception:
        print("malformed event data")
        return None


def encode_kinesis_data(event):
    """encode event to base64"""
    try:
        return base64.b64encode(json.dumps(event).encode()).decode()
    except Exception:
        return None

# dynamoDB table handling
dynamodb = boto3.resource('dynamodb')
dbclient = boto3.client('dynamodb')

def is_table_exist(name):
    # Not robust enough: a table might not be fully ready, or might be being deleted.
    # Just relax the constraint for now.
    return name in dbclient.list_tables()['TableNames']


def get_or_create_table(name):
    if not is_table_exist(name):
        print("table", name, " doesn't exist, go and create one")
        return create_table(name)

    return dynamodb.Table(name)


def create_table(table_name):
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {
                'AttributeName': 'deviceId',
                'KeyType': 'HASH'
            }
        ],

        AttributeDefinitions=[
            {
                'AttributeName': 'deviceId',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    # Wait until the table exists.
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
    return table


def add_event(event):
    get_or_create_table('warning_events').put_item(Item=event)


def delete_table(name):
    dynamodb.Table(name).delete()
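To sanity-check the handler without deploying it, you can feed it a hand-built Kinesis-style event. A minimal sketch, assuming the handler above is saved as lambda_function.py (it still needs AWS credentials with DynamoDB access, since add_event writes to the table):
#local_test.py
import base64
import json

from lambda_function import lambda_handler  # the handler module above (assumed filename)

# build a fake Kinesis trigger event with one base64-encoded record
fake_record = {"deviceId": "001", "temperature": 55, "eventId": 2}
fake_event = {
    "Records": [
        {"kinesis": {"data": base64.b64encode(json.dumps(fake_record).encode()).decode()}}
    ]
}

print(lambda_handler(fake_event, None))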

Tips

  • debug message broker
Open pub.sh and add --mode 'both'; the device will then receive each message as it publishes it.
You can also use the Test page[1] in the IoT console to subscribe/publish to a topic: publish using ./pub.sh and subscribe from the web page, which is more "real" than using --mode both.
One thing worth noting: the topic namespace is implicit, /account/region/topic, so your topics won't collide with mine. Even within my own account, the topic 'sensors/001' in us-east-1 is different from the one in ap-southeast-2.
  • debug rules engine
The Rules Engine connects the message broker to other AWS services, for example Kinesis or Lambda.
There isn't much to debug; it is kind of a black box AFAIK. The usual suggestion is to use CloudWatch to debug it, but I wasn't lucky enough to get that working. A good idea, before debugging the connector itself, is to make sure the relevant source and sink work independently. Say, when connecting the message broker to the Kinesis stream, make sure the topic/message broker and the Kinesis stream have each been tested on their own first.
Hint: if you run out of ideas, try re-creating the rule. That is what I did, and the result was very good :).
  • debug Kinesis
Use get_stream_record.sh to check whether there are any records in the stream.
region=us-east-1
stream_name=us-east-sensors-kinesis

SHARD_ITERATOR=$(aws kinesis get-shard-iterator  \
    --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON \
    --region ${region} \
    --stream-name ${stream_name} \
    --query 'ShardIterator')
aws kinesis get-records --region ${region} --shard-iterator ${SHARD_ITERATOR}
If not, something is wrong with publishing into the stream.
Use aws kinesis put-record to publish a record manually, then run get_stream_record.sh again to see if anything changes. If it returns records now, the stream itself is fine and something upstream in the pipeline is wrong: nothing is producing records into it.
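For example, something along these lines (stream name and region taken from the script above; with AWS CLI v2 you may also need --cli-binary-format raw-in-base64-out so the inline JSON payload is not treated as base64):
aws kinesis put-record \
    --region us-east-1 \
    --stream-name us-east-sensors-kinesis \
    --partition-key 001 \
    --data '{"deviceId": "001", "temperature": 55, "eventId": 2}'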

Summary

We showcased two pipelines/architectures that can be used to implement two common use cases. You don't have to pick one pipeline over the other; in practice the two are usually combined. That can easily be done by creating multiple rules in the Rules Engine with two output Kinesis streams, one for Lambda processing and one for analytics.
The beauty of these architectures:
  1. Very scalable. You can have thousands of devices connected at the same time without worrying about throughput.
  2. Highly available. Components such as DynamoDB and Lambda are built on top of AWS's highly available services.
  3. Serverless. There is no machine, not even a virtual one, that you have to set up and maintain. Agility improves.