queue

package
v1.2.0 Latest
Warning: This package is not in the latest version of its module.
Published: Feb 19, 2026 License: MIT Imports: 18 Imported by: 0

README

internal/queue

Logic overview

The queue package provides queue abstractions with both in-memory and RabbitMQ-backed implementations.

  • Messages carry retry metadata (Attempt, CreatedAt, headers).
  • RetryPolicy maps current attempt to the next delay and enforces maximum attempts.
  • InMemoryBroker supports named queues with bounded buffers.
  • RabbitMQHTTPBroker uses RabbitMQ management API endpoints for publish/poll/depth operations and supports the same consumer retry contract.
  • Consumer flow supports message-age dropping, retries with backoff, retry-queue routing, and dead-letter routing.
  • RabbitMQ topology declaration supports exchange/main queue/DLQ/retry queues with TTL + dead-letter policies.

API reference

Types
  • Message: queue payload with metadata and headers.
  • RetryPolicy: retry configuration (MaxAttempts, Delays).
  • ConsumerConfig: consumer controls (MaxMessageAge, retry policy, retry queues, dead-letter queue, time/sleep hooks).
  • Handler: consumer callback function.
  • InMemoryBroker: named in-memory queue broker.
  • RabbitMQHTTPConfig: configuration for RabbitMQ management API access.
  • RetryQueueSpec: delayed retry queue declaration (Name, Delay).
  • TopologyConfig: exchange/queue topology declaration input.
  • RabbitMQHTTPBroker: RabbitMQ-backed broker implementation.
Functions
  • NewInMemoryBroker(buffer int) *InMemoryBroker: constructs a broker with per-queue bounded buffers.
  • RabbitMQHTTPConfigFromAMQPURL(amqpURL, exchange string) (RabbitMQHTTPConfig, error): derives management endpoint/vhost/auth values from AMQP URL format.
  • NewRabbitMQHTTPBroker(cfg RabbitMQHTTPConfig) (*RabbitMQHTTPBroker, error): constructs a RabbitMQ management API broker.
  • ShouldDropMessageByAge(msg Message, now time.Time, maxAge time.Duration) bool: message age policy helper.
Methods
  • (RetryPolicy) NextDelay(attempt int) (time.Duration, bool): returns retry delay for current attempt and whether retry is allowed.
  • (*InMemoryBroker) Publish(ctx context.Context, queue string, msg Message) error: publishes a message to a queue.
  • (*InMemoryBroker) Consume(ctx context.Context, queue string, cfg ConsumerConfig, handler Handler): consumes and processes messages until context cancellation.
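  • (*InMemoryBroker) Depth(queue string) int: returns pending message count for a queue.
  • (*InMemoryBroker) OldestAge(queue string, now time.Time) time.Duration: returns the age of the oldest tracked message for a queue.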
  • (*InMemoryBroker) Health(ctx context.Context, queue string) error: validates queue availability for readiness probes.
  • (*RabbitMQHTTPBroker) Publish(ctx context.Context, queue string, msg Message) error: publishes a message via RabbitMQ exchange routing key.
  • (*RabbitMQHTTPBroker) Consume(ctx context.Context, queue string, cfg ConsumerConfig, handler Handler): polls and processes queue messages until context cancellation.
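  • (*RabbitMQHTTPBroker) Depth(queue string) int: returns queue depth from RabbitMQ queue metadata.
  • (*RabbitMQHTTPBroker) OldestAge(queue string, now time.Time) time.Duration: returns the age of the queue head message when available.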
  • (*RabbitMQHTTPBroker) Health(ctx context.Context, queue string) error: validates queue metadata fetch for readiness probes.
  • (*RabbitMQHTTPBroker) EnsureTopology(ctx context.Context, cfg TopologyConfig) error: declares exchange/queue/binding resources and retry queue policies.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ShouldDropMessageByAge

func ShouldDropMessageByAge(msg Message, now time.Time, maxAge time.Duration) bool

ShouldDropMessageByAge reports whether the message's age at now exceeds maxAge.

Types

type ConsumerConfig

type ConsumerConfig struct {
	MaxMessageAge   time.Duration
	RetryPolicy     RetryPolicy
	RetryQueues     []string
	DeadLetterQueue string
	Now             func() time.Time
	Sleep           func(time.Duration)
}

ConsumerConfig controls consumer behavior. Both InMemoryBroker.Consume and RabbitMQHTTPBroker.Consume accept it.

type Handler

type Handler func(ctx context.Context, msg Message) error

Handler processes one queue message.

type InMemoryBroker

type InMemoryBroker struct {
	// contains filtered or unexported fields
}

InMemoryBroker is a simple named-queue broker for tests and local development.

func NewInMemoryBroker

func NewInMemoryBroker(buffer int) *InMemoryBroker

NewInMemoryBroker creates an in-memory broker whose queues each use a bounded buffer of the given size.

func (*InMemoryBroker) Consume

func (b *InMemoryBroker) Consume(ctx context.Context, queue string, cfg ConsumerConfig, handler Handler)

Consume processes messages from the named queue until context cancellation.

func (*InMemoryBroker) Depth

func (b *InMemoryBroker) Depth(queue string) int

Depth returns the queued item count for one queue.

func (*InMemoryBroker) Health

func (b *InMemoryBroker) Health(_ context.Context, queue string) error

Health reports broker readiness for the requested queue.

func (*InMemoryBroker) OldestAge

func (b *InMemoryBroker) OldestAge(queue string, now time.Time) time.Duration

OldestAge returns the age of the oldest tracked message for the queue.

func (*InMemoryBroker) Publish

func (b *InMemoryBroker) Publish(_ context.Context, queue string, msg Message) error

Publish writes one message to the named queue.

type Message

type Message struct {
	ID        string
	Body      []byte
	Headers   map[string]string
	CreatedAt time.Time
	Attempt   int
}

Message is a queue payload with retry metadata.

type RabbitMQHTTPBroker

type RabbitMQHTTPBroker struct {
	// contains filtered or unexported fields
}

RabbitMQHTTPBroker uses RabbitMQ management endpoints for queue operations.

func NewRabbitMQHTTPBroker

func NewRabbitMQHTTPBroker(cfg RabbitMQHTTPConfig) (*RabbitMQHTTPBroker, error)

NewRabbitMQHTTPBroker builds a RabbitMQ management API broker.

func (*RabbitMQHTTPBroker) Consume

func (b *RabbitMQHTTPBroker) Consume(ctx context.Context, queueName string, cfg ConsumerConfig, handler Handler)

Consume reads messages from the named queue until context cancellation.

func (*RabbitMQHTTPBroker) Depth

func (b *RabbitMQHTTPBroker) Depth(queueName string) int

Depth returns the queued item count for one queue.

func (*RabbitMQHTTPBroker) EnsureTopology

func (b *RabbitMQHTTPBroker) EnsureTopology(ctx context.Context, cfg TopologyConfig) error

EnsureTopology declares the exchange, queues, and bindings required by backfill processing.

func (*RabbitMQHTTPBroker) Health

func (b *RabbitMQHTTPBroker) Health(ctx context.Context, queueName string) error

Health reports broker connectivity by checking queue metadata.

func (*RabbitMQHTTPBroker) OldestAge

func (b *RabbitMQHTTPBroker) OldestAge(queueName string, now time.Time) time.Duration

OldestAge returns the age of the queue head message when available.

func (*RabbitMQHTTPBroker) Publish

func (b *RabbitMQHTTPBroker) Publish(ctx context.Context, queueName string, msg Message) error

Publish writes one message to the named queue using the configured exchange.

type RabbitMQHTTPConfig

type RabbitMQHTTPConfig struct {
	ManagementURL string
	VHost         string
	Exchange      string
	Username      string
	//nolint:gosec // Password is required for RabbitMQ basic auth.
	Password     string
	PollInterval time.Duration
	HTTPClient   *http.Client
	Now          func() time.Time
	Sleep        func(time.Duration)
}

RabbitMQHTTPConfig configures the RabbitMQ management API broker.

func RabbitMQHTTPConfigFromAMQPURL

func RabbitMQHTTPConfigFromAMQPURL(amqpURL, exchange string) (RabbitMQHTTPConfig, error)

RabbitMQHTTPConfigFromAMQPURL derives management API config from an AMQP connection URL.
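
A rough sketch of what deriving management settings from an AMQP URL can involve. Everything named here is hypothetical: `managementConfig` and `fromAMQPURL` are local stand-ins, and the mapping rules (same host, port 15672 over HTTP for `amqp`, port 15671 over HTTPS for `amqps`, empty path meaning vhost "/") are assumptions based on common RabbitMQ defaults, not on the package source.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strings"
)

// managementConfig holds the subset of documented fields derived here.
type managementConfig struct {
	ManagementURL string
	VHost         string
	Exchange      string
	Username      string
	Password      string
}

// fromAMQPURL is a hypothetical stand-in for the documented function.
func fromAMQPURL(amqpURL, exchange string) (managementConfig, error) {
	u, err := url.Parse(amqpURL)
	if err != nil {
		return managementConfig{}, fmt.Errorf("parse amqp url: %w", err)
	}
	var scheme, port string
	switch u.Scheme {
	case "amqp":
		scheme, port = "http", "15672" // assumed default management port
	case "amqps":
		scheme, port = "https", "15671" // assumed default TLS management port
	default:
		return managementConfig{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	if u.Hostname() == "" {
		return managementConfig{}, fmt.Errorf("missing host in %q", amqpURL)
	}
	vhost := strings.TrimPrefix(u.Path, "/")
	if vhost == "" {
		vhost = "/" // assumed: no path selects the default vhost
	}
	password, _ := u.User.Password() // nil-safe when no credentials are given
	return managementConfig{
		ManagementURL: scheme + "://" + net.JoinHostPort(u.Hostname(), port),
		VHost:         vhost,
		Exchange:      exchange,
		Username:      u.User.Username(),
		Password:      password,
	}, nil
}

func main() {
	cfg, err := fromAMQPURL("amqp://worker:secret@rabbit.local:5672/jobs", "events")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s vhost=%q exchange=%q user=%q\n",
		cfg.ManagementURL, cfg.VHost, cfg.Exchange, cfg.Username)
}
```

Deployments that expose the management API on another host or port would need to override ManagementURL after derivation.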

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts int
	Delays      []time.Duration
}

RetryPolicy controls consumer retry behavior.

func (RetryPolicy) NextDelay

func (p RetryPolicy) NextDelay(attempt int) (time.Duration, bool)

NextDelay returns the retry delay for the current attempt and reports whether a retry is allowed.

type RetryQueueSpec

type RetryQueueSpec struct {
	Name  string
	Delay time.Duration
}

RetryQueueSpec configures one delayed retry queue.

type TopologyConfig

type TopologyConfig struct {
	MainQueue       string
	DeadLetterQueue string
	RetryQueues     []RetryQueueSpec
}

TopologyConfig declares AMQP exchange/queue resources required by the worker.
