flux

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2026 License: MIT Imports: 12 Imported by: 0

README

flux

Scoped Ambient State for Go -- a scope-local atom graph with explicit dependencies and opt-in reactivity.

State lives in the scope, not in the caller. Handlers observe -- they don't own or construct dependencies. The same graph works across HTTP servers, background jobs, CLI tools, and tests.

Servers -- atoms are infrastructure singletons (db pool, cache client, config). Runtime config enters the scope as tags; atoms consume it via Required(tag). Contexts bound per request carry tags (tenantId, traceId) without parameter drilling. Extensions wrap every resolve/exec for logging, tracing, auth. Cleanup is guaranteed.

Workers -- atoms form a dependency graph (processor <- queue <- config). Controllers enable live config reload; invalidation cascades to dependents automatically. Workers are projections of state, not owners.

Both -- presets swap any atom/flow for testing or multi-tenant isolation. Tags carry runtime config; presets replace implementations. No mocks, no test-only code paths.

go get github.com/pumped-fn/flux

How It Works

sequenceDiagram
    participant App
    participant Scope
    participant Ext as Extension
    participant Atom
    participant Ctx as ExecContext
    participant Child as ChildContext
    participant Flow
    participant Ctrl as Controller

    Note over App,Ctrl: (*) stable ref -- same identity until released

    %% Scope Creation & Extension Init
    App->>Scope: NewScope(ctx, WithExtensions, WithPresets, WithScopeTags, WithGC)
    Scope-->>App: Scope (sync return)

    loop each extension (sequential)
        Scope->>Ext: ext.Init(scope)
    end
    Note right of Scope: all Init() done -> scope.Ready() returns nil
    Note right of Scope: any Init() fails -> scope.Ready() returns error
    Note right of Scope: Resolve() auto-awaits scope.Ready()

    %% Observers (register before or after resolve)
    App->>Scope: scope.On(StateResolving | StateResolved | StateFailed, atom, listener)
    Scope-->>App: UnsubscribeFunc
    Note right of Scope: scope.On listens to AtomState transitions

    %% Atom Resolution
    Note right of Scope: singletons -- created once, reused across contexts. deps can include Required(tag)
    App->>Scope: Resolve(scope, atom)
    Scope->>Scope: cache hit? -> return cached
    alt preset hit
        Scope->>Scope: Preset value -> store directly, skip factory
        Scope->>Scope: PresetAtom -> resolve replacement atom instead
        Scope->>Scope: emit StateResolved (no StateResolving)
    end
    Scope->>Scope: state -> StateResolving
    Scope->>Scope: emit StateResolving -> scope.On listeners
    Scope->>Ext: WrapResolve(next, atom, scope)
    Ext->>Atom: next() -> factory(rc)
    Note right of Atom: rc.Cleanup(fn) -> stored per atom
    Note right of Atom: cleanups run LIFO on Release/Invalidate
    Atom-->>Ext: value
    Note right of Ext: ext returns value -- may transform or replace
    alt factory succeeds
        Ext-->>Scope: value (*) cached in entry
        Scope->>Scope: state -> StateResolved
        Scope->>Scope: emit StateResolved -> scope.On + ctrl.On listeners
    else factory returns error
        Atom-->>Scope: error
        Scope->>Scope: state -> StateFailed
        Scope->>Scope: emit StateFailed -> scope.On listeners (ctrl.On EventAll only)
    end

    %% Context Creation
    Note right of Scope: HTTP request, job, transaction -- groups exec calls with shared tags + guaranteed cleanup
    App->>Scope: scope.CreateContext(WithContextTags(...))
    Scope-->>App: *ExecContext

    %% Execution
    alt ExecFlow(ec, flow, input, opts...)
        Ctx->>Ctx: preset? -> PresetFlow: re-exec with replacement / PresetFlowFn: run as factory
        Ctx->>Ctx: flow.parse(input) if defined
        Ctx->>Child: create child (parent = ec, merged tags)
        Child->>Ext: WrapExec(next, flow, childCtx)
        Ext->>Flow: next() -> factory(child, input)
        Note right of Flow: child.OnClose(fn) -> fn receives cause error
        Flow-->>Ext: output
        Note right of Ext: ext returns output -- may transform or replace
        Ext-->>Child: output
    else ExecFn(ec, fn, opts...)
        Ctx->>Child: create child (parent = ec)
        Child->>Ext: WrapExec(next, fn, childCtx)
        Ext->>Child: next() -> fn(child)
        Child-->>Ext: result
    end
    Ctx->>Child: [A] child.Close(cause) -> run OnClose callbacks LIFO
    Child-->>Ctx: output
    Ctx-->>App: output

    %% Reactivity (opt-in)
    rect rgb(240, 248, 255)
        Note over App,Ctrl: Reactivity (opt-in -- atoms are static by default)
        Note right of Scope: live config, cache invalidation -- when values change after initial resolve
        App->>Scope: GetController(scope, atom)
        Scope-->>Ctrl: *Controller[T] (*)

        App->>Ctrl: ctrl.On(EventResolving | EventResolved | EventAll, listener)
        Ctrl-->>App: UnsubscribeFunc
        Note right of Ctrl: ctrl.On listens to per-atom entry events

        App->>Ctrl: ctrl.Set(v) / ctrl.Update(fn)
        Ctrl->>Scope: scheduleInvalidation
        Scope->>Scope: run atom cleanups (LIFO)
        Scope->>Scope: emit StateResolving -> scope.On + ctrl.On
        Scope->>Atom: apply new value (skip factory)
        Scope->>Scope: state -> StateResolved
        Scope->>Scope: emit StateResolved -> scope.On + ctrl.On

        App->>Ctrl: ctrl.Invalidate()
        Ctrl->>Scope: scheduleInvalidation
        Scope->>Scope: run atom cleanups (LIFO)
        Scope->>Scope: emit StateResolving -> scope.On + ctrl.On
        Scope->>Atom: re-run factory
        Scope->>Scope: state -> StateResolved
        Scope->>Scope: emit StateResolved -> scope.On + ctrl.On

        App->>Scope: Select(scope, atom, selector)
        Scope-->>App: *SelectHandle[S] { Get, Subscribe }
    end

    %% Cleanup & Teardown
    rect rgb(255, 245, 238)
        Note over App,Scope: Teardown
        App->>Ctx: ec.Close(cause) -- same as [A]
        Ctx->>Ctx: run OnClose callbacks (LIFO, idempotent via sync.Once)

        App->>Scope: Release(scope, atom)
        Scope->>Scope: run atom cleanups (LIFO)
        Scope->>Scope: remove from cache + controllers

        App->>Scope: scope.Flush()
        Note right of Scope: await pending invalidation chain

        App->>Scope: scope.Dispose()
        loop each extension
            Scope->>Ext: ext.Dispose(scope) (5s timeout)
        end
        Scope->>Scope: release all atoms, run all cleanups
    end

