
AWS IoT Pipelines In Action: IoT Core, Lambda, Kinesis, Analytics

Today we will show you two end-to-end pipelines using AWS IoT Core and other AWS services.
  1. Devices publish their status to the cloud; the cloud processes the events and writes abnormal status to a NoSQL database.
  2. Devices publish their status to the cloud, and we do real-time stream analytics on the events using Kinesis Analytics.

Pipeline 1: Process data using Lambda

    +-----------+      +----------+     +------------+    +------------+
    |           |      |  Message |     |            |    |            |
    | IoT Device|      |  Broker  |     |  Rules     |    |  Kinesis   |
    |           +----> |          +---> |  Engine    +--> |            |
    |           |      |          |     |            |    |            |
    +-----------+      +----------+     +------------+    +-----+------+
                                                                |
                                        +------------+    +-----v------+
                                        |  DynamoDB  |    |            |
                                        |            | <--+  Lambda    |
                                        |            |    |            |
                                        +------------+    +------------+
IoT devices publish their status to the Message Broker, which is one component of AWS IoT Core, using MQTT. The Rules Engine (again, a component of AWS IoT) is set up to channel the messages to a Kinesis stream, which is configured as the trigger event of the Lambda function. The Lambda function is where the processing, or business logic if you like, happens.
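For reference, the rule that forwards messages to Kinesis can be created with aws iot create-topic-rule. The payload below is only a sketch: the stream name, role ARN, and topic filter are placeholder assumptions, not values from this post.

```json
{
  "sql": "SELECT * FROM 'sensors/+'",
  "actions": [
    {
      "kinesis": {
        "streamName": "iot-events",
        "roleArn": "arn:aws:iam::123456789012:role/iot-to-kinesis-role",
        "partitionKey": "${topic()}"
      }
    }
  ]
}
```

The role referenced by roleArn must allow kinesis:PutRecord on the target stream, otherwise the rule silently drops messages.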


The Lambda function takes records (Kinesis stream terminology) from the Kinesis stream, i.e. its trigger event, then filters, processes, stores, and passes them on to other stages.
Check the code section below for details.

Pipeline 2: Real-time analytics using Kinesis Analytics

    +-----------+      +----------+     +------------+    +------------+
    |           |      |  Message |     |            |    |            |
    | IoT Device|      |  Broker  |     |  Rules     |    |  Kinesis   |
    |           +----> |          +---> |  Engine    +--> |            |
    |           |      |          |     |            |    |            |
    +-----------+      +----------+     +------------+    +-----+------+
                                                                |
                                        +------------+    +-----v------+
                                        |  Output    |    |  Kinesis   |
                                        |            | <--+  Analytics |
                                        |            |    |            |
                                        +------------+    +------------+
The first four stages of the pipeline are the same as in pipeline 1, but here we channel the Kinesis stream into Kinesis Analytics to do real-time analysis. If you know the Hadoop/Spark ecosystem, Kinesis Analytics is roughly the equivalent of Spark Streaming.

Kinesis Analytics

You can use a SQL-like syntax to analyze the stream data over a window period.
  • An example: filter data
  • result
You'll see new events being added to the result as time goes by. Note that the timestamp is added automatically by Kinesis Analytics.
2017-10-01 03:24:37.978        50      2
2017-10-01 03:24:57.904        50      2
2017-10-01 03:25:05.914        50      2
2017-10-01 03:25:25.978        50      2
2017-10-01 03:25:44.001        50      2
2017-10-01 03:26:02.005        50      2
2017-10-01 03:26:11.898        50      2
2017-10-01 03:26:19.947        50      2
2017-10-01 03:26:29.922        50      2
2017-10-01 03:26:39.973        50      2
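A filter like the one that produced the result above can be sketched as follows. The stream names follow the Kinesis Analytics defaults; the column schema is assumed from the simulator's events, not taken verbatim from this post.

```sql
-- output stream that holds the filtered events
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "temperature" INTEGER,
    "deviceId"    VARCHAR(16));

-- a pump continuously inserts matching rows into the output stream
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "temperature", "deviceId"
    FROM "SOURCE_SQL_STREAM_001"
    WHERE "temperature" >= 50;
```

"SOURCE_SQL_STREAM_001" is the default name Kinesis Analytics gives to the in-application input stream mapped from your Kinesis stream.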


I'm not going to show you each and every step of creating an IoT device, setting up the Rules Engine, creating a Kinesis stream, and connecting those components into a pipeline.
What I will show you is:
  1. A device simulator that can be used to drive the whole pipeline. You can easily spin up multiple devices and send messages to simulate real use cases.
  2. A complete Lambda handler that parses the Kinesis records, filters the data, and writes to a DynamoDB table.
  3. A simple Kinesis Analytics SQL query used to filter out abnormal events and generate a live report.

IoT device simulator

Launch one or more simulated devices that push data to a topic; this drives the whole pipeline.
Basic usage: run the script with a deviceId argument. It starts a device as ${deviceId} and publishes the events declared in the file simulated_events.json in a loop. To quit, use Ctrl+C.
Alternatively, launch several devices in the background that will continuously publish the events.
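The simulator expects a simulated_events.json file next to the script. A plausible shape is shown below, assuming the temperature-style payload that the Lambda handler filters on later; the exact fields are illustrative, and deviceId is overridden by the script anyway.

```json
[
  {"deviceId": "000", "temperature": 25, "status": "ok"},
  {"deviceId": "000", "temperature": 55, "status": "warning"},
  {"deviceId": "000", "temperature": 28, "status": "ok"}
]
```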
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import argparse
import json
import logging
import time
import signal

AllowedActions = ['publish', 'subscribe', 'both']

def subscribe_callback(client, userdata, message):
    print("[<< Receive]: ", "topic", message.topic, message.payload)

def args_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", "--endpoint", action="store", required=True,
                        dest="host", help="Your AWS IoT custom endpoint")
    parser.add_argument("-r", "--rootCA", action="store", required=True,
                        dest="rootCAPath", help="Root CA file path")
    parser.add_argument("-c", "--cert", action="store", dest="certificatePath",
                        help="Certificate file path")
    parser.add_argument("-k", "--key", action="store", dest="privateKeyPath",
                        help="Private key file path")
    parser.add_argument("-w", "--websocket", action="store_true",
                        dest="useWebsocket", default=False,
                        help="Use MQTT over WebSocket")
    parser.add_argument("-id", "--clientId", action="store", dest="clientId",
                        help="Targeted client id")
    parser.add_argument("-t", "--topic", action="store", dest="topic",
                        default="sensors", help="topic prefix")
    parser.add_argument("-d", "--deviceId", action="store", dest="deviceId",
                        help="device serial number, used as last part of "
                             "the topic")
    parser.add_argument("-m", "--mode", action="store", dest="mode",
                        help="Operation modes: %s" % str(AllowedActions))

    args = parser.parse_args()

    return parser, args

def config_logger():
    logger = logging.getLogger("AWSIoTPythonSDK.core")
    logger.setLevel(logging.INFO)
    streamHandler = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    streamHandler.setFormatter(formatter)
    logger.addHandler(streamHandler)

def main():
    parser, args = args_parser()

    mqtt_client = create_mqtt_client(args)

    topic = args.topic + "/" + args.deviceId
    print("topic:", topic)

    if args.mode == 'subscribe' or args.mode == 'both':
        mqtt_client.subscribe(topic, 1, subscribe_callback)

    if args.mode == 'subscribe':
        print("endless wait.. ctrl+c to finish")
        while True:
            time.sleep(1)

    # wait a while to make sure the subscription takes effect
    time.sleep(2)
    if args.mode == 'publish' or args.mode == 'both':
        publish_events(topic, args.deviceId, mqtt_client)

def publish_events(topic, deviceId, mqtt_client):
    if len(topic) == 0:
        print("topic can't be empty")
        return
    with open('simulated_events.json') as f:
        events = json.load(f)

    while True:
        for event in events:
            # override the deviceId using the passed-in value
            event['deviceId'] = deviceId
            message = json.dumps(event)
            mqtt_client.publish(topic, message, 1)
            print('[>> publish]', 'topic', topic, message)
            time.sleep(2)  # throttle the publish rate

def create_mqtt_client(args):
    host = args.host
    rootCAPath = args.rootCAPath
    certificatePath = args.certificatePath
    privateKeyPath = args.privateKeyPath
    useWebsocket = args.useWebsocket
    clientId = args.clientId

    mqttclient = None
    if useWebsocket:
        mqttclient = AWSIoTMQTTClient(clientId, useWebsocket=True)
        mqttclient.configureEndpoint(host, 443)
        mqttclient.configureCredentials(rootCAPath)
    else:
        mqttclient = AWSIoTMQTTClient(clientId)
        mqttclient.configureEndpoint(host, 8883)
        mqttclient.configureCredentials(rootCAPath, privateKeyPath,
                                        certificatePath)

    # AWSIoTMQTTClient connection configuration
    mqttclient.configureAutoReconnectBackoffTime(1, 32, 20)
    mqttclient.connect()

    return mqttclient

if __name__ == "__main__":
    main()

python \
    -e ${endpoint} \
    -r root-CA.crt \
    -c device.cert.pem \
    -k device.private.key \
    --topic 'sensors' \
    --deviceId $1

Lambda Handler

In the handler, we filter all the records whose temperature > 50 and write them to a DynamoDB table, warning_events.
import base64
import boto3
import json

def lambda_handler(event, context):
    for record in event['Records']:
        event = decode_kenisis_data(record["kinesis"]["data"])
        if event and filter_event(event):
            print("warning: temperature too high, write it to dynamoDB")
            event = process_event(event)
            add_event(event)

    return "ok"

def filter_event(event):
    return event['temperature'] > 50

def process_event(event):
    return event

def decode_kenisis_data(data):
    """create event from base64 encoded string"""
    try:
        return json.loads(base64.b64decode(data.encode()).decode())
    except Exception:
        print("malformed event data")
        return None

def encode_kenisis_data(event):
    """encode event to base64"""
    try:
        return base64.b64encode(json.dumps(event).encode()).decode()
    except Exception:
        return None

# dynamoDB table handling
dynamodb = boto3.resource('dynamodb')
dbclient = boto3.client('dynamodb')

def is_table_exist(name):
    # Not robust enough: a table might not be fully ready, or might be
    # in the middle of being destroyed; loosen the constraint for now
    return name in dbclient.list_tables()['TableNames']

def get_or_create_table(name):
    if not is_table_exist(name):
        print("table", name, " doesn't exist, go and create one")
        return create_table(name)

    return dynamodb.Table(name)

def create_table(table_name):
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {
                'AttributeName': 'deviceId',
                'KeyType': 'HASH'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'deviceId',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )
    # Wait until the table exists.
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
    return table

def add_event(event):
    table = get_or_create_table('warning_events')
    table.put_item(Item=event)

def delete_table(name):
    dbclient.delete_table(TableName=name)

  • debug message broker
Open the simulator script and add --mode 'both'; the device will then receive each message as it publishes it.
You can also use the Test page in the IoT console, where you can subscribe and publish to a topic. Publish with the simulator and subscribe using the web page; it is more "real" than using --mode 'both'.
One thing worth noting: the namespace for topics is implicit, /account/region/topic. This means your topics won't collide with mine, and even within my account the topic 'sensors/001' in the us-east-1 region is different from the one in ap-southeast-2.
  • debug rules engine
The Rules Engine connects the message broker to other AWS services, for example Kinesis and Lambda.
There isn't much to debug here; it is kind of a black box AFAIK. It is suggested to use CloudWatch to debug it, but I wasn't lucky enough to get that working. A good idea before debugging the connector is to make sure the relevant source and sink work independently. Say, in the case of connecting the message broker to the Kinesis stream, first make sure the topic/message broker and the Kinesis stream each work on their own.
Hint: if you run out of ideas, try re-creating the rule. That is what I did, and the result was very good :).
  • debug Kenisis
Use get_stream_record.sh to check whether there are any records in the stream.

SHARD_ITERATOR=$(aws kinesis get-shard-iterator  \
    --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON \
    --region ${region} \
    --stream-name ${stream_name} \
    --query 'ShardIterator')
aws kinesis get-records --region ${region} --shard-iterator ${SHARD_ITERATOR}
If not, something is wrong with the stream publishing.
Use aws kinesis put-record to publish a record manually, then run get_stream_record.sh again to see if anything changes. If records are returned now, it means something is wrong upstream in the pipeline: no one is producing records.
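The Data field of each returned record is base64 encoded, so the payload is not directly readable. Here is a small helper to decode the JSON output of aws kinesis get-records; it is my own sketch, not part of the original scripts.

```python
import base64
import json

def decode_records(get_records_output):
    """Decode the base64 'Data' field of each record returned by
    `aws kinesis get-records` into a Python object."""
    events = []
    for record in get_records_output.get('Records', []):
        payload = base64.b64decode(record['Data'])
        events.append(json.loads(payload.decode()))
    return events
```

You can pipe the CLI output into a script built around this function to eyeball the simulator's events as they flow through the stream.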


We showcased two pipelines/architectures, which can be used to implement two common use cases. Actually, you don't have to pick one pipeline over the other; in practice the two are usually combined. That can easily be done by creating multiple rules in the Rules Engine with two output Kinesis streams, one for Lambda processing and one for analytics.
The beauty of these architectures is:
  1. Very scalable. You can have thousands of devices connected at the same time without worrying about throughput.
  2. Highly available. Components such as DynamoDB and Lambda are built on top of AWS's highly available services.
  3. Serverless. There is no single machine, even a virtual one, that you have to set up and maintain. Agility improves.

