

AWS DeepLens: Creating an IoT Rule (Part 2 of 2)

This post is the second in a series on getting started with the AWS DeepLens. In Part 1, we introduced a program that could detect faces and crop them by extending the boilerplate Greengrass Lambda and pre-built model provided by AWS. That post focussed on the local capabilities of the device, but the DeepLens is much more than that. At its core, it is a fully fledged IoT device, covering just one of the three pillars of IoT: devices, cloud and intelligence.

All code and templates mentioned can be found here. This can be deployed using AWS SAM, which helps reduce the complexity of creating event-based AWS Lambda functions.

Sending faces to the IoT Message Broker

AWS DeepLens Device Console Page

When registering a DeepLens device, AWS creates everything associated with the IoT cloud pillar. If you have a look for yourself in the IoT Core console, you will see the IoT groups, devices, certificates, etc. that were created. This all simplifies interacting with the middle man: the MQTT topic displayed on the main DeepLens device console page. The DeepLens (and any other client given authorisation) has the right to publish messages to this IoT topic, within certain limits.
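
Any authorised client can interact with this topic directly. As a quick sanity check, the sketch below (not part of the code repository; the topic path is a placeholder you should copy from the device console page) publishes a test message to the topic over HTTPS using boto3's iot-data client:


# Hypothetical test publish, not part of the code repository
import json
import boto3

# Placeholder topic path; copy the real one from the DeepLens console page
iot_topic = '$aws/things/deeplens_topic_name/infer'

# The "iot-data" client publishes to the AWS IoT message broker over HTTPS
iot_client = boto3.client('iot-data')
iot_client.publish(topic=iot_topic, qos=0, payload=json.dumps({'test': 1}))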

Previously, the AWS Lambda function responsible for detecting faces only rendered them on the output streams and published just the probability of each detected face to the MQTT topic. We can modify this by including the cropped face images as part of the messages sent to the topic.

The Greengrass function below extends the original version by publishing a message for each detected face. The cropped face image is Base64 encoded and set in the “image_string” key of the payload. AWS IoT messages have a size limit of 128 KB, and the small cropped images fall well within it.


# File "src/greengrassHelloWorld.py" in code repository
from threading import Thread, Event
import os
import json
import numpy as np
import awscam
import cv2
import greengrasssdk

class LocalDisplay(Thread):
    def __init__(self, resolution):
    ...
    def run(self):
    ...
    def set_frame_data(self, frame):
    ...

    def set_frame_data_padded(self, frame):
        """
        Set the stream frame and return the rendered cropped face
        """
        ...
        return outputImage

def greengrass_infinite_infer_run():
    ...
    # Create a local display instance that will dump the image bytes
    # to a FIFO file so that the image can be rendered locally.
    local_display = LocalDisplay('480p')
    local_display.start()
    # The sample projects come with optimized artifacts,
    # hence only the artifact path is required.
    model_path = '/opt/awscam/artifacts/mxnet_deploy_ssd_FP16_FUSED.xml'
    # Load the model onto the GPU once, before the inference loop starts
    model = awscam.Model(model_path, {'GPU': 1})
    ...
    while True:
        # Get a frame from the video stream
        ret, frame = awscam.getLastFrame()
        # Resize frame to the same size as the training set.
        frame_resize = cv2.resize(frame, (input_height, input_width))
        ...
        # Process the frame
        ...
        # Set the next frame in the local display stream.
        local_display.set_frame_data(frame)

        # Get the detected faces and probabilities
        for obj in parsed_inference_results[model_type]:
            if obj['prob'] > detection_threshold:
                # Add bounding boxes to full resolution frame
                xmin = int(xscale * obj['xmin']) \
                       + int((obj['xmin'] - input_width / 2) + input_width / 2)
                ymin = int(yscale * obj['ymin'])
                xmax = int(xscale * obj['xmax']) \
                       + int((obj['xmax'] - input_width / 2) + input_width / 2)
                ymax = int(yscale * obj['ymax'])

                # Add face detection to iot topic payload
                cloud_output[output_map[obj['label']]] = obj['prob']

                # Zoom in on the face, clamping the padded crop to the frame
                crop_img = frame[max(0, ymin - 45):ymax + 45,
                                 max(0, xmin - 30):xmax + 30]
                output_image = local_display.set_frame_data_padded(crop_img)

                # Encode cropped face image and add to IoT message
                frame_string_raw = cv2.imencode('.jpg', output_image)[1]
                frame_string = base64.b64encode(frame_string_raw)
                cloud_output['image_string'] = frame_string

                # Send results to the cloud
                client.publish(topic=iot_topic, payload=json.dumps(cloud_output))
        ...

greengrass_infinite_infer_run()

Save faces to S3 with an IoT Rule

The third IoT pillar, intelligence, interacts with the cloud pillar, using insights to perform actions on other AWS and/or external services. Our goal is to have every detected face saved to an S3 bucket in the original JPEG format it had before being encoded to Base64. To achieve this, we need to create an IoT Rule that launches an action to do so.

IoT Rules listen for incoming MQTT messages on a topic and, when a certain condition is met, launch an action. The messages are analysed and transformed using a provided SQL statement. We want to act on all messages, passing on the data captured by the DeepLens device while also injecting a “unix_time” property. The IoT Rule Engine lets us construct statements that do just that, calling the timestamp() function within a SQL statement to add it to the result, as seen below.


# MQTT message
{
    "image_string": "/9j/4AAQ...",
    "face": 0.94287109375
}

# SQL Statement 
SELECT *, timestamp() as unix_time FROM '$aws/things/deeplens_topic_name/infer'

# IoT Rule Action event
{
    "image_string": "/9j/4AAQ...",
    "unix_time": 1540710101060,
    "face": 0.94287109375
}

The action is an AWS Lambda function (seen below) that is given an S3 bucket name and an event. At a minimum, the event must contain two properties: “image_string”, the encoded image, and “unix_time”, which is used as the name of the file. The latter is not provided when the IoT message is published to the MQTT topic; it is added by the IoT Rule that calls the action.


# File "src/process_queue.py" in code repository
import os
import boto3
import base64

def handler(event, context):
    """
    Decode a Base64 encoded JPEG image and save to an S3 Bucket with an IoT Rule
    """
    # Convert image back to binary
    jpg_original = base64.b64decode(event['image_string'])

    # Save image to S3 with the timestamp as the name
    s3_client = boto3.client('s3')
    s3_client.put_object(
        Body=jpg_original,
        Bucket=os.environ["DETECTED_FACES_BUCKET"],
        Key='{}.jpg'.format(event['unix_time']),
    )
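
To sanity check the function locally before wiring it up to the rule, you can invoke the handler with a hand-built event. This is just a sketch: the bucket name and image file are placeholders, and your shell needs valid AWS credentials for the upload to succeed.


# Hypothetical local test, not part of the code repository
import base64
import os

os.environ['DETECTED_FACES_BUCKET'] = 'my-detected-faces-bucket'  # placeholder

from process_queue import handler

# Build an event shaped like the IoT Rule Action event shown earlier
with open('face.jpg', 'rb') as image_file:
    event = {
        'image_string': base64.b64encode(image_file.read()),
        'unix_time': 1540710101060,
    }

handler(event, None)  # uploads the image to S3 as "1540710101060.jpg"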

Deploying an IoT Rule with AWS SAM

AWS SAM makes it incredibly easy to deploy an IoT Rule, as it is a supported event type for the Serverless Function resource, a high-level wrapper for AWS Lambda. By providing only the DeepLens topic name as a parameter to the template below, a fully event-driven, least-privilege AWS architecture is deployed.


# File "template.yaml" in code repository
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'

Parameters:
  DeepLensTopic:
    Type: String
    Description: Topic path for DeepLens device "$aws/things/deeplens_..."

Resources:
  ProcessDeepLensQueue:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python2.7
      Timeout: 30
      MemorySize: 256
      Handler: process_queue.handler
      CodeUri: ./src
      Environment:
        Variables:
          DETECTED_FACES_BUCKET: !Ref DetectedFaces

      Policies:
        - S3CrudPolicy:
            BucketName: !Ref DetectedFaces

      Events:
        DeepLensRule:
          Type: IoTRule
          Properties:
            Sql: !Sub "SELECT *, timestamp() as unix_time FROM '${DeepLensTopic}'"

  DetectedFaces:
    Type: AWS::S3::Bucket
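
Once the stack is deployed and the DeepLens starts detecting faces, you can confirm that images are arriving in the bucket. A small sketch (the bucket name below is a placeholder; use the generated name of the DetectedFaces resource from your CloudFormation stack):


# Hypothetical check, not part of the code repository
import boto3

bucket = 'mystack-detectedfaces-abc123'  # placeholder generated bucket name

s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(Bucket=bucket)
for s3_object in response.get('Contents', []):
    # Keys are the "unix_time" values, e.g. "1540710101060.jpg"
    print('{} {}'.format(s3_object['Key'], s3_object['Size']))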

Using AWS SAM for a CORS Enabled Serverless API

Over the past two years TechConnect has seen increasing demand for building ‘serverless’ API backends, whether from scratch or by converting existing services running on expensive virtual machines in AWS. This has been an iterative learning process for us and, I feel, for many others in the industry. However, it feels like each month pioneers in the field answer our cries for help by creating or extending open-source projects to make our ‘serverless’ lives a little easier.

There are quite a few options for creating serverless applications in AWS (Serverless Framework, Zappa, etc.). However, in this blog post we will discuss using AWS SAM (Serverless Application Model, previously known as Project Flourish) to create a CORS enabled API. All templates and source code mentioned can be found in this GitHub repository. I highly recommend having it open in another tab, along with the AWS SAM project.


API Design First with Swagger

Code or design first? One approach is not necessarily better than the other, but at TechConnect we have been focusing on a design first mentality when it comes to building APIs for our clients. We are not the users of the APIs we build, nor the front-end developers who might build a website on top of them. Instead, our goal when creating an external API is to produce a logical, human readable API contract specification. To achieve this we use Swagger, the Open API specification, to build and document our RESTful backends.

We started by designing a simple movie ratings API in YAML using the Open API specification (see swagger.yaml in the repository). In its current state it is just an API contract showing the requests and responses; in the following steps it will be modified to become an API Gateway compatible, AWS Lambda integrated document.

Code Structure

Our API is a simple CRUD service that will use Amazon DynamoDB to create, list and delete movie ratings for a given year. This could all easily reside in a single Python file, but instead we will split it up to make the layout a little more realistic for larger projects. As this is a small demo we will omit a few resources that would usually be included in a real project (tests, task runners, etc.), but have a look at The Hitchhiker’s Guide to Python for a nice Python structure for your own future APIs.


- template.yaml
- swagger.yaml
- requirements.txt
- movies
  - api
    - __init__.py
    - ratings.py
  - core
    - __init__.py
    - web.py
  - __init__.py

Our Python project movies contains two sub-packages: api and core. Our AWS Lambda handlers are located in api/ratings.py, where each handler will process the request from API Gateway, interact with DynamoDB (using a table name set by an environment variable) and return a response object to API Gateway, as sketched below.

movies.api.ratings.py

...
from movies.core import web

def get_ratings(event, context):
    ...
    return web.cors_web_response(200, ratings_list)
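
Fleshed out, such a handler might look something like the following sketch. This is an illustration only, assuming a DynamoDB table whose partition key is the numeric year; the real implementation lives in the repository.


# Hypothetical fleshed-out handler; the repository version differs
import os
from decimal import Decimal

import boto3
from boto3.dynamodb.conditions import Key

from movies.core import web

def _clean(item):
    # DynamoDB returns numbers as Decimal, which json.dumps cannot serialise
    return {k: float(v) if isinstance(v, Decimal) else v
            for k, v in item.items()}

def get_ratings(event, context):
    # Table name is injected by the SAM template as an environment variable
    table = boto3.resource('dynamodb').Table(os.environ['RATINGS_TABLE'])
    # The {year} path parameter arrives via API Gateway's proxy integration
    year = int(event['pathParameters']['year'])
    response = table.query(KeyConditionExpression=Key('year').eq(year))
    return web.cors_web_response(200, [_clean(i) for i in response['Items']])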

CORS in Lambda Responses

In the previous step you might have noticed we were using a function to build the integration response. The object body is serialised into a JSON string, and the headers Access-Control-Allow-Headers, Access-Control-Allow-Methods and Access-Control-Allow-Origin are set to enable Cross-Origin Resource Sharing (CORS).

movies.core.web.py

import json

def cors_web_response(status_code, body):
    return {
        'statusCode': status_code,
        'headers': {
            'Access-Control-Allow-Headers':
                'Content-Type,Authorization,X-Amz-Date,X-Api-Key,X-Amz-Security-Token',
            'Access-Control-Allow-Methods':
                'DELETE,GET,HEAD,OPTIONS,PATCH,POST,PUT',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps(body)
    }

CORS in Swagger

Previously, in our Lambda code, we built CORS headers into our responses. However, this is only half of the solution. Annoyingly, we must also add an OPTIONS HTTP method to every path level of our API to satisfy the preflight request the client makes to check whether CORS requests are enabled. Although it uses x-amazon-apigateway-integration, the response is mocked by API Gateway; no AWS Lambda function is needed to implement it.

swagger.yaml

paths:
  /ratings/{year}:
    options:
      tags:
      - "CORS"
      consumes:
      - application/json
      produces:
      - application/json
      responses:
        200:
          description: 200 response
          schema:
            $ref: "#/definitions/Empty"
          headers:
            Access-Control-Allow-Origin:
              type: string
            Access-Control-Allow-Methods:
              type: string
            Access-Control-Allow-Headers:
              type: string
      x-amazon-apigateway-integration:
        responses:
          default:
            statusCode: 200
            responseParameters:
              method.response.header.Access-Control-Allow-Methods: "'DELETE,GET,HEAD,OPTIONS,PATCH,POST,PUT'"
              method.response.header.Access-Control-Allow-Headers: "'Content-Type,Authorization,X-Amz-Date,X-Api-Key,X-Amz-Security-Token'"
              method.response.header.Access-Control-Allow-Origin: "'*'"
        passthroughBehavior: when_no_match
        requestTemplates:
          application/json: "{\"statusCode\": 200}"
        type: mock

Integrating with SAM

Since AWS SAM is an extension of CloudFormation, the syntax is almost identical. The snippets below show the integration between template.yaml and swagger.yaml. The name of the GetRatings AWS Lambda function is passed into the API via a stage variable, and swagger.yaml integrates the Lambda proxy using x-amazon-apigateway-integration. One important thing to note is that a Swagger document is not required to create an API Gateway resource in AWS SAM. However, we are using one due to our design first mentality and because it is required for the CORS preflight responses. The AWS SAM team is currently looking to reduce the need for this in CORS applications, so keep an eye on the ongoing discussion on GitHub.

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Resources:
  ApiGatewayApi:
    Type: AWS::Serverless::Api
    Properties:
      DefinitionUri: swagger.yaml
      StageName: v1
      Variables:
        GetRatings: !Ref GetRatings
...
  GetRatings:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./build
      Handler: movies.api.ratings.get_ratings
      Role: !GetAtt CrudLambdaIAMRole.Arn
      Environment:
        Variables:
          RATINGS_TABLE: !Ref RatingsTable
      Events:
        GetRatingsHandle:
          Type: Api
          Properties:
            RestApiId: !Ref ApiGatewayApi
            Path: /ratings/{year}
            Method: GET
...
swagger.yaml

paths:
  /ratings/{year}:
    get:
      ...
      x-amazon-apigateway-integration:
        responses:
          default:
            statusCode: 200
            responseParameters:
              method.response.header.Access-Control-Allow-Origin: "'*'"
        uri: arn:aws:apigateway:region_placeholder:lambda:path/2015-03-31/functions/arn:aws:lambda:region_placeholder:account_placeholder:function:${stageVariables.GetRatings}/invocations
        passthroughBehavior: when_no_match
        httpMethod: POST
        type: aws_proxy

Deploying SAM API

Now that all the resources are ready, the final step is to package and deploy the SAM application. You may have noticed in template.yaml that the source of the Lambda function was listed as ./build. Any AWS Lambda function that uses non-standard Python libraries requires them to be included in the deployment package. To demonstrate this, we copy our code into a build folder and install the dependencies alongside it.


$ mkdir ./build
$ cp -p -r ./movies ./build/movies
$ pip install -r requirements.txt -t ./build

Finally, you will need to package your SAM deployment to convert it into a traditional AWS CloudFormation template. First, make sure your own account id and desired region are substituted into swagger.yaml (using sed). You will also need to provide an existing S3 bucket to store the packaged code. If you inspect template-out.yaml you will notice that the source of each AWS Lambda function is an object in S3; this is what aws cloudformation deploy uses. One final tip: remember to include --capabilities CAPABILITY_IAM in your deploy command if you are creating any IAM roles during the deployment.


$ sed -i "s/account_placeholder/AWS_ACCOUNT_ID/g" 'swagger.yaml'
$ sed -i "s/region_placeholder/AWS_REGION/g" 'swagger.yaml'
$ aws cloudformation package --template-file ./template.yaml --output-template-file ./template-out.yaml --s3-bucket YOUR_S3_BUCKET_NAME
$ aws cloudformation deploy --template-file template-out.yaml --stack-name MoviesAPI --capabilities CAPABILITY_IAM
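
Once the stack is up, you can exercise the API and confirm the CORS headers are present. A quick sketch using Python requests (the endpoint URL below is a placeholder; take yours from the API Gateway console or the stack outputs):


# Hypothetical smoke test; substitute your own API Gateway URL and stage
import requests

url = 'https://abc123.execute-api.ap-southeast-2.amazonaws.com/v1/ratings/2018'

response = requests.get(url)
print(response.status_code)                                 # expect 200
print(response.headers.get('Access-Control-Allow-Origin'))  # expect "*"
print(response.json())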