Skip to content

Commit d2e5be5

Browse files
authored
feat(go): Support streamable http in MCP Plugin (#3135)
1 parent 5a057af commit d2e5be5

File tree

2 files changed

+100
-7
lines changed

2 files changed

+100
-7
lines changed

‎go/plugins/mcp/client.go‎

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"fmt"
2121
"net/http"
22+
"time"
2223

2324
"github.com/firebase/genkit/go/core/logger"
2425
"github.com/mark3labs/mcp-go/client"
@@ -40,6 +41,14 @@ type SSEConfig struct {
4041
HTTPClient *http.Client // Optional custom HTTP client
4142
}
4243

44+
// StreamableHTTPConfig contains options for the Streamable HTTP transport
45+
type StreamableHTTPConfig struct {
46+
BaseURL string
47+
Headers map[string]string
48+
HTTPClient *http.Client // Optional custom HTTP client
49+
Timeout time.Duration // HTTP request timeout
50+
}
51+
4352
// MCPClientOptions holds configuration for the MCPClient.
4453
type MCPClientOptions struct {
4554
// Name for this client instance - ideally a nickname for the server
@@ -57,6 +66,9 @@ type MCPClientOptions struct {
5766

5867
// SSE contains config for connecting to a remote server via SSE transport
5968
SSE *SSEConfig
69+
70+
// StreamableHTTP contains config for connecting to a remote server via Streamable HTTP transport
71+
StreamableHTTP *StreamableHTTPConfig
6072
}
6173

6274
// ServerRef represents an active connection to an MCP server
@@ -152,7 +164,23 @@ func (c *GenkitMCPClient) createTransport(options MCPClientOptions) (transport.I
152164
return transport.NewSSE(options.SSE.BaseURL, sseOptions...)
153165
}
154166

155-
return nil, fmt.Errorf("no valid transport configuration provided: must specify Stdio or SSE")
167+
if options.StreamableHTTP != nil {
168+
var streamableHTTPOptions []transport.StreamableHTTPCOption
169+
if options.StreamableHTTP.Headers != nil {
170+
streamableHTTPOptions = append(streamableHTTPOptions, transport.WithHTTPHeaders(options.StreamableHTTP.Headers))
171+
}
172+
if options.StreamableHTTP.Timeout > 0 {
173+
streamableHTTPOptions = append(streamableHTTPOptions, transport.WithHTTPTimeout(options.StreamableHTTP.Timeout))
174+
}
175+
176+
transportImpl, err := transport.NewStreamableHTTP(options.StreamableHTTP.BaseURL, streamableHTTPOptions...)
177+
if err != nil {
178+
return nil, fmt.Errorf("failed to create streamable HTTP transport: %w", err)
179+
}
180+
return transportImpl, nil
181+
}
182+
183+
return nil, fmt.Errorf("no valid transport configuration provided: must specify Stdio, SSE, or StreamableHTTP")
156184
}
157185

158186
// initializeClient initializes the MCP client connection

‎go/samples/mcp-client/main.go‎

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"fmt"
2020
"os"
21+
"time"
2122

2223
"github.com/firebase/genkit/go/ai"
2324
"github.com/firebase/genkit/go/core/logger"
@@ -250,13 +251,74 @@ func clientGetPromptExample() {
250251
}
251252
}
252253