API reference: go doc github.com/pumped-fn/flux | Patterns: PATTERNS.md | Examples: examples/

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotResolved = errors.New("flux: atom not resolved — call Resolve before accessing value")
	ErrClosed      = errors.New("flux: exec context is closed")
	ErrScopeClosed = errors.New("flux: scope has been disposed")
)
View Source
var NameTag = &Tag[string]{label: "name", key: "@flux/name"}

Functions

func Acquire

func Acquire[T any](ec *ExecContext, r *Resource[T]) (T, error)

Acquire lazily resolves a Resource within an ExecContext. On first call, runs the factory and caches the result in ec.Data(). Subsequent calls (or calls from child ExecContexts) return the cached value. Safe for concurrent use from multiple goroutines on the same ExecContext. Errors are NOT cached — a failed Acquire can be retried.

func DeleteTag

func DeleteTag[T any](d *ContextData, tag *Tag[T]) bool

func ExecFlow

func ExecFlow[In, Out any](ec *ExecContext, f *Flow[In, Out], input In, opts ...ExecOption) (Out, error)

func ExecFn

func ExecFn[Out any](ec *ExecContext, fn func(*ExecContext) (Out, error), opts ...ExecOption) (Out, error)

func GetOrSetTag

func GetOrSetTag[T any](d *ContextData, tag *Tag[T], values ...T) T

func GetTag

func GetTag[T any](d *ContextData, tag *Tag[T]) (T, bool)

func HasTag

func HasTag[T any](d *ContextData, tag *Tag[T]) bool

func MustAcquire

func MustAcquire[T any](ec *ExecContext, r *Resource[T]) T

MustAcquire is like Acquire but panics on error.

func MustResolve

func MustResolve[T any](scope Scope, atom *Atom[T]) T

func Release

func Release[T any](scope Scope, atom *Atom[T]) error

func Resolve

func Resolve[T any](scope Scope, atom *Atom[T]) (T, error)

func ResolveAny

func ResolveAny(scope Scope, atom AnyAtom) (any, error)

func ResolveAnyFrom

func ResolveAnyFrom(rc *ResolveContext, atom AnyAtom) (any, error)

func ResolveFrom

func ResolveFrom[T any](rc *ResolveContext, atom *Atom[T]) (T, error)

func SeekTag

func SeekTag[T any](d *ContextData, tag *Tag[T]) (T, bool)

func SetTag

func SetTag[T any](d *ContextData, tag *Tag[T], value T)

Types

type AnyAtom

type AnyAtom interface {
	Name() string
	ID() uint64
	Deps() []Resolvable
	// contains filtered or unexported methods
}

type AnyExecTarget

type AnyExecTarget interface {
	ExecTargetName() string
}

type AnyFlow

type AnyFlow interface {
	Name() string
	ID() uint64
	Deps() []Resolvable
	// contains filtered or unexported methods
}

type AnyTagged

type AnyTagged interface {
	TagKey() string
	TagValue() any
}

type Atom

type Atom[T any] struct {
	// contains filtered or unexported fields
}

func NewAtom

func NewAtom[T any](factory func(*ResolveContext) (T, error), opts ...AtomOption) *Atom[T]

func NewAtomFrom

func NewAtomFrom[D1, T any](dep1 *Atom[D1], factory func(*ResolveContext, D1) (T, error), opts ...AtomOption) *Atom[T]

func NewAtomFrom2

func NewAtomFrom2[D1, D2, T any](dep1 *Atom[D1], dep2 *Atom[D2], factory func(*ResolveContext, D1, D2) (T, error), opts ...AtomOption) *Atom[T]

func NewAtomFrom3

func NewAtomFrom3[D1, D2, D3, T any](dep1 *Atom[D1], dep2 *Atom[D2], dep3 *Atom[D3], factory func(*ResolveContext, D1, D2, D3) (T, error), opts ...AtomOption) *Atom[T]

func NewAtomUnsafe

func NewAtomUnsafe[T any](deps []Resolvable, factory any, opts ...AtomOption) *Atom[T]

func Required

func Required[T any](tag *Tag[T]) *Atom[T]

func (*Atom) Deps

func (a *Atom) Deps() []Resolvable

func (*Atom) ID

func (a *Atom) ID() uint64

func (*Atom) Name

func (a *Atom) Name() string

type AtomOption

type AtomOption func(*atomBase)

func WithAtomName

func WithAtomName(name string) AtomOption

func WithAtomTags

func WithAtomTags(tags ...AnyTagged) AtomOption

func WithKeepAlive

func WithKeepAlive() AtomOption

type AtomState

type AtomState int
const (
	StateIdle AtomState = iota
	StateResolving
	StateResolved
	StateFailed
)

func (AtomState) String

func (s AtomState) String() string

type CircularDepError

type CircularDepError struct {
	Path []uint64
}

func (*CircularDepError) Error

func (e *CircularDepError) Error() string

type ContextData

type ContextData struct {
	// contains filtered or unexported fields
}

func (*ContextData) All

