Introducing the Real-Time Data Store: Kinesis Streams

This blog post will introduce Kinesis Streams as the foundation of real-time data processing on AWS.

by Michael Wittig · Oct. 12, 16 · Opinion

Kinesis is all about real-time data:

  • Kinesis Streams are a temporary store for real-time data. Append new events or read all events in order.
  • Kinesis Analytics helps you to analyze data in real-time.
  • Kinesis Firehose ingests real-time data into data stores like S3, Elasticsearch or Redshift for batch analytics.

What Is a Stream?

Imagine a timeline where you can add events at the current time, and you will have a good mental model of a Kinesis stream. The following animation will demonstrate the idea.

Kinesis Stream Animation

Arriving data is symbolized as yellow circles; each piece of data is called a Record. The current time is generated by the blue Kinesis Stream System. You can only insert records at the current time. You cannot add records in the past or in the future, which implies that records are ordered by arrival time.

If you want to read records from a stream, you have three options:

  • read from the beginning of the stream to the present
  • read from a certain record (identified by a sequence number) to the present
  • read from a certain time to the present

Depending on your use case, you may or may not be interested in historical records. A Kinesis stream supports both cases. But there is one limitation: Kinesis Streams keep records for only 1 to 7 days. After that, records are deleted. Kinesis is not long-term storage.
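
The three read options can be sketched as parameters for getShardIterator. This is a minimal sketch: the function name shardIteratorParams and the shape of the start argument are my own assumptions for illustration, while the ShardIteratorType values are the ones the Kinesis API defines.

```javascript
// Builds getShardIterator parameters for one of the three read options.
// `start` is a hypothetical helper object: pass { sequenceNumber: '...' },
// { timestamp: new Date(...) }, or nothing to read from the beginning.
function shardIteratorParams(streamName, shardId, start) {
  var params = { StreamName: streamName, ShardId: shardId };
  if (start && start.sequenceNumber) {
    // read from a certain record to the present
    params.ShardIteratorType = 'AT_SEQUENCE_NUMBER';
    params.StartingSequenceNumber = start.sequenceNumber;
  } else if (start && start.timestamp) {
    // read from a certain time to the present
    params.ShardIteratorType = 'AT_TIMESTAMP';
    params.Timestamp = start.timestamp;
  } else {
    // read from the oldest record that is still stored
    params.ShardIteratorType = 'TRIM_HORIZON';
  }
  return params;
}
```

You would pass the result to kinesis.getShardIterator once per shard you want to read.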

How Does It Scale?

A single Kinesis Stream is composed of Shards. A single shard supports:

  • 5 read operations per second (you read in batches)
  • maximum total data read rate of 2 MB per second
  • insertion of up to 1,000 records per second
  • maximum total data write rate of 1 MB per second
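
To get a feeling for these limits, here is a rough sizing sketch. It only applies the per-shard numbers from the list above. The function name estimateShards, the shape of the load object, and the reading of 1 MB as 1024 * 1024 bytes are assumptions on my side, not part of any AWS API.

```javascript
// Per-shard limits as listed above (1 MB assumed to be 1024 * 1024 bytes).
var SHARD_WRITE_RECORDS_PER_SEC = 1000;
var SHARD_WRITE_BYTES_PER_SEC = 1024 * 1024;
var SHARD_READ_BYTES_PER_SEC = 2 * 1024 * 1024;

// Estimates how many shards a given load needs.
// load.consumers is the number of applications that each read every record.
function estimateShards(load) {
  var consumers = load.consumers || 1;
  var bytesPerSecond = load.recordsPerSecond * load.avgRecordBytes;
  var byRecords = load.recordsPerSecond / SHARD_WRITE_RECORDS_PER_SEC;
  var byWriteBytes = bytesPerSecond / SHARD_WRITE_BYTES_PER_SEC;
  var byReadBytes = (bytesPerSecond * consumers) / SHARD_READ_BYTES_PER_SEC;
  // The tightest limit decides; a stream has at least one shard.
  return Math.max(1, Math.ceil(Math.max(byRecords, byWriteBytes, byReadBytes)));
}
```

The sketch ignores the limit of 5 read operations per second, which matters once several consumers poll the same shard.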

Records are assigned to shards based on a Partition Key. Records with the same partition key are always assigned to the same shard. The following animation will demonstrate the idea using two shards and representing partition keys with colors.

Kinesis Shard Animation

It’s important to know that there is no total order of records in the stream. The order is only guaranteed for records of the same shard. So if you want to track actions by many users, it makes sense to select the user id as the partition key. Otherwise, you will not be able to retrieve the records (actions) of a user in order.
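
If you wonder how a partition key ends up in a shard: Kinesis hashes the key with MD5 to a 128-bit integer, and every shard owns a range of those hash values. The following sketch mimics that under one simplifying assumption, namely that the shards split the hash space evenly. The function names are made up for illustration; for a real stream you would compare the hash with the HashKeyRange of each shard returned by describeStream.

```javascript
var crypto = require('crypto');

// MD5 of the partition key, interpreted as a 128-bit unsigned integer.
function hashKey(partitionKey) {
  var hex = crypto.createHash('md5').update(partitionKey, 'utf8').digest('hex');
  return BigInt('0x' + hex);
}

// Index of the shard that would receive the key, ASSUMING the shards
// split the 128-bit hash space into equally sized ranges.
function shardIndexFor(partitionKey, shardCount) {
  var rangeSize = (1n << 128n) / BigInt(shardCount);
  var index = hashKey(partitionKey) / rangeSize;
  // The last range absorbs the remainder if 2^128 is not evenly divisible.
  var last = BigInt(shardCount - 1);
  return Number(index > last ? last : index);
}
```

Calling shardIndexFor('user-123', 2) repeatedly always returns the same index, which is why all actions of one user stay in order.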

How Do You Insert Records?

Kinesis Streams make it very simple to insert records. The application that inserts records is also called the Producer.

When you insert records, you don’t care about shards. You just provide the name of the stream, a partition key, and the payload. A simple Node.js example follows:

var AWS = require('aws-sdk'); // AWS SDK for JavaScript
var kinesis = new AWS.Kinesis();
kinesis.putRecord({
  Data: '{"action": "click", "productId": "product-123"}',
  PartitionKey: 'user-123',
  StreamName: 'cloudonaut-stream'
}, function(err, data) {
  if (err) {
    console.log(err, err.stack); // an error occurred
  } else {
    console.log(data); // successful response
  }
});

putRecord Output:

{
  ShardId: 'shardId-000000000001',
  SequenceNumber: '49566461377660667441689347227228529918554355624453341202'
}
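
In a real producer you will probably not write the JSON payload by hand. A small sketch of how the parameters could be built from the user id and the action follows; the helper name clickRecord is hypothetical, and it uses the user id as the partition key as suggested earlier.

```javascript
// Builds putRecord parameters for a click event.
// The user id becomes the partition key, so all actions of one user
// land in the same shard and stay in order.
function clickRecord(streamName, userId, productId) {
  return {
    Data: JSON.stringify({ action: 'click', productId: productId }),
    PartitionKey: userId,
    StreamName: streamName
  };
}
```

You would then call kinesis.putRecord(clickRecord('cloudonaut-stream', 'user-123', 'product-123'), callback) with the same callback as above.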

How Do You Read Records?

The application that reads records is also called the Consumer.

Reading records is trickier because you can only read from a shard, not from a stream.
You need to create a Shard Iterator to read from a shard. The shard iterator points to the next record to read. So when you read records, you get the records and a new shard iterator that you can use to read the next records from the shard. A simple Node.js example follows:

var AWS = require('aws-sdk'); // AWS SDK for JavaScript
var kinesis = new AWS.Kinesis();
kinesis.describeStream({
  StreamName: 'cloudonaut-stream'
}, function(err, streamData) {
  if (err) {
    console.log(err, err.stack); // an error occurred
  } else {
    console.log(streamData); // successful response
    streamData.StreamDescription.Shards.forEach(shard => {
      kinesis.getShardIterator({
        ShardId: shard.ShardId,
        ShardIteratorType: 'TRIM_HORIZON',
        StreamName: 'cloudonaut-stream'
      }, function(err, shardIteratordata) {
        if (err) {
          console.log(err, err.stack); // an error occurred
        } else {
          console.log(shardIteratordata); // successful response
          kinesis.getRecords({
            ShardIterator: shardIteratordata.ShardIterator
          }, function(err, recordsData) {
            if (err) {
              console.log(err, err.stack); // an error occurred
            } else {
              console.log(recordsData); // successful response
            }
          });
        }
      });
    });
  }
});

describeStream Output:

{
  StreamDescription: {
    StreamName: 'cloudonaut-stream',
    StreamARN: 'arn:aws:kinesis:eu-west-1:123456123456:stream/cloudonaut-stream',
    StreamStatus: 'ACTIVE',
    Shards: [ [Object], [Object] ],
    HasMoreShards: false,
    RetentionPeriodHours: 24,
    EnhancedMonitoring: [ [Object] ] 
  }
}

getShardIterator Outputs:

{
  ShardIterator: 'AAAAAAAAAAFczeAFwB21etXem1x93AV2dp2aLsJ+8W9Kntx72U9TewMHgG1zQSQbykcdepxW5I'
}

{
  ShardIterator: 'AAAAAAAAAAEyoN6Nw/vClRIOJ/dzg/7cE9PawEzP/KxbQHm3q41cvDXOpCzni0MP2lknXF'
}

getRecords Outputs:

{
  Records: [{
    SequenceNumber: '49566461377660667441689347227114890891510548871071662098',
    ApproximateArrivalTimestamp: 2016-10-07T16:50:58.055Z,
    Data: '{"action": "click", "productId": "product-123"}',
    PartitionKey: 'user-123'
  }, {
    SequenceNumber: '49566461377660667441689347227116099817330163775124275218',
    ApproximateArrivalTimestamp: 2016-10-07T16:51:00.726Z,
    Data: '{"action": "click", "productId": "product-234"}',
    PartitionKey: 'user-123'
  }, {
    SequenceNumber: '49566461377660667441689347227117308743149778541737934866',
    ApproximateArrivalTimestamp: 2016-10-07T16:51:03.203Z,
    Data: '{"action": "click", "productId": "product-456"}',
    PartitionKey: 'user-123'
  }],
  NextShardIterator: 'AAAAAAAAAAH+L+N0v7t9gU2myNC7TmpJh15RXfHqSifCMnA+74HFvAtwlpInF5756vQi8NHTMZHfHzP7W2XWRWOiJ',
  MillisBehindLatest: 0
}

{
  Records: [{
    SequenceNumber: '49566461377660667441689347227168083627573593860428791826',
    ApproximateArrivalTimestamp: 2016-10-07T16:51:15.855Z,
    Data: '{"action": "click", "productId": "product-123"}',
    PartitionKey: 'user-234'
  }],
  NextShardIterator: 'AAAAAAAAAAHcxQHyANnjIVul8/W8AYN9LJSkfm0mCEE8P2zm+tExzLAZgrzuQxOwyOhCfosSKZ/3wKXP2CmJPzea5',
  MillisBehindLatest: 0
}
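
The example above reads only one batch per shard. To keep reading, you pass the NextShardIterator from each response into the next getRecords call. Here is a minimal sketch of that loop in the same callback style. The function readShard and its maxBatches parameter are my own additions for illustration; kinesis can be an AWS.Kinesis instance or anything else with a getRecords(params, callback) method.

```javascript
// Reads a shard batch by batch by following NextShardIterator.
// Stops after maxBatches calls, on an error, or when there is no next
// iterator (which happens when a closed shard has been read to its end).
function readShard(kinesis, shardIterator, maxBatches, onRecords, done) {
  if (!shardIterator || maxBatches <= 0) {
    return done(null);
  }
  kinesis.getRecords({ ShardIterator: shardIterator }, function(err, data) {
    if (err) {
      return done(err);
    }
    onRecords(data.Records);
    // The new iterator points to the record after the last one we received.
    readShard(kinesis, data.NextShardIterator, maxBatches - 1, onRecords, done);
  });
}
```

Against a real stream you should wait a moment between calls, because a shard only supports 5 read operations per second.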

That sounds pretty complicated, right? And that’s why most people use either the Kinesis Client Library or a Lambda function to read from Kinesis Streams. If possible, I recommend using a Lambda function and connecting it to the Kinesis stream.

Kinesis Streams store real-time data for up to 7 days. A stream is composed of shards. Records are ordered only within a shard. When you insert records, you provide a partition key to group records together. When you read records, you read from a shard, not the whole stream. Shard iterators keep track of the next record to read.

Kinesis Streams is a managed service that is fault tolerant. It scales based on the number of shards you provision. You can change the number of shards at any time.

The pricing model is based on shard hours and insert record operations.

Published at DZone with permission of Michael Wittig. See the original article here.
