rabbitmq

package module
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 12 Imported by: 0

README ¶

rabbitmq-kit-go

A production-ready RabbitMQ library for Go with high-level abstractions for event-driven architectures following Domain-Driven Design patterns.

Go Version Coverage License

Installation

go get github.com/edaniel30/rabbitmq-kit-go

Quick Start

Publisher
package main

import (
    "context"
    "log"

    "github.com/edaniel30/rabbitmq-kit-go"
    "github.com/edaniel30/rabbitmq-kit-go/config"
)

func main() {
    // Create EventBus
    eventBus, err := rabbitmq.NewEventBus(
        config.DefaultConfig(),
        config.WithURI("amqp://guest:guest@localhost:5672/"),
        config.WithPublisherConfirms(true),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer eventBus.Close()

    // Publish event
    event := &UserCreatedEvent{
        UserID: "user-123",
        Email:  "[email protected]",
    }

    if err := eventBus.Publish(context.Background(), event); err != nil {
        log.Printf("Failed to publish: %v", err)
    }
}
Consumer
package main

import (
    "log"

    "github.com/edaniel30/rabbitmq-kit-go"
    "github.com/edaniel30/rabbitmq-kit-go/config"
    "github.com/edaniel30/rabbitmq-kit-go/router"
)

func main() {
    eventBus, _ := rabbitmq.NewEventBus(
        config.DefaultConfig(),
        config.WithURI("amqp://guest:guest@localhost:5672/"),
    )
    defer eventBus.Close()

    // Register handlers
    eventBus.RegisterHandler("user.created", func(ctx *router.MessageContext) error {
        var user map[string]any
        if err := ctx.BindJSON(&user); err != nil {
            return err // Will retry
        }
        log.Printf("User created: %+v", user)
        return nil // Will ACK
    })

    // Start consuming with 5 workers
    eventBus.StartConsume("users.queue", 5)
    select {} // Keep running
}

Documentation

📖 Configuration Guide

All configuration options, topology setup, and common patterns.

âš¡ Circuit Breaker

Protect your services from cascading failures with automatic circuit breaking.

💀 Dead Letter Queue (DLQ)

Handle failed messages, requeue from DLQ, and monitor poison messages.

🚀 Batch Publishing

Pipelined batch publishing and async worker pools for maximum throughput.

Key Concepts

Event Interface

Events must implement the Event interface:

type UserCreatedEvent struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

func (e *UserCreatedEvent) Type() string             { return "user.created" }
func (e *UserCreatedEvent) Exchange() string         { return "users.exchange" }
func (e *UserCreatedEvent) Headers() map[string]any  { return nil } // or custom headers
func (e *UserCreatedEvent) ToMap() map[string]any {
    return map[string]any{
        "type":    e.Type(),
        "user_id": e.UserID,
        "email":   e.Email,
    }
}

A trace_id header is automatically injected on publish if not already present in Headers(). When consuming, retrieve it with ctx.GetTraceID().

Handler Return Values
eventBus.RegisterHandler("user.created", func(ctx *router.MessageContext) error {
    // return nil      → ACK (message processed)
    // return error    → NACK with retry (uses x-retry-count)
    //                   After MaxRetries → NACK without requeue (goes to DLQ)
})
Publisher Confirms
eventBus, _ := rabbitmq.NewEventBus(
    config.DefaultConfig(),
    config.WithPublisherConfirms(true),
    config.WithConfirmTimeout(5 * time.Second),
)

// Publish waits for confirmation
err := eventBus.Publish(ctx, event)
// Returns ErrPublishNotConfirmed on NACK
// Returns ErrPublishConfirmTimeout on timeout

Configuration Options

Option Default Description
WithURI (required) RabbitMQ connection URI
WithPublisherConfirms false Enable guaranteed delivery
WithConfirmTimeout 5s Timeout for confirmations
WithMaxRetries 3 Max retry attempts
WithPrefetchCount 10 Unacked messages per worker
WithCircuitBreaker false Enable circuit breaker
WithDLQEnabled false Auto-setup DLQ infrastructure

See Configuration Guide for all options.

Batch Publishing

Pipelined Batch (5-10x faster)
events := []Event{event1, event2, event3}

// Pipelining: send all first, wait for all confirms
result, err := eventBus.PublishBatch(ctx, events,
    config.WithPipelining(true),
)
Sequential Batch
// Sequential: publish one by one (legacy mode)
result, err := eventBus.PublishBatch(ctx, events,
    config.WithPipelining(false),
)

See Batch Publishing Guide for performance comparisons.

Circuit Breaker

Protect consumers from cascading failures:

eventBus, _ := rabbitmq.NewEventBus(
    config.DefaultConfig(),
    config.WithCircuitBreaker(true, 5, 10*time.Second, 3),
)

// After 5 failures → Circuit OPEN (reject all)
// After 10s → Circuit HALF-OPEN (allow 3 test requests)
// After 3 successes → Circuit CLOSED (normal operation)

See Circuit Breaker Guide.

Dead Letter Queue (DLQ)

Auto-setup DLQ
eventBus, _ := rabbitmq.NewEventBus(
    config.DefaultConfig(),
    config.WithDLQEnabled(true, "dlq."),
)

// Automatically creates:
// - dlq.exchange (DLX)
// - dlq.{queue_name} for each queue
// - Bindings with proper routing
Manual DLQ Handler
eventBus.RegisterDLQHandler("user.created", func(ctx *router.MessageContext) error {
    dlqMsg := router.NewDLQMessage(ctx)

    log.Printf("Failed after %d retries: %s",
        dlqMsg.RetryCount,
        dlqMsg.GetDeathInfo(),
    )

    // Requeue if appropriate
    if dlqMsg.ShouldRetry(10) {
        return eventBus.RequeueFromDLQ(context.Background(), dlqMsg, true)
    }

    return nil // Discard
})

eventBus.StartConsumeDLQ("dlq.users.queue", 1)

See DLQ Guide.

Error Handling

import "github.com/edaniel30/rabbitmq-kit-go/errors"

// Sentinel errors
errors.ErrPublishNotConfirmed   // NACK from broker
errors.ErrPublishConfirmTimeout // Confirmation timeout
errors.ErrMaxRetriesExceeded    // Exceeded max retries
errors.ErrClientClosed          // Client closed

// Typed errors
&errors.PublishError{Exchange: "users", RoutingKey: "user.created", Cause: err}
&errors.ConsumeError{Queue: "users.queue", Cause: err}
&errors.TopologyError{Operation: "declare_exchange", Resource: "users", Cause: err}

Production Best Practices

  1. Always enable Publisher Confirms for critical messages
  2. Configure DLQ for failed message handling
  3. Set appropriate prefetch:
    • Fast processing (<100ms): 50-100
    • Medium (100ms-1s): 10-20
    • Slow (>1s): 1-5
  4. Use Circuit Breaker to prevent cascading failures
  5. Monitor DLQ for poison messages
  6. Use context timeouts for all operations
  7. Graceful shutdown: defer eventBus.Close()

Requirements

  • Go 1.21+
  • RabbitMQ 3.8+

Contributing

We welcome contributions! Please ensure:

  • All tests pass (make test)
  • Coverage meets 80% threshold (make test-coverage)
  • Code follows Go best practices

License

MIT License - see LICENSE file for details.

Documentation ¶

Index ¶

Constants ¶

This section is empty.

Variables ¶

This section is empty.

Functions ¶

This section is empty.

Types ¶

type BatchError ¶

type BatchError struct {
	Index int   // Index of the event in the original batch
	Event Event // The event that failed
	Error error // The error that occurred
}

BatchError represents a failed event in a batch publish operation.

type BatchResult ¶

type BatchResult struct {
	Total   int          // Total number of events in the batch
	Success int          // Number of successfully published events
	Failed  int          // Number of failed events
	Errors  []BatchError // Details of failed events
}

BatchResult contains the result of a batch publish operation.

type Event ¶

type Event interface {
	// Type returns the event type (e.g., "user.created", "order.completed")
	// This is used as the routing key when publishing.
	Type() string

	// Exchange returns the exchange name where this event should be published.
	Exchange() string

	// ToMap converts the event to a map for JSON serialization.
	// This map will be marshaled and sent as the message body.
	ToMap() map[string]any

	// Headers returns the headers for the event.
	Headers() map[string]any
}

Event represents a domain event that can be published to RabbitMQ.

Events are used in Domain-Driven Design to represent something that happened in the domain. They are typically published after a successful domain operation and consumed by other services or bounded contexts.

type EventBus ¶

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus provides a high-level interface for publishing domain events.

The EventBus is designed to work with the Event interface and follows the Domain-Driven Design pattern. It allows publishing single events or batches of events that have been accumulated in domain aggregates.

This is the recommended way to interact with RabbitMQ for event-driven architectures. The EventBus manages the underlying RabbitMQ client automatically.

Example usage for publishing events:

eventBus, _ := rabbitmq.NewEventBus(
    config.DefaultConfig(),
    config.WithURI("amqp://localhost:5672/"),
)
defer eventBus.Close()

event := NewOrderCreatedEvent(order.ID)
err := eventBus.Publish(ctx, event)

Example usage for consuming events:

eventBus, _ := rabbitmq.NewEventBus(config.DefaultConfig(), config.WithURI("..."))
defer eventBus.Close()

// Register handlers
eventBus.RegisterHandler("order.created", orderCreatedHandler)
eventBus.RegisterHandler("order.completed", orderCompletedHandler)

// Start consuming
eventBus.StartConsume("orders.queue", 5)

func NewEventBus ¶

func NewEventBus(cfg config.Config, opts ...config.Option) (*EventBus, error)

NewEventBus creates a new event bus with its own RabbitMQ client.

This is the recommended way to create an EventBus. The EventBus will manage the connection lifecycle automatically.

The router is created internally and handlers can be registered using RegisterHandler() method.

Example:

eventBus, err := rabbitmq.NewEventBus(
    config.DefaultConfig(),
    config.WithURI("amqp://guest:guest@localhost:5672/"),
    config.WithExchanges([]config.ExchangeConfig{
        {Name: "orders.exchange", Type: "direct", Durable: true},
    }),
)
if err != nil {
    log.Fatal(err)
}
defer eventBus.Close()

func (*EventBus) Close ¶

func (b *EventBus) Close() error

Close closes the underlying RabbitMQ client and cleans up resources.

This stops the publisher's confirmation processor goroutine (if running) and then closes the RabbitMQ client connection.

func (*EventBus) GetCircuitBreakerMetrics ¶

func (b *EventBus) GetCircuitBreakerMetrics() *circuitbreaker.Metrics

GetCircuitBreakerMetrics returns the current circuit breaker metrics.

Returns nil if circuit breaker is not enabled or consumer is not initialized.

Example:

metrics := eventBus.GetCircuitBreakerMetrics()
if metrics != nil {
    log.Printf("Circuit breaker state: %s", metrics.State)
    log.Printf("Failures: %d, Successes: %d", metrics.Failures, metrics.Successes)
}

func (*EventBus) IsConnected ¶

func (b *EventBus) IsConnected() bool

IsConnected returns true if the underlying client is connected.

func (*EventBus) Publish ¶

func (b *EventBus) Publish(ctx context.Context, event Event) error

Publish publishes a single event to RabbitMQ.

The event's Exchange() and Type() methods determine where the message is published. The ToMap() method is used to serialize the event to JSON.

Example:

event := NewUserCreatedEvent(user.ID, user.Name, user.Email)
err := eventBus.Publish(ctx, event)

func (*EventBus) PublishBatch ¶

func (b *EventBus) PublishBatch(ctx context.Context, events []Event, opts ...config.BatchOption) (*BatchResult, error)

PublishBatch publishes multiple events with optimized pipelining. By default, this method uses pipelining for maximum throughput: all messages are sent first without waiting for confirmations, then all confirmations are collected. This is 5-10x faster than sequential publishing for large batches.

The method returns a BatchResult with detailed information about successes and failures. By default, all events are attempted even if some fail.

Options:

  • WithPipelining(false): Use sequential publishing (legacy behavior)
  • WithFailFast(true): Stop at the first error

Examples:

// Fast pipelining mode (default)
result, err := eventBus.PublishBatch(ctx, events)
if result.Failed > 0 {
    for _, batchErr := range result.Errors {
        log.Printf("Event %d failed: %v", batchErr.Index, batchErr.Error)
    }
}

// Legacy sequential mode with fail-fast
result, err := eventBus.PublishBatch(ctx, events,
    WithPipelining(false),
    WithFailFast(true),
)

func (*EventBus) RegisterDLQHandler ¶

func (b *EventBus) RegisterDLQHandler(eventType string, handler router.HandlerService)

RegisterDLQHandler registers a handler for processing DLQ messages of a specific event type.

DLQ handlers receive messages that have failed processing in the main queue after all retry attempts. Use this to implement custom recovery logic, analysis, or alerting for failed messages.

The handler receives a MessageContext which can be converted to DLQMessage to access metadata about why the message failed (retry count, death reason, etc.).

Example:

eventBus.RegisterDLQHandler("order.created", func(ctx *router.MessageContext) error {
    dlqMsg := router.NewDLQMessage(ctx)
    log.Printf("DLQ: %s", dlqMsg.GetDeathInfo())

    // Decide whether to retry or discard
    if dlqMsg.ShouldRetry(10) {
        return eventBus.RequeueFromDLQ(context.Background(), dlqMsg, true)
    }

    // Log and discard
    log.Printf("Permanently failed: %s", dlqMsg.GetDeathInfo())
    return nil
})

func (*EventBus) RegisterHandler ¶

func (b *EventBus) RegisterHandler(eventType string, handler router.HandlerService)

RegisterHandler registers a handler for a specific event type.

Handlers are used when consuming messages. The event type is extracted from the message's "type" field in the JSON payload.

Example:

eventBus.RegisterHandler("user.created", userCreatedHandler)
eventBus.RegisterHandler("order.completed", orderCompletedHandler)

func (*EventBus) RequeueAllFromDLQ ¶

func (b *EventBus) RequeueAllFromDLQ(ctx context.Context, dlqName string, resetRetryCount bool, maxMessages int) (int, error)

RequeueAllFromDLQ re-enqueues all messages from a DLQ back to their original queues.

This is useful for bulk recovery after fixing an issue that caused many messages to fail. Messages are consumed from the DLQ, published back to their original destinations, and then acknowledged.

Parameters:

  • ctx: context for the operation
  • dlqName: name of the DLQ to drain (e.g., "dlq.orders.queue")
  • resetRetryCount: if true, resets retry count for all messages
  • maxMessages: maximum number of messages to requeue (0 = unlimited)

Returns the number of messages successfully requeued and any error encountered.

Example:

// Requeue up to 100 messages with reset retry count
count, err := eventBus.RequeueAllFromDLQ(
    context.Background(),
    "dlq.orders.queue",
    true,  // reset retry count
    100,   // max 100 messages
)
if err != nil {
    log.Printf("Error requeuing: %v", err)
}
log.Printf("Successfully requeued %d messages", count)

func (*EventBus) RequeueFromDLQ ¶

func (b *EventBus) RequeueFromDLQ(ctx context.Context, dlqMsg *router.DLQMessage, resetRetryCount bool) error

RequeueFromDLQ re-enqueues a message from DLQ back to its original queue for reprocessing.

This allows you to retry failed messages after fixing the underlying issue (e.g., after deploying a bug fix, restoring a downstream service, etc.).

Parameters:

  • ctx: context for the operation
  • dlqMsg: the DLQ message to re-enqueue
  • resetRetryCount: if true, resets the retry count to 0 (gives full retry attempts)

The message will be published back to the original exchange with the original routing key, so it will be routed to the original queue.

Example:

eventBus.RegisterDLQHandler("order.created", func(msgCtx *router.MessageContext) error {
    dlqMsg := router.NewDLQMessage(msgCtx)

    // Check if we should retry
    if dlqMsg.ShouldRetry(5) {
        // Re-enqueue with reset retry count
        err := eventBus.RequeueFromDLQ(context.Background(), dlqMsg, true)
        if err != nil {
            return err
        }
        // Ack the DLQ message after successful re-enqueue
        return msgCtx.Ack()
    }

    return nil
})

func (*EventBus) ResetCircuitBreaker ¶

func (b *EventBus) ResetCircuitBreaker() bool

ResetCircuitBreaker manually resets the circuit breaker to closed state.

This should be used cautiously, typically only for manual intervention. Returns false if circuit breaker is not enabled or consumer is not initialized.

Example:

if eventBus.ResetCircuitBreaker() {
    log.Println("Circuit breaker reset successfully")
}

func (*EventBus) StartConsume ¶

func (b *EventBus) StartConsume(queue string, workers int) error

StartConsume starts consuming messages from a queue with multiple workers.

Before calling this method, you must register handlers using RegisterHandler(). Messages with unregistered event types will be acknowledged and discarded.

Returns ErrNoHandlersRegistered if no handlers have been registered.

Parameters:

  • queue: name of the queue to consume from
  • workers: number of concurrent workers to process messages

Example:

eventBus.RegisterHandler("order.created", orderHandler)
err := eventBus.StartConsume("orders.queue", 5)
if err != nil {
    log.Fatal(err)
}

func (*EventBus) StartConsumeDLQ ¶

func (b *EventBus) StartConsumeDLQ(queue string, workers int) error

StartConsumeDLQ starts consuming messages from a DLQ with multiple workers.

Before calling this method, you must register DLQ handlers using RegisterDLQHandler(). If no handlers are registered, returns ErrNoHandlersRegistered.

The queue parameter should be the DLQ name (e.g., "dlq.orders.queue"). If you enabled automatic DLQ setup, the DLQ name follows the pattern: dlqPrefix + queueName.

Parameters:

  • queue: name of the DLQ to consume from (e.g., "dlq.orders.queue")
  • workers: number of concurrent workers to process DLQ messages

Example:

// Register DLQ handler
eventBus.RegisterDLQHandler("order.created", func(ctx *router.MessageContext) error {
    dlqMsg := router.NewDLQMessage(ctx)
    log.Printf("Processing failed order: %s", dlqMsg.GetDeathInfo())
    // Analyze or retry logic here
    return nil
})

// Start consuming from DLQ
err := eventBus.StartConsumeDLQ("dlq.orders.queue", 2)
if err != nil {
    log.Fatal(err)
}

type Logger ¶

type Logger interface {
	// Info logs an informational message with optional fields
	Info(ctx context.Context, msg string, fields map[string]any)

	// Error logs an error message with optional fields
	Error(ctx context.Context, msg string, fields map[string]any)

	// Warn logs a warning message with optional fields
	Warn(ctx context.Context, msg string, fields map[string]any)

	// Debug logs a debug message with optional fields
	Debug(ctx context.Context, msg string, fields map[string]any)

	// Close closes the logger and flushes any pending logs.
	// Returns an error if the logger fails to close or flush properly.
	Close() error
}

Logger is the interface that any logger implementation must satisfy. This allows the library to be agnostic about the logging implementation.

You can provide your own logger implementation (zap, logrus, zerolog, etc.) or use the provided DefaultLogger from internal/logger.

Example with custom logger:

type MyLogger struct {
    zapLogger *zap.Logger
}

func (l *MyLogger) Info(ctx context.Context, msg string, fields map[string]any) {
    l.zapLogger.Info(msg, zap.Any("fields", fields))
}
// ... implement other methods

client, _ := broker.New(
    config.DefaultConfig(),
    config.WithLogger(&MyLogger{zapLogger: zapLog}),
)

Example with DefaultLogger:

import "github.com/edaniel30/rabbitmq-kit-go/internal/logger"

client, _ := broker.New(
    config.DefaultConfig(),
    config.WithLogger(logger.New()),
)

Directories ¶

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL