Using Redis Stream with Python

Produce and consume data from a Redis Stream using Python and WebSockets

Problem

I needed to store a huge amount of output (a kind of log). Since my project already had a data store configured (Postgres), I created a new table and saved the data there. Locally everything was fine; however, in production, under a lot of requests, I started getting weird Postgres transaction errors.

Solution

Even though there might be solutions (or configuration tweaks) for the Postgres errors, I started looking outside the box. I also remembered that a colleague had mentioned Redis Streams. So I built a proof of concept (which I'm sharing here), and it worked very well.

Redis is Cache, right?

No! There are many products behind Redis. Caching is perhaps the best known, but there are a lot of others that I might write about in another post.

Redis Stream

I won't explain Redis Streams and where you can use them in this post, mainly because there are good articles about this topic out there (I'll link a few of them below). I'd rather show my POC using Python to produce and consume data from a Redis Stream, so check it out. If this post has something useful, please leave a reaction 👍 or a comment 💬

However, I'll sum up a few useful commands, the same ones the redis-py package executes behind the scenes.

  • Write user_id, venue_id and star_rating into the mystream1 Redis Stream
XADD mystream1 * user_id 9000 venue_id 123 star_rating 3
"1612299047621-0"   👉 The unique ID returned by Redis

XADD mystream1 * user_id 8701 venue_id 3226 star_rating 4
"1612299047621-1"

Note: The * tells Redis to generate a unique ID made of a millisecond timestamp plus a sequence number.

  • Read the stream within a time range (oldest first):
XRANGE mystream1 1612299047621 1612299137690
XRANGE mystream1 1612299047621 1612299137690 COUNT 2

Note: COUNT limits the result to at most two entries.

  • Read the stream in reverse order (most recent first):
XREVRANGE mystream1 1612299137690 1612299047621
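For reference, here is a minimal sketch of how those three commands could map onto redis-py calls. The helper names (`add_rating`, `read_range`, `read_range_reversed`) are hypothetical, and `r` is assumed to be a `redis.Redis` client. Note that `xrevrange` takes the end ID before the start ID, just like the CLI.

```python
# Hypothetical helpers mapping the CLI commands above onto redis-py methods.
# `r` is assumed to be a redis.Redis client, e.g. Redis("localhost", 6379).


def add_rating(r, user_id, venue_id, star_rating, stream="mystream1"):
    """XADD mystream1 * user_id ... venue_id ... star_rating ..."""
    fields = {"user_id": user_id, "venue_id": venue_id, "star_rating": star_rating}
    # id="*" is redis-py's default, so Redis generates the ID and returns it
    return r.xadd(stream, fields)


def read_range(r, start, end, count=None, stream="mystream1"):
    """XRANGE mystream1 <start> <end> [COUNT n], oldest entries first."""
    return r.xrange(stream, min=start, max=end, count=count)


def read_range_reversed(r, end, start, count=None, stream="mystream1"):
    """XREVRANGE mystream1 <end> <start> [COUNT n], newest entries first."""
    return r.xrevrange(stream, max=end, min=start, count=count)
```

With a live client, `read_range(r, "1612299047621", "1612299137690", count=2)` should correspond to the second XRANGE command; by default redis-py returns each entry as an `(id, fields)` tuple with bytes values.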

However, the command we are really looking for is XREAD, which can block and keep waiting for new entries:

XREAD COUNT 1 BLOCK 5000 STREAMS mystream1 1612299137697

Note: There are even more Stream commands, but XADD and XREAD are all we need here.
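XREAD only returns entries whose ID is greater than the ID you pass, which is why the consumers below keep updating `last_id`. Stream IDs have to be compared as (milliseconds, sequence) pairs rather than as plain strings: as strings, "10-0" sorts before "9-0". The following is a toy, in-memory model of that rule for illustration only, not a Redis client; `parse_stream_id` and `entries_after` are hypothetical helpers, and treating a missing sequence number as 0 is an assumption of this sketch.

```python
def parse_stream_id(entry_id):
    """Split a stream ID such as "1612299047621-1" into (milliseconds, sequence).

    Accepts str, bytes (redis-py's default return type) or an int like 0.
    Assumption of this sketch: a missing "-seq" part counts as sequence 0.
    """
    if isinstance(entry_id, bytes):
        entry_id = entry_id.decode("utf-8")
    ms, _, seq = str(entry_id).partition("-")
    return int(ms), int(seq or 0)


def entries_after(entries, last_id):
    """Toy model of XREAD: keep entries whose ID is strictly greater than last_id."""
    threshold = parse_stream_id(last_id)
    return [(eid, fields) for eid, fields in entries if parse_stream_id(eid) > threshold]
```

Passing `0` as `last_id` keeps every entry, which matches how the consumers start from the beginning of the stream.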

The POC structure

├── client               👉 A web app (FastAPI) consuming the Stream in real time
│   ├── main.py
│   └── templates
│       └── index.htm
├── producer.py          👉 The producer that writes data into the Stream
├── consumer.py          👉 Another way to consume the Stream
├── docker-compose.yml   👉 Runs Redis locally
└── requirements.txt     👉 The project dependencies

Requirements

  • You'll need Redis running; we're using docker-compose (there is a link on how to install Docker on Ubuntu-based Linux)
  • It's a good idea to use a Python virtualenv to install the dependencies

Get Redis up and running locally

Once you have Docker and docker-compose installed, create the docker-compose.yml file below:

# docker-compose.yml

version: "3.4"
services:
  redis:
    image: redis
    ports:
      - 6379:6379
    expose:
      - "6379"

Use the following command to run the Redis container:

docker-compose up -d
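Before running the scripts, it can help to confirm the container is actually accepting connections. A minimal sketch, assuming `r` is a `redis.Redis` client: redis-py's `ping()` returns `True` once the server answers and raises an exception while it is still unreachable. `wait_for_redis` and its retry parameters are hypothetical.

```python
import time


def wait_for_redis(r, attempts=10, delay=0.5):
    """Return True once r.ping() succeeds, or False after `attempts` tries.

    `r` is assumed to be a redis.Redis client (hypothetical helper).
    """
    for _ in range(attempts):
        try:
            if r.ping():
                return True
        except Exception:  # broad on purpose: connection refused, timeouts, ...
            pass
        time.sleep(delay)  # give the container a moment before retrying
    return False
```

For example, `wait_for_redis(Redis("localhost", 6379))` right after `docker-compose up -d`.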

Dependencies

Create the requirements.txt file:

# requirements.txt

# PRODUCER/CONSUMER (redis-py only)
redis==3.5.3

# CLIENT (using fastapi + websocket)
fastapi==0.65.2
asgiref==3.4.1
click==8.0.1
h11==0.12.0
httptools==0.2.0
Jinja2==3.0.1
MarkupSafe==2.0.1
pydantic==1.8.2
python-dotenv==0.18.0
PyYAML==5.4.1
starlette==0.14.2
typing-extensions==3.10.0.0
uvicorn==0.14.0
uvloop==0.15.2
watchgod==0.7
websockets==9.1

Inside your virtualenv, install the dependencies:

pip install -r requirements.txt

Note: All you really need is redis-py! The other dependencies are only there because we use a web application to consume the data.

The producer

Create the producer.py file:

# producer.py
"""
It sends a python dict (producer, some_id, count)
to REDIS STREAM (using the xadd method)

Usage:
  PRODUCER=Roger MESSAGES=10 python producer.py
"""
from os import environ
from time import sleep
from uuid import uuid4

from redis import Redis
from redis.exceptions import ConnectionError  # redis-py's error, not Python's built-in

stream_key = environ.get("STREAM", "jarless-1")
producer = environ.get("PRODUCER", "user-1")
MAX_MESSAGES = int(environ.get("MESSAGES", "2"))


def connect_to_redis():
    hostname = environ.get("REDIS_HOSTNAME", "localhost")
    port = int(environ.get("REDIS_PORT", 6379))

    r = Redis(hostname, port, retry_on_timeout=True)
    return r


def send_data(redis_connection, max_messages):
    count = 0
    while count < max_messages:
        try:
            data = {
                "producer": producer,
                "some_id": uuid4().hex,  # Just some random data
                "count": count,
            }
            resp = redis_connection.xadd(stream_key, data)
            print(resp)
            count += 1

        except ConnectionError as e:
            print("ERROR REDIS CONNECTION: {}".format(e))

        sleep(0.5)


if __name__ == "__main__":
    connection = connect_to_redis()
    send_data(connection, MAX_MESSAGES)
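Since the original motivation was storing a huge amount of log-like output, keep in mind that a stream keeps every entry until it is trimmed. redis-py's `xadd` accepts `maxlen` and `approximate` (the `MAXLEN ~` option of XADD), so a producer could cap the stream as it writes. A minimal sketch; `send_capped` and the 10,000-entry limit are arbitrary assumptions, not part of the POC.

```python
def send_capped(redis_connection, stream_key, data, max_entries=10_000):
    """XADD <stream_key> MAXLEN ~ <max_entries> * field value ...

    With approximate=True Redis trims old entries lazily, so the stream may
    briefly hold a few more than max_entries; this is cheaper than exact trimming.
    """
    return redis_connection.xadd(
        stream_key, data, maxlen=max_entries, approximate=True
    )
```

In `send_data`, the line `resp = redis_connection.xadd(stream_key, data)` would become `resp = send_capped(redis_connection, stream_key, data)`.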

The consumer

Create the consumer.py file:

# consumer.py
"""
It reads the REDIS STREAM events.
Using xread, it gets one event at a time (from the oldest to the newest).

Usage:
  python consumer.py
"""
from os import environ
from time import sleep

from redis import Redis
from redis.exceptions import ConnectionError  # redis-py's error, not Python's built-in

stream_key = environ.get("STREAM", "jarless-1")


def connect_to_redis():
    hostname = environ.get("REDIS_HOSTNAME", "localhost")
    port = int(environ.get("REDIS_PORT", 6379))

    r = Redis(hostname, port, retry_on_timeout=True)
    return r


def get_data(redis_connection):
    last_id = 0  # 0 = start from the very first entry of the stream
    sleep_ms = 5000  # XREAD blocks for up to 5 s waiting for new entries
    while True:
        try:
            resp = redis_connection.xread(
                {stream_key: last_id}, count=1, block=sleep_ms
            )
            if resp:
                key, messages = resp[0]
                last_id, data = messages[0]  # the next XREAD starts after this ID
                print("REDIS ID: ", last_id)
                print("      --> ", data)

        except ConnectionError as e:
            print("ERROR REDIS CONNECTION: {}".format(e))
            sleep(1)  # avoid a busy loop while Redis is unreachable


if __name__ == "__main__":
    connection = connect_to_redis()
    get_data(connection)
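The consumer above asks for `count=1` and only looks at `messages[0]`. If the producer writes faster than that, a variant that fetches several entries per round trip may help. This is a sketch assuming `redis_connection` behaves like `redis.Redis`, so `xread` returns `[[stream, [(id, fields), ...]]]`, or an empty result when the block times out; `read_batch` and the batch size of 10 are hypothetical.

```python
def read_batch(redis_connection, stream_key, last_id, count=10, block_ms=5000):
    """One XREAD step that handles up to `count` entries instead of just messages[0].

    Returns (new_last_id, entries); new_last_id is unchanged when XREAD times out.
    """
    resp = redis_connection.xread({stream_key: last_id}, count=count, block=block_ms)
    entries = []
    for _stream, messages in resp or []:
        for entry_id, fields in messages:
            entries.append((entry_id, fields))
            last_id = entry_id  # entries arrive in ID order, so the last is the newest
    return last_id, entries
```

A loop such as `while True: last_id, entries = read_batch(connection, stream_key, last_id)` would then process each batch, and a restart only needs the last ID that was handled.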

Then, if you run the two commands below in different terminals, you will see one sending data and the other receiving it.

# Terminal 1
PRODUCER=Roger MESSAGES=42 python producer.py

# Terminal 2
python consumer.py

Getting the stream data from the web in real time

Create the client/main.py file:

# client/main.py

import asyncio
from functools import partial
from os import environ

from fastapi import FastAPI
from fastapi import Request
from fastapi import WebSocket
from fastapi.templating import Jinja2Templates
from redis import Redis


app = FastAPI()
templates = Jinja2Templates(directory="templates")

stream_key = environ.get("STREAM", "jarless-1")
hostname = environ.get("REDIS_HOSTNAME", "localhost")
port = int(environ.get("REDIS_PORT", 6379))
redis_cli = Redis(hostname, port, retry_on_timeout=True)


@app.get("/")
def read_root(request: Request):
    return templates.TemplateResponse("index.htm", {"request": request})


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    TODO:
    - Understand the redis-py xread method in depth:
      - why do we need to send a list of streams? I didn't want to get the first [0] element
      - why does it return bytes? Is .decode a good way?
      - is there a better way to convert it to a Python dict?
      - what is a good number for the sleep/block?
    """
    last_id = 0
    sleep_ms = 5000
    loop = asyncio.get_running_loop()

    await websocket.accept()
    while True:
        await asyncio.sleep(0.3)
        # xread() with block= waits up to sleep_ms, so run it in a worker
        # thread instead of freezing the event loop for every other client.
        resp = await loop.run_in_executor(
            None,
            partial(redis_cli.xread, {stream_key: last_id}, count=1, block=sleep_ms),
        )
        print("Waiting...")
        if resp:
            key, messages = resp[0]  # :(
            last_id, data = messages[0]

            data_dict = {k.decode("utf-8"): data[k].decode("utf-8") for k in data}
            data_dict["id"] = last_id.decode("utf-8")
            data_dict["key"] = key.decode("utf-8")
            await websocket.send_json(data_dict)
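About the TODO on bytes: redis-py returns bytes by default, and creating the client with `Redis(hostname, port, decode_responses=True)` makes it return `str` instead. A small helper like the hypothetical `to_message` below builds the WebSocket payload either way, which keeps the endpoint loop short. It is a sketch, not part of the original POC.

```python
def to_message(stream, entry_id, fields):
    """Build the JSON-ready dict sent over the WebSocket from one XREAD entry.

    Works whether redis-py returns bytes (the default) or str
    (when the client is created with decode_responses=True).
    """

    def as_str(value):
        return value.decode("utf-8") if isinstance(value, bytes) else str(value)

    message = {as_str(k): as_str(v) for k, v in fields.items()}
    message["id"] = as_str(entry_id)
    message["key"] = as_str(stream)
    return message
```

Inside the endpoint, the three `data_dict` lines would become `await websocket.send_json(to_message(key, last_id, data))`.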

Create the client/templates/index.htm file:

# client/templates/index.htm

<!DOCTYPE html>
<html>
    <head>
        <title>POC - Redis Stream</title>
        <style>
            body {
                background-color: #24292e;
            }
            #mydata {
                padding: 20px;
                color: aliceblue;
                font-family: ui-monospace,SFMono-Regular,SF Mono,Consolas,Liberation Mono,Menlo,monospace!important;
                font-size: 0.8em;
            }
        </style>
    </head>
    <body>
        <h1>Redis Stream Consumer</h1>
        <div id="mydata"></div>
        <script>
            var el = document.getElementById("mydata");
            const ws = new WebSocket("ws://localhost:8000/ws");
            ws.onmessage = function(event) {
                const mydata = JSON.parse(event.data);
                var tag = document.createElement("p");
                var text = document.createTextNode(
                    `${mydata.id}: ${mydata.some_id} (${mydata.producer})`);
                tag.appendChild(text);
                el.appendChild(tag);
                window.scrollTo(0,document.body.scrollHeight);
            };
        </script>
    </body>
</html>

Run the web application:

cd client
uvicorn main:app --reload

👉 Go to http://localhost:8000

Run the producer script again:

PRODUCER=John MESSAGES=20 python producer.py

[Screenshot: the console sending data]

[Animation: the browser receiving the data in real time]

Find the complete source in the repo below.

Links:
