Move HTTP content aggregation from Netty into RestController #129302


Merged
merged 19 commits into from
Jun 19, 2025

Conversation

@mhl-b mhl-b (Contributor) commented Jun 12, 2025

Netty's HTTP content aggregator is not flexible enough for our needs. In some cases, like the bulk API, we need to bypass aggregation, and Netty's aggregator has no memory safety, such as backpressure or a circuit breaker. That means we always aggregate requests fully and occasionally run out of memory.

This change moves aggregation from the Netty module into the server module's RestController. That means in Netty code we no longer deal with fully aggregated requests; everything is a stream of chunks. RestController aggregates chunks when a RestHandler indicates, via a new flag, that streaming is not supported. This opens up the opportunity to apply backpressure or a circuit breaker on a per-chunk basis for all REST handlers, and also simplifies the implementation of new streaming handlers.

Once the RestHandler is resolved in RestController, we check whether it supports streamed content. If streaming is supported, the request proceeds to the handler. If not, a new RestContentAggregator aggregates the chunks and then replaces the content of the RestRequest with the aggregated content.
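The dispatch decision described above can be sketched roughly as follows. This is an illustrative stand-in only: the real RestController, RestHandler, and HttpBody APIs differ, and the names here are invented for the sketch.

```java
import java.util.List;

public class AggregationSketch {
    // Illustrative stand-in; the real RestHandler/RestController APIs differ.
    interface Handler {
        boolean supportsContentStream();

        void handle(byte[] content);
    }

    // A streaming handler sees each chunk as it arrives; a non-streaming
    // handler sees a single fully aggregated byte array.
    static void dispatch(Handler handler, List<byte[]> chunks) {
        if (handler.supportsContentStream()) {
            for (byte[] chunk : chunks) {
                handler.handle(chunk); // pass chunks through unmodified
            }
        } else {
            int total = chunks.stream().mapToInt(c -> c.length).sum();
            byte[] aggregated = new byte[total];
            int offset = 0;
            for (byte[] chunk : chunks) {
                System.arraycopy(chunk, 0, aggregated, offset, chunk.length);
                offset += chunk.length;
            }
            handler.handle(aggregated); // aggregated body replaces the stream
        }
    }
}
```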

The PR touches many files, but the essential changes are:

  • Removal of Netty4HttpAggregator and Bulk API predicates from Netty pipeline.
  • Addition of RestContentAggregator and support for HTTP content replacement.
  • Adjusting integration tests in the netty and security modules to consume the stream (AggregatingDispatcher) and not leak buffers.

closes #120746

Comment on lines 94 to 101
@Override
public void setBody(HttpBody body) {
    if (body.isFull()) {
        var contentLength = body.asFull().bytes().length();
        HttpUtil.setContentLength(nettyRequest, contentLength);
    }
    this.content = body;
}
@mhl-b mhl-b (Contributor, Author) commented Jun 12, 2025

Changing the mutability of the HTTP request body: I need to swap the stream with the fully aggregated content after aggregation. References to the original HTTP request spread too deep to replace the request itself.

@nicktindall nicktindall (Contributor) left a comment:

Nice tidy-up. I'll leave it to the other reviewers to approve since they have more context, but LGTM.

try {
    dispatchRequest(aggregatedRequest, aggChannel, handler, methodHandlers, threadContext);
} catch (Exception e) {
    assert false : "exception must be handled in dispatchRequest";
Contributor:

Should we log an error here just in case it somehow happens in prod?

    replaceBody(restRequest, chunk);
    result.accept(restRequest);
} else {
    var comp = CompositeBytesReference.of(chunks.toArray(new ReleasableBytesReference[0]));
Contributor:

Is it worth making new ReleasableBytesReference[0] a constant to avoid the allocation? Or will the compiler be clever enough to do that I wonder.

Contributor Author:


I think it is smart enough to skip the extra allocation, but it still allocates an array of the list size. It would be nice to have CompositeBytesReference.of(List<Chunks>).
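The idiom under discussion can be illustrated generically. This is a sketch with invented names, not the Elasticsearch code: sharing a single zero-length array avoids a tiny per-call allocation (and the JIT often eliminates it anyway), while toArray still allocates the correctly sized result array either way.

```java
import java.util.List;

public class ToArraySketch {
    // A shared empty array: toArray uses it only to learn the component
    // type, then allocates a new array of exactly the list's size.
    private static final String[] EMPTY = new String[0];

    static String[] toArray(List<String> list) {
        return list.toArray(EMPTY);
    }
}
```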


public RestRequestAggregator(CircuitBreakerService circuitBreakerService) {
    this.circuitBreakerService = circuitBreakerService;
}
Contributor:

Nit: do we use the circuitBreakerService in here? It doesn't look like it.

Contributor Author:

My intention was to include the circuit breaker in this PR, so I've done the basic plumbing. But the scope of the circuit-breaker work appears fairly large, so I will leave it as it is. The circuit breaker will work as before: we fully aggregate content and then try the breaker.

@mhl-b mhl-b force-pushed the http-aggregator-removal branch from cc408fb to 9ced839 Compare June 13, 2025 04:42
@mhl-b mhl-b force-pushed the http-aggregator-removal branch from 7012336 to 0cd4f72 Compare June 14, 2025 20:11
Comment on lines -866 to 864
final HttpValidator httpValidator = (httpRequest, channel, validationListener) -> {
    // assert that the validator sees the request unaltered
    assertThat(httpRequest.uri(), is(uri));
    if (randomBoolean()) {
        validationListener.onResponse(null);
    } else {
        validationListener.onFailure(new ElasticsearchException("Boom"));
    }
};
try (
@mhl-b mhl-b (Contributor, Author) commented Jun 15, 2025

Before, oversized requests that failed auth were rejected at the Netty aggregator with a size exception. Now they go through to the RestController, which sends a response based on the auth exception and closes (discards) the stream.

I think it's OK for the auth exception to take precedence over content size. But there is a possibility of a malicious request with extremely large content that will be discarded; we may want to close the channel sooner rather than discard the content. If we want to keep the previous behaviour I need to adjust Netty4HttpContentSizeHandler.

@mhl-b mhl-b marked this pull request as ready for review June 15, 2025 04:50
@elasticsearchmachine elasticsearchmachine added the needs:triage Requires assignment of a team area label label Jun 15, 2025
@mhl-b mhl-b (Contributor, Author) commented Jun 15, 2025

@DaveCTurner CI has been happy for the last 3 runs, with minor test fixes. It's ready for review. I will address audit logging in a following PR if this one passes.

@mhl-b mhl-b added :Distributed Coordination/Network Http and internode communication implementations Team:Distributed Coordination Meta label for Distributed Coordination team >enhancement and removed needs:triage Requires assignment of a team area label labels Jun 15, 2025
@elasticsearchmachine (Collaborator):
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@elasticsearchmachine (Collaborator):
Hi @mhl-b, I've created a changelog YAML for you.

@DaveCTurner DaveCTurner (Contributor) left a comment:

I like it. I haven't quite finished reviewing the changes in detail but have left some comments from my first pass through.

Comment on lines 1004 to 1009
return new ReleasableBytesReference(randomBytesReference(length), new AbstractRefCounted() {
    @Override
    protected void closeInternal() {}
});
Contributor:

Can we leak-track these things?

Suggested change
-return new ReleasableBytesReference(randomBytesReference(length), new AbstractRefCounted() {
-    @Override
-    protected void closeInternal() {}
-});
+return new ReleasableBytesReference(randomBytesReference(length), LeakTracker.wrap(AbstractRefCounted.of(() -> {})));

Contributor Author:

:)

        return new ReleasableBytesReference(randomBytesReference(length), LeakTracker.wrap(new AbstractRefCounted() {
            @Override
            protected void closeInternal() {}
        }));
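For readers unfamiliar with the ref-counting involved here, a minimal stand-in for AbstractRefCounted might look like the following. This is illustrative only; the real class lives in the Elasticsearch core, has more state, and LeakTracker additionally reports references that are never released.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountedSketch {
    // Minimal analogue of AbstractRefCounted: the count starts at 1 and the
    // close action (standing in for closeInternal) runs exactly once when
    // the count reaches zero.
    private final AtomicInteger refCount = new AtomicInteger(1);
    private final Runnable onClose;

    public RefCountedSketch(Runnable onClose) {
        this.onClose = onClose;
    }

    public void incRef() {
        if (refCount.getAndIncrement() <= 0) {
            throw new IllegalStateException("already closed");
        }
    }

    public boolean decRef() {
        int count = refCount.decrementAndGet();
        if (count == 0) {
            onClose.run(); // runs exactly once
            return true;
        }
        if (count < 0) {
            throw new IllegalStateException("too many decRef calls");
        }
        return false;
    }
}
```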


 @Override
-public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
+public void dispatchAggregatedRequest(RestRequest restRequest, RestChannel restChannel, ThreadContext threadContext) {}
Contributor:

Can we assert this?

Suggested change
-public void dispatchAggregatedRequest(RestRequest restRequest, RestChannel restChannel, ThreadContext threadContext) {}
+public void dispatchAggregatedRequest(RestRequest restRequest, RestChannel restChannel, ThreadContext threadContext) {
+    assert restRequest.isStreamedContent() == false;
+}

-return httpRequest.body().asFull().bytes().length();
+return switch (httpRequest.body()) {
+    case HttpBody.Full content -> content.bytes().length();
+    case HttpBody.Stream stream -> Math.toIntExact(httpRequest.contentLengthHeader());
Contributor:

I'm suspicious about this, are you sure we need it? I'd rather we only exposed the length of an aggregated body. My worry is that code which is processing a streaming body shouldn't be dependent on the content length since it's not always available, but with this lenience we might end up not covering the unavailable-length case properly in tests.

Contributor Author:

Agree. This is only used for the circuit-breaker estimation. I can remove the whole content-length change; I don't think it's essential for this PR.

Contributor Author:

Removed content-length changes c7bb930

@mhl-b mhl-b (Contributor, Author) commented Jun 18, 2025

Added hasContent to the HTTP request; we need to know whether content is present before aggregation happens, so I'm keeping the header-check logic. 34e8881

try {
    dispatchRequest(aggregatedRequest, restChannel, handler, methodHandlers, threadContext);
} catch (Exception e) {
    // dispatchRequest already handles exceptions; at this point we won't be able to send a response
Contributor:

I don't understand this comment. If dispatchRequest is expected not to throw anything, should we assert false : e here?

@mhl-b mhl-b (Contributor, Author) commented Jun 16, 2025

Not well written by me; it's a bit tricky. dispatchRequest catches errors and tries to send an error response, but sending the error response might fail too. At least we have a test that does this by injecting a failure into the sendResponse path; I'm not sure production code can throw there. A failure to send the response can only go to the error log.
We cannot re-throw the Exception without wrapping it into a RuntimeException, but that messes up error handling. "Cannot" means the whole call chain would need a throws Exception signature, which is pretty invasive.

@mhl-b mhl-b (Contributor, Author) commented Jun 16, 2025

Looking at this again, wrapping it into a RuntimeException might be the right thing to do. Let it bubble up to the pipelining HTTP error handler, which will properly close the channel.
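The wrap-and-rethrow approach discussed here can be sketched outside the Netty pipeline like this. The ThrowingRunnable interface and the outer catch block (standing in for the pipelining HTTP error handler) are invented for the sketch and are not the actual code.

```java
public class RethrowSketch {
    // Stand-in for a dispatch body that may throw a checked exception.
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    static String dispatchWithFallback(ThrowingRunnable dispatch) {
        try {
            try {
                dispatch.run();
                return "ok";
            } catch (Exception e) {
                // Wrap the checked exception so it can propagate unchecked
                // without changing every signature in the call chain.
                throw new RuntimeException("failed to dispatch", e);
            }
        } catch (RuntimeException e) {
            // Stand-in for the outer pipeline error handler, which would
            // close the channel on an unexpected failure.
            return "channel closed: " + e.getMessage();
        }
    }
}
```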

Contributor Author:

Rethrowing the exception: 43bf06a

boolean closing;
ArrayList<ReleasableBytesReference> chunks;

private AggregationChunkHandler(RestRequest restRequest, Consumer<RestRequest> result) {
Contributor:

naming nit:

Suggested change
-private AggregationChunkHandler(RestRequest restRequest, Consumer<RestRequest> result) {
+private AggregationChunkHandler(RestRequest restRequest, Consumer<RestRequest> resultConsumer) {

if (closing == false) {
    closing = true;
    if (chunks != null) {
        Releasables.close(chunks);
Contributor:

If closed mid-aggregation, do we need some way to notify the handler? Conversely, if we got to the end of the aggregation, could we release the chunks earlier? AIUI we close the request when starting to send the response, which seems much later than necessary.

@mhl-b mhl-b (Contributor, Author) commented Jun 16, 2025

> If closed mid-aggregation, do we need some way to notify the handler?

I don't think so. Handlers that handle the whole content shouldn't have to bother with incomplete requests. That's the whole point of aggregation: keep it simple downstream, right?

> Conversely, if we got to the end of the aggregation, could we release the chunks earlier?

We release the chunks at prepareRequest; if you remember, we changed that a few months ago (7aa07f1). This close handler is for premature termination.
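The premature-termination close path discussed in this thread can be sketched with plain Runnables standing in for ReleasableBytesReference chunks. This is illustrative only; the real handler holds actual releasable buffers and more state.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseSketch {
    // Analogue of the close handler: on premature termination, every chunk
    // accumulated so far is released exactly once; later close() calls are
    // no-ops thanks to the closing flag.
    private boolean closing;
    final List<Runnable> chunkReleasers = new ArrayList<>();

    void close() {
        if (closing == false) {
            closing = true;
            chunkReleasers.forEach(Runnable::run); // release accumulated chunks
            chunkReleasers.clear();
        }
    }
}
```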

@@ -30,6 +30,10 @@ enum HttpVersion {

    HttpBody body();

    default void setBody(HttpBody body) {
        throw new UnsupportedOperationException("not implemented");
Contributor:

assert false here too? This'd be a bug, right? Although there are only 4 implementing classes (plus a few subclasses), so I'd prefer not to have a default implementation.

if (encoding != null && encoding.equals("chunked")) {
    return -1;
} else {
    return 0;
Contributor:

Is this true? What requests might not have a Content-Length or Transfer-Encoding header exactly? I thought that'd not be allowed in HTTP/1.1, and in HTTP/1.0 a request without a Content-Length header would have a body that is terminated by the connection close.

@mhl-b mhl-b (Contributor, Author) commented Jun 16, 2025

> What requests might not have a Content-Length or Transfer-Encoding header exactly?

For example, a GET request without a body is allowed to have neither.

https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-5

> A user agent SHOULD NOT send a Content-Length header field when the request message does not contain content and the method semantics do not anticipate such data.

This if-condition might not be very precise, but it's practical. When Transfer-Encoding is present it is very likely chunked, and in that case Content-Length should be omitted. If neither header is present, the content should be zero-length.

Contributor Author:

Replaced with hasContent 34e8881

@@ -119,10 +91,18 @@ public HttpBody body() {
        return content;
    }

    @Override
    public void setBody(HttpBody body) {
        if (body.isFull()) {
Contributor:

Can we assert that we only ever replace a streaming body with a fully-aggregated body?

@@ -119,10 +90,14 @@ public HttpBody body() {
        return content;
    }

    @Override
    public void setBody(HttpBody body) {
        this.content = body.asFull();
Contributor Author:

body.asFull() includes the assertion.

Contributor:

Sure, but can we also check that this.content is not already a fully-aggregated body?

@mhl-b mhl-b requested a review from DaveCTurner June 17, 2025 04:26
@elasticsearchmachine (Collaborator):
Hi @mhl-b, I've updated the changelog YAML for you.

@DaveCTurner DaveCTurner (Contributor) left a comment:

One further comment on supporting contentLength() on a streaming request, and a few nits; otherwise this looks good to go.

Comment on lines +152 to +153
if (msg instanceof LastHttpContent) {
    currentRequestStream = null;
Contributor:

Readability nit: this has to happen before the ctx::read above. Of course it does today, because we're already on the event loop so the dispatched ctx::read happens after this method returns, but it's still confusing to write them in this order.

}

-private Netty4HttpRequest(
+public Netty4HttpRequest(
Contributor:

nit: probably a mistake to expose the released flag to the public

Suggested change
-public Netty4HttpRequest(
+private Netty4HttpRequest(

Comment on lines 399 to 400
});
ch.pipeline().addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));
Contributor:

nit: may as well carry on chaining the addLast calls

Suggested change
-});
-ch.pipeline().addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));
+})
+    .addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));

-return httpRequest.body().asFull().bytes().length();
+return switch (httpRequest.body()) {
+    case HttpBody.Full content -> content.bytes().length();
+    case HttpBody.Stream stream -> 0;
Contributor:

I still think this is trappy, I'd much prefer it remained an error to call contentLength on a streaming request.


public void testFullBodyPassThrough() {
    var fullRequest = newRestRequest(between(1, 1024));
    aggregate(fullRequest, (aggregated) -> assertEquals(fullRequest.content(), aggregated.content()));
Contributor:

We need to assert that the consumer is invoked; otherwise this will pass even if aggregate doesn't invoke it. Also, can we use assertSame?

@DaveCTurner DaveCTurner (Contributor) left a comment:

LGTM

@mhl-b mhl-b merged commit eeca493 into elastic:main Jun 19, 2025
27 checks passed
kderusso pushed a commit to kderusso/elasticsearch that referenced this pull request Jun 23, 2025
mridula-s109 pushed a commit to mridula-s109/elasticsearch that referenced this pull request Jun 25, 2025
Labels
:Distributed Coordination/Network Http and internode communication implementations >enhancement Team:Distributed Coordination Meta label for Distributed Coordination team v9.1.0

Successfully merging this pull request may close these issues.

Indexing a single doc to a data stream bulk endpoint trips assertion and returns class cast exception
4 participants