A simple Python example that demonstrates the power of stream processing for cases like inventory management.
The processor uses a 10-second tumbling window to aggregate inventory changes by SKU and location, then combines them into more coherent messages to send downstream.
It is a good example of how to group events so that downstream systems are not flooded with noise.
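To make the windowing idea concrete, here is a minimal in-memory sketch of what a 10-second tumbling window grouped by SKU and location does. It is not the stream processor itself; the event field names (`ts`, `sku`, `location`, `delta`) are hypothetical and chosen only for illustration.

```python
from collections import defaultdict

WINDOW_SECONDS = 10  # window size taken from the description above


def aggregate_tumbling(events, window_seconds=WINDOW_SECONDS):
    """Sum quantity deltas per (window start, SKU, location)."""
    totals = defaultdict(int)
    for event in events:
        # Tumbling windows never overlap, so each event lands in exactly one.
        window_start = event["ts"] - (event["ts"] % window_seconds)
        key = (window_start, event["sku"], event["location"])
        totals[key] += event["delta"]
    return [
        {"window_start": start, "sku": sku, "location": loc, "net_change": net}
        for (start, sku, loc), net in sorted(totals.items())
    ]


# Hypothetical change events; "ts" is seconds since some starting point.
events = [
    {"ts": 1, "sku": "A1", "location": "NYC", "delta": -2},
    {"ts": 4, "sku": "A1", "location": "NYC", "delta": 5},
    {"ts": 9, "sku": "B7", "location": "SFO", "delta": -1},
    {"ts": 12, "sku": "A1", "location": "NYC", "delta": -3},
]
summary = aggregate_tumbling(events)
for row in summary:
    print(row)
```

Here four raw changes collapse into three downstream messages, and the two `A1`/`NYC` changes in the first window become a single net change.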
- Python (3.12 is what I used)
- Atlas Stream Processing instance
Create a file called .env and give it the following properties:
ATLAS_URL=<connection string here>
DB_NAME=retail_demo
COLLECTION_NAME=inventory
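As a rough illustration of how a script could read these properties, the sketch below parses `KEY=VALUE` lines using only the standard library. The repo's own loader may well use a package such as python-dotenv instead; `parse_env` is a hypothetical helper and the connection string is a placeholder.

```python
def parse_env(text):
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, since connection strings contain "=".
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


# Placeholder values; in practice read the text with open(".env").read().
sample = """\
ATLAS_URL=mongodb+srv://user:pass@example.mongodb.net/?retryWrites=true
DB_NAME=retail_demo
COLLECTION_NAME=inventory
"""
config = parse_env(sample)
print(config["DB_NAME"], config["COLLECTION_NAME"])
```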
Then set up Python:
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python3 data_loader.py
That will load an initial set of 10,000 records into the configured database and collection, and then start pushing changes to those inventory levels using $inc.
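For readers unfamiliar with `$inc`, the sketch below shows the shape of such an update and what it does to a document. The field names (`sku`, `location`, `quantity`) are assumptions, not taken from data_loader.py, and `apply_inc` is only a local stand-in for the server-side behavior so the example runs without a database.

```python
def build_inc_update(sku, location, delta):
    """Return a (filter, update) pair that adjusts one inventory level."""
    filter_doc = {"sku": sku, "location": location}
    update_doc = {"$inc": {"quantity": delta}}
    return filter_doc, update_doc


def apply_inc(document, update_doc):
    """Local stand-in for $inc: add each amount to the named field."""
    updated = dict(document)
    for field, amount in update_doc["$inc"].items():
        updated[field] = updated.get(field, 0) + amount
    return updated


doc = {"sku": "A1", "location": "NYC", "quantity": 40}
filter_doc, update_doc = build_inc_update("A1", "NYC", -3)
result = apply_inc(doc, update_doc)
print(result["quantity"])

# Against a real collection with pymongo, the same pair would be passed as:
#   collection.update_one(filter_doc, update_doc)
```

Because `$inc` applies a relative change rather than overwriting the value, each update shows up as a small delta, which is exactly the kind of event the windowed aggregation is meant to roll up.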
If you change the database or collection names from those in the example, you will need to update them in the inventory_processor.js file as well.
Once done, create the stream processor instance by following the instructions in Atlas Stream Processing - Get Started.
The current setup outputs the grouped results to a new collection, but you could change that to $emit to a Kafka topic if needed for the demo.
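A hedged sketch of that swap, written as Python dictionaries rather than the JavaScript used in inventory_processor.js: only the final stage of the pipeline changes. The stage shapes follow my understanding of the Atlas Stream Processing documentation and should be checked against it; the connection names, output collection, and topic (`atlas_cluster`, `kafka_demo`, `inventory_summary`) are hypothetical, and the windowing stages are omitted.

```python
# Sink that writes grouped results to a collection (names are hypothetical).
merge_stage = {
    "$merge": {
        "into": {
            "connectionName": "atlas_cluster",
            "db": "retail_demo",
            "coll": "inventory_summary",
        }
    }
}

# Alternative sink that publishes to a Kafka topic (names are hypothetical).
emit_stage = {
    "$emit": {
        "connectionName": "kafka_demo",
        "topic": "inventory_summary",
    }
}


def with_sink(pipeline, sink_stage):
    """Return a copy of the pipeline with its final stage replaced."""
    return pipeline[:-1] + [sink_stage]


# Source and sink only; the windowing and grouping stages are left out here.
pipeline = [
    {"$source": {"connectionName": "atlas_cluster", "db": "retail_demo", "coll": "inventory"}},
    merge_stage,
]
kafka_pipeline = with_sink(pipeline, emit_stage)
print(list(kafka_pipeline[-1].keys()))
```

The Kafka connection would also need to exist in the stream processing instance's connection registry before the processor is created.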