Skip to content

pardnchiu/go-queue

Repository files navigation

Note

This README was translated by ChatGPT 5-mini, get the original version from here.

cover

Go Task Queue

pkg card codecov version license

Lightweight Golang priority queue that supports bounded concurrency, priority promotion, error retry, and graceful shutdown. Maximizes hardware utilization and prevents system overload.
Suitable for scenarios that need controlled concurrent task execution with priority scheduling.

Core Features

Bounded Concurrency

Configurable worker pool size (default: CPU cores × 2). Tasks beyond worker capacity queue up to avoid system overload while maximizing hardware utilization.

Priority Queue and Promotion

A five-level priority system implemented with a min-heap (Immediate > High > Retry > Normal > Low). Higher-priority tasks execute first; tasks with the same priority are processed FIFO. Tasks that wait for a long time are automatically promoted to a higher priority to prevent starvation. Promotion thresholds are calculated based on configured timeouts.

Error Retry

When a task fails, it can be automatically retried. Retry tasks are re-enqueued with Retry priority, which is between High and Normal, providing a delayed retry effect.

Flowchart

Main Flow
graph TB
  User[User]

  subgraph "Actions"
  New[New]
  Start[Start]
  Enqueue[Enqueue Task]
  Close[Shutdown]
  Options[WithTaskID<br>WithTimeout<br>WithCallback<br>WithRetry]
  end

  subgraph "Task Management"
  Pending[pending queue]
  TaskHeap[taskHeap<br>promotion]
  Task[task]
  end

  subgraph "Execution"
  Worker1[Worker 1]
  Worker2[Worker 2]
  WorkerN[Worker N...]
  Execute[execute task]
  Callback[callback]
  Retry[setRetry retry]
  end

  subgraph "Priority System"
  Priority[priority]
  Immediate[Immediate]
  High[High]
  RetryPri[Retry]
  Normal[Normal]
  Low[Low]
  InsertAt[insertAt / ordering]
  end

  User --> New
  User --> Start
  User --> Enqueue
  User --> Close

  Callback --> |async| User

  Start -->|start| Execution

  Enqueue -->|options| Options
  Enqueue -->|create| Task
  Task -->|push| Pending
  
  Pending -->|manage| TaskHeap
  InsertAt --> Pending
  TaskHeap -->|sort| Priority

  Priority -.-> Immediate
  Priority -.-> High
  Priority -.-> RetryPri
  Priority -.-> Normal
  Priority -.-> Low

  Immediate -.-> InsertAt
  High -.-> InsertAt
  RetryPri -.-> InsertAt
  Normal -.-> InsertAt
  Low -.-> InsertAt

  Pending --> |pop| Worker1
  Pending --> |pop| Worker2
  Pending --> |pop| WorkerN

  Worker1 -->|execute| Execute
  Worker2 -->|execute| Execute
  WorkerN -->|execute| Execute

  Execute -->|success| Callback
  Execute -->|fail & retryable| Retry
  Retry -->|re-enqueue| Pending
Loading
Priority Promotion
stateDiagram
  [*] --> Low: create task
  [*] --> Normal: create task
  [*] --> High: create task
  [*] --> Immediate: create task
  
  Low --> Normal: wait >= promotion.After
  Normal --> High: wait >= promotion.After
  High --> [*]: executed
  Immediate --> [*]: executed
  
  note right of Low
  timeout
  range 30-120s
  end note
  
  note right of Normal
  timeout * 2
  range 30-120s
  end note
Loading
Retry Flow
stateDiagram
  [*] --> Execute: task starts
  
  Execute --> Success: execution succeeds
  Execute --> Failed: execution fails
  
  Success --> Callback: trigger callback
  Callback --> [*]
  
  Failed --> CheckRetry: check retry
  
  CheckRetry --> SetRetry: retryOn && retryTimes < retryMax
  CheckRetry --> Exhausted: retryTimes >= retryMax
  CheckRetry --> NoRetry: retryOn == false
  
  SetRetry --> Pending: re-enqueue with Retry priority
  Pending --> Execute: wait for execution
  
  Exhausted --> [*]: log task.exhausted
  NoRetry --> [*]: log task.failed
