RFC: Task Runtime Storage (TRS)
1. Abstract 📝
This RFC proposes the Task Runtime Storage (TRS) feature for HyperQueue. TRS provides a simple, persistent key-value store associated with each individual task. This store is managed by the HQ server, survives task restarts (e.g., due to worker crashes), and its state is persisted in the server's journal, ensuring data integrity across server restarts. The primary goals are to simplify the implementation of iterative tasks, enable user-assisted task resilience, and allow for external progress monitoring.
2. Motivation 🤔
Long-running or iterative computations often involve multiple steps or stages. If a task fails midway, re-running it from the beginning can be computationally expensive and time-consuming. Users currently need to implement custom checkpointing and recovery mechanisms outside of HyperQueue.
TRS aims to provide a lightweight, integrated solution for tasks to:
- Store critical progress indicators: For example, the current iteration number, processed data points, or the path to the last successfully written intermediate file.
- Resume from the last known state: Upon restart, tasks can query TRS to retrieve their previous state and continue processing, avoiding redundant work.
- Report fine-grained progress: External systems or users can query TRS to monitor the progress of a task beyond simple running/finished states.
This feature will lower the barrier for users to build more robust and observable distributed applications with HyperQueue.
3. Goals ✅
- Provide a per-task key-value storage mechanism.
- Ensure TRS data persistence across task restarts (e.g., due to worker failures).
- Ensure TRS data persistence across HQ server restarts (via journaling).
- Allow tasks to read and write to their own TRS during execution.
- Allow external clients (e.g., CLI, Python API) to read TRS data for a specific task.
- Keep the TRS mechanism simple and lightweight, with clearly defined (and limited) storage capacity per task.
- Transactional guarantees across multiple keys: the system allows multiple keys to be updated atomically.
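To make the atomicity goal concrete, a minimal sketch of the intended all-or-nothing semantics follows. The `TaskStore` class, the specific limits, and the error type are illustrative assumptions for this RFC, not part of HyperQueue:

```python
# Illustrative sketch of per-task storage with atomic multi-key updates.
# MAX_KEYS and MAX_VALUE_LEN are hypothetical limits, not proposed defaults.

MAX_KEYS = 32          # hypothetical per-task key limit
MAX_VALUE_LEN = 1024   # hypothetical per-value size limit (bytes)

class TaskStore:
    def __init__(self):
        self._data: dict[str, str] = {}

    def update(self, changes: dict[str, str]) -> None:
        """Apply all key-value changes, or none if any change is invalid."""
        # Validate the whole batch first...
        for key, value in changes.items():
            if len(value.encode()) > MAX_VALUE_LEN:
                raise ValueError(f"value for {key!r} exceeds size limit")
        if len(self._data | changes) > MAX_KEYS:
            raise ValueError("too many keys")
        # ...then commit in one step, so no partial state is ever visible.
        self._data.update(changes)

    def get(self, key: str) -> str:
        return self._data[key]
```

With this shape, a rejected batch leaves the store untouched, which is exactly the transactional guarantee the goal above describes.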
4. Non-Goals ❌
- Large-scale data storage: TRS is not intended for storing large datasets or intermediate results. Users should continue using distributed file systems or databases for such purposes. TRS is for small pieces of metadata.
- Inter-task communication: TRS is scoped to a single task. It's not designed as a direct communication channel between different tasks.
- Complex query capabilities: TRS will offer simple key-based lookups. Advanced querying or indexing is out of scope.
5. Proposed Design 💡
TRS will manifest as a small, dictionary-like structure associated with each task, managed by the HQ server.
5.1. Core Features
- Task-Scoped: Each task instance will have its own independent TRS.
- Server-Side Storage: The TRS data will reside on the HQ server.
- Limited Size: A strict, configurable limit (e.g., a few kilobytes, a small number of key-value pairs) will be enforced per task to prevent abuse and manage server resources. The exact default and maximum limits need to be determined. Values will likely be restricted to string types initially for simplicity, with potential for other simple types (int, float, bool) later.
- Persistence:
  - Task Restarts: When a task is preempted or fails and is subsequently restarted on the same or a different worker, it will have access to the last successfully committed state of its TRS.
  - Server Restarts: All TRS modifications will be written to the HQ server's journal. Upon server restart, the TRS data for all tasks will be reloaded from the journal.
- Access Mechanisms:
  - From within the task:
    - CLI: Commands like `hq task-store set <KEY> <VALUE>` and `hq task-store get <KEY>` will be available within the task's execution environment.
    - Environment Variables: Upon task (re)start, the current TRS key-value pairs can be exposed as environment variables (e.g., `HQ_TRS_<KEY>=<VALUE>`). This provides a simple way for scripts to access the initial TRS state. Updates during runtime would still require the CLI or an API.
    - Python API (Future): A Python library function could offer more programmatic access.
  - From outside the task (external access):
    - CLI: `hq task <JOB_ID> <TASK_ID> task-store get <KEY>` (or `hq task <JOB_ID> <TASK_ID> task-store list` to see all keys).
    - Python API (Future): Functions to read TRS data for a specific task.
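A task script could wrap the mechanisms above roughly as follows. This is a sketch assuming the proposed (not yet implemented) `hq task-store` subcommand and the `HQ_TRS_<KEY>` environment-variable convention; the helper names are invented for illustration:

```python
import os
import subprocess

def read_trs_env() -> dict[str, str]:
    """Collect the initial TRS state exposed as HQ_TRS_<KEY> variables."""
    prefix = "HQ_TRS_"
    return {k[len(prefix):]: v
            for k, v in os.environ.items() if k.startswith(prefix)}

def trs_set(key: str, value: str) -> None:
    """Update a key via the proposed in-task CLI (assumed command name)."""
    subprocess.run(["hq", "task-store", "set", key, value], check=True)

def trs_get(key: str) -> str:
    """Read a key via the proposed in-task CLI (assumed command name)."""
    out = subprocess.run(["hq", "task-store", "get", key],
                         check=True, capture_output=True, text=True)
    return out.stdout.strip()
```

The environment variables cover the initial state cheaply; the CLI calls are only needed for reads and writes during runtime.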
5.2. Data Flow
1. Task Initialization: When a task starts for the first time, its TRS is empty.
2. Task (Re)start with TRS Data: If a task is restarting, the HQ server provides the current TRS state to the worker, e.g., by passing the key-value pairs as environment variables (such as `HQ_TRS_step="0"`).
3. Task Modifies TRS:
   - The task executes `hq task-store set mykey myvalue`.
   - This command communicates the change to the assigned worker.
   - The worker forwards the update request to the HQ server.
   - The HQ server validates the update (e.g., against size limits), updates the TRS in memory, and writes the change to its journal.
   - An acknowledgment is sent back to the worker, and subsequently to the `hq` command.
4. Task Reads TRS:
   - The task executes `hq task-store get mykey`.
   - This command communicates with the worker, which may have a cached version or may request the value from the server.
5. External Read:
   - The user executes `hq task <JOB_ID> <TASK_ID> task-store get mykey`.
   - The CLI communicates with the HQ server.
   - The server retrieves the value for `mykey` from the specified task's TRS and returns it.
6. Task Failure and Restart:
   - A worker crashes, and the task running on it is lost.
   - The HQ server reschedules the task.
   - When the task starts on a new worker, the server provides the last successfully journaled TRS state (e.g., via environment variables as in step 2). The task can then use this information to resume.
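The persistence path in steps 3 and 6 can be sketched as an append-only journal that is replayed on restart. This is an illustrative model only; the real HQ server journal has its own binary format, and the names and JSON-lines layout below are invented:

```python
import json
from pathlib import Path

class JournaledStore:
    """Sketch: in-memory per-task stores backed by an append-only journal."""

    def __init__(self, journal: Path):
        self.journal = journal
        self.tasks: dict[str, dict[str, str]] = {}
        # On (re)start, rebuild all TRS state by replaying journal entries.
        if journal.exists():
            for line in journal.read_text().splitlines():
                entry = json.loads(line)
                self.tasks.setdefault(entry["task"], {})[entry["key"]] = entry["value"]

    def set(self, task: str, key: str, value: str) -> None:
        # Journal first, then update memory, so a committed write
        # is never lost if the server crashes afterwards.
        with self.journal.open("a") as f:
            f.write(json.dumps({"task": task, "key": key, "value": value}) + "\n")
        self.tasks.setdefault(task, {})[key] = value

    def get(self, task: str, key: str) -> str:
        return self.tasks[task][key]
```

Replaying the journal on startup reproduces the last committed TRS state for every task, which is what step 6 relies on when a restarted task asks for its previous values.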
6. Use Cases 🚀
6.1. Iterative Computations Resilience
Consider a program performing an iterative refinement:
Current (Pseudocode):

```python
step = 0
while not is_precision_enough():
    do_computation(step)
    write_data(f"output_step_{step}.dat")
    step += 1
```
With TRS (Pseudocode):

```python
import os
import subprocess

# Check whether we are restarting by looking for the TRS-provided variable.
if "HQ_TRS_step" in os.environ:
    step = int(os.environ["HQ_TRS_step"])
    load_data(f"step_{step}.dat")  # reload the last successfully completed step
    print(f"Resuming from step {step}")
    step += 1  # continue with the next step
else:
    step = 0
    print("Starting from scratch, step 0")

while not is_precision_enough(step):  # the precision check may depend on step
    do_computation(step)
    write_data(f"step_{step}.dat")  # save the current step's data
    # Record the completed step in TRS before moving on.
    subprocess.run(["hq", "task-store", "set", "step", str(step)])
    step += 1
```
Explanation:
- When the task starts, it checks for the `HQ_TRS_step` environment variable.
- If the variable is found, the task is resuming: it parses the step number and can load its state from the file written by the last successfully completed step.
- If it is not found, the task starts from `step = 0`.
- After each successful computation and data write for the current step, the task runs `hq task-store set step <step>`.
6.2. Progress Monitoring
An external user or script can monitor the progress of the iterative task:
`hq task <JOB_ID> <TASK_ID> task-store get step`
This command would return the current value of the "step" key, indicating how far the task has progressed. This is more granular than just seeing if the task is "RUNNING".
7. Alternatives Considered 🤔
- User-Managed Checkpointing via Shared Filesystem: This is the current approach. It's flexible but requires more setup from the user, boilerplate code, and a shared filesystem. TRS aims for a simpler, integrated solution for small metadata.
- Dedicated Checkpointing Service: Integrating with an external checkpointing service could be an option but adds external dependencies and complexity. TRS is envisioned as a lightweight, built-in feature.