Stream inventory updates in real time with Postgres

11 min read · Dec 26, 2024

With real-time data streaming, companies can gain valuable insights, make data-driven decisions faster, and automate processes that would otherwise require manual intervention.

From monitoring dynamic customer behaviors to processing financial transactions or managing stock levels, real-time streaming allows data to flow continuously between systems and enables instant reactions to events as they happen. This is especially useful in industries such as retail, finance, and adtech, where you need to quickly adapt to new information to maintain a competitive edge.

Postgres, a robust and highly scalable relational database, remains a popular choice for storing and managing structured data. When paired with Redpanda Connect, a powerful streaming tool designed to simplify real-time data flow, businesses can seamlessly bridge the gap between their streaming platforms and Postgres.

This integration enables continuous data ingestion and storage in Postgres, making it accessible for immediate analytics and reporting. For example, in retail, real-time streaming enables dynamic inventory management where stock levels from multiple stores are updated instantly in a centralized system.

In the adtech industry, Redpanda Connect can help power real-time ad targeting by processing user interaction data as it happens to provide immediate feedback for ad campaigns and optimize their performance based on real-time analytics.

This tutorial shows you how to use Redpanda Serverless, Redpanda Connect, and Postgres for real-time data streaming through an example retail app where inventory updates from multiple stores are streamed and stored instantly.

Scenario: Real-time data streaming in retail for inventory updates

Imagine you work for a retail company that’s developing an application to ingest inventory changes from its stores in real time. Your job is to implement a system that ingests the inventory update data (including information such as store, product ID, product name, current stock, and timestamp) from Redpanda Serverless using Redpanda Connect and saves the data to Postgres.

Once the stock level data is available in the Postgres database, the company’s inventory manager can take further actions, such as reordering products and making them available for distribution to stores.
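Each inventory update is a small JSON document. For illustration, a single message might look like this (the values are arbitrary):

{
  "store_id": 3,
  "product_id": 88,
  "product_name": "Product-88",
  "current_stock": 96,
  "last_updated": "2024-09-13T11:41:31.734534Z"
}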

You can find this demo on GitHub. Here’s a high-level architecture diagram of the required solution:

[Image: Architecture diagram]

Prerequisites

You’ll need the following to complete this tutorial:

- A Redpanda Cloud account
- Docker installed on your machine
- Python 3 and pip
- Git
- A Postgres client (such as psql)

1. Set up a Redpanda Serverless cluster

Log in to Redpanda Cloud using your account details. On the landing page, click Create cluster to create a Redpanda cluster:

[Image: Redpanda Cloud home page]

Choose “Serverless” as your cluster type for this tutorial:

[Image: Choosing Serverless as your cluster type]

With a Serverless cluster, you can focus on application development and don’t need to worry about managing the infrastructure required to run the Redpanda cluster. When you choose this option, you also get instant access to a working Redpanda cluster.

On the next screen, fill in the details for the Serverless cluster. Add a cluster name and set the cloud provider as AWS and the region as us-east-1:

[Image: Serverless cluster details]

You can name the cluster anything you like or keep the default name suggested in the Redpanda Cloud portal. Click Create once done.

Once you’re automatically redirected to the cluster overview screen, click the Kafka API tab:

[Image: Cluster overview]

Here, note the Bootstrap server URL, as you’ll need it when configuring Redpanda Connect:

[Image: Bootstrap server URL]

Leave this browser page open.

2. Set up a Postgres database

There are various ways to set up a Postgres database. In this tutorial, you’ll use the Docker approach. Open a terminal and execute this command:

docker run -d ^
--name db --rm ^
-p 5434:5432 ^
-e POSTGRES_USER=postgres ^
-e POSTGRES_PASSWORD=P@ssw0rd ^
-e TZ=Asia/Kolkata ^
-e LOGGING_COLLECTOR=on ^
-e LOG_DIRECTORY=/var/log/postgresql ^
-e LOG_MIN_MESSAGES=DEBUG1 ^
-v C:/Postgres/data:/var/lib/postgresql/data ^
-v C:/Postgres/log:/var/log/postgresql ^
postgres:16.3

You can change the values of the environment variables and the path for the volume mount according to your needs.

Note: The above docker run command uses ^ as the line-continuation character because the example was run on Windows. Use \ instead if your machine is running Linux or macOS.
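If you don't have a standalone Postgres client installed, you can use the psql client that ships inside the container:

docker exec -it db psql -U postgres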

Once the container starts, use any Postgres client and create a database named inventory using the following command:

create database inventory;
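If you're using psql, connect to the newly created database with:

\c inventory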

Once connected to the inventory database, create a table with the same name, where the stock updates from each store will be persisted via Redpanda Connect:

CREATE TABLE inventory (
  store_id INT,
  product_id INT,
  product_name VARCHAR(255),
  current_stock INT,
  last_updated TIMESTAMP,
  PRIMARY KEY (store_id, product_id)
);
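Optionally, sanity-check the table with a throwaway row before wiring up the stream (delete it afterward so the table starts empty):

INSERT INTO inventory VALUES (1, 1, 'Test-Product', 10, now());
SELECT * FROM inventory;
DELETE FROM inventory WHERE store_id = 1 AND product_id = 1;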

3. Set up Redpanda Connect

You’ll use Redpanda Connect to consume the messages from the created Redpanda topic and send them to the Postgres database. Redpanda Connect is included with the default Redpanda installation.

Redpanda Connect requires a YAML configuration file that defines three things: the input (the Redpanda topic to consume from), the pipeline (processors that map the data for the output or carry out any transformations), and the output (the configuration for saving the data to a Postgres table). Before diving further into Redpanda Connect and its YAML configuration file, let's set up the demo environment.
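At a high level, the finished file will have the following shape (a skeleton only; you'll fill in each block over the next steps):

input:      # where messages come from (a Redpanda topic)
  ...
pipeline:   # processors that map or transform each message
  ...
output:     # where messages go (here, a Postgres table)
  ...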

In your home directory or wherever you prefer, run the following command to clone the repository for this tutorial:

git clone https://github.com/redpanda-data-blog/real-time-data-streaming-postgres.git

Inside the cloned repository, you'll see two main directories: producer_app and redpanda_connect. The producer_app directory contains the producer application you'll use to produce inventory update data to the topic you'll create in step 4, and the redpanda_connect directory contains the configuration file for Redpanda Connect.

Open the connect.yaml file in the redpanda_connect directory with an editor of your choice. You should see that it's empty. Paste in the following configuration snippet to define the input for Redpanda:

input:
  label: "redpanda_serverless"
  kafka:
    addresses:
      - [Your Redpanda cluster address]
    topics:
      - inventory-updates
    consumer_group: inventory-consumer-group
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: [Your Redpanda cluster username]
      password: [Your Redpanda cluster user's password]

This input block contains several fields with placeholder information that need to be replaced with the relevant values.

Replace the addresses field value with your bootstrap server URL from earlier. The config also specifies that data will be consumed from the inventory-updates topic, which you'll create in the next step.

4. Create a topic

Switch back to your Redpanda Cloud browser window. Click Topics in the side menu to create a new Redpanda topic for this tutorial:

[Image: Topics menu item]

The topic will receive inventory update data from your producer app. Click Create topic:

[Image: Create topic]

Set the topic name to inventory-updates and partitions to 1, then click Create:

[Image: Topic details]

5. Create a user via the cluster’s Security page

Redpanda Serverless clusters are enabled with Transport Layer Security (TLS) by default to safeguard all communication between brokers. To read from and write to topics, you need to create a user and grant it permission to interact with the cluster. From the Redpanda Cloud browser window, click Security in the side menu, then click Create user:

[Image: Security menu item]

Fill in a username and password. For the purposes of this tutorial, choose SCRAM-SHA-256 for the SASL mechanism:

[Image: New user details]

Click Create to create the user. You’ll be automatically redirected to a screen confirming the user was created successfully. You can click the icons next to the user password to view it or copy it for later reference.

Click Create ACLs to assign privileges to the user:

[Image: Create ACLs]

You’ll be redirected to a screen with a list of available ACLs. Here, double-click the ACL principal name (the username of the user you created):

[Image: ACLs list]

You’ll be taken to the “Edit ACL” screen. Click Allow all operations to grant all privileges for the user as part of this demo:

[Image: Allow all operations in ACL]

Scroll down to the bottom of the screen and click OK:

[Image: OK button on Edit ACL page]

Edit the connect.yaml file from earlier to add the new user’s credentials in this section:

    sasl:
      mechanism: SCRAM-SHA-256
      user: [Your Redpanda cluster username]
      password: [Your Redpanda cluster user's password]

6. Update the remaining Redpanda Connect configuration

You’ll now define the pipeline and a processor to map the inventory update data to the root of the output from the Redpanda input. To do this, edit the same connect.yaml file by pasting the following code snippet after the input block:

pipeline:
  processors:
    - mapping: |
        root.store_id = this.store_id
        root.product_id = this.product_id
        root.product_name = this.product_name
        root.current_stock = this.current_stock
        root.last_updated = this.last_updated
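The mapping is written in Bloblang and here simply passes each field through unchanged. If you wanted an actual transformation, a hypothetical variant of one mapping line (purely illustrative, not used later in this tutorial) could store product names in uppercase using Bloblang's uppercase() method:

root.product_name = this.product_name.uppercase()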

After the pipeline block, define the output block to connect to Postgres by providing the connection URL and INSERT statement, which will insert the incoming stream of records into the desired table:

output:
  sql_raw:
    driver: postgres
    dsn: postgres://[postgres user]:[postgres password]@host.docker.internal:[postgres host port]/inventory?sslmode=disable
    query: "INSERT INTO inventory (store_id, product_id, product_name, current_stock, last_updated) VALUES ($1, $2, $3, $4, $5);"
    args_mapping: |
      root = [
        this.store_id,
        this.product_id,
        this.product_name,
        this.current_stock,
        this.last_updated,
      ]

Make sure to update the values in the Postgres connection URL to match your database setup and credentials.
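Note that because the table defines PRIMARY KEY (store_id, product_id), a repeat update for the same product at the same store would be rejected by the plain INSERT above. If your scenario should keep only the latest stock level per product, one option (an alternative to the tutorial's query, not part of the original setup) is a Postgres upsert:

INSERT INTO inventory (store_id, product_id, product_name, current_stock, last_updated)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (store_id, product_id)
DO UPDATE SET current_stock = EXCLUDED.current_stock, last_updated = EXCLUDED.last_updated;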

Your full YAML configuration file should look as follows:

input:
  label: "redpanda_serverless"
  kafka:
    addresses:
      - [Your Redpanda cluster address]
    topics:
      - inventory-updates
    consumer_group: inventory-consumer-group
    tls:
      enabled: true
    sasl:
      mechanism: SCRAM-SHA-256
      user: [Your Redpanda cluster username]
      password: [Your Redpanda cluster user's password]
pipeline:
  processors:
    - mapping: |
        root.store_id = this.store_id
        root.product_id = this.product_id
        root.product_name = this.product_name
        root.current_stock = this.current_stock
        root.last_updated = this.last_updated
output:
  sql_raw:
    driver: postgres
    dsn: postgres://[postgres user]:[postgres password]@host.docker.internal:[postgres host port]/inventory?sslmode=disable
    query: "INSERT INTO inventory (store_id, product_id, product_name, current_stock, last_updated) VALUES ($1, $2, $3, $4, $5);"
    args_mapping: |
      root = [
        this.store_id,
        this.product_id,
        this.product_name,
        this.current_stock,
        this.last_updated,
      ]

Make sure to replace all the placeholder variable values. With this, Redpanda Connect is ready to use.

7. Start the Redpanda Connect container

To start the container and use Redpanda Connect, open a terminal. The command below attaches the container to a Docker network named demo-network and mounts the redpanda_connect directory into the container, so make sure the volume mount uses the absolute path of the directory containing the Redpanda Connect configuration file. If the demo-network network doesn't already exist, create it first:
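docker network create demo-network

Then start the Redpanda container: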

docker run -d --name redpanda --rm ^
--network demo-network ^
-p 9092:9092 ^
-p 9644:9644 ^
-v [absolute path]/real-time-data-streaming-postgres/redpanda_connect:/redpanda_connect ^
docker.redpanda.com/redpandadata/redpanda:v24.2.4 ^
redpanda start ^
--advertise-kafka-addr redpanda ^
--overprovisioned ^
--smp 1 ^
--memory 1G ^
--reserve-memory 500M ^
--node-id 0 ^
--check=false

Execute the command below to run Redpanda Connect with the prepared YAML configuration file:

docker exec -it redpanda rpk connect run /redpanda_connect/connect.yaml

You should see output similar to the following:

INFO Running main config from specified file       @service=redpanda-connect benthos_version=4.34.0 path=/redpanda_connect/connect.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=redpanda-connect
INFO Launching a Redpanda Connect instance, use CTRL+C to close @service=redpanda-connect
INFO Output type sql_raw is now active @service=redpanda-connect label="" path=root.output
INFO Input type kafka is now active @service=redpanda-connect label=redpanda_serverless path=root.input

This means that your Redpanda Connect instance is active and ready to process the streaming messages from the configured input topic. Leave this terminal open.

8. Configure and run the producer app

With your Redpanda Serverless cluster running on Redpanda Cloud and the data connection to Postgres set up via Redpanda Connect, you’re ready to send data to the relevant Redpanda topic and insert data into the inventory table. To do this, open a new terminal window and switch to the producer_app directory inside the cloned project repository folder:

cd producer_app

Set up the virtual environment:

python -m venv venv

And activate it. On Windows:

venv\Scripts\activate

On Linux or macOS, run source venv/bin/activate instead.

Install the required dependency:

pip install kafka-python

Edit the following variables in the config.py file inside the producer_app directory with your cluster details and credentials:

   "bootstrap_servers": "your redpanda broker address:port",
"sasl_plain_username": "your user",
"sasl_plain_password": "your password",

Run the following command to execute the prebuilt producer application (py is the Windows Python launcher; use python on Linux or macOS):

py produce_inventory_update_from_stores.py

You should see the messages being delivered to the topic:

Inventory update message is: {'store_id': 3, 'product_id': 88, 'product_name': 'Product-88', 'current_stock': 96, 'last_updated': '2024-09-13T11:41:31.734534Z'}
Sent to topic 'inventory-updates' at offset 0
Inventory update message is: {'store_id': 2, 'product_id': 131, 'product_name': 'Product-131', 'current_stock': 29, 'last_updated': '2024-09-13T11:41:36.975798Z'}
Sent to topic 'inventory-updates' at offset 1
Inventory update message is: {'store_id': 6, 'product_id': 123, 'product_name': 'Product-123', 'current_stock': 74, 'last_updated': '2024-09-13T11:41:41.982205Z'}
Sent to topic 'inventory-updates' at offset 2
Inventory update message is: {'store_id': 8, 'product_id': 16, 'product_name': 'Product-16', 'current_stock': 67, 'last_updated': '2024-09-13T11:41:46.989786Z'}
Sent to topic 'inventory-updates' at offset 3
Inventory update message is: {'store_id': 3, 'product_id': 50, 'product_name': 'Product-50', 'current_stock': 36, 'last_updated': '2024-09-13T11:41:51.993210Z'}
Sent to topic 'inventory-updates' at offset 4
Inventory update message is: {'store_id': 10, 'product_id': 94, 'product_name': 'Product-94', 'current_stock': 57, 'last_updated': '2024-09-13T11:41:57.003203Z'}
Sent to topic 'inventory-updates' at offset 5
Inventory update message is: {'store_id': 2, 'product_id': 135, 'product_name': 'Product-135', 'current_stock': 35, 'last_updated': '2024-09-13T11:42:02.012993Z'}
Sent to topic 'inventory-updates' at offset 6

Switch over to your Postgres client tool and select the records from the inventory table using SQL:

select * from inventory;

You should see the streaming records being ingested into the database table:

[Image: Output in the database table]
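For example, to inspect only the most recent updates, you could order by the timestamp column:

select * from inventory order by last_updated desc limit 10;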

Conclusion

In this tutorial, you learned how real-time data streaming can transform how you process and analyze information by allowing continuous data flow across systems.

Using Postgres as a reliable relational database and Redpanda Connect as a seamless streaming tool, you saw how easy it is to integrate streaming data into a database for instant data processing. You explored a dynamic retail inventory management use case, illustrating how this combination of technology can enhance operational efficiency and allow you to respond swiftly to changing data.

To get started, you can find Redpanda Community Edition on GitHub or try Redpanda for free. Then, dive into the Redpanda Blog for step-by-step tutorials and real-world use cases to upgrade your real-time data streaming projects.


Written by Redpanda Data

The streaming data platform for developers—fully Kafka compatible. Helping developers build the best data streaming experiences.
