Skip to content

Add Reaction for dapr output bindings #229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
**/.DS_Store
**/.vscode
**/.idea
bin
obj
node_modules
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/**
* Copyright 2024 The Drasi Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

const deployResources = require("../fixtures/deploy-resources");
const deleteResources = require("../fixtures/delete-resources");
const yaml = require("js-yaml");
const fs = require("fs");
const pg = require("pg");
const redis = require("redis");
const PortForward = require("../fixtures/port-forward");
const { waitFor } = require('../fixtures/infrastructure'); // Corrected import path

// Define paths to your resource files
const resourcesFilePath = __dirname + '/resources.yaml';
const reactionProviderFilePath = __dirname + '/reaction-provider.yaml';
const sourcesFilePath = __dirname + '/sources.yaml';
const queriesFilePath = __dirname + '/queries.yaml';
const reactionsFilePath = __dirname + '/reactions.yaml';

let resourcesToCleanup = [];

let dbPortForward;
let dbClient;

let productRedisPortForward, inventoryRedisPortForward;
let productRedisClient, inventoryRedisClient;

// Function to get state from Redis Hash
// Attempts to parse the value as JSON, but returns raw value if parsing fails
async function getStateFromRedis(redisClient, key) {
try {
if (!redisClient || !redisClient.isOpen) {
console.error(`Redis client for key "${key}" is not open or not initialized.`);
return null;
}
console.log(`Attempting to get state for key "${key}" from Redis...`);

let rawValue = await redisClient.hGet(key, "data");

if (rawValue !== null) {
console.log(`Raw value for key "${key}" (from HGET key "data"): ${rawValue}`);
} else {
console.log(`Key "${key}" not found or has no parsable data in Redis.`);
return null;
}

try {
return JSON.parse(rawValue);
} catch (e) {
console.error(`Failed to parse Redis value for key "${key}":`, rawValue, e);
return rawValue;
}
} catch (error) {
console.error(`Error during getStateFromRedis for key "${key}":`, error.message, error.stack);
return null;
}
}

beforeAll(async () => {
// Load resources from resources.yaml
const infraResources = yaml.loadAll(fs.readFileSync(resourcesFilePath, 'utf8'));
// Load the reaction-provider
const reactionProviderResources = yaml.loadAll(fs.readFileSync(reactionProviderFilePath, 'utf8'));
// Load Drasi source
const sources = yaml.loadAll(fs.readFileSync(sourcesFilePath, 'utf8'));
// Load Drasi Query
const queries = yaml.loadAll(fs.readFileSync(queriesFilePath, 'utf8'));
// Load Drasi Reactions
const reactions = yaml.loadAll(fs.readFileSync(reactionsFilePath, 'utf8'));

// Combine all resources to be deployed
resourcesToCleanup = [...infraResources, ...reactionProviderResources, ...sources, ...queries, ...reactions];

console.log(`Deploying ${infraResources.length} infra resources...`);
await deployResources(infraResources);

console.log("Waiting for infra resources to initialize...");
await new Promise(r => setTimeout(r, 30000));

console.log(`Deploying ${sources.length} sources...`);
await deployResources(sources);

console.log(`Deploying ${queries.length} queries...`);
await deployResources(queries);

console.log(`Deploying PostOutputBinding reaction provider...`)
await deployResources(reactionProviderResources);

console.log(`Deploying ${reactions.length} reactions...`);
await deployResources(reactions);

// Setup PostgreSQL client
dbPortForward = new PortForward("product-inventory-db", 5432, "default");
const dbPort = await dbPortForward.start();
dbClient = new pg.Client({
user: "postgres",
password: "postgres",
host: "localhost",

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production
port: dbPort,
database: "productdb",
});
await dbClient.connect();
console.log("Connected to PostgreSQL, with port forwarded at", dbPort);

// Setup Redis clients
productRedisPortForward = new PortForward("redis-product", 6379, "default");
const productRedisPort = await productRedisPortForward.start();
productRedisClient = redis.createClient({ url: `redis://localhost:${productRedisPort}` });
await productRedisClient.connect();
console.log("Connected to Product Redis, with port forwarded at", productRedisPort);

inventoryRedisPortForward = new PortForward("redis-inventory", 6379, "default");
const inventoryRedisPort = await inventoryRedisPortForward.start();
inventoryRedisClient = redis.createClient({ url: `redis://localhost:${inventoryRedisPort}` });
await inventoryRedisClient.connect();
console.log("Connected to Inventory Redis, with port forwarded at", inventoryRedisPort);

await waitFor({ timeout: 15000, description: "initial propagation after setup" })

console.log("Setup complete.");
}, 480000);

afterAll(async () => {
if (dbClient) {
await dbClient.end();
console.log("PostgreSQL client disconnected.");
}

if (dbPortForward) {
dbPortForward.stop();
console.log("PostgreSQL port forward stopped.");
}

if (productRedisClient) {
await productRedisClient.quit();
console.log("Product Redis client disconnected.");
}
if (productRedisPortForward) {
productRedisPortForward.stop();
console.log("Product Redis port forward stopped.");
}

if (inventoryRedisClient) {
await inventoryRedisClient.quit();
console.log("Inventory Redis client disconnected.");
}
if (inventoryRedisPortForward) {
inventoryRedisPortForward.stop();
console.log("Inventory Redis port forward stopped.");
}

if (resourcesToCleanup.length > 0) {
console.log(`Deleting ${resourcesToCleanup.length} resources...`);
await deleteResources(resourcesToCleanup);
console.log("Teardown complete.");
}
});

describe("Dapr OutputBinding Reaction Test Suite", () => {
test("UNPACKED: should sync the initial state to Dapr statestore with create", async () => {
console.log("Verifying initial state sync for Product data...");
const newProductName = `Test Unpacked Packed ${Date.now()}`;
const newProductPrice = 99.99;
await dbClient.query(
"INSERT INTO product (name, description, price) VALUES ($1, 'Unpacked Test Desc', $2)",
[newProductName, newProductPrice]
);

const receivedMessage = await waitFor({
actionFn: () => productRedisClient.get('inventory'),
predicateFn: (messages) => messages && messages.length >= 1,
timeoutMs: 10000,
pollIntervalMs: 1000,
description: `unpacked message for product "${newProductName}" to appear in Redis`
});
// 1. Verify Product Data (product-statestore)
expect(receivedMessage).toBeDefined();
// JSON Stringify the keys to log them
console.log("Received keys from Redis:", JSON.stringify(receivedMessage));
expect(receivedMessage).toBeDefined();
});
});
44 changes: 44 additions & 0 deletions e2e-tests/07-post-outputbinding-scenario/queries.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
apiVersion: v1
kind: ContinuousQuery
name: product-query
spec:
mode: query
sources:
subscriptions:
- id: product-inventory-source
query: >
MATCH
(p:product)
RETURN
p.product_id AS product_id,
p.name AS product_name,
p.description AS product_description
---
apiVersion: v1
kind: ContinuousQuery
name: inventory-query
spec:
mode: query
sources:
subscriptions:
- id: product-inventory-source
nodes:
- sourceLabel: inventory
- sourceLabel: product
joins:
- id: INVENTORY_FOR_PRODUCT
keys:
- label: inventory
property: product_id
- label: product
property: product_id
query: >
MATCH
(i:inventory)-[:INVENTORY_FOR_PRODUCT]->(p:product)
RETURN
i.inventory_id AS inventory_id,
i.product_id AS product_id,
i.quantity AS product_quantity,
i.location AS product_location,
p.name AS product_name,
p.description AS product_description
10 changes: 10 additions & 0 deletions e2e-tests/07-post-outputbinding-scenario/reaction-provider.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: v1
kind: ReactionProvider
name: PostDaprOutputBinding
spec:
services:
reaction:
image: reaction-post-dapr-output-binding
config_schema:
type: object

30 changes: 30 additions & 0 deletions e2e-tests/07-post-outputbinding-scenario/reactions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
kind: Reaction
apiVersion: v1
name: sync-dapr-outputbinding
spec:
kind: PostDaprOutputBinding
queries:
product-query: >
{
"bindingName": "product-outputbinding",
"bindingType": "redis",
"bindingOperation": "create",
"bindingMetadataTemplate": {
"key": "{{payload.after.product_name}}"
},
"packed": "Unpacked",
"maxFailureCount": 5,
"skipControlSignals": true
}
inventory-query: >
{
"bindingName": "inventory-outputbinding",
"bindingType": "redis",
"bindingOperation": "create",
"bindingMetadataTemplate": {
"key": "inventory"
},
"packed": "Unpacked",
"maxFailureCount": 5,
"skipControlSignals": true
}
Loading