Documentation
¶
Index ¶
- Constants
- Variables
- func AdapterFor(category AdapterCategory, id string) string
- func DictionarySubscribe[T any, PS IPubSubBackend, DB any, C Change[C], S Subscription[C]](ld *DatabaseListenerDictionary[T, PS, DB, C], db DB, keys []string, ...) error
- func Refresh[T any, PS IPubSubBackend, C Change[C], S Subscription[C]](ls *ListenerSubscriptions[T, PS, C], identifier string)
- func Want[T any, PS IPubSubBackend, C Change[C], S Subscription[C]](ls *ListenerSubscriptions[T, PS, C], identifier string, subscription S) error
- type AdapterCategory
- type BatchOptions
- type BatchOutput
- type BatchRequest
- type Batcher
- type Change
- type Config
- type DatabaseListenerCreate
- type DatabaseListenerDictionary
- func (ld *DatabaseListenerDictionary[T, PS, DB, C]) ForOutbox(key string, change C) dbldOutboxMessage[T, PS, DB, C]
- func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Get(db DB, keys []string) (map[string]C, error)
- func (ld *DatabaseListenerDictionary[T, PS, DB, C]) GetIdentifier() string
- func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Reset(db DB, keys []string) error
- func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Save(db DB, key string, change Change[C]) error
- type HydroAdapter
- type HydroAddress
- type IPubSubBackend
- type ISubWorker
- type Identifiable
- type Instance
- type ListenerSubscriptions
- type LocalPubSub
- type LocalSubWorker
- func (w *LocalSubWorker) Close()
- func (w *LocalSubWorker) OnError(handler func(channel string, err error))
- func (w *LocalSubWorker) OnMessage(handler func(channel string, message string))
- func (w *LocalSubWorker) Subscribe(ctx context.Context, channels ...string) error
- func (w *LocalSubWorker) Unsubscribe(ctx context.Context, channels ...string) error
- type OutboxCreate
- type OutboxMessage
- type PackedMessage
- type PoolConfig
- type PubSubOutbox
- func (o *PubSubOutbox[DB, PS]) Close()
- func (o *PubSubOutbox[DB, PS]) CreateWorker() ISubWorker
- func (o *PubSubOutbox[DB, PS]) Publish(c context.Context, identifier string, message string) error
- func (o *PubSubOutbox[DB, PS]) Save(db DB, identifier string, data []byte) error
- func (o *PubSubOutbox[DB, PS]) SaveMultiple(db DB, sends []PackedMessage) error
- type SubPool
- func (p *SubPool[T]) Close()
- func (p *SubPool[PS]) OnError(handler func(channel string, err error))
- func (p *SubPool[PS]) OnMessage(handler func(channel string, message string))
- func (p *SubPool[PS]) Subscribe(ctx context.Context, channels ...string) error
- func (p *SubPool[PS]) Unsubscribe(ctx context.Context, channels ...string) error
- type Subscription
Constants ¶
const RecommendedWantInterval = 20 * time.Second // How often you should call "Want" on a subscription
const ServerLocal = "_local"
The Hydro server for just sending the packet directly to local clients
const SubscriptionDuration = 60 * time.Second // The duration after which a subscription will be deleted
Variables ¶
var ( ErrChannelNotRegistered = errors.New("channel is not registered") ErrChannelAlreadyRegistered = errors.New("channel already registered by different worker") )
Standardized errors for pub/sub
Functions ¶
func AdapterFor ¶
func AdapterFor(category AdapterCategory, id string) string
Get the name of an adapter Hydro will send to for an id and a category
func DictionarySubscribe ¶
func DictionarySubscribe[T any, PS IPubSubBackend, DB any, C Change[C], S Subscription[C]](ld *DatabaseListenerDictionary[T, PS, DB, C], db DB, keys []string, identifier string, subscription S) error
func Refresh ¶
func Refresh[T any, PS IPubSubBackend, C Change[C], S Subscription[C]](ls *ListenerSubscriptions[T, PS, C], identifier string)
Refresh a listener subscription and mark it as wanted (identifier is a unique identifier of the subscription)
func Want ¶
func Want[T any, PS IPubSubBackend, C Change[C], S Subscription[C]](ls *ListenerSubscriptions[T, PS, C], identifier string, subscription S) error
Mark a listener subscription as wanted (identifier is a unique identifier of the subscription)
Types ¶
type AdapterCategory ¶
type AdapterCategory string
type BatchOptions ¶
type BatchOutput ¶
type BatchOutput[I comparable, O any] struct { Err error Outputs map[I]O }
type BatchRequest ¶
type BatchRequest[I comparable, O any] struct { Inputs []I Collector chan BatchOutput[I, O] }
type Batcher ¶
type Batcher[I comparable, O any] struct { Options BatchOptions Collector chan BatchRequest[I, O] BatchFunc func([]I) (map[I]O, error) }
type Config ¶
type Config[T any, PS IPubSubBackend] struct { // The neogate instance. All events will be sent through here. Gate *neogate.Instance[T] // Path for the external Hydro gateway (just leave empty if you don't plan on mounting it anyway) GatewayPath string // The backend for Hydro's pub/sub model (if not set we'll use a local backend that acts as a replacement for a dedicated pub/sub service) PubSubBackend PS }
type DatabaseListenerCreate ¶
type DatabaseListenerCreate[DB any, C Change[C]] struct { Identifier string // Identifier for the listener (REQUIRED) Get func(DB, []string) (map[string]C, error) // Get the base data from results of listeners or just with key (required) ToEvent func(key string, change Change[C]) neogate.Event // Should convert string and change info into an event that can be sent with Neo (required) PoolConfig PoolConfig // Config for the pooling of subscription workers }
type DatabaseListenerDictionary ¶
type DatabaseListenerDictionary[T any, PS IPubSubBackend, DB any, C Change[C]] struct { Instance *Instance[T, PS] // Hydro instance related Identifier string // Unique identifier for this listener dictionary // contains filtered or unexported fields }
func NewListenerDictionary ¶
func NewListenerDictionary[T any, PS IPubSubBackend, DB any, C Change[C]](instance *Instance[T, PS], outbox *PubSubOutbox[DB, PS], create DatabaseListenerCreate[DB, C]) *DatabaseListenerDictionary[T, PS, DB, C]
Helper function for initializing a new listener dictionary properly
func (*DatabaseListenerDictionary[T, PS, DB, C]) ForOutbox ¶
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) ForOutbox(key string, change C) dbldOutboxMessage[T, PS, DB, C]
func (*DatabaseListenerDictionary[T, PS, DB, C]) Get ¶
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Get(db DB, keys []string) (map[string]C, error)
Get the value for keys from the listener dictionary (makes sure we can add batching in the future)
func (*DatabaseListenerDictionary[T, PS, DB, C]) GetIdentifier ¶
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) GetIdentifier() string
func (*DatabaseListenerDictionary[T, PS, DB, C]) Reset ¶
func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Reset(db DB, keys []string) error
Reset all values for the keys back to the original value by re-getting them and pushing that update
type HydroAdapter ¶
type HydroAdapter struct {
Category AdapterCategory // The category the adapter is in
Adapter string // The id of the adapter
}
type HydroAddress ¶
type HydroAddress struct {
Server string // Server where the thing exists
Adapter HydroAdapter // Adapter to call on the target server
}
Address for an adapter on another server in Hydro (might be an account or something else)
type IPubSubBackend ¶
type ISubWorker ¶
type Identifiable ¶
type Identifiable interface {
GetIdentifier() string
}
type Instance ¶
type Instance[T any, PS IPubSubBackend] struct { // contains filtered or unexported fields }
func New ¶
func New[T any, PS IPubSubBackend](config *Config[T, PS]) *Instance[T, PS]
Create a new Hydro instance
type ListenerSubscriptions ¶
type ListenerSubscriptions[T any, PS IPubSubBackend, C Change[C]] struct { // contains filtered or unexported fields }
A manager of subscriptions to a Listener that automatically evicts them statelessly when no longer wanted.
Also aggressively caches the current return value from the listener. This is done by stacking the changes using the Stack method from the Change interface. OnSubscribe is usually pretty expensive, and managing it like this makes sure we call it exactly once.
func NewSubs ¶
func NewSubs[T any, PS IPubSubBackend, C Change[C]](instance *Instance[T, PS], convert func(Change[C]) neogate.Event) *ListenerSubscriptions[T, PS, C]
Create a new manager of listener subscriptions
func (*ListenerSubscriptions[T, PS, C]) DisableQueuing ¶
func (ls *ListenerSubscriptions[T, PS, C]) DisableQueuing(base Change[C])
After DisableQueuing is called, the subscriptions actually send changes as they are received; before that, all changes are queued.
func (*ListenerSubscriptions[T, PS, C]) IsQueuing ¶
func (ls *ListenerSubscriptions[T, PS, C]) IsQueuing() bool
Check if the subscriptions are currently still in queuing mode
func (*ListenerSubscriptions[T, PS, C]) OnChange ¶
func (ls *ListenerSubscriptions[T, PS, C]) OnChange(change Change[C])
Handles a change and sends it to all subscribers of the listener
type LocalPubSub ¶
type LocalPubSub struct {
// contains filtered or unexported fields
}
func NewLocalPubSub ¶
func NewLocalPubSub() *LocalPubSub
func (*LocalPubSub) CreateWorker ¶
func (lpb *LocalPubSub) CreateWorker() ISubWorker
type LocalSubWorker ¶
type LocalSubWorker struct {
// contains filtered or unexported fields
}
func (*LocalSubWorker) Close ¶
func (w *LocalSubWorker) Close()
func (*LocalSubWorker) OnError ¶
func (w *LocalSubWorker) OnError(handler func(channel string, err error))
func (*LocalSubWorker) OnMessage ¶
func (w *LocalSubWorker) OnMessage(handler func(channel string, message string))
func (*LocalSubWorker) Subscribe ¶
func (w *LocalSubWorker) Subscribe(ctx context.Context, channels ...string) error
func (*LocalSubWorker) Unsubscribe ¶
func (w *LocalSubWorker) Unsubscribe(ctx context.Context, channels ...string) error
type OutboxCreate ¶
type OutboxCreate[DB any] struct { // How long the outbox waits before pulling from the database again (default: 100 milliseconds). WaitDuration time.Duration // This function should save an event with an identifier to the database. Save func(database DB, messages []OutboxMessage) error // This is the main function handling the pulling of messages for the outbox. // // handler takes in a list of messages that you pulled from your database of choice in a transaction. It then returns which message identifiers were completed and an error for the first one that failed. Make sure your transaction skips any currently locked items in the table for the pull (the best pattern here is to just use one row / identifier to make sure no out-of-order stuff happens; if an insertion fails you can always just return an "already processing" error in most cases). Make sure to delete all the ones that have been completed. Tx func(database DB, handler func([]OutboxMessage) ([]string, error)) }
type OutboxMessage ¶
type PackedMessage ¶
type PackedMessage interface {
// contains filtered or unexported methods
}
type PoolConfig ¶
type PoolConfig struct {
// How many subscriptions should, at maximum, be done by one worker (default: 100)
MaxAmountByWorker int
}
type PubSubOutbox ¶
type PubSubOutbox[DB any, PS IPubSubBackend] struct { // contains filtered or unexported fields }
func NewOutbox ¶
func NewOutbox[T any, DB any, PS IPubSubBackend](instance *Instance[T, PS], connection DB, create OutboxCreate[DB]) *PubSubOutbox[DB, PS]
Create a new Outbox for pub/sub. This is a data structure that can make sure all of your pub/sub stays transactional no matter which pub/sub implementation you use. This works with basically any database. All you need to do is create tables for the Outbox and also make sure you implement all the functions as required by the create.
func (*PubSubOutbox[DB, PS]) Close ¶
func (o *PubSubOutbox[DB, PS]) Close()
Stop the outbox from pulling from the database. After this, you cannot restart it.
func (*PubSubOutbox[DB, PS]) CreateWorker ¶
func (o *PubSubOutbox[DB, PS]) CreateWorker() ISubWorker
This method just wraps the function from the backend so you can still use the outbox as a pub/sub backend
func (*PubSubOutbox[DB, PS]) Publish ¶
func (o *PubSubOutbox[DB, PS]) Publish(c context.Context, identifier string, message string) error
WATCH OUT: This does not publish to the outbox. Use Save for that. This is the same as calling the Publish function on the original pub/sub backend.
func (*PubSubOutbox[DB, PS]) Save ¶
func (o *PubSubOutbox[DB, PS]) Save(db DB, identifier string, data []byte) error
Save an event to the outbox. Use this for transactional pub/sub using the database.
func (*PubSubOutbox[DB, PS]) SaveMultiple ¶
func (o *PubSubOutbox[DB, PS]) SaveMultiple(db DB, sends []PackedMessage) error
Method for inserting multiple things into the outbox with one query
type SubPool ¶
type SubPool[PS IPubSubBackend] struct { // contains filtered or unexported fields }
func NewPubSubPool ¶
func NewPubSubPool[PS IPubSubBackend](backend PS, config PoolConfig) *SubPool[PS]
type Subscription ¶
type Subscription[C Change[C]] interface { func(neogate.Event, Change[C]) | HydroAddress }
A subscription can either be done using a callback or a Hydro address