Documentation ¶
Index ¶
- type BatchError
- type BatchResult
- type Event
- type EventBus
- func (b *EventBus) Close() error
- func (b *EventBus) GetCircuitBreakerMetrics() *circuitbreaker.Metrics
- func (b *EventBus) IsConnected() bool
- func (b *EventBus) Publish(ctx context.Context, event Event) error
- func (b *EventBus) PublishBatch(ctx context.Context, events []Event, opts ...config.BatchOption) (*BatchResult, error)
- func (b *EventBus) RegisterDLQHandler(eventType string, handler router.HandlerService)
- func (b *EventBus) RegisterHandler(eventType string, handler router.HandlerService)
- func (b *EventBus) RequeueAllFromDLQ(ctx context.Context, dlqName string, resetRetryCount bool, maxMessages int) (int, error)
- func (b *EventBus) RequeueFromDLQ(ctx context.Context, dlqMsg *router.DLQMessage, resetRetryCount bool) error
- func (b *EventBus) ResetCircuitBreaker() bool
- func (b *EventBus) StartConsume(queue string, workers int) error
- func (b *EventBus) StartConsumeDLQ(queue string, workers int) error
- type Logger
Types ¶
type BatchError ¶
type BatchError struct {
	Index int   // Index of the event in the original batch
	Event Event // The event that failed
	Error error // The error that occurred
}
BatchError represents a failed event in a batch publish operation.
type BatchResult ¶
type BatchResult struct {
	Total   int          // Total number of events in the batch
	Success int          // Number of successfully published events
	Failed  int          // Number of failed events
	Errors  []BatchError // Details of failed events
}
BatchResult contains the result of a batch publish operation.
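As a minimal sketch of how a caller might use these fields, the snippet below collects the original batch positions of failed events so they can be retried. The types are local stand-ins that mirror the documented shapes (with Event reduced to a string), and failedIndexes is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// BatchError and BatchResult are local stand-ins mirroring the documented
// structs. Event is reduced to a string so the sketch needs no broker.
type BatchError struct {
	Index int
	Event string
	Error error
}

type BatchResult struct {
	Total   int
	Success int
	Failed  int
	Errors  []BatchError
}

// failedIndexes (hypothetical helper) returns the original batch positions
// of every failed event, which is what a caller needs to build a retry batch.
func failedIndexes(r *BatchResult) []int {
	if r == nil {
		return nil
	}
	idx := make([]int, 0, len(r.Errors))
	for _, e := range r.Errors {
		idx = append(idx, e.Index)
	}
	return idx
}

func main() {
	r := &BatchResult{
		Total: 3, Success: 2, Failed: 1,
		Errors: []BatchError{
			{Index: 1, Event: "order.created", Error: errors.New("nack")},
		},
	}
	fmt.Println(failedIndexes(r))
}
```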
type Event ¶
type Event interface {
	// Type returns the event type (e.g., "user.created", "order.completed").
	// This is used as the routing key when publishing.
	Type() string

	// Exchange returns the exchange name where this event should be published.
	Exchange() string

	// ToMap converts the event to a map for JSON serialization.
	// This map will be marshaled and sent as the message body.
	ToMap() map[string]any

	// Headers returns the headers for the event.
	Headers() map[string]any
}
Event represents a domain event that can be published to RabbitMQ.
Events are used in Domain-Driven Design to represent something that happened in the domain. They are typically published after a successful domain operation and consumed by other services or bounded contexts.
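A concrete event type might look like the sketch below. The Event interface is copied locally so the snippet compiles without the library. OrderCreated, its field, the exchange name, and the header key are all hypothetical. Including a "type" key in the map is an assumption made so that consumers dispatching on the payload's "type" field can route the message; the library may or may not add that field itself.

```go
package main

import "fmt"

// Event is a local copy of the documented interface.
type Event interface {
	Type() string
	Exchange() string
	ToMap() map[string]any
	Headers() map[string]any
}

// OrderCreated is a hypothetical domain event.
type OrderCreated struct {
	OrderID string
}

// Type doubles as the routing key.
func (e OrderCreated) Type() string { return "order.created" }

// Exchange names the (assumed) exchange the event is published to.
func (e OrderCreated) Exchange() string { return "orders.exchange" }

// ToMap returns the JSON body. The "type" key is an assumption, see lead-in.
func (e OrderCreated) ToMap() map[string]any {
	return map[string]any{"type": e.Type(), "order_id": e.OrderID}
}

// Headers returns illustrative message headers.
func (e OrderCreated) Headers() map[string]any {
	return map[string]any{"x-source": "order-service"}
}

// Compile-time check that OrderCreated satisfies Event.
var _ Event = OrderCreated{}

func main() {
	var ev Event = OrderCreated{OrderID: "42"}
	fmt.Println(ev.Exchange(), ev.Type(), ev.ToMap()["order_id"])
}
```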
type EventBus ¶
type EventBus struct {
	// contains filtered or unexported fields
}
EventBus provides a high-level interface for publishing domain events.
The EventBus is designed to work with the Event interface and follows the Domain-Driven Design pattern. It allows publishing single events or batches of events that have been accumulated in domain aggregates.
This is the recommended way to interact with RabbitMQ for event-driven architectures. The EventBus manages the underlying RabbitMQ client automatically.
Example usage for publishing events:
eventBus, err := rabbitmq.NewEventBus(
	config.DefaultConfig(),
	config.WithURI("amqp://localhost:5672/"),
)
if err != nil {
	log.Fatal(err)
}
defer eventBus.Close()

event := NewOrderCreatedEvent(order.ID)
if err := eventBus.Publish(ctx, event); err != nil {
	log.Printf("Publish failed: %v", err)
}
Example usage for consuming events:
eventBus, err := rabbitmq.NewEventBus(config.DefaultConfig(), config.WithURI("..."))
if err != nil {
	log.Fatal(err)
}
defer eventBus.Close()

// Register handlers
eventBus.RegisterHandler("order.created", orderCreatedHandler)
eventBus.RegisterHandler("order.completed", orderCompletedHandler)

// Start consuming
if err := eventBus.StartConsume("orders.queue", 5); err != nil {
	log.Fatal(err)
}
func NewEventBus ¶
NewEventBus creates a new event bus with its own RabbitMQ client.
This is the recommended way to create an EventBus. The EventBus will manage the connection lifecycle automatically.
The router is created internally, and handlers can be registered using the RegisterHandler() method.
Example:
eventBus, err := rabbitmq.NewEventBus(
	config.DefaultConfig(),
	config.WithURI("amqp://guest:guest@localhost:5672/"),
	config.WithExchanges([]config.ExchangeConfig{
		{Name: "orders.exchange", Type: "direct", Durable: true},
	}),
)
if err != nil {
	log.Fatal(err)
}
defer eventBus.Close()
func (*EventBus) Close ¶
Close closes the underlying RabbitMQ client and cleans up resources.
This stops the publisher's confirmation processor goroutine (if running) and then closes the RabbitMQ client connection.
func (*EventBus) GetCircuitBreakerMetrics ¶
func (b *EventBus) GetCircuitBreakerMetrics() *circuitbreaker.Metrics
GetCircuitBreakerMetrics returns the current circuit breaker metrics.
Returns nil if the circuit breaker is not enabled or the consumer is not initialized.
Example:
metrics := eventBus.GetCircuitBreakerMetrics()
if metrics != nil {
	log.Printf("Circuit breaker state: %s", metrics.State)
	log.Printf("Failures: %d, Successes: %d", metrics.Failures, metrics.Successes)
}
func (*EventBus) IsConnected ¶
IsConnected returns true if the underlying client is connected.
func (*EventBus) Publish ¶
Publish publishes a single event to RabbitMQ.
The event's Exchange() and Type() methods determine where the message is published. The ToMap() method is used to serialize the event to JSON.
Example:
event := NewUserCreatedEvent(user.ID, user.Name, user.Email)
err := eventBus.Publish(ctx, event)
func (*EventBus) PublishBatch ¶
func (b *EventBus) PublishBatch(ctx context.Context, events []Event, opts ...config.BatchOption) (*BatchResult, error)
PublishBatch publishes multiple events with optimized pipelining. By default, this method uses pipelining for maximum throughput: all messages are sent first without waiting for confirmations, then all confirmations are collected. This is 5-10x faster than sequential publishing for large batches.
The method returns a BatchResult with detailed information about successes and failures. By default, all events are attempted even if some fail.
Options:
- WithPipelining(false): Use sequential publishing (legacy behavior)
- WithFailFast(true): Stop at the first error
Examples:
// Fast pipelining mode (default)
result, err := eventBus.PublishBatch(ctx, events)
if err != nil {
	log.Printf("Batch publish failed: %v", err)
}
// Guard against a nil result before reading its fields.
if result != nil && result.Failed > 0 {
	for _, batchErr := range result.Errors {
		log.Printf("Event %d failed: %v", batchErr.Index, batchErr.Error)
	}
}

// Legacy sequential mode with fail-fast
result, err = eventBus.PublishBatch(ctx, events,
	WithPipelining(false),
	WithFailFast(true),
)
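The two-phase idea behind pipelining (send everything, then collect confirmations) can be sketched without a broker. This is only an illustration of the pattern, not the library's implementation: send, confirm, and publishPipelined are hypothetical names, and the "broker" is a goroutine that nacks any body equal to "bad".

```go
package main

import "fmt"

// confirm is a hypothetical per-message confirmation.
type confirm struct {
	index int
	ok    bool
}

// send simulates handing one message to a broker and returns a channel
// that later yields its confirmation. Bodies equal to "bad" are nacked.
func send(index int, body string) <-chan confirm {
	ch := make(chan confirm, 1)
	go func() { ch <- confirm{index: index, ok: body != "bad"} }()
	return ch
}

// publishPipelined sends every message first, then collects confirmations
// in send order, so the caller never waits between individual sends.
func publishPipelined(bodies []string) (success int, failed []int) {
	pending := make([]<-chan confirm, 0, len(bodies))
	for i, b := range bodies {
		pending = append(pending, send(i, b)) // phase 1: send without waiting
	}
	for _, ch := range pending { // phase 2: collect confirmations
		if c := <-ch; c.ok {
			success++
		} else {
			failed = append(failed, c.index)
		}
	}
	return success, failed
}

func main() {
	ok, failed := publishPipelined([]string{"a", "bad", "c"})
	fmt.Println(ok, failed)
}
```

Sequential publishing would instead wait on each confirmation before sending the next message, which is the behavior WithPipelining(false) is described as selecting.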
func (*EventBus) RegisterDLQHandler ¶
func (b *EventBus) RegisterDLQHandler(eventType string, handler router.HandlerService)
RegisterDLQHandler registers a handler for processing DLQ messages of a specific event type.
DLQ handlers receive messages that have failed processing in the main queue after all retry attempts. Use this to implement custom recovery logic, analysis, or alerting for failed messages.
The handler receives a MessageContext which can be converted to DLQMessage to access metadata about why the message failed (retry count, death reason, etc.).
Example:
eventBus.RegisterDLQHandler("order.created", func(ctx *router.MessageContext) error {
	dlqMsg := router.NewDLQMessage(ctx)
	log.Printf("DLQ: %s", dlqMsg.GetDeathInfo())

	// Decide whether to retry or discard
	if dlqMsg.ShouldRetry(10) {
		return eventBus.RequeueFromDLQ(context.Background(), dlqMsg, true)
	}

	// Log and discard
	log.Printf("Permanently failed: %s", dlqMsg.GetDeathInfo())
	return nil
})
func (*EventBus) RegisterHandler ¶
func (b *EventBus) RegisterHandler(eventType string, handler router.HandlerService)
RegisterHandler registers a handler for a specific event type.
Handlers are used when consuming messages. The event type is extracted from the message's "type" field in the JSON payload.
Example:
eventBus.RegisterHandler("user.created", userCreatedHandler)
eventBus.RegisterHandler("order.completed", orderCompletedHandler)
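Dispatching on the payload's "type" field can be sketched as follows. This is an assumption-laden illustration rather than the router's actual code: handler is a simplified stand-in for router.HandlerService, and dispatch and errNoHandler are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// handler is a simplified stand-in for router.HandlerService.
type handler func(body []byte) error

// errNoHandler is a hypothetical sentinel for unregistered event types.
var errNoHandler = errors.New("no handler registered")

// dispatch reads the "type" field from a JSON body and calls the handler
// registered under that event type.
func dispatch(handlers map[string]handler, body []byte) error {
	var envelope struct {
		Type string `json:"type"`
	}
	if err := json.Unmarshal(body, &envelope); err != nil {
		return fmt.Errorf("decode payload: %w", err)
	}
	h, ok := handlers[envelope.Type]
	if !ok {
		return fmt.Errorf("%w: %q", errNoHandler, envelope.Type)
	}
	return h(body)
}

func main() {
	handlers := map[string]handler{
		"user.created": func(body []byte) error {
			fmt.Println("handling", string(body))
			return nil
		},
	}
	fmt.Println(dispatch(handlers, []byte(`{"type":"user.created","id":7}`)))
	fmt.Println(dispatch(handlers, []byte(`{"type":"user.deleted"}`)))
}
```

The sketch surfaces an unknown type as an error to make the lookup visible. StartConsume is documented as acknowledging and discarding such messages instead.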
func (*EventBus) RequeueAllFromDLQ ¶
func (b *EventBus) RequeueAllFromDLQ(ctx context.Context, dlqName string, resetRetryCount bool, maxMessages int) (int, error)
RequeueAllFromDLQ re-enqueues all messages from a DLQ back to their original queues.
This is useful for bulk recovery after fixing an issue that caused many messages to fail. Messages are consumed from the DLQ, published back to their original destinations, and then acknowledged.
Parameters:
- ctx: context for the operation
- dlqName: name of the DLQ to drain (e.g., "dlq.orders.queue")
- resetRetryCount: if true, resets retry count for all messages
- maxMessages: maximum number of messages to requeue (0 = unlimited)
Returns the number of messages successfully requeued and any error encountered.
Example:
// Requeue up to 100 messages with reset retry count
count, err := eventBus.RequeueAllFromDLQ(
	context.Background(),
	"dlq.orders.queue",
	true, // reset retry count
	100,  // max 100 messages
)
if err != nil {
	log.Printf("Error requeuing: %v", err)
}
log.Printf("Successfully requeued %d messages", count)
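The interaction between resetRetryCount and maxMessages (where 0 means unlimited) can be illustrated with an in-memory model. The message type and drain function below are hypothetical and operate on a slice rather than a real queue, so this shows only the documented parameter semantics, not how the library talks to RabbitMQ.

```go
package main

import "fmt"

// message is a hypothetical dead-lettered message.
type message struct {
	Body       string
	RetryCount int
}

// drain moves up to maxMessages from dlq into requeued and returns what is
// left. A maxMessages of 0 means no limit. The input slice is not modified.
func drain(dlq []message, resetRetryCount bool, maxMessages int) (requeued, remaining []message) {
	n := len(dlq)
	if maxMessages > 0 && maxMessages < n {
		n = maxMessages
	}
	requeued = make([]message, 0, n)
	for _, m := range dlq[:n] {
		if resetRetryCount {
			m.RetryCount = 0 // m is a copy, so dlq keeps its counts
		}
		requeued = append(requeued, m)
	}
	return requeued, dlq[n:]
}

func main() {
	dlq := []message{{"a", 3}, {"b", 5}, {"c", 1}}
	requeued, remaining := drain(dlq, true, 2)
	fmt.Println(len(requeued), len(remaining))
}
```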
func (*EventBus) RequeueFromDLQ ¶
func (b *EventBus) RequeueFromDLQ(ctx context.Context, dlqMsg *router.DLQMessage, resetRetryCount bool) error
RequeueFromDLQ re-enqueues a message from a DLQ back to its original queue for reprocessing.
This allows you to retry failed messages after fixing the underlying issue (e.g., after deploying a bug fix, restoring a downstream service, etc.).
Parameters:
- ctx: context for the operation
- dlqMsg: the DLQ message to re-enqueue
- resetRetryCount: if true, resets the retry count to 0 (gives full retry attempts)
The message will be published back to the original exchange with the original routing key, so it will be routed to the original queue.
Example:
eventBus.RegisterDLQHandler("order.created", func(msgCtx *router.MessageContext) error {
	dlqMsg := router.NewDLQMessage(msgCtx)

	// Check if we should retry
	if dlqMsg.ShouldRetry(5) {
		// Re-enqueue with reset retry count
		err := eventBus.RequeueFromDLQ(context.Background(), dlqMsg, true)
		if err != nil {
			return err
		}
		// Ack the DLQ message after successful re-enqueue
		return msgCtx.Ack()
	}

	return nil
})
func (*EventBus) ResetCircuitBreaker ¶
ResetCircuitBreaker manually resets the circuit breaker to the closed state.
This should be used cautiously, typically only for manual intervention. Returns false if the circuit breaker is not enabled or the consumer is not initialized.
Example:
if eventBus.ResetCircuitBreaker() {
	log.Println("Circuit breaker reset successfully")
}
func (*EventBus) StartConsume ¶
StartConsume starts consuming messages from a queue with multiple workers.
Before calling this method, you must register handlers using RegisterHandler(). Messages with unregistered event types will be acknowledged and discarded.
Returns ErrNoHandlersRegistered if no handlers have been registered.
Parameters:
- queue: name of the queue to consume from
- workers: number of concurrent workers to process messages
Example:
eventBus.RegisterHandler("order.created", orderHandler)

err := eventBus.StartConsume("orders.queue", 5)
if err != nil {
	log.Fatal(err)
}
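What "workers" means can be pictured as a fixed pool of goroutines draining one message stream. The sketch below is a generic worker pool built on a channel, under the assumption that the consumer behaves similarly. The consume function is hypothetical, and unlike a real consumer it simply blocks until the channel is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// consume starts the given number of worker goroutines, each pulling from
// msgs until it is closed, and returns how many messages were handled.
func consume(msgs <-chan string, workers int, handle func(string)) int {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		handled int
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgs {
				handle(m)
				mu.Lock()
				handled++ // guarded because workers run concurrently
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return handled
}

func main() {
	msgs := make(chan string, 3)
	for _, m := range []string{"a", "b", "c"} {
		msgs <- m
	}
	close(msgs)
	fmt.Println(consume(msgs, 2, func(string) {}))
}
```

Because several workers run at once, handlers registered for a queue should be safe to call concurrently whenever the worker count is greater than one.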
func (*EventBus) StartConsumeDLQ ¶
StartConsumeDLQ starts consuming messages from a DLQ with multiple workers.
Before calling this method, you must register DLQ handlers using RegisterDLQHandler(). If no handlers are registered, it returns ErrNoHandlersRegistered.
The queue parameter should be the DLQ name (e.g., "dlq.orders.queue"). If you enabled automatic DLQ setup, the DLQ name follows the pattern: dlqPrefix + queueName.
Parameters:
- queue: name of the DLQ to consume from (e.g., "dlq.orders.queue")
- workers: number of concurrent workers to process DLQ messages
Example:
// Register DLQ handler
eventBus.RegisterDLQHandler("order.created", func(ctx *router.MessageContext) error {
	dlqMsg := router.NewDLQMessage(ctx)
	log.Printf("Processing failed order: %s", dlqMsg.GetDeathInfo())
	// Analyze or retry logic here
	return nil
})

// Start consuming from DLQ
err := eventBus.StartConsumeDLQ("dlq.orders.queue", 2)
if err != nil {
	log.Fatal(err)
}
type Logger ¶
type Logger interface {
	// Info logs an informational message with optional fields
	Info(ctx context.Context, msg string, fields map[string]any)

	// Error logs an error message with optional fields
	Error(ctx context.Context, msg string, fields map[string]any)

	// Warn logs a warning message with optional fields
	Warn(ctx context.Context, msg string, fields map[string]any)

	// Debug logs a debug message with optional fields
	Debug(ctx context.Context, msg string, fields map[string]any)

	// Close closes the logger and flushes any pending logs.
	// Returns an error if the logger fails to close or flush properly.
	Close() error
}
Logger is the interface that any logger implementation must satisfy. This allows the library to be agnostic about the logging implementation.
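You can provide your own logger implementation (zap, logrus, zerolog, etc.) or use the provided DefaultLogger from internal/logger. Note that Go only allows an internal/ package to be imported by code within the same module, so the DefaultLogger import shown in the second example compiles only from inside rabbitmq-kit-go.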
Example with custom logger:
type MyLogger struct {
	zapLogger *zap.Logger
}

func (l *MyLogger) Info(ctx context.Context, msg string, fields map[string]any) {
	l.zapLogger.Info(msg, zap.Any("fields", fields))
}

// ... implement other methods

client, _ := broker.New(
	config.DefaultConfig(),
	config.WithLogger(&MyLogger{zapLogger: zapLog}),
)
Example with DefaultLogger:
import "github.com/edaniel30/rabbitmq-kit-go/internal/logger"

client, _ := broker.New(
	config.DefaultConfig(),
	config.WithLogger(logger.New()),
)
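For projects that prefer to avoid a third-party logging dependency, the interface can also be satisfied with the standard library. The following is a minimal sketch, not something the package ships: StdLogger, NewStdLogger, and formatLine are hypothetical names, and the Logger interface is copied locally so the snippet compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
)

// Logger is a local copy of the documented interface.
type Logger interface {
	Info(ctx context.Context, msg string, fields map[string]any)
	Error(ctx context.Context, msg string, fields map[string]any)
	Warn(ctx context.Context, msg string, fields map[string]any)
	Debug(ctx context.Context, msg string, fields map[string]any)
	Close() error
}

// StdLogger (hypothetical) adapts the standard library's *log.Logger.
type StdLogger struct{ l *log.Logger }

// NewStdLogger wraps an existing *log.Logger.
func NewStdLogger(l *log.Logger) *StdLogger { return &StdLogger{l: l} }

// formatLine renders one log line. fmt prints map keys in sorted order.
func formatLine(level, msg string, fields map[string]any) string {
	return fmt.Sprintf("%s %s %v", level, msg, fields)
}

func (s *StdLogger) Info(_ context.Context, msg string, f map[string]any) {
	s.l.Print(formatLine("INFO", msg, f))
}

func (s *StdLogger) Error(_ context.Context, msg string, f map[string]any) {
	s.l.Print(formatLine("ERROR", msg, f))
}

func (s *StdLogger) Warn(_ context.Context, msg string, f map[string]any) {
	s.l.Print(formatLine("WARN", msg, f))
}

func (s *StdLogger) Debug(_ context.Context, msg string, f map[string]any) {
	s.l.Print(formatLine("DEBUG", msg, f))
}

// Close is a no-op because *log.Logger does not buffer output.
func (s *StdLogger) Close() error { return nil }

// Compile-time check that StdLogger satisfies Logger.
var _ Logger = (*StdLogger)(nil)

func main() {
	lg := NewStdLogger(log.New(os.Stdout, "", 0))
	lg.Info(context.Background(), "connected", map[string]any{"queue": "orders.queue"})
}
```

A value built this way could then be passed wherever the examples above pass a logger, for instance to config.WithLogger.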