A comprehensive performance benchmarking tool for streaming platforms using the Nexmark benchmark suite. This tool tests and compares the performance of Apache Flink, Timeplus, and ksqlDB on standardized streaming queries.
The Nexmark benchmark is a suite of queries and data generators designed to evaluate the performance and correctness of streaming SQL systems. This implementation provides automated testing across multiple streaming platforms with detailed performance metrics and resource monitoring.
- Multi-Platform Support: Test Apache Flink, Timeplus, and ksqlDB
- Automated Infrastructure: Docker-based setup with automatic container management
- Performance Monitoring: Real-time container statistics collection
- Comprehensive Reporting: CSV reports with execution times and output metrics
- Configurable Testing: Customizable data sizes, event rates, and resource limits
- Query Coverage: Support for Nexmark queries q0-q22 (platform-dependent)
The benchmark suite consists of:
- Data Generator: Generates auction, person, and bid events at configurable rates
- Platform Orchestrators: Manage Flink clusters, Timeplus instances, and ksqlDB servers
- Query Executor: Runs SQL queries against each platform
- Results Collector: Gathers performance metrics and output validation
- Statistics Monitor: Tracks container resource usage during tests
- Docker and Docker Compose
- Python 3.7+
- At least 8GB RAM recommended
- 4+ CPU cores recommended for optimal performance
- Clone the repository:

  ```shell
  git clone <repository-url>
  cd nexmark
  ```

- Navigate to the Python directory:

  ```shell
  cd python
  ```

- Install Python dependencies:

  ```shell
  pip install -r requirements.txt
  ```
Run a basic benchmark test:

```shell
python nexmark.py --cases q1,q2,q3 --platforms flink,timeplus --data-size 1000000
```
```shell
python nexmark.py [OPTIONS]
```

- `--cases`: Comma-separated list of test cases to run (default: `base`). Available: `base`, `q0`, `q1`, ..., `q22`
- `--platforms`: Comma-separated list of target platforms to test (default: `flink`). Available: `flink`, `timeplus`, `ksqldb`
- `--data-size`: Number of events to generate (default: 10000000)
- `--event-rate`: Events per second generation rate (default: 300000)
- `--config-file`: Path to a JSON configuration file
- `--cpu-cores`: CPU cores to allocate (default: 2.0)
- `--memory-limit`: Memory limit for containers (default: `4g`)
Test multiple queries on Flink:

```shell
python nexmark.py --cases q1,q3,q5 --platforms flink --data-size 5000000
```

Compare all platforms on a single query:

```shell
python nexmark.py --cases q1 --platforms flink,timeplus,ksqldb --data-size 1000000
```

Run with custom resource limits:

```shell
python nexmark.py --cases q1,q2 --platforms flink --cpu-cores 4.0 --memory-limit 8g
```
The tool supports extensive configuration through the `PerformanceConfig` class or JSON files:
```json
{
  "cpu_quota_cores": 4.0,
  "kafka_memory": "4G",
  "flink_jobmanager_memory": "2g",
  "flink_taskmanager_memory": "4g",
  "timeplus_memory": "4g",
  "ksqldb_memory": "4g",
  "default_data_size": 10000000,
  "default_event_rate": 300000,
  "kafka_partitions": 1
}
```
| Query | Flink | Timeplus | ksqlDB | Description |
|---|---|---|---|---|
| base | ✅ | ✅ | ✅ | Basic connectivity test |
| q0-q4 | ✅ | ✅ | ✅ | Simple aggregations |
| q5 | ✅ | ✅ | ❌ | Hot items auction |
| q6 | ❌ | ✅ | ❌ | Average selling price |
| q7-q8 | ✅ | ✅ | ❌ | Highest bid |
| q9 | ✅ | ✅ | ❌ | Winning bids |
| q10-q12 | ✅ | ✅ | ✅ | Various aggregations |
| q13 | ❌ | ❌ | ❌ | Side input |
| q14 | ✅ | ✅ | ❌ | Filter |
| q15-q19 | ✅ | ✅ | ❌ | Complex joins |
| q20-q22 | ✅ | ✅ | ✅ | Advanced analytics |
The tool generates timestamped CSV reports with the following columns:

- `case`: Test case identifier (e.g., q1, q2)
- `platform`: Streaming platform (flink, timeplus, ksqldb)
- `execution_time`: Query execution time in seconds
- `output_size`: Number of result records
- `error`: Error message if the test failed
Container statistics are collected in JSON format including:
- CPU usage and limits
- Memory consumption
- Network I/O
- Container lifecycle events
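The samples follow the Docker Engine stats format (what `container.stats(stream=False)` returns in the Docker SDK for Python). A sketch of pulling the metrics above out of one sample; the exact JSON layout the tool writes may differ:

```python
def extract_stats(sample):
    """Pull key metrics from one Docker stats sample.

    `sample` is a dict in the shape returned by the Docker Engine
    stats API: cpu_stats.cpu_usage.total_usage, memory_stats.usage,
    memory_stats.limit, and per-interface network counters.
    """
    networks = sample.get("networks", {})
    return {
        "cpu_total_ns": sample["cpu_stats"]["cpu_usage"]["total_usage"],
        "memory_bytes": sample["memory_stats"]["usage"],
        "memory_limit": sample["memory_stats"]["limit"],
        "net_rx_bytes": sum(n["rx_bytes"] for n in networks.values()),
        "net_tx_bytes": sum(n["tx_bytes"] for n in networks.values()),
    }
```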
The `ContainerManager` class provides:
- Automatic container lifecycle management
- Network creation and cleanup
- Resource monitoring and cleanup
- Error handling and recovery
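The lifecycle pattern can be sketched as a context manager around the docker CLI: create the network if missing, start the container, and always clean up on exit, even after a failure. This is illustrative only; the class and method names below are not the tool's actual `ContainerManager` API.

```python
import subprocess

class ManagedContainer:
    """Minimal sketch of automatic container lifecycle management
    via the docker CLI (illustrative, not the tool's real API)."""

    def __init__(self, image, name, network="nexmark-net"):
        self.image, self.name, self.network = image, name, network

    def __enter__(self):
        # Create the network only if it does not exist yet.
        existing = subprocess.run(
            ["docker", "network", "ls", "-q", "--filter", f"name={self.network}"],
            capture_output=True, text=True).stdout.strip()
        if not existing:
            subprocess.run(["docker", "network", "create", self.network], check=True)
        # Start the container detached on that network.
        subprocess.run(["docker", "run", "-d", "--name", self.name,
                        "--network", self.network, self.image], check=True)
        return self

    def __exit__(self, exc_type, exc, tb):
        # Always stop and remove, even when the benchmark fails.
        subprocess.run(["docker", "rm", "-f", self.name])
        return False
```

Using `with ManagedContainer("alpine:3", "demo"):` guarantees the cleanup in `__exit__` runs whether the body succeeds or raises.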
- Infrastructure Setup: Kafka cluster initialization
- Data Generation: Nexmark event generation at specified rates
- Platform Deployment: Target platform container startup
- Query Execution: SQL query execution and timing
- Result Collection: Output validation and metrics gathering
- Cleanup: Automatic resource cleanup
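The six phases above can be outlined as a single run function. Every step here is a stub placeholder (the real orchestration lives in `nexmark.py`); the point is the shape: time only the query execution, record errors instead of crashing, and clean up unconditionally.

```python
import time

def run_case(case, platform):
    """Illustrative outline of one benchmark run, mirroring the
    phases above. Stub helpers stand in for the real logic."""
    result = {"case": case, "platform": platform, "error": ""}
    try:
        setup_infrastructure()                 # 1. Kafka cluster initialization
        generate_data(case)                    # 2. Nexmark event generation
        deploy_platform(platform)              # 3. Target platform startup
        start = time.monotonic()
        output = execute_query(case, platform) # 4. SQL execution and timing
        result["execution_time"] = time.monotonic() - start
        result["output_size"] = len(output)    # 5. Result collection
    except Exception as exc:
        result["error"] = str(exc)
    finally:
        cleanup()                              # 6. Automatic resource cleanup
    return result

# Stubs so the outline is self-contained; the tool implements these.
def setup_infrastructure(): pass
def generate_data(case): pass
def deploy_platform(platform): pass
def execute_query(case, platform): return []
def cleanup(): pass
```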
- Comprehensive error logging
- Graceful container cleanup on failures
- Retry mechanisms for transient failures
- Detailed error reporting in results
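A retry mechanism for transient failures (for example, a container that is not yet accepting connections) typically looks like the decorator below with exponential backoff. This is a generic sketch, not the tool's exact implementation:

```python
import functools
import time

def retry(attempts=3, delay=1.0, backoff=2.0):
    """Retry a function on any exception, with exponential backoff.

    Re-raises the last exception once `attempts` is exhausted.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator
```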
- Platform Support
  - All tests run on linux/amd64; other platforms and architectures may have issues.
- Memory Issues
  - Increase Docker memory limits
  - Reduce the data size or event rate
  - Use fewer concurrent platforms
- Port Conflicts
  - Ensure ports 8081, 8088, 8123, 3218, 19092 are available
  - Stop conflicting services
- Container Startup Failures
  - Check Docker daemon status
  - Verify image availability
  - Review container logs
Enable debug logging. Note that `logging.basicConfig` must run in the same process as the benchmark; calling it in a separate `python -c` command has no effect on a later `python nexmark.py` run. One way to do both in a single interpreter:

```shell
export PYTHONPATH=.
python -c "import logging, runpy, sys; logging.basicConfig(level=logging.DEBUG); sys.argv = ['nexmark.py', '--cases', 'q1', '--platforms', 'flink']; runpy.run_path('nexmark.py', run_name='__main__')"
```
Check container logs:

```shell
docker logs <container-name>
```
```
python/
├── nexmark.py          # Main benchmark orchestrator
├── requirements.txt    # Python dependencies
├── scripts/            # SQL query definitions
│   ├── flink/          # Flink SQL queries
│   ├── timeplus/       # Timeplus SQL queries
│   └── ksqldb/         # ksqlDB queries
└── Makefile            # Build automation
```
- Create SQL files in the respective platform directories
- Follow the naming convention: `q<number>.sql`
- Ensure the query writes its output to the topic `NEXMARK_Q<NUMBER>`
- Test with a single platform before multi-platform testing