ESQL: Revive some more of inlinestats functionality #123589
Pinging @elastic/es-analytical-engine (Team:Analytics)
Hi @astefan, I've created a changelog YAML for you.
@@ -34,31 +34,31 @@ FROM employees
| SORT emp_no ASC
| LIMIT 5;

emp_no:integer | languages:integer | gender:keyword | max_lang:integer
There is a series of changes to existing queries that already worked, where the order of the columns was, imo, wrong. `inlinestats` should behave like `stats`, where the aggregate comes first and the groupings come afterwards.
In the specific case of this query, where the `keep` is explicit about `gender` being the last column from left to right, `inlinestats` will put `max_lang` before `gender` because `gender` is its grouping field.
For some context:
The initial version of `INLINESTATS` left the group-by columns in place to make it clear that these come from the previous steps and were left untouched. @astefan discovered that this leads to very weird side effects when the group-by columns are defined as an expression.
E.g. if the upstream command's output is `existing_field, other_field`, then `INLINESTATS c = count(*) BY expr=2*other_field, existing_field` would have the output
- `existing_field, other_field, expr, c` (if we consider that `expr=2*other_field` is turned into an `EVAL` ahead of `INLINESTATS`)
- `existing_field, other_field, c, expr` (if we agree that expressions in group-bys get placed to the right)
and neither is consistent with the output of `STATS` (which always places the groups on the right hand side).
Putting the group-by columns always on the right hand side of the output, consistently with `STATS`, is likely a better overall choice IMHO to avoid behavior that is both edge-case-y and unexpected to users.
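To make the ordering rule concrete, here is a minimal sketch of it, not the actual planner code. It models columns as plain strings, and the class and method names are hypothetical. It assumes that non-grouping upstream columns keep their position, aggregates come next, and group-by columns are appended last in the order they were written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: columns are plain strings, not ES|QL Attribute objects.
final class InlineStatsColumnOrder {

    // Assumed rule: upstream columns that are not group-bys stay in place,
    // then the aggregates, then the group-by columns in the order written.
    static List<String> outputColumns(List<String> upstream, List<String> aggregates, List<String> groupings) {
        List<String> output = new ArrayList<>(upstream);
        output.removeAll(groupings);   // group-bys leave their original position
        output.addAll(aggregates);     // aggregates come right after the remaining columns
        output.addAll(groupings);      // group-bys always end up on the right hand side
        return output;
    }

    public static void main(String[] args) {
        // KEEP emp_no, languages, gender | INLINESTATS max_lang = MAX(languages) BY gender
        System.out.println(outputColumns(
            List.of("emp_no", "languages", "gender"), List.of("max_lang"), List.of("gender")));
        // prints [emp_no, languages, max_lang, gender]

        // INLINESTATS c = count(*) BY expr=2*other_field, existing_field
        System.out.println(outputColumns(
            List.of("existing_field", "other_field"), List.of("c"), List.of("expr", "existing_field")));
        // prints [other_field, c, expr, existing_field]
    }
}
```

With this rule the trailing columns match what `STATS` would produce for the same aggregates and groupings, regardless of whether a grouping is a plain field or an expression.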
required_capability: inlinestats_v4
FROM employees
| KEEP emp_no, languages, gender
| INLINESTATS max_lang = MAX(languages) BY y = gender
I discovered this issue while testing different variants of the order of columns.
In this particular case, where `gender` is simply "aliased" with a different name, the query doesn't work, even though more complex things like `BY y = concat(gender, "")` do work. The investigation was more involved and I chose to tackle this one in a subsequent PR. Preliminary work shows that `ReplaceAggregateNestedExpressionWithEval` and `CombineProjections` seem to work against each other. The former adds an `eval` while the latter eliminates it and puts it back in the groupings of the aggregation.
@@ -129,4 +133,19 @@ protected NodeInfo<Join> info() {
    public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
        return new InlineJoin(source(), left, right, config());
    }

    @Override
    public List<Attribute> computeOutput(List<Attribute> left, List<Attribute> right) {
This is the meat of the change.
@@ -32,7 +33,8 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
    private final List<Attribute> matchFields;
    private final List<Attribute> leftFields;
    private final List<Attribute> rightFields;
    private final List<Attribute> output;
    private final List<Attribute> addedFields;
Simple rename.
        return this;
    }

    @Override
    public List<Attribute> output() {
        return output;
        if (lazyOutput == null) {
The meat of the PR.
Heya, I think this is good to merge - although I left a couple minor comments + a test suggestion that we could tackle before.
What I think we should discuss is the modelling of InlineJoin with respect to what attributes the branches of this binary plan contain; see my comment below.
Could we add a test that does something like `INLINESTATS ... BY y=to_upper(x), existing_field` (also with different order), just to make double sure that mixed expressions work fine?
JoinType joinType = config().type();
List<Attribute> output;
if (LEFT.equals(joinType)) {
    List<Attribute> leftOutput = new ArrayList<>(left().output());
We stream through this anyway and otherwise never use this copy; I don't think we need to copy into a new ArrayList for that, we could stream through `left().output()` directly in line 144.
AttributeSet rightKeys = new AttributeSet(config().rightFields());
List<Attribute> leftOutputWithoutMatchFields = leftOutput.stream().filter(attr -> rightKeys.contains(attr) == false).toList();
Hey, this part is interesting. And confusing. But interestingly so.
I was confused and thought this is wrong because we filter the right keys from the left output - normally in joins, the attributes in the left and right hand sides are disjoint!
However, because of how inline join works, my assumption is wrong and we have attributes that occur on both sides, because the INLINESTATS aggregation in the right hand side will reference attributes from the left hand side.
This will likely be a source of bugs, because in some parts of the code we rely on the fact that the left and right hand side branches of a binary plan must not have any attributes in common (like here in PruneColumns), even when the branches have the same index in common - like in two consecutive and identical `LOOKUP JOIN` commands.
Going forward, I think we should:
- Make this contract explicit by making this part of the plan consistency check.
- Refactor InlineJoin so that the left and right hand children don't have the same attributes, ever. I think this could be achieved by messing with `StubRelation`'s output a little.
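For illustration, the disjointness contract from the first bullet could be checked roughly as below. This is only a sketch: `Attr` is a hypothetical stand-in for an attribute identified by a name plus a unique id, and `sharedAttributes` is not an existing method of the plan consistency checks.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class BinaryPlanContract {

    // Hypothetical attribute: two attributes are the same only if name and id both match.
    record Attr(String name, int id) {}

    // Returns the attributes present on both sides; an empty result means the contract holds.
    static Set<Attr> sharedAttributes(List<Attr> left, List<Attr> right) {
        Set<Attr> shared = new HashSet<>(left);
        shared.retainAll(new HashSet<>(right));
        return shared;
    }

    public static void main(String[] args) {
        List<Attr> left = List.of(new Attr("emp_no", 1), new Attr("languages", 2), new Attr("gender", 3));

        // Inline join today: the right hand side references the very same gender attribute.
        List<Attr> inlineRight = List.of(new Attr("max_lang", 4), new Attr("gender", 3));
        System.out.println(sharedAttributes(left, inlineRight).isEmpty()); // prints false

        // Desired shape: same name on the right, but a distinct attribute.
        List<Attr> disjointRight = List.of(new Attr("max_lang", 4), new Attr("gender", 5));
        System.out.println(sharedAttributes(left, disjointRight).isEmpty()); // prints true
    }
}
```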
If you agree with me, this means that in this PR it'd be preferable not to assume that the left fields have overlap with the right fields; instead, the output can be computed as follows:
List<Attribute> leftOutputWithoutKeys = left.stream().filter(attr -> config().leftFields().contains(attr) == false).toList();
List<Attribute> rightWithAppendedKeys = new ArrayList<>(right);
rightWithAppendedKeys.removeAll(config().rightFields());
rightWithAppendedKeys.addAll(config().leftFields());
output = mergeOutputAttributes(rightWithAppendedKeys, leftOutputWithoutKeys);
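As a self-contained illustration of the snippet above, the sketch below runs the same steps on toy data. `Attr` and `merge` are hypothetical stand-ins: `merge` assumes that `mergeOutputAttributes(fields, childOutput)` keeps the child output minus entries shadowed by name and then appends the fields, which should be double-checked against the real helper.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ProposedInlineJoinOutput {

    // Hypothetical attribute: a name plus a unique id.
    record Attr(String name, int id) {}

    // Stand-in for mergeOutputAttributes (assumed behavior): child output first,
    // skipping entries shadowed by a same-named field, then the fields themselves.
    static List<Attr> merge(List<Attr> fields, List<Attr> childOutput) {
        Set<String> fieldNames = new HashSet<>();
        for (Attr field : fields) {
            fieldNames.add(field.name());
        }
        List<Attr> output = new ArrayList<>();
        for (Attr child : childOutput) {
            if (fieldNames.contains(child.name()) == false) {
                output.add(child);
            }
        }
        output.addAll(fields);
        return output;
    }

    // Mirrors the suggested computation without assuming left and right share attributes.
    static List<Attr> computeOutput(List<Attr> left, List<Attr> right, List<Attr> leftKeys, List<Attr> rightKeys) {
        List<Attr> leftOutputWithoutKeys = left.stream().filter(attr -> leftKeys.contains(attr) == false).toList();
        List<Attr> rightWithAppendedKeys = new ArrayList<>(right);
        rightWithAppendedKeys.removeAll(rightKeys); // drop the right hand copies of the join keys
        rightWithAppendedKeys.addAll(leftKeys);     // keep the join keys from the left, placed last
        return merge(rightWithAppendedKeys, leftOutputWithoutKeys);
    }

    public static void main(String[] args) {
        Attr empNo = new Attr("emp_no", 1), languages = new Attr("languages", 2), genderLeft = new Attr("gender", 3);
        Attr maxLang = new Attr("max_lang", 4), genderRight = new Attr("gender", 5);
        List<Attr> output = computeOutput(
            List.of(empNo, languages, genderLeft), List.of(maxLang, genderRight),
            List.of(genderLeft), List.of(genderRight));
        // Column order: emp_no, languages, max_lang, gender (the gender attribute is the left one, id 3).
        output.forEach(attr -> System.out.println(attr.name() + "#" + attr.id()));
    }
}
```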
As this differs from lookup join, please add a comment on the method on what the computeOutput does and why.
Added a comment for this, but I would rather keep the code as is. Any further changes to this part will likely come as a package of changes in other places as well, mainly what @alex-spies is suggesting in his review about an overall modeling of inline joins.
This may be something for a follow up, but I tried
ROW x = 1 | INLINESTATS max(x) by x
and the node died on me with
[2025-03-04T18:45:52,141][ERROR][o.e.b.ElasticsearchUncaughtExceptionHandler] [runTask-0] fatal error in thread [elasticsearch[runTask-0][esql_worker][T#2]], exiting java.lang.AssertionError: expected concrete indices with data node plan but got empty; data node plan ExchangeSinkExec[[max(x){r}#4, x{r}#2],false]
Indeed, I haven't looked at `row`. Definitely something for a follow-up.
var rightFieldNames = rightFields.stream().map(Attribute::name).toList();
lazyOutput.removeIf(a -> rightFieldNames.contains(a.name()));
lazyOutput.addAll(addedFields);
lazyOutput.addAll(rightFields);
nit: this looks like we're preserving the join keys from the right, but we normally preserve join keys from the left. In this case it doesn't matter, because they are the same, but this differs conceptually from LOOKUP JOIN which explicitly preserves the join keys from the left.
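The lazily computed output in the hunk above can be sketched in isolation as follows. This is an illustration under assumptions, not the `HashJoinExec` code: `Attr` and the class are hypothetical, and seeding the list from the left child's output is a guess at the part of the method the hunk does not show.

```java
import java.util.ArrayList;
import java.util.List;

final class LazyJoinOutput {

    // Hypothetical attribute, matched by name only as in the hunk above.
    record Attr(String name) {}

    private final List<Attr> leftOutput;
    private final List<Attr> rightFields;
    private final List<Attr> addedFields;
    private List<Attr> lazyOutput; // computed on first use, then reused

    LazyJoinOutput(List<Attr> leftOutput, List<Attr> rightFields, List<Attr> addedFields) {
        this.leftOutput = leftOutput;
        this.rightFields = rightFields;
        this.addedFields = addedFields;
    }

    List<Attr> output() {
        if (lazyOutput == null) {
            // Assumption: the output starts as a copy of the left child's output.
            List<Attr> computed = new ArrayList<>(leftOutput);
            List<String> rightFieldNames = rightFields.stream().map(Attr::name).toList();
            computed.removeIf(a -> rightFieldNames.contains(a.name())); // join keys leave their old position
            computed.addAll(addedFields);                                // fields added by the join
            computed.addAll(rightFields);                                // join keys go last
            lazyOutput = computed;
        }
        return lazyOutput;
    }

    public static void main(String[] args) {
        LazyJoinOutput join = new LazyJoinOutput(
            List.of(new Attr("emp_no"), new Attr("languages"), new Attr("gender")),
            List.of(new Attr("gender")),
            List.of(new Attr("max_lang")));
        join.output().forEach(a -> System.out.println(a.name()));
        System.out.println(join.output() == join.output()); // prints true: the list is computed once
    }
}
```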
LGTM
if (LEFT.equals(joinType)) {
    List<Attribute> leftOutput = new ArrayList<>(left().output());
    AttributeSet rightKeys = new AttributeSet(config().rightFields());
    List<Attribute> leftOutputWithoutMatchFields = leftOutput.stream().filter(attr -> rightKeys.contains(attr) == false).toList();
Instead of using streams, you could create an empty collection and then use a forEach to add elements to it:
List<Attribute> leftOutputWithoutMatchFields = new ArrayList<>();
leftOutput.forEach(e -> { if (rightKeys.contains(e) == false) { leftOutputWithoutMatchFields.add(e); } });
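For comparison, here are the two filtering styles side by side as a small sketch on plain strings. The class and method names are hypothetical. Both produce the same elements in the same order; the one difference worth noting is that `Stream.toList()` returns an unmodifiable list, while the loop variant returns a mutable `ArrayList`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class FilterStyles {

    // Stream variant, as in the PR.
    static List<String> withStream(List<String> leftOutput, Set<String> rightKeys) {
        return leftOutput.stream().filter(attr -> rightKeys.contains(attr) == false).toList();
    }

    // forEach variant, as suggested in the review.
    static List<String> withForEach(List<String> leftOutput, Set<String> rightKeys) {
        List<String> result = new ArrayList<>();
        leftOutput.forEach(e -> {
            if (rightKeys.contains(e) == false) {
                result.add(e);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        List<String> leftOutput = List.of("emp_no", "languages", "gender");
        Set<String> rightKeys = Set.of("gender");
        System.out.println(withStream(leftOutput, rightKeys));  // prints [emp_no, languages]
        System.out.println(withForEach(leftOutput, rightKeys)); // prints [emp_no, languages]
    }
}
```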
@elasticmachine test this
💚 Backport successful
This one looked at the way the different field outputs of InlineJoin are computed.