Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions genkit-tools/cli/src/mcp/flows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,27 @@ import { record } from '@genkit-ai/tools-common/utils';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp';
import z from 'zod';
import { McpRunToolEvent } from './analytics.js';
import { McpRuntimeManager } from './util.js';
import {
McpRuntimeManager,
getCommonSchema,
resolveProjectRoot,
} from './util.js';

export function defineFlowTools(server: McpServer, manager: McpRuntimeManager) {
export function defineFlowTools(server: McpServer, projectRoot: string) {
server.registerTool(
'list_flows',
{
title: 'List Genkit Flows',
description:
'Use this to discover available Genkit flows or inspect the input schema of Genkit flows to know how to successfully call them.',
inputSchema: getCommonSchema(),
},
async () => {
async (opts) => {
await record(new McpRunToolEvent('list_flows'));
const runtimeManager = await manager.getManager();
const rootOrError = resolveProjectRoot(opts, projectRoot);
if (typeof rootOrError !== 'string') return rootOrError;

const runtimeManager = await McpRuntimeManager.getManager(rootOrError);
const actions = await runtimeManager.listActions();

let flows = '';
Expand All @@ -56,21 +64,24 @@ export function defineFlowTools(server: McpServer, manager: McpRuntimeManager) {
{
title: 'Run Flow',
description: 'Runs the flow with the provided input',
inputSchema: {
inputSchema: getCommonSchema({
flowName: z.string().describe('name of the flow'),
input: z
.string()
.describe(
'Flow input as JSON object encoded as string (it will be passed through `JSON.parse`). Must conform to the schema.'
)
.optional(),
},
}),
},
async ({ flowName, input }) => {
async (opts) => {
await record(new McpRunToolEvent('run_flow'));
const rootOrError = resolveProjectRoot(opts, projectRoot);
if (typeof rootOrError !== 'string') return rootOrError;
const { flowName, input } = opts;

try {
const runtimeManager = await manager.getManager();
const runtimeManager = await McpRuntimeManager.getManager(rootOrError);
const response = await runtimeManager.runAction({
key: `/flow/${flowName}`,
input: input !== undefined ? JSON.parse(input) : undefined,
Expand Down
108 changes: 55 additions & 53 deletions genkit-tools/cli/src/mcp/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import { record } from '@genkit-ai/tools-common/utils';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp';
import { z } from 'zod';
import { McpRunToolEvent } from './analytics.js';
import { McpRuntimeManager } from './util.js';
import {
McpRuntimeManager,
getCommonSchema,
resolveProjectRoot,
} from './util.js';

export function defineRuntimeTools(
server: McpServer,
manager: McpRuntimeManager
) {
export function defineRuntimeTools(server: McpServer, projectRoot: string) {
server.registerTool(
'start_runtime',
{
Expand All @@ -33,70 +34,71 @@ export function defineRuntimeTools(
Examples:
{command: 'go', args: ['run', 'main.go']}
{command: 'npm', args: ['run', 'dev']}`,
inputSchema: {
inputSchema: getCommonSchema({
command: z.string(),
args: z.array(z.string()),
},
}),
},
async ({ command, args }) => {
async (opts) => {
await record(new McpRunToolEvent('start_runtime'));
await manager.getManagerWithDevProcess(command, args);
const rootOrError = resolveProjectRoot(opts, projectRoot);
if (typeof rootOrError !== 'string') return rootOrError;

await McpRuntimeManager.getManagerWithDevProcess(
rootOrError,
opts.command,
opts.args
);

return {
content: [{ type: 'text', text: `Done.` }],
};
}
);

server.registerTool(
'kill_runtime',
{
title: 'Kills any existing Genkit runtime process',
description:
'Use this to stop an existing runtime that was started using the `start_runtime` tool',
},
async () => {
await record(new McpRunToolEvent('kill_runtime'));
const runtimeManager = await manager.getManager();
if (!runtimeManager.processManager) {
return {
isError: true,
content: [
{ type: 'text', text: `No runtime process currently running.` },
],
};
}
const registerControlTool = (
name: string,
title: string,
action: 'kill' | 'restart'
) => {
server.registerTool(
name,
{
title,
description: `Use this to ${action} an existing runtime that was started using the \`start_runtime\` tool`,
inputSchema: getCommonSchema(),
},
async (opts) => {
await record(new McpRunToolEvent(name));
const rootOrError = resolveProjectRoot(opts, projectRoot);
if (typeof rootOrError !== 'string') return rootOrError;

await runtimeManager.processManager?.kill();
return {
content: [{ type: 'text', text: `Done.` }],
};
}
);
const runtimeManager = await McpRuntimeManager.getManager(rootOrError);
if (!runtimeManager.processManager) {
return {
isError: true,
content: [
{ type: 'text', text: `No runtime process currently running.` },
],
};
}

server.registerTool(
'restart_runtime',
{
title: 'Restarts any existing Genkit runtime process',
description:
'Use this to restart an existing runtime that was started using the `start_runtime` tool',
},
async () => {
await record(new McpRunToolEvent('restart_runtime'));
const runtimeManager = await manager.getManager();
if (!runtimeManager.processManager) {
await runtimeManager.processManager[action]();
return {
isError: true,
content: [
{ type: 'text', text: `No runtime process currently running.` },
],
content: [{ type: 'text', text: `Done.` }],
};
}
);
};

await runtimeManager.processManager?.restart();
return {
content: [{ type: 'text', text: `Done.` }],
};
}
registerControlTool(
'kill_runtime',
'Kills any existing Genkit runtime process',
'kill'
);
registerControlTool(
'restart_runtime',
'Restarts any existing Genkit runtime process',
'restart'
);
}
18 changes: 10 additions & 8 deletions genkit-tools/cli/src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,31 @@ import { defineInitPrompt } from './prompts/init';
import { defineRuntimeTools } from './runtime';
import { defineTraceTools } from './trace';
import { defineUsageGuideTool } from './usage';
import { McpRuntimeManager } from './util';
import { isAntigravity, McpRuntimeManager } from './util';

export async function startMcpServer(projectRoot: string) {
const server = new McpServer({
name: 'Genkit MCP',
version: '0.0.2',
});

const manager = new McpRuntimeManager(projectRoot);

await defineDocsTool(server);
await defineUsageGuideTool(server);
defineInitPrompt(server);
defineRuntimeTools(server, manager);

defineFlowTools(server, manager);
defineTraceTools(server, manager);
defineFlowTools(server, projectRoot);
defineTraceTools(server, projectRoot);
// Disable runtime tools in AGY. Something about AGY env is messing with
// runtime discoverability.
if (!isAntigravity) {
defineRuntimeTools(server, projectRoot);
}

return new Promise(async (resolve) => {
const transport = new StdioServerTransport();
const cleanup = async () => {
try {
await manager.kill();
await McpRuntimeManager.kill();
} catch (e) {
// ignore
}
Expand All @@ -54,7 +56,7 @@ export async function startMcpServer(projectRoot: string) {
};
transport.onclose = async () => {
try {
await manager.kill();
await McpRuntimeManager.kill();
} catch (e) {
// ignore
}
Expand Down
22 changes: 13 additions & 9 deletions genkit-tools/cli/src/mcp/trace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,34 @@ import { record } from '@genkit-ai/tools-common/utils';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp';
import z from 'zod';
import { McpRunToolEvent } from './analytics.js';
import { McpRuntimeManager } from './util.js';
import {
McpRuntimeManager,
getCommonSchema,
resolveProjectRoot,
} from './util.js';

export function defineTraceTools(
server: McpServer,
manager: McpRuntimeManager
) {
export function defineTraceTools(server: McpServer, projectRoot: string) {
server.registerTool(
'get_trace',
{
title: 'Get Genkit Trace',
description: 'Returns the trace details',
inputSchema: {
inputSchema: getCommonSchema({
traceId: z
.string()
.describe(
'trace id (typically returned after running a flow or other actions)'
),
},
}),
},
async ({ traceId }) => {
async (opts) => {
await record(new McpRunToolEvent('get_trace'));
const rootOrError = resolveProjectRoot(opts, projectRoot);
if (typeof rootOrError !== 'string') return rootOrError;
const { traceId } = opts;

try {
const runtimeManager = await manager.getManager();
const runtimeManager = await McpRuntimeManager.getManager(rootOrError);
const response = await runtimeManager.getTrace({ traceId });
return {
content: [
Expand Down
67 changes: 51 additions & 16 deletions genkit-tools/cli/src/mcp/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,74 @@
*/

import { RuntimeManager } from '@genkit-ai/tools-common/manager';
import { z } from 'zod';
import { startDevProcessManager, startManager } from '../utils/manager-utils';

export const isAntigravity = !!process.env.ANTIGRAVITY_ENV;

export function getCommonSchema(shape: z.ZodRawShape = {}): z.ZodRawShape {
return !isAntigravity
? shape
: {
projectRoot: z
.string()
.describe(
'The path to the current project root (a.k.a workspace directory or project directory)'
),
...shape,
};
}

export function resolveProjectRoot(
opts: {
[x: string]: any;
},
fallback: string
): string | { content: any[]; isError: boolean } {
if (isAntigravity && !opts?.projectRoot) {
return {
content: [
{ type: 'text', text: 'Project root is required for this tool.' },
],
isError: true,
};
}
return opts?.projectRoot ?? fallback;
}

/** Genkit Runtime manager specifically for the MCP server. Allows lazy
* initialization and dev process manangement. */
export class McpRuntimeManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This class has been refactored to use static methods and properties, effectively turning it into a singleton. While this works for the current use case, it's worth noting that this introduces global state, which can make the system harder to reason about and test. For future development, you might consider whether this is the desired long-term architecture or if a dependency injection approach could be used to manage the RuntimeManager's lifecycle, even with the dynamic projectRoot requirement.

private manager: RuntimeManager | undefined;

constructor(private projectRoot: string) {}
private static manager: RuntimeManager | undefined;
private static currentProjectRoot: string | undefined;

async getManager() {
if (!this.manager) {
this.manager = await startManager(
this.projectRoot,
true /* manageHealth */
);
static async getManager(projectRoot: string) {
if (this.manager && this.currentProjectRoot === projectRoot) {
return this.manager;
}
if (this.manager) {
await this.manager.stop();
}
this.manager = await startManager(projectRoot, true /* manageHealth */);
this.currentProjectRoot = projectRoot;
return this.manager;
}

async getManagerWithDevProcess(command: string, args: string[]) {
static async getManagerWithDevProcess(
projectRoot: string,
command: string,
args: string[]
) {
if (this.manager) {
await this.manager.stop();
}
const devManager = await startDevProcessManager(
this.projectRoot,
command,
args
);
const devManager = await startDevProcessManager(projectRoot, command, args);
this.manager = devManager.manager;
this.currentProjectRoot = projectRoot;
return this.manager;
}

async kill() {
static async kill() {
if (this.manager) {
await this.manager.stop();
}
Expand Down
4 changes: 3 additions & 1 deletion genkit-tools/cli/src/utils/manager-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ export async function startDevProcessManager(
projectRoot,
processManager,
});
const processPromise = processManager.start();
const processPromise = processManager.start({
cwd: projectRoot,
});
return { manager, processPromise };
}

Expand Down
Loading
Loading