Josh Smith · Jun 3, 2025

# Streams Processing for Inventory Management

A simple Python example that demonstrates the power of stream processing for use cases such as inventory management.

The processor uses a 10-second tumbling window and aggregates inventory changes by SKU and location, consolidating them into more coherent messages before sending them downstream.

It is a good example of how grouping events can eliminate excessive noise sent to downstream systems.
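The window-and-group approach described above can be sketched roughly as follows. This is an illustrative Python rendering of the pipeline stages only, not the actual contents of `inventory_processor.js`; the connection names, field names, and output collection here are assumptions.

```python
# Hypothetical sketch of an Atlas Stream Processing pipeline, written as
# Python dicts for illustration. The real pipeline lives in
# inventory_processor.js; names below are assumed, not taken from the repo.
pipeline = [
    # Read change events from a source connection (name assumed).
    {"$source": {"connectionName": "inventory_source"}},
    # Collect 10 seconds of changes, then group by SKU and location.
    {"$tumblingWindow": {
        "interval": {"size": 10, "unit": "second"},
        "pipeline": [
            {"$group": {
                "_id": {
                    "sku": "$fullDocument.sku",
                    "location": "$fullDocument.location",
                },
                # Sum all increments/decrements seen in the window
                # ("delta" is an assumed field name).
                "netChange": {"$sum": "$fullDocument.delta"},
            }},
        ],
    }},
    # Write the grouped results out (db/collection names assumed).
    {"$merge": {"into": {
        "connectionName": "atlas",
        "db": "retail_demo",
        "coll": "inventory_rollups",
    }}},
]
```

The key idea is that many raw `$inc` events inside a window collapse into a single net-change message per SKU/location pair.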

## Setup

- Python (3.12 is what I used)
- Atlas Streams Processor

Create a file called `.env` and give it the following properties:

```
ATLAS_URL=<connection string here>
DB_NAME=retail_demo
COLLECTION_NAME=inventory
```
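A minimal sketch of how a loader script might pick up these settings, assuming they end up in the process environment (the real `data_loader.py` may instead load the `.env` file with a helper such as python-dotenv):

```python
import os

# Minimal sketch; defaults below mirror the example .env values.
# The actual data_loader.py may read the .env file differently.
os.environ.setdefault("DB_NAME", "retail_demo")
os.environ.setdefault("COLLECTION_NAME", "inventory")

atlas_url = os.environ.get("ATLAS_URL", "")  # connection string from .env
db_name = os.environ["DB_NAME"]
coll_name = os.environ["COLLECTION_NAME"]
```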

Then set up Python:

```
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python3 data_loader.py
```

That creates an initial set of 10,000 records in the database and collection defined above, and then starts pushing changes to those inventory levels using `$inc`.
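The shape of those `$inc` updates can be sketched like this. The field names (`sku`, `location`, `quantity`) and the delta range are assumptions for illustration, not taken from `data_loader.py`:

```python
import random

def make_inventory_update(sku: str, location: str) -> tuple:
    """Return a (filter, update) pair for one inventory-level adjustment.

    Field names here are assumed; the real loader may differ.
    """
    delta = random.choice([-3, -2, -1, 1, 2, 3])
    filt = {"sku": sku, "location": location}
    # $inc atomically adjusts the stock level by delta.
    update = {"$inc": {"quantity": delta}}
    return filt, update

# With pymongo, this pair would be passed to collection.update_one(filt, update).
filt, update = make_inventory_update("SKU-0001", "store-01")
```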

If you change the DB or collection names from those listed in the example, you will need to update them in the `inventory_processor.js` file as well.

Once done, create the stream processor instance by following the instructions in Atlas Stream Processing - Get Started.

The current setup outputs the grouped results to a new collection, but you could change that to `$emit` to a Kafka topic if needed for the demo.
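Swapping the output would mean replacing the pipeline's final `$merge` stage with a `$emit` stage along these lines. The connection name and topic here are placeholders, again shown as a Python dict for illustration:

```python
# Hypothetical $emit stage to replace the $merge output stage.
# "kafka_connection" and the topic name are assumed placeholders; a real
# setup needs a Kafka connection registered in the stream processing instance.
emit_stage = {
    "$emit": {
        "connectionName": "kafka_connection",
        "topic": "inventory.rollups",
    }
}
```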