Documentation ¶
Index ¶
- Variables
- type BindType
- type Binding
- type Conduit
- type Encoder
- type Message
- type ServiceClient
- func (s *ServiceClient) Request(subject string, v any) *Message
- func (s *ServiceClient) RequestWithCtx(ctx context.Context, subject string, v any) *Message
- func (s *ServiceClient) RequestWithTimeout(subject string, v any, timeout time.Duration) *Message
- func (s *ServiceClient) Send(subject string, v any) error
- type Transport
Constants ¶
This section is empty.
Variables ¶
var ErrBindingClosed = errors.New("binding is closed")
var MaxDecodeSize = int64(1024 * 1024 * 5)
MaxDecodeSize is the maximum number of bytes that will be read when decoding a message. The default is 5MB. Set to -1 to disable the limit.
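The size cap described above can be sketched with the standard library. This is an illustration only: `decodeLimited`, `decodeBytes`, and `errTooLarge` are hypothetical names, JSON stands in for the configured encoder, and the documentation does not say whether the package truncates or returns an error for an oversized payload. The sketch assumes an error.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

// errTooLarge is a hypothetical error; the package may report this differently.
var errTooLarge = errors.New("payload exceeds decode limit")

// decodeLimited reads r and JSON-decodes it into v, rejecting payloads
// longer than limit bytes. A negative limit disables the check, mirroring
// the documented -1 setting.
func decodeLimited(r io.Reader, limit int64, v any) error {
	if limit >= 0 {
		// Allow one extra byte so an oversized payload can be detected.
		r = io.LimitReader(r, limit+1)
	}
	data, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	if limit >= 0 && int64(len(data)) > limit {
		return errTooLarge
	}
	return json.Unmarshal(data, v)
}

// decodeBytes is a convenience wrapper for in-memory payloads.
func decodeBytes(data []byte, limit int64, v any) error {
	return decodeLimited(bytes.NewReader(data), limit, v)
}

func main() {
	var out map[string]int
	fmt.Println(decodeBytes([]byte(`{"a":1}`), 1024, &out), out) // within the limit
	fmt.Println(decodeBytes([]byte(`{"a":1}`), 3, &out))         // over the limit
	fmt.Println(decodeBytes([]byte(`{"a":1}`), -1, &out))        // limit disabled
}
```

Reading one byte past the limit is what lets the sketch tell "exactly at the limit" apart from "too large".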
Functions ¶
This section is empty.
Types ¶
type BindType ¶ added in v1.1.0
type BindType int
BindType specifies whether a binding receives all messages (broadcast) or only a share of messages (load-balanced).
const (
	// BindTypeNormal means all instances receive each message (fan-out/broadcast).
	BindTypeNormal BindType = iota
	// BindTypeOnce is like BindTypeNormal but will auto-unbind after one message.
	BindTypeOnce
	// BindTypeQueue means only one instance receives each message (load-balanced).
	BindTypeQueue
)
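To make the three delivery modes concrete, here is a small self-contained model. The `BindType` constants are copied from the declaration above; the `sub` record, `deliver`, and `runDemo` are hypothetical helpers invented for this illustration and say nothing about how the package routes messages internally.

```go
package main

import "fmt"

type BindType int

const (
	BindTypeNormal BindType = iota
	BindTypeOnce
	BindTypeQueue
)

// sub is a hypothetical subscriber record used only for this illustration.
type sub struct {
	kind   BindType
	hits   int
	active bool
}

// deliver routes one message: every active Normal or Once subscriber gets it,
// Once subscribers deactivate afterwards, and Queue subscribers take turns.
func deliver(subs []*sub, turn *int) {
	var queue []*sub
	for _, s := range subs {
		if !s.active {
			continue
		}
		switch s.kind {
		case BindTypeNormal:
			s.hits++
		case BindTypeOnce:
			s.hits++
			s.active = false // auto-unbind after one message
		case BindTypeQueue:
			queue = append(queue, s)
		}
	}
	if len(queue) > 0 {
		queue[*turn%len(queue)].hits++
		*turn++
	}
}

// runDemo sends four messages to one Normal, one Once, and two Queue
// subscribers and returns the per-subscriber hit counts.
func runDemo() []int {
	subs := []*sub{
		{kind: BindTypeNormal, active: true},
		{kind: BindTypeOnce, active: true},
		{kind: BindTypeQueue, active: true},
		{kind: BindTypeQueue, active: true},
	}
	turn := 0
	for i := 0; i < 4; i++ {
		deliver(subs, &turn)
	}
	hits := make([]int, len(subs))
	for i, s := range subs {
		hits[i] = s.hits
	}
	return hits
}

func main() {
	fmt.Println(runDemo()) // prints "[4 1 2 2]"
}
```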
type Binding ¶
type Binding struct {
// contains filtered or unexported fields
}
Binding represents a subscription to messages on a specific subject. Bindings provide two ways to consume messages: Next() for blocking retrieval and To() for handler-based processing.
func (*Binding) Next ¶
Next blocks until the next message arrives and returns it. This is useful for processing messages sequentially in a loop.
func (*Binding) To ¶
To spawns a goroutine that calls the handler for each message. The handler runs asynchronously and continues until the binding is closed.
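The difference between the two consumption styles can be illustrated with a channel-backed stand-in. Everything here is hypothetical: this page does not show the real signatures of Next and To, so the lowercase `binding` type, its string messages, and the use of a closed-binding error from `next` are assumptions made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errBindingClosed = errors.New("binding is closed")

// binding is a hypothetical channel-backed stand-in for a subscription.
type binding struct {
	msgs chan string
	done chan struct{}
	once sync.Once
}

func newBinding(buf int) *binding {
	return &binding{msgs: make(chan string, buf), done: make(chan struct{})}
}

// next blocks until a message arrives or the binding is closed.
func (b *binding) next() (string, error) {
	select {
	case m := <-b.msgs:
		return m, nil
	case <-b.done:
		return "", errBindingClosed
	}
}

// to spawns a goroutine that passes each message to handler until close.
func (b *binding) to(handler func(string)) {
	go func() {
		for {
			m, err := b.next()
			if err != nil {
				return
			}
			handler(m)
		}
	}()
}

// close releases the binding; calling it more than once is safe.
func (b *binding) close() { b.once.Do(func() { close(b.done) }) }

func main() {
	b := newBinding(1)

	// Blocking style: pull one message at a time.
	b.msgs <- "first"
	m, _ := b.next()
	fmt.Println("next:", m)

	// Handler style: messages are pushed to a callback on another goroutine.
	got := make(chan string, 1)
	b.to(func(s string) { got <- s })
	b.msgs <- "second"
	fmt.Println("to:", <-got)

	b.close()
	_, err := b.next()
	fmt.Println("after close:", err)
}
```

The blocking style keeps ordering and backpressure in the caller's hands; the handler style trades that for not having to own the loop.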
type Conduit ¶ added in v1.4.0
type Conduit struct {
// contains filtered or unexported fields
}
Conduit represents a service in the messaging system. Each instance has a unique name and can send/receive messages using a configured transport and encoder.
func New ¶ added in v1.4.0
New creates a new Conduit instance with the given service name, transport, and encoder. The service name identifies this service in the distributed system. The transport handles the underlying message delivery. The encoder serializes and deserializes message payloads.
func (*Conduit) Bind ¶ added in v1.4.0
Bind creates a binding that subscribes to broadcast messages on the given subject. All instances of the service receive each message (fan-out). Use this for events that all instances should process. The binding must be closed when no longer needed to free resources.
func (*Conduit) BindOnce ¶ added in v1.4.0
BindOnce creates a binding that subscribes to broadcast messages on the given subject. All instances of the service receive each message (fan-out). The binding will automatically unbind after receiving one message. Use this for events that only need to be processed once. The binding must be closed when no longer needed to free resources.
func (*Conduit) QueueBind ¶ added in v1.4.0
QueueBind creates a binding that subscribes to load-balanced messages on the given subject. Only one instance of the service receives each message (round-robin). Use this for work that should be distributed across instances. The binding must be closed when no longer needed to free resources.
func (*Conduit) Self ¶ added in v1.6.0
func (c *Conduit) Self() *ServiceClient
Self creates a client for communicating with instances of the service represented by this Conduit.
func (*Conduit) Service ¶ added in v1.4.0
func (c *Conduit) Service(remoteServiceName string) *ServiceClient
Service creates a client for communicating with a remote service. The returned ServiceClient provides methods for sending messages and making requests.
type Encoder ¶
type Encoder interface {
// Encode serializes v into bytes.
Encode(v any) ([]byte, error)
// Decode deserializes data into v.
Decode(data []byte, v any) error
}
Encoder defines the interface for message serialization and deserialization. Implementations include JSON, MessagePack, and Protocol Buffers encoders.
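A custom encoder only needs the two methods above. The following minimal sketch redeclares the interface locally so it compiles on its own and backs it with the standard library's encoding/json; `jsonEncoder`, `greeting`, and `roundTrip` are illustrative names, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder is a local copy of the interface shown above.
type Encoder interface {
	Encode(v any) ([]byte, error)
	Decode(data []byte, v any) error
}

// jsonEncoder is a hypothetical implementation backed by encoding/json.
type jsonEncoder struct{}

func (jsonEncoder) Encode(v any) ([]byte, error)    { return json.Marshal(v) }
func (jsonEncoder) Decode(data []byte, v any) error { return json.Unmarshal(data, v) }

// Compile-time check that jsonEncoder satisfies Encoder.
var _ Encoder = jsonEncoder{}

type greeting struct {
	Name string `json:"name"`
}

// roundTrip encodes in and decodes the bytes back into a fresh value.
func roundTrip(enc Encoder, in greeting) (greeting, error) {
	var out greeting
	data, err := enc.Encode(in)
	if err != nil {
		return out, err
	}
	err = enc.Decode(data, &out)
	return out, err
}

func main() {
	data, _ := jsonEncoder{}.Encode(greeting{Name: "ada"})
	fmt.Println(string(data)) // prints {"name":"ada"}

	out, err := roundTrip(jsonEncoder{}, greeting{Name: "ada"})
	fmt.Println(out.Name, err)
}
```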
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message represents an incoming message from another service. It provides methods to decode the message data, read it as a stream, or reply to the sender.
func (*Message) Into ¶
Into decodes the message data into v using the configured encoder. The message data is limited by MaxDecodeSize to prevent memory exhaustion. Returns any error that occurred during transport, reading, or decoding.
type ServiceClient ¶
type ServiceClient struct {
// contains filtered or unexported fields
}
ServiceClient provides methods for communicating with a specific service. It is created by calling Conduit.Service() with the target service name, or Conduit.Self() to address instances of the Conduit's own service.
func (*ServiceClient) Request ¶
func (s *ServiceClient) Request(subject string, v any) *Message
Request sends a message and waits for a reply with a default 30-second timeout. The returned Message can be chained with Into() to decode the response. Use RequestWithTimeout or RequestWithCtx for custom timeout control.
func (*ServiceClient) RequestWithCtx ¶
RequestWithCtx sends a message and waits for a reply until the context is canceled. The returned Message can be chained with Into() to decode the response. Returns a Message with an error if the context is canceled or times out.
func (*ServiceClient) RequestWithTimeout ¶
RequestWithTimeout sends a message and waits for a reply with a custom timeout. The returned Message can be chained with Into() to decode the response. Returns a Message with an error if the timeout expires.
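The three request variants differ only in how the wait is bounded. One plausible way to relate them, using nothing but the standard context package, is sketched below. The documentation does not describe the implementation, so this layering is an assumption, and `awaitCtx`, `awaitTimeout`, `await`, and the reply channel are hypothetical stand-ins for the real request path.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// defaultTimeout mirrors the documented 30-second default of Request.
const defaultTimeout = 30 * time.Second

// awaitCtx waits for a reply until ctx is done (the RequestWithCtx shape).
func awaitCtx(ctx context.Context, replies <-chan string) (string, error) {
	select {
	case r := <-replies:
		return r, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// awaitTimeout layers a deadline on awaitCtx (the RequestWithTimeout shape).
func awaitTimeout(replies <-chan string, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return awaitCtx(ctx, replies)
}

// await applies the default timeout (the Request shape).
func await(replies <-chan string) (string, error) {
	return awaitTimeout(replies, defaultTimeout)
}

// runDemo returns the reply read from a ready channel and reports whether
// waiting on a silent channel ended with a deadline error.
func runDemo() (string, bool) {
	ready := make(chan string, 1)
	ready <- "pong"
	reply, _ := await(ready)

	silent := make(chan string)
	_, err := awaitTimeout(silent, 10*time.Millisecond)
	return reply, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	reply, timedOut := runDemo()
	fmt.Println(reply, timedOut)
}
```

Under this layering a caller that already holds a context with a deadline would use the context variant directly, while the other two are conveniences on top of it.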
type Transport ¶
type Transport interface {
// Send delivers a message to the specified service and subject.
// The reader contains the message payload and will be consumed by the transport.
Send(serviceName, subject, sourceServiceName, replySubject string, reader io.Reader) error
// Handle registers a handler for broadcast messages (all instances receive).
// The handler receives the message metadata and an io.Reader for the payload.
Handle(serviceName string, handler func(sourceServiceName, subject, replySubject string, reader io.Reader))
// HandleQueue registers a handler for load-balanced messages (one instance receives).
// The handler receives the message metadata and an io.Reader for the payload.
HandleQueue(serviceName string, handler func(sourceServiceName, subject, replySubject string, reader io.Reader))
// Close cleans up resources and closes connections.
Close() error
}
Transport defines the interface for underlying message delivery mechanisms. Implementations handle the actual sending and receiving of messages between services.
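For tests or local experiments, the interface can be satisfied without a broker. The sketch below is an in-memory implementation under stated assumptions: the interface is redeclared locally (with a `handlerFunc` alias for brevity), delivery is synchronous, subjects are not filtered, and queue handlers are picked in simple rotation. `memTransport` and `runDemo` are hypothetical names, and none of this reflects how the NATS transport behaves.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// handlerFunc aliases the handler signature used by the interface above.
type handlerFunc = func(sourceServiceName, subject, replySubject string, reader io.Reader)

// Transport is a local copy of the interface shown above.
type Transport interface {
	Send(serviceName, subject, sourceServiceName, replySubject string, reader io.Reader) error
	Handle(serviceName string, handler handlerFunc)
	HandleQueue(serviceName string, handler handlerFunc)
	Close() error
}

// memTransport is a hypothetical in-process transport for illustration.
type memTransport struct {
	mu        sync.Mutex
	broadcast map[string][]handlerFunc
	queue     map[string][]handlerFunc
	turn      map[string]int
}

func newMemTransport() *memTransport {
	return &memTransport{
		broadcast: map[string][]handlerFunc{},
		queue:     map[string][]handlerFunc{},
		turn:      map[string]int{},
	}
}

func (t *memTransport) Handle(serviceName string, handler handlerFunc) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.broadcast[serviceName] = append(t.broadcast[serviceName], handler)
}

func (t *memTransport) HandleQueue(serviceName string, handler handlerFunc) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.queue[serviceName] = append(t.queue[serviceName], handler)
}

// Send consumes the reader once, then gives every broadcast handler and one
// queue handler its own copy of the payload.
func (t *memTransport) Send(serviceName, subject, sourceServiceName, replySubject string, reader io.Reader) error {
	payload, err := io.ReadAll(reader)
	if err != nil {
		return err
	}
	t.mu.Lock()
	targets := append([]handlerFunc(nil), t.broadcast[serviceName]...)
	if q := t.queue[serviceName]; len(q) > 0 {
		i := t.turn[serviceName] % len(q)
		t.turn[serviceName] = i + 1
		targets = append(targets, q[i])
	}
	t.mu.Unlock()
	for _, h := range targets {
		h(sourceServiceName, subject, replySubject, bytes.NewReader(payload))
	}
	return nil
}

func (t *memTransport) Close() error { return nil }

// runDemo registers two broadcast and two queue handlers for one service,
// sends four messages, and returns how often each handler ran.
func runDemo() (broadcastHits, queueHits [2]int) {
	var tr Transport = newMemTransport()
	defer tr.Close()
	for i := 0; i < 2; i++ {
		i := i
		tr.Handle("orders", func(_, _, _ string, r io.Reader) {
			io.Copy(io.Discard, r)
			broadcastHits[i]++
		})
		tr.HandleQueue("orders", func(_, _, _ string, r io.Reader) {
			io.Copy(io.Discard, r)
			queueHits[i]++
		})
	}
	for n := 0; n < 4; n++ {
		tr.Send("orders", "created", "billing", "", strings.NewReader("payload"))
	}
	return
}

func main() {
	b, q := runDemo()
	fmt.Println(b, q) // prints "[4 4] [2 2]"
}
```

Buffering the payload before fan-out matters because an io.Reader can be consumed only once, while several handlers may each need to read the full message.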
Directories ¶
| Path | Synopsis |
|---|---|
| encoder | |
| jsonencoder | Package json provides a JSON encoder for Conduit messages. |
| msgpackencoder | Package msgpack provides a MessagePack encoder for Conduit messages. |
| protobufencoder | Package protobuf provides a Protocol Buffers encoder for Conduit messages. |
| transport | |
| natstransport | Package nats provides a NATS transport implementation for Conduit. |