func (d *ContextData) All() iter.Seq2[string, any]

func (*ContextData) Clear

func (d *ContextData) Clear()

func (*ContextData) Delete

func (d *ContextData) Delete(key string) bool

func (*ContextData) Get

func (d *ContextData) Get(key string) (any, bool)

func (*ContextData) Has

func (d *ContextData) Has(key string) bool

func (*ContextData) Seek

func (d *ContextData) Seek(key string) (any, bool)

func (*ContextData) SeekAll

func (d *ContextData) SeekAll(key string) iter.Seq[any]

func (*ContextData) Set

func (d *ContextData) Set(key string, value any)

type Controller

type Controller[T any] struct {
	// contains filtered or unexported fields
}

func GetController

func GetController[T any](s Scope, atom *Atom[T]) *Controller[T]

func GetControllerResolved

func GetControllerResolved[T any](s Scope, atom *Atom[T]) (*Controller[T], error)

func (*Controller[T]) Get

func (c *Controller[T]) Get() (T, error)

func (*Controller[T]) Invalidate

func (c *Controller[T]) Invalidate()

func (*Controller[T]) On

func (c *Controller[T]) On(event ControllerEvent, listener func()) UnsubscribeFunc

func (*Controller[T]) Release

func (c *Controller[T]) Release() error

func (*Controller[T]) Resolve

func (c *Controller[T]) Resolve() (T, error)

func (*Controller[T]) Set

func (c *Controller[T]) Set(value T) error

func (*Controller[T]) State

func (c *Controller[T]) State() AtomState

func (*Controller[T]) Update

func (c *Controller[T]) Update(fn func(T) T) error

type ControllerEvent

type ControllerEvent int
const (
	EventResolving ControllerEvent = iota
	EventResolved
	EventAll
)

func (ControllerEvent) String

func (e ControllerEvent) String() string

type CreateContextOption

type CreateContextOption func(*ecOptions)

func WithContextTags

func WithContextTags(tags ...AnyTagged) CreateContextOption

type Disposer

type Disposer interface {
	Dispose(s Scope) error
}

type ExecContext

type ExecContext struct {
	// contains filtered or unexported fields
}

func (*ExecContext) Close

func (ec *ExecContext) Close(cause error) error

func (*ExecContext) Closed

func (ec *ExecContext) Closed() bool

func (*ExecContext) Context

func (ec *ExecContext) Context() context.Context

func (*ExecContext) Data

func (ec *ExecContext) Data() *ContextData

func (*ExecContext) Depth

func (ec *ExecContext) Depth() int

func (*ExecContext) Input

func (ec *ExecContext) Input() any

func (*ExecContext) Name

func (ec *ExecContext) Name() string

func (*ExecContext) OnClose

func (ec *ExecContext) OnClose(fn func(error) error)

func (*ExecContext) Parent

func (ec *ExecContext) Parent() *ExecContext

func (*ExecContext) Root

func (ec *ExecContext) Root() *ExecContext

func (*ExecContext) Scope

func (ec *ExecContext) Scope() Scope

type ExecOption

type ExecOption func(*execOpts)

func WithExecName

func WithExecName(name string) ExecOption

func WithExecTags

func WithExecTags(tags ...AnyTagged) ExecOption

type ExecWrapper

type ExecWrapper interface {
	WrapExec(next func() (any, error), target AnyExecTarget, ec *ExecContext) (any, error)
}

type Extension

type Extension interface {
	Name() string
}

type Flow

type Flow[In, Out any] struct {
	// contains filtered or unexported fields
}

func NewFlow

func NewFlow[In, Out any](factory func(*ExecContext, In) (Out, error), opts ...FlowOption) *Flow[In, Out]

func NewFlowFrom

func NewFlowFrom[D1, In, Out any](dep1 FlowDep[D1], factory func(*ExecContext, In, D1) (Out, error), opts ...FlowOption) *Flow[In, Out]

NewFlowFrom creates a Flow with one dependency (Atom or Resource).

