ytask

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2025 License: GPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExampleFailureHandler

func ExampleFailureHandler(task Task, err error) (time.Time, bool)

ExampleFailureHandler is a simple failure handler that retries tasks with exponential backoff

func ExampleMiddleware

func ExampleMiddleware(next func(ctx context.Context, task Task) error) func(ctx context.Context, task Task) error

ExampleMiddleware is a simple middleware that logs task execution

Types

type BaseTask

type BaseTask struct {
	ID                string
	Type              string
	Priority          int
	Status            TaskStatus
	FailureCount      int
	NextExecutionTime time.Time
}

BaseTask provides a basic implementation of the Task interface

func NewBaseTask

func NewBaseTask(id, taskType string, priority int) *BaseTask

NewBaseTask creates a new BaseTask with the given parameters

func (*BaseTask) Execute

func (t *BaseTask) Execute(ctx context.Context) error

Execute is a placeholder implementation that should be overridden by embedding structs

func (*BaseTask) GetFailureCount

func (t *BaseTask) GetFailureCount() int

GetFailureCount returns the number of times this task has failed

func (*BaseTask) GetID

func (t *BaseTask) GetID() string

GetID returns the unique identifier of the task

func (*BaseTask) GetNextExecutionTime

func (t *BaseTask) GetNextExecutionTime() time.Time

GetNextExecutionTime returns the next time this task should be executed

func (*BaseTask) GetPriority

func (t *BaseTask) GetPriority() int

GetPriority returns the priority of the task

func (*BaseTask) GetStatus

func (t *BaseTask) GetStatus() TaskStatus

GetStatus returns the current status of the task

func (*BaseTask) GetType

func (t *BaseTask) GetType() string

GetType returns the type of the task

func (*BaseTask) IncrementFailureCount

func (t *BaseTask) IncrementFailureCount()

IncrementFailureCount increments the failure count

func (*BaseTask) IsDuplicate

func (t *BaseTask) IsDuplicate(other Task) bool

IsDuplicate checks if this task is a duplicate of another task Default implementation checks if IDs are the same

func (*BaseTask) SetNextExecutionTime

func (t *BaseTask) SetNextExecutionTime(time time.Time)

SetNextExecutionTime sets the next time this task should be executed

func (*BaseTask) SetStatus

func (t *BaseTask) SetStatus(status TaskStatus)

SetStatus updates the status of the task

type CPUUsageFunc

type CPUUsageFunc func(prevTime *time.Time, prevUsage *float64) float64

CPUUsageFunc is a function type for getting CPU usage

var GetCPUUsage CPUUsageFunc = defaultGetCPUUsage

Current implementations that can be overridden for testing

type ExampleTask

type ExampleTask struct {
	BaseTask
	// Data holds the task's data
	Data string
	// Duration is how long the task will take to execute
	Duration time.Duration
	// ShouldFail indicates if the task should fail
	ShouldFail bool
}

ExampleTask is a simple example implementation of the Task interface

func NewExampleTask

func NewExampleTask(id string, priority int, data string, duration time.Duration, shouldFail bool) *ExampleTask

NewExampleTask creates a new example task

func (*ExampleTask) Execute

func (t *ExampleTask) Execute(ctx context.Context) error

Execute runs the example task

func (*ExampleTask) IsDuplicate

func (t *ExampleTask) IsDuplicate(other Task) bool

IsDuplicate checks if this task is a duplicate of another task For example tasks, we consider them duplicates if they have the same ID and Data

type FailureHandler

type FailureHandler func(task Task, err error) (time.Time, bool)

FailureHandler is a function that handles task failures

type MemoryUsageFunc

type MemoryUsageFunc func() float64

MemoryUsageFunc is a function type for getting memory usage

var GetMemoryUsage MemoryUsageFunc = defaultGetMemoryUsage

type Middleware

type Middleware func(next func(ctx context.Context, task Task) error) func(ctx context.Context, task Task) error

Middleware is a function that wraps task execution

type PriorityMode

type PriorityMode int

PriorityMode defines how tasks with different priorities are scheduled

const (
	// PriorityModePercentage schedules tasks based on priority percentages
	PriorityModePercentage PriorityMode = iota
	// PriorityModeStrict schedules tasks strictly by priority (higher first)
	PriorityModeStrict
)

type ResourceMonitor

type ResourceMonitor struct {
	// contains filtered or unexported fields
}

ResourceMonitor monitors system resources and adjusts concurrency limits

func NewResourceMonitor

func NewResourceMonitor(scheduler *Scheduler, maxCPUUsage, maxMemoryUsage float64) *ResourceMonitor

NewResourceMonitor creates a new resource monitor

func (*ResourceMonitor) SetMaxCPUUsage

func (m *ResourceMonitor) SetMaxCPUUsage(maxCPUUsage float64)

SetMaxCPUUsage sets the maximum CPU usage percentage

func (*ResourceMonitor) SetMaxMemoryUsage

func (m *ResourceMonitor) SetMaxMemoryUsage(maxMemoryUsage float64)

SetMaxMemoryUsage sets the maximum memory usage percentage

func (*ResourceMonitor) SetMinConcurrency

func (m *ResourceMonitor) SetMinConcurrency(minConcurrency int)

SetMinConcurrency sets the minimum concurrency

func (*ResourceMonitor) Start

func (m *ResourceMonitor) Start()

Start starts the resource monitor

func (*ResourceMonitor) Stop

func (m *ResourceMonitor) Stop()

Stop stops the resource monitor

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages task execution

func NewScheduler

func NewScheduler(maxGlobalConcurrency int) *Scheduler

NewScheduler creates a new task scheduler

func (*Scheduler) AddTask

func (s *Scheduler) AddTask(task Task) bool

AddTask adds a task to the scheduler

func (*Scheduler) CancelTask

func (s *Scheduler) CancelTask(taskID string) bool

CancelTask cancels a specific task

func (*Scheduler) CancelTasksByType added in v0.0.2

func (s *Scheduler) CancelTasksByType(taskType string) int

CancelTasksByType cancels all tasks of a specific type

func (*Scheduler) ClearTasksByType added in v0.0.2

func (s *Scheduler) ClearTasksByType(taskType string) int

ClearTasksByType removes all tasks of a specific type without marking them as cancelled and resets all counters for this task type to 0

func (*Scheduler) GetGlobalStats

func (s *Scheduler) GetGlobalStats() TaskStats

GetGlobalStats returns global task statistics

func (*Scheduler) GetStats

func (s *Scheduler) GetStats() map[string]*TaskStats

GetStats returns statistics for all task types

func (*Scheduler) PauseNamespace added in v0.0.2

func (s *Scheduler) PauseNamespace(namespace string)

PauseNamespace pauses processing of all task types in a namespace. This prevents any tasks in the namespace from being executed until ResumeNamespace is called. Tasks that are already executing will continue to run until completion.

func (*Scheduler) PauseTaskType

func (s *Scheduler) PauseTaskType(taskType string)

PauseTaskType pauses processing of a specific task type

func (*Scheduler) RegisterTaskType

func (s *Scheduler) RegisterTaskType(taskType string, config TaskTypeConfig)

RegisterTaskType registers a new task type with the scheduler

func (*Scheduler) RegisterTaskTypeWithNamespace added in v0.0.2

func (s *Scheduler) RegisterTaskTypeWithNamespace(taskType string, namespace string, config TaskTypeConfig)

RegisterTaskTypeWithNamespace registers a new task type with the scheduler and associates it with a namespace. This allows for grouping related task types and applying concurrency limits at the namespace level. If namespace is empty, the task type is not associated with any namespace.

func (*Scheduler) ResumeNamespace added in v0.0.2

func (s *Scheduler) ResumeNamespace(namespace string)

ResumeNamespace resumes processing of all task types in a namespace. This allows tasks in the namespace to be executed after they were paused with PauseNamespace. The scheduler will start processing tasks in the namespace on the next processing cycle.

func (*Scheduler) ResumeTaskType

func (s *Scheduler) ResumeTaskType(taskType string)

ResumeTaskType resumes processing of a specific task type

func (*Scheduler) SetMaxGlobalConcurrency

func (s *Scheduler) SetMaxGlobalConcurrency(max int)

SetMaxGlobalConcurrency updates the maximum global concurrency

func (*Scheduler) SetNamespaceMaxConcurrency added in v0.0.2

func (s *Scheduler) SetNamespaceMaxConcurrency(namespace string, max int)

SetNamespaceMaxConcurrency sets the maximum concurrency for a namespace. This limits the total number of tasks that can be executed concurrently across all task types in the namespace. If max <= 0, the concurrency limit for the namespace is removed.

func (*Scheduler) Start

func (s *Scheduler) Start()

Start starts the scheduler

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop stops the scheduler and waits for all tasks to complete

func (*Scheduler) Use

func (s *Scheduler) Use(middleware Middleware)

Use adds middleware to the scheduler

type Task

type Task interface {
	// Execute runs the task and returns any error
	Execute(ctx context.Context) error

	// GetID returns the unique identifier of the task
	GetID() string

	// GetType returns the type of the task
	GetType() string

	// GetPriority returns the priority of the task
	GetPriority() int

	// GetStatus returns the current status of the task
	GetStatus() TaskStatus

	// SetStatus updates the status of the task
	SetStatus(status TaskStatus)

	// GetFailureCount returns the number of times this task has failed
	GetFailureCount() int

	// IncrementFailureCount increments the failure count
	IncrementFailureCount()

	// GetNextExecutionTime returns the next time this task should be executed
	GetNextExecutionTime() time.Time

	// SetNextExecutionTime sets the next time this task should be executed
	SetNextExecutionTime(time time.Time)

	// IsDuplicate checks if this task is a duplicate of another task
	IsDuplicate(other Task) bool
}

Task is the interface that all tasks must implement

type TaskStats

type TaskStats struct {
	Queued    int64
	Executing int64
	Completed int64
	Failed    int64
	Paused    int64
	Cancelled int64
}

TaskStats holds statistics for tasks

type TaskStatus

type TaskStatus string

TaskStatus represents the current status of a task

const (
	TaskStatusQueued    TaskStatus = "queued"
	TaskStatusExecuting TaskStatus = "executing"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusPaused    TaskStatus = "paused"
	TaskStatusCancelled TaskStatus = "cancelled"
)

type TaskTypeConfig

type TaskTypeConfig struct {
	// MaxConcurrency is the maximum number of concurrent tasks of this type
	MaxConcurrency int
	// PriorityLevels is the number of priority levels for this task type
	PriorityLevels int
	// PriorityMode determines how priorities are handled
	PriorityMode PriorityMode
	// PriorityPercentages defines the percentage of tasks to execute at each priority level
	// Only used when PriorityMode is PriorityModePercentage
	PriorityPercentages []int
	// FailureHandler is called when a task fails
	FailureHandler FailureHandler
	// FilterDuplicates determines whether to filter duplicate tasks
	FilterDuplicates bool
}

TaskTypeConfig holds configuration for a specific task type

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL