conduit

package module
v1.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 8 Imported by: 0

README

Conduit

A lightweight, transport-agnostic messaging framework for Go. Build distributed systems with multiple encoding formats, streaming support, and pluggable transports like NATS.

Go Reference Go Report Card License Sponsor

Table of Contents

Features

  • 📡 Event-Driven Messaging - Broadcast events across services with automatic routing
  • 🌊 Streaming Support - Send messages of any size without loading into memory
  • 🔌 Pluggable Transports - Built-in NATS support, easily add RabbitMQ, Kafka, or Redis
  • 📦 Multiple Encoders - JSON, MessagePack, and Protocol Buffers included
  • ⚖️ Load Balancing - Queue bindings distribute work across service instances
  • 🔄 Request/Reply - Synchronous request/reply pattern when you need it

Installation

go get github.com/RobertWHurst/conduit

For NATS transport:

go get github.com/nats-io/nats.go

Quick Start

Here's a simple example showing event-driven communication between services over NATS:

User Service (broadcasts events):

// Connect to NATS
nc, _ := natsgo.Connect("nats://localhost:4222")

// Create conduit
conduit := conduit.New("user-service", natstransport.New(nc), jsonencoder.New())

// Broadcast user.created event when a user signs up
conduit.Service("notification-service").Send("user.created", UserCreatedEvent{
    UserID: 123,
    Email:  "[email protected]",
    Name:   "Alice",
})

Notification Service (listens for events):

// Connect to NATS
nc, _ := natsgo.Connect("nats://localhost:4222")

// Create conduit
conduit := conduit.New("notification-service", natstransport.New(nc), jsonencoder.New())

// Listen for user.created events
conduit.Bind("user.created").To(func(msg *conduit.Message) {
    var event UserCreatedEvent
    msg.Into(&event)
    
    // Send welcome email
    sendWelcomeEmail(event.Email, event.Name)
})

Core Concepts

Conduit

A conduit allows sending and receiving data from different services. Each conduit takes the name of the service it represents, a transport for facilitating communication, and an encoder for encoding and decoding structs.

Each service should have one conduit instance. The service name identifies your service to others in the distributed system. The transport handles the underlying message delivery (like NATS). The encoder serializes and deserializes your data structures.

conduit := conduit.New(
    "my-service",
    natstransport.New(natsConn),
    jsonencoder.New(),
)
defer conduit.Close()
Service Communication

To send messages to another service, create a service client using conduit.Service() with the target service name. This returns a ServiceClient that provides methods for sending one-way messages or making request/reply calls.

// Create a service client for "notification-service"
notificationService := conduit.Service("notification-service")

// Send one-way message
notificationService.Send("email.send", EmailRequest{
    To:      "[email protected]",
    Subject: "Welcome!",
})

// Request with reply (blocks until response or timeout)
var result EmailResult
notificationService.Request("email.send", emailReq).Into(&result)

// Custom timeout
notificationService.RequestWithTimeout("email.send", emailReq, 5*time.Second).Into(&result)

// Context-based cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
notificationService.RequestWithCtx(ctx, "email.send", emailReq).Into(&result)

// Communicate with instances of the same service
selfClient := conduit.Self()
selfClient.Send("cache.invalidate", CacheKey{Key: "user:123"})
Message Encoding

Encoders serialize and deserialize messages automatically. Send strongly-typed Go structs without manual marshaling.

Encoders handle five types of values:

  • Structs - Marshaled using the encoder (JSON, MessagePack, Protocol Buffers)
  • Strings - Sent as-is without encoding
  • Byte slices - Sent as-is without encoding
  • io.Reader - Streamed directly without buffering
  • nil - Send events without payload data
// Send a struct
conduit.Service("notification-service").Send("user.created", User{ID: 123, Name: "Alice"})

// Send a string
conduit.Service("log-service").Send("log.info", "User logged in")

// Send bytes
conduit.Service("analytics-service").Send("event.track", []byte{0x01, 0x02, 0x03})

// Stream a file
file, _ := os.Open("report.pdf")
conduit.Service("storage-service").Send("file.upload", file)

// Send event without payload
conduit.Service("cache-service").Send("cache.invalidate", nil)
Transports

Transports handle the underlying message delivery between services. Conduit includes a NATS transport with support for reliable messaging and streaming.

The transport interface is simple, making it straightforward to add support for other brokers like RabbitMQ, Redis, or Kafka.

The NATS transport uses a chunked streaming protocol to send messages of any size without loading them into memory. Messages are split into 16KB chunks and streamed between services.

nc, _ := natsgo.Connect("nats://localhost:4222")
conduit := conduit.New("my-service", natstransport.New(nc), jsonencoder.New())

Messaging Patterns

Send (Fire and Forget)

Send delivers one-way messages without waiting for a reply. This is ideal for broadcasting events, logging, and notifications where you don't need confirmation of processing.

// Broadcast login event
conduit.Service("analytics-service").Send("user.login", LoginEvent{
    UserID:    123,
    Timestamp: time.Now(),
    IPAddress: "192.168.1.1",
})

// Send log message
conduit.Service("log-service").Send("log.info", "User 123 logged in")
Request/Reply

Request/Reply is a synchronous pattern where the sender waits for a response. Use this when you need a result back from another service.

// Make request and wait for reply (30 second default timeout)
var result ProcessResult
if err := conduit.Service("worker-service").Request("job.process", job).Into(&result); err != nil {
    log.Fatal(err)
}

// Custom timeout
conduit.Service("worker-service").RequestWithTimeout("job.process", job, 5*time.Second).Into(&result)

// Context-based cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conduit.Service("worker-service").RequestWithCtx(ctx, "job.process", job).Into(&result)
Event Binding

Bind to subjects to receive messages. All service instances with the same binding will receive every message (broadcast).

Use Next() to process messages in a loop, or To() to handle messages with a callback:

// Option 1: Process messages in a loop
binding := conduit.Bind("user.created")
go func() {
    for {
        var event UserCreatedEvent
        if err := binding.Next().Into(&event); err != nil {
            if err == conduit.ErrBindingClosed {
                break
            }
            log.Printf("Failed to decode: %v", err)
            continue
        }
        updateCache(event)
    }
}()

// Option 2: Use a handler function
binding := conduit.Bind("user.created").To(func(msg *conduit.Message) {
    var event UserCreatedEvent
    msg.Into(&event)
    updateCache(event)
})

Unbind when done (safe to call multiple times):

binding := conduit.Bind("user.created")
defer binding.Unbind()

Bind with automatic cleanup using context:

// Binding automatically unbinds when context is cancelled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

binding := conduit.Bind("user.created").WithCtx(ctx).To(func(msg *conduit.Message) {
    var event UserCreatedEvent
    msg.Into(&event)
    updateCache(event)
})

// No need to manually unbind - happens automatically when ctx is cancelled

Note: WithCtx can only be called once per binding and will panic if called multiple times.

Queue Binding

Queue bindings distribute messages across service instances - only one instance receives each message. Use this for load balancing work.

// Each message goes to only one instance
conduit.QueueBind("job.process").To(func(msg *conduit.Message) {
    var job Job
    msg.Into(&job)
    processJob(job)
})

When to use:

  • Bind() - All instances receive the message (cache invalidation, config updates)
  • QueueBind() - One instance receives the message (job processing, work distribution)

Both can be used on the same subject simultaneously.

Message Handling

Decoding Messages

Use Into() to decode messages into Go structs. The decoder respects MaxDecodeSize (default 5MB) to prevent memory exhaustion.

conduit.Bind("order.created").To(func(msg *conduit.Message) {
    var event OrderCreatedEvent
    if err := msg.Into(&event); err != nil {
        log.Printf("Failed to decode: %v", err)
        return
    }
    
    processOrder(event)
})

Change the decode size limit:

conduit.MaxDecodeSize = 10 * 1024 * 1024 // 10MB
Reading Raw Data

Messages implement io.Reader for streaming large data without loading it into memory.

conduit.Bind("file.upload").To(func(msg *conduit.Message) {
    file, _ := os.Create("/tmp/upload")
    defer file.Close()
    
    io.Copy(file, msg)
    log.Println("File uploaded")
})
Replying to Messages

Use Reply() to respond to requests. Only messages sent via Request() have reply subjects - messages from Send() cannot be replied to.

conduit.Bind("job.process").To(func(msg *conduit.Message) {
    var job Job
    msg.Into(&job)
    
    result := processJob(job)
    
    if err := msg.Reply(result); err != nil {
        log.Printf("Failed to reply: %v", err)
    }
})

