RFC: Task Runtime Storage (TRS) #912

Open
@spirali

RFC: Task Runtime Storage (TRS)

1. Abstract 📝

This RFC proposes the Task Runtime Storage (TRS) feature for HyperQueue. TRS provides a simple, persistent key-value store associated with each individual task. This store is managed by the HQ server, survives task restarts (e.g., due to worker crashes), and its state is persisted in the server's journal, ensuring data integrity across server restarts. The primary goals are to simplify the implementation of iterative tasks, enable user-assisted task resilience, and allow for external progress monitoring.


2. Motivation 🤔

Long-running or iterative computations often involve multiple steps or stages. If a task fails midway, re-running it from the beginning can be computationally expensive and time-consuming. Users currently need to implement custom checkpointing and recovery mechanisms outside of HyperQueue.

TRS aims to provide a lightweight, integrated solution for tasks to:

  • Store critical progress indicators: For example, the current iteration number, processed data points, or the path to the last successfully written intermediate file.
  • Resume from the last known state: Upon restart, tasks can query TRS to retrieve their previous state and continue processing, avoiding redundant work.
  • Report fine-grained progress: External systems or users can query TRS to monitor the progress of a task beyond simple running/finished states.

This feature will lower the barrier for users to build more robust and observable distributed applications with HyperQueue.


3. Goals ✅

  • Provide a per-task key-value storage mechanism.
  • Ensure TRS data persistence across task restarts (e.g., due to worker failures).
  • Ensure TRS data persistence across HQ server restarts (via journaling).
  • Allow tasks to read and write to their own TRS during execution.
  • Allow external clients (e.g., CLI, Python API) to read TRS data for a specific task.
  • Keep the TRS mechanism simple and lightweight, with clearly defined (and limited) storage capacity per task.
  • Transactional guarantees across multiple keys: the system allows multiple keys to be updated atomically.

4. Non-Goals ❌

  • Large-scale data storage: TRS is not intended for storing large datasets or intermediate results. Users should continue using distributed file systems or databases for such purposes. TRS is for small pieces of metadata.
  • Inter-task communication: TRS is scoped to a single task. It's not designed as a direct communication channel between different tasks.
  • Complex query capabilities: TRS will offer simple key-based lookups. Advanced querying or indexing is out of scope.

5. Proposed Design 💡

TRS will manifest as a small, dictionary-like structure associated with each task, managed by the HQ server.

5.1. Core Features

  • Task-Scoped: Each task instance will have its own independent TRS.
  • Server-Side Storage: The TRS data will reside on the HQ server.
  • Limited Size: A strict, configurable limit (e.g., a few kilobytes, a small number of key-value pairs) will be enforced per task to prevent abuse and manage server resources. The exact default and maximum limits need to be determined. Values will likely be restricted to string types initially for simplicity, with potential for other simple types (int, float, bool) later.
  • Persistence:
    • Task Restarts: When a task is preempted or fails and is subsequently restarted on the same or a different worker, it will have access to the last successfully committed state of its TRS.
    • Server Restarts: All TRS modifications will be written to the HQ server's journal. Upon server restart, the TRS data for all tasks will be reloaded from the journal.
  • Access Mechanisms:
    • From within the task:
      • CLI: Commands such as hq task-store set <KEY> <VALUE> and hq task-store get <KEY> will be available within the task's execution environment.
      • Environment Variables: Upon task (re)start, the current TRS key-value pairs can be exposed as environment variables (e.g., HQ_TRS_<KEY>=<VALUE>). This provides a simple way for scripts to access initial TRS state. Updates during runtime would still require the CLI or an API.
      • Python API (Future): A Python library function could offer more programmatic access.
    • From outside the task (external access):
      • CLI: hq task <JOB_ID> <TASK_ID> task-store get <KEY> (or hq task <JOB_ID> <TASK_ID> task-store list to see all keys).
      • Python API (Future): Functions to read TRS data for a specific task.
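
The size limit and string-only values described above can be illustrated with a small in-memory model. This is only a sketch: the class name TaskRuntimeStorage, the exception TrsLimitError, and the limit values (16 keys, 4096 bytes) are hypothetical placeholders, since the RFC leaves the actual defaults open.

```python
class TrsLimitError(Exception):
    """Raised when an update would exceed the per-task limits."""


class TaskRuntimeStorage:
    """Hypothetical in-memory model of one task's TRS (string keys and values)."""

    def __init__(self, max_keys=16, max_bytes=4096):
        # Both limits are placeholder values; the RFC leaves the defaults open.
        self.max_keys = max_keys
        self.max_bytes = max_bytes
        self._data = {}

    @staticmethod
    def _size(data):
        # Count the UTF-8 bytes of all keys and values.
        return sum(len(k.encode()) + len(v.encode()) for k, v in data.items())

    def set(self, key, value):
        if not isinstance(key, str) or not isinstance(value, str):
            raise TypeError("TRS keys and values must be strings")
        candidate = dict(self._data)
        candidate[key] = value
        if len(candidate) > self.max_keys:
            raise TrsLimitError("too many keys")
        if self._size(candidate) > self.max_bytes:
            raise TrsLimitError("storage size limit exceeded")
        # Commit only after every check has passed.
        self._data = candidate

    def get(self, key, default=None):
        return self._data.get(key, default)

    def items(self):
        # Return a copy so callers cannot bypass the limit checks.
        return dict(self._data)
```

Validating against a copy before committing means a rejected update leaves the previously stored state untouched.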

5.2. Data Flow

  1. Task Initialization: When a task starts for the first time, its TRS is empty.
  2. Task (Re)start with TRS Data: If a task is restarting, the HQ server provides the current TRS state to the worker. This can be done by passing the key-value pairs as environment variables (e.g., HQ_TRS_step="0").
  3. Task Modifies TRS:
    • The task executes hq task-store set mykey myvalue.
    • This command communicates the change to the assigned worker.
    • The worker forwards the update request to the HQ server.
    • The HQ server validates the update (e.g., size limits), updates the TRS in memory, and writes the change to its journal.
    • An acknowledgment is sent back to the worker, and subsequently to the hq command.
  4. Task Reads TRS:
    • The task executes hq task-store get mykey.
    • This command communicates with the worker, which either answers from a cached copy or requests the value from the server.
  5. External Read:
    • User executes hq task <JOB_ID> <TASK_ID> task-store get mykey.
    • The CLI communicates with the HQ server.
    • The server retrieves the value for mykey from the specified task's TRS and returns it.
  6. Task Failure and Restart:
    • A worker crashes, and the task running on it is lost.
    • The HQ server reschedules the task.
    • When the task starts on a new worker, the server provides the last successfully journaled TRS state to the new worker (e.g., via environment variables as in step 2). The task can then use this information to resume.
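
The journaling and reload behaviour in steps 3 and 6 can be sketched as an append-only log that is replayed on startup. The JSON-lines record format and the function names append_trs_update and replay_trs are assumptions made for illustration; they do not describe the actual HyperQueue journal format.

```python
import json


def append_trs_update(journal_path, job_id, task_id, key, value):
    """Append one TRS update as a JSON line (hypothetical record format)."""
    record = {"job": job_id, "task": task_id, "key": key, "value": value}
    with open(journal_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")


def replay_trs(journal_path):
    """Rebuild {(job, task): {key: value}} by replaying the journal in order."""
    state = {}
    try:
        with open(journal_path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                record = json.loads(line)
                task_store = state.setdefault((record["job"], record["task"]), {})
                # Later records overwrite earlier ones for the same key.
                task_store[record["key"]] = record["value"]
    except FileNotFoundError:
        pass  # No journal yet: every TRS starts empty.
    return state
```

Because records are replayed in write order, the rebuilt state always reflects the last journaled value of each key, which is the state a restarted task would receive.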

6. Use Cases 🚀

6.1. Iterative Computations Resilience

Consider a program performing an iterative refinement:

Current (Pseudocode):

step = 0
while not is_precision_enough():
    do_computation(step)
    write_data(f"output_step_{step}.dat")
    step += 1

With TRS (Pseudocode):

import os
import subprocess

# The server exposes stored keys as HQ_TRS_<KEY>; the key here is "step",
# so the variable is HQ_TRS_step (variable names are case-sensitive on Linux).
if "HQ_TRS_step" in os.environ:
    # "step" holds the last step whose data was written successfully.
    step = int(os.environ["HQ_TRS_step"])
    load_data(f"step_{step}.dat")
    print(f"Resuming after step {step}")
    step += 1
else:
    step = 0
    print("Starting from scratch, step 0")

while not is_precision_enough(step):  # Assuming the precision check may depend on step
    do_computation(step)
    write_data(f"step_{step}.dat")  # Save the current step's data

    # Record the completed step only after its data is on disk.
    subprocess.run(["hq", "task-store", "set", "step", str(step)], check=True)
    step += 1

Explanation:

  • When the task starts, it checks for the HQ_TRS_step environment variable.
  • If found, it means the task is resuming. It parses the step number and can potentially load its state from a file corresponding to the previous successfully completed step.
  • If not found, it starts from step = 0.
  • After each successful computation and data write for the current step, it records the step with hq task-store set step <step>.
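
The resume check can be factored into a small helper that collects every HQ_TRS_<KEY> variable at once. This is a sketch under two assumptions: the HQ_TRS_ prefix is the one proposed in this RFC, and the step key holds the last successfully completed step, as the explanation above describes. The function names trs_from_env and resume_step are hypothetical.

```python
import os

TRS_ENV_PREFIX = "HQ_TRS_"  # prefix proposed in this RFC


def trs_from_env(environ=None):
    """Collect HQ_TRS_<KEY>=<VALUE> variables into a plain dict."""
    if environ is None:
        environ = os.environ
    return {
        name[len(TRS_ENV_PREFIX):]: value
        for name, value in environ.items()
        if name.startswith(TRS_ENV_PREFIX)
    }


def resume_step(environ=None):
    """Return the first step to compute: last completed step + 1, or 0."""
    last_done = trs_from_env(environ).get("step")
    return 0 if last_done is None else int(last_done) + 1
```

Passing a mapping explicitly instead of reading os.environ directly makes the resume logic easy to exercise outside a running task.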

6.2. Progress Monitoring

An external user or script can monitor the progress of the iterative task:

hq task <JOB_ID> <TASK_ID> task-store get step

This command would return the current value of the "step" key, indicating how far the task has progressed. This is more granular than just seeing if the task is "RUNNING".
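
A monitoring script could poll that command until a target step is reached. The sketch below assumes the proposed (not yet existing) command prints the bare value on standard output; the function names read_step and watch_step, and the read and sleep parameters, are hypothetical.

```python
import subprocess
import time


def read_step(job_id, task_id):
    """Read the "step" key through the CLI form proposed in this RFC."""
    result = subprocess.run(
        ["hq", "task", str(job_id), str(task_id), "task-store", "get", "step"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()


def watch_step(job_id, task_id, target, read=read_step, interval=5.0, sleep=time.sleep):
    """Poll until the reported step reaches `target`; return the distinct values seen."""
    seen = []
    while True:
        value = int(read(job_id, task_id))
        if not seen or value != seen[-1]:
            seen.append(value)
        if value >= target:
            return seen
        sleep(interval)
```

The reader and sleep functions are injectable so the polling loop can be tried without a running HQ server.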


7. Alternatives Considered 🤔

  • User-Managed Checkpointing via Shared Filesystem: This is the current approach. It's flexible but requires more setup from the user, boilerplate code, and a shared filesystem. TRS aims for a simpler, integrated solution for small metadata.
  • Dedicated Checkpointing Service: Integrating with an external checkpointing service could be an option but adds external dependencies and complexity. TRS is envisioned as a lightweight, built-in feature.
