hydro

Version: v0.1.0-alpha.8
Published: Feb 28, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

README

Hydro

Warning: Hydro is an experimental project that is currently in its research phase. It is still under development and releases will be unstable. Expect bugs and breaking changes, possibly without any notice.

Scalable, transactional, low-latency real-time data updates for Go, wrapped in one simple API built on top of Neogate.

Documentation

Constants

const RecommendedWantInterval = 20 * time.Second // How often you should call Want on a subscription

const ServerLocal = "_local"

The Hydro server name to use for sending a packet directly to local clients.

const SubscriptionDuration = 60 * time.Second // The duration after which a subscription is deleted
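
Read together, the two durations suggest a keep-alive pattern: call Want every 20 seconds so that the 60-second lifetime leaves headroom for a delayed call. The sketch below shows such a loop in isolation. It assumes that a successful Want extends the subscription's lifetime, which the constants imply but do not state. The names keepWanted, countWants and the want closure are hypothetical, and the constant is mirrored locally so the example compiles without importing Hydro.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Mirrors the documented value; real code would use
// hydro.RecommendedWantInterval instead.
const recommendedWantInterval = 20 * time.Second

// keepWanted calls want immediately and then once per interval until ctx is
// cancelled or want returns an error. want is a hypothetical closure that
// would wrap a call to Want for one subscription.
func keepWanted(ctx context.Context, interval time.Duration, want func() error) error {
	if err := want(); err != nil {
		return err
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := want(); err != nil {
				return err
			}
		}
	}
}

// countWants runs keepWanted with a short interval and a want function that
// fails on its limit-th call, then returns how many calls were made.
func countWants(limit int) int {
	calls := 0
	errStop := errors.New("stop")
	_ = keepWanted(context.Background(), time.Millisecond, func() error {
		calls++
		if calls >= limit {
			return errStop
		}
		return nil
	})
	return calls
}

func main() {
	// A short interval keeps the demo fast; production code would pass
	// recommendedWantInterval.
	fmt.Println("want calls:", countWants(3))
	fmt.Println("recommended interval:", recommendedWantInterval)
}
```

Cancelling the context is the natural way to stop the loop when the client that owns the subscription disconnects.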

Variables

var (
	ErrChannelNotRegistered     = errors.New("channel is not registered")
	ErrChannelAlreadyRegistered = errors.New("channel already registered by different worker")
)

Standardized errors for pub/sub
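
Because these are sentinel errors, callers can match them with errors.Is rather than comparing messages. The sketch below uses a local stand-in for ErrChannelNotRegistered and a hypothetical unsubscribe function. Whether Hydro wraps these errors before returning them is not stated here, so errors.Is is the safer check either way.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for hydro.ErrChannelNotRegistered so the example compiles
// on its own.
var errChannelNotRegistered = errors.New("channel is not registered")

// unsubscribe is a hypothetical function that fails with a wrapped sentinel.
func unsubscribe(channel string) error {
	return fmt.Errorf("unsubscribe %q: %w", channel, errChannelNotRegistered)
}

// isNotRegistered reports whether err is, or wraps, the sentinel.
func isNotRegistered(err error) bool {
	return errors.Is(err, errChannelNotRegistered)
}

func main() {
	err := unsubscribe("orders")
	fmt.Println(isNotRegistered(err)) // prints true
}
```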

var Log = log.New(os.Stdout, "hydro ", log.Flags())

Functions

func AdapterFor

func AdapterFor(category AdapterCategory, id string) string

Get the name of the adapter Hydro will send to for a given category and id

func DictionarySubscribe

func DictionarySubscribe[T any, PS IPubSubBackend, DB any, C Change[C], S Subscription[C]](ld *DatabaseListenerDictionary[T, PS, DB, C], db DB, keys []string, identifier string, subscription S) error

func Refresh

func Refresh[T any, PS IPubSubBackend, C Change[C], S Subscription[C]](ls *ListenerSubscriptions[T, PS, C], identifier string)

Refresh a listener subscription and mark it as wanted (identifier is a unique identifier of the subscription)

func Want

func Want[T any, PS IPubSubBackend, C Change[C], S Subscription[C]](ls *ListenerSubscriptions[T, PS, C], identifier string, subscription S) error

Mark a listener subscription as wanted (identifier is a unique identifier of the subscription)

Types

type AdapterCategory

type AdapterCategory string

type BatchOptions

type BatchOptions struct {
	BatchDuration time.Duration
	MaxAmount     int
}

type BatchOutput

type BatchOutput[I comparable, O any] struct {
	Err     error
	Outputs map[I]O
}

type BatchRequest

type BatchRequest[I comparable, O any] struct {
	Inputs    []I
	Collector chan BatchOutput[I, O]
}

type Batcher

type Batcher[I comparable, O any] struct {
	Options   BatchOptions
	Collector chan BatchRequest[I, O]
	BatchFunc func([]I) (map[I]O, error)
}

func (*Batcher[I, O]) Do

func (b *Batcher[I, O]) Do(inputs []I) (map[I]O, error)

Submit a task to the batcher

func (*Batcher[I, O]) Init

func (b *Batcher[I, O]) Init()
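
The fields above (a batch window, a maximum amount, a collector channel and a batch function) describe a common request-coalescing pattern. The following is a minimal, self-contained sketch of that pattern, not Hydro's implementation: miniBatcher and all of its members are hypothetical, and the real Batcher may flush, split or route outputs differently.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// result carries what one batch run produced back to every waiting caller.
type result[I comparable, O any] struct {
	outputs map[I]O
	err     error
}

// request is one caller's inputs plus the channel its result arrives on.
type request[I comparable, O any] struct {
	inputs []I
	reply  chan result[I, O]
}

// miniBatcher is a hypothetical stand-in, not Hydro's Batcher.
type miniBatcher[I comparable, O any] struct {
	window    time.Duration // how long a batch stays open
	maxAmount int           // flush early once this many inputs are collected
	batchFunc func([]I) (map[I]O, error)
	requests  chan request[I, O]
}

func newMiniBatcher[I comparable, O any](window time.Duration, maxAmount int, fn func([]I) (map[I]O, error)) *miniBatcher[I, O] {
	b := &miniBatcher[I, O]{window: window, maxAmount: maxAmount, batchFunc: fn, requests: make(chan request[I, O])}
	go b.loop()
	return b
}

// loop opens a batch on the first request and closes it when the window
// elapses or maxAmount inputs have been collected, whichever comes first.
func (b *miniBatcher[I, O]) loop() {
	for first := range b.requests {
		pending := []request[I, O]{first}
		inputs := append([]I(nil), first.inputs...)
		timer := time.NewTimer(b.window)
	collect:
		for len(inputs) < b.maxAmount {
			select {
			case r := <-b.requests:
				pending = append(pending, r)
				inputs = append(inputs, r.inputs...)
			case <-timer.C:
				break collect
			}
		}
		timer.Stop()
		outputs, err := b.batchFunc(inputs) // one call for the whole batch
		for _, r := range pending {
			r.reply <- result[I, O]{outputs: outputs, err: err}
		}
	}
}

// do submits inputs and blocks until the batch containing them has run.
// It returns the outputs of the whole batch, keyed by input.
func (b *miniBatcher[I, O]) do(inputs []I) (map[I]O, error) {
	reply := make(chan result[I, O], 1)
	b.requests <- request[I, O]{inputs: inputs, reply: reply}
	res := <-reply
	return res.outputs, res.err
}

// demo issues two concurrent single-key lookups and returns how often the
// batch function ran together with the value each lookup received.
func demo() (calls int, first, second string) {
	b := newMiniBatcher(5*time.Second, 2, func(ids []int) (map[int]string, error) {
		calls++ // only ever touched by the batcher goroutine
		out := make(map[int]string, len(ids))
		for _, id := range ids {
			out[id] = fmt.Sprintf("user-%d", id)
		}
		return out, nil
	})
	var wg sync.WaitGroup
	got := make([]string, 2)
	for i, id := range []int{1, 2} {
		wg.Add(1)
		go func(i, id int) {
			defer wg.Done()
			out, _ := b.do([]int{id})
			got[i] = out[id]
		}(i, id)
	}
	wg.Wait()
	return calls, got[0], got[1]
}

func main() {
	fmt.Println(demo())
}
```

With a maximum amount of 2, the second lookup closes the batch immediately, so both callers are served by a single call to the batch function.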

type Change

type Change[T any] interface {
	Stack(c Change[T]) Change[T]
}
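
A type satisfies the constraint C Change[C] by implementing Stack over itself. The sketch below shows one way to do that with a hypothetical CounterDelta type and a hypothetical stackAll helper. It assumes the receiver is the older change and the argument is the newer one, which the interface itself does not specify.

```go
package main

import "fmt"

// Change is copied from the declaration above so the example compiles
// without importing Hydro.
type Change[T any] interface {
	Stack(c Change[T]) Change[T]
}

// CounterDelta is a hypothetical change type: a signed adjustment to a counter.
type CounterDelta struct {
	Delta int
}

// Stack folds a newer change into this one by summing the deltas.
func (d CounterDelta) Stack(c Change[CounterDelta]) Change[CounterDelta] {
	next, ok := c.(CounterDelta)
	if !ok {
		return d // unknown implementation: keep the current value
	}
	return CounterDelta{Delta: d.Delta + next.Delta}
}

// stackAll applies changes to base in order and returns the combined change.
func stackAll[C Change[C]](base C, changes ...C) Change[C] {
	var acc Change[C] = base
	for _, c := range changes {
		acc = acc.Stack(c)
	}
	return acc
}

func main() {
	total := stackAll(CounterDelta{Delta: 1}, CounterDelta{Delta: 2}, CounterDelta{Delta: 3})
	fmt.Println(total.(CounterDelta).Delta) // prints 6
}
```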

type Config

type Config[T any, PS IPubSubBackend] struct {
	// The neogate instance. All events will be sent through here.
	Gate *neogate.Instance[T]

	// Path for the external Hydro gateway (just leave empty if you don't plan on mounting it anyway)
	GatewayPath string

	// The backend for Hydro's pub/sub model (if not set we'll use a local backend that acts as a replacement for a dedicated pub/sub service)
	PubSubBackend PS
}

type DatabaseListenerCreate

type DatabaseListenerCreate[DB any, C Change[C]] struct {
	Identifier string                                           // Identifier for the listener (required)
	Get        func(DB, []string) (map[string]C, error)         // Get the base data from results of listeners or just with the key (required)
	ToEvent    func(key string, change Change[C]) neogate.Event // Converts a key and its change info into an event that can be sent with Neogate (required)

	PoolConfig PoolConfig // Config for the pooling of subscription workers
}

type DatabaseListenerDictionary

type DatabaseListenerDictionary[T any, PS IPubSubBackend, DB any, C Change[C]] struct {
	Instance   *Instance[T, PS] // Hydro instance related
	Identifier string           // Unique identifier for this listener dictionary
	// contains filtered or unexported fields
}

func NewListenerDictionary

func NewListenerDictionary[T any, PS IPubSubBackend, DB any, C Change[C]](instance *Instance[T, PS], outbox *PubSubOutbox[DB, PS], create DatabaseListenerCreate[DB, C]) *DatabaseListenerDictionary[T, PS, DB, C]

Helper function for initializing a new listener dictionary properly

func (*DatabaseListenerDictionary[T, PS, DB, C]) ForOutbox

func (ld *DatabaseListenerDictionary[T, PS, DB, C]) ForOutbox(key string, change C) dbldOutboxMessage[T, PS, DB, C]

func (*DatabaseListenerDictionary[T, PS, DB, C]) Get

func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Get(db DB, keys []string) (map[string]C, error)

Get the values for the given keys from the listener dictionary (going through this method makes sure we can add batching in the future)

func (*DatabaseListenerDictionary[T, PS, DB, C]) GetIdentifier

func (ld *DatabaseListenerDictionary[T, PS, DB, C]) GetIdentifier() string

func (*DatabaseListenerDictionary[T, PS, DB, C]) Reset

func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Reset(db DB, keys []string) error

Reset the values for the given keys to their original values by re-getting them and pushing that update

func (*DatabaseListenerDictionary[T, PS, DB, C]) Save

func (ld *DatabaseListenerDictionary[T, PS, DB, C]) Save(db DB, key string, change Change[C]) error

Save a change to the outbox. This makes sure the whole update stays transactional.

type HydroAdapter

type HydroAdapter struct {
	Category AdapterCategory // The category the adapter is in
	Adapter  string          // The id of the adapter
}

type HydroAddress

type HydroAddress struct {
	Server  string       // Server where the adapter exists
	Adapter HydroAdapter // Adapter to call on the target server
}

Address for an adapter on another server in Hydro (might be an account or something else)

type IPubSubBackend

type IPubSubBackend interface {
	CreateWorker() ISubWorker
	Publish(ctx context.Context, channel string, message string) error
}

type ISubWorker

type ISubWorker interface {
	Subscribe(ctx context.Context, channels ...string) error
	Unsubscribe(ctx context.Context, channels ...string) error
	OnMessage(func(channel string, message string))
	OnError(func(channel string, err error))
	Close()
}
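
To plug in a custom pub/sub service, a type has to satisfy both interfaces above. The sketch below is a deliberately small in-memory implementation that shows the shape of such a backend. memBackend and memWorker are hypothetical, delivery is synchronous, and nothing here is safe for concurrent use, so treat it as an illustration of the contract rather than a replacement for LocalPubSub.

```go
package main

import (
	"context"
	"fmt"
)

// Both interfaces are copied from the declarations above so the example
// compiles without importing Hydro.
type IPubSubBackend interface {
	CreateWorker() ISubWorker
	Publish(ctx context.Context, channel string, message string) error
}

type ISubWorker interface {
	Subscribe(ctx context.Context, channels ...string) error
	Unsubscribe(ctx context.Context, channels ...string) error
	OnMessage(func(channel string, message string))
	OnError(func(channel string, err error))
	Close()
}

// memBackend is a hypothetical in-process backend that fans messages out to
// its workers synchronously. It is not safe for concurrent use.
type memBackend struct {
	workers []*memWorker
}

// memWorker tracks the channels it listens on and the handler to invoke.
type memWorker struct {
	channels map[string]bool
	handler  func(channel string, message string)
}

func (b *memBackend) CreateWorker() ISubWorker {
	w := &memWorker{channels: make(map[string]bool)}
	b.workers = append(b.workers, w)
	return w
}

func (b *memBackend) Publish(ctx context.Context, channel string, message string) error {
	if err := ctx.Err(); err != nil {
		return err // respect cancellation before doing any work
	}
	for _, w := range b.workers {
		if w.channels[channel] && w.handler != nil {
			w.handler(channel, message)
		}
	}
	return nil
}

func (w *memWorker) Subscribe(ctx context.Context, channels ...string) error {
	for _, c := range channels {
		w.channels[c] = true
	}
	return nil
}

func (w *memWorker) Unsubscribe(ctx context.Context, channels ...string) error {
	for _, c := range channels {
		delete(w.channels, c)
	}
	return nil
}

func (w *memWorker) OnMessage(h func(channel string, message string)) { w.handler = h }

// OnError is a no-op because this in-memory worker has no failure mode.
func (w *memWorker) OnError(func(channel string, err error)) {}

// Close drops every subscription so no further messages are delivered.
func (w *memWorker) Close() { w.channels = make(map[string]bool) }

// roundTrip returns the messages a worker subscribed to "a" actually receives.
func roundTrip() []string {
	var backend IPubSubBackend = &memBackend{}
	worker := backend.CreateWorker()
	var got []string
	worker.OnMessage(func(channel, message string) { got = append(got, channel+":"+message) })
	ctx := context.Background()
	_ = worker.Subscribe(ctx, "a")
	_ = backend.Publish(ctx, "a", "1") // delivered
	_ = backend.Publish(ctx, "b", "2") // ignored: not subscribed
	_ = worker.Unsubscribe(ctx, "a")
	_ = backend.Publish(ctx, "a", "3") // ignored: unsubscribed
	return got
}

func main() {
	fmt.Println(roundTrip()) // prints [a:1]
}
```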

type Identifiable

type Identifiable interface {
	GetIdentifier() string
}

type Instance

type Instance[T any, PS IPubSubBackend] struct {
	// contains filtered or unexported fields
}

func New

func New[T any, PS IPubSubBackend](config *Config[T, PS]) *Instance[T, PS]

Create a new Hydro instance

func (*Instance[T, PS]) Gateway

func (i *Instance[T, PS]) Gateway() func(c *fiber.Ctx) error

Create a gateway for the Hydro instance

func (*Instance[T, PS]) Receive

func (i *Instance[T, PS]) Receive(adapters []HydroAdapter, event neogate.Event) error

Handle receiving a local event through Hydro. This also lets all of the given adapters receive the event in Neogate.

func (*Instance[T, PS]) Send

func (i *Instance[T, PS]) Send(targets []HydroAddress, event neogate.Event)

Send an event to Hydro addresses on any server.

type ListenerSubscriptions

type ListenerSubscriptions[T any, PS IPubSubBackend, C Change[C]] struct {
	// contains filtered or unexported fields
}

A manager of subscriptions to a Listener that automatically evicts them statelessly when no longer wanted.

It also aggressively caches the current return value from the listener by stacking changes with the Stack method of the Change interface. OnSubscribe is usually expensive, and managing subscriptions this way makes sure it is called exactly once.

func NewSubs

func NewSubs[T any, PS IPubSubBackend, C Change[C]](instance *Instance[T, PS], convert func(Change[C]) neogate.Event) *ListenerSubscriptions[T, PS, C]

Create a new manager of listener subscriptions

func (*ListenerSubscriptions[T, PS, C]) DisableQueuing

func (ls *ListenerSubscriptions[T, PS, C]) DisableQueuing(base Change[C])

After DisableQueuing is called, the subscriptions send changes as soon as they are received. Before that, all changes are queued.

func (*ListenerSubscriptions[T, PS, C]) IsQueuing

func (ls *ListenerSubscriptions[T, PS, C]) IsQueuing() bool

Check if the subscriptions are currently still in queuing mode

func (*ListenerSubscriptions[T, PS, C]) OnChange

func (ls *ListenerSubscriptions[T, PS, C]) OnChange(change Change[C])

Handles a change and sends it to all subscribers of the listener
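
The queuing behaviour described for DisableQueuing, IsQueuing and OnChange can be pictured as a small state machine: hold changes back until a base value is known, then stack them onto the base and forward everything from then on. The sketch below is one plausible reading of that description, not Hydro's code. The subs type, the Lines change type and the decision to forward queued changes on flush are all assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// Change is copied from the package's declaration so the example compiles
// without importing Hydro.
type Change[T any] interface {
	Stack(c Change[T]) Change[T]
}

// Lines is a hypothetical change type that accumulates appended lines.
type Lines struct{ Items []string }

func (l Lines) Stack(c Change[Lines]) Change[Lines] {
	next, ok := c.(Lines)
	if !ok {
		return l
	}
	merged := append(append([]string(nil), l.Items...), next.Items...)
	return Lines{Items: merged}
}

// subs is a hypothetical, single-goroutine model of queuing mode.
type subs[C Change[C]] struct {
	queuing bool
	pending []Change[C] // changes held back while queuing
	current Change[C]   // cached value: base plus every stacked change
	sent    int         // how many changes were forwarded to subscribers
}

func newSubs[C Change[C]]() *subs[C] { return &subs[C]{queuing: true} }

// onChange queues the change while queuing, otherwise stacks and forwards it.
func (s *subs[C]) onChange(c Change[C]) {
	if s.queuing {
		s.pending = append(s.pending, c)
		return
	}
	s.current = s.current.Stack(c)
	s.sent++
}

// disableQueuing installs the base value and replays everything queued so far.
func (s *subs[C]) disableQueuing(base Change[C]) {
	s.current = base
	s.queuing = false
	pending := s.pending
	s.pending = nil
	for _, c := range pending {
		s.onChange(c)
	}
}

// demo returns the sent count before and after the flush plus the cached value.
func demo() (sentWhileQueuing, sentAfter int, cached string) {
	s := newSubs[Lines]()
	s.onChange(Lines{Items: []string{"a"}})
	s.onChange(Lines{Items: []string{"b"}})
	sentWhileQueuing = s.sent
	s.disableQueuing(Lines{Items: []string{"base"}})
	s.onChange(Lines{Items: []string{"c"}})
	return sentWhileQueuing, s.sent, strings.Join(s.current.(Lines).Items, ",")
}

func main() {
	fmt.Println(demo()) // prints 0 3 base,a,b,c
}
```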

type LocalPubSub

type LocalPubSub struct {
	// contains filtered or unexported fields
}

func NewLocalPubSub

func NewLocalPubSub() *LocalPubSub

func (*LocalPubSub) CreateWorker

func (lpb *LocalPubSub) CreateWorker() ISubWorker

func (*LocalPubSub) Publish

func (w *LocalPubSub) Publish(ctx context.Context, channel string, message string) error

type LocalSubWorker

type LocalSubWorker struct {
	// contains filtered or unexported fields
}

func (*LocalSubWorker) Close

func (w *LocalSubWorker) Close()

func (*LocalSubWorker) OnError

func (w *LocalSubWorker) OnError(handler func(channel string, err error))

func (*LocalSubWorker) OnMessage

func (w *LocalSubWorker) OnMessage(handler func(channel string, message string))

func (*LocalSubWorker) Subscribe

func (w *LocalSubWorker) Subscribe(ctx context.Context, channels ...string) error

func (*LocalSubWorker) Unsubscribe

func (w *LocalSubWorker) Unsubscribe(ctx context.Context, channels ...string) error

type OutboxCreate

type OutboxCreate[DB any] struct {

	// How long the outbox waits before pulling from the database again (default: 100 milliseconds).
	WaitDuration time.Duration

	// This function should save an event with an identifier to the database.
	Save func(database DB, messages []OutboxMessage) error

	// This is the main function handling the pulling of messages for the outbox.
	//
	// handler takes a list of messages that you pulled from your database of choice in a transaction. It returns the identifiers of the messages that were completed and an error for the first one that failed.
	//
	// Make sure your transaction skips any currently locked items in the table for the pull. The best pattern here is to use one row per identifier so that nothing is processed out of order; if an insertion fails, you can in most cases just return an "already processing" error.
	//
	// Make sure to delete all the messages that have been completed.
	Tx func(database DB, handler func([]OutboxMessage) ([]string, error))
}
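
The contract between Save, Tx and the handler is easier to see with a toy store. The sketch below models the outbox table as a slice: save appends rows, and tx passes the pending rows to a handler and deletes whatever the handler reports as completed. Everything except OutboxMessage is hypothetical, including the publishAll handler that stands in for the one Hydro would pass to Tx. The sketch also assumes each row has a unique identifier and leaves out real transactions and row locking.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// OutboxMessage is copied from the package's declaration so the example
// compiles without importing Hydro.
type OutboxMessage struct {
	Identifier string
	Data       []byte
}

// memStore is a hypothetical in-memory stand-in for an outbox table.
type memStore struct{ rows []OutboxMessage }

// save appends messages, as a Save implementation might insert rows.
func save(db *memStore, messages []OutboxMessage) error {
	db.rows = append(db.rows, messages...)
	return nil
}

// tx hands the pending rows to handler and deletes the ones it reports as
// completed. Rows that were not completed stay for the next pull.
func tx(db *memStore, handler func([]OutboxMessage) ([]string, error)) {
	completed, err := handler(db.rows)
	if err != nil {
		fmt.Println("outbox pull stopped early:", err)
	}
	done := make(map[string]bool, len(completed))
	for _, id := range completed {
		done[id] = true
	}
	var remaining []OutboxMessage
	for _, row := range db.rows {
		if !done[row.Identifier] {
			remaining = append(remaining, row)
		}
	}
	db.rows = remaining
}

// publishAll is a hypothetical handler: it processes messages in order and
// stops at the first failure, returning what it completed up to that point.
func publishAll(messages []OutboxMessage) ([]string, error) {
	var completed []string
	for _, m := range messages {
		if string(m.Data) == "bad" {
			return completed, errors.New("publish failed for " + m.Identifier)
		}
		completed = append(completed, m.Identifier)
	}
	return completed, nil
}

// remainingAfterPull saves three messages, runs one pull, and returns the
// identifiers still in the store.
func remainingAfterPull() string {
	db := &memStore{}
	_ = save(db, []OutboxMessage{
		{Identifier: "m1", Data: []byte("ok")},
		{Identifier: "m2", Data: []byte("bad")},
		{Identifier: "m3", Data: []byte("ok")},
	})
	tx(db, publishAll)
	ids := make([]string, 0, len(db.rows))
	for _, row := range db.rows {
		ids = append(ids, row.Identifier)
	}
	return strings.Join(ids, ",")
}

func main() {
	fmt.Println(remainingAfterPull()) // prints m2,m3
}
```

Stopping at the first failure keeps m3 behind m2, which preserves ordering at the cost of delaying later messages until the failing one succeeds.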

type OutboxMessage

type OutboxMessage struct {
	Identifier string
	Data       []byte
}

type PackedMessage

type PackedMessage interface {
	// contains filtered or unexported methods
}

type PoolConfig

type PoolConfig struct {
	// How many subscriptions should, at maximum, be done by one worker (default: 100)
	MaxAmountByWorker int
}

type PubSubOutbox

type PubSubOutbox[DB any, PS IPubSubBackend] struct {
	// contains filtered or unexported fields
}

func NewOutbox

func NewOutbox[T any, DB any, PS IPubSubBackend](instance *Instance[T, PS], connection DB, create OutboxCreate[DB]) *PubSubOutbox[DB, PS]

Create a new outbox for pub/sub. The outbox is a data structure that keeps all of your pub/sub transactional, no matter which pub/sub implementation you use. It works with basically any database: all you need to do is create tables for the outbox and implement the functions required by OutboxCreate.

func (*PubSubOutbox[DB, PS]) Close

func (o *PubSubOutbox[DB, PS]) Close()

Stop the outbox from pulling from the database. Once stopped, it cannot be restarted.

func (*PubSubOutbox[DB, PS]) CreateWorker

func (o *PubSubOutbox[DB, PS]) CreateWorker() ISubWorker

This method just wraps the function from the backend so you can still use the outbox as a pub/sub backend

func (*PubSubOutbox[DB, PS]) Publish

func (o *PubSubOutbox[DB, PS]) Publish(c context.Context, identifier string, message string) error

WATCH OUT: This does not publish to the outbox. Use Save for that. This is the same as calling the Publish function on the original pub/sub backend.

func (*PubSubOutbox[DB, PS]) Save

func (o *PubSubOutbox[DB, PS]) Save(db DB, identifier string, data []byte) error

Save an event to the outbox. Use this for transactional pub/sub using the database.

func (*PubSubOutbox[DB, PS]) SaveMultiple

func (o *PubSubOutbox[DB, PS]) SaveMultiple(db DB, sends []PackedMessage) error

Insert multiple messages into the outbox with one query

type SubPool

type SubPool[PS IPubSubBackend] struct {
	// contains filtered or unexported fields
}

func NewPubSubPool

func NewPubSubPool[PS IPubSubBackend](backend PS, config PoolConfig) *SubPool[PS]

func (*SubPool[T]) Close

func (p *SubPool[T]) Close()

func (*SubPool[PS]) OnError

func (p *SubPool[PS]) OnError(handler func(channel string, err error))

OnError sets the error handler for all workers

func (*SubPool[PS]) OnMessage

func (p *SubPool[PS]) OnMessage(handler func(channel string, message string))

OnMessage sets the message handler for all workers

func (*SubPool[PS]) Subscribe

func (p *SubPool[PS]) Subscribe(ctx context.Context, channels ...string) error

Subscribe subscribes to channels, distributing them across workers

func (*SubPool[PS]) Unsubscribe

func (p *SubPool[PS]) Unsubscribe(ctx context.Context, channels ...string) error

Unsubscribe unsubscribes from channels
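
How Subscribe spreads channels over workers is not documented here beyond the MaxAmountByWorker limit in PoolConfig. As an illustration only, the sketch below uses a simple first-fit strategy: fill any worker that still has capacity, then start a new one. The distribute function is hypothetical and the real pool may balance differently.

```go
package main

import "fmt"

// distribute assigns channels to workers using first fit. Each inner slice
// holds the channels of one worker. A non-positive limit falls back to 100,
// the default documented for PoolConfig.MaxAmountByWorker.
func distribute(workers [][]string, channels []string, maxPerWorker int) [][]string {
	if maxPerWorker <= 0 {
		maxPerWorker = 100
	}
	for _, ch := range channels {
		placed := false
		for i := range workers {
			if len(workers[i]) < maxPerWorker {
				workers[i] = append(workers[i], ch)
				placed = true
				break
			}
		}
		if !placed {
			// Every existing worker is full, so start a new one.
			workers = append(workers, []string{ch})
		}
	}
	return workers
}

func main() {
	workers := distribute(nil, []string{"a", "b", "c", "d", "e"}, 2)
	fmt.Println(workers) // prints [[a b] [c d] [e]]
	workers = distribute(workers, []string{"f"}, 2)
	fmt.Println(workers) // prints [[a b] [c d] [e f]]
}
```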

type Subscription

type Subscription[C Change[C]] interface {
	func(neogate.Event, Change[C]) | HydroAddress
}

A subscription can either be done using a callback or a Hydro address
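
Subscription is a union constraint, so it can only be used as a type parameter bound, and a generic function has to inspect the concrete type at run time to tell the two cases apart. The sketch below shows that technique with local copies of the relevant types. Event stands in for neogate.Event, and Noop and describe are hypothetical names used only for illustration.

```go
package main

import "fmt"

// Event is a stand-in for neogate.Event.
type Event struct{ Name string }

// The following declarations mirror the ones in this package.
type Change[T any] interface {
	Stack(c Change[T]) Change[T]
}

type HydroAdapter struct {
	Category string
	Adapter  string
}

type HydroAddress struct {
	Server  string
	Adapter HydroAdapter
}

type Subscription[C Change[C]] interface {
	func(Event, Change[C]) | HydroAddress
}

// Noop is a hypothetical change type that simply keeps the newest change.
type Noop struct{}

func (Noop) Stack(c Change[Noop]) Change[Noop] { return c }

// describe converts the subscription to any and switches on its dynamic
// type, because a value of a type parameter cannot be switched on directly.
func describe[C Change[C], S Subscription[C]](sub S) string {
	switch s := any(sub).(type) {
	case func(Event, Change[C]):
		return "callback"
	case HydroAddress:
		return "address " + s.Server + "/" + s.Adapter.Adapter
	}
	return "unknown"
}

func main() {
	// C must be given explicitly; S is inferred from the argument.
	fmt.Println(describe[Noop](func(Event, Change[Noop]) {}))
	fmt.Println(describe[Noop](HydroAddress{Server: "_local", Adapter: HydroAdapter{Adapter: "user-1"}}))
}
```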
