wire

v0.17.0
Published: Jan 6, 2026 License: Apache-2.0 Imports: 25 Imported by: 14

README

PSQL wire protocol 🔌

A pure Go PostgreSQL server wire protocol implementation. Build your own PostgreSQL server within a few lines of code. This project attempts to make it as straightforward as possible to set up and configure your own PSQL server. Feel free to check out the examples directory for various ways to configure and set up your own server.

You can use this package to build your own fully fledged, PostgreSQL compatible database, or simply play with the wire protocol by creating a PSQL honeypot, testing drivers, or experimenting with third-party integrations.

It’s designed to give you a high-level implementation out of the box, while staying hackable so you can bend it to whatever you want to build.

🚧 This project does not include a PSQL parser. Please check out other projects such as auxten/postgresql-parser to parse PostgreSQL queries.

package main

import (
	"context"
	"fmt"

	wire "github.com/jeroenrinzema/psql-wire"
)

func main() {
	wire.ListenAndServe("127.0.0.1:5432", handler)
}

func handler(ctx context.Context, query string) (wire.PreparedStatements, error) {
	return wire.Prepared(wire.NewStatement(func(ctx context.Context, writer wire.DataWriter, parameters []wire.Parameter) error {
		fmt.Println(query)
		return writer.Complete("OK")
	})), nil
}

Session Attributes

You can store custom session attributes for each client connection, allowing you to track session state:

// Set a session attribute
wire.SetAttribute(ctx, "tenant_id", "tenant-123")

// Get a session attribute
tenantID, ok := wire.GetAttribute(ctx, "tenant_id")

🚧 To debug issues or inspect the PostgreSQL wire protocol, please check out the psql-proxy CLI.

Support

Feel free to start a new discussion to discuss feature requests or issues.

Used by

Cloudproud      Shopify

Contributing

Thank you for your interest in contributing to psql-wire! Check out the open projects and/or issues and feel free to join any ongoing discussion. Feel free to check out the open TODOs within the project.

Everyone is welcome to contribute, whether it's in the form of code, documentation, bug reports, feature requests, or anything else. We encourage you to experiment with the project and make contributions to help evolve it to meet your needs!

See the contributing guide for more details.

Documentation

Constants

This section is empty.

Variables

var CopySignature = []byte("PGCOPY\n\377\r\n\000")

CopySignature is the signature that identifies the start of a copy-in stream and is used to determine where the copy-in data begins. https://www.postgresql.org/docs/current/sql-copy.html
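
As a rough illustration of how the signature can be used, the sketch below checks whether a byte stream starts with it. The local copySignature variable repeats the value shown above so that the example stays self-contained, and hasCopySignature is a hypothetical helper rather than part of this package.

```go
package main

import (
	"bytes"
	"fmt"
)

// copySignature repeats the value of CopySignature from this package so the
// sketch compiles without external dependencies.
var copySignature = []byte("PGCOPY\n\377\r\n\000")

// hasCopySignature (hypothetical helper) reports whether data starts with
// the copy signature.
func hasCopySignature(data []byte) bool {
	return bytes.HasPrefix(data, copySignature)
}

func main() {
	// A stream that starts with the signature followed by arbitrary bytes.
	stream := append([]byte("PGCOPY\n\377\r\n\000"), 0, 0, 0, 0)

	fmt.Println(len(copySignature))                    // 11
	fmt.Println(hasCopySignature(stream))              // true
	fmt.Println(hasCopySignature([]byte("1\tfoo\n"))) // false
}
```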

var ErrClosedWriter = errors.New("closed writer")

ErrClosedWriter is returned when the data writer has been closed.

var ErrDataWritten = errors.New("data has already been written")

ErrDataWritten is returned when an empty result is attempted to be sent to the client while data has already been written.

var ErrRowLimitExceeded = pgerror.WithCode(errors.New("row limit exceeded"), codes.ProgramLimitExceeded)

var ErrUnknownOid = errors.New("unknown oid")

var QueryParameters = regexp.MustCompile(`\$(\d+)|\?`)

QueryParameters represents a regex which can be used to identify and look up parameters defined inside a given query. Parameters can be defined as positional parameters ($1, $2, ...) or non-positional parameters (?).
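
The following sketch shows what the pattern matches. It recompiles the same expression locally so that it runs with the standard library alone; placeholders is a hypothetical helper name. Note that the pattern has no notion of SQL quoting, so a ? inside a string literal is matched as well.

```go
package main

import (
	"fmt"
	"regexp"
)

// queryParameters uses the same expression as QueryParameters in this package.
var queryParameters = regexp.MustCompile(`\$(\d+)|\?`)

// placeholders (hypothetical helper) returns every parameter placeholder
// found in query, in order of appearance.
func placeholders(query string) []string {
	return queryParameters.FindAllString(query, -1)
}

func main() {
	fmt.Println(placeholders("SELECT * FROM users WHERE id = $1 AND name = $2")) // [$1 $2]
	fmt.Println(placeholders("SELECT * FROM users WHERE id = ?"))                // [?]
	fmt.Println(len(placeholders("SELECT 1")))                                   // 0
}
```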

Functions

func AuthenticatedUsername

func AuthenticatedUsername(ctx context.Context) string

AuthenticatedUsername returns the username of the authenticated user of the given connection context.

func ErrorCode

func ErrorCode(writer *buffer.Writer, err error) error

ErrorCode writes an error message as response to a command with the given severity and error message. A ready for query message is written back to the client once the error has been written indicating the end of a command cycle. https://www.postgresql.org/docs/current/static/protocol-error-fields.html

func GetAttribute added in v0.13.0

func GetAttribute(ctx context.Context, key string) (interface{}, bool)

GetAttribute retrieves a custom attribute from the session by key. The first return value is the attribute value, which will be nil if the attribute doesn't exist. The second return value indicates whether the attribute was found.

Example:

tenantID, ok := wire.GetAttribute(ctx, "tenant_id")
if ok {
    // Use tenantID
}

func IsSuperUser

func IsSuperUser(ctx context.Context) bool

IsSuperUser checks whether the given connection context is a super user.

func ListenAndServe

func ListenAndServe(address string, handler ParseFn) error

ListenAndServe opens a new Postgres server using the given address and default configurations. The given handler function is used to handle simple queries. This method should be used to construct a simple Postgres server for testing purposes or simple use cases.

func NewErrMultipleCommandsStatements added in v0.11.0

func NewErrMultipleCommandsStatements() error

NewErrMultipleCommandsStatements is returned whenever multiple statements have been given within a single query during the extended query protocol.

func NewErrUndefinedStatement added in v0.11.0

func NewErrUndefinedStatement() error

NewErrUndefinedStatement is returned whenever no statement has been defined within the incoming query.

func NewErrUnimplementedMessageType

func NewErrUnimplementedMessageType(t types.ClientMessage) error

NewErrUnimplementedMessageType is called whenever an unimplemented message type is sent. This error indicates to the client that the sent message cannot be processed at this moment in time.

func NewErrUnkownStatement added in v0.4.0

func NewErrUnkownStatement(name string) error

NewErrUnkownStatement is returned whenever no executable has been found for the given name.

func ParseParameters added in v0.6.0

func ParseParameters(query string) []uint32

ParseParameters attempts to parse the parameters in the given string and returns the expected parameters. This is necessary for the extended query protocol, where the parameter types are expected to be defined.

func RemoteAddress added in v0.12.0

func RemoteAddress(ctx context.Context) net.Addr

RemoteAddress returns the Postgres remote address connection info if it has been set inside the given context.

func SetAttribute added in v0.13.0

func SetAttribute(ctx context.Context, key string, value interface{}) bool

SetAttribute sets a custom attribute in the session. The key is the attribute name, and value can be any type. Returns true if the attribute was set successfully, false if the session wasn't found.

Example:

wire.SetAttribute(ctx, "tenant_id", "tenant-123")

func TypeMap added in v0.10.2

func TypeMap(ctx context.Context) *pgtype.Map

TypeMap returns the Postgres type connection info if it has been set inside the given context.

Types

type AuthStrategy

type AuthStrategy func(ctx context.Context, writer *buffer.Writer, reader *buffer.Reader) (_ context.Context, err error)

AuthStrategy represents an authentication strategy used to authenticate a user.

func ClearTextPassword

func ClearTextPassword(validate func(ctx context.Context, database, username, password string) (context.Context, bool, error)) AuthStrategy

ClearTextPassword announces to the client to authenticate by sending a clear text password and validates whether the provided username and password (received inside the client parameters) are valid. If the provided credentials are invalid or any unexpected error occurs, an error is returned and the connection should be closed.

type BackendKeyDataFunc added in v0.16.0

type BackendKeyDataFunc func(ctx context.Context) (processID int32, secretKey int32)

BackendKeyDataFunc represents a function that generates backend key data for query cancellation. It should return a process ID and secret key that can be used by clients to cancel queries.

type BinaryCopyReader added in v0.12.0

type BinaryCopyReader struct {
	// contains filtered or unexported fields
}

func NewBinaryColumnReader added in v0.12.0

func NewBinaryColumnReader(ctx context.Context, copy *CopyReader) (_ *BinaryCopyReader, err error)

NewBinaryColumnReader creates a new column reader that reads rows from the given copy reader and returns the values as a slice of any values. The columns are used to determine the format of the data that is read from the reader. If the end of the copy-in stream is reached, an io.EOF error is returned.

func (*BinaryCopyReader) Read added in v0.12.0

func (r *BinaryCopyReader) Read(ctx context.Context) (_ []any, err error)

Read reads a single row from the copy-in stream. The read row is returned as a slice of any values. If the end of the copy-in stream is reached, an io.EOF error is returned.

type CancelRequestFn added in v0.16.0

type CancelRequestFn func(ctx context.Context, processID int32, secretKey int32) error

CancelRequestFn is the function called when a cancel request is received. The function receives the process ID and secret key from the cancel request. It should return an error if the cancel request cannot be processed.

type CloseFn

type CloseFn func(ctx context.Context) error

type Column

type Column struct {
	Table        int32  // table id
	ID           int32  // column identifier
	Attr         int16  // column attribute number
	Name         string // column name
	AttrNo       int16  // column attribute no (optional)
	Oid          uint32
	Width        int16
	TypeModifier int32
}

Column represents a table column and its attributes such as name, type and encode formatter.

func (Column) Define

func (column Column) Define(ctx context.Context, writer *buffer.Writer, format FormatCode)

Define writes the column header values to the given writer. This method is used to define a column inside RowDescription message defining the column type, width, and name.

func (Column) Write

func (column Column) Write(ctx context.Context, writer *buffer.Writer, format FormatCode, src any) (err error)

Write encodes the given source value using the column type definition and connection info. The encoded byte buffer is added to the given write buffer. This method is used to encode values and return them inside a DataRow message.

type Columns

type Columns []Column

Columns represent a collection of columns.

func (Columns) CopyIn added in v0.12.0

func (columns Columns) CopyIn(ctx context.Context, writer *buffer.Writer, format FormatCode) error

CopyIn sends a [CopyInResponse] to the client to initiate a CopyIn operation, based on the given columns within the prepared statement. https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-COPYINRESPONSE

func (Columns) Define

func (columns Columns) Define(ctx context.Context, writer *buffer.Writer, formats []FormatCode) error

Define writes the table RowDescription headers for the given table and the containing columns. The headers have to be written before any data rows can be sent back to the client. The given columns are encoded using the given format codes. Columns can be encoded as Text or Binary. If you provide a single format code, it will be applied to all columns.

func (Columns) Write

func (columns Columns) Write(ctx context.Context, formats []FormatCode, writer *buffer.Writer, srcs []any) (err error)

Write writes the given column values back to the client. The given columns are encoded using the given format codes. Columns could be encoded as Text or Binary. If you provide a single format code, it will be applied to all columns.
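
The rule for format codes can be sketched as a small lookup. The single-code case follows the description above. Treating an empty slice as "all text" is an assumption taken from the PostgreSQL protocol's Bind message rather than from this package's documentation, and formatFor is a hypothetical helper. The FormatCode type and constants are redeclared locally so that the example is self-contained.

```go
package main

import "fmt"

// FormatCode and its constants repeat the definitions from this package.
type FormatCode int16

const (
	TextFormat   FormatCode = 0
	BinaryFormat FormatCode = 1
)

// formatFor (hypothetical helper) resolves the format code for column i.
func formatFor(formats []FormatCode, i int) (FormatCode, error) {
	switch {
	case len(formats) == 0:
		// Assumption: no codes means every column uses the default text format.
		return TextFormat, nil
	case len(formats) == 1:
		// A single code applies to all columns.
		return formats[0], nil
	case i >= 0 && i < len(formats):
		return formats[i], nil
	default:
		return TextFormat, fmt.Errorf("no format code for column %d", i)
	}
}

func main() {
	f, err := formatFor([]FormatCode{BinaryFormat}, 3)
	fmt.Println(f, err) // 1 <nil>

	f, err = formatFor([]FormatCode{TextFormat, BinaryFormat}, 1)
	fmt.Println(f, err) // 1 <nil>
}
```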

type CopyReader added in v0.12.0

type CopyReader struct {
	*buffer.Reader
	// contains filtered or unexported fields
}

func NewCopyReader added in v0.12.0

func NewCopyReader(reader *buffer.Reader, writer *buffer.Writer, columns Columns) *CopyReader

NewCopyReader creates a new copy reader that reads copy-in data from the given reader and writes the data to the given writer. The columns are used to determine the format of the data that is read from the reader.

func (*CopyReader) Columns added in v0.12.0

func (r *CopyReader) Columns() Columns

Columns returns the columns that are currently defined within the copy reader.

func (*CopyReader) Read added in v0.12.0

func (r *CopyReader) Read() error

Read reads a single chunk from the copy-in stream. The read chunk is returned as a byte slice. If the end of the copy-in stream is reached, an io.EOF error is returned.

type DataWriter

type DataWriter interface {
	// Row writes a single data row containing the values inside the given slice to
	// the underlying Postgres client. The column headers have to be written before
	// sending rows. Each item inside the slice represents a single column value.
	// The slice length needs to be the same length as the defined columns. Nil
	// values are encoded as NULL values.
	Row([]any) error

	// Limit returns the maximum number of rows to be written passed within the
	// wire protocol. A value of 0 indicates no limit.
	Limit() uint32

	// Written returns the number of rows written to the client.
	Written() uint32

	// Empty announces to the client an empty response and that no data rows should
	// be expected.
	Empty() error

	// Columns returns the columns that are currently defined within the writer.
	Columns() Columns

	// Complete announces to the client that the command has been completed and
	// no further data should be expected.
	//
	// See [CommandComplete] for the expected format for different queries.
	//
	// [CommandComplete]: https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-COMMANDCOMPLETE
	Complete(description string) error

	// CopyIn sends a [CopyInResponse] to the client, to initiate a CopyIn
	// operation. The copy operation can be used to send large amounts of data to
	// the server in a single transaction. A column reader has to be used to read
	// the data that is sent by the client to the CopyReader.
	CopyIn(format FormatCode) (*CopyReader, error)
}

DataWriter represents a writer interface for writing columns and data rows using the Postgres wire to the connected client.
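
The description passed to Complete is the CommandComplete tag. As a sketch of the tag format defined by the PostgreSQL protocol for row-counting commands, the hypothetical helper below builds tags such as "SELECT 3" and "INSERT 0 1". Commands without a row count (for example CREATE TABLE) use the bare command name and are not covered here.

```go
package main

import "fmt"

// commandTag (hypothetical helper) builds a CommandComplete tag for commands
// that report a row count, such as SELECT, UPDATE, DELETE and INSERT.
func commandTag(command string, rows int) string {
	if command == "INSERT" {
		// INSERT tags have the form "INSERT oid rows"; the oid field is
		// always 0 in current PostgreSQL versions.
		return fmt.Sprintf("INSERT 0 %d", rows)
	}
	return fmt.Sprintf("%s %d", command, rows)
}

func main() {
	fmt.Println(commandTag("SELECT", 3)) // SELECT 3
	fmt.Println(commandTag("INSERT", 1)) // INSERT 0 1
	fmt.Println(commandTag("DELETE", 0)) // DELETE 0
}
```

Inside a statement handler the result could then be passed on as writer.Complete(commandTag("SELECT", 3)).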

func NewDataWriter added in v0.4.0

func NewDataWriter(ctx context.Context, columns Columns, formats []FormatCode, limit Limit, reader *buffer.Reader, writer *buffer.Writer) DataWriter

NewDataWriter constructs a new data writer using the given context and buffer. The returned writer should be handled with caution as it is not safe for concurrent use. Concurrent access to the same data without proper synchronization can result in unexpected behavior and data corruption.

type DefaultPortalCache added in v0.4.0

type DefaultPortalCache struct {
	// contains filtered or unexported fields
}

func (*DefaultPortalCache) Bind added in v0.4.0

func (cache *DefaultPortalCache) Bind(ctx context.Context, name string, stmt *Statement, parameters []Parameter, formats []FormatCode) error

func (*DefaultPortalCache) Close added in v0.15.0

func (cache *DefaultPortalCache) Close()

func (*DefaultPortalCache) Execute added in v0.4.0

func (cache *DefaultPortalCache) Execute(ctx context.Context, name string, limit Limit, reader *buffer.Reader, writer *buffer.Writer) (err error)

func (*DefaultPortalCache) Get added in v0.6.0

func (cache *DefaultPortalCache) Get(ctx context.Context, name string) (*Portal, error)

type DefaultStatementCache added in v0.4.0

type DefaultStatementCache struct {
	// contains filtered or unexported fields
}

func (*DefaultStatementCache) Close added in v0.15.0

func (cache *DefaultStatementCache) Close()

func (*DefaultStatementCache) Get added in v0.4.0

func (cache *DefaultStatementCache) Get(ctx context.Context, name string) (*Statement, error)

Get attempts to get the prepared statement for the given name. An error is returned when no statement has been found.

func (*DefaultStatementCache) Set added in v0.4.0

func (cache *DefaultStatementCache) Set(ctx context.Context, name string, stmt *PreparedStatement) error

Set attempts to bind the given statement to the given name. Any previously defined statement is overridden.

type FlushFn added in v0.16.0

type FlushFn func(ctx context.Context) error

type FormatCode

type FormatCode int16

FormatCode represents the encoding format of a given column.

const (
	// TextFormat is the default, text format.
	TextFormat FormatCode = 0
	// BinaryFormat is an alternative, binary, encoding.
	BinaryFormat FormatCode = 1
)

type Limit added in v0.16.0

type Limit uint32

Limit represents the maximum number of rows to be written. Zero denotes “no limit”.

const NoLimit Limit = 0
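
A minimal sketch of how a zero limit might be interpreted when counting rows. How the package enforces limits internally is not documented here, so checkLimit and errRowLimitExceeded are hypothetical stand-ins; the Limit type and NoLimit constant are redeclared locally to keep the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// Limit and NoLimit repeat the definitions from this package.
type Limit uint32

const NoLimit Limit = 0

// errRowLimitExceeded is a local stand-in for the package's ErrRowLimitExceeded.
var errRowLimitExceeded = errors.New("row limit exceeded")

// checkLimit (hypothetical helper) returns an error when writing one more row
// would exceed limit. NoLimit never produces an error.
func checkLimit(limit Limit, written uint32) error {
	if limit != NoLimit && written >= uint32(limit) {
		return errRowLimitExceeded
	}
	return nil
}

func main() {
	fmt.Println(checkLimit(NoLimit, 1000)) // <nil>
	fmt.Println(checkLimit(2, 1))          // <nil>
	fmt.Println(checkLimit(2, 2))          // row limit exceeded
}
```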

type OptionFn

type OptionFn func(*Server) error

OptionFn is the options pattern used to define and set options for the given PostgreSQL server.

func BackendKeyData added in v0.16.0

func BackendKeyData(fn BackendKeyDataFunc) OptionFn

BackendKeyData sets the function that generates backend key data for query cancellation. The provided function should return a process ID and secret key that can be used by clients to cancel queries. If not set, no BackendKeyData message will be sent.

func CancelRequest added in v0.16.0

func CancelRequest(fn CancelRequestFn) OptionFn

CancelRequest sets the cancel request handler for the server. This function is called when a client sends a cancel request with a process ID and secret key. The handler should validate the credentials and cancel the appropriate query if valid.

func ClientAuth

func ClientAuth(auth tls.ClientAuthType) OptionFn

ClientAuth sets the client authentication type which is used to authenticate the client connection. The default value is tls.NoClientCert which means that no client authentication is performed.

func CloseConn

func CloseConn(fn CloseFn) OptionFn

CloseConn sets the close connection handle inside the given server instance.

func ExtendTypes added in v0.3.0

func ExtendTypes(fn func(*pgtype.Map)) OptionFn

ExtendTypes provides the ability to extend the underlying connection types. Types registered inside the given github.com/jackc/pgx/v5/pgtype.Map are registered to all incoming connections.

func FlushConn added in v0.16.0

func FlushConn(fn FlushFn) OptionFn

FlushConn registers a handler for Flush messages.

The provided handler is invoked when the frontend sends a Flush command. This allows the server to force any pending data in its output buffers to be delivered immediately.

Typically, a Flush is sent after an extended-query command (except Sync) when the frontend wants to inspect results before issuing more commands.

func GlobalParameters

func GlobalParameters(params Parameters) OptionFn

GlobalParameters sets the server parameters which are sent back to the front-end (client) once a handshake has been established.

func Logger

func Logger(logger *slog.Logger) OptionFn

Logger sets the given slog.Logger as the logger for the given server.

func MessageBufferSize

func MessageBufferSize(size int) OptionFn

MessageBufferSize sets the message buffer size which is allocated once a new connection gets constructed. If a negative or zero value is provided, the default message buffer size is used.

func ParallelPipeline added in v0.17.0

func ParallelPipeline(config ParallelPipelineConfig) OptionFn

ParallelPipeline sets the parallel pipeline configuration for the server. This controls whether Execute events can run concurrently within a session.

func Portals added in v0.4.0

func Portals(handler func() PortalCache) OptionFn

Portals sets the portals cache used to cache statements for later use. By default DefaultPortalCache is used.

func SessionAuthStrategy added in v0.5.3

func SessionAuthStrategy(fn AuthStrategy) OptionFn

SessionAuthStrategy sets the given authentication strategy within the given server. The authentication strategy is called when a handshake is initiated.

func SessionMiddleware added in v0.12.1

func SessionMiddleware(fn SessionHandler) OptionFn

SessionMiddleware sets the given session handler within the underlying server. The session handler is called when a new connection is opened and authenticated allowing for additional metadata to be wrapped around the connection context.

func Statements added in v0.4.0

func Statements(handler func() StatementCache) OptionFn

Statements sets the statement cache used to cache statements for later use. By default DefaultStatementCache is used.

func TLSConfig added in v0.12.0

func TLSConfig(config *tls.Config) OptionFn

TLSConfig sets the given TLS config to be used to initialize a secure connection between the front-end (client) and back-end (server).

func TerminateConn

func TerminateConn(fn CloseFn) OptionFn

TerminateConn sets the terminate connection handle inside the given server instance.

func Version added in v0.2.0

func Version(version string) OptionFn

Version sets the PostgreSQL version for the server which is sent back to the front-end (client) once a handshake has been established.

func WithShutdownTimeout added in v0.14.2

func WithShutdownTimeout(timeout time.Duration) OptionFn

WithShutdownTimeout sets the timeout duration for graceful shutdown. When Shutdown is called, the server will wait up to this duration for active connections to finish before forcing closure. A timeout of 0 means wait indefinitely (no timeout).

type ParallelPipelineConfig added in v0.17.0

type ParallelPipelineConfig struct {
	Enabled bool // when true, allows concurrent execution of pipelined Execute messages
}

ParallelPipelineConfig controls whether multiple Execute messages within a pipeline can run concurrently. When Enabled is true, the server may process Execute commands in parallel before the Sync message. When false, Execute commands are processed sequentially. Note that pipelining itself (batching multiple messages before Sync) is always supported; this setting only affects parallel execution of those messages.

type Parameter added in v0.10.0

type Parameter struct {
	// contains filtered or unexported fields
}

func NewParameter added in v0.10.0

func NewParameter(types *pgtype.Map, format FormatCode, value []byte) Parameter

func (Parameter) Format added in v0.10.0

func (p Parameter) Format() FormatCode

func (Parameter) Scan added in v0.10.2

func (p Parameter) Scan(oid uint32) (any, error)

func (Parameter) Value added in v0.10.0

func (p Parameter) Value() []byte

type ParameterStatus

type ParameterStatus string

ParameterStatus represents a metadata key that could be defined inside a server/client metadata definition.

const (
	ParamServerEncoding       ParameterStatus = "server_encoding"
	ParamClientEncoding       ParameterStatus = "client_encoding"
	ParamIsSuperuser          ParameterStatus = "is_superuser"
	ParamSessionAuthorization ParameterStatus = "session_authorization"
	ParamApplicationName      ParameterStatus = "application_name"
	ParamDatabase             ParameterStatus = "database"
	ParamUsername             ParameterStatus = "user"
	ParamServerVersion        ParameterStatus = "server_version"
)

At present there is a hard-wired set of parameters for which ParameterStatus will be generated. https://www.postgresql.org/docs/13/protocol-flow.html#PROTOCOL-ASYNC

type Parameters

type Parameters map[ParameterStatus]string

Parameters represents a collection of parameter status keys and their values.

func ClientParameters

func ClientParameters(ctx context.Context) Parameters

ClientParameters returns the connection parameters if they have been set inside the given context.

func ServerParameters

func ServerParameters(ctx context.Context) Parameters

ServerParameters returns the connection parameters if they have been set inside the given context.

type ParseFn added in v0.4.0

type ParseFn func(ctx context.Context, query string) (PreparedStatements, error)

ParseFn parses the given query and returns a prepared statement which could be used to execute at a later point in time.

type Portal added in v0.10.4

type Portal struct {
	// contains filtered or unexported fields
}

type PortalCache added in v0.4.0

type PortalCache interface {
	// Bind attempts to bind the given statement to the given name. Any
	// previously defined statement is overridden.
	Bind(ctx context.Context, name string, statement *Statement, parameters []Parameter, columns []FormatCode) error
	// Get attempts to get the portal for the given name. An error is returned
	// when no portal has been found.
	Get(ctx context.Context, name string) (*Portal, error)
	// Execute executes the prepared statement with the given name and parameters.
	Execute(ctx context.Context, name string, limit Limit, reader *buffer.Reader, writer *buffer.Writer) error
	// Close is called at the end of a connection. Close releases all resources
	// held by the portal cache.
	Close()
}

PortalCache represents a cache which could be used to bind and execute prepared statements with parameters.

func DefaultPortalCacheFn added in v0.12.1

func DefaultPortalCacheFn() PortalCache

type PreparedOptionFn added in v0.11.0

type PreparedOptionFn func(*PreparedStatement)

PreparedOptionFn is the options pattern used to define options while preparing a new statement.

func WithColumns added in v0.11.0

func WithColumns(columns Columns) PreparedOptionFn

WithColumns sets the given columns as the columns which are returned by the prepared statement.

func WithParameters added in v0.11.0

func WithParameters(parameters []uint32) PreparedOptionFn

WithParameters sets the given parameters as the parameters which are expected by the prepared statement.

type PreparedStatement added in v0.10.0

type PreparedStatement struct {
	// contains filtered or unexported fields
}

func NewStatement added in v0.11.0

func NewStatement(fn PreparedStatementFn, options ...PreparedOptionFn) *PreparedStatement

NewStatement constructs a new prepared statement for the given function.

type PreparedStatementFn added in v0.4.0

type PreparedStatementFn func(ctx context.Context, writer DataWriter, parameters []Parameter) error

PreparedStatementFn represents a query of which a statement has been prepared. The statement could be executed at any point in time with the given arguments and data writer.

type PreparedStatements added in v0.11.0

type PreparedStatements []*PreparedStatement

func Prepared added in v0.11.0

func Prepared(stmts ...*PreparedStatement) PreparedStatements

Prepared is a small wrapper function returning a list of prepared statements. More than one prepared statement can be returned within the simple query protocol. An error is returned when more than one prepared statement is returned in the extended query protocol.

type QueuedDataWriter added in v0.17.0

type QueuedDataWriter struct {
	// contains filtered or unexported fields
}

QueuedDataWriter implements the DataWriter interface. It collects query results in memory for later replay during pipelining.

func NewQueuedDataWriter added in v0.17.0

func NewQueuedDataWriter(ctx context.Context, columns Columns, limit Limit) *QueuedDataWriter

NewQueuedDataWriter creates a DataWriter that collects results for pipelining

func (*QueuedDataWriter) Columns added in v0.17.0

func (rc *QueuedDataWriter) Columns() Columns

func (*QueuedDataWriter) Complete added in v0.17.0

func (rc *QueuedDataWriter) Complete(tag string) error

func (*QueuedDataWriter) CopyIn added in v0.17.0

func (rc *QueuedDataWriter) CopyIn(format FormatCode) (*CopyReader, error)

func (*QueuedDataWriter) Empty added in v0.17.0

func (rc *QueuedDataWriter) Empty() error

func (*QueuedDataWriter) GetError added in v0.17.0

func (rc *QueuedDataWriter) GetError() error

GetError gets the error state

func (*QueuedDataWriter) Limit added in v0.17.0

func (rc *QueuedDataWriter) Limit() uint32

func (*QueuedDataWriter) Replay added in v0.17.0

func (rc *QueuedDataWriter) Replay(ctx context.Context, writer DataWriter) error

Replay writes all collected data to a real DataWriter

func (*QueuedDataWriter) Row added in v0.17.0

func (rc *QueuedDataWriter) Row(values []any) error

func (*QueuedDataWriter) SetError added in v0.17.0

func (rc *QueuedDataWriter) SetError(err error)

SetError sets the error state

func (*QueuedDataWriter) Written added in v0.17.0

func (rc *QueuedDataWriter) Written() uint32

type ResponseEvent added in v0.17.0

type ResponseEvent struct {
	Kind ResponseEventKind

	// For ResponseStmtDescribe: holds parameter OIDs and column definitions
	Parameters []uint32
	Columns    Columns

	// For ResponsePortalDescribe and ResponseExecute: format codes for result columns
	Formats []FormatCode

	// For ResponseExecute: tracks completion and results
	ResultChannel chan *QueuedDataWriter // channel to receive results
	Result        *QueuedDataWriter      // cached result once received
}

ResponseEvent represents an event in the response stream. Use the constructor functions (NewParseCompleteEvent, etc.) to create events.

func NewBindCompleteEvent added in v0.17.0

func NewBindCompleteEvent() *ResponseEvent

NewBindCompleteEvent creates a BindComplete response event

func NewExecuteEvent added in v0.17.0

func NewExecuteEvent(resultChan chan *QueuedDataWriter, formats []FormatCode) *ResponseEvent

NewExecuteEvent creates an Execute response event

func NewParseCompleteEvent added in v0.17.0

func NewParseCompleteEvent() *ResponseEvent

NewParseCompleteEvent creates a ParseComplete response event

func NewPortalDescribeEvent added in v0.17.0

func NewPortalDescribeEvent(columns Columns, formats []FormatCode) *ResponseEvent

NewPortalDescribeEvent creates a portal Describe response event

func NewStmtDescribeEvent added in v0.17.0

func NewStmtDescribeEvent(parameters []uint32, columns Columns) *ResponseEvent

NewStmtDescribeEvent creates a statement Describe response event

type ResponseEventKind added in v0.17.0

type ResponseEventKind uint8

ResponseEventKind represents the type of event in the ResponseQueue

const (
	// ResponseParseComplete represents a ParseComplete ack
	ResponseParseComplete ResponseEventKind = iota + 1
	// ResponseBindComplete represents a BindComplete ack
	ResponseBindComplete
	// ResponseStmtDescribe represents a composite ParameterDescription + RowDescription
	// for a statement (from Describe Statement)
	ResponseStmtDescribe
	// ResponsePortalDescribe represents a RowDescription for a portal (from Describe Portal)
	ResponsePortalDescribe
	// ResponseExecute represents an Execute with its complete result set
	// (DataRows, CommandComplete)
	ResponseExecute
)

type ResponseQueue added in v0.17.0

type ResponseQueue struct {
	// contains filtered or unexported fields
}

ResponseQueue maintains all events in arrival order for a cycle

func NewResponseQueue added in v0.17.0

func NewResponseQueue() *ResponseQueue

NewResponseQueue creates a new empty ResponseQueue

func (*ResponseQueue) Clear added in v0.17.0

func (q *ResponseQueue) Clear()

Clear resets the queue for a new cycle

func (*ResponseQueue) DrainAll added in v0.17.0

func (q *ResponseQueue) DrainAll() []*ResponseEvent

DrainAll returns all events in arrival order and clears the queue

func (*ResponseQueue) DrainSync added in v0.17.0

func (q *ResponseQueue) DrainSync(ctx context.Context) ([]*ResponseEvent, error)

DrainSync drains all events, waiting for all results to be received. It returns early if an error is encountered or the context is cancelled. It only returns events up to but not including an error event.

func (*ResponseQueue) Enqueue added in v0.17.0

func (q *ResponseQueue) Enqueue(event *ResponseEvent)

Enqueue adds an event to the queue

func (*ResponseQueue) Len added in v0.17.0

func (q *ResponseQueue) Len() int

Len returns the number of events in the queue

type Scanner added in v0.12.0

type Scanner func(value []byte) (any, error)

Scanner is a function that scans a byte slice and returns the value as an any

func NewScanner added in v0.12.0

func NewScanner(tm *pgtype.Map, column Column, format FormatCode) (Scanner, error)

NewScanner creates a new scanner that scans a byte slice and returns the value as an any. The scanner uses the given type map to decode the value, and the given column and format code to determine the type and wire format of the scanned data.
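
As a rough illustration of the Scanner shape, the sketch below declares a local function type with the same signature and one hand-written scanner for text-encoded integers. It does not use pgtype or NewScanner; newTextIntScanner and the decision to map a nil slice to a nil value are assumptions made for this example.

```go
package main

import (
	"fmt"
	"strconv"
)

// Scanner mirrors the signature of the function type documented above.
type Scanner func(value []byte) (any, error)

// newTextIntScanner is a hypothetical constructor that returns a Scanner
// decoding a text-encoded 32-bit integer.
func newTextIntScanner() Scanner {
	return func(value []byte) (any, error) {
		if value == nil {
			// Assumption for this sketch: a nil slice stands for SQL NULL.
			return nil, nil
		}
		n, err := strconv.ParseInt(string(value), 10, 32)
		if err != nil {
			return nil, fmt.Errorf("scan integer: %w", err)
		}
		return int32(n), nil
	}
}

func main() {
	scan := newTextIntScanner()
	v, err := scan([]byte("42"))
	fmt.Println(v, err)
	_, err = scan([]byte("not-a-number"))
	fmt.Println(err != nil)
}
```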

type Server

type Server struct {
	Auth            AuthStrategy
	BackendKeyData  BackendKeyDataFunc
	CancelRequest   CancelRequestFn
	BufferedMsgSize int
	Parameters      Parameters
	TLSConfig       *tls.Config
	ClientAuth      tls.ClientAuthType

	Session          SessionHandler
	Statements       func() StatementCache
	Portals          func() PortalCache
	CloseConn        CloseFn
	TerminateConn    CloseFn
	FlushConn        FlushFn
	ParallelPipeline ParallelPipelineConfig
	Version          string
	ShutdownTimeout  time.Duration
	// contains filtered or unexported fields
}

Server contains options for listening to an address.

func NewServer

func NewServer(parse ParseFn, options ...OptionFn) (*Server, error)

NewServer constructs a new Postgres server using the given parse function and server options.

func (*Server) Close

func (srv *Server) Close() error

Close gracefully closes the underlying Postgres server.

func (*Server) Handshake

func (srv *Server) Handshake(conn net.Conn) (_ net.Conn, version types.Version, reader *buffer.Reader, err error)

Handshake performs the connection handshake and returns the connection version and a buffered reader to read incoming messages sent by the client.

func (*Server) ListenAndServe

func (srv *Server) ListenAndServe(address string) error

ListenAndServe opens a new Postgres server on the given address and starts accepting and serving incoming client connections.

func (*Server) Serve

func (srv *Server) Serve(listener net.Listener) error

Serve accepts and serves incoming Postgres client connections using the server's configuration. The given listener will be closed once the server is gracefully closed.

func (*Server) Shutdown added in v0.14.2

func (srv *Server) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the server with context and timeout support. It stops accepting new connections and waits for active connections to finish within the shorter of the context deadline or the server's configured ShutdownTimeout. If the context has no deadline, the server's ShutdownTimeout is used.

type Session added in v0.5.0

type Session struct {
	*Server
	Statements StatementCache
	Portals    PortalCache
	Attributes map[string]interface{}

	// pipelining
	ParallelPipeline ParallelPipelineConfig
	ResponseQueue    *ResponseQueue
}

func GetSession added in v0.13.0

func GetSession(ctx context.Context) (*Session, bool)

GetSession retrieves the session from the context. The first return value is the session object, which can be used to access all session data. The second return value indicates whether the session was found in the context.

func (*Session) Close added in v0.15.0

func (srv *Session) Close()

type SessionHandler added in v0.5.0

type SessionHandler func(ctx context.Context) (context.Context, error)

SessionHandler represents a wrapper function defining the state of a single session. This function allows the user to wrap additional metadata around the shared context.

type Statement added in v0.6.0

type Statement struct {
	// contains filtered or unexported fields
}

type StatementCache added in v0.4.0

type StatementCache interface {
	// Set attempts to bind the given statement to the given name. Any
	// previously defined statement is overridden.
	Set(ctx context.Context, name string, fn *PreparedStatement) error
	// Get attempts to get the prepared statement for the given name. An error
	// is returned when no statement has been found.
	Get(ctx context.Context, name string) (*Statement, error)
	// Close is called at the end of a connection. Close releases all resources
	// held by the statement cache.
	Close()
}

StatementCache represents a cache which could be used to store and retrieve prepared statements bound to a name.

func DefaultStatementCacheFn added in v0.12.1

func DefaultStatementCacheFn() StatementCache

type TextCopyReader added in v0.15.0

type TextCopyReader struct {
	// contains filtered or unexported fields
}

func NewTextColumnReader added in v0.15.0

func NewTextColumnReader(ctx context.Context, copy *CopyReader, csvReader *csv.Reader, csvReaderBuffer *bytes.Buffer, nullValue string) (_ *TextCopyReader, err error)

func (*TextCopyReader) Read added in v0.15.0

func (r *TextCopyReader) Read(ctx context.Context) (_ []any, err error)

Read reads a single row from the copy-in stream. The read row is returned as a slice of any values. If the end of the copy-in stream is reached, an io.EOF error is returned.

Directories

Path Synopsis
examples
auth command
copy command
error command
pipelining command
session command
simple command
tls command
pkg