Loading

Usage

Installation

Note

The latest commit may change; using tagged versions is recommended.
Commits that only change documentation may be rebased later.

go get github.com/pardnchiu/go-queue@[VERSION]

git clone --depth 1 --branch [VERSION] https://github.com/pardnchiu/go-queue.git

Initialization

Basic Usage

package main

import (
  "context"
  "fmt"
  "time"
  
  queue "github.com/pardnchiu/go-queue"
)

func main() {
  // Initialize with default config
  q := queue.New(nil)
  
  // Start queue workers
  ctx := context.Background()
  q.Start(ctx)
  
  // Enqueue tasks
  for i := 0; i < 10; i++ {
    id, err := q.Enqueue(ctx, "", func(ctx context.Context) error {
      fmt.Println("task executed")
      return nil
    })
    if err != nil {
      fmt.Printf("enqueue failed: %v\n", err)
    }
    fmt.Printf("task ID: %s\n", id)
  }
  
  // Graceful shutdown (wait for all tasks to finish)
  if err := q.Shutdown(ctx); err != nil {
    fmt.Printf("shutdown error: %v\n", err)
  }
}

Using Preset Configs

package main

import (
  "context"
  "fmt"
  
  queue "github.com/pardnchiu/go-queue"
)

func main() {
  q := queue.New(&queue.Config{
    Workers: 8,
    Size:    1000,
    Timeout: 60,
    Preset: map[string]queue.PresetConfig{
      "critical": {Priority: PriorityImmediate, Timeout: 15},
      "email":    {Priority: PriorityHigh, Timeout: 30},
      "report":   {Priority: PriorityNormal, Timeout: 120},
      "cleanup":  {Priority: PriorityLow, Timeout: 300},
    },
  })
  
  ctx := context.Background()
  q.Start(ctx)
  defer q.Shutdown(ctx)
  
  // Use preset config for critical payment
  q.Enqueue(ctx, "critical", func(ctx context.Context) error {
    return processPayment()
  })
  
  // Use preset config to send email
  q.Enqueue(ctx, "email", func(ctx context.Context) error {
    return sendNotification()
  })
  
  // Use preset config to generate report
  q.Enqueue(ctx, "report", func(ctx context.Context) error {
    return generateReport()
  })
}

Using Options

package main

import (
  "context"
  "fmt"
  "time"
  
  queue "github.com/pardnchiu/go-queue"
)

func main() {
  q := queue.New(&queue.Config{Workers: 4})
  
  ctx := context.Background()
  q.Start(ctx)
  defer q.Shutdown(ctx)
  
  // Custom task ID
  q.Enqueue(ctx, "", func(ctx context.Context) error {
    return processOrder("ORD-123")
  }, queue.WithTaskID("order-ORD-123"))
  
  // Custom timeout
  q.Enqueue(ctx, "", func(ctx context.Context) error {
    return heavyComputation()
  }, queue.WithTimeout(5*time.Minute))
  
  // Custom callback (only triggered on success)
  q.Enqueue(ctx, "", func(ctx context.Context) error {
    return sendEmail()
  }, queue.WithCallback(func(id string) {
    fmt.Printf("task %s completed\n", id)
  }))
  
  // Combined options
  q.Enqueue(ctx, "", func(ctx context.Context) error {
    return importData()
  },
    queue.WithTaskID("import-daily"),
    queue.WithTimeout(10*time.Minute),
    queue.WithCallback(func(id string) {
      logSuccess(id)
    }),
  )
}

Configuration

type Config struct {
  Workers int                     // worker pool size (default: CPU cores × 2)
  Size    int                     // max queue capacity (default: Workers × 64)
  Timeout int                     // default timeout in seconds (default: 30)
  Preset  map[string]PresetConfig // named preset configs
}

type PresetConfig struct {
  Priority Priority         // nil = PriorityNormal
  Timeout  time.Duration    // override timeout in seconds (0 = computed by priority)
}

