
AWS IoT Pipelines In Action: IoT Core, Lambda, Kinesis, Kinesis Analytics


Today we will show you two end-to-end pipelines using AWS IoT Core and other AWS services.
  1. Devices publish their status to the cloud, and the cloud processes the events and writes abnormal status to a NoSQL database.
  2. Devices publish their status to the cloud, and we do real-time stream analytics on the events using Kinesis Analytics.

Pipeline 1: Process data using Lambda

    +-----------+      +----------+     +------------+    +------------+
    |           |      |  Message |     |            |    |            |
    | IoT Device|      |  Broker  |     |  Rules     |    |  Kinesis   |
    |           +----> |          +---> |  Engine    +--> |            |
    |           |      |          |     |            |    |            |
    +-----------+      +----------+     +------------+    +-----+------+
                                                                |
                                                                v
                                        +------------+    +------------+
                                        |  DynamoDB  |    |            |
                                        |            | <--+  Lambda    |
                                        |            |    |            |
                                        +------------+    +------------+
IoT devices publish their status to the Message Broker, one component of AWS IoT Core, using MQTT. The Rules Engine (again, a component of AWS IoT Core) is set up to channel the messages to a Kinesis stream, which is configured as the trigger event for the Lambda function. The Lambda function is where the processing, or business logic if you like, happens.
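If you prefer to script that wiring instead of clicking through the console, the snippet below is one way to create such a rule with boto3's create_topic_rule. Treat it as a hedged sketch: the rule name, role ARN and region are placeholders I made up (only the stream name reappears later in the tips section), so substitute your own values.
#create_rule.py -- a sketch of wiring the Rules Engine to a Kinesis stream
import boto3

iot = boto3.client('iot', region_name='us-east-1')           # assumed region

iot.create_topic_rule(
    ruleName='sensors_to_kinesis',                           # placeholder rule name
    topicRulePayload={
        # forward every message published under the sensors/ topic prefix
        'sql': "SELECT * FROM 'sensors/#'",
        'ruleDisabled': False,
        'actions': [{
            'kinesis': {
                'streamName': 'us-east-sensors-kinesis',     # the stream used later in this post
                'partitionKey': '${topic()}',                # spread devices across shards by topic
                # placeholder role that allows iot.amazonaws.com to call PutRecord
                'roleArn': 'arn:aws:iam::123456789012:role/iot-to-kinesis-role',
            }
        }],
    },
)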

Lambda

The Lambda function takes records (Kinesis terminology) from the Kinesis stream, i.e. its trigger event, then filters, processes, stores, and passes them on to other stages.
Check the code section below for details.

Pipeline 2: Real-time analytics using Kinesis Analytics

    +-----------+      +----------+     +------------+    +------------+
    |           |      |  Message |     |            |    |            |
    | IoT Device|      |  Broker  |     |  Rules     |    |  Kinesis   |
    |           +----> |          +---> |  Engine    +--> |            |
    |           |      |          |     |            |    |            |
    +-----------+      +----------+     +------------+    +-----+------+
                                                                |
                                                                v
                                        +------------+    +------------+
                                        |  Output    |    |  Kinesis   |
                                        |            |<---+  Analytics |
                                        |            |    |            |
                                        +------------+    +------------+
The first four stages of the pipeline are the same as in pipeline 1, but here we channel the Kinesis stream to Kinesis Analytics to do real-time analysis. If you know the Hadoop/Spark ecosystem, Kinesis Analytics is roughly the equivalent of Spark Streaming.

Kinesis Analytics

You can use an SQL-like syntax to analyze the stream data over a window period.
  • An example: filter data
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TEMP INT, EVENTID INT);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM TEMP, EVENTID
FROM "SOURCE_SQL_STREAM_001"
WHERE EVENTID = 2;
  • result
You'll see new events being added to the result as time goes by. Note that the timestamp is added automatically by Kinesis Analytics.
2017-10-01 03:24:37.978        50      2
2017-10-01 03:24:57.904        50      2
2017-10-01 03:25:05.914        50      2
2017-10-01 03:25:25.978        50      2
2017-10-01 03:25:44.001        50      2
2017-10-01 03:26:02.005        50      2
2017-10-01 03:26:11.898        50      2
2017-10-01 03:26:19.947        50      2
2017-10-01 03:26:29.922        50      2
2017-10-01 03:26:39.973        50      2

Code

I'm not going to show you each and every step of creating an IoT device, setting up the Rules Engine, creating the Kinesis stream, and connecting those components into a pipeline.
What I will show you is:
  1. A device simulator that can be used to drive the whole pipeline. You can easily spin up multiple devices and send messages to simulate real use cases.
  2. The complete Lambda handler that parses the Kinesis records, filters the data, and writes to a DynamoDB table.
  3. A simple Kinesis Analytics SQL query that filters out abnormal events and generates a live report.

IoT device simulator

Launch one or more simulated devices that push data to a topic; this is used to drive the whole pipeline.
Basic usage: ./pub.sh deviceId. It will start a device as ${deviceId} and publish the events declared in the file simulated_events.json in a loop. To quit, use ctrl+c.
Or, use ./start_ants.sh to launch several devices in the background that continuously publish events (a Python sketch of such a launcher appears right after the pub.sh listing below).
#deviceSimulator.py
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import argparse
import json
import logging
import time
import signal

AllowedActions = ['publish', 'subscribe', 'both']

def subscribe_callback(client, userdata, message):
    print("[<< Receive]: ", "topic", message.topic, message.payload)


def args_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", "--endpoint", action="store", required=True,
                        dest="host", help="Your AWS IoT custom endpoint")
    parser.add_argument("-r", "--rootCA", action="store", required=True,
                        dest="rootCAPath", help="Root CA file path")
    parser.add_argument("-c", "--cert", action="store", dest="certificatePath",
                        help="Certificate file path")
    parser.add_argument("-k", "--key", action="store", dest="privateKeyPath",
                        help="Private key file path")
    parser.add_argument("-w", "--websocket", action="store_true",
                        dest="useWebsocket", default=False,
                        help="Use MQTT over WebSocket")
    parser.add_argument("-id", "--clientId", action="store", dest="clientId",
                        default="basicPubSub",
                        help="Targeted client id")
    parser.add_argument("-t", "--topic", action="store", dest="topic",
                        default="sensors", help="topic prefix")
    parser.add_argument("-d", "--deviceId", action="store", dest="deviceId",
                        required=True,
                        help="device serial number, used as last part of "
                             "topic")
    parser.add_argument("-m", "--mode", action="store", dest="mode",
                        default="publish",
                        help="Operation modes: %s" % str(AllowedActions))

    args = parser.parse_args()

    return parser, args

def config_logger():
    logger = logging.getLogger("AWSIoTPythonSDK.core")
    logger.setLevel(logging.ERROR)
    streamHandler = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    streamHandler.setFormatter(formatter)
    logger.addHandler(streamHandler)


def main():
    parser, args = args_parser()

    config_logger()
    mqtt_client = create_mqtt_client(args)
    mqtt_client.connect()

    topic = args.topic + "/" + args.deviceId
    print("topic:", topic)

    if args.mode == 'subscribe' or args.mode == 'both':
        mqtt_client.subscribe(topic, 1, subscribe_callback)

    if args.mode == 'subscribe':
        print("endless wait... ctrl+c to finish")
        signal.pause()

    # wait a while to make sure the subscription takes effect
    time.sleep(2)

    if args.mode == 'publish' or args.mode == 'both':
        publish_events(topic, args.deviceId, mqtt_client)


def publish_events(topic, deviceId, mqtt_client):
    if len(topic) == 0:
        print("topic can't be empty")
        exit(2)

    with open('simulated_events.json') as f:
        events = json.load(f)

    while True:
        for event in events:
            # override the deviceId using pass-in value
            event['deviceId'] = deviceId
            message = json.dumps(event)
            mqtt_client.publish(topic, message, 1)
            print('[>> publish]', 'topic', topic, message)
            time.sleep(2)

def create_mqtt_client(args):
    host = args.host
    rootCAPath = args.rootCAPath
    certificatePath = args.certificatePath
    privateKeyPath = args.privateKeyPath
    useWebsocket = args.useWebsocket
    clientId = args.clientId

    mqttclient = None
    if useWebsocket:
        mqttclient = AWSIoTMQTTClient(clientId, useWebsocket=True)
        mqttclient.configureEndpoint(host, 443)
        mqttclient.configureCredentials(rootCAPath)
    else:
        mqttclient = AWSIoTMQTTClient(clientId)
        mqttclient.configureEndpoint(host, 8883)
        mqttclient.configureCredentials(rootCAPath, privateKeyPath,
                                        certificatePath)

    # AWSIoTMQTTClient connection configuration
    mqttclient.configureAutoReconnectBackoffTime(1, 32, 20)
    mqttclient.configureOfflinePublishQueueing(-1)
    mqttclient.configureDrainingFrequency(2)
    mqttclient.configureConnectDisconnectTimeout(10)
    mqttclient.configureMQTTOperationTimeout(5)

    return mqttclient


if __name__ == "__main__":
    main()
#pub.sh
endpoint=your_things_endpoint

python deviceSimulator.py \
    -e ${endpoint} \
    -r root-CA.crt \
    -c device.cert.pem \
    -k device.private.key \
    --topic 'sensors' \
    --deviceId $1
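
The contents of start_ants.sh aren't shown here, but as a rough idea, here is a hedged Python sketch of a launcher doing the same job: spawning several pub.sh processes in the background with different device IDs. The file name and the device ID list are my own assumptions.
#start_many.py -- a hypothetical Python equivalent of start_ants.sh
import signal
import subprocess

DEVICE_IDS = ["001", "002", "003"]             # assumed device serial numbers

# launch one simulator process per device id, each running pub.sh
procs = [subprocess.Popen(["./pub.sh", device_id]) for device_id in DEVICE_IDS]
print("launched", len(procs), "simulated devices, ctrl+c to stop them")

try:
    signal.pause()                             # block until interrupted
except KeyboardInterrupt:
    for proc in procs:
        proc.terminate()
    for proc in procs:
        proc.wait()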

Lambda Handler

In the handler, we filter all the records whose temperature exceeds a threshold (30 in the code below) and write them to the DynamoDB table warning_events.
import base64
import boto3
import json

def lambda_handler(event, context):
    for record in event['Records']:
        # each Kinesis record carries the device's JSON payload, base64 encoded
        payload = decode_kinesis_data(record["kinesis"]["data"])
        if payload and filter_event(payload):
            print("warning: temperature too high, write it to DynamoDB")
            payload = process_event(payload)
            add_event(payload)

    return "ok"


def filter_event(event):
    return event['temperature'] > 30


def process_event(event):
    return event


def decode_kinesis_data(data):
    """create event from base64 encoded string"""
    try:
        return json.loads(base64.b64decode(data.encode()).decode())
    except Exception:
        print("malformed event data")
        return None


def encode_kinesis_data(event):
    """encode event to base64"""
    try:
        return base64.b64encode(json.dumps(event).encode()).decode()
    except Exception:
        return None

# dynamoDB table handling
dynamodb = boto3.resource('dynamodb')
dbclient = boto3.client('dynamodb')

def is_table_exist(name):
    # Not robust: the table might not be fully ready, or might be in the
    # middle of being deleted; loosen the constraint for now.
    return name in dbclient.list_tables()['TableNames']


def get_or_create_table(name):
    if not is_table_exist(name):
        print("table", name, " doesn't exist, go and create one")
        return create_table(name)

    return dynamodb.Table(name)


def create_table(table_name):
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {
                'AttributeName': 'deviceId',
                'KeyType': 'HASH'
            }
        ],

        AttributeDefinitions=[
            {
                'AttributeName': 'deviceId',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    # Wait until the table exists.
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
    return table


def add_event(event):
    get_or_create_table('warning_events').put_item(Item=event)


def delete_table(name):
    dynamodb.Table(name).delete()
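
To sanity-check the handler without deploying it, you can feed it a hand-built Kinesis trigger event; Lambda delivers each record's device JSON base64-encoded under Records[*].kinesis.data, which is exactly what decode_kinesis_data expects. The sketch below is a hedged local driver: the module name lambda_function and the extra eventId field are assumptions, and it still needs AWS credentials because the module creates DynamoDB clients at import time.
#local_test.py -- a hypothetical local driver for the handler above
import base64
import json

import lambda_function   # assumes the handler is saved as lambda_function.py


def make_kinesis_event(payloads):
    # wrap plain dicts in the Records[*].kinesis.data shape that Lambda receives
    return {
        "Records": [
            {"kinesis": {"data": base64.b64encode(json.dumps(p).encode()).decode()}}
            for p in payloads
        ]
    }


if __name__ == "__main__":
    event = make_kinesis_event([
        {"deviceId": "001", "temperature": 50, "eventId": 2},   # above threshold, gets stored
        {"deviceId": "002", "temperature": 20, "eventId": 1},   # below threshold, filtered out
    ])
    print(lambda_function.lambda_handler(event, None))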

Tips

  • debug the message broker
Use pub.sh: open the script file and add --mode 'both'. The device will then receive each message as it publishes it.
You can also use the Test page[1] in the IoT console, where you can subscribe and publish to a topic. For example, publish using ./pub.sh and subscribe using the web page; it is more "real" than using --mode both.
One thing worth noting: the namespace for a topic is implicit, /account/region/topic. This means your topic won't collide with my topic, and even within my account, the topic `sensors/001` in the us-east-1 region is different from the one in ap-southeast-2.
  • debug the Rules Engine
The Rules Engine connects the message broker to other AWS services, for example Kinesis and Lambda.
There doesn't seem to be much to debug; it is a kind of black box AFAIK. The suggested approach is to use CloudWatch, but I wasn't lucky enough to get that working. A good idea is, before debugging the connector, to make sure the relevant source and sink work independently. Say, when connecting the message broker to the Kinesis stream, make sure the topic/message broker and the Kinesis stream have each been tested on their own first.
Hint: if you run out of ideas, try re-creating the rule. That is what I did, and it worked very well :).
  • debug Kinesis
Use get_stream_record.sh to check whether there are any records in the stream.
region=us-east-1
stream_name=us-east-sensors-kinesis

SHARD_ITERATOR=$(aws kinesis get-shard-iterator  \
    --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON \
    --region ${region} \
    --stream-name ${stream_name} \
    --query 'ShardIterator')
aws kinesis get-records --region ${region} --shard-iterator ${SHARD_ITERATOR}
If there are none, something is wrong with publishing to the stream.
Use aws kinesis put-record to publish a record manually and run get_stream_record.sh again to see if anything changes. If it returns records now, something is wrong upstream in the pipeline: nothing is producing records into the stream. A boto3 version of this check is sketched right after these tips.
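
As a complement to the shell commands in the Kinesis tip above, here is a hedged boto3 sketch of the same check: put one test record into the stream and read the shard back. The stream name and region come from get_stream_record.sh; the test payload fields are assumptions.
#check_stream.py -- a Python version of the put-record / get-records check
import json

import boto3

REGION = "us-east-1"
STREAM_NAME = "us-east-sensors-kinesis"        # same stream as in get_stream_record.sh

kinesis = boto3.client("kinesis", region_name=REGION)

# manually put one test record, the equivalent of `aws kinesis put-record`
kinesis.put_record(
    StreamName=STREAM_NAME,
    Data=json.dumps({"deviceId": "debug", "temperature": 99}).encode(),  # assumed payload shape
    PartitionKey="debug",
)

# read the shard from the beginning and print whatever records are there
iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId="shardId-000000000000",
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

records = kinesis.get_records(ShardIterator=iterator)["Records"]
print("got", len(records), "record(s)")
for record in records:
    print(record["Data"])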

Summary

We showcased two pipelines/architectures that can be used to implement two common use cases. You don't have to choose one pipeline over the other; in practice, the two pipelines are usually combined. That can easily be implemented by creating multiple rules in the Rules Engine with two output Kinesis streams, one for Lambda processing and one for analytics.
The beauty of these architectures is:
  1. Very scalable. You can have thousands of devices connected at the same time without worrying about throughput.
  2. Highly available. Components such as DynamoDB and Lambda are built on top of AWS's highly available services.
  3. Serverless. There is no single machine, not even a virtual one, that you have to set up and maintain. Agility improves.