Reply() accepts structs, strings, byte slices, and io.Reader values.

Built-in Encoders

JSON Encoder

JSON encoding is human-readable and widely supported. Good for development and debugging.

import "github.com/RobertWHurst/conduit/encoder/jsonencoder"

conduit := conduit.New("my-service", transport, jsonencoder.New())

Use JSON when:

  • Human-readable messages are important
  • Broad compatibility is needed
  • Performance is not critical
MessagePack Encoder

MessagePack is a fast, compact binary format - approximately 5x faster than JSON with smaller message sizes.

import "github.com/RobertWHurst/conduit/encoder/msgpackencoder"

conduit := conduit.New("my-service", transport, msgpackencoder.New())

Use MessagePack when:

  • High throughput is needed
  • Bandwidth or memory is constrained
  • All services are under your control
Protocol Buffers Encoder

Protocol Buffers provides type safety, schema validation, and cross-language compatibility.

Define a schema:

syntax = "proto3";

message UserCreatedEvent {
  int64 user_id = 1;
  string email = 2;
  string name = 3;
}

Generate Go code:

protoc --go_out=. --go_opt=paths=source_relative events.proto

Use with Conduit:

import (
    "github.com/RobertWHurst/conduit/encoder/protobufencoder"
    pb "github.com/myuser/myapp/proto"
)

conduit := conduit.New("my-service", transport, protobufencoder.New())

// Send protobuf message
conduit.Service("notification-service").Send("user.created", &pb.UserCreatedEvent{
    UserId: 123,
    Email:  "[email protected]",
    Name:   "Alice",
})

Use Protocol Buffers when:

  • Type-safe schemas are needed
  • Supporting multiple languages
  • Backward/forward compatibility is important

Built-in Transports

NATS Transport

The NATS transport provides reliable, high-performance messaging with support for streaming, request/reply, and events.

import (
    "github.com/RobertWHurst/conduit/transport/natstransport"
    natsgo "github.com/nats-io/nats.go"
)

nc, _ := natsgo.Connect("nats://localhost:4222")
transport := natstransport.New(nc)
conduit := conduit.New("my-service", transport, jsonencoder.New())

Features:

  • At-most-once delivery
  • Subject-based routing
  • Clustering for high availability
  • TLS encryption and authentication
Chunked Streaming

The NATS transport streams messages of any size without loading them into memory. Data is sent in 16KB chunks.

// Stream large file
file, _ := os.Open("large-file.dat")
conduit.Service("storage-service").Send("file.store", file)

Receiver streams directly to disk:

conduit.Bind("file.store").To(func(msg *conduit.Message) {
    outFile, _ := os.Create("uploaded-file.dat")
    defer outFile.Close()
    io.Copy(outFile, msg)
})

Protocol details:

  • Chunk size: 16KB (configurable via nats.ChunkSize)
  • Send timeout: 5 seconds (configurable via nats.SendTimeout)
  • Subject format: conduit.<service-name>

Advanced Usage

Custom Encoders

Implement the Encoder interface to add support for other formats:

type Encoder interface {
    Encode(v any) ([]byte, error)
    Decode(data []byte, v any) error
}

type MyEncoder struct{}

func (e *MyEncoder) Encode(v any) ([]byte, error) {
    // Your encoding logic
    return encoded, nil
}

func (e *MyEncoder) Decode(data []byte, v any) error {
    // Your decoding logic
    return nil
}

conduit := conduit.New("my-service", transport, &MyEncoder{})
Custom Transports

Implement the Transport interface to add support for other message brokers:

type Transport interface {
    Send(serviceName, subject, sourceServiceName, replySubject string, reader io.Reader) error
    Handle(serviceName string, handler func(sourceServiceName, subject, replySubject string, reader io.Reader))
    HandleQueue(serviceName string, handler func(sourceServiceName, subject, replySubject string, reader io.Reader))
    Close() error
}

type MyTransport struct{}

func (t *MyTransport) Send(serviceName, subject, sourceServiceName, replySubject string, reader io.Reader) error {
    data, _ := io.ReadAll(reader)
    // Send to your message broker
    return nil
}

func (t *MyTransport) Handle(serviceName string, handler func(sourceServiceName, subject, replySubject string, reader io.Reader)) {
    // Subscribe to broadcast messages
}

func (t *MyTransport) HandleQueue(serviceName string, handler func(sourceServiceName, subject, replySubject string, reader io.Reader)) {
    // Subscribe to queue messages
}

func (t *MyTransport) Close() error {
    return nil
}

Help Welcome

If you want to support this project with coffee money, it's greatly appreciated.

sponsor

If you're interested in providing feedback or would like to contribute, please feel free to do so. I recommend first opening an issue expressing your feedback or intent to contribute a change. From there we can consider your feedback or guide your contribution efforts. Any and all help is greatly appreciated.

Thank you!

License

MIT License - see LICENSE for details.

  • Navaros - HTTP framework for Go with powerful pattern matching
  • Velaros - WebSocket framework for Go with message routing
  • Zephyr - Microservice framework built on Navaros with service discovery and streaming
  • Eurus - WebSocket API gateway framework (upcoming, integrates with Velaros)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrBindingClosed = errors.New("binding is closed")
View Source
var MaxDecodeSize = int64(1024 * 1024 * 5)

MaxDecodeSize is the maximum number of bytes that will be read when decoding a message. The default is 5MB. Set to -1 to disable the limit.

Functions

This section is empty.

Types

type BindType added in v1.1.0

type BindType int

BindType specifies whether a binding receives all messages (broadcast) or only a share of messages (load-balanced).

const (
	// BindTypeNormal means all instances receive each message (fan-out/broadcast).
	BindTypeNormal BindType = iota
	// BindTypeOnce is like BindTypeNormal but will auto-unbind after one message.
	BindTypeOnce
	// BindTypeQueue means only one instance receives each message (load-balanced).
	BindTypeQueue
)

type Binding

type Binding struct {
	// contains filtered or unexported fields
}

Binding represents a subscription to messages on a specific subject. Bindings provide two ways to consume messages: Next() for blocking retrieval and To() for handler-based processing.

func (*Binding) IsBound added in v1.4.0

func (b *Binding) IsBound() bool

IsBound returns true if the binding is currently active.

func (*Binding) Next

func (b *Binding) Next() *Message

Next blocks until the next message arrives and returns it. This is useful for processing messages sequentially in a loop.

func (*Binding) To

func (b *Binding) To(handler func(msg *Message)) *Binding

To spawns a goroutine that calls the handler for each message. The handler runs asynchronously and continues until the binding is closed.

func (*Binding) Unbind added in v1.3.0

func (b *Binding) Unbind()

Unbind unsubscribes from messages and frees resources. Any goroutines spawned by To() will exit after Close is called.

func (*Binding) WithCtx added in v1.7.0

func (b *Binding) WithCtx(ctx context.Context) *Binding

type Conduit added in v1.4.0

type Conduit struct {
	// contains filtered or unexported fields
}

Conduit represents a service in the messaging system. Each instance has a unique name and can send/receive messages using a configured transport and encoder.

func New added in v1.4.0

func New(serviceName string, transport Transport, encoder Encoder) *Conduit

New creates a new Conduit instance with the given service name, transport, and encoder. The service name identifies this service in the distributed system. The transport handles the underlying message delivery. The encoder serializes and deserializes message payloads.

func (*Conduit) Bind added in v1.4.0

func (c *Conduit) Bind(eventName string) *Binding

Bind creates a binding that subscribes to broadcast messages on the given subject. All instances of the service receive each message (fan-out). Use this for events that all instances should process. The binding must be closed when no longer needed to free resources.

func (*Conduit) BindOnce added in v1.4.0

func (c *Conduit) BindOnce(eventName string) *Binding

BindOnce creates a binding that subscribes to broadcast messages on the given subject. All instances of the service receive each message (fan-out). The binding will automatically unbind after receiving one message. Use this for events that only need to be processed once. The binding must be closed when no longer needed to free resources.

func (*Conduit) Close added in v1.4.0

func (c *Conduit) Close() error

Close closes the conduit instance and cleans up resources.

func (*Conduit) QueueBind added in v1.4.0

func (c *Conduit) QueueBind(eventName string) *Binding

QueueBind creates a binding that subscribes to load-balanced messages on the given subject. Only one instance of the service receives each message (round-robin). Use this for work that should be distributed across instances. The binding must be closed when no longer needed to free resources.

func (*Conduit) Self added in v1.6.0

func (c *Conduit) Self() *ServiceClient

Self creates a client for communicating with instances of the service represented by this Conduit.

func (*Conduit) Service added in v1.4.0

func (c *Conduit) Service(remoteServiceName string) *ServiceClient

Service creates a client for communicating with a remote service. The returned ServiceClient provides methods for sending messages and making requests.

type Encoder

type Encoder interface {
	// Encode serializes v into bytes.
	Encode(v any) ([]byte, error)

	// Decode deserializes data into v.
	Decode(data []byte, v any) error
}

Encoder defines the interface for message serialization and deserialization. Implementations include JSON, MessagePack, and Protocol Buffers encoders.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message represents an incoming message from another service. It provides methods to decode the message data, read it as a stream, or reply to the sender.

func (*Message) Into

func (m *Message) Into(v any) error

Into decodes the message data into v using the configured encoder. The message data is limited by MaxDecodeSize to prevent memory exhaustion. Returns any error that occurred during transport, reading, or decoding.

func (*Message) Read

func (m *Message) Read(p []byte) (n int, err error)

Read implements io.Reader, allowing the message to be read as a stream. This is useful for large messages that should not be loaded entirely into memory.

func (*Message) Reply

func (m *Message) Reply(v any) error

Reply sends a response back to the original sender. Only messages received via Request have a reply subject. The value v can be a struct (encoded), string, []byte, or io.Reader.

type ServiceClient

type ServiceClient struct {
	// contains filtered or unexported fields
}

ServiceClient provides methods for communicating with a specific remote service. It is created by calling Conduit.Service() with the target service name.

func (*ServiceClient) Request

func (s *ServiceClient) Request(subject string, v any) *Message

Request sends a message and waits for a reply with a default 30-second timeout. The returned Message can be chained with Into() to decode the response. Use RequestWithTimeout or RequestWithCtx for custom timeout control.

func (*ServiceClient) RequestWithCtx

func (s *ServiceClient) RequestWithCtx(ctx context.Context, subject string, v any) *Message

RequestWithCtx sends a message and waits for a reply until the context is canceled. The returned Message can be chained with Into() to decode the response. Returns a Message with an error if the context is canceled or times out.

func (*ServiceClient) RequestWithTimeout

func (s *ServiceClient) RequestWithTimeout(subject string, v any, timeout time.Duration) *Message

RequestWithTimeout sends a message and waits for a reply with a custom timeout. The returned Message can be chained with Into() to decode the response. Returns a Message with an error if the timeout expires.

func (*ServiceClient) Send

func (s *ServiceClient) Send(subject string, v any) error

Send sends a fire-and-forget message to the remote service. The value v can be a struct (encoded), string, []byte, or io.Reader. Returns an error if encoding or sending fails.

type Transport

type Transport interface {
	// Send delivers a message to the specified service and subject.
	// The reader contains the message payload and will be consumed by the transport.
	Send(serviceName, subject, sourceServiceName, replySubject string, reader io.Reader) error

	// Handle registers a handler for broadcast messages (all instances receive).
	// The handler receives the message metadata and an io.Reader for the payload.
	Handle(serviceName string, handler func(sourceServiceName, subject, replySubject string, reader io.Reader))

	// HandleQueue registers a handler for load-balanced messages (one instance receives).
	// The handler receives the message metadata and an io.Reader for the payload.
	HandleQueue(serviceName string, handler func(sourceServiceName, subject, replySubject string, reader io.Reader))

	// Close cleans up resources and closes connections.
	Close() error
}

Transport defines the interface for underlying message delivery mechanisms. Implementations handle the actual sending and receiving of messages between services.

Directories

Path Synopsis
encoder
jsonencoder
Package json provides a JSON encoder for Conduit messages.
Package json provides a JSON encoder for Conduit messages.
msgpackencoder
Package msgpack provides a MessagePack encoder for Conduit messages.
Package msgpack provides a MessagePack encoder for Conduit messages.
protobufencoder
Package protobuf provides a Protocol Buffers encoder for Conduit messages.
Package protobuf provides a Protocol Buffers encoder for Conduit messages.
transport
natstransport
Package nats provides a NATS transport implementation for Conduit.
Package nats provides a NATS transport implementation for Conduit.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL