Documentation
¶
Index ¶
- Constants
- Variables
- func EnqueueImageProcessing(image *models.Image, enableBackup bool) error
- func ProcessImageUnified(image *models.Image) error
- type DeleteImageJobPayload
- type ImageProcessingJobPayload
- type Job
- type JobStatus
- type JobType
- type Manager
- type MoveImageJobPayload
- type PoolMoveEnqueueJobPayload
- type Queue
- func (q *Queue) EnqueueDeleteImageJob(imageID uint, imageUUID string, fromReportID *uint, initiatedBy *uint) (*Job, error)
- func (q *Queue) EnqueueJob(jobType JobType, payload map[string]interface{}) (*Job, error)
- func (q *Queue) EnqueueS3BackupJob(imageID uint, imageUUID, filePath, fileName string, fileSize int64, ...) (*Job, error)
- func (q *Queue) EnqueueS3DeleteJob(imageID uint, imageUUID, objectKey, bucketName string, backupID uint) (*Job, error)
- func (q *Queue) GetJob(ctx context.Context, jobID string) (*Job, error)
- func (q *Queue) GetJobStats(ctx context.Context) (map[JobStatus]int64, error)
- func (q *Queue) GetProcessingSize(ctx context.Context) (int64, error)
- func (q *Queue) GetQueueSize(ctx context.Context) (int64, error)
- func (q *Queue) ProcessDelayedS3Backups() error
- func (q *Queue) RetryFailedS3Backups() error
- func (q *Queue) Start()
- func (q *Queue) Stop()
- type S3BackupJobPayload
- type S3DeleteJobPayload
Constants ¶
const ( // Redis key prefixes JobKeyPrefix = "job:" JobQueueKey = "job_queue" JobProcessingKey = "job_processing" JobStatsKey = "job_stats" // Job settings DefaultMaxRetries = 3 JobTTL = 24 * time.Hour // Jobs expire after 24 hours )
Variables ¶
var ErrRequeue = fmt.Errorf("requeue job for another node")
Functions ¶
func EnqueueImageProcessing ¶
EnqueueImageProcessing enqueues an image processing job in the unified queue This replaces the old imageprocessor.ProcessImage function
func ProcessImageUnified ¶
ProcessImageUnified is the new unified function that replaces imageprocessor.ProcessImage This function should be used instead of the old imageprocessor.ProcessImage
Types ¶
type DeleteImageJobPayload ¶
type DeleteImageJobPayload struct {
ImageID uint `json:"image_id"`
ImageUUID string `json:"image_uuid"`
FromReportID *uint `json:"from_report_id,omitempty"`
InitiatedByID *uint `json:"initiated_by_id,omitempty"`
}
DeleteImageJobPayload contains payload for deleting an image and its variants/files asynchronously
func DeleteImageJobPayloadFromMap ¶
func DeleteImageJobPayloadFromMap(data map[string]interface{}) (*DeleteImageJobPayload, error)
func (DeleteImageJobPayload) ToMap ¶
func (p DeleteImageJobPayload) ToMap() map[string]interface{}
type ImageProcessingJobPayload ¶
type ImageProcessingJobPayload struct {
ImageID uint `json:"image_id"`
ImageUUID string `json:"image_uuid"`
FilePath string `json:"file_path"` // Original file path
FileName string `json:"file_name"` // Original file name
FileType string `json:"file_type"` // File extension (.jpg, .png, etc.)
EnableBackup bool `json:"enable_backup"` // Whether to trigger S3 backup after processing
PoolID uint `json:"pool_id"` // Storage pool ID (routing hint)
NodeID string `json:"node_id"` // Optional node ID (routing hint)
}
ImageProcessingJobPayload contains the payload for image processing jobs
func ImageProcessingJobPayloadFromMap ¶
func ImageProcessingJobPayloadFromMap(data map[string]interface{}) (*ImageProcessingJobPayload, error)
FromMap creates a payload from a map
func (ImageProcessingJobPayload) ToMap ¶
func (p ImageProcessingJobPayload) ToMap() map[string]interface{}
ToMap converts the payload to a map for storage
type Job ¶
type Job struct {
ID string `json:"id"`
Type JobType `json:"type"`
Status JobStatus `json:"status"`
Payload map[string]interface{} `json:"payload"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ProcessedAt *time.Time `json:"processed_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
ErrorMsg string `json:"error_msg,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
}
Job represents a background job
func (*Job) IsRetryable ¶
IsRetryable checks if the job can be retried
func (*Job) MarkAsCompleted ¶
func (j *Job) MarkAsCompleted()
MarkAsCompleted updates the job status to completed
func (*Job) MarkAsFailed ¶
MarkAsFailed updates the job status to failed
func (*Job) MarkAsProcessing ¶
func (j *Job) MarkAsProcessing()
MarkAsProcessing updates the job status to processing
func (*Job) MarkAsRetrying ¶
func (j *Job) MarkAsRetrying()
MarkAsRetrying updates the job status to retrying
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the global job queue and background tasks
func GetManager ¶
func GetManager() *Manager
GetManager returns the global job queue manager (singleton)
type MoveImageJobPayload ¶
type MoveImageJobPayload struct {
ImageID uint `json:"image_id"`
SourcePoolID uint `json:"source_pool_id"`
TargetPoolID uint `json:"target_pool_id"`
}
MoveImageJobPayload contains payload for moving a single image+variants between pools
func MoveImageJobPayloadFromMap ¶
func MoveImageJobPayloadFromMap(data map[string]interface{}) (*MoveImageJobPayload, error)
func (MoveImageJobPayload) ToMap ¶
func (p MoveImageJobPayload) ToMap() map[string]interface{}
type PoolMoveEnqueueJobPayload ¶
type PoolMoveEnqueueJobPayload struct {
SourcePoolID uint `json:"source_pool_id"`
TargetPoolID uint `json:"target_pool_id"`
CursorID uint `json:"cursor_id"` // last processed Image.ID; 0 = start
}
PoolMoveEnqueueJobPayload contains payload for scanning a source pool and enqueuing per-image move jobs
func PoolMoveEnqueueJobPayloadFromMap ¶
func PoolMoveEnqueueJobPayloadFromMap(data map[string]interface{}) (*PoolMoveEnqueueJobPayload, error)
func (PoolMoveEnqueueJobPayload) ToMap ¶
func (p PoolMoveEnqueueJobPayload) ToMap() map[string]interface{}
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue manages background jobs using Redis
func (*Queue) EnqueueDeleteImageJob ¶
func (q *Queue) EnqueueDeleteImageJob(imageID uint, imageUUID string, fromReportID *uint, initiatedBy *uint) (*Job, error)
EnqueueDeleteImageJob enqueues an asynchronous delete job for an image
func (*Queue) EnqueueJob ¶
EnqueueJob adds a new job to the queue
func (*Queue) EnqueueS3BackupJob ¶
func (q *Queue) EnqueueS3BackupJob(imageID uint, imageUUID, filePath, fileName string, fileSize int64, backupID uint) (*Job, error)
EnqueueS3BackupJob creates and enqueues an S3 backup job
func (*Queue) EnqueueS3DeleteJob ¶
func (q *Queue) EnqueueS3DeleteJob(imageID uint, imageUUID, objectKey, bucketName string, backupID uint) (*Job, error)
EnqueueS3DeleteJob creates and enqueues an S3 delete job
func (*Queue) GetJobStats ¶
GetJobStats returns statistics about job statuses
func (*Queue) GetProcessingSize ¶
GetProcessingSize returns the number of jobs being processed
func (*Queue) GetQueueSize ¶
GetQueueSize returns the number of pending jobs
func (*Queue) ProcessDelayedS3Backups ¶
ProcessDelayedS3Backups finds images older than the configured delay and enqueues backup jobs
func (*Queue) RetryFailedS3Backups ¶
RetryFailedS3Backups finds and retries failed S3 backup jobs
type S3BackupJobPayload ¶
type S3BackupJobPayload struct {
ImageID uint `json:"image_id"`
ImageUUID string `json:"image_uuid"`
FilePath string `json:"file_path"`
FileName string `json:"file_name"`
FileSize int64 `json:"file_size"`
Provider models.BackupProvider `json:"provider"`
BackupID uint `json:"backup_id"`
}
S3BackupJobPayload contains the payload for S3 backup jobs
func S3BackupJobPayloadFromMap ¶
func S3BackupJobPayloadFromMap(data map[string]interface{}) (*S3BackupJobPayload, error)
FromMap creates a payload from a map
func (S3BackupJobPayload) ToMap ¶
func (p S3BackupJobPayload) ToMap() map[string]interface{}
ToMap converts the payload to a map for storage
type S3DeleteJobPayload ¶
type S3DeleteJobPayload struct {
ImageID uint `json:"image_id"`
ImageUUID string `json:"image_uuid"`
ObjectKey string `json:"object_key"`
BucketName string `json:"bucket_name"`
Provider models.BackupProvider `json:"provider"`
BackupID uint `json:"backup_id"`
}
S3DeleteJobPayload contains the payload for S3 delete jobs
func S3DeleteJobPayloadFromMap ¶
func S3DeleteJobPayloadFromMap(data map[string]interface{}) (*S3DeleteJobPayload, error)
FromMap creates a delete payload from a map
func (S3DeleteJobPayload) ToMap ¶
func (p S3DeleteJobPayload) ToMap() map[string]interface{}
ToMap converts the payload to a map for storage