Skip to content

HttpRequest.Builder customizer for Client transports #388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024-2025 the original author or authors.
*/

package io.modelcontextprotocol.client.transport;

import java.net.URI;
import java.net.http.HttpRequest;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;

/**
* Customize {@link HttpRequest.Builder} before executing the request, in either SSE or
* Streamable HTTP transport.
* <p>
* When used in a non-blocking context, implementations MUST be non-blocking.
*
* @author Daniel Garnier-Moiroux
*/
public interface AsyncHttpRequestCustomizer {

Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
@Nullable String body);

AsyncHttpRequestCustomizer NOOP = new Noop();

/**
* Wrap a sync implementation in an async wrapper.
* <p>
* Do NOT wrap a blocking implementation for use in a non-blocking context. For a
* blocking implementation, consider using {@link Schedulers#boundedElastic()}.
*/
static AsyncHttpRequestCustomizer fromSync(SyncHttpRequestCustomizer customizer) {
return (builder, method, uri, body) -> Mono.fromSupplier(() -> {
customizer.customize(builder, method, uri, body);
return builder;
});
}

class Noop implements AsyncHttpRequestCustomizer {

@Override
public Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
String body) {
return Mono.just(builder);
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 - 2024 the original author or authors.
* Copyright 2024 - 2025 the original author or authors.
*/
package io.modelcontextprotocol.client.transport;

Expand Down Expand Up @@ -102,6 +102,11 @@ public class HttpClientSseClientTransport implements McpClientTransport {
*/
protected final Sinks.One<String> messageEndpointSink = Sinks.one();

/**
* Customizer to modify requests before they are executed.
*/
private final AsyncHttpRequestCustomizer httpRequestCustomizer;

/**
* Creates a new transport instance with default HTTP client and object mapper.
* @param baseUri the base URI of the MCP server
Expand Down Expand Up @@ -172,18 +177,38 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
* @param objectMapper the object mapper for JSON serialization/deserialization
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
*/
@Deprecated(forRemoval = true)
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
String sseEndpoint, ObjectMapper objectMapper) {
this(httpClient, requestBuilder, baseUri, sseEndpoint, objectMapper, AsyncHttpRequestCustomizer.NOOP);
}

/**
* Creates a new transport instance with custom HTTP client builder, object mapper,
* and headers.
* @param httpClient the HTTP client to use
* @param requestBuilder the HTTP request builder to use
* @param baseUri the base URI of the MCP server
* @param sseEndpoint the SSE endpoint path
* @param objectMapper the object mapper for JSON serialization/deserialization
* @param httpRequestCustomizer customizer for the requestBuilder before executing
* requests
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
*/
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
String sseEndpoint, ObjectMapper objectMapper, AsyncHttpRequestCustomizer httpRequestCustomizer) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.hasText(baseUri, "baseUri must not be empty");
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
Assert.notNull(httpClient, "httpClient must not be null");
Assert.notNull(requestBuilder, "requestBuilder must not be null");
Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
this.baseUri = URI.create(baseUri);
this.sseEndpoint = sseEndpoint;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
this.httpRequestCustomizer = httpRequestCustomizer;
}

/**
Expand Down Expand Up @@ -213,6 +238,8 @@ public static class Builder {
private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.header("Content-Type", "application/json");

private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP;

/**
* Creates a new builder instance.
*/
Expand Down Expand Up @@ -310,31 +337,66 @@ public Builder objectMapper(ObjectMapper objectMapper) {
return this;
}

/**
* Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
* executing them.
* <p>
* This overrides the customizer from
* {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}.
* <p>
* Do NOT use a blocking {@link SyncHttpRequestCustomizer} in a non-blocking
* context. Use {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}
* instead.
* @param syncHttpRequestCustomizer the request customizer
* @return this builder
*/
public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) {
this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer);
return this;
}

/**
* Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
* executing them.
* <p>
* This overrides the customizer from
* {@link #httpRequestCustomizer(SyncHttpRequestCustomizer)}.
* <p>
* Do NOT use a blocking implementation in a non-blocking context.
* @param asyncHttpRequestCustomizer the request customizer
* @return this builder
*/
public Builder asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) {
this.httpRequestCustomizer = asyncHttpRequestCustomizer;
return this;
}

/**
* Builds a new {@link HttpClientSseClientTransport} instance.
* @return a new transport instance
*/
public HttpClientSseClientTransport build() {
return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint,
objectMapper);
objectMapper, httpRequestCustomizer);
}

}

@Override
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);

return Mono.create(sink -> {

HttpRequest request = requestBuilder.copy()
.uri(Utils.resolveUri(this.baseUri, this.sseEndpoint))
return Mono.defer(() -> {
var builder = requestBuilder.copy()
.uri(uri)
.header("Accept", "text/event-stream")
.header("Cache-Control", "no-cache")
.GET()
.build();

.GET();
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null));
}).flatMap(requestBuilder -> Mono.create(sink -> {
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
.sendAsync(requestBuilder.build(),
responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
.exceptionallyCompose(e -> {
sseSink.error(e);
return CompletableFuture.failedFuture(e);
Expand Down Expand Up @@ -397,7 +459,7 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
.subscribe();

this.sseSubscription.set(connection);
});
}));
}

/**
Expand Down Expand Up @@ -453,13 +515,13 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) {

private Mono<HttpResponse<String>> sendHttpPost(final String endpoint, final String body) {
final URI requestUri = Utils.resolveUri(baseUri, endpoint);
final HttpRequest request = this.requestBuilder.copy()
.uri(requestUri)
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();

// TODO: why discard the body?
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
return Mono.defer(() -> {
var builder = this.requestBuilder.copy().uri(requestUri).POST(HttpRequest.BodyPublishers.ofString(body));
return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body));
}).flatMap(customizedBuilder -> {
var request = customizedBuilder.build();
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
});
}

/**
Expand Down
Loading