func NewFlowFrom2

func NewFlowFrom2[D1, D2, In, Out any](dep1 FlowDep[D1], dep2 FlowDep[D2], factory func(*ExecContext, In, D1, D2) (Out, error), opts ...FlowOption) *Flow[In, Out]

NewFlowFrom2 creates a Flow with two dependencies (Atom or Resource).

func NewFlowFrom3

func NewFlowFrom3[D1, D2, D3, In, Out any](dep1 FlowDep[D1], dep2 FlowDep[D2], dep3 FlowDep[D3], factory func(*ExecContext, In, D1, D2, D3) (Out, error), opts ...FlowOption) *Flow[In, Out]

NewFlowFrom3 creates a Flow with three dependencies (Atom or Resource).

func NewFlowUnsafe

func NewFlowUnsafe[In, Out any](deps []Resolvable, factory any, opts ...FlowOption) *Flow[In, Out]

func (*Flow) Deps

func (f *Flow) Deps() []Resolvable

func (*Flow[In, Out]) ExecTargetName

func (f *Flow[In, Out]) ExecTargetName() string

func (*Flow) ID

func (f *Flow) ID() uint64

func (*Flow) Name

func (f *Flow) Name() string

type FlowDep

type FlowDep[T any] interface {
	Resolvable
	// contains filtered or unexported methods
}

FlowDep[T] is a typed dependency that can be resolved from an ExecContext. Both *Atom[T] and *Resource[T] implement this interface, so NewFlowFrom and NewFlowFrom2 accept either as a dependency.

type FlowOption

type FlowOption func(*flowBase)

func WithFlowName

func WithFlowName(name string) FlowOption

func WithFlowTags

func WithFlowTags(tags ...AnyTagged) FlowOption

func WithParse

func WithParse[In any](parse func(any) (In, error)) FlowOption

type GCOptions

type GCOptions struct {
	Enabled bool
	GraceMs int
}

type Initializer

type Initializer interface {
	Init(s Scope) error
}

type InvalidationLoopError

type InvalidationLoopError struct {
	Path []string
}

func (*InvalidationLoopError) Error

func (e *InvalidationLoopError) Error() string

type ParseError

type ParseError struct {
	Phase string
	Label string
	Cause error
}

func (*ParseError) Error

func (e *ParseError) Error() string

func (*ParseError) Unwrap

func (e *ParseError) Unwrap() error

type PresetOption

type PresetOption struct {
	// contains filtered or unexported fields
}

func Preset

func Preset[T any](target *Atom[T], value T) PresetOption

func PresetAtom

func PresetAtom[T any](target *Atom[T], replacement *Atom[T]) PresetOption

func PresetFlow

func PresetFlow[In, Out any](target *Flow[In, Out], replacement *Flow[In, Out]) PresetOption

func PresetFlowFn

func PresetFlowFn[In, Out any](target *Flow[In, Out], fn func(*ExecContext) (Out, error)) PresetOption

type Resolvable

type Resolvable interface {
	Name() string
	ID() uint64
}

Resolvable is anything that can appear as a named dependency. Both *Atom[T] and *Resource[T] implement this interface.

type ResolveContext

type ResolveContext struct {
	// contains filtered or unexported fields
}

func (*ResolveContext) Atom

func (rc *ResolveContext) Atom() AnyAtom

func (*ResolveContext) Cleanup

func (rc *ResolveContext) Cleanup(fn func() error)

func (*ResolveContext) Context

func (rc *ResolveContext) Context() context.Context

func (*ResolveContext) Data

func (rc *ResolveContext) Data() *ContextData

func (*ResolveContext) Invalidate

func (rc *ResolveContext) Invalidate()

func (*ResolveContext) Scope

func (rc *ResolveContext) Scope() Scope

type ResolveWrapper

type ResolveWrapper interface {
	WrapResolve(next func() (any, error), atom AnyAtom, s Scope) (any, error)
}

type Resource

type Resource[T any] struct {
	// contains filtered or unexported fields
}

