Skip to content

ESQL: Revive some more of inlinestats functionality #123589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 6, 2025

Conversation

astefan
Copy link
Contributor

@astefan astefan commented Feb 27, 2025

This one looked at the way the different field outputs of InlineJoin are computed.

@astefan astefan added >bug auto-backport Automatically create backport pull requests when merged test-release Trigger CI checks against release build :Analytics/ES|QL AKA ESQL v9.0.1 v9.1.0 labels Feb 27, 2025
@astefan astefan requested review from costin and alex-spies February 27, 2025 10:30
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Feb 27, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Copy link
Collaborator

Hi @astefan, I've created a changelog YAML for you.

@@ -34,31 +34,31 @@ FROM employees
| SORT emp_no ASC
| LIMIT 5;

emp_no:integer | languages:integer | gender:keyword | max_lang:integer
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a series of changes to existent queries that already worked, where the order of the columns was, imo, wrong.
inlinestats should behave like stats where the aggregate comes first, the groupings come afterwards.

In this very specific case of this query where the keep is explicit about gender being the last column from left to right, inlinestats will put max_lang before gender because gender is its grouping field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some context:
The initial version of INLINESTATS left the group-by columns in place to make it clear that these come from the previous steps and were left untouched. @astefan discovered that this leads to very weird side effects when the group-by columns are defined as an expression.

E.g. if the upstream commands output is existing_field, other_field, then INLINESTATS c = count(*) BY expr=2*other_field, existing_field would have the output

  • existing_field, other_field, expr, c (if we consider that expr=2*other_field is turned into an EVAL ahead of INLINESTATS)
  • existing_field, other_field, c, expr (if we agree that expressions in group-bys get placed to the right)

and neither are consistent with the output of STATS (which always places the groups on the right hand side).

Putting the group-by columns always on the right hand side of the output, consistently with STATS, is likely a better overall choice IMHO to avoid behavior that is both edge-case-y and unexpected to users.

required_capability: inlinestats_v4
FROM employees
| KEEP emp_no, languages, gender
| INLINESTATS max_lang = MAX(languages) BY y = gender
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discovered this issue while testing different variants of the order of columns.
In this particular case where gender is simply "aliased" with a different name, the query doesn't work, even though things more complex like BY y = concat(gender, "") do work. The investigation was more involved and I chose to tackle this one in a subsequent PR. Preliminary work show that ReplaceAggregateNestedExpressionWithEval and CombineProjections seem to work one against the other. The former adds an eval while the latter eliminates it and puts it back in the groupings of the aggregation.

@@ -129,4 +133,19 @@ protected NodeInfo<Join> info() {
public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
return new InlineJoin(source(), left, right, config());
}

@Override
public List<Attribute> computeOutput(List<Attribute> left, List<Attribute> right) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the meat of the change.

@@ -32,7 +33,8 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
private final List<Attribute> matchFields;
private final List<Attribute> leftFields;
private final List<Attribute> rightFields;
private final List<Attribute> output;
private final List<Attribute> addedFields;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simple rename.

return this;
}

@Override
public List<Attribute> output() {
return output;
if (lazyOutput == null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The meat of the PR.

@astefan astefan removed the test-release Trigger CI checks against release build label Feb 27, 2025
@astefan astefan added the test-release Trigger CI checks against release build label Feb 28, 2025
Copy link
Contributor

@alex-spies alex-spies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heya, I think this is good to merge - although I left a couple minor comments + a test suggestion that we could tackle before.

What I think we should discuss is the modelling of InlineJoin with respect to what attributes the branches of this binary plan contain; see my comment below.

@@ -34,31 +34,31 @@ FROM employees
| SORT emp_no ASC
| LIMIT 5;

emp_no:integer | languages:integer | gender:keyword | max_lang:integer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some context:
The initial version of INLINESTATS left the group-by columns in place to make it clear that these come from the previous steps and were left untouched. @astefan discovered that this leads to very weird side effects when the group-by columns are defined as an expression.

E.g. if the upstream commands output is existing_field, other_field, then INLINESTATS c = count(*) BY expr=2*other_field, existing_field would have the output

  • existing_field, other_field, expr, c (if we consider that expr=2*other_field is turned into an EVAL ahead of INLINESTATS)
  • existing_field, other_field, c, expr (if we agree that expressions in group-bys get placed to the right)

and neither are consistent with the output of STATS (which always places the groups on the right hand side).

Putting the group-by columns always on the right hand side of the output, consistently with STATS, is likely a better overall choice IMHO to avoid behavior that is both edge-case-y and unexpected to users.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a test that does something like INLINESTATS ... BY y=to_upper(x), existing_field (also with different order), just to make double sure that mixed expressions work fine?

JoinType joinType = config().type();
List<Attribute> output;
if (LEFT.equals(joinType)) {
List<Attribute> leftOutput = new ArrayList<>(left().output());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We stream through this anyway and otherwise never use this copy; I don't think we need to copy into a new ArrayList for that, we could stream through left().output() directly in line 144.

Comment on lines 143 to 144
AttributeSet rightKeys = new AttributeSet(config().rightFields());
List<Attribute> leftOutputWithoutMatchFields = leftOutput.stream().filter(attr -> rightKeys.contains(attr) == false).toList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, this part is interesting. And confusing. But interestingly so.

I was confused and thought this is wrong because we filter the right keys from the left output - normally in joins, the attributes in the left and right hand sides are disjoint!

However, because of how inline join works, my assumption is wrong and we have attributes that occur on both sides, because the INLINESTATS aggregation in the right hand side will reference attributes from the left hand side.

This will likely be a source of bugs, because in some parts of the code we rely on the fact that the left and right hand side branches of a binary plan must not have any attributes in common (like here in PruneColumns), even when the branches have the same index in common - like in two consecutive and identical LOOKUP JOIN commands.

Going forward, I think we should:

  • Make this contract explicit by making this part of the plan consistency check.
  • Refactor InlineJoin so that the left and right hand children don't have the same attributes, ever. I think this could be achieved by messing with StubRelation's output a little.

If you agree with me, this means that in this PR it'd be preferable not to assume that the left fields have overlap with the right fields; instead, the output can be computed as follows:

            List<Attribute> leftOutputWithoutKeys = left.stream().filter(attr -> config().leftFields().contains(attr) == false).toList();
            List<Attribute> rightWithAppendedKeys = new ArrayList<>(right);
            rightWithAppendedKeys.removeAll(config().rightFields());
            rightWithAppendedKeys.addAll(config().leftFields());

            output = mergeOutputAttributes(rightWithAppendedKeys, leftOutputWithoutKeys);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this differs from lookup join, please add a comment on the method on what the computeOutput does and why.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment for this, but I would rather keep the code as is. Any further changes to this part will likely come as a package of changes in other places as well, mainly what @alex-spies is suggesting in his review about an overall modeling of inline joins.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be something for a follow up, but I tried

ROW x = 1 | INLINESTATS max(x) by x

and the node died on me with

[2025-03-04T18:45:52,141][ERROR][o.e.b.ElasticsearchUncaughtExceptionHandler] [runTask-0] fatal error in thread [elasticsearch[runTask-0][esql_worker][T#2]], exiting java.lang.AssertionError: expected concrete indices with data node plan but got empty; data node plan ExchangeSinkExec[[max(x){r}#4, x{r}#2],false]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I haven't looked at row. Definitely something for a follow-up.

var rightFieldNames = rightFields.stream().map(Attribute::name).toList();
lazyOutput.removeIf(a -> rightFieldNames.contains(a.name()));
lazyOutput.addAll(addedFields);
lazyOutput.addAll(rightFields);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this looks like we're preserving the join keys from the right, but we normally preserve join keys from the left. In this case it doesn't matter, because they are the same, but this differs conceptually from LOOKUP JOIN which explicitly preserves the join keys from the left.

Copy link
Member

@costin costin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines 143 to 144
AttributeSet rightKeys = new AttributeSet(config().rightFields());
List<Attribute> leftOutputWithoutMatchFields = leftOutput.stream().filter(attr -> rightKeys.contains(attr) == false).toList();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this differs from lookup join, please add a comment on the method on what the computeOutput does and why.

if (LEFT.equals(joinType)) {
List<Attribute> leftOutput = new ArrayList<>(left().output());
AttributeSet rightKeys = new AttributeSet(config().rightFields());
List<Attribute> leftOutputWithoutMatchFields = leftOutput.stream().filter(attr -> rightKeys.contains(attr) == false).toList();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using streams, you could create an empty collection then do a forEach to add elements in there:

 List<Attribute> leftOutputWithoutMatchFields = new ArrayList<>()
 leftOutput.forEach(e -> if (rightKeys.contains(e) == false) { leftOutputWIthoutMatchField.add(e)});

@astefan astefan removed the test-release Trigger CI checks against release build label Mar 6, 2025
@astefan
Copy link
Contributor Author

astefan commented Mar 6, 2025

@elasticmachine test this

@astefan astefan merged commit 04c8bf4 into elastic:main Mar 6, 2025
17 checks passed
@elasticsearchmachine
Copy link
Collaborator

💚 Backport successful

Status Branch Result
9.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Analytics/ES|QL AKA ESQL auto-backport Automatically create backport pull requests when merged >bug Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.0.1 v9.1.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants