Stream inventory updates in real time with Postgres
With real-time data streaming, companies can gain valuable insights, make data-driven decisions faster, and automate processes that would otherwise require manual intervention.
From monitoring dynamic customer behaviors to processing financial transactions or managing stock levels, real-time streaming allows data to flow continuously between systems and enables instant reactions to events as they happen. This is especially useful in industries such as retail, finance, and adtech, where you need to quickly adapt to new information to maintain a competitive edge.
Postgres, a robust and highly scalable relational database, remains a popular choice for storing and managing structured data. When paired with Redpanda Connect, a powerful streaming tool designed to simplify real-time data flow, businesses can seamlessly bridge the gap between their streaming platforms and Postgres.
This integration enables continuous data ingestion and storage in Postgres, making it accessible for immediate analytics and reporting. For example, in retail, real-time streaming enables dynamic inventory management where stock levels from multiple stores are updated instantly in a centralized system.
In the adtech industry, Redpanda Connect can help power real-time ad targeting by processing user interaction data as it happens to provide immediate feedback for ad campaigns and optimize their performance based on real-time analytics.
This tutorial shows you how to use Redpanda Serverless, Redpanda Connect, and Postgres for real-time data streaming through an example retail app where inventory updates from multiple stores are streamed and stored instantly.
Scenario: Real-time data streaming in retail for inventory updates
Imagine you work for a retail company that’s developing an application to ingest inventory changes from its stores in real time. Your job is to implement a system that ingests the inventory update data (including information such as store, product ID, product name, current stock, and timestamp) from Redpanda Serverless using Redpanda Connect and saves the data to Postgres.
Once the stock level data is available in the Postgres database, the company’s inventory manager can take further actions, such as reordering products and making them available for distribution to stores.
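To make the data concrete, each update can be modeled as a small JSON document. The following Python sketch builds one such message (the field names come from the scenario above; the values are illustrative):

```python
import json

# One inventory update event. Field names follow the scenario
# (store, product ID, product name, current stock, timestamp);
# the values here are illustrative.
inventory_update = {
    "store_id": 3,
    "product_id": 88,
    "product_name": "Product-88",
    "current_stock": 96,
    "last_updated": "2024-09-13T11:41:31.734534Z",
}

# Serialized form, as it would travel through the streaming topic.
print(json.dumps(inventory_update))
```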
You can find this demo on GitHub. Here’s a high-level architecture diagram of the required solution:
Prerequisites
You’ll need the following to complete this tutorial:
- Docker Desktop 4.28.0 or above
- The Git client CLI for cloning the project base
- Python 3.9 or above
- A free trial account or existing account on Redpanda Cloud
- Any Postgres client tool of your choice
1. Set up a Redpanda Serverless cluster
Log in to Redpanda Cloud using your account details. On the landing page, click Create cluster to create a Redpanda cluster:
Choose “Serverless” as your cluster type for this tutorial:
With a Serverless cluster, you can focus on application development and don’t need to worry about managing the infrastructure required to run the Redpanda cluster. When you choose this option, you also get instant access to a working Redpanda cluster.
On the next screen, fill in the details for the Serverless cluster. Add a cluster name and set the cloud provider as AWS and the region as us-east-1:
You can either name the cluster anything you like or go ahead with the default name suggested in the Redpanda Cloud portal. Click Create once done.
Once you’re automatically redirected to the cluster overview screen, click the Kafka API tab:
Here, note the Bootstrap server URL details, as you’ll need this information while configuring Redpanda Connect:
Leave this browser page open.
2. Set up a Postgres database
There are various ways to set up a Postgres database. In this tutorial, you’ll use the Docker approach. Open a terminal and execute this command:
docker run -d ^
--name db --rm ^
-p 5434:5432 ^
-e POSTGRES_USER=postgres ^
-e POSTGRES_PASSWORD=P@ssw0rd ^
-e TZ=Asia/Kolkata ^
-e LOGGING_COLLECTOR=on ^
-e LOG_DIRECTORY=/var/log/postgresql ^
-e LOG_MIN_MESSAGES=DEBUG1 ^
-v C:/Postgres/data:/var/lib/postgresql/data ^
-v C:/Postgres/log:/var/log/postgresql ^
postgres:16.3
You can change the values of the environment variables and the paths for the volume mounts according to your needs.
Note: The above docker run command uses ^ as the line continuation character because the example was run on a Windows machine. Use \ instead if your machine is running Linux or macOS.
Once the container starts, use any Postgres client and create a database named inventory using the following command:
create database inventory;
Log in to the inventory database and create a table with the same name where the stock updates from each store will be persisted via Redpanda Connect:
CREATE TABLE inventory (
    store_id INT,
    product_id INT,
    product_name VARCHAR(255),
    current_stock INT,
    last_updated TIMESTAMP,
    PRIMARY KEY (store_id, product_id)
);
3. Set up Redpanda Connect
You’ll use Redpanda Connect to consume the messages from the created Redpanda topic and send them to the Postgres database. Redpanda Connect is included with the default Redpanda installation.
Redpanda Connect requires a YAML configuration file that defines three things: the input (the Redpanda topic to consume from), the pipeline (processors that map or transform the data for the output), and the output (the configuration for writing the data to a Postgres table). Before diving further into Redpanda Connect and its YAML configuration file, let's set up the demo environment.
In your home directory or wherever you prefer, run the following command to clone the repository for this tutorial:
git clone https://github.com/redpanda-data-blog/real-time-data-streaming-postgres.git
In the cloned repository directory on your machine, you should see two main directories, producer_app and redpanda_connect. The producer_app directory contains the producer application you'll use to publish inventory update data to the inventory-updates topic (which you'll create in a later step), and the redpanda_connect directory contains the configuration file for Redpanda Connect.
Open the connect.yaml file in the redpanda_connect directory with an editor of your choice. You should see that it's empty. Paste in the following configuration snippet to define the input for Redpanda:
input:
  label: "redpanda_serverless"
  kafka:
    addresses:
      - [Your Redpanda cluster address]
    topics:
      - inventory-updates
    consumer_group: inventory-consumer-group
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: [Your Redpanda cluster username]
      password: [Your Redpanda cluster user's password]
This input block contains several fields with placeholder information that need to be replaced with the relevant values.
Replace the addresses field value with your bootstrap server URL from earlier. The config also specifies that data will be consumed from the inventory-updates topic, which you'll create in the next step.
4. Create a topic
Switch back to your Redpanda Cloud browser window. Click Topics in the side menu to create a new Redpanda topic for this tutorial:
The topic will receive inventory update data from your producer app. Click Create topic:
Set the topic name to inventory-updates and partitions to 1, then click Create:
5. Create a user via the cluster’s Security page
Redpanda Serverless clusters are enabled with Transport Layer Security (TLS) by default to safeguard all communication between brokers. To read from and write to topics, you need to create a user and grant it permission to interact with the cluster. From the Redpanda Cloud browser window, click Security in the side menu, then click Create user:
Fill in a username and password. For the purposes of this tutorial, choose SCRAM-SHA-256 for the SASL mechanism:
Click Create to create the user. You’ll be automatically redirected to a screen confirming the user was created successfully. You can click the icons next to the user password to view it or copy it for later reference.
Click Create ACLs to assign privileges to the user:
You’ll be redirected to a screen with a list of available ACLs. Here, double-click the ACL principal name (the username of the user you created):
You’ll be taken to the “Edit ACL” screen. Click Allow all operations to grant all privileges for the user as part of this demo:
Scroll down to the bottom of the screen and click OK:
Edit the connect.yaml file from earlier to add the new user’s credentials in this section:
sasl:
  mechanism: SCRAM-SHA-256
  user: [Your Redpanda cluster username]
  password: [Your Redpanda cluster user's password]
6. Update the remaining Redpanda Connect configuration
You’ll now define the pipeline and a processor to map the inventory update data to the root of the output from the Redpanda input. To do this, edit the same connect.yaml file by pasting the following code snippet after the input block:
pipeline:
  processors:
    - mapping: |
        root.store_id = this.store_id
        root.product_id = this.product_id
        root.product_name = this.product_name
        root.current_stock = this.current_stock
        root.last_updated = this.last_updated
After the pipeline block, define the output block to connect to Postgres by providing the connection URL and an INSERT statement, which will insert the incoming stream of records into the desired table:
output:
  sql_raw:
    driver: postgres
    dsn: postgres://[postgres user]:[postgres password]@host.docker.internal:[postgres host port]/inventory?sslmode=disable
    query: "INSERT INTO inventory (store_id, product_id, product_name, current_stock, last_updated) VALUES ($1, $2, $3, $4, $5);"
    args_mapping: |
      root = [
        this.store_id,
        this.product_id,
        this.product_name,
        this.current_stock,
        this.last_updated,
      ]
Make sure to update the values in the Postgres connection URL to match your database setup and credentials.
Your full YAML configuration file should look as follows:
input:
  label: "redpanda_serverless"
  kafka:
    addresses:
      - [Your Redpanda cluster address]
    topics:
      - inventory-updates
    consumer_group: inventory-consumer-group
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: [Your Redpanda cluster username]
      password: [Your Redpanda cluster user's password]
pipeline:
  processors:
    - mapping: |
        root.store_id = this.store_id
        root.product_id = this.product_id
        root.product_name = this.product_name
        root.current_stock = this.current_stock
        root.last_updated = this.last_updated
output:
  sql_raw:
    driver: postgres
    dsn: postgres://[postgres user]:[postgres password]@host.docker.internal:[postgres host port]/inventory?sslmode=disable
    query: "INSERT INTO inventory (store_id, product_id, product_name, current_stock, last_updated) VALUES ($1, $2, $3, $4, $5);"
    args_mapping: |
      root = [
        this.store_id,
        this.product_id,
        this.product_name,
        this.current_stock,
        this.last_updated,
      ]
Make sure to replace all the placeholder variable values. With this, Redpanda Connect is ready to use.
7. Start the Redpanda Connect container
To start the container and use Redpanda Connect, open a terminal and execute the following command, making sure the volume mount uses the absolute path of the redpanda_connect directory containing the Redpanda Connect configuration file. The command attaches the container to a Docker network named demo-network; if that network doesn't exist yet, create it first with docker network create demo-network:
docker run -d --name redpanda --rm ^
--network demo-network ^
-p 9092:9092 ^
-p 9644:9644 ^
-v [absolute path]/real-time-data-streaming-postgres/redpanda_connect:/redpanda_connect ^
docker.redpanda.com/redpandadata/redpanda:v24.2.4 ^
redpanda start ^
--advertise-kafka-addr redpanda ^
--overprovisioned ^
--smp 1 ^
--memory 1G ^
--reserve-memory 500M ^
--node-id 0 ^
--check=false
Execute the command below to run Redpanda Connect with the prepared YAML configuration file:
docker exec -it redpanda rpk connect run /redpanda_connect/connect.yaml
You’ll get the following output:
INFO Running main config from specified file @service=redpanda-connect benthos_version=4.34.0 path=/redpanda_connect/connect.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=redpanda-connect
INFO Launching a Redpanda Connect instance, use CTRL+C to close @service=redpanda-connect
INFO Output type sql_raw is now active @service=redpanda-connect label="" path=root.output
INFO Input type kafka is now active @service=redpanda-connect label=redpanda_serverless path=root.input
This means that your Redpanda Connect instance is active and ready to process the streaming messages from the configured input topic. Leave this terminal open.
8. Configure and run the producer app
With your Redpanda Serverless cluster running on Redpanda Cloud and the data connection to Postgres set up via Redpanda Connect, you’re ready to send data to the relevant Redpanda topic and insert data into the inventory table. To do this, open a new terminal window and switch to the producer_app directory inside the cloned project repository folder:
cd producer_app
Set up the virtual environment:
python -m venv venv
And activate it (on Windows):
venv\Scripts\activate
On Linux or macOS, run source venv/bin/activate instead.
Install the required dependency:
pip install kafka-python
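The prebuilt producer app handles everything for you, but as a rough sketch of what it does under the hood, producing a single update with kafka-python might look like the following. The broker address, credentials, and message body are placeholders, and the helper names are illustrative, not taken from the app:

```python
import json

def serialize(message):
    """Serialize a message dict to UTF-8 JSON bytes (used as the value_serializer)."""
    return json.dumps(message).encode("utf-8")

def send_one_update(bootstrap_servers, username, password):
    """Send a single illustrative inventory update to the inventory-updates topic."""
    from kafka import KafkaProducer  # provided by the kafka-python package

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        security_protocol="SASL_SSL",      # Serverless clusters use TLS
        sasl_mechanism="SCRAM-SHA-256",    # must match the user created in step 5
        sasl_plain_username=username,
        sasl_plain_password=password,
        value_serializer=serialize,
    )
    # Illustrative payload; the real app generates updates like this continuously.
    update = {
        "store_id": 3,
        "product_id": 88,
        "product_name": "Product-88",
        "current_stock": 96,
        "last_updated": "2024-09-13T11:41:31.734534Z",
    }
    metadata = producer.send("inventory-updates", update).get(timeout=30)
    print(f"Sent to topic '{metadata.topic}' at offset {metadata.offset}")
    producer.flush()

# Example call (fill in your cluster address and the user created in step 5):
# send_one_update("your redpanda broker address:port", "your user", "your password")
```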
Edit the following variables in the config.py file inside the producer_app directory with your cluster details and credentials:
"bootstrap_servers": "your redpanda broker address:port",
"sasl_plain_username": "your user",
"sasl_plain_password": "your password",Run the following command to execute the prebuilt producer application:
py produce_inventory_update_from_stores.py
You should see the messages being delivered to the topic:
Inventory update message is: {'store_id': 3, 'product_id': 88, 'product_name': 'Product-88', 'current_stock': 96, 'last_updated': '2024-09-13T11:41:31.734534Z'}
Sent to topic 'inventory-updates' at offset 0
Inventory update message is: {'store_id': 2, 'product_id': 131, 'product_name': 'Product-131', 'current_stock': 29, 'last_updated': '2024-09-13T11:41:36.975798Z'}
Sent to topic 'inventory-updates' at offset 1
Inventory update message is: {'store_id': 6, 'product_id': 123, 'product_name': 'Product-123', 'current_stock': 74, 'last_updated': '2024-09-13T11:41:41.982205Z'}
Sent to topic 'inventory-updates' at offset 2
Inventory update message is: {'store_id': 8, 'product_id': 16, 'product_name': 'Product-16', 'current_stock': 67, 'last_updated': '2024-09-13T11:41:46.989786Z'}
Sent to topic 'inventory-updates' at offset 3
Inventory update message is: {'store_id': 3, 'product_id': 50, 'product_name': 'Product-50', 'current_stock': 36, 'last_updated': '2024-09-13T11:41:51.993210Z'}
Sent to topic 'inventory-updates' at offset 4
Inventory update message is: {'store_id': 10, 'product_id': 94, 'product_name': 'Product-94', 'current_stock': 57, 'last_updated': '2024-09-13T11:41:57.003203Z'}
Sent to topic 'inventory-updates' at offset 5
Inventory update message is: {'store_id': 2, 'product_id': 135, 'product_name': 'Product-135', 'current_stock': 35, 'last_updated': '2024-09-13T11:42:02.012993Z'}
Sent to topic 'inventory-updates' at offset 6
Switch over to your Postgres client tool and select the records from the inventory table using SQL:
select * from inventory;
You should see the streaming records being ingested into the database table:
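With the data landing in Postgres, the inventory manager mentioned earlier can start acting on it. For example, a query along these lines (the reorder threshold of 20 is purely illustrative) would surface products that may need restocking:

```sql
-- Flag low-stock products across all stores; the threshold is illustrative.
SELECT store_id, product_id, product_name, current_stock, last_updated
FROM inventory
WHERE current_stock < 20
ORDER BY current_stock;
```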
Conclusion
In this tutorial, you learned how real-time data streaming can transform the way you process and analyze information by allowing data to flow continuously between systems.
Using Postgres as a reliable relational database and Redpanda Connect as a seamless streaming tool, you saw how easy it is to integrate streaming data into a database for instant data processing. You explored a dynamic retail inventory management use case, illustrating how this combination of technology can enhance operational efficiency and allow you to respond swiftly to changing data.
To get started, you can find Redpanda Community Edition on GitHub or try Redpanda for free. Then, dive into the Redpanda Blog for step-by-step tutorials and real-world use cases to upgrade your real-time data streaming projects.
