Using Redis Stream with Python
Produce and consume data from a Redis Stream using Python and WebSockets
Problem:
I needed to store a huge amount of output (a sort of log). Since I already had a data store (Postgres) configured in my project, I created a new table and saved the data there. Locally everything was fine; in production, however, under a lot of requests I started getting weird Postgres/transaction errors.
Solution
Even though there might be solutions or configuration tweaks for the Postgres errors, I decided to research outside the box. I also remembered that a colleague had mentioned Redis Streams. So I did a proof of concept (which I'm sharing here), and it worked very well.
Redis is Cache, right?
No! There are many features behind Redis; cache is perhaps the best known, but there are plenty of others that I might cover in another post.
Redis Stream
I won't explain Redis Streams or where you can use them in this post, mainly because there are good articles about the topic out there (I'll link a few of them down below). I'd rather show my POC using Python to produce and consume data from a Redis Stream, so check it out. If this post has something useful, please leave a reaction or a comment.
However, I'll sum up a few useful commands that the redis-py package executes behind the scenes.
- Write user_id, venue_id and star_rating into the mystream1 Redis Stream:
XADD mystream1 * user_id 9000 venue_id 123 star_rating 3
"1612299047621-0" ๐ The unique key output
XADD mystream1 * user_id 8701 venue_id 3226 star_rating 4
"1612299047621-1"
Note: The * tells Redis to generate a unique ID made of a millisecond timestamp plus a sequence number.
- Read the stream over a period of time (oldest first):
XRANGE mystream1 1612299047621 1612299137690
XRANGE mystream1 1612299047621 1612299137690 COUNT 2
Note: COUNT limits the result to just two entries.
- Read the stream (most recent first):
XREVRANGE mystream1 1612299137690 1612299047621
However, the command we are really looking for is XREAD, which keeps fetching new entries:
XREAD COUNT 1 BLOCK 5000 STREAMS mystream1 1612299137697
Note: There are even more Stream-specific commands, but XADD and XREAD are all we need here (see the redis-py sketch below).
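For reference, here is roughly how those same calls look through redis-py. This is a minimal sketch only, assuming a local Redis on the default port and reusing the mystream1 stream and field values from the CLI examples above:

# Rough redis-py equivalents of the CLI commands above (sketch, not part of the POC)
from redis import Redis

r = Redis("localhost", 6379)

# XADD mystream1 * user_id 9000 venue_id 123 star_rating 3
entry_id = r.xadd("mystream1", {"user_id": 9000, "venue_id": 123, "star_rating": 3})
print(entry_id)  # something like b'1612299047621-0'

# XRANGE mystream1 - + COUNT 2  ("-" and "+" mean oldest and newest)
print(r.xrange("mystream1", min="-", max="+", count=2))

# XREAD COUNT 1 BLOCK 5000 STREAMS mystream1 0
print(r.xread({"mystream1": 0}, count=1, block=5000))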
The POC structure
├── client                 👈 A web app (FastAPI) consuming in real time
│   ├── main.py
│   └── templates
│       └── index.htm
├── producer.py            👈 The producer that writes data into the Stream
├── consumer.py            👈 Another way to consume the Stream
├── docker-compose.yml     👈 Runs Redis locally easily
└── requirements.txt       👈 The project dependencies
Requirements
- You'll need Redis running; we're using docker-compose here (there is a link below on how to install Docker on Ubuntu-based Linux)
- It's a good idea to use a Python virtualenv to install the dependencies
Get Redis up and running locally
As soon as you have docker and docker-compose installed, create the docker-compose.yml file below:
# docker-compose.yml
version: "3.4"

services:
  redis:
    image: redis
    ports:
      - 6379:6379
    expose:
      - "6379"
Use the following command to run the Redis container:
docker-compose up -d
Dependencies
Create the requirements.txt file:
# requirements.txt
# PRODUCER/CONSUMER (redis-py only)
redis==3.5.3
# CLIENT (using fastapi + websocket)
fastapi==0.65.2
asgiref==3.4.1
click==8.0.1
h11==0.12.0
httptools==0.2.0
Jinja2==3.0.1
MarkupSafe==2.0.1
pydantic==1.8.2
python-dotenv==0.18.0
PyYAML==5.4.1
starlette==0.14.2
typing-extensions==3.10.0.0
uvicorn==0.14.0
uvloop==0.15.2
watchgod==0.7
websockets==9.1
Inside your virtualenv, install the dependencies:
pip install -r requirements.txt
Note: Basically, all you really need is redis-py! All the other dependencies are there only because we're using a web application to consume the data.
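With redis-py installed, a quick sanity check that the container from the docker-compose step is reachable can be done with a ping. This is a minimal sketch, assuming the default localhost:6379 mapping from the docker-compose.yml above:

# sanity check: PING should return True if the Redis container is reachable
from redis import Redis

r = Redis("localhost", 6379)
print(r.ping())  # True when Redis answers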
The producer
Create the producer.py file:
# producer.py
"""
It sends a python dict (producer, some_id, count)
to REDIS STREAM (using the xadd method)

Usage:
    PRODUCER=Roger MESSAGES=10 python producer.py
"""
from os import environ
from time import sleep
from uuid import uuid4

from redis import Redis
from redis.exceptions import ConnectionError  # redis-py's own ConnectionError

stream_key = environ.get("STREAM", "jarless-1")
producer = environ.get("PRODUCER", "user-1")
MAX_MESSAGES = int(environ.get("MESSAGES", "2"))


def connect_to_redis():
    hostname = environ.get("REDIS_HOSTNAME", "localhost")
    port = environ.get("REDIS_PORT", 6379)
    r = Redis(hostname, port, retry_on_timeout=True)
    return r


def send_data(redis_connection, max_messages):
    count = 0
    while count < max_messages:
        try:
            data = {
                "producer": producer,
                "some_id": uuid4().hex,  # just some random data
                "count": count,
            }
            resp = redis_connection.xadd(stream_key, data)
            print(resp)
            count += 1
        except ConnectionError as e:
            print("ERROR REDIS CONNECTION: {}".format(e))
        sleep(0.5)


if __name__ == "__main__":
    connection = connect_to_redis()
    send_data(connection, MAX_MESSAGES)
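One thing to keep in mind for log-like data: the stream can grow without bound. redis-py's xadd accepts a maxlen argument that keeps only the most recent entries. A hedged sketch of that variation of the xadd line in send_data above; the 100000 cap is an arbitrary number for illustration, not something this POC requires:

# keep roughly the latest 100000 entries; approximate=True lets Redis trim lazily, which is cheaper
resp = redis_connection.xadd(stream_key, data, maxlen=100000, approximate=True)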
The consumer
Create the consumer.py file:
"""
It reads the REDIS STREAM events
Using the xread, it gets 1 event per time (from the oldest to the last one)
Usage:
python consumer.py
"""
from os import environ
from redis import Redis
stream_key = environ.get("STREAM", "jarless-1")
def connect_to_redis():
hostname = environ.get("REDIS_HOSTNAME", "localhost")
port = environ.get("REDIS_PORT", 6379)
r = Redis(hostname, port, retry_on_timeout=True)
return r
def get_data(redis_connection):
last_id = 0
sleep_ms = 5000
while True:
try:
resp = redis_connection.xread(
{stream_key: last_id}, count=1, block=sleep_ms
)
if resp:
key, messages = resp[0]
last_id, data = messages[0]
print("REDIS ID: ", last_id)
print(" --> ", data)
except ConnectionError as e:
print("ERROR REDIS CONNECTION: {}".format(e))
if __name__ == "__main__":
connection = connect_to_redis()
get_data(connection)
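A detail worth knowing: the consumer above starts from ID 0, so it replays the whole stream on every run. If you only care about entries added after the consumer starts, Redis accepts the special ID $ in XREAD. A minimal sketch of that variation of get_data, reusing the connection and stream_key defined above:

def get_new_data_only(redis_connection):
    # "$" means "only entries added after this XREAD call", so old entries are skipped
    last_id = "$"
    while True:
        resp = redis_connection.xread({stream_key: last_id}, count=1, block=5000)
        if resp:
            _key, messages = resp[0]
            last_id, data = messages[0]  # from here on, follow the concrete IDs as usual
            print(last_id, data)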
Then, if you run the two commands below in different terminals, you will see one sending data and the other receiving it.
# Terminal 1
PRODUCER=Roger MESSAGES=42 python producer.py
# Terminal 2
python consumer.py
Getting the stream data from the web in real time
Create the client/main.py file:
# client/main.py
import asyncio
from os import environ

from fastapi import FastAPI
from fastapi import Request
from fastapi import WebSocket
from fastapi.templating import Jinja2Templates
from redis import Redis

app = FastAPI()
templates = Jinja2Templates(directory="templates")

stream_key = environ.get("STREAM", "jarless-1")
hostname = environ.get("REDIS_HOSTNAME", "localhost")
port = environ.get("REDIS_PORT", 6379)
redis_cli = Redis(hostname, port, retry_on_timeout=True)


@app.get("/")
def read_root(request: Request):
    return templates.TemplateResponse("index.htm", {"request": request})


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    TODO:
    - Understand the redis-py xread method more deeply:
        - why do we need to send a dict of streams? I didn't want to grab the first [0] element
        - why does it return bytes? Is .decode a good way to handle that?
        - is there a better way to convert it to a python dict?
        - what is a good number for the sleep/block?
    """
    last_id = 0
    sleep_ms = 5000
    await websocket.accept()
    while True:
        await asyncio.sleep(0.3)
        resp = redis_cli.xread({stream_key: last_id}, count=1, block=sleep_ms)
        print("Waiting...")
        if resp:
            key, messages = resp[0]  # :(
            last_id, data = messages[0]
            data_dict = {k.decode("utf-8"): data[k].decode("utf-8") for k in data}
            data_dict["id"] = last_id.decode("utf-8")
            data_dict["key"] = key.decode("utf-8")
            await websocket.send_json(data_dict)
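One of the TODO questions above (why everything comes back as bytes) has a simple answer: redis-py returns raw bytes by default, but the client can be created with decode_responses=True so that keys, values, and IDs arrive as str and the manual .decode calls go away. A minimal sketch of that alternative constructor; everything else in the endpoint would stay the same:

# with decode_responses=True, xread returns str keys/values instead of bytes,
# so the .decode("utf-8") calls in the websocket endpoint are no longer needed
redis_cli = Redis(hostname, port, retry_on_timeout=True, decode_responses=True)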
Create the client/templates/index.htm file:
<!-- client/templates/index.htm -->
<!DOCTYPE html>
<html>
  <head>
    <title>POC - Redis Stream</title>
    <style>
      body {
        background-color: #24292e;
      }
      #mydata {
        padding: 20px;
        color: aliceblue;
        font-family: ui-monospace,SFMono-Regular,SF Mono,Consolas,Liberation Mono,Menlo,monospace!important;
        font-size: 0.8em;
      }
    </style>
  </head>
  <body>
    <h1>Redis Stream Consumer</h1>
    <div id="mydata"></div>
    <script>
      var el = document.getElementById("mydata");
      const ws = new WebSocket("ws://localhost:8000/ws");
      ws.onmessage = function(event) {
        const mydata = JSON.parse(event.data);
        var tag = document.createElement("p");
        var text = document.createTextNode(
          `${mydata.id}: ${mydata.some_id} (${mydata.producer})`);
        tag.appendChild(text);
        el.appendChild(tag);
        window.scrollTo(0, document.body.scrollHeight);
      };
    </script>
  </body>
</html>
Run the web application:
cd client
uvicorn main:app --reload
👉 Go to http://localhost:8000
Run the producer script again:
PRODUCER=John MESSAGES=20 python producer.py
The console sending data
The browser getting data
You can find the complete source code in the repo below.
Links: