DAGベースのタスク管理ライブラリ。タスクの依存関係を管理し、���同期で並列実行します。
- DAG(有向非巡回グラフ)によるタスク管理 - タスク間の依存関係を定義し、トポロジカル順序で実行
- 非同期並列実行 - 依存関係のないタスクを
tokioで並列実行(同時実行数は設定可能) - 静的解析 - 実行前にDAG構造とタスク定義を検証(循環検出、依存関係チェック、権限コンフリクト検出)
- プラグイン式Executor -
TaskExecutorトレイトを実装して独自のタスク実行ロジックを追加 - タスク間データフロー -
inputsフィールドで前のタスクの出力を参照($.task_id.output.field構文) - 埋め込み参照 - 文字列内に
${...}構文で値を埋め込み - 自己参照 -
$.selfで現在のタスクのフィールドを参照 - 条件付き実行 -
if/elseフィールドで条件に基づいてタスクをスキップ - サブグラフ実行 -
DagExecutorでDAGをネストして実行(最大3レベル)、サブグラフ内の結果を親から参照可能 - ループ実行 -
loop_configでDAGを繰り返し実行、$.loop.*で前回イテレーションの結果を参照可能 - MCP連携 - Model Context Protocolを通じてClaude Codeと連携
- ロールベースの権限管理 - ファイルアクセス権限、コマンド実行権限を定義
- マルチプラットフォームUI - Dioxusによるデスクトップ/Web/TUI対応GUI
git clone https://github.com/HNakamura33/task-composer.git
cd task-composer
cargo build --release
# CLIをインストール
cargo install --path task-composer-cli# ヘルプ表示
task-composer --help
# 静的解析のみ
task-composer analyze <FILE>
# 静的解析 + 実行(エラーがあれば中止)
task-composer run <FILE>
task-composer run --force <FILE> # エラーがあっても続行
# 実行のみ(静的解析なし)
task-composer exec <FILE>
# 後方互換性(実行のみ)
task-composer <FILE>$ task-composer analyze sample_dag.json
=== Static Analysis: sample_dag.json ===
DAG Structure:
Root nodes: ["1"]
Leaf nodes: ["4"]
Topological order: ["1", "2", "3", "4"]
Parallel pairs: 1 pair(s)
Critical path: ["1", "3", "4"]
Validation Results:
[WARN] タスク 2 と 3 が '${project_root}/src' で WriteWrite 競合しています
=== Summary ===
Warnings: 1
| レベル | 検出内容 |
|---|---|
| Error | 循環依存、存在しない依存先、自己依存、空のタスク名、権限の矛盾 |
| Warning | 空のプロンプト、未知のExecutor、重複依存、孤立ノード、ファイルコンフリクト |
# サンプルDAGを実行
cargo run -p task-composer-cli
# カスタムDAGファイルを指定
cargo run -p task-composer-cli -- path/to/your_dag.json
# 静的解析
cargo run -p task-composer-cli -- analyze sample_dag.json
# テスト実行
cargo test
# ドキュメント生成
cargo doc --openuse task_composer_core::dag::DAG;
use task_composer_core::analysis::StaticAnalyzer;
use task_composer_core::task_executor::LogExecutor;
#[tokio::main]
async fn main() {
// JSONからDAGを作成
let json = std::fs::read_to_string("sample_dag.json").unwrap();
let mut dag = DAG::from_json(&json).unwrap();
// 静的解析を実行
let analyzer = StaticAnalyzer::new(&dag);
let result = analyzer.analyze();
if result.has_errors() {
eprintln!("Errors found: {}", result.error_count());
return;
}
// Executorを登録
dag.register_executor(Box::new(LogExecutor::new()));
// 非同期で実行
let results = dag.execute_async().await.unwrap();
println!("Executed {} tasks", results.len());
}{
"tasks": [
{ "task_id": "build", "executor": "log" },
{ "task_id": "test", "executor": "log", "dependencies": ["build"] }
]
}{
"tasks": [
{
"task_id": "1",
"name": "Setup Environment",
"description": "開発環境のセットアップ",
"priority": 1,
"prompt": "プロジェクトを初期化して依存関係をインストール",
"executor": "log",
"args": {"env": "development"},
"dependencies": [],
"role": {
"role_id": "role_setup",
"name": "Setup Role",
"subagents": [],
"skills": ["environment"],
"description": "環境構築用ロール",
"tool_permissions": {
"bash": {
"allowed_commands": ["git", "npm"],
"blocked_commands": ["rm -rf /"],
"require_confirmation": []
},
"write": {
"max_file_size_mb": 10,
"allowed_extensions": [".rs", ".json"]
}
},
"file_permissions": {
"allowed_paths": ["src/"],
"denied_paths": [".env"],
"read_only_paths": []
}
}
},
{
"task_id": "2",
"name": "Build",
"executor": "log",
"args": {
"setup_result": "$.1.output.message"
},
"dependencies": ["1"]
}
],
"config": {
"max_concurrent_tasks": 4,
"default_task_timeout_secs": 300
}
}| フィールド | 型 | 説明 |
|---|---|---|
task_id |
String | タスクの一意な識別子 |
executor |
String | 使用するExecutorの名前(log, mcp, dag) |
| フィールド | 型 | 説明 |
|---|---|---|
name |
String? | タスクの表示名(省略時はtask_idを使用) |
description |
String? | タスクの詳細説明 |
priority |
u8 | 優先度(0-255、デフォルト: 0) |
prompt |
String? | タスク実行時のプロンプト |
args |
JSON | タスクに渡す引数(パス参照 $.task_id.output.* を含む) |
dependencies |
Array | 依存するタスクIDのリスト |
if |
String? | 実行条件(trueなら実行、falseならスキップ) |
else |
String? | 実行条件(trueならスキップ、falseなら実行) |
role |
Object? | ロール定義(省略時は全権限許可) |
timeout_secs |
u64? | タスクのタイムアウト(秒)。省略時はconfigのデフォルト値を使用 |
依存タスクの出力を参照:
$.1.output.user_id # 基本的なフィールドアクセス
$.1.output.config.host # ネストしたフィールド
$.1.output.items[0] # 配列インデックス
$.1.output.users[0].name # 配列+ネスト
$.001-101.output.data # ハイフン付きタスクID
文字列内に値を埋め込み:
{
"prompt": "Hello ${$.1.output.name}! Your ID is ${$.1.output.user_id}."
}現在のタスクのフィールドを参照:
{
"args": {
"current_task_name": "$.self.name",
"current_role": "$.self.role",
"role_skills": "$.self.role.skills"
}
}対応フィールド: task_id, name, description, priority, status, prompt, executor, args, dependencies, role
サブDAG内で親DAGから渡された値を参照:
{
"task_id": "sub_workflow",
"executor": "dag",
"dependencies": ["parent_task"],
"args": {
"inputs": {
"parent_value": "$.parent_task.output.value",
"config": "$.parent_task.output.config"
},
"dag": {
"tasks": [{
"task_id": "child_task",
"executor": "data",
"args": {
"value": "$.inputs.parent_value",
"message": "Config: ${$.inputs.config.name}"
}
}]
}
}
}args.inputsで親DAGの値をサブDAGに渡す- サブDAG内では
$.inputs.{field}で参照 - ネストしたフィールドにも対応(
$.inputs.config.name) - 埋め込み参照も可能(
${$.inputs.field})
タスクにifまたはelseフィールドを追加することで、条件に基づいて実行をスキップできます。
| フィールド | 条件結果 | タスク |
|---|---|---|
| なし | - | 実行 |
if |
true | 実行 |
if |
false | スキップ |
else |
true | スキップ |
else |
false | 実行 |
{
"tasks": [
{ "task_id": "validate", "executor": "log" },
{
"task_id": "on_success",
"if": "$.validate.output.task_id == \"validate\"",
"dependencies": ["validate"],
"executor": "log"
},
{
"task_id": "on_failure",
"else": "$.validate.output.task_id == \"validate\"",
"dependencies": ["validate"],
"executor": "log"
}
]
}| 種類 | 例 |
|---|---|
| パス参照 | $.task_id.output.field |
| 比較演算 | ==, !=, >, <, >=, <= |
| 論理演算 | &&, ||, ! |
| リテラル | true, false, "string", 123, null |
依存先がスキップされると、依存元も自動的にスキップされます。
validate → on_success (スキップ) → finalize (伝播でスキップ)
デバッグ・テスト用のシンプルなExecutor。タスク情報をログ出力します。
dag.register_executor(Box::new(LogExecutor::new()));Model Context Protocolを通じて外部MCPサーバーと連携します。
{
"executor": "mcp",
"args": {
"connection": {
"type": "stdio",
"command": "uv",
"args": ["run", "--directory", "/path/to/mcp_server", "main.py", "serve"]
},
"tool": "claude_code_query",
"arguments": {
"prompt": "Review this code for security issues",
"options": {
"cwd": "/path/to/project",
"max_turns": 5,
"allowed_tools": ["Read", "Grep", "Glob"]
},
"extra_options": {
"role": "$.self.role"
}
}
}
}サブグラフ(入れ子DAG)を実行します。最大3レベルまでネスト可能で、親子DAG間で値を受け渡しできます。
{
"task_id": "data_pipeline",
"executor": "dag",
"dependencies": ["config_task"],
"args": {
"inputs": {
"config": "$.config_task.output.value"
},
"dag": {
"tasks": [
{"task_id": "extract", "executor": "data", "args": {"source": "$.inputs.config"}},
{"task_id": "transform", "executor": "log", "dependencies": ["extract"]},
{"task_id": "load", "executor": "log", "dependencies": ["transform"]}
],
"config": {"max_concurrent_tasks": 1}
}
}
}args.inputsで親DAGからサブDAGへ値を渡す- サブDAG内では
$.inputs.{field}で参照 - サブグラフの結果は親DAGから
$.{task_id}.output.{inner_task}.output.{field}で参照
シェルコマンドを実行します。コマンドの出力がJSON形式の場合、自動的にパースしてparsedフィールドに格納します。
{
"executor": "bash",
"args": {
"command": "echo '{\"status\": \"success\"}'",
"cwd": "/path/to/workdir",
"timeout_secs": 60,
"shell": "sh"
}
}| パラメータ | 説明 | デフォルト |
|---|---|---|
command |
実行するシェルコマンド(必須) | - |
cwd |
作業ディレクトリ | カレントディレクトリ |
timeout_secs |
タイムアウト(秒) | 300 |
shell |
使用するシェル | sh |
{
"exit_code": 0,
"stdout": "{\"status\": \"success\"}",
"stderr": "",
"success": true,
"parsed": { "status": "success" }
}parsed: stdoutがJSON形式の場合、パース結果が格納される(パース失敗時はnull)- 他のタスクから
$.task_id.output.parsed.fieldで参照可能
{
"task_id": "check",
"executor": "bash",
"args": {
"command": "echo '${$.prev.output}' | grep -q 'SUCCESS' && echo '{\"matched\": true}' || echo '{\"matched\": false}'"
}
}ローカルGitリポジトリの操作を実行します。clone、commit、branch操作など基本的なGit操作をサポートします。
{
"executor": "git",
"args": {
"action": {
"type": "clone",
"url": "https://github.com/user/repo.git",
"path": "/tmp/my-repo",
"branch": "main"
}
}
}| 操作 | 説明 | 必須パラメータ |
|---|---|---|
clone |
リポジトリをクローン | url, path |
open |
既存リポジトリを開く | path |
init |
新規リポジトリを初期化 | path |
status |
ステータス確認 | path |
diff |
変更差分を表示 | path |
log |
コミット履歴を表示 | path |
commit |
コミット作成 | path, message |
create_branch |
ブランチ作成 | path, name |
checkout |
ブランチ切り替え | path, branch |
list_branches |
ブランチ一覧 | path |
delete_branch |
ブランチ削除 | path, name |
fetch |
リモートから取得 | path |
push |
リモートへプッシュ | path |
{
"action": {
"type": "clone",
"url": "git@github.com:user/repo.git",
"path": "/tmp/repo",
"auth": {
"type": "ssh_agent"
}
}
}| 認証タイプ | 説明 |
|---|---|
ssh_agent |
SSH Agentを使用 |
ssh_key |
SSHキーファイルを指定(private_key_path, passphrase) |
user_password |
ユーザー名/パスワード認証(username, password) |
GitHub APIを通じてIssueやPull Requestを操作します。環境変数GITHUB_TOKENで認証するか、argsでtokenを指定します。
{
"executor": "github",
"args": {
"owner": "user",
"repo": "repository",
"action": {
"type": "create_issue",
"title": "Bug report",
"body": "Description of the issue"
}
}
}| 操作 | 説明 | 必須パラメータ |
|---|---|---|
create_issue |
Issue作成 | title |
get_issue |
Issue詳細取得 | number |
list_issues |
Issue一覧 | - |
update_issue |
Issue更新 | number |
close_issue |
Issueクローズ | number |
create_comment |
コメント追加 | number, body |
delete_comment |
コメント削除 | comment_id |
| 操作 | 説明 | 必須パラメータ |
|---|---|---|
create_pr |
PR作成 | title, head, base |
get_pr |
PR詳細取得 | number |
list_prs |
PR一覧 | - |
merge_pr |
PRマ��ジ | number |
request_review |
レビュー依頼 | number |
loop_configを使用してDAGを繰り返し実行できます。
{
"loop_config": {
"max_iterations": 5,
"until_condition": "$.loop.iteration >= 3"
},
"tasks": [...]
}| フィールド | 説明 |
|---|---|
max_iterations |
最大繰り返し回数(必須) |
while_condition |
継続条件(trueの間ループ継続) |
until_condition |
終了条件(trueになったらループ終了) |
| 参照 | 説明 | 例 |
|---|---|---|
$.loop.iteration |
現在のイテレーション番号(0始まり) | 0, 1, 2... |
$.loop.first |
初回かどうか | true / false |
$.loop.previous.{task_id}.output |
前回イテレーションの結果 | $.loop.previous.counter.output.value |
{
"args": {
"iteration": "$.loop.iteration",
"is_first": "$.loop.first",
"previous_value": "$.loop.previous.counter.output.value"
}
}初回イテレーション($.loop.first == true)では、$.loop.previous.*はnullを返します。
Ralph Loopは、AIエージェントに反復的な自己改善ループを実行させるパターンです。task-composerのloop_configとMcpExecutorを組み合わせることで実現できます。
| Ralph Loopの特徴 | task-composerでの実現方法 |
|---|---|
| プロンプト不変 | 同じtask定義がループで繰り返し実行される |
| ファイル永続性 | Claude Codeがファイルに書き込み、次のイテレーションで参照 |
| 自己参照 | $.loop.previous.* で前回の結果を参照 |
| 完了条件 | until_condition で特定の出力を検出して終了 |
| 最大反復数 | max_iterations で無限ループを防止 |
{
"loop_config": {
"max_iterations": 10,
"until_condition": "$.improve.output.all_tests_passed == true"
},
"tasks": [
{
"task_id": "improve",
"executor": "mcp",
"args": {
"connection": { "type": "stdio", "command": "uv", "args": ["..."] },
"tool": "claude_code_query",
"arguments": {
"prompt": "Iteration: ${$.loop.iteration}\nPrevious result: ${$.loop.previous.improve.output}\n\n1. Run tests\n2. Fix failures\n3. Output { \"all_tests_passed\": true } when done",
"options": {
"max_turns": 50,
"allowed_tools": ["Read", "Write", "Edit", "Bash"]
}
}
}
}
]
}推奨:
- テスト駆動開発(テスト実行→失敗修正→再実行)
- コード品質改善(lint→修正→再lint)
- 段階的な機能実装(実装→テスト→改善)
- 自動検証可能なタスク
非推奨:
- 人間の判断が必要なタスク
- 明確な完了条件がないタスク
- 本番環境でのデバッグ
samples/sample_ralph_loop.json を参照してください。
タスクの実行時間を制限してハングアップを防ぐことができます。
{
"config": {
"max_concurrent_tasks": 4,
"default_task_timeout_secs": 300
},
"tasks": [
{
"task_id": "quick_task",
"executor": "log"
},
{
"task_id": "long_task",
"executor": "mcp",
"timeout_secs": 900
}
]
}| 設定場所 | フィールド | 説明 |
|---|---|---|
| Config | default_task_timeout_secs |
全タスク共通のデフォルトタイムアウト(秒) |
| Task | timeout_secs |
タスク個別のタイムアウト(秒)、Configより優先 |
- タスク個別の
timeout_secsが設定されている場合、そちらが優先される - どちらも設定されていない場合、タイムアウトなしで実行される
- タイムアウト時はタスクが失敗(Failed)として扱われる
use async_trait::async_trait;
use task_composer_core::task_executor::{TaskExecutor, ExecutionContext, ExecutionResult, ExecutionStatus};
use task_composer_core::types::Task;
struct MyExecutor;
#[async_trait]
impl TaskExecutor for MyExecutor {
fn name(&self) -> &str {
"my_executor"
}
async fn execute_task(
&self,
task: &Task,
ctx: &ExecutionContext,
) -> Result<ExecutionResult, String> {
println!("Executing: {}", task.name);
Ok(ExecutionResult {
task_id: task.task_id.clone(),
status: ExecutionStatus::Success,
output: serde_json::json!({
"result": "completed",
"data": ctx.args
}),
})
}
}
// 登録
dag.register_executor(Box::new(MyExecutor));mcp_servers/claude_code_mcp/にClaude Code連携用のMCPサーバーが含まれています。
cd mcp_servers/claude_code_mcp
uv syncclaude_code_queryツール: Claude Codeにクエリを送信options: Claude Agent SDK のオプション(cwd,max_turns,allowed_tools等)extra_options.role: ロール情報をシステムプロンプトとして注入
task-composer/
├── Cargo.toml # ワークスペース定義
├── LICENSE # Apache 2.0
├── README.md
├── CLAUDE.md # Claude向け指示
├── samples/ # サンプルDAGファイル
│ ├── sample_dag.json
│ ├── sample_mcp_dag.json
│ ├── sample_loop.json
│ └── ...
├── task-composer-core/ # コアライブラリ
│ ├── Cargo.toml
│ └── src/
│ ├── lib.rs
│ ├── types.rs # 型定義(Task, Role, Config等)
│ ├── path_resolver.rs # パス参照解決
│ ├── dag/ # DAG実装
│ ├── task_executor/ # Executor実装
│ └── analysis/ # 静的解析
├── task-composer-cli/ # CLIツール
│ ├── Cargo.toml
│ └── src/main.rs
├── task-composer-ui/ # Dioxus UI(Desktop/Web/TUI)
│ ├── Cargo.toml
│ ├── Dioxus.toml
│ └── src/
└── mcp_servers/
└── claude_code_mcp/
├── main.py # FastMCPサーバー
└── pyproject.toml
サンプルファイルは samples/ ディレクトリに配置されています。詳細は samples/README.md を参照してください。
samples/
├── basics/ # 入門・基本サンプル
│ ├── minimal.json
│ ├── simple_dag.json
│ ├── embedded_reference.json
│ └── auto_dependency.json
├── executors/ # Executor別サンプル
│ ├── bash.json
│ ├── data.json
│ ├── mcp/ # MCP連携
│ ├── git/ # Git操作
│ └── github/ # GitHub API
├── features/ # 高度な機能
│ ├── loop/ # ループ実行
│ ├── condition/ # 条件分岐
│ └── subgraph/ # サブグラフ
├── workflows/ # 実践的ワークフロー
└── _internal/ # 内部テスト用
Apache License 2.0
このプロジェクトはRust学習を目的としています。詳細はCLAUDE.mdを参照してください。