jobqueue

package
v0.0.0-...-044368a
Published: Sep 25, 2025 License: AGPL-3.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

const (
	// Redis key prefixes
	JobKeyPrefix     = "job:"
	JobQueueKey      = "job_queue"
	JobProcessingKey = "job_processing"
	JobStatsKey      = "job_stats"

	// Job settings
	DefaultMaxRetries = 3
	JobTTL            = 24 * time.Hour // Jobs expire after 24 hours
)

Variables

var ErrRequeue = fmt.Errorf("requeue job for another node")

Functions

func EnqueueImageProcessing

func EnqueueImageProcessing(image *models.Image, enableBackup bool) error

EnqueueImageProcessing enqueues an image processing job in the unified queue. It replaces the old imageprocessor.ProcessImage function.
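
A minimal usage sketch, assuming the image has already been persisted (for example by an upload handler) and that this package, the models package, and the standard log package are imported:

// afterUpload is an illustrative handler step: it enqueues background
// processing for a stored image and requests an S3 backup once processing
// finishes (enableBackup = true).
func afterUpload(image *models.Image) {
    if err := jobqueue.EnqueueImageProcessing(image, true); err != nil {
        log.Printf("enqueue image processing: %v", err)
    }
}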

func ProcessImageUnified

func ProcessImageUnified(image *models.Image) error

ProcessImageUnified is the unified replacement for imageprocessor.ProcessImage and should be used in its place.

Types

type DeleteImageJobPayload

type DeleteImageJobPayload struct {
	ImageID       uint   `json:"image_id"`
	ImageUUID     string `json:"image_uuid"`
	FromReportID  *uint  `json:"from_report_id,omitempty"`
	InitiatedByID *uint  `json:"initiated_by_id,omitempty"`
}

DeleteImageJobPayload contains the payload for asynchronously deleting an image and its variants/files.

func DeleteImageJobPayloadFromMap

func DeleteImageJobPayloadFromMap(data map[string]interface{}) (*DeleteImageJobPayload, error)

func (DeleteImageJobPayload) ToMap

func (p DeleteImageJobPayload) ToMap() map[string]interface{}

type ImageProcessingJobPayload

type ImageProcessingJobPayload struct {
	ImageID      uint   `json:"image_id"`
	ImageUUID    string `json:"image_uuid"`
	FilePath     string `json:"file_path"`     // Original file path
	FileName     string `json:"file_name"`     // Original file name
	FileType     string `json:"file_type"`     // File extension (.jpg, .png, etc.)
	EnableBackup bool   `json:"enable_backup"` // Whether to trigger S3 backup after processing
	PoolID       uint   `json:"pool_id"`       // Storage pool ID (routing hint)
	NodeID       string `json:"node_id"`       // Optional node ID (routing hint)
}

ImageProcessingJobPayload contains the payload for image processing jobs

func ImageProcessingJobPayloadFromMap

func ImageProcessingJobPayloadFromMap(data map[string]interface{}) (*ImageProcessingJobPayload, error)

FromMap creates a payload from a map

func (ImageProcessingJobPayload) ToMap

func (p ImageProcessingJobPayload) ToMap() map[string]interface{}

ToMap converts the payload to a map for storage
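
As an illustration of the map round trip used for Job.Payload, the sketch below builds a payload with made-up values, converts it to a map, and decodes it again (assumes fmt, log, and this package are imported):

func payloadRoundTrip() {
    p := jobqueue.ImageProcessingJobPayload{
        ImageID:      42,
        ImageUUID:    "0f8fad5b-d9cb-469f-a165-70867728950e",
        FilePath:     "/data/pool1/originals/0f8fad5b.jpg",
        FileName:     "0f8fad5b.jpg",
        FileType:     ".jpg",
        EnableBackup: true,
        PoolID:       1,
    }

    data := p.ToMap() // the form stored in Job.Payload

    restored, err := jobqueue.ImageProcessingJobPayloadFromMap(data)
    if err != nil {
        log.Fatalf("decode payload: %v", err)
    }
    fmt.Println(restored.FileName) // "0f8fad5b.jpg"
}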

type Job

type Job struct {
	ID          string                 `json:"id"`
	Type        JobType                `json:"type"`
	Status      JobStatus              `json:"status"`
	Payload     map[string]interface{} `json:"payload"`
	CreatedAt   time.Time              `json:"created_at"`
	UpdatedAt   time.Time              `json:"updated_at"`
	ProcessedAt *time.Time             `json:"processed_at,omitempty"`
	CompletedAt *time.Time             `json:"completed_at,omitempty"`
	ErrorMsg    string                 `json:"error_msg,omitempty"`
	RetryCount  int                    `json:"retry_count"`
	MaxRetries  int                    `json:"max_retries"`
}

Job represents a background job

func (*Job) IsRetryable

func (j *Job) IsRetryable() bool

IsRetryable checks if the job can be retried

func (*Job) MarkAsCompleted

func (j *Job) MarkAsCompleted()

MarkAsCompleted updates the job status to completed

func (*Job) MarkAsFailed

func (j *Job) MarkAsFailed(errorMsg string)

MarkAsFailed updates the job status to failed

func (*Job) MarkAsProcessing

func (j *Job) MarkAsProcessing()

MarkAsProcessing updates the job status to processing

func (*Job) MarkAsRetrying

func (j *Job) MarkAsRetrying()

MarkAsRetrying updates the job status to retrying
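
The state-transition helpers are presumably driven by the worker loop inside Queue; handleJob below is only a hypothetical sketch of that flow and assumes any requeueing after MarkAsRetrying happens elsewhere:

// handleJob shows the intended order of Job state transitions around a
// processing callback. It is illustrative, not the package's actual worker.
func handleJob(j *jobqueue.Job, process func(*jobqueue.Job) error) {
    j.MarkAsProcessing()

    if err := process(j); err != nil {
        if j.IsRetryable() {
            j.MarkAsRetrying() // requeueing is assumed to happen elsewhere
        } else {
            j.MarkAsFailed(err.Error())
        }
        return
    }
    j.MarkAsCompleted()
}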

type JobStatus

type JobStatus string

JobStatus defines the status of a job

const (
	JobStatusPending    JobStatus = "pending"
	JobStatusProcessing JobStatus = "processing"
	JobStatusCompleted  JobStatus = "completed"
	JobStatusFailed     JobStatus = "failed"
	JobStatusRetrying   JobStatus = "retrying"
)

type JobType

type JobType string

JobType defines the type of job

const (
	JobTypeImageProcessing JobType = "image_processing"
	JobTypeS3Backup        JobType = "s3_backup"
	JobTypeS3Delete        JobType = "s3_delete"
	JobTypePoolMoveEnqueue JobType = "pool_move_enqueue"
	JobTypeMoveImage       JobType = "move_image"
	JobTypeDeleteImage     JobType = "delete_image"
)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the global job queue and background tasks

func GetManager

func GetManager() *Manager

GetManager returns the global job queue manager (singleton)
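
A typical wiring sketch at application startup and shutdown; the surrounding main and the log calls are illustrative:

func main() {
    mgr := jobqueue.GetManager()
    mgr.Start()
    defer mgr.Stop()

    if !mgr.IsRunning() {
        log.Fatal("job queue manager failed to start")
    }

    // ... start the HTTP server and block until a shutdown signal ...
}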

func (*Manager) GetQueue

func (m *Manager) GetQueue() *Queue

GetQueue returns the managed job queue

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning returns whether the manager is currently running

func (*Manager) Start

func (m *Manager) Start()

Start starts the job queue and background tasks

func (*Manager) Stop

func (m *Manager) Stop()

Stop stops the job queue and background tasks

type MoveImageJobPayload

type MoveImageJobPayload struct {
	ImageID      uint `json:"image_id"`
	SourcePoolID uint `json:"source_pool_id"`
	TargetPoolID uint `json:"target_pool_id"`
}

MoveImageJobPayload contains the payload for moving a single image and its variants between pools

func MoveImageJobPayloadFromMap

func MoveImageJobPayloadFromMap(data map[string]interface{}) (*MoveImageJobPayload, error)

func (MoveImageJobPayload) ToMap

func (p MoveImageJobPayload) ToMap() map[string]interface{}

type PoolMoveEnqueueJobPayload

type PoolMoveEnqueueJobPayload struct {
	SourcePoolID uint `json:"source_pool_id"`
	TargetPoolID uint `json:"target_pool_id"`
	CursorID     uint `json:"cursor_id"` // last processed Image.ID; 0 = start
}

PoolMoveEnqueueJobPayload contains the payload for scanning a source pool and enqueuing per-image move jobs

func PoolMoveEnqueueJobPayloadFromMap

func PoolMoveEnqueueJobPayloadFromMap(data map[string]interface{}) (*PoolMoveEnqueueJobPayload, error)

func (PoolMoveEnqueueJobPayload) ToMap

func (p PoolMoveEnqueueJobPayload) ToMap() map[string]interface{}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue manages background jobs using Redis

func NewQueue

func NewQueue(workers int) *Queue

NewQueue creates a new job queue

func (*Queue) EnqueueDeleteImageJob

func (q *Queue) EnqueueDeleteImageJob(imageID uint, imageUUID string, fromReportID *uint, initiatedBy *uint) (*Job, error)

EnqueueDeleteImageJob enqueues an asynchronous delete job for an image

func (*Queue) EnqueueJob

func (q *Queue) EnqueueJob(jobType JobType, payload map[string]interface{}) (*Job, error)

EnqueueJob adds a new job to the queue
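
A sketch of creating a standalone queue and enqueuing a generic job; in practice the queue usually comes from the Manager singleton, the worker count of 4 is arbitrary, and the payload values are made up (assumes log and this package are imported):

func enqueueExample() {
    q := jobqueue.NewQueue(4)
    q.Start()
    defer q.Stop()

    payload := jobqueue.DeleteImageJobPayload{
        ImageID:   42,
        ImageUUID: "0f8fad5b-d9cb-469f-a165-70867728950e",
    }

    job, err := q.EnqueueJob(jobqueue.JobTypeDeleteImage, payload.ToMap())
    if err != nil {
        log.Fatalf("enqueue: %v", err)
    }
    log.Printf("enqueued job %s (status %s)", job.ID, job.Status)
}

For deletions the dedicated EnqueueDeleteImageJob helper is the more direct route; the generic call above only illustrates pairing a JobType with its payload map.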

func (*Queue) EnqueueS3BackupJob

func (q *Queue) EnqueueS3BackupJob(imageID uint, imageUUID, filePath, fileName string, fileSize int64, backupID uint) (*Job, error)

EnqueueS3BackupJob creates and enqueues an S3 backup job

func (*Queue) EnqueueS3DeleteJob

func (q *Queue) EnqueueS3DeleteJob(imageID uint, imageUUID, objectKey, bucketName string, backupID uint) (*Job, error)

EnqueueS3DeleteJob creates and enqueues an S3 delete job

func (*Queue) GetJob

func (q *Queue) GetJob(ctx context.Context, jobID string) (*Job, error)

GetJob retrieves a job by ID

func (*Queue) GetJobStats

func (q *Queue) GetJobStats(ctx context.Context) (map[JobStatus]int64, error)

GetJobStats returns statistics about job statuses

func (*Queue) GetProcessingSize

func (q *Queue) GetProcessingSize(ctx context.Context) (int64, error)

GetProcessingSize returns the number of jobs being processed

func (*Queue) GetQueueSize

func (q *Queue) GetQueueSize(ctx context.Context) (int64, error)

GetQueueSize returns the number of pending jobs
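
The size and stats accessors compose into a simple health report; the helper below is illustrative and abbreviates error handling (assumes context, log, and this package are imported):

func logQueueHealth(ctx context.Context, q *jobqueue.Queue) {
    pending, _ := q.GetQueueSize(ctx)
    inFlight, _ := q.GetProcessingSize(ctx)

    stats, err := q.GetJobStats(ctx)
    if err != nil {
        log.Printf("job stats: %v", err)
        return
    }

    log.Printf("queue: pending=%d processing=%d", pending, inFlight)
    for status, n := range stats {
        log.Printf("  %s: %d", status, n)
    }
}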

func (*Queue) ProcessDelayedS3Backups

func (q *Queue) ProcessDelayedS3Backups() error

ProcessDelayedS3Backups finds images older than the configured delay and enqueues backup jobs

func (*Queue) RetryFailedS3Backups

func (q *Queue) RetryFailedS3Backups() error

RetryFailedS3Backups finds and retries failed S3 backup jobs
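
Both maintenance functions are presumably invoked periodically by the Manager's background tasks; the standalone ticker below is only a hypothetical sketch, and the 10-minute interval is arbitrary (assumes log, time, and this package are imported):

// runBackupMaintenance periodically scans for delayed backups and retries
// failed ones until stop is closed.
func runBackupMaintenance(q *jobqueue.Queue, stop <-chan struct{}) {
    ticker := time.NewTicker(10 * time.Minute)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            if err := q.ProcessDelayedS3Backups(); err != nil {
                log.Printf("delayed backup scan: %v", err)
            }
            if err := q.RetryFailedS3Backups(); err != nil {
                log.Printf("retry failed backups: %v", err)
            }
        case <-stop:
            return
        }
    }
}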

func (*Queue) Start

func (q *Queue) Start()

Start starts the job queue workers

func (*Queue) Stop

func (q *Queue) Stop()

Stop stops the job queue workers

type S3BackupJobPayload

type S3BackupJobPayload struct {
	ImageID   uint                  `json:"image_id"`
	ImageUUID string                `json:"image_uuid"`
	FilePath  string                `json:"file_path"`
	FileName  string                `json:"file_name"`
	FileSize  int64                 `json:"file_size"`
	Provider  models.BackupProvider `json:"provider"`
	BackupID  uint                  `json:"backup_id"`
}

S3BackupJobPayload contains the payload for S3 backup jobs

func S3BackupJobPayloadFromMap

func S3BackupJobPayloadFromMap(data map[string]interface{}) (*S3BackupJobPayload, error)

FromMap creates a payload from a map

func (S3BackupJobPayload) ToMap

func (p S3BackupJobPayload) ToMap() map[string]interface{}

ToMap converts the payload to a map for storage

type S3DeleteJobPayload

type S3DeleteJobPayload struct {
	ImageID    uint                  `json:"image_id"`
	ImageUUID  string                `json:"image_uuid"`
	ObjectKey  string                `json:"object_key"`
	BucketName string                `json:"bucket_name"`
	Provider   models.BackupProvider `json:"provider"`
	BackupID   uint                  `json:"backup_id"`
}

S3DeleteJobPayload contains the payload for S3 delete jobs

func S3DeleteJobPayloadFromMap

func S3DeleteJobPayloadFromMap(data map[string]interface{}) (*S3DeleteJobPayload, error)

FromMap creates a delete payload from a map

func (S3DeleteJobPayload) ToMap

func (p S3DeleteJobPayload) ToMap() map[string]interface{}

ToMap converts the payload to a map for storage
