Move HTTP content aggregation from Netty into RestController #129302
Conversation
@Override
public void setBody(HttpBody body) {
    if (body.isFull()) {
        var contentLength = body.asFull().bytes().length();
        HttpUtil.setContentLength(nettyRequest, contentLength);
    }
    this.content = body;
}
Changing the mutability of the HTTP request body: I need to swap the stream with the full content after aggregation, and references to the original HTTP request are rooted too deeply to replace the request itself.
Nice tidy-up. I will leave it to the other reviewers to approve because they have more context, but LGTM.
try {
    dispatchRequest(aggregatedRequest, aggChannel, handler, methodHandlers, threadContext);
} catch (Exception e) {
    assert false : "exception must be handled in dispatchRequest";
Should we log an error here just in case it somehow happens in production?
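For illustration, a minimal sketch of what logging alongside the assertion might look like; the logger field and message are assumptions, not the PR's code.

try {
    dispatchRequest(aggregatedRequest, aggChannel, handler, methodHandlers, threadContext);
} catch (Exception e) {
    // hypothetical: surface the problem in production logs as well as in assert-enabled test runs
    logger.error("unexpected exception escaped dispatchRequest", e);
    assert false : "exception must be handled in dispatchRequest";
}

The discussion further down settles on rethrowing instead.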
    replaceBody(restRequest, chunk);
    result.accept(restRequest);
} else {
    var comp = CompositeBytesReference.of(chunks.toArray(new ReleasableBytesReference[0]));
Is it worth making new ReleasableBytesReference[0] a constant to avoid the allocation? Or will the compiler be clever enough to do that, I wonder.
I think it is smart enough to skip the extra allocation, but it still allocates an array of the list size. It would be nice to have CompositeBytesReference.of(List<Chunks>).
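For context, a sketch of the two call shapes being discussed; the List-accepting overload is hypothetical and does not exist today.

// current shape: the zero-length array only conveys the element type
var composite = CompositeBytesReference.of(chunks.toArray(new ReleasableBytesReference[0]));
// hypothetical overload the comment wishes for, avoiding the array round-trip:
// var composite = CompositeBytesReference.of(chunks);

The array-based form is what the code uses today; the list-based form is only a wish.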
public RestRequestAggregator(CircuitBreakerService circuitBreakerService) {
    this.circuitBreakerService = circuitBreakerService;
}
Nit: do we use the circuitBreakerService in here? It doesn't look like it.
My intention was to include the circuit breaker in this PR, so I've done the basic plumbing. But the scope of the circuit-breaker work appears fairly large, so I will leave it as it is. The circuit breaker will work as before: we fully aggregate the content and then try the breaker.
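For illustration, a rough sketch of what per-chunk circuit breaking could eventually look like once this plumbing is used. The class and field names are assumptions; only the CircuitBreakerService/CircuitBreaker calls follow the existing API.

import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.indices.breaker.CircuitBreakerService;

// Hypothetical per-chunk accounting against the in-flight-requests breaker.
class ChunkBreakerSketch {
    private final CircuitBreaker breaker;
    private final List<ReleasableBytesReference> chunks = new ArrayList<>();
    private long accumulatedBytes;

    ChunkBreakerSketch(CircuitBreakerService circuitBreakerService) {
        this.breaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
    }

    void onChunk(ReleasableBytesReference chunk) {
        // charge the breaker before buffering; throws CircuitBreakingException when over the limit
        breaker.addEstimateBytesAndMaybeBreak(chunk.length(), "rest_request_aggregator");
        accumulatedBytes += chunk.length();
        chunks.add(chunk);
    }

    void close() {
        // refund the charged bytes and release the buffered chunks
        breaker.addWithoutBreaking(-accumulatedBytes);
        Releasables.close(chunks);
    }
}

Charging per chunk and refunding on close is the kind of accounting the plumbing above would enable.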
final HttpValidator httpValidator = (httpRequest, channel, validationListener) -> {
    // assert that the validator sees the request unaltered
    assertThat(httpRequest.uri(), is(uri));
    if (randomBoolean()) {
        validationListener.onResponse(null);
    } else {
        validationListener.onFailure(new ElasticsearchException("Boom"));
    }
};
try (
Before, oversized requests that failed auth were rejected at the Netty aggregator with a size exception. Now they go through to the RestController, which sends a response based on the auth exception and closes (discards) the stream.
I think it's OK for the auth exception to take precedence over the content-size one. But there is a possibility of a malicious request with extremely large content that gets discarded, and we would want to close the channel sooner rather than discard the content. If we want to keep the previous behaviour I need to adjust Netty4HttpContentSizeHandler.
@DaveCTurner CI has been happy for the last 3 runs, with minor test fixes. It's ready for review. I will address audit logging in a following PR if this one passes.
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)
Hi @mhl-b, I've created a changelog YAML for you.
I like it. I haven't quite finished reviewing the changes in detail but have left some comments from my first pass through.
return new ReleasableBytesReference(randomBytesReference(length), new AbstractRefCounted() {
    @Override
    protected void closeInternal() {
    }
});
Can we leak-track these things?
Suggested change:
-return new ReleasableBytesReference(randomBytesReference(length), new AbstractRefCounted() {
-    @Override
-    protected void closeInternal() {
-    }
-});
+return new ReleasableBytesReference(randomBytesReference(length), LeakTracker.wrap(AbstractRefCounted.of(() -> {})));
:)
return new ReleasableBytesReference(randomBytesReference(length), LeakTracker.wrap(new AbstractRefCounted() {
    @Override
    protected void closeInternal() {}
}));
@Override
-public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
+public void dispatchAggregatedRequest(RestRequest restRequest, RestChannel restChannel, ThreadContext threadContext) {}
Can we assert this?
Suggested change:
-public void dispatchAggregatedRequest(RestRequest restRequest, RestChannel restChannel, ThreadContext threadContext) {}
+public void dispatchAggregatedRequest(RestRequest restRequest, RestChannel restChannel, ThreadContext threadContext) {
+    assert restRequest.isStreamedContent() == false;
+}
-return httpRequest.body().asFull().bytes().length();
+return switch (httpRequest.body()) {
+    case HttpBody.Full content -> content.bytes().length();
+    case HttpBody.Stream stream -> Math.toIntExact(httpRequest.contentLengthHeader());
I'm suspicious about this, are you sure we need it? I'd rather we only exposed the length of an aggregated body. My worry is that code which is processing a streaming body shouldn't be dependent on the content length since it's not always available, but with this lenience we might end up not covering the unavailable-length case properly in tests.
Agree. This is only used for the circuit-breaker estimation. I can remove the whole content-length change; I don't think it's essential for this PR.
Removed content-length changes c7bb930
Added hasContent to the HTTP request; we need to know whether content is present before aggregation happens, so I'm keeping the header-check logic. 34e8881
try {
    dispatchRequest(aggregatedRequest, restChannel, handler, methodHandlers, threadContext);
} catch (Exception e) {
    // dispatchRequest already handles exceptions, this time we wont be able to send response
I don't understand this comment. If dispatchRequest is expected not to throw anything, should we assert false : e here?
Not well written by me; it's a bit tricky. dispatchRequest catches errors and tries to send an error response. But sending the error response might fail too: at least we have a test that does that by injecting a failure into the sendResponse path; I'm not sure production code can throw there. Failing to send a response can only go to the error log.
We cannot re-throw the Exception without wrapping it into a RuntimeException, but that messes up error handling. "Cannot" means the whole chain would need a throws Exception signature, which is pretty large.
Looking at this again, wrapping into a RuntimeException might be the right thing to do. Let it bubble up to the pipelining HTTP error handler, which will properly close the channel.
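A minimal sketch of that idea, reusing the dispatch call from the quoted snippet; the wrapping message is made up.

try {
    dispatchRequest(aggregatedRequest, restChannel, handler, methodHandlers, threadContext);
} catch (Exception e) {
    // checked exceptions cannot propagate here, so wrap and let it bubble up to the
    // pipelining HTTP error handler, which closes the channel
    throw new RuntimeException("failed to dispatch aggregated request", e);
}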
Rethrow exception 43bf06a
boolean closing;
ArrayList<ReleasableBytesReference> chunks;

private AggregationChunkHandler(RestRequest restRequest, Consumer<RestRequest> result) {
naming nit:

Suggested change:
-private AggregationChunkHandler(RestRequest restRequest, Consumer<RestRequest> result) {
+private AggregationChunkHandler(RestRequest restRequest, Consumer<RestRequest> resultConsumer) {
if (closing == false) {
    closing = true;
    if (chunks != null) {
        Releasables.close(chunks);
If closed mid-aggregation, do we need some way to notify the handler? Conversely, if we got to the end of the aggregation, could we release the chunks earlier? AIUI we close the request when starting to send the response, which seems much later than necessary.
> If closed mid-aggregation, do we need some way to notify the handler?

I don't think so. Handlers that handle whole content shouldn't have to bother about incomplete requests. That's the whole point of aggregation: keep it simple downstream, right?

> Conversely, if we got to the end of the aggregation, could we release the chunks earlier?

We release the chunks at prepareRequest; if you remember, we changed that a few months ago in 7aa07f1. This close handler is for premature termination.
@@ -30,6 +30,10 @@ enum HttpVersion {

     HttpBody body();

+    default void setBody(HttpBody body) {
+        throw new UnsupportedOperationException("not implemented");
assert false here too? This'd be a bug, right? Although there are only 4 implementing classes (plus a few subclasses), so I'd prefer not to have a default implementation.
if (encoding != null && encoding.equals("chunked")) {
    return -1;
} else {
    return 0;
Is this true? What requests might not have a Content-Length or Transfer-Encoding header exactly? I thought that'd not be allowed in HTTP/1.1, and in HTTP/1.0 a request without a Content-Length header would have a body that is terminated by the connection close.
> What requests might not have a Content-Length or Transfer-Encoding header exactly?

For example, a GET request without a body is allowed to have neither.
https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-5

> A user agent SHOULD NOT send a Content-Length header field when the request message does not contain content and the method semantics do not anticipate such data.

This if-condition might not be very precise, but it's practical. When Transfer-Encoding is present it is very likely chunked, and in that case Content-Length should be omitted. But if neither is present, the content should have zero length.
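For illustration, a small sketch of a header-based check along these lines, using Netty's HttpRequest API; the hasContent helper and its placement are assumptions, not the PR's exact code.

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;

final class ContentPresenceSketch {
    // Hypothetical helper: a request has content if it declares a positive Content-Length
    // or uses Transfer-Encoding (typically "chunked"); otherwise assume a zero-length body.
    static boolean hasContent(HttpRequest request) {
        String contentLength = request.headers().get(HttpHeaderNames.CONTENT_LENGTH);
        if (contentLength != null) {
            return Long.parseLong(contentLength) > 0;
        }
        return request.headers().contains(HttpHeaderNames.TRANSFER_ENCODING);
    }
}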
Replaced with hasContent 34e8881
@@ -119,10 +91,18 @@ public HttpBody body() {
     return content;
 }

+@Override
+public void setBody(HttpBody body) {
+    if (body.isFull()) {
Can we assert that we only ever replace a streaming body with a fully-aggregated body?
@@ -119,10 +90,14 @@ public HttpBody body() {
     return content;
 }

+@Override
+public void setBody(HttpBody body) {
+    this.content = body.asFull();
body.asFull() includes an assertion.
Sure, but can we also check that this.content is not already a fully-aggregated body?
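A sketch of what the combined assertions could look like, reusing isFull()/asFull() from the quoted snippets; this is illustrative, not the PR's final code.

@Override
public void setBody(HttpBody body) {
    // only ever replace a streaming body with a fully-aggregated one
    assert this.content.isFull() == false : "body was already fully aggregated";
    this.content = body.asFull(); // asFull() itself asserts that the replacement is full
}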
Hi @mhl-b, I've updated the changelog YAML for you.
One further comment on supporting contentLength() on a streaming request, and a few nits; otherwise this looks good to go.
if (msg instanceof LastHttpContent) {
    currentRequestStream = null;
readability nit: this has to happen before the ctx::read above; of course it does today, because we're already on the event loop so the dispatched ctx::read happens after this method returns, but it's still confusing to write them in this order.
}

-private Netty4HttpRequest(
+public Netty4HttpRequest(
nit: probably a mistake to expose the released flag to the public

Suggested change:
-public Netty4HttpRequest(
+private Netty4HttpRequest(
});
ch.pipeline().addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));
nit: may as well carry on chaining the addLast calls

Suggested change:
-});
-ch.pipeline().addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));
+})
+    .addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));
-return httpRequest.body().asFull().bytes().length();
+return switch (httpRequest.body()) {
+    case HttpBody.Full content -> content.bytes().length();
+    case HttpBody.Stream stream -> 0;
I still think this is trappy; I'd much prefer it remained an error to call contentLength on a streaming request.
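A sketch of the stricter alternative being argued for, keeping the switch shape from the diff but failing loudly for streams; illustrative only.

return switch (httpRequest.body()) {
    case HttpBody.Full content -> content.bytes().length();
    // refuse to report a length for streamed bodies rather than returning a misleading 0
    case HttpBody.Stream stream -> throw new IllegalStateException("content length is unavailable for streamed bodies");
};

That keeps the unavailable-length case loud in production and in tests alike.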
public void testFullBodyPassThrough() {
    var fullRequest = newRestRequest(between(1, 1024));
    aggregate(fullRequest, (aggregated) -> assertEquals(fullRequest.content(), aggregated.content()));
We need to assert that the consumer is invoked, otherwise this will pass if aggregate doesn't invoke it. Also, can we use assertSame?
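A sketch of the stricter test, assuming the same newRestRequest/aggregate helpers as the quoted snippet (those names are taken from the test above, not verified here).

public void testFullBodyPassThrough() {
    var fullRequest = newRestRequest(between(1, 1024));
    var invoked = new java.util.concurrent.atomic.AtomicBoolean();
    aggregate(fullRequest, aggregated -> {
        // the same underlying content object should be passed through untouched
        assertSame(fullRequest.content(), aggregated.content());
        invoked.set(true);
    });
    assertTrue("aggregation consumer was never invoked", invoked.get());
}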
LGTM
Netty's HTTP content aggregator is not flexible enough for our needs. In some cases, like the bulk API, we need to bypass aggregation, and Netty's aggregator has no memory safety such as backpressure or a circuit breaker. That means we always aggregate requests fully and occasionally run out of memory.
This change moves aggregation from the Netty module into the server module, specifically the RestController. In the Netty code we no longer deal with fully aggregated requests; everything is a stream of chunks. The RestController aggregates chunks when a RestHandler indicates, via a new flag, that it does not support streamed content. This opens up the opportunity to apply backpressure and circuit breaking on a per-chunk basis for all REST handlers, and it also simplifies the implementation of new streaming handlers.
Once the RestHandler is resolved in the RestController, we check whether it supports streamed content. If streaming is supported, the request proceeds to the handler. If it is not, a new RestContentAggregator aggregates the chunks and then replaces the content of the RestRequest with the aggregated content.
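A rough sketch of that decision point as described above; the supportsContentStream flag and the RestContentAggregator.aggregate call are taken from this description, but the exact names and signatures are assumptions.

// inside RestController, once the handler has been resolved (illustrative only)
if (handler.supportsContentStream()) {
    // streaming-capable handlers (e.g. bulk) receive the chunked body as-is
    dispatchRequest(restRequest, channel, handler, methodHandlers, threadContext);
} else {
    // everything else keeps the previous fully-aggregated behaviour
    RestContentAggregator.aggregate(restRequest, aggregatedRequest ->
        dispatchRequest(aggregatedRequest, channel, handler, methodHandlers, threadContext));
}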
The PR touches many files, but the essential changes are:
closes #120746