Documentation
¶
Index ¶
- Variables
- func EnableReadyCacheStats()
- func IsQueueMessageKey(rel string) bool
- func IsQueueStateKey(key string) bool
- func MessageLeaseKey(namespace, queue, id string) (string, error)
- func ResetReadyCacheStats()
- func StateLeaseKey(namespace, queue, id string) (string, error)
- type Candidate
- type Config
- type Dispatcher
- func (d *Dispatcher) CancelNotify(namespace, queue, messageID string)
- func (d *Dispatcher) HasActiveWatcher(namespace, queue string) bool
- func (d *Dispatcher) Notify(namespace, queue string)
- func (d *Dispatcher) NotifyAt(namespace, queue, messageID string, due time.Time)
- func (d *Dispatcher) QueueStats(namespace, queue string) Stats
- func (d *Dispatcher) Try(ctx context.Context, namespace, queue string) (*Candidate, error)
- func (d *Dispatcher) Wait(ctx context.Context, namespace, queue string) (*Candidate, error)
- type DispatcherOption
- func WithLogger(logger pslog.Logger) DispatcherOption
- func WithMaxConsumers(n int) DispatcherOption
- func WithPollInterval(d time.Duration) DispatcherOption
- func WithPollJitter(d time.Duration) DispatcherOption
- func WithQueuePageSize(size int) DispatcherOption
- func WithResilientPollInterval(interval time.Duration) DispatcherOption
- func WithWatchFactory(factory WatchFactory) DispatcherOption
- type EnqueueOptions
- type LeaseKeyParts
- type Message
- type MessageCandidateResult
- type MessageDescriptor
- type MessageDocument
- type MessageListResult
- type MessageResult
- type PayloadResult
- type ReadyCacheStats
- type Service
- func (s *Service) Ack(ctx context.Context, namespace, queue, id string, metaETag string, ...) error
- func (s *Service) DeleteMessage(ctx context.Context, namespace, queue, id string, expectedETag string) error
- func (s *Service) DeleteState(ctx context.Context, namespace, queue, id string, expectedETag string) error
- func (s *Service) Enqueue(ctx context.Context, namespace, queue string, payload io.Reader, ...) (*Message, error)
- func (s *Service) EnsureMessageReady(ctx context.Context, namespace, queue, id string) error
- func (s *Service) EnsureStateExists(ctx context.Context, namespace, queue, id string) (string, error)
- func (s *Service) ExtendVisibility(ctx context.Context, namespace, queue string, doc *messageDocument, ...) (string, error)
- func (s *Service) GetMessage(ctx context.Context, namespace, queue, id string) (MessageResult, error)
- func (s *Service) GetPayload(ctx context.Context, namespace, queue, id string) (PayloadResult, error)
- func (s *Service) IncrementAttempts(ctx context.Context, namespace, queue string, doc *messageDocument, ...) (string, error)
- func (s *Service) ListMessages(ctx context.Context, namespace, queue string, startAfter string, limit int) (MessageListResult, error)
- func (s *Service) LoadState(ctx context.Context, namespace, queue, id string) (StateResult, error)
- func (s *Service) MoveToDLQ(ctx context.Context, namespace, queue, id string, doc *messageDocument, ...) error
- func (s *Service) Nack(ctx context.Context, namespace, queue string, doc *messageDocument, ...) (string, error)
- func (s *Service) NextCandidate(ctx context.Context, namespace, queue string, startAfter string, pageSize int) (MessageCandidateResult, error)
- func (s *Service) RefreshReadyCache(ctx context.Context, namespace, queue string) error
- func (s *Service) Reschedule(ctx context.Context, namespace, queue string, doc *messageDocument, ...) (string, error)
- func (s *Service) SaveMessageDocument(ctx context.Context, namespace, queue, id string, doc *messageDocument, ...) (string, error)
- func (s *Service) SaveState(ctx context.Context, namespace, queue, id string, doc *stateDocument, ...) (string, error)
- func (s *Service) UpdateReadyCache(namespace, queue string, doc *MessageDocument, etag string)
- type StateDocument
- type StateResult
- type Stats
- type WatchFactory
- type WatchSubscription
Constants ¶
This section is empty.
Variables ¶
var (
// ErrInvalidQueue is returned when the requested queue name contains invalid characters.
ErrInvalidQueue = errors.New("queue: invalid queue name")
)
var ErrTooManyConsumers = errors.New("queue: too many consumers")
ErrTooManyConsumers indicates the per-server consumer cap was reached.
Functions ¶
func EnableReadyCacheStats ¶ added in v0.3.0
func EnableReadyCacheStats()
EnableReadyCacheStats turns on ready cache stats collection at runtime.
func IsQueueMessageKey ¶ added in v0.1.0
IsQueueMessageKey reports whether the relative key refers to a queue message lease.
func IsQueueStateKey ¶
IsQueueStateKey reports whether the logical key belongs to a queue state lease.
func MessageLeaseKey ¶
MessageLeaseKey returns the logical lock key used for message leasing.
func ResetReadyCacheStats ¶ added in v0.3.0
func ResetReadyCacheStats()
ResetReadyCacheStats clears ready cache counters.
func StateLeaseKey ¶
StateLeaseKey returns the logical lock key used for workflow state leasing.
Types ¶
type Candidate ¶
type Candidate struct {
Descriptor MessageDescriptor
NextCursor string
}
Candidate describes a message snapshot that can be leased by a consumer.
type Config ¶
type Config struct {
DefaultVisibilityTimeout time.Duration
DefaultMaxAttempts int
DefaultTTL time.Duration
QueuePollInterval time.Duration
Crypto *storage.Crypto
}
Config governs queue behaviour defaults.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher centralises queue polling so storage is only listed once per queue regardless of how many consumers are connected.
func NewDispatcher ¶
func NewDispatcher(svc candidateProvider, opts ...DispatcherOption) *Dispatcher
NewDispatcher builds a Dispatcher that obtains candidates from svc and is customised by the supplied options.
func (*Dispatcher) CancelNotify ¶ added in v0.4.0
func (d *Dispatcher) CancelNotify(namespace, queue, messageID string)
CancelNotify removes a scheduled NotifyAt wake-up for a message.
func (*Dispatcher) HasActiveWatcher ¶
func (d *Dispatcher) HasActiveWatcher(namespace, queue string) bool
HasActiveWatcher reports whether the dispatcher currently maintains an active watch subscription for the provided queue.
func (*Dispatcher) Notify ¶
func (d *Dispatcher) Notify(namespace, queue string)
Notify nudges the dispatcher to poll the queue immediately (typically after enqueue).
func (*Dispatcher) NotifyAt ¶ added in v0.4.0
func (d *Dispatcher) NotifyAt(namespace, queue, messageID string, due time.Time)
NotifyAt schedules a queue wake-up for messageID at due time. This is the in-process fast path for delayed visibility transitions; polling/watch remains the fallback for missed events and cross-node updates.
func (*Dispatcher) QueueStats ¶
func (d *Dispatcher) QueueStats(namespace, queue string) Stats
QueueStats returns a snapshot of dispatcher metrics for the given queue.
type DispatcherOption ¶
type DispatcherOption func(*Dispatcher)
DispatcherOption customises Dispatcher behaviour.
func WithLogger ¶
func WithLogger(logger pslog.Logger) DispatcherOption
WithLogger assigns a base logger used for dispatcher diagnostics.
func WithMaxConsumers ¶
func WithMaxConsumers(n int) DispatcherOption
WithMaxConsumers caps the number of simultaneous Wait calls (default 1000).
func WithPollInterval ¶
func WithPollInterval(d time.Duration) DispatcherOption
WithPollInterval sets the base polling interval (default 3s).
func WithPollJitter ¶
func WithPollJitter(d time.Duration) DispatcherOption
WithPollJitter sets the random jitter added to the polling interval.
func WithQueuePageSize ¶ added in v0.3.0
func WithQueuePageSize(size int) DispatcherOption
WithQueuePageSize overrides the default queue list page size per poll.
func WithResilientPollInterval ¶
func WithResilientPollInterval(interval time.Duration) DispatcherOption
WithResilientPollInterval configures the safety poll interval used when watchers are active.
func WithWatchFactory ¶
func WithWatchFactory(factory WatchFactory) DispatcherOption
WithWatchFactory installs a queue change watcher factory used to reduce polling when available.
type EnqueueOptions ¶
type EnqueueOptions struct {
Delay time.Duration
Visibility time.Duration
TTL time.Duration
MaxAttempts int
Attributes map[string]any
ContentType string
}
EnqueueOptions captures optional parameters for Enqueue.
type LeaseKeyParts ¶ added in v0.1.0
LeaseKeyParts captures the queue name and message id from a lease key.
func ParseMessageLeaseKey ¶ added in v0.1.0
func ParseMessageLeaseKey(rel string) (LeaseKeyParts, bool)
ParseMessageLeaseKey extracts the queue name and message id from a relative queue message lease key (e.g. "q/orders/msg/<id>"). Returns ok=false when the key does not match the expected layout.
func ParseStateLeaseKey ¶ added in v0.1.0
func ParseStateLeaseKey(rel string) (LeaseKeyParts, bool)
ParseStateLeaseKey extracts the queue name and message id from a relative queue state lease key (e.g. "q/orders/state/<id>").
type Message ¶
type Message struct {
Namespace string
Queue string
ID string
EnqueuedAt time.Time
NotVisibleUntil time.Time
Visibility time.Duration
MaxAttempts int
Attempts int
FailureAttempts int
TTLDeadline time.Time
PayloadBytes int64
PayloadContent string
MetadataObject string
PayloadObject string
MetadataRevision string
CorrelationID string
}
Message describes an enqueued message.
type MessageCandidateResult ¶ added in v0.1.0
type MessageCandidateResult struct {
Descriptor *MessageDescriptor
NextCursor string
}
MessageCandidateResult captures the next ready candidate and cursor.
type MessageDescriptor ¶
type MessageDescriptor struct {
Namespace string
ID string
MetadataKey string
MetadataETag string
Document messageDocument
}
MessageDescriptor provides a lightweight summary of a message as returned by listing.
type MessageDocument ¶ added in v0.1.0
type MessageDocument struct {
Type string `json:"type"`
Namespace string `json:"namespace,omitempty"`
Queue string `json:"queue"`
ID string `json:"id"`
EnqueuedAt time.Time `json:"enqueued_at"`
UpdatedAt time.Time `json:"updated_at"`
Attempts int `json:"attempts"`
FailureAttempts int `json:"failure_attempts,omitempty"`
NotVisibleUntil time.Time `json:"not_visible_until"`
MaxAttempts int `json:"max_attempts"`
Attributes map[string]any `json:"attributes,omitempty"`
LastError any `json:"last_error,omitempty"`
PayloadBytes int64 `json:"payload_bytes,omitempty"`
PayloadContentType string `json:"payload_content_type,omitempty"`
VisibilityTimeout int64 `json:"visibility_timeout_seconds,omitempty"`
ExpiresAt *time.Time `json:"expires_at,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
LeaseID string `json:"lease_id,omitempty"`
LeaseFencingToken int64 `json:"lease_fencing_token,omitempty"`
LeaseTxnID string `json:"lease_txn_id,omitempty"`
PayloadDescriptor []byte `json:"payload_descriptor,omitempty"`
MetaDescriptor []byte `json:"meta_descriptor,omitempty"`
}
MessageDocument captures per-message metadata stored alongside payloads.
type MessageListResult ¶ added in v0.1.0
type MessageListResult struct {
Descriptors []MessageDescriptor
NextStartAfter string
Truncated bool
}
MessageListResult captures message listing output.
type MessageResult ¶ added in v0.1.0
type MessageResult struct {
Document *MessageDocument
ETag string
}
MessageResult captures a message document and its ETag.
type PayloadResult ¶ added in v0.1.0
type PayloadResult struct {
Reader io.ReadCloser
Info *storage.ObjectInfo
}
PayloadResult captures a payload reader with metadata.
type ReadyCacheStats ¶ added in v0.3.0
type ReadyCacheStats struct {
InflightSkips int64
CursorResets int64
InflightClears int64
ListScans int64
ListObjects int64
ListNanos int64
LoadCount int64
LoadNanos int64
NextCalls int64
RefreshWaits int64
RefreshWaitNanos int64
}
ReadyCacheStats captures optional counters for queue cache behavior.
func ReadyCacheStatsSnapshot ¶ added in v0.3.0
func ReadyCacheStatsSnapshot() ReadyCacheStats
ReadyCacheStatsSnapshot returns the current ready cache counters.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service implements queue primitives atop the storage backend.
func (*Service) Ack ¶
func (s *Service) Ack(ctx context.Context, namespace, queue, id string, metaETag string, stateETag string, stateRequired bool) error
Ack removes state (if present) and message metadata/payload.
func (*Service) DeleteMessage ¶
func (s *Service) DeleteMessage(ctx context.Context, namespace, queue, id string, expectedETag string) error
DeleteMessage removes metadata and payload (best-effort) respecting CAS.
func (*Service) DeleteState ¶
func (s *Service) DeleteState(ctx context.Context, namespace, queue, id string, expectedETag string) error
DeleteState removes the workflow state document for a queue message.
func (*Service) Enqueue ¶
func (s *Service) Enqueue(ctx context.Context, namespace, queue string, payload io.Reader, opts EnqueueOptions) (*Message, error)
Enqueue inserts a new message into the queue and returns its metadata.
func (*Service) EnsureMessageReady ¶
EnsureMessageReady verifies that the message payload and its metadata exist before delivery.
func (*Service) EnsureStateExists ¶
func (s *Service) EnsureStateExists(ctx context.Context, namespace, queue, id string) (string, error)
EnsureStateExists creates an empty state document if missing.
func (*Service) ExtendVisibility ¶
func (s *Service) ExtendVisibility(ctx context.Context, namespace, queue string, doc *messageDocument, expectedETag string, extension time.Duration) (string, error)
ExtendVisibility pushes not_visible_until forward relative to now.
func (*Service) GetMessage ¶
func (s *Service) GetMessage(ctx context.Context, namespace, queue, id string) (MessageResult, error)
GetMessage loads the metadata document for the provided queue/id pair.
func (*Service) GetPayload ¶
func (s *Service) GetPayload(ctx context.Context, namespace, queue, id string) (PayloadResult, error)
GetPayload streams the message payload.
func (*Service) IncrementAttempts ¶
func (s *Service) IncrementAttempts(ctx context.Context, namespace, queue string, doc *messageDocument, expectedETag string, visibility time.Duration) (string, error)
IncrementAttempts updates attempts and visibility after a successful lease acquisition.
func (*Service) ListMessages ¶
func (s *Service) ListMessages(ctx context.Context, namespace, queue string, startAfter string, limit int) (MessageListResult, error)
ListMessages enumerates message metadata objects in lexical order.
func (*Service) LoadState ¶
LoadState retrieves the JSON state document associated with the queue message.
func (*Service) MoveToDLQ ¶
func (s *Service) MoveToDLQ(ctx context.Context, namespace, queue, id string, doc *messageDocument, metaETag string) error
MoveToDLQ moves metadata/payload/state to the dead-letter queue.
func (*Service) Nack ¶
func (s *Service) Nack(ctx context.Context, namespace, queue string, doc *messageDocument, expectedETag string, delay time.Duration, lastErr any) (string, error)
Nack updates metadata with a delay and optional error payload.
func (*Service) NextCandidate ¶
func (s *Service) NextCandidate(ctx context.Context, namespace, queue string, startAfter string, pageSize int) (MessageCandidateResult, error)
NextCandidate returns the next visible message ready for processing.
func (*Service) RefreshReadyCache ¶
RefreshReadyCache syncs the ready cache for the provided queue immediately.
func (*Service) Reschedule ¶
func (s *Service) Reschedule(ctx context.Context, namespace, queue string, doc *messageDocument, expectedETag string, delay time.Duration) (string, error)
Reschedule moves message visibility forward without incrementing attempts (used for nack).
func (*Service) SaveMessageDocument ¶
func (s *Service) SaveMessageDocument(ctx context.Context, namespace, queue, id string, doc *messageDocument, expectedETag string) (string, error)
SaveMessageDocument writes metadata via CAS.
func (*Service) SaveState ¶
func (s *Service) SaveState(ctx context.Context, namespace, queue, id string, doc *stateDocument, expectedETag string) (string, error)
SaveState updates the workflow state document via CAS.
func (*Service) UpdateReadyCache ¶ added in v0.3.0
func (s *Service) UpdateReadyCache(namespace, queue string, doc *MessageDocument, etag string)
UpdateReadyCache seeds the ready cache with a fresh descriptor.
type StateDocument ¶ added in v0.1.0
type StateDocument struct {
Type string `json:"type"`
Namespace string `json:"namespace"`
Queue string `json:"queue"`
MessageID string `json:"message_id"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CorrelationID string `json:"correlation_id,omitempty"`
}
StateDocument captures metadata about queue state blobs.
type StateResult ¶ added in v0.1.0
type StateResult struct {
Document *StateDocument
ETag string
}
StateResult captures a queue state document and its ETag.
type Stats ¶
type Stats struct {
Namespace string
Queue string
WaitingConsumers int
PendingCandidates int
TotalConsumers int
}
Stats captures dispatcher-level metrics for a specific queue.
type WatchFactory ¶
type WatchFactory interface {
Subscribe(namespace, queue string) (WatchSubscription, error)
}
WatchFactory constructs WatchSubscriptions for queue names.
func WatchFactoryFromStorage ¶
func WatchFactoryFromStorage(feed storage.QueueChangeFeed) WatchFactory
WatchFactoryFromStorage adapts a storage.QueueChangeFeed into a Dispatcher watch factory.
type WatchSubscription ¶
type WatchSubscription interface {
Events() <-chan struct{}
Close() error
}
WatchSubscription reports queue change events.