queue

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2026 License: MIT Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidQueue is returned when the requested queue name contains invalid characters.
	ErrInvalidQueue = errors.New("queue: invalid queue name")
)
View Source
var ErrTooManyConsumers = errors.New("queue: too many consumers")

ErrTooManyConsumers indicates the per-server consumer cap was reached.

Functions

func EnableReadyCacheStats added in v0.3.0

func EnableReadyCacheStats()

EnableReadyCacheStats turns on ready cache stats collection at runtime.

func IsQueueMessageKey added in v0.1.0

func IsQueueMessageKey(rel string) bool

IsQueueMessageKey reports whether the relative key refers to a queue message lease.

func IsQueueStateKey

func IsQueueStateKey(key string) bool

IsQueueStateKey reports whether the logical key belongs to a queue state lease.

func MessageLeaseKey

func MessageLeaseKey(namespace, queue, id string) (string, error)

MessageLeaseKey returns the logical lock key used for message leasing.

func ResetReadyCacheStats added in v0.3.0

func ResetReadyCacheStats()

ResetReadyCacheStats clears ready cache counters.

func StateLeaseKey

func StateLeaseKey(namespace, queue, id string) (string, error)

StateLeaseKey returns the logical lock key used for workflow state leasing.

Types

type Candidate

type Candidate struct {
	Descriptor MessageDescriptor
	NextCursor string
}

Candidate describes a message snapshot that can be leased by a consumer.

type Config

type Config struct {
	DefaultVisibilityTimeout time.Duration
	DefaultMaxAttempts       int
	DefaultTTL               time.Duration
	QueuePollInterval        time.Duration
	Crypto                   *storage.Crypto
}

Config governs queue behaviour defaults.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher centralises queue polling so storage is only listed once per queue regardless of how many consumers are connected.

func NewDispatcher

func NewDispatcher(svc candidateProvider, opts ...DispatcherOption) *Dispatcher

NewDispatcher builds a dispatcher.

func (*Dispatcher) CancelNotify added in v0.4.0

func (d *Dispatcher) CancelNotify(namespace, queue, messageID string)

CancelNotify removes a scheduled NotifyAt wake-up for a message.

func (*Dispatcher) HasActiveWatcher

func (d *Dispatcher) HasActiveWatcher(namespace, queue string) bool

HasActiveWatcher reports whether the dispatcher currently maintains an active watch subscription for the provided queue.

func (*Dispatcher) Notify

func (d *Dispatcher) Notify(namespace, queue string)

Notify nudges the dispatcher to poll the queue immediately (typically after enqueue).

func (*Dispatcher) NotifyAt added in v0.4.0

func (d *Dispatcher) NotifyAt(namespace, queue, messageID string, due time.Time)

NotifyAt schedules a queue wake-up for messageID at due time. This is the in-process fast path for delayed visibility transitions; polling/watch remains the fallback for missed events and cross-node updates.

func (*Dispatcher) QueueStats

func (d *Dispatcher) QueueStats(namespace, queue string) Stats

QueueStats returns a snapshot of dispatcher metrics for the given queue.

func (*Dispatcher) Try

func (d *Dispatcher) Try(ctx context.Context, namespace, queue string) (*Candidate, error)

Try performs a non-blocking fetch for an immediately available candidate. It returns nil when no message is ready.

func (*Dispatcher) Wait

func (d *Dispatcher) Wait(ctx context.Context, namespace, queue string) (*Candidate, error)

Wait blocks until a candidate is available for the queue or the context is cancelled.

type DispatcherOption

type DispatcherOption func(*Dispatcher)

DispatcherOption customises Dispatcher behaviour.

func WithLogger

func WithLogger(logger pslog.Logger) DispatcherOption

WithLogger assigns a base logger used for dispatcher diagnostics.

func WithMaxConsumers

func WithMaxConsumers(n int) DispatcherOption

WithMaxConsumers caps the number of simultaneous Wait calls (default 1000).

func WithPollInterval

func WithPollInterval(d time.Duration) DispatcherOption

WithPollInterval sets the base polling interval (default 3s).

func WithPollJitter

func WithPollJitter(d time.Duration) DispatcherOption

WithPollJitter sets the additional random jitter added to the polling interval.

func WithQueuePageSize added in v0.3.0

func WithQueuePageSize(size int) DispatcherOption

WithQueuePageSize overrides the default queue list page size per poll.

func WithResilientPollInterval

func WithResilientPollInterval(interval time.Duration) DispatcherOption

WithResilientPollInterval configures the safety poll interval used when watchers are active.

func WithWatchFactory

func WithWatchFactory(factory WatchFactory) DispatcherOption

WithWatchFactory installs a queue change watcher factory used to reduce polling when available.

type EnqueueOptions

type EnqueueOptions struct {
	Delay       time.Duration
	Visibility  time.Duration
	TTL         time.Duration
	MaxAttempts int
	Attributes  map[string]any
	ContentType string
}

EnqueueOptions captures optional parameters for Enqueue.

type LeaseKeyParts added in v0.1.0

type LeaseKeyParts struct {
	Queue string
	ID    string
}

LeaseKeyParts captures the queue name and message id from a lease key.

func ParseMessageLeaseKey added in v0.1.0

func ParseMessageLeaseKey(rel string) (LeaseKeyParts, bool)

ParseMessageLeaseKey extracts the queue name and message id from a relative queue message lease key (e.g. "q/orders/msg/<id>"). Returns ok=false when the key does not match the expected layout.

func ParseStateLeaseKey added in v0.1.0

func ParseStateLeaseKey(rel string) (LeaseKeyParts, bool)

ParseStateLeaseKey extracts the queue name and message id from a relative queue state lease key (e.g. "q/orders/state/<id>").

type Message

type Message struct {
	Namespace        string
	Queue            string
	ID               string
	EnqueuedAt       time.Time
	NotVisibleUntil  time.Time
	Visibility       time.Duration
	MaxAttempts      int
	Attempts         int
	FailureAttempts  int
	TTLDeadline      time.Time
	PayloadBytes     int64
	PayloadContent   string
	MetadataObject   string
	PayloadObject    string
	MetadataRevision string
	CorrelationID    string
}

Message describes an enqueued message.

type MessageCandidateResult added in v0.1.0

type MessageCandidateResult struct {
	Descriptor *MessageDescriptor
	NextCursor string
}

MessageCandidateResult captures the next ready candidate and cursor.

type MessageDescriptor

type MessageDescriptor struct {
	Namespace    string
	ID           string
	MetadataKey  string
	MetadataETag string
	Document     messageDocument
}

MessageDescriptor provides a lightweight summary from listing.

type MessageDocument added in v0.1.0

type MessageDocument struct {
	Type               string         `json:"type"`
	Namespace          string         `json:"namespace,omitempty"`
	Queue              string         `json:"queue"`
	ID                 string         `json:"id"`
	EnqueuedAt         time.Time      `json:"enqueued_at"`
	UpdatedAt          time.Time      `json:"updated_at"`
	Attempts           int            `json:"attempts"`
	FailureAttempts    int            `json:"failure_attempts,omitempty"`
	NotVisibleUntil    time.Time      `json:"not_visible_until"`
	MaxAttempts        int            `json:"max_attempts"`
	Attributes         map[string]any `json:"attributes,omitempty"`
	LastError          any            `json:"last_error,omitempty"`
	PayloadBytes       int64          `json:"payload_bytes,omitempty"`
	PayloadContentType string         `json:"payload_content_type,omitempty"`
	VisibilityTimeout  int64          `json:"visibility_timeout_seconds,omitempty"`
	ExpiresAt          *time.Time     `json:"expires_at,omitempty"`
	CorrelationID      string         `json:"correlation_id,omitempty"`
	LeaseID            string         `json:"lease_id,omitempty"`
	LeaseFencingToken  int64          `json:"lease_fencing_token,omitempty"`
	LeaseTxnID         string         `json:"lease_txn_id,omitempty"`
	PayloadDescriptor  []byte         `json:"payload_descriptor,omitempty"`
	MetaDescriptor     []byte         `json:"meta_descriptor,omitempty"`
}

MessageDocument captures per-message metadata stored alongside payloads.

type MessageListResult added in v0.1.0

type MessageListResult struct {
	Descriptors    []MessageDescriptor
	NextStartAfter string
	Truncated      bool
}

MessageListResult captures message listing output.

type MessageResult added in v0.1.0

type MessageResult struct {
	Document *MessageDocument
	ETag     string
}

MessageResult captures a message document and its ETag.

type PayloadResult added in v0.1.0

type PayloadResult struct {
	Reader io.ReadCloser
	Info   *storage.ObjectInfo
}

PayloadResult captures a payload reader with metadata.

type ReadyCacheStats added in v0.3.0

type ReadyCacheStats struct {
	InflightSkips    int64
	CursorResets     int64
	InflightClears   int64
	ListScans        int64
	ListObjects      int64
	ListNanos        int64
	LoadCount        int64
	LoadNanos        int64
	NextCalls        int64
	RefreshWaits     int64
	RefreshWaitNanos int64
}

ReadyCacheStats captures optional counters for queue cache behavior.

func ReadyCacheStatsSnapshot added in v0.3.0

func ReadyCacheStatsSnapshot() ReadyCacheStats

ReadyCacheStatsSnapshot returns the current ready cache counters.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service implements queue primitives atop the storage backend.

func New

func New(store storage.Backend, clk clock.Clock, cfg Config) (*Service, error)

New constructs a queue Service.

func (*Service) Ack

func (s *Service) Ack(ctx context.Context, namespace, queue, id string, metaETag string, stateETag string, stateRequired bool) error

Ack removes state (if present) and message metadata/payload.

func (*Service) DeleteMessage

func (s *Service) DeleteMessage(ctx context.Context, namespace, queue, id string, expectedETag string) error

DeleteMessage removes metadata and payload (best-effort) respecting CAS.

func (*Service) DeleteState

func (s *Service) DeleteState(ctx context.Context, namespace, queue, id string, expectedETag string) error

DeleteState removes the workflow state document for queue message.

func (*Service) Enqueue

func (s *Service) Enqueue(ctx context.Context, namespace, queue string, payload io.Reader, opts EnqueueOptions) (*Message, error)

Enqueue inserts a new message into queue and returns its metadata.

func (*Service) EnsureMessageReady

func (s *Service) EnsureMessageReady(ctx context.Context, namespace, queue, id string) error

EnsureMessageReady verifies that the message payload and its metadata exist before delivery.

func (*Service) EnsureStateExists

func (s *Service) EnsureStateExists(ctx context.Context, namespace, queue, id string) (string, error)

EnsureStateExists creates an empty state document if missing.

func (*Service) ExtendVisibility

func (s *Service) ExtendVisibility(ctx context.Context, namespace, queue string, doc *messageDocument, expectedETag string, extension time.Duration) (string, error)

ExtendVisibility pushes not_visible_until forward relative to now.

func (*Service) GetMessage

func (s *Service) GetMessage(ctx context.Context, namespace, queue, id string) (MessageResult, error)

GetMessage loads the metadata document for the provided queue/id pair.

func (*Service) GetPayload

func (s *Service) GetPayload(ctx context.Context, namespace, queue, id string) (PayloadResult, error)

GetPayload streams the message payload.

func (*Service) IncrementAttempts

func (s *Service) IncrementAttempts(ctx context.Context, namespace, queue string, doc *messageDocument, expectedETag string, visibility time.Duration) (string, error)

IncrementAttempts updates attempts and visibility after a successful lease acquire.

func (*Service) ListMessages

func (s *Service) ListMessages(ctx context.Context, namespace, queue string, startAfter string, limit int) (MessageListResult, error)

ListMessages enumerates message metadata objects in lexical order.

func (*Service) LoadState

func (s *Service) LoadState(ctx context.Context, namespace, queue, id string) (StateResult, error)

LoadState retrieves the JSON state document associated with the queue message.

func (*Service) MoveToDLQ

func (s *Service) MoveToDLQ(ctx context.Context, namespace, queue, id string, doc *messageDocument, metaETag string) error

MoveToDLQ moves metadata/payload/state to the dead-letter queue.

func (*Service) Nack

func (s *Service) Nack(ctx context.Context, namespace, queue string, doc *messageDocument, expectedETag string, delay time.Duration, lastErr any) (string, error)

Nack updates metadata with a delay and optional error payload.

func (*Service) NextCandidate

func (s *Service) NextCandidate(ctx context.Context, namespace, queue string, startAfter string, pageSize int) (MessageCandidateResult, error)

NextCandidate returns the next visible message ready for processing.

func (*Service) RefreshReadyCache

func (s *Service) RefreshReadyCache(ctx context.Context, namespace, queue string) error

RefreshReadyCache syncs the ready cache for the provided queue immediately.

func (*Service) Reschedule

func (s *Service) Reschedule(ctx context.Context, namespace, queue string, doc *messageDocument, expectedETag string, delay time.Duration) (string, error)

Reschedule moves message visibility forward without incrementing attempts (used for nack).

func (*Service) SaveMessageDocument

func (s *Service) SaveMessageDocument(ctx context.Context, namespace, queue, id string, doc *messageDocument, expectedETag string) (string, error)

SaveMessageDocument writes metadata via CAS.

func (*Service) SaveState

func (s *Service) SaveState(ctx context.Context, namespace, queue, id string, doc *stateDocument, expectedETag string) (string, error)

SaveState updates the workflow state document via CAS.

func (*Service) UpdateReadyCache added in v0.3.0

func (s *Service) UpdateReadyCache(namespace, queue string, doc *MessageDocument, etag string)

UpdateReadyCache seeds the ready cache with a fresh descriptor.

type StateDocument added in v0.1.0

type StateDocument struct {
	Type          string    `json:"type"`
	Namespace     string    `json:"namespace"`
	Queue         string    `json:"queue"`
	MessageID     string    `json:"message_id"`
	CreatedAt     time.Time `json:"created_at"`
	UpdatedAt     time.Time `json:"updated_at"`
	CorrelationID string    `json:"correlation_id,omitempty"`
}

StateDocument captures metadata about queue state blobs.

type StateResult added in v0.1.0

type StateResult struct {
	Document *StateDocument
	ETag     string
}

StateResult captures a queue state document and its ETag.

type Stats

type Stats struct {
	Namespace         string
	Queue             string
	WaitingConsumers  int
	PendingCandidates int
	TotalConsumers    int
}

Stats captures dispatcher-level metrics for a specific queue.

type WatchFactory

type WatchFactory interface {
	Subscribe(namespace, queue string) (WatchSubscription, error)
}

WatchFactory constructs WatchSubscriptions for queue names.

func WatchFactoryFromStorage

func WatchFactoryFromStorage(feed storage.QueueChangeFeed) WatchFactory

WatchFactoryFromStorage adapts a storage.QueueChangeFeed into a Dispatcher watch factory.

type WatchSubscription

type WatchSubscription interface {
	Events() <-chan struct{}
	Close() error
}

WatchSubscription reports queue change events.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL