
Serverless: Building a Mini Producer/Consumer Data Pipeline

Setting up your own serverless data pipeline is easy. Here, we'll use the Serverless toolkit and a Python script to call an API and load that data into a database.

by Mark Needham · Oct. 06, 17 · Cloud Zone · Tutorial

I wanted to create a little data pipeline with Serverless. Its main use would be to run once a day, call an API, and load that data into a database.

It would mostly be used to pull in recent data from that API, but I also wanted to be able to invoke it manually and specify a date range.

I created the following pair of lambdas that communicate with each other via an SNS topic.

The Code

serverless.yml:

service: marks-blog

frameworkVersion: ">=1.2.0 <2.0.0"

provider:
  name: aws
  runtime: python3.6
  timeout: 180
  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - "sns:Publish"
      Resource:
        - ${self:custom.BlogTopic}

custom:
  BlogTopic:
    Fn::Join:
      - ":"
      - - arn
        - aws
        - sns
        - Ref: AWS::Region
        - Ref: AWS::AccountId
        - marks-blog-topic

functions:
  message-consumer:
    name: MessageConsumer
    handler: handler.consumer
    events:
      - sns:
          topicName: marks-blog-topic
          displayName: Topic to process events
  message-producer:
    name: MessageProducer
    handler: handler.producer
    events:
      - schedule: rate(1 day)


handler.py:

import boto3
import json
import datetime
from datetime import timezone

def producer(event, context):
    sns = boto3.client('sns')

    # Derive the topic ARN from this function's own ARN, which has the form
    # arn:aws:lambda:{region}:{account_id}:function:{name}
    context_parts = context.invoked_function_arn.split(':')
    topic_name = "marks-blog-topic"
    topic_arn = "arn:aws:sns:{region}:{account_id}:{topic}".format(
        region=context_parts[3], account_id=context_parts[4], topic=topic_name)

    # Default date range: yesterday through today (UTC)
    now = datetime.datetime.now(timezone.utc)
    start_date = (now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    end_date = now.strftime("%Y-%m-%d")

    params = {"startDate": start_date, "endDate": end_date, "tags": ["neo4j"]}

    sns.publish(TopicArn=topic_arn, Message=json.dumps(params))


def consumer(event, context):
    for record in event["Records"]:
        message = json.loads(record["Sns"]["Message"])

        start_date = message["startDate"]
        end_date = message["endDate"]
        tags = message["tags"]

        print("start_date: " + start_date)
        print("end_date: " + end_date)
        print("tags: " + str(tags))
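
The producer above always publishes the last day's range, while the goal stated earlier was to also allow a manual invocation with a specific date range. One possible way to support both is sketched below. It assumes a manual invocation passes optional "startDate", "endDate", and "tags" keys in the event; those event keys and the helper name build_params are hypothetical and not part of the original code.

```python
import datetime
from datetime import timezone


def build_params(event, now=None):
    """Return the message params, preferring values supplied in the event.

    Assumption: a manual invocation may pass "startDate"/"endDate"
    ("YYYY-MM-DD") and "tags"; a scheduled invocation passes none of them.
    """
    now = now or datetime.datetime.now(timezone.utc)
    event = event or {}  # tolerate an invocation with no payload

    # Same defaults as the producer: yesterday through today (UTC)
    default_start = (now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    default_end = now.strftime("%Y-%m-%d")

    return {
        "startDate": event.get("startDate", default_start),
        "endDate": event.get("endDate", default_end),
        "tags": event.get("tags", ["neo4j"]),
    }
```

With a helper like this, the producer could build its message with params = build_params(event), and a manual run could supply the range through the --data option of serverless invoke.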


Trying It Out

We can simulate a message being received locally by executing the following command:

$ serverless invoke local \
    --function message-consumer \
    --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-29\"  }"}}]}'

start_date: 2017-09-25
end_date: 2017-09-29
tags: ['neo4j']
null


That seems to work fine. What happens if we invoke the message-producer on AWS?

$ serverless invoke --function message-producer

null


Did the consumer receive the message?

$ serverless logs --function message-consumer

START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST
start_date: 2017-09-29
end_date: 2017-09-30
tags: ['neo4j']
END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f
REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Duration: 0.46 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 32 MB


Looks like it! We can also invoke the consumer directly on AWS:

$ serverless invoke \
    --function message-consumer \
    --data '{"Records":[{"Sns": {"Message":"{\"tags\": [\"neo4j\"], \"startDate\": \"2017-09-25\", \"endDate\": \"2017-09-26\"  }"}}]}'

null


And now if we check the consumer’s logs, we’ll see both messages:

$ serverless logs --function message-consumer

START RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Version: $LATEST
start_date: 2017-09-29
end_date: 2017-09-30
tags: ['neo4j']
END RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f
REPORT RequestId: 0ef5be87-a5b1-11e7-a905-f1387e68c65f Duration: 0.46 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 32 MB

START RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed Version: $LATEST
start_date: 2017-09-25
end_date: 2017-09-26
tags: ['neo4j']
END RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed
REPORT RequestId: 4cb42bc9-a5b1-11e7-affb-99fa6b4dc3ed Duration: 16.46 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 32 MB


Success!
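
The --data payloads used above are JSON nested inside JSON, which is easy to get wrong when escaping by hand. A small sketch of generating them instead is shown here. The function name sns_test_event is hypothetical, and the envelope contains only the Records/Sns/Message fields that the consumer reads, not the full event that SNS delivers.

```python
import json


def sns_test_event(start_date, end_date, tags):
    """Wrap the params in the minimal SNS envelope the consumer expects."""
    # The inner message is itself a JSON string, as it is when SNS delivers it
    message = json.dumps(
        {"tags": tags, "startDate": start_date, "endDate": end_date})
    return json.dumps({"Records": [{"Sns": {"Message": message}}]})
```

Printing sns_test_event("2017-09-25", "2017-09-29", ["neo4j"]) yields a string that can be passed to serverless invoke via --data in place of the hand-written payload.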


Published at DZone with permission of Mark Needham, DZone MVB. See the original article here.
