Streaming data with Amazon Kinesis

Introduction

At Sqreen we use Amazon Kinesis service to process data from our agents in near real-time. This kind of processing became recently popular with the appearance of general use platforms that support it (such as Apache Kafka). Since these platforms deal with the stream of data, such processing is commonly called the “stream processing”. It’s a departure from the old model of analytics that ran the analysis in batches (hence its name “batch processing”) rather than online. The main differences between these two approaches are:

  • stream processing deals with data that are punctual in time, i.e. with events that are generated at specific points in time, whereas batch processing is applied to data batches representing larger slices of time (for example, data stored in databases),
  • stream processing analyses data online, i.e. most often almost immediately after it arrives, whereas batch processing waits for the data collection to be finished (the moment can be defined arbitrarily, for example, at the end of the day) to analyze it off-line,
  • data analysed by stream processing is unbounded, i.e. it does not have the specific end, whereas the batches are bounded, i.e. they have a well-defined window.

Streams as distributed logs

Platforms such as Apache Kafka provide streams that receive data from event sources (producers) and pass them down to consumers, which in turn can forward them to other streams. In essence, they are similar to message queues, but they support multiple consumers that process the same messages in parallel (like in publish-subscribe messaging model) and store the old messages even after they were delivered to the consumers. They are a kind of append-only event logs (Figure 1). Logs are most commonly associated with the flat files sitting in the /var/log directory and meant to be read by a human. Streams are different: they are logs optimized for storing/provisioning binary data (that could be text but also fragments of images, sensor readings, etc.). This log-like design of streams allows new consumers to be added or removed without any impact on the remaining consumers at any point. Consumers can also start reading from the stream at any offset (any message in the past).

Amazon Kinesis Stream Architecture

Figure 1 A sketch of a stream. New events are appended at the left of the stream-log and are consumed by the consumers from right to left starting with any offset.

When events arrive at high frequency, a single machine may not keep up with processing them. In this case, both streams and their consumers can be distributed by partitioning the source events (Figure 2)/. Such a partition is done on a key that will simply be part of the logged messages.

Amazon Kinesis architecture stream

Figure 2 Events emitted from the source (producer) are forwarded to the stream. In this case, the stream is distributed into two shards: an event is sent only to a single shard depending on the partition key that is part of the message (here the IP address). Messages from each shard are handled independently by different consumers.

Streaming applications

Streams have found applications in many problems. They are commonly used for real-time data analytics (such as streams of twits), for replicating databases (both for performance and reliability reasons), for real-time monitoring and detection of special events (such as fraud detection) and for building data-intensive systems that require different representations of the same data (for example, databases for operations, indexes for fast queries, and data warehouses for running batch analyses).

Amazon Kinesis Data Streams (which we will call simply Kinesis) is a managed service that provides a streaming platform. It includes solutions for stream storage and an API to implement producers and consumers. Amazon charges per hour of each stream work partition (called shards in Kinesis) and per volume of data flowing through the stream.

Goal

The goal of this tutorial is to familiarize you with the stream processing with Amazon Kinesis. In particular, we will implement a simple producer-stream-consumer pipeline that counts the number of requests in consecutive, one-minute-long time windows. We will apply this pipeline to simulated data, but it could be easily extended to work with real websites. This is precisely one of the applications that we use Kinesis for at Sqreen (more about it below).

We will demonstrate stream processing using the Jupyter notebook. You can download the notebook from here and execute it on your computer (for instructions, see Jupyter documentation). Alternatively, you can copy-paste the code examples directly to your Python interpreter.

Requirements

To install dependencies, run the following commands at the command line (i.e. in the shell).

$ pip install aws

Configure AWS credentials

To connect to AWS, you must first create your credentials (you will get them from the AWS Console). Then, simply configure them using the following command:

$ aws configure --profile blogpost-kinesis

blogpost-kinesis is the name of the profile you will use for this tutorial. When requested you will need to copy-paste your acess key id and secret obtained from AWS Management Console. For instructions, check the relevant section of AWS User Guide.

Creating a stream

Let’s create our first stream. You can either do it using the AWS Console or the API. We will use the second approach. First, we need to define the name of the stream, the region in which we will create it, and the profile to use for our AWS credentials (you can aws_profile to None if you use the default profile).

stream_name = 'blogpost-word-stream'
region = 'eu-west-1'
aws_profile = 'blogpost-kinesis'

Now we can use boto library to create the stream and wait until it becomes active.

import boto
from boto.kinesis.exceptions import ResourceInUseException
import os
import time

if aws_profile:
    os.environ['AWS_PROFILE'] = aws_profile

# connect to the kinesis
kinesis = boto.kinesis.connect_to_region(region)

def get_status():
    r = kinesis.describe_stream(stream_name)
    description = r.get('StreamDescription')
    status = description.get('StreamStatus')
    return status

def create_stream(stream_name):
    try:
        # create the stream
        kinesis.create_stream(stream_name, 1)
        print('stream {} created in region {}'.format(stream_name, region))
    except ResourceInUseException:
        print('stream {} already exists in region {}'.format(stream_name, region))


    # wait for the stream to become active
    while get_status() != 'ACTIVE':
        time.sleep(1)
    print('stream {} is active'.format(stream_name))

Running the code, generates the following output:

create_stream(stream_name)

stream blogpost-word-stream created in region eu-west-1 stream blogpost-word-stream is active

Putting data into streams

To have an operational stream processing chain, we need a source of messages (a producer in AWS terminology) and a receiver (consumer) that will obtain and process the messages. We will first define the producer.

import datetime
import time
import threading
from boto.kinesis.exceptions import ResourceNotFoundException

class KinesisProducer(threading.Thread):
    """Producer class for AWS Kinesis streams

    This class will emit records with the IP addresses as partition key and
    the emission timestamps as data"""

    def __init__(self, stream_name, sleep_interval=None, ip_addr='8.8.8.8'):
        self.stream_name = stream_name
        self.sleep_interval = sleep_interval
        self.ip_addr = ip_addr
        super().__init__()

    def put_record(self):
        """put a single record to the stream"""
        timestamp = datetime.datetime.utcnow()
        part_key = self.ip_addr
        data = timestamp.isoformat()

        kinesis.put_record(self.stream_name, data, part_key)

    def run_continously(self):
        """put a record at regular intervals"""
        while True:
            self.put_record()
            time.sleep(self.sleep_interval)

    def run(self):
        """run the producer"""
        try:
            if self.sleep_interval:
                self.run_continously()
            else:
                self.put_record()
        except ResourceNotFoundException:
            print('stream {} not found. Exiting'.format(self.stream_name))

Note that for the partition key we used the IP address and for the data the timestamps. In Kinesis, you are almost completely free to choose whatever you want for the data, as long as it can be serialised in binary format and it’s less than 50 KB of size. If you need to emit larger data, you can split it into several messages. The partition key must be a string shorter than 256 characters, it will be used to determine which shard to send the data to (Figure 2). All data that should be processed together must use the same partition key, otherwise it may be forwarded to another shard.

Note that we implemented the KinesisProducer as a Python thread, such that it can run in the background and won’t block the Python interpreter. This way we can continue executing Python instructions.

Now we create two producers with different IP addresses and different intervals between consecutive messages.

producer1 = KinesisProducer(stream_name, sleep_interval=2, ip_addr='8.8.8.8')
producer2 = KinesisProducer(stream_name, sleep_interval=5, ip_addr='8.8.8.9')
producer1.start()
producer2.start()

Sqreen’s Security Automation feature allows one to monitor traffic on a website and set conditions under which a given client should be blocked (such as, trying to read the same page too many times). To implement this feature, we are running similar event sources that inform the stream about the IP addresses from which the requests are emitted together with the timestamp of the request (Figure 3).

Consuming from a stream

Consumers receive the messages from the stream and process them. Their output could be messages forwarded to another stream, file saved on the filesystem (or Amazon S3 storage) or records stored in a database. Consumers can also keep local state. This makes them uniquely suited to work on a stream of similar data and quickly calculate a value from them.

Defining a consumer

First, let’s define a generic consumer, which will consist of a run method polling for new events from the Kinesis stream and process_method that will process the event data and produce any of the side effects (i.e. forwarding the results to another stream or committing them to a database). The process_method will not be implemented in this generic base class, and it will need to be implemented in the sub-classes (see below).

from boto.kinesis.exceptions import ProvisionedThroughputExceededException
import datetime

class KinesisConsumer:
    """Generic Consumer for Amazon Kinesis Streams"""
    def __init__(self, stream_name, shard_id, iterator_type,
                 worker_time=30, sleep_interval=0.5):

        self.stream_name = stream_name
        self.shard_id = str(shard_id)
        self.iterator_type = iterator_type
        self.worker_time = worker_time
        self.sleep_interval = sleep_interval

    def process_records(self, records):
        """the main logic of the Consumer that needs to be implemented"""
        raise NotImplementedError

    @staticmethod
    def iter_records(records):
        for record in records:
            part_key = record['PartitionKey']
            data = record['Data']
            yield part_key, data

    def run(self):
        """poll stream for new records and pass them to process_records method"""
        response = kinesis.get_shard_iterator(self.stream_name,
            self.shard_id, self.iterator_type)

        next_iterator = response['ShardIterator']

        start = datetime.datetime.now()
        finish = start + datetime.timedelta(seconds=self.worker_time)

        while finish > datetime.datetime.now():
            try:
                response = kinesis.get_records(next_iterator, limit=25)

                records = response['Records']

                if records:
                    self.process_records(records)

                next_iterator = response['NextShardIterator']
                time.sleep(self.sleep_interval)
            except ProvisionedThroughputExceededException as ptee:
                time.sleep(1)