Resource represents a lazily-resolved, ExecContext-scoped value. Unlike Atom (which is cached per Scope), a Resource is cached per ExecContext and inherited by child contexts via the ContextData parent chain.

func NewResource

func NewResource[T any](name string, factory func(*ExecContext) (T, error)) *Resource[T]

NewResource creates a new Resource with the given name and factory function. The factory receives the ExecContext and can access scope-level atoms, register cleanup via OnClose, and chain to other resources via Acquire.

func NewResourceFrom

func NewResourceFrom[D1, T any](dep1 FlowDep[D1], name string, factory func(*ExecContext, D1) (T, error)) *Resource[T]

NewResourceFrom creates a Resource with one dependency (Resource or Atom).

func NewResourceFrom2

func NewResourceFrom2[D1, D2, T any](dep1 FlowDep[D1], dep2 FlowDep[D2], name string, factory func(*ExecContext, D1, D2) (T, error)) *Resource[T]

NewResourceFrom2 creates a Resource with two dependencies (Resource or Atom).

func (*Resource[T]) ID

func (r *Resource[T]) ID() uint64

ID returns the resource's unique identifier.

func (*Resource[T]) Name

func (r *Resource[T]) Name() string

Name returns the resource's name.

type Scope

type Scope interface {
	Ready() error
	ReadyErr() error
	Tags() []AnyTagged
	CreateContext(opts ...CreateContextOption) *ExecContext
	On(state AtomState, atom AnyAtom, listener func()) UnsubscribeFunc
	Flush() error
	Dispose() error
}

func NewScope

func NewScope(parentCtx context.Context, opts ...ScopeOption) Scope

type ScopeOption

type ScopeOption func(*scopeOpts)

func WithExtensions

func WithExtensions(exts ...Extension) ScopeOption

func WithGC

func WithGC(opts GCOptions) ScopeOption

func WithPresets

func WithPresets(presets ...PresetOption) ScopeOption

func WithScopeTags

func WithScopeTags(tags ...AnyTagged) ScopeOption

type SelectHandle

type SelectHandle[S any] struct {
	// contains filtered or unexported fields
}

func Select

func Select[T any, S comparable](s Scope, atom *Atom[T], selector func(T) S) *SelectHandle[S]

func SelectWith

func SelectWith[T any, S any](s Scope, atom *Atom[T], selector func(T) S, eq func(S, S) bool) *SelectHandle[S]

func (*SelectHandle[S]) Get

func (h *SelectHandle[S]) Get() S

func (*SelectHandle[S]) Subscribe

func (h *SelectHandle[S]) Subscribe(listener func()) UnsubscribeFunc

type Tag

type Tag[T any] struct {
	// contains filtered or unexported fields
}

func NewTag

func NewTag[T any](label string) *Tag[T]

func NewTagWithDefault

func NewTagWithDefault[T any](label string, def T) *Tag[T]

func NewTagWithParse

func NewTagWithParse[T any](label string, parse func(any) (T, error)) *Tag[T]

func (*Tag[T]) All

func (t *Tag[T]) All(source []AnyTagged) iter.Seq[T]

func (*Tag[T]) Collect

func (t *Tag[T]) Collect(source []AnyTagged) []T

func (*Tag[T]) Find

func (t *Tag[T]) Find(source []AnyTagged) (T, bool)

func (*Tag[T]) Get

func (t *Tag[T]) Get(source []AnyTagged) (T, error)

func (*Tag[T]) Key

func (t *Tag[T]) Key() string

func (*Tag[T]) Label

func (t *Tag[T]) Label() string

func (*Tag[T]) Value

func (t *Tag[T]) Value(v T) *Tagged[T]

type Tagged

type Tagged[T any] struct {
	// contains filtered or unexported fields
}

func (*Tagged[T]) TagKey

func (t *Tagged[T]) TagKey() string

func (*Tagged[T]) TagValue

func (t *Tagged[T]) TagValue() any

type UnsubscribeFunc

type UnsubscribeFunc func()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL