A simple Python example that demonstrates the power of stream processing for use cases like inventory management.

The processor takes a tumbling window of 10 seconds and aggregates the inventory changes by SKU and location before combining them into more coherent messages to send downstream.

This is a good example of how to group events to eliminate excessive noise to downstream systems.
## Setup

- Python (3.12 is what I used)
- Atlas Stream Processing instance

### Configuration

Create a file called `.env` and give it the following properties:

```commandline
ATLAS_URL=<connection string here>
DB_NAME=retail_demo
COLLECTION_NAME=inventory
```
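
The loader presumably reads these values at startup; here is a minimal sketch of how that might look with `python-dotenv` (an assumption on my part — check `data_loader.py` for how the values are actually read):

```python
# Illustrative sketch of reading the .env properties, assuming python-dotenv;
# see data_loader.py for the real loading logic.
import os
from dotenv import load_dotenv

load_dotenv()  # copies the .env entries into the process environment

ATLAS_URL = os.environ["ATLAS_URL"]
DB_NAME = os.environ.get("DB_NAME", "retail_demo")
COLLECTION_NAME = os.environ.get("COLLECTION_NAME", "inventory")
```
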

Then set up Python:

```commandline
python3 -m venv venv
source venv/bin/activate  # activate the venv so the install below lands inside it
pip install -r requirements.txt
32
+
python3 data_loader.py
```

That will insert an initial set of 10,000 records into the DB and collection defined above, and then start pushing changes to those inventory levels using `$inc`.
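
As a rough sketch of the kind of update the loader issues (the document shape and field names here are assumptions; see `data_loader.py` for the real logic):

```python
# Hypothetical sketch of an inventory-level change using $inc with pymongo;
# the actual document shape and update logic live in data_loader.py.
import os
import random
from pymongo import MongoClient

client = MongoClient(os.environ["ATLAS_URL"])
collection = client[os.environ["DB_NAME"]][os.environ["COLLECTION_NAME"]]

# Atomically adjust a quantity up or down, as a sale or restock would.
collection.update_one(
    {"sku": "SKU-0001", "location": "store-01"},  # assumed document shape
    {"$inc": {"quantity": random.randint(-5, 5)}},
)
```

Each such update lands on the collection's change stream, which is what the windowed processor consumes.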

If you change the DB or collection names from those listed in the example, you will need to adjust them in the `inventory_processor.js` file as well.

Once done, create the stream processor instance using the instructions in [Atlas Stream Processing - Get Started](https://www.mongodb.com/docs/atlas/atlas-stream-processing/tutorial/).

The current setup outputs the grouped results to a new collection, but you could change that to `$emit` to a Kafka topic if you prefer.
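
For example, the final `$merge` stage in the sketch above could be swapped for something like the following (the connection and topic names are assumptions):

```python
# Hypothetical replacement for the $merge stage: send each grouped window
# result to a Kafka topic instead. Connection and topic names are assumptions.
emit_stage = {"$emit": {"connectionName": "kafkaConnection",
                        "topic": "inventory-rollups"}}
```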
0 commit comments