254+
// MCP Client Streamable HTTP Example - connects to a server via Streamable HTTP transport
255+
func clientStreamableHTTPExample() {
256+
ctx := context.Background()
257+
258+
// Initialize Genkit with Google AI
259+
g, err := genkit.Init(ctx, genkit.WithPlugins(&googlegenai.GoogleAI{}))
260+
if err != nil {
261+
logger.FromContext(ctx).Error("Failed to initialize Genkit", "error", err)
262+
return
263+
}
264+
265+
logger.FromContext(ctx).Info("Creating MCP client with Streamable HTTP transport", "server", "everything")
266+
// Create and connect to MCP server using Streamable HTTP transport
267+
// Note: Start the server with: npx @modelcontextprotocol/server-everything streamableHttp --port 3001
268+
// This will start the server on http://localhost:3001
269+
client, err := mcp.NewGenkitMCPClient(mcp.MCPClientOptions{
270+
Name: "mcp-everything-http",
271+
Version: "1.0.0",
272+
StreamableHTTP: &mcp.StreamableHTTPConfig{
273+
BaseURL: "http://localhost:3001",
274+
Headers: map[string]string{
275+
"User-Agent": "genkit-mcp-client/1.0.0",
276+
},
277+
Timeout: 30 * time.Second, // Optional timeout
278+
},
279+
})
280+
if err != nil {
281+
logger.FromContext(ctx).Error("Failed to create MCP client with Streamable HTTP", "error", err)
282+
return
283+
}
284+
logger.FromContext(ctx).Info("MCP client with Streamable HTTP created successfully", "client", "mcp-everything-http")
285+
286+
// Get tools and generate response
287+
tools, _ := client.GetActiveTools(ctx, g)
288+
logger.FromContext(ctx).Info("Found MCP tools via Streamable HTTP", "count", len(tools), "client", "mcp-everything-http")
289+
290+
var toolRefs []ai.ToolRef
291+
for _, tool := range tools {
292+
toolRefs = append(toolRefs, tool)
293+
}
294+
295+
// Generate response using tools from the HTTP server
296+
response, err := genkit.Generate(ctx, g,
297+
ai.WithModelName("googleai/gemini-2.0-flash-exp"),
298+
ai.WithPrompt("Use the echo tool to repeat the message 'Hello from Streamable HTTP!' and then use the add tool to calculate 15 + 27."),
299+
ai.WithTools(toolRefs...),
300+
ai.WithToolChoice(ai.ToolChoiceAuto),
301+
)
302+
if err != nil {
303+
logger.FromContext(ctx).Error("Generation failed", "error", err)
304+
} else {
305+
logger.FromContext(ctx).Info("Generation completed", "response", response.Text())
306+
}
307+
308+
// Disconnect from server
309+
logger.FromContext(ctx).Info("Disconnecting from MCP server", "client", "mcp-everything-http")
310+
client.Disconnect()
311+
logger.FromContext(ctx).Info("Disconnected from MCP server", "client", "mcp-everything-http")
312+
}
313+
253314
func main() {
254315
if len(os.Args) < 2 {
255-
fmt.Println("Usage: go run main.go [manager|multi|client|getprompt]")
256-
fmt.Println(" manager - MCP Manager example with time server")
257-
fmt.Println(" multi - MCP Manager example with multiple servers (time and fetch)")
258-
fmt.Println(" client - MCP Client example with time server")
259-
fmt.Println(" getprompt - MCP Client GetPrompt example")
316+
fmt.Println("Usage: go run main.go [manager|multi|client|getprompt|streamablehttp|test]")
317+
fmt.Println(" manager - MCP Manager example with time server")
318+
fmt.Println(" multi - MCP Manager example with multiple servers (time and fetch)")
319+
fmt.Println(" client - MCP Client example with time server")
320+
fmt.Println(" getprompt - MCP Client GetPrompt example")
321+
fmt.Println(" streamablehttp - MCP Client Streamable HTTP example")
260322
os.Exit(1)
261323
}
262324

@@ -275,9 +337,12 @@ func main() {
275337
case "getprompt":
276338
logger.FromContext(ctx).Info("Running MCP Client GetPrompt example")
277339
clientGetPromptExample()
340+
case "streamablehttp":
341+
logger.FromContext(ctx).Info("Running MCP Client Streamable HTTP example")
342+
clientStreamableHTTPExample()
278343
default:
279344
fmt.Printf("Unknown example: %s\n", os.Args[1])
280-
fmt.Println("Use 'manager', 'multi', 'client', or 'getprompt'")
345+
fmt.Println("Use 'manager', 'multi', 'client', 'getprompt', or 'streamablehttp'")
281346
os.Exit(1)
282347
}
283348
}

0 commit comments

Comments
 (0)