VarMQ is a high-performance message queue for Go that simplifies concurrent task processing using a worker pool. Built on Go generics, it provides type safety without sacrificing performance.
With VarMQ, you can process messages asynchronously, handle errors properly, store data persistently, and scale across systems when needed, all through a clean, intuitive API that feels natural to Go developers.
This isn't meant to replace RabbitMQ or Kafka - VarMQ serves a different purpose as a lightweight, in-process message queue with strong worker management. For persistence and distribution, it offers a flexible adapter system that extends its capabilities beyond simple in-memory queues.
- ⚡ High performance: Optimized for throughput control with minimal overhead
- 🛠️ Variants of queue types:
  - Standard queues for in-memory processing
  - Priority queues for importance-based ordering
  - Persistent queues for durability across restarts
  - Distributed queues for processing across multiple systems
- 🧩 Worker abstractions:
  - `NewWorker` - Fire-and-forget operations (most performant)
  - `NewErrWorker` - Returns only an error (when the result isn't needed)
  - `NewResultWorker` - Returns both a result and an error
- 🚦 Concurrency control: Fine-grained control over worker pool size, dynamic tuning and idle workers management
- 💾 Persistence: Support for durable storage through adapter interfaces
- 🌐 Distribution: Scale processing across multiple instances via adapter interfaces
- 🧩 Extensible: Build your own storage adapters by implementing simple interfaces
```sh
go get github.com/goptics/varmq
```
```go
package main

import (
	"fmt"
	"time"

	"github.com/goptics/varmq"
)

func main() {
	w := varmq.NewWorker(func(j varmq.Job[int]) {
		fmt.Printf("Processing %d\n", j.Data())
		time.Sleep(1 * time.Second)
	}, 10) // with concurrency 10
	defer w.WaitUntilFinished()

	q := w.BindQueue()

	for i := range 100 {
		q.Add(i)
	}
}
```
VarMQ supports both persistent and distributed queue processing through adapter interfaces:
- Persistent Queues: Store jobs durably so they survive program restarts
- Distributed Queues: Process jobs across multiple systems
Usage is simple:
```go
// For persistent queues (with any IPersistentQueue adapter)
queue := worker.WithPersistentQueue(persistentQueueAdapter)

// For distributed queues (with any IDistributedQueue adapter)
queue := worker.WithDistributedQueue(distributedQueueAdapter)
```
See complete working examples in the examples directory:
- Persistent Queue Example (SQLite)
- Persistent Queue Example (Redis)
- Distributed Queue Example (Redis)
Create your own adapters by implementing the `IPersistentQueue` or `IDistributedQueue` interfaces.

Note: Before testing the examples, make sure to start the Redis server using `docker compose up -d`.
Process important jobs first:
```go
// Create a standard priority queue
queue := worker.BindPriorityQueue()

// Add jobs with priorities (lower number = higher priority)
queue.Add("High priority", 1)
queue.Add("Low priority", 10)
```
```
goos: linux
goarch: amd64
pkg: github.com/goptics/varmq
cpu: AMD EPYC 7763 64-Core Processor
```
| Benchmark Operation | Time (ns/op) | Memory (B/op) | Allocations (allocs/op) |
|---|---|---|---|
| Queue Add | 1217 | 112 | 3 |
| Queue AddAll (1000 items) | 810354 | 130185 | 4002 |
| PriorityQueue Add | 1296 | 144 | 4 |
| PriorityQueue AddAll (1000 items) | 1078373 | 162177 | 5002 |
| ErrWorker Add | 1391 | 288 | 6 |
| ErrWorker AddAll (1000 items) | 881515 | 154713 | 4505 |
| ErrPriorityQueue Add | 1452 | 320 | 7 |
| ErrPriorityQueue AddAll (1000 items) | 1182968 | 186706 | 5505 |
| ResultWorker Add | 1354 | 336 | 6 |
| ResultWorker AddAll (1000 items) | 864143 | 171320 | 4005 |
| ResultPriorityQueue Add | 1450 | 368 | 7 |
| ResultPriorityQueue AddAll (1000 items) | 1151502 | 203314 | 5005 |
Note: `AddAll` benchmarks were performed by adding 1000 items in a single call. The reported `ns/op`, `B/op`, and `allocs/op` for `AddAll` are for the entire batch operation. To estimate per-item metrics for an `AddAll` operation, divide the table values by 1000 (e.g., for Queue AddAll, the average time per item is approximately 810 ns).
- Simple API: Clean, intuitive interface that doesn't get in your way
- Minimal Dependencies: Core library has no external dependencies
- Production Ready: Built for real-world scenarios and high-load applications
- Highly Extensible: Create your own storage adapters by implementing VarMQ's internal queue interfaces
For detailed API documentation, see the API Reference.
VarMQ primarily uses its own internal event loop to handle concurrency.
The event loop checks whether there are pending jobs in the queue and whether any workers in the pool are available. If so, it distributes jobs to all available workers and then goes back to sleep.
When a worker becomes free, it sends a pull-job request to the event loop, which checks the queue again for pending jobs. If there are any, it continues distributing them to the workers; otherwise, idle workers are either removed from the pool automatically or kept around, depending on the `WorkerConfig`.
Contributions are welcome! Please feel free to submit a Pull Request.
- GitHub: @fahimfaisaal
- LinkedIn: in/fahimfaisaal
- Twitter: @FahimFaisaal