Implementing the processing logic

Note that each stream can have many consumers that receive all the messages and process them independently. Now, we will implement process_records method that will simply print the received messages to the standard output. We will do that by sub-classing the KinesisConsumer class.

class EchoConsumer(KinesisConsumer):
    """Consumers that echos received data to standard output"""
    def process_records(self, records):
        """print the partion key and data of each incoming record"""
        for part_key, data in self.iter_records(records):
            print(part_key, ":", data)

We attach the consumer to our stream. To do that we need to pass the shard ID and the position in the stream to start processing the messages. For the latter, we can choose between the newest (LATEST) or the oldest (TRIM_HORIZON) record in the stream. Note that the default retention period for messages in Kinesis streams is 24 hours. It can be extended up to 168 hours at an additional cost.

The streams are partitioned into separate “sub-streams” (called shards) that receive messages from the same source. The target shard for each message is determined from the partition key. Each consumer can read from one or more shards, but there must be at least one consumer must be associated to every shard, otherwise some messages will be lost. Since, we only use one shard in this example, we can directly pass the default shard ID. If you need to configure more than one shard (to increase the throughput), you will need to query the stream for the IDs of all active shards using the API. For the sake of this tutorial, we will assume that we have only a single shard (this is clearly the case, since we created the stream with a single shard, see the call to kinesis.create_stream above).

shard_id = 'shardId-000000000000'
iterator_type = 'LATEST'
worker = EchoConsumer(stream_name, shard_id, iterator_type, worker_time=10)

Now, let run the consumer and observe the output:

worker.run()

8.8.8.8 : 2018-09-06T08:04:27.796125 8.8.8.8 : 2018-09-06T08:04:29.877330 8.8.8.9 : 2018-09-06T08:04:30.895562 8.8.8.8 : 2018-09-06T08:04:31.963790 8.8.8.8 : 2018-09-06T08:04:34.015333

As expected the consumer printed all received records with their partition keys (IP addresses) and data (timestamps).

Event aggregation

Finally, we can implement a consumer with some non-trivial logic. The goal of this consumer is to count the number of distinct requests from each particular IP in a specific time window (here one minute). Again, we will subclass the KinesisConsumer class and re-implement the process_records method. In addition, we will define one extra helper method print_counters that will simply dump the current counts to the standard output. In practice, we would forward the outputs of such processing to another stream for further analysis (filtering, detection of untypical events etc.) or store it in the DB. This is a part of what actually happens in Sqreen’s Security Automation pipeline (see below).

from collections import defaultdict, Counter
from dateutil import parser
from operator import itemgetter

class CounterConsumer(KinesisConsumer):
    """Consumer that counts IP occurences in 1-minute time buckets"""

    def __init__(self, stream_name, shard_id, iterator_type, worker_time):
        self.time_buckets = defaultdict(Counter)
        sleep_interval = 20 # seconds
        super().__init__(stream_name, shard_id, iterator_type, worker_time, sleep_interval)

    def print_counters(self):
        """helper method to show counting results"""

        now = datetime.datetime.utcnow()
        print("##### Last run at {}".format(now))
        for timestamp, ip_counts in self.time_buckets.items():
            # sort counts with respect to the IP address
            ip_counts = sorted(ip_counts.items(), key=itemgetter(0))
            print(timestamp, ':', list(ip_counts))

    def process_records(self, records):
        for ip_addr, timestamp_str in self.iter_records(records):
            timestamp = parser.parse(timestamp_str)
            timestamp = timestamp.replace(second=0, microsecond=0)
            self.time_buckets[timestamp][ip_addr] += 1
        self.print_counters() 

Let’s test the consumer:

worker = CounterConsumer(stream_name, shard_id, iterator_type, worker_time=120)
worker.run()
##### Last run at 2018-09-06 08:04:56.468067
2018-09-06 08:04:00 : [('8.8.8.8', 9), ('8.8.8.9', 4)]
##### Last run at 2018-09-06 08:05:16.563615
2018-09-06 08:04:00 : [('8.8.8.8', 11), ('8.8.8.9', 4)]
2018-09-06 08:05:00 : [('8.8.8.8', 8), ('8.8.8.9', 3)]
##### Last run at 2018-09-06 08:05:36.670241
2018-09-06 08:04:00 : [('8.8.8.8', 11), ('8.8.8.9', 4)]
2018-09-06 08:05:00 : [('8.8.8.8', 17), ('8.8.8.9', 7)]
##### Last run at 2018-09-06 08:05:56.775192
2018-09-06 08:04:00 : [('8.8.8.8', 11), ('8.8.8.9', 4)]
2018-09-06 08:05:00 : [('8.8.8.8', 27), ('8.8.8.9', 11)]
##### Last run at 2018-09-06 08:06:16.881760
2018-09-06 08:04:00 : [('8.8.8.8', 11), ('8.8.8.9', 4)]
2018-09-06 08:05:00 : [('8.8.8.8', 29), ('8.8.8.9', 12)]
2018-09-06 08:06:00 : [('8.8.8.8', 8), ('8.8.8.9', 3)]

All the lines prefixed by the hash signs ##### show the results of the counting process for a single run of the consumer. Since the consumer is executed each time new events arrive, the lines show updated state of the time_buckets cache. Each line starts with the timestamp denoting the beginning of the time bucket (it ends with the beginning of the next time bucket, i.e. the windows do not overlap), and the it’s followed by the list of IP address and count pairs. Every time the consumer runs the values are updated, such that the counts increase. If new requests arrive at the time that is not covered by any of the buckets, a new bucket is added and the count starts from zero for this bucket. The effect is roughly what we tried to achieve.

How is streaming used at Sqreen?

At Sqreen we intensively use Kinesis streams, especially in the feature called Security Automation. Security Automation is a real-time analytics framework that allows user to control traffic on their servers based on well-defined criteria (called playbooks).

A simplified sketch of Sqreen's streaming pipeline

Figure 3 A simplified sketch of Sqreen’s streaming pipeline.

Our pipeline consists of several streams and associated consumers (Figure 3). The events are produced by agents that sit in the web apps of Sqreen users. They contain the basic information about the connection (source IP etc.) and any extra details relevant to the business logic of user’s application. These events are then consumed by a consumer that filters the events and forwards them to the Log stream. The Detection consumer consumes from the Log stream, applies playbooks and detects anomalies (for example, too many requests from a single IP) and generates a response (for example, notify the owner of the webapp or block the IP). In parallel, the messages from Log stream are consumed by the Counter consumer that does the aggregation similar to the one demonstrated in this tutorial. These aggregated data are then stored in a database and exposed in the form of a graph. This approach, in which data is processed in parallel in different ways to obtain different views is typical for stream processing. Note that Detection and Counter consumers read from Log stream with a different offset and do not interfere with each other (for example, if one consumer crashes or has a significant backlog, the other consumer is not affected). At Sqreen this design allows us to have multiple actions associated with the messages coming from the user web apps (IP blocking, notifications, logging, monitoring etc.).

Conclusions

We demonstrated how to use Amazon Kinesis on a request counting example. Although the example was simplified, it contained the basic components of all stream processors — two producers, a stream (with a single shard) and one consumer. You can easily take this example and adapt it to your needs.

One important limitation of the present CounterConsumer is that it keeps in memory and print all counting windows at each run of the consumer. In real applications, we might save only the completed windows in the database and remove them from the time_buckets cache. This is not a trivial problem, because we can never be sure whether some events will arrive late, for example, due to some network delay or temporary network outage.

Another extension of CounterConsumer is to allow for an overlap between the windows. This overlap would provide some smoothing in the counts and make our pipeline more responsive, because the end user would not have to wait for the full window to be complete before seeing a new event being added to the counts.

Last, but not least we did not cover an important topic of spawning new consumers in the case when the existing consumer fails or we want to increase the number of shards. Similarly, we did not talk the checkpointing that allows for recovery of consumer state from the crash. These are non-trivial problems but they can be handled by the Amazon Kinesis Client Library (KCL), which is based on a Java-based orchestrator called MultiLangDaeamon. We will look into running stream consumer process with KCL in a follow-up blog post.

Cleaning up

We can delete the stream at the end of the exercise to minimize AWS costs (you will be charged for each stream-hour whether you use the created stream or not).

kinesis.delete_stream(stream_name)

stream blogpost-word-stream not found. Exiting
stream blogpost-word-stream not found. Exiting

The two messages are printed by the consumers that do not find the stream anymore and have to exit.

Further reading

[1] Jan Kreps, The Log: What every software engineer should know about real-time data’s unifying abstraction, 2013, blogpost

[2] Martin Kleppmann, Designing data-intensive applications, O’Reilly media, 2017

[3] Martin Kleppmann, Making Sense of Stream Processing, O’Reilly media, 2016, read online