enviar

Version: v0.0.1
Published: Feb 19, 2026 License: MIT Imports: 13 Imported by: 0

README

enviar

enviar ("to send" in Spanish) is a lightweight Go wrapper around github.com/LarnTechKe/work that provides a clean, type-safe interface for enqueuing and processing Redis-backed background jobs.

Features

  • Simple enqueuing — JSON payload marshaling handled automatically.
  • Delayed jobs — schedule jobs to run after a duration.
  • Retry options — per-job retry with exponential, linear, or fixed backoff.
  • Unique jobs — deduplicate identical payloads.
  • Worker pool — built-in middleware for logging and panic recovery.
  • Cron schedules — register recurring jobs with cron expressions.
  • Environment-based config — zero-config defaults with env var overrides.

Installation

go get github.com/LarnTechKe/enviar

Requires Go 1.24+ and a running Redis instance.

Quick Start

Define a Job
package jobs

import "github.com/LarnTechKe/enviar"

type SendEmailJob struct{}

func (j SendEmailJob) Name() string { return "send_email" }

func (j SendEmailJob) Options() []enviar.JobOption {
    return []enviar.JobOption{
        enviar.WithMaxFails(3),
        enviar.WithMaxConcurrency(5),
    }
}
Implement a Handler
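package handlers

import (
    "context"
    "encoding/json"

    "github.com/LarnTechKe/enviar"

    "myapp/jobs"
)

// EmailRequest is an illustrative payload type; its fields mirror the keys
// used in the enqueue example.
type EmailRequest struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

type EmailHandler struct{}

// Job ties this handler to the job definition it processes.
func (h *EmailHandler) Job() enviar.Job { return jobs.SendEmailJob{} }

// PerformJob decodes the JSON body and sends the email.
func (h *EmailHandler) PerformJob(ctx context.Context, body string) error {
    var req EmailRequest
    if err := json.Unmarshal([]byte(body), &req); err != nil {
        return err
    }
    // ... send the email ...
    return nil
}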
Enqueue a Job
cfg := enviar.LoadEnv()
pool := cfg.NewPool()
defer pool.Close()

enqueuer := enviar.NewEnqueuer(pool, cfg.Namespace)

id, err := enqueuer.EnqueueBody("send_email", map[string]string{
    "to":      "user@example.com",
    "subject": "Welcome!",
    "body":    "Thanks for signing up.",
})
Enqueue with Delay
id, err := enqueuer.EnqueueBodyIn("generate_report", 30*time.Second, reportRequest)
Enqueue with Retry Options
import "github.com/LarnTechKe/work"

id, err := enqueuer.EnqueueBodyWithRetry("webhook_delivery", payload, work.RetryOptions{
    MaxRetries: 5,
    Strategy:   work.BackoffExponential,
})
Enqueue Unique Job
id, err := enqueuer.EnqueueBodyUnique("process_payment", paymentRequest)
// id == "" when a duplicate is already enqueued
Start Workers
cfg := enviar.LoadEnv()
pool := cfg.NewPool()
defer pool.Close()

wp := enviar.NewWorkerPool(pool, cfg.Namespace, cfg.Concurrency)
wp.AddJobHandlers(
    &handlers.EmailHandler{},
    &handlers.PaymentHandler{},
)

// Optional: recurring jobs
wp.AddRecurringJobs(map[string]string{
    "*/20 * * * *": "generate_report",
})

// Blocks until SIGINT/SIGTERM
wp.Start(context.Background())

Configuration

enviar reads from environment variables with sensible defaults:

Variable            Default         Description
ENVIAR_NAMESPACE    enviar          Redis key namespace
ENVIAR_REDIS_URL    localhost:6379  host:port or redis://:pass@host:port/db
ENVIAR_REDIS_DB     0               Database number (plain host:port only)
ENVIAR_CONCURRENCY  10              Number of concurrent workers
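
The defaults-with-overrides behaviour described above can be sketched with the standard library alone. The settings struct and load function are hypothetical; in particular, falling back to the default when a number fails to parse is an assumption, not documented LoadEnv behaviour.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// settings mirrors the four documented variables; the struct is illustrative.
type settings struct {
	Namespace   string
	RedisURL    string
	RedisDB     int
	Concurrency uint
}

// load reads the ENVIAR_* variables through getenv and applies the
// documented defaults when a variable is unset or empty.
func load(getenv func(string) string) settings {
	get := func(key, def string) string {
		if v := getenv(key); v != "" {
			return v
		}
		return def
	}
	db, err := strconv.Atoi(get("ENVIAR_REDIS_DB", "0"))
	if err != nil {
		db = 0 // assumption: invalid numbers fall back to the default
	}
	n, err := strconv.ParseUint(get("ENVIAR_CONCURRENCY", "10"), 10, 32)
	if err != nil {
		n = 10 // assumption: invalid numbers fall back to the default
	}
	return settings{
		Namespace:   get("ENVIAR_NAMESPACE", "enviar"),
		RedisURL:    get("ENVIAR_REDIS_URL", "localhost:6379"),
		RedisDB:     db,
		Concurrency: uint(n),
	}
}

func main() {
	fmt.Printf("%+v\n", load(os.Getenv))
}
```

Passing the lookup function as a parameter keeps the sketch testable without touching the process environment.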

Job Options

Option                 Description
WithMaxConcurrency(n)  Max concurrent instances of this job
WithMaxFails(n)        Max failures before dead-lettering
WithPriority(p)        Scheduling priority (1–10000)
WithHighPriority()     Shorthand for priority 10
WithLowPriority()      Shorthand for priority 1
WithSkipDead(bool)     Skip the dead queue on exhaustion
WithBackoff(fn)        Custom backoff calculator

Architecture

enviar
├── config.go        — Config, LoadEnv, NewPool
├── enqueuer.go      — Enqueuer (JSON-aware job producer)
├── job.go           — Job, JobHandler interfaces; JobOption helpers
└── worker_pool.go   — WorkerPool (consumer with middleware)

enviar wraps the low-level work.Enqueuer and work.WorkerPool types, adding JSON payload marshaling, structured logging, panic recovery, and a functional-options pattern for job configuration.

License

MIT — same as the upstream work library.

Documentation

Constants

const BodyKey = "body"

BodyKey is the args key used to carry the JSON-encoded payload.
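
One way to picture how a payload travels under this key: the producer marshals it to JSON and stores the string in the job args, and the consumer reads the string back out. The wrap and unwrap helpers below are hypothetical and use only the standard library; they are a sketch of the idea, not enviar's code.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

const bodyKey = "body" // same value as the documented BodyKey constant

// wrap marshals payload to JSON and stores the string under bodyKey.
func wrap(payload interface{}) (map[string]interface{}, error) {
	b, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	return map[string]interface{}{bodyKey: string(b)}, nil
}

// unwrap returns the JSON string stored under bodyKey.
func unwrap(args map[string]interface{}) (string, error) {
	s, ok := args[bodyKey].(string)
	if !ok {
		return "", errors.New("args carry no string body")
	}
	return s, nil
}

func main() {
	args, err := wrap(map[string]string{"to": "user@example.com"})
	if err != nil {
		panic(err)
	}
	body, err := unwrap(args)
	if err != nil {
		panic(err)
	}
	fmt.Println(body) // prints {"to":"user@example.com"}
}
```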

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Namespace   string // Redis key namespace (e.g. "myapp-work").
	RedisURL    string // "host:port" or "redis://:password@host:port".
	RedisDB     int    // Redis database number (0–15); ignored for redis:// URLs.
	Concurrency uint   // Number of concurrent workers (default 10).
}

Config holds the Redis connection and worker pool configuration.

func LoadEnv

func LoadEnv() Config

LoadEnv builds a Config from environment variables:

  • ENVIAR_NAMESPACE (default "enviar")
  • ENVIAR_REDIS_URL (default "localhost:6379") — supports redis:// URLs
  • ENVIAR_REDIS_DB (default "0")
  • ENVIAR_CONCURRENCY (default "10")

func (Config) NewPool

func (c Config) NewPool() *redis.Pool

NewPool creates a *redis.Pool from the configuration. It supports both plain "host:port" and full "redis://" URLs.
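
The two accepted address forms can be told apart with the standard library, as in the sketch below. The target struct and parseTarget function are hypothetical names, and the handling shown (database taken from the URL path, fallback database used only for plain addresses) is an assumption based on the field comments above rather than NewPool's actual code.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// target describes where to connect; the type is illustrative.
type target struct {
	Addr     string
	Password string
	DB       int
}

// parseTarget accepts "host:port" or "redis://:password@host:port/db".
// fallbackDB is used only for the plain "host:port" form.
func parseTarget(raw string, fallbackDB int) (target, error) {
	if !strings.HasPrefix(raw, "redis://") {
		return target{Addr: raw, DB: fallbackDB}, nil
	}
	u, err := url.Parse(raw)
	if err != nil {
		return target{}, err
	}
	t := target{Addr: u.Host}
	if u.User != nil {
		t.Password, _ = u.User.Password()
	}
	if p := strings.TrimPrefix(u.Path, "/"); p != "" {
		db, err := strconv.Atoi(p)
		if err != nil {
			return target{}, fmt.Errorf("invalid database %q: %w", p, err)
		}
		t.DB = db
	}
	return t, nil
}

func main() {
	for _, raw := range []string{"localhost:6379", "redis://:secret@cache.internal:6379/2"} {
		t, err := parseTarget(raw, 0)
		fmt.Printf("%+v %v\n", t, err)
	}
}
```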

type Enqueuer

type Enqueuer struct {
	// contains filtered or unexported fields
}

Enqueuer wraps work.Enqueuer with a simpler, JSON-centric interface.

func NewEnqueuer

func NewEnqueuer(pool *redis.Pool, namespace string) *Enqueuer

NewEnqueuer creates an Enqueuer backed by the given Redis pool and namespace.

func (*Enqueuer) EnqueueBody

func (e *Enqueuer) EnqueueBody(jobName string, payload interface{}) (string, error)

EnqueueBody marshals payload to JSON and enqueues an immediate job. It returns the new job's ID.

func (*Enqueuer) EnqueueBodyDelayedWithRetry

func (e *Enqueuer) EnqueueBodyDelayedWithRetry(jobName string, delay time.Duration, payload interface{}, opts work.RetryOptions) (string, error)

EnqueueBodyDelayedWithRetry marshals payload and enqueues with both a delay and retry options.

func (*Enqueuer) EnqueueBodyIn

func (e *Enqueuer) EnqueueBodyIn(jobName string, delay time.Duration, payload interface{}) (string, error)

EnqueueBodyIn marshals payload to JSON and enqueues with a delay.

func (*Enqueuer) EnqueueBodyUnique

func (e *Enqueuer) EnqueueBodyUnique(jobName string, payload interface{}) (string, error)

EnqueueBodyUnique marshals payload and enqueues only if no identical job is already queued. Returns ("", nil) when a duplicate is detected.
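
The ("", nil) contract can be illustrated with an in-memory stand-in. The fingerprint function and queue type are hypothetical: the sketch assumes that "identical" means the same job name and the same JSON encoding, and it never releases a key, whereas a real queue would free it once the job has been processed.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// fingerprint derives a key from the job name and the JSON-encoded payload.
func fingerprint(jobName string, payload interface{}) (string, error) {
	b, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	h := sha256.New()
	h.Write([]byte(jobName))
	h.Write([]byte{0}) // separator between name and payload
	h.Write(b)
	return hex.EncodeToString(h.Sum(nil)), nil
}

// queue is an in-memory stand-in for the Redis-backed queue.
type queue struct {
	seen map[string]bool
	next int
}

// enqueueUnique returns a new ID, or ("", nil) when the same job is pending.
func (q *queue) enqueueUnique(jobName string, payload interface{}) (string, error) {
	key, err := fingerprint(jobName, payload)
	if err != nil {
		return "", err
	}
	if q.seen[key] {
		return "", nil // duplicate: nothing enqueued, no error
	}
	q.seen[key] = true
	q.next++
	return fmt.Sprintf("job-%d", q.next), nil
}

func main() {
	q := &queue{seen: map[string]bool{}}
	for i := 0; i < 2; i++ {
		id, err := q.enqueueUnique("process_payment", map[string]int{"order": 42})
		fmt.Printf("id=%q err=%v\n", id, err)
	}
}
```

Callers should therefore treat an empty ID with a nil error as "already queued" rather than as a failure.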

func (*Enqueuer) EnqueueBodyWithRetry

func (e *Enqueuer) EnqueueBodyWithRetry(jobName string, payload interface{}, opts work.RetryOptions) (string, error)

EnqueueBodyWithRetry marshals payload and enqueues with retry options.

type Job

type Job interface {
	Name() string
	Options() []JobOption
}

Job defines a background job type's name and worker-side options.

type JobHandler

type JobHandler interface {
	Job() Job
	PerformJob(ctx context.Context, body string) error
}

JobHandler processes jobs of a specific type. PerformJob receives the JSON-encoded body string from the job args.

type JobOption

type JobOption func(*work.JobOptions)

JobOption is a functional option that configures a work.JobOptions value.
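
The functional-options pattern behind JobOption can be shown without the work dependency. In the sketch below, jobOptions is a local stand-in for work.JobOptions with assumed field names, and the lowercase helpers imitate the documented With... functions; none of this is the package's own code.

```go
package main

import "fmt"

// jobOptions is a local stand-in for work.JobOptions; field names are assumed.
type jobOptions struct {
	Priority       uint
	MaxFails       uint
	MaxConcurrency uint
	SkipDead       bool
}

// option mutates a jobOptions value, like JobOption does for work.JobOptions.
type option func(*jobOptions)

func withPriority(p uint) option       { return func(o *jobOptions) { o.Priority = p } }
func withMaxFails(n uint) option       { return func(o *jobOptions) { o.MaxFails = n } }
func withMaxConcurrency(n uint) option { return func(o *jobOptions) { o.MaxConcurrency = n } }
func withSkipDead(skip bool) option    { return func(o *jobOptions) { o.SkipDead = skip } }

// withHighPriority mirrors the documented shorthand for priority 10.
func withHighPriority() option { return withPriority(10) }

// build applies the options in order; later options override earlier ones.
func build(opts ...option) jobOptions {
	var o jobOptions
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := build(withMaxFails(3), withMaxConcurrency(5), withHighPriority())
	fmt.Printf("%+v\n", o)
}
```

Because each option is just a function, a Job's Options method can return any combination without the caller filling in a struct by hand.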

func WithBackoff

func WithBackoff(fn func(*work.Job) int64) JobOption

WithBackoff sets a custom backoff function for retries.

func WithHighPriority

func WithHighPriority() JobOption

WithHighPriority is a convenience alias for WithPriority(10).

func WithLowPriority

func WithLowPriority() JobOption

WithLowPriority is a convenience alias for WithPriority(1).

func WithMaxConcurrency

func WithMaxConcurrency(n uint) JobOption

WithMaxConcurrency limits how many instances of this job may run at once.

func WithMaxFails

func WithMaxFails(n uint) JobOption

WithMaxFails sets the maximum number of failures before the job is sent to the dead queue.

func WithPriority

func WithPriority(p uint) JobOption

WithPriority sets the job's scheduling priority.

func WithSkipDead

func WithSkipDead(skip bool) JobOption

WithSkipDead controls whether failed jobs bypass the dead queue.
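
How WithMaxFails and WithSkipDead interact can be summarised as a small decision function. This is a sketch under assumptions: the outcome names are hypothetical, and the exact boundary (whether the failure that reaches the limit is retried or not) is not stated in this documentation.

```go
package main

import "fmt"

// outcome names what happens to a job after a failed attempt.
type outcome string

const (
	retry outcome = "retry"
	dead  outcome = "dead queue"
	drop  outcome = "dropped"
)

// afterFailure decides the fate of a job that has now failed `fails` times.
// Assumption: the job is retried while fails is below maxFails.
func afterFailure(fails, maxFails uint, skipDead bool) outcome {
	if fails < maxFails {
		return retry
	}
	if skipDead {
		return drop // retries exhausted and the dead queue is bypassed
	}
	return dead
}

func main() {
	for fails := uint(1); fails <= 3; fails++ {
		fmt.Println(fails, afterFailure(fails, 3, false), afterFailure(fails, 3, true))
	}
}
```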

type WorkerPool

type WorkerPool interface {
	AddJobHandlers(handlers ...JobHandler)
	AddRecurringJobs(cronTaskMap map[string]string)
	Start(ctx context.Context)
	Stop()
}

WorkerPool manages job handlers, middleware, and periodic schedules.

func NewWorkerPool

func NewWorkerPool(redisPool *redis.Pool, namespace string, concurrency uint) WorkerPool

NewWorkerPool creates a pool with logging and panic-recovery middleware.
