Documentation
¶
Index ¶
- func ExampleFailureHandler(task Task, err error) (time.Time, bool)
- func ExampleMiddleware(next func(ctx context.Context, task Task) error) func(ctx context.Context, task Task) error
- type BaseTask
- func (t *BaseTask) Execute(ctx context.Context) error
- func (t *BaseTask) GetFailureCount() int
- func (t *BaseTask) GetID() string
- func (t *BaseTask) GetNextExecutionTime() time.Time
- func (t *BaseTask) GetPriority() int
- func (t *BaseTask) GetStatus() TaskStatus
- func (t *BaseTask) GetType() string
- func (t *BaseTask) IncrementFailureCount()
- func (t *BaseTask) IsDuplicate(other Task) bool
- func (t *BaseTask) SetNextExecutionTime(time time.Time)
- func (t *BaseTask) SetStatus(status TaskStatus)
- type CPUUsageFunc
- type ExampleTask
- type FailureHandler
- type MemoryUsageFunc
- type Middleware
- type PriorityMode
- type ResourceMonitor
- type Scheduler
- func (s *Scheduler) AddTask(task Task) bool
- func (s *Scheduler) CancelTask(taskID string) bool
- func (s *Scheduler) CancelTasksByType(taskType string) int
- func (s *Scheduler) ClearTasksByType(taskType string) int
- func (s *Scheduler) GetGlobalStats() TaskStats
- func (s *Scheduler) GetStats() map[string]*TaskStats
- func (s *Scheduler) PauseNamespace(namespace string)
- func (s *Scheduler) PauseTaskType(taskType string)
- func (s *Scheduler) RegisterTaskType(taskType string, config TaskTypeConfig)
- func (s *Scheduler) RegisterTaskTypeWithNamespace(taskType string, namespace string, config TaskTypeConfig)
- func (s *Scheduler) ResumeNamespace(namespace string)
- func (s *Scheduler) ResumeTaskType(taskType string)
- func (s *Scheduler) SetMaxGlobalConcurrency(max int)
- func (s *Scheduler) SetNamespaceMaxConcurrency(namespace string, max int)
- func (s *Scheduler) Start()
- func (s *Scheduler) Stop()
- func (s *Scheduler) Use(middleware Middleware)
- type Task
- type TaskStats
- type TaskStatus
- type TaskTypeConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExampleFailureHandler ¶
ExampleFailureHandler is a simple failure handler that retries tasks with exponential backoff
Types ¶
type BaseTask ¶
type BaseTask struct {
ID string
Type string
Priority int
Status TaskStatus
FailureCount int
NextExecutionTime time.Time
}
BaseTask provides a basic implementation of the Task interface
func NewBaseTask ¶
NewBaseTask creates a new BaseTask with the given parameters
func (*BaseTask) Execute ¶
Execute is a placeholder implementation that should be overridden by embedding structs
func (*BaseTask) GetFailureCount ¶
GetFailureCount returns the number of times this task has failed
func (*BaseTask) GetNextExecutionTime ¶
GetNextExecutionTime returns the next time this task should be executed
func (*BaseTask) GetPriority ¶
GetPriority returns the priority of the task
func (*BaseTask) GetStatus ¶
func (t *BaseTask) GetStatus() TaskStatus
GetStatus returns the current status of the task
func (*BaseTask) IncrementFailureCount ¶
func (t *BaseTask) IncrementFailureCount()
IncrementFailureCount increments the failure count
func (*BaseTask) IsDuplicate ¶
IsDuplicate checks if this task is a duplicate of another task Default implementation checks if IDs are the same
func (*BaseTask) SetNextExecutionTime ¶
SetNextExecutionTime sets the next time this task should be executed
func (*BaseTask) SetStatus ¶
func (t *BaseTask) SetStatus(status TaskStatus)
SetStatus updates the status of the task
type CPUUsageFunc ¶
CPUUsageFunc is a function type for getting CPU usage
var GetCPUUsage CPUUsageFunc = defaultGetCPUUsage
Current implementations that can be overridden for testing
type ExampleTask ¶
type ExampleTask struct {
BaseTask
// Data holds the task's data
Data string
// Duration is how long the task will take to execute
Duration time.Duration
// ShouldFail indicates if the task should fail
ShouldFail bool
}
ExampleTask is a simple example implementation of the Task interface
func NewExampleTask ¶
func NewExampleTask(id string, priority int, data string, duration time.Duration, shouldFail bool) *ExampleTask
NewExampleTask creates a new example task
func (*ExampleTask) Execute ¶
func (t *ExampleTask) Execute(ctx context.Context) error
Execute runs the example task
func (*ExampleTask) IsDuplicate ¶
func (t *ExampleTask) IsDuplicate(other Task) bool
IsDuplicate checks if this task is a duplicate of another task For example tasks, we consider them duplicates if they have the same ID and Data
type FailureHandler ¶
FailureHandler is a function that handles task failures
type MemoryUsageFunc ¶
type MemoryUsageFunc func() float64
MemoryUsageFunc is a function type for getting memory usage
var GetMemoryUsage MemoryUsageFunc = defaultGetMemoryUsage
type Middleware ¶
type Middleware func(next func(ctx context.Context, task Task) error) func(ctx context.Context, task Task) error
Middleware is a function that wraps task execution
type PriorityMode ¶
type PriorityMode int
PriorityMode defines how tasks with different priorities are scheduled
const ( // PriorityModePercentage schedules tasks based on priority percentages PriorityModePercentage PriorityMode = iota // PriorityModeStrict schedules tasks strictly by priority (higher first) PriorityModeStrict )
type ResourceMonitor ¶
type ResourceMonitor struct {
// contains filtered or unexported fields
}
ResourceMonitor monitors system resources and adjusts concurrency limits
func NewResourceMonitor ¶
func NewResourceMonitor(scheduler *Scheduler, maxCPUUsage, maxMemoryUsage float64) *ResourceMonitor
NewResourceMonitor creates a new resource monitor
func (*ResourceMonitor) SetMaxCPUUsage ¶
func (m *ResourceMonitor) SetMaxCPUUsage(maxCPUUsage float64)
SetMaxCPUUsage sets the maximum CPU usage percentage
func (*ResourceMonitor) SetMaxMemoryUsage ¶
func (m *ResourceMonitor) SetMaxMemoryUsage(maxMemoryUsage float64)
SetMaxMemoryUsage sets the maximum memory usage percentage
func (*ResourceMonitor) SetMinConcurrency ¶
func (m *ResourceMonitor) SetMinConcurrency(minConcurrency int)
SetMinConcurrency sets the minimum concurrency
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages task execution
func NewScheduler ¶
NewScheduler creates a new task scheduler
func (*Scheduler) CancelTask ¶
CancelTask cancels a specific task
func (*Scheduler) CancelTasksByType ¶ added in v0.0.2
CancelTasksByType cancels all tasks of a specific type
func (*Scheduler) ClearTasksByType ¶ added in v0.0.2
ClearTasksByType removes all tasks of a specific type without marking them as cancelled and resets all counters for this task type to 0
func (*Scheduler) GetGlobalStats ¶
GetGlobalStats returns global task statistics
func (*Scheduler) PauseNamespace ¶ added in v0.0.2
PauseNamespace pauses processing of all task types in a namespace. This prevents any tasks in the namespace from being executed until ResumeNamespace is called. Tasks that are already executing will continue to run until completion.
func (*Scheduler) PauseTaskType ¶
PauseTaskType pauses processing of a specific task type
func (*Scheduler) RegisterTaskType ¶
func (s *Scheduler) RegisterTaskType(taskType string, config TaskTypeConfig)
RegisterTaskType registers a new task type with the scheduler
func (*Scheduler) RegisterTaskTypeWithNamespace ¶ added in v0.0.2
func (s *Scheduler) RegisterTaskTypeWithNamespace(taskType string, namespace string, config TaskTypeConfig)
RegisterTaskTypeWithNamespace registers a new task type with the scheduler and associates it with a namespace. This allows for grouping related task types and applying concurrency limits at the namespace level. If namespace is empty, the task type is not associated with any namespace.
func (*Scheduler) ResumeNamespace ¶ added in v0.0.2
ResumeNamespace resumes processing of all task types in a namespace. This allows tasks in the namespace to be executed after they were paused with PauseNamespace. The scheduler will start processing tasks in the namespace on the next processing cycle.
func (*Scheduler) ResumeTaskType ¶
ResumeTaskType resumes processing of a specific task type
func (*Scheduler) SetMaxGlobalConcurrency ¶
SetMaxGlobalConcurrency updates the maximum global concurrency
func (*Scheduler) SetNamespaceMaxConcurrency ¶ added in v0.0.2
SetNamespaceMaxConcurrency sets the maximum concurrency for a namespace. This limits the total number of tasks that can be executed concurrently across all task types in the namespace. If max <= 0, the concurrency limit for the namespace is removed.
func (*Scheduler) Stop ¶
func (s *Scheduler) Stop()
Stop stops the scheduler and waits for all tasks to complete
func (*Scheduler) Use ¶
func (s *Scheduler) Use(middleware Middleware)
Use adds middleware to the scheduler
type Task ¶
type Task interface {
// Execute runs the task and returns any error
Execute(ctx context.Context) error
// GetID returns the unique identifier of the task
GetID() string
// GetType returns the type of the task
GetType() string
// GetPriority returns the priority of the task
GetPriority() int
// GetStatus returns the current status of the task
GetStatus() TaskStatus
// SetStatus updates the status of the task
SetStatus(status TaskStatus)
// GetFailureCount returns the number of times this task has failed
GetFailureCount() int
// IncrementFailureCount increments the failure count
IncrementFailureCount()
// GetNextExecutionTime returns the next time this task should be executed
GetNextExecutionTime() time.Time
// SetNextExecutionTime sets the next time this task should be executed
SetNextExecutionTime(time time.Time)
// IsDuplicate checks if this task is a duplicate of another task
IsDuplicate(other Task) bool
}
Task is the interface that all tasks must implement
type TaskStats ¶
type TaskStats struct {
Queued int64
Executing int64
Completed int64
Failed int64
Paused int64
Cancelled int64
}
TaskStats holds statistics for tasks
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the current status of a task
const ( TaskStatusQueued TaskStatus = "queued" TaskStatusExecuting TaskStatus = "executing" TaskStatusCompleted TaskStatus = "completed" TaskStatusFailed TaskStatus = "failed" TaskStatusPaused TaskStatus = "paused" TaskStatusCancelled TaskStatus = "cancelled" )
type TaskTypeConfig ¶
type TaskTypeConfig struct {
// MaxConcurrency is the maximum number of concurrent tasks of this type
MaxConcurrency int
// PriorityLevels is the number of priority levels for this task type
PriorityLevels int
// PriorityMode determines how priorities are handled
PriorityMode PriorityMode
// PriorityPercentages defines the percentage of tasks to execute at each priority level
// Only used when PriorityMode is PriorityModePercentage
PriorityPercentages []int
// FailureHandler is called when a task fails
FailureHandler FailureHandler
// FilterDuplicates determines whether to filter duplicate tasks
FilterDuplicates bool
}
TaskTypeConfig holds configuration for a specific task type