
[Logstash] Add CEL native data collection, and dashboards for Pipeline and Pipeline Details #8098


Merged
merged 25 commits on Oct 10, 2023

Conversation

@robbavey (Member) commented Oct 4, 2023

Proposed commit message

This PR follows on from #7704 and adds data collection to the integration, as well as standard dashboards for Logstash pipelines.

The short-term goal is to enable Logstash to be monitored with Serverless Elasticsearch instances, which do not provide a Stack Monitoring UI; the long-term goal is to iterate on this to replace the existing Stack Monitoring UI.

This PR includes three new data streams - node_cel, pipeline, and plugins - with data collected from the Logstash Metrics API using the CEL input, intended to replace the existing Metricbeat collection.

This gives us more flexibility going forward and should improve our velocity in adding new features to the Logstash dashboards straight from the API, without needing to update several code bases.

The CEL data streams will create a single document per node for the node_cel data stream, a document per pipeline per node for the pipeline data stream, and a document per plugin per pipeline per node for the plugins data stream. For this reason, we default to a lower sending cadence for the plugins data stream than for the others.

Note that the existing Metricbeat collection defaults to sending large documents, including significant data for the entire pipeline graph, every 10 seconds.

This integration also includes ingest pipelines to clean up the data from the Logstash flow metrics API, removing entries we don't necessarily need at this stage as well as the string representation of "Infinity".
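As an illustration, that cleanup could look something like the ingest pipeline processor below. This is a minimal sketch only - the field path (logstash.pipeline.flow) is assumed for illustration and is not taken from this PR:

description: Clean up Logstash flow metrics (sketch).
processors:
  - script:
      description: Drop flow metrics whose value is the string "Infinity".
      lang: painless
      source: |
        // Field path is hypothetical; adjust to the actual document shape.
        def flow = ctx.logstash?.pipeline?.flow;
        if (flow != null) {
          // Remove any entry reported as the literal string "Infinity".
          flow.entrySet().removeIf(e -> "Infinity".equals(e.getValue()));
        }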

When installing the Logstash integration, users will be presented with an option to collect metrics for Stack Monitoring or for the Technical Preview:

[screenshot]

(Note: if there were a way to have an either/or option in the UI, that would be very helpful.)

The dashboards are described in the Screenshots section.

Checklist

  • I have reviewed tips for building integrations and this pull request is aligned with them.
  • I have verified that all data streams collect metrics or logs.
  • I have added an entry to my package's changelog.yml file.
  • I have verified that Kibana version constraints are current according to guidelines.

How to test this PR locally

  • Install elastic-package
  • Checkout this PR
  • Navigate to the integrations/packages/logstash directory
  • Run elastic-package build to build the package
  • Run elastic-package stack up to bring up the Elastic stack
  • Start an instance of Logstash
  • Install the Logstash integration
  • Uncheck the Metrics (Stack Monitoring) option
  • Check the Metrics (Technical Preview) option
  • Click 'Save and Continue' and follow the instructions to enroll the agent
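For convenience, the elastic-package steps above as a shell session (assuming a checkout of the elastic/integrations repository; adjust paths to your environment):

cd integrations/packages/logstash
elastic-package build      # build the package
elastic-package stack up   # bring up the Elastic stack (add -d to detach)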

Related issues

Follows on from #7704

Screenshots

Integration Selection Page

[screenshot]

Pipelines Dashboard

This dashboard provides an overview of running pipelines, with a table containing details of the pipelines that are running, which can be drilled into for a "details view".

Proposed Page

[screenshot]

This is intended to replace the existing pipelines page in the Stack Monitoring solution.

Existing Page
[screenshot]

The proposed page adds further details, including persistent queue usage per pipeline and more visibility into which pipelines may be blocked.

Pipeline Details Dashboard

The Pipeline Details dashboard takes a different approach to the pipeline details view in Stack Monitoring.

The page includes some general pipeline information, showing the rate of events flowing through the pipeline and the state of persistent queues, broken down per node running the pipeline:

Proposed Page

General Information
[screenshot]

It also includes information for each plugin type:

Inputs
[screenshot]
Filters
[screenshot]
Outputs
[screenshot]
Codecs
[screenshot]

This has benefits and drawbacks compared with the existing approach:

The benefits are:

  • More information and richer drilldowns to help track down slow-performing plugins
  • The ability to track PQ usage on a per-pipeline and per-node basis

Drawbacks:

  • The lack of a visual representation may make it more difficult to figure out which plugin is at fault when multiple plugins of the same type are in play
    • Note: this can be mitigated by users including an id in their plugin definitions
    • We will look to add more information (originating file + line number of the plugin) in a subsequent follow-up PR.

Existing Dashboard

The existing dashboard contains a representation of the pipeline graph, which we are not easily able to reproduce using standard Kibana dashboards.

[screenshot]

And a small pop-up drilldown for each plugin:

[screenshot]

Note: this will require a follow-up PR to enable it for serverless - the existing integration has some compatibility issues that I would like to fold into a subsequent PR.

Commits in this PR:

  • This commit adds infrastructure to retrieve pipeline information from the Logstash Metrics API using CEL. This is intended to provide a flattened version of the data provided by the metrics API, enabling visualizations to be created using standard Kibana dashboards rather than relying on the custom Stack Monitoring UI.
  • This commit adds infrastructure to retrieve plugin information from the Logstash Metrics API using CEL, with the same intent. Note that by default we send this data at a slower cadence than for nodes and pipelines: large Logstash pipelines may include *many* plugins, so to avoid excessive data transmission we send at a lower cadence and allow users to adjust it to their needs.
  • This commit adds infrastructure to retrieve node information from the Logstash Metrics API using CEL. This is intended to replace the existing node retrieval for stack monitoring and to mirror the data that was previously retrieved by Metricbeat.
  • This commit adds dashboards for Logstash pipelines and plugins.
  • Fix tests.
@robbavey robbavey added the enhancement New feature or request label Oct 4, 2023
@robbavey robbavey requested a review from a team as a code owner October 4, 2023 20:59
@elasticmachine commented Oct 4, 2023

💚 Build Succeeded


Build stats

  • Start Time: 2023-10-09T18:01:25.647+0000

  • Duration: 22 min 1 sec

Test stats 🧪

  • Failed: 0
  • Passed: 40
  • Skipped: 0
  • Total: 40

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

@robbavey (Member, Author) commented Oct 4, 2023

/test

@elasticmachine commented Oct 5, 2023

🌐 Coverage report

Name           Metrics % (covered/total)   Diff
Packages       100.0% (2/2)                💚
Files          100.0% (6/6)                💚
Classes        100.0% (6/6)                💚
Methods        100.0% (55/55)              💚 36.585
Lines          87.558% (190/217)           👎 -12.306
Conditionals   100.0% (0/0)                💚

Member:

My personal preference is to name this file based on the input type used, like cel.yml.hbs. If you do so, then update the template_path attribute in the manifest.yml.
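For reference, a stream entry in a data stream's manifest.yml would then point at the renamed template via template_path - a minimal sketch, with the title and description values invented for illustration:

streams:
  - input: cel
    template_path: cel.yml.hbs
    title: Logstash node metrics
    description: Collect Logstash node metrics using the CEL input.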

password: {{password}}

redact:
  fields: ~
Member:

Suggested change
fields: ~
fields:
  - password

Member:

But if you switch to using auth.basic then you won't need to redact it or configure the request with the Authorization header manually.

auth:
  basic:
    user: {{escape_string username}}
    password: {{escape_string password}}

@@ -0,0 +1,3 @@
errors:
  exclude_checks:
    - SVR00002 # Mandatory filters in dashboards.
Member:

I think this needs an elastic-package format.

Member Author:

I'm going to remove this for now - I will add a follow-up PR for serverless/3.0 format compatibility.

- name: cpuacct
  type: group
  fields:
    - name: control_group
Member:

There's a little inconsistency between node_cel and node_stats. control_group is indexed as text at packages/logstash/data_stream/node_stats/fields/fields.yml:128:27. Same for the *.cpu.control_group.

That might cause some query/agg issues.
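A sketch of the aligned mapping, assuming keyword is the intended type for both data streams (the description text here is illustrative, not copied from the package):

- name: cpuacct
  type: group
  fields:
    - name: control_group
      # keyword keeps the field usable in term queries and aggregations.
      type: keyword
      ignore_above: 1024
      description: The cgroup path for the cpuacct controller.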

  type: keyword
  ignore_above: 1024
  description: Host MAC addresses.
- name: name
Member:

This field is already declared in ecs.yml.

Comment on lines 8 to 11
- name: account.id
  type: keyword
  ignore_above: 1024
  description: "The cloud account or organization id used to identify different entities in a multi-tenant environment.\nExamples: AWS account id, Google Cloud ORG Id, or other unique identifier."
@andrewkroh (Member) commented Oct 5, 2023

A lot of these fields exist in ECS and should use external: ecs.

Suggested change
- name: account.id
  type: keyword
  ignore_above: 1024
  description: "The cloud account or organization id used to identify different entities in a multi-tenant environment.\nExamples: AWS account id, Google Cloud ORG Id, or other unique identifier."
- name: account.id
  external: ecs

I've been working on a tool to fix this automatically. This command will edit the file for you. (I would only use it on a clean workspace so you can review its edits in isolation.)

go run github.com/andrewkroh/fydler@main -fix packages/logstash/data_stream/node_cel/fields/*yml

Comment on lines 18 to 22
get_request(state.url).with({
  "Header":{
    "Authorization": ["Basic "+string(base64(state.username+":"+state.password))],
  }
})
Contributor:

This can be done more easily with the basic_authentication extension.

Member Author:

Is there something I am missing beyond adding auth.basic.user and auth.basic.password blocks, e.g.:

config_version: "2"
interval: {{period}}
resource.url: "{{url}}/_node/stats?graph=true&vertices=true"
{{#if resource_ssl}}
resource.ssl:
  {{resource_ssl}}
{{/if}}

{{#if username}}
auth.basic.user: {{escape_string username}}
{{/if}}
{{#if password}}
auth.basic.password: {{escape_string password}}
{{/if}}

redact:
  fields: ~

program: |
  get_request(state.url)
  .do_request().as(resp, bytes(resp.Body)
  ...

I have tried this a number of times, and I've never seen the Authorization header sent successfully, hence the workarounds applied here.

Comment on lines 38 to 40
"events":body['events'],
"jvm":{
"uptime_in_millis":body['jvm']['uptime_in_millis'],
Contributor:

These and below can be dotted, eg:

Suggested change
"events":body['events'],
"jvm":{
  "uptime_in_millis":body['jvm']['uptime_in_millis'],
"events":body.events,
"jvm":{
  "uptime_in_millis":body.jvm.uptime_in_millis,

Stylistic only, but slightly lighter on the page.

}})
)
.as(eve, {
  "events":eve.map(each, eve),
Contributor:

What is the intention here? It looks to me like you are wanting a single element array with eve as the element. Is that correct?

Suggested change
"events":eve.map(each, eve),
"events": [eve],

Member Author:

In this case, yes - your suggestion works perfectly.

Comment on lines 37 to 60
.with({
  "es_cluster_id":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), each.cluster_uuid)))
})
.with({
  "es_cluster_id_map":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), {"plugin_id":each.id, "cluster_id":each.cluster_uuid})))
})
.with({
  "outputs":body.pipelines[pipeline_name]["plugins"]["outputs"]
})
.with({
  "inputs":body.pipelines[pipeline_name]["plugins"]["inputs"]
})
.with({
  "filters":body.pipelines[pipeline_name]["plugins"]["filters"]
})
.with({
  "codecs":body.pipelines[pipeline_name]["plugins"]["codecs"]
})
.with({
  "host":{
    "name":body.name,
    "address":body.http_address,
  }
})
Contributor:

Each with call will need to allocate a Go map. I would combine all these into one.

Suggested change
.with({
  "es_cluster_id":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), each.cluster_uuid)))
})
.with({
  "es_cluster_id_map":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), {"plugin_id":each.id, "cluster_id":each.cluster_uuid})))
})
.with({
  "outputs":body.pipelines[pipeline_name]["plugins"]["outputs"]
})
.with({
  "inputs":body.pipelines[pipeline_name]["plugins"]["inputs"]
})
.with({
  "filters":body.pipelines[pipeline_name]["plugins"]["filters"]
})
.with({
  "codecs":body.pipelines[pipeline_name]["plugins"]["codecs"]
})
.with({
  "host":{
    "name":body.name,
    "address":body.http_address,
  }
})
.with({
  "es_cluster_id":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), each.cluster_uuid))),
  "es_cluster_id_map":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), {"plugin_id":each.id, "cluster_id":each.cluster_uuid}))),
  "outputs":body.pipelines[pipeline_name].plugins.outputs,
  "inputs":body.pipelines[pipeline_name].plugins.inputs,
  "filters":body.pipelines[pipeline_name].plugins.filters,
  "codecs":body.pipelines[pipeline_name].plugins.codecs,
  "host":{
    "name":body.name,
    "address":body.http_address,
  }
})

Maybe also consider pulling out body.pipelines[pipeline_name].plugins once, before the evaluation here.

(oops, missed the initial with, but you get the idea)
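To illustrate the hoisting suggestion, the plugins map could be bound once with the same as macro already used in this script - a sketch only, with the surrounding state expression elided:

// Bind body.pipelines[pipeline_name].plugins once, then reference the
// binding, instead of re-evaluating the lookup for each key.
body.pipelines[pipeline_name].plugins.as(plugins,
  {
    "outputs": plugins.outputs,
    "inputs":  plugins.inputs,
    "filters": plugins.filters,
    "codecs":  plugins.codecs,
  }
)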

"ms":input.events.queue_push_duration_in_millis
}
}
}.drop_empty()
Contributor:

Check to see that you still get what you want with only the final drop_empty (L87) in the expression.

"ms":filter.events.duration_in_millis
}
}
}.drop_empty()
Contributor:

Similar here.

@robbavey (Member, Author) commented Oct 5, 2023

I think I've addressed most of the comments in the original reviews, apart from the simplified basic_auth, which I have been unable to get working thus far.

Using the `get()` method instead of `get_request(url).do_request()` enables
the use of basic auth by simply specifying `auth.basic` in the YAML definition,
allowing a whole bunch of code to be removed from the script and requiring only
the password to be redacted, rather than applying a blanket redaction.
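A sketch of what the simplified template could look like after that change - the variable names follow the handlebars template shown earlier, and the program body is reduced to a minimal example rather than the full field selection:

config_version: "2"
interval: {{period}}
resource.url: "{{url}}/_node/stats?graph=true&vertices=true"
{{#if username}}
auth.basic.user: {{escape_string username}}
{{/if}}
{{#if password}}
auth.basic.password: {{escape_string password}}
{{/if}}
redact:
  fields:
    - auth.basic.password
program: |
  // get() uses the input's configured client, so auth.basic is applied
  // automatically and no Authorization header needs to be built by hand.
  get(state.url).as(resp, {
    "events": [bytes(resp.Body).decode_json()]
  })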
Comment on lines 28 to 34
.with({
  "host":{
    "name":body.name,
    "address":body.http_address,
  }
})
.with({
Contributor:

You can merge these as well.

"flow":body.pipelines[pipeline_name].flow,
"time":{
"queue_push_duration":{
"ms":body.pipelines[pipeline_name]["events"]["queue_push_duration_in_millis"],
Contributor:

Suggested change
"ms":body.pipelines[pipeline_name]["events"]["queue_push_duration_in_millis"],
"ms":body.pipelines[pipeline_name].events.queue_push_duration_in_millis,

(similar below — quoted bracket indexing is only needed when the key is not a valid CEL token)

Comment on lines 17 to 18
fields:
  - password
Contributor:

This can now be the null case.

Suggested change
fields:
  - password
fields: ~

Comment on lines 17 to 18
fields:
  - password
Contributor:

Suggested change
fields:
  - password
fields: ~

.with({
  "es_cluster_id":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), each.cluster_uuid))),
  "es_cluster_id_map":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), {"plugin_id":each.id, "cluster_id":each.cluster_uuid}))),
  "outputs":body.pipelines[pipeline_name]["plugins"]["outputs"],
Contributor:

Suggested change
"outputs":body.pipelines[pipeline_name]["plugins"]["outputs"],
"outputs":body.pipelines[pipeline_name].plugins.outputs,

(similar below)

"plugin":{
"type":"input",
"input":{
"elasticsearch.cluster.id":event['es_cluster_id_map'].map(tuple, (tuple.plugin_id == input.id), tuple.cluster_id),
Contributor:

Suggested change
"elasticsearch.cluster.id":event['es_cluster_id_map'].map(tuple, (tuple.plugin_id == input.id), tuple.cluster_id),
"elasticsearch.cluster.id":event.es_cluster_id_map.map(tuple, (tuple.plugin_id == input.id), tuple.cluster_id),

(similar below)

@robbavey (Member, Author) commented Oct 6, 2023

This is ready for another round

@efd6 (Contributor) left a comment

Minor nits, then LGTM for the CEL code.

"filter":{
"id":filter.id,
"name":filter.name,
"elasticsearch.cluster.id":event['es_cluster_id_map'].map(tuple, (tuple.plugin_id == filter.id), tuple.cluster_id),
Contributor:

Suggested change
"elasticsearch.cluster.id":event['es_cluster_id_map'].map(tuple, (tuple.plugin_id == filter.id), tuple.cluster_id),
"elasticsearch.cluster.id":event.es_cluster_id_map.map(tuple, (tuple.plugin_id == filter.id), tuple.cluster_id),

@robbavey (Member, Author) commented Oct 9, 2023

This is ready for another round @elastic/infra-monitoring-ui

Note that this is part of the serverless initiative to enable Logstash monitoring in the absence of a Stack Monitoring UI.

I will be following up this PR with the changes necessary to make this package visible to serverless, which include changes to the existing integration package.

Result of running:

go run github.com/andrewkroh/fydler@main -fix packages/logstash/data_stream/*/fields/*yml
@klacabane (Contributor) left a comment

LGTM!

@IanLee1521 (Contributor):

First off, I just found this and it's AWESOME!

Maybe it's better for me to open a new issue, but just before I do: would it make sense to mark these dashboards with a reference to users needing to switch the integration from Stack Monitoring to Technical Preview?

I just ran into this where I was trying to look at the [Metrics Logstash] * dashboards and getting errors.

[screenshot]

Eventually I found that I needed to change the integration configuration, and now things are working.

Thoughts? Is that worth noting on the dashboards?

@robbavey (Member, Author) commented Dec 1, 2023

@IanLee1521 First of all, thanks for trying out the new dashboards in technical preview - we love to get feedback on new features we are developing, and we appreciate you trying them out as we refine them. Any further comments would be gratefully received.

I think this should be an issue - as we start to transition to the new way of collecting and presenting data for monitoring, we should be making it clearer how to work with this effectively.

I'm on PTO at the moment, but I can make an issue when I get back, unless you would prefer to create one yourself.

@robbavey (Member, Author) commented Dec 4, 2023

@IanLee1521 - I've added an issue here. And thanks again for the feedback!

Labels: enhancement (New feature or request), Integration:logstash