Priority Levels

Priority Value Timeout Calculation Use Case
Immediate 0 timeout / 4 (15-120s) Payments, alerts
High 1 timeout / 2 (15-120s) User-initiated ops
Retry 2 timeout / 2 (15-120s) Failed task retry (internal use)
Normal 3 timeout (15-120s) Background tasks
Low 4 timeout × 2 (15-120s) Cleanup, analytics

Note: Retry priority is managed internally by the system and cannot be directly specified by users.

Available Functions

Queue Management

  • New(config) - create a new queue instance

    q := queue.New(&queue.Config{
      Workers: 4,
      Size:    256,
      Timeout: 60,
    })
  • Start(ctx) - start the worker pool

    q.Start(ctx)
  • Enqueue(ctx, preset, action, options...) - add a task to the queue

    id, err := q.Enqueue(ctx, "email", func(ctx context.Context) error {
      return sendEmail()
    })
  • Shutdown(ctx) - graceful shutdown

    // wait indefinitely for all tasks to finish
    err := q.Shutdown(context.Background())
    
    // with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    err := q.Shutdown(ctx)

Enqueue Options

  • WithTaskID(id) - set a custom task ID

    queue.WithTaskID("unique-task-id")
  • WithTimeout(duration) - set task timeout

    queue.WithTimeout(5 * time.Minute)
  • WithCallback(fn) - set completion callback (only triggered on success)

    queue.WithCallback(func(id string) {
    // called asynchronously after task succeeds
    })
  • WithRetry(maxRetry) - enable error retry

    // use default max retries (3)
    queue.WithRetry(nil)
    
    // specify max retries
    maxRetry := 5
    queue.WithRetry(&maxRetry)

Priority Promotion

Waiting tasks in the queue are automatically promoted to prevent starvation:

Original Priority Promotion Target Wait Threshold
Low Normal clamp(timeout, 30s, 120s)
Normal High clamp(timeout × 2, 30s, 120s)
Retry - no promotion (managed by retry mechanism)
High - no promotion (already high priority)
Immediate - no promotion (highest priority)

Promotion is checked each time a worker pops a task from the queue.

Retry Mechanism

When WithRetry is enabled, failed tasks are automatically retried:

Stage Behavior
Failed and retryTimes < retryMax Re-enqueued with Retry priority
Failed and retryTimes >= retryMax Log task.exhausted, terminate task
Success Trigger callback (if configured)

Retry Characteristics:

  • Retry tasks have Retry priority, which is between High and Normal
  • Each retry resets startAt, entering the end of the queue (within same priority)
  • Retry priority does not participate in automatic promotion to avoid interfering with normal priority scheduling
  • Default max retries is 3, customizable via WithRetry(&maxRetry)

Timeout Mechanism

  • Each task has a timeout based on priority or explicit configuration.
  • Use WithTimeout to override the default timeout.
  • On timeout:
    • the task's context is canceled
    • an error is logged
    • if retry is enabled, it's treated as a failure and retry is attempted
    • goroutine leak detection: if the task doesn't respond to cancellation within 5 seconds, a warning is emitted

Use Cases

Use Case Configuration
API background jobs Workers: CPU×2, Timeout: 30s
Email/notifications Priority: High, Timeout: 60s
Report generation Priority: Normal, Timeout: 300s
Data cleanup Priority: Low, Timeout: 600s
Payment processing Priority: Immediate, Timeout: 15s

Works well integrated with go-scheduler:

// Scheduler triggers, Queue controls concurrency
scheduler.Add("@every 1m", func() error {
  orders := db.GetPendingOrders()
  for _, o := range orders {
    queue.Enqueue(ctx, "order", func(ctx context.Context) error {
      return processOrder(o)
    })
  }
  return nil
})

License

This project is licensed under MIT.

Author

邱敬幃 Pardn Chiu

Stars

Star


©️ 2025 邱敬幃 Pardn Chiu

About

Priority-based queue with automatic timeout promotion

Topics

Resources

License

Stars

Watchers

Forks

Languages