[Logstash] Add CEL native data collection, and dashboards for Pipeline and Pipeline Details #8098
Conversation
This commit adds infrastructure to retrieve pipeline information from the Logstash Metrics API using CEL. This is intended to provide a flattened version of the data provided by the metrics API, enabling visualizations to be created using standard Kibana dashboards rather than relying on the custom Stack Monitoring UI.
This commit adds infrastructure to retrieve plugin information from the Logstash Metrics API using CEL. This is intended to provide a flattened version of the data provided by the metrics API, enabling visualizations to be created using standard Kibana dashboards rather than relying on the custom Stack Monitoring UI. Note that by default we send this data at a slower cadence than for nodes and pipelines: large Logstash pipelines may include *many* plugins, so to avoid excessive data transmission we send at a lower cadence and allow users to adjust it depending on their needs.
This commit adds infrastructure to retrieve node information from the Logstash Metrics API using CEL. This is intended to replace the existing node retrieval for stack monitoring and to mirror the data that was previously retrieved by Metricbeat.
This commit adds dashboards for Logstash pipelines and plugins.
Fix tests
/test
🌐 Coverage report
My personal preference is to name this file based on the input type used, like `cel.yml.hbs`. If you do so, then update the `template_path` attribute in the manifest.yml.
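For context, this is roughly how the stream entry in the data stream's manifest.yml could reference the renamed template; the title and description below are illustrative placeholders rather than the package's actual values:

```yaml
streams:
  - input: cel
    title: Logstash pipeline metrics
    description: Collect Logstash pipeline metrics from the node stats API using CEL.
    template_path: cel.yml.hbs
```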
```yaml
password: {{password}}

redact:
  fields: ~
```
Suggested change:
```yaml
fields:
  - password
```
But if you switch to using `auth.basic` then you won't need to redact it or configure the request with the Authorization header manually.

```yaml
auth:
  basic:
    user: {{escape_string username}}
    password: {{escape_string password}}
```
packages/logstash/validation.yml (Outdated)
```
@@ -0,0 +1,3 @@
errors:
  exclude_checks:
    - SVR00002 # Mandatory filters in dashboards.
```
I think this needs an `elastic-package format`.
I'm going to remove this for now - I will add a follow-up PR for serverless/3.0 format compatibility.
```yaml
- name: cpuacct
  type: group
  fields:
    - name: control_group
```
There's a little inconsistency between node_cel and node_stats. `control_group` is indexed as `text` at packages/logstash/data_stream/node_stats/fields/fields.yml:128:27. Same for the *.cpu.control_group. That might cause some query/agg issues.
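If the goal is to align the two, a minimal sketch of declaring the field as keyword in both data streams' fields.yml (the description wording here is placeholder text, and the entry would sit under the existing cpuacct/cpu groups):

```yaml
- name: control_group
  type: keyword
  ignore_above: 1024
  description: Cgroup control group path used by the Logstash process.
```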
```yaml
  type: keyword
  ignore_above: 1024
  description: Host mac addresses.
- name: name
```
This field is already declared in ecs.yml.
```yaml
- name: account.id
  type: keyword
  ignore_above: 1024
  description: "The cloud account or organization id used to identify different entities in a multi-tenant environment.\nExamples: AWS account id, Google Cloud ORG Id, or other unique identifier."
```
A lot of these fields exist in ECS and should use `external: ecs`.

Suggested change:
```yaml
- name: account.id
  external: ecs
```
I've been working on a tool to fix this automatically. This command will edit the file for you. (I would only use it on a clean workspace so you can review its edits in isolation.)

```
go run github.com/andrewkroh/fydler@main -fix packages/logstash/data_stream/node_cel/fields/*yml
```
```
get_request(state.url).with({
  "Header":{
    "Authorization": ["Basic "+string(base64(state.username+":"+state.password))],
  }
})
```
This can be done more easily with the `basic_authentication` extension.
Is there something I am missing beyond adding `auth.basic.user` and `auth.basic.password` blocks? E.g.:

```yaml
config_version: "2"
interval: {{period}}
resource.url: "{{url}}/_node/stats?graph=true&vertices=true"
{{#if resource_ssl}}
resource.ssl:
  {{resource_ssl}}
{{/if}}
{{#if username}}
auth.basic.user: {{escape_string username}}
{{/if}}
{{#if password}}
auth.basic.password: {{escape_string password}}
{{/if}}
redact:
  fields: ~
program: |
  get_request(state.url)
    .do_request().as(resp, bytes(resp.Body)
  :
  :
```

I have tried this a number of times, and I've never seen the Authorization header sent successfully, hence the workarounds applied here.
"events":body['events'], | ||
"jvm":{ | ||
"uptime_in_millis":body['jvm']['uptime_in_millis'], |
These and below can be dotted, e.g.:

```
"events":body.events,
"jvm":{
  "uptime_in_millis":body.jvm.uptime_in_millis,
```

Stylistic only, but slightly lighter on the page.
```
}})
)
.as(eve, {
  "events":eve.map(each, eve),
```
What is the intention here? It looks to me like you are wanting a single-element array with `eve` as the element. Is that correct?

Suggested change:
```
"events": [eve],
```
In this case, yes - your suggestion works perfectly
```
.with({
  "es_cluster_id":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), each.cluster_uuid)))
})
.with({
  "es_cluster_id_map":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), {"plugin_id":each.id, "cluster_id":each.cluster_uuid})))
})
.with({
  "outputs":body.pipelines[pipeline_name]["plugins"]["outputs"]
})
.with({
  "inputs":body.pipelines[pipeline_name]["plugins"]["inputs"]
})
.with({
  "filters":body.pipelines[pipeline_name]["plugins"]["filters"]
})
.with({
  "codecs":body.pipelines[pipeline_name]["plugins"]["codecs"]
})
.with({
  "host":{
    "name":body.name,
    "address":body.http_address,
  }
})
```
Each `with` call will need to allocate a Go map. I would combine all these into one.

Suggested change:
```
.with({
  "es_cluster_id":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), each.cluster_uuid))),
  "es_cluster_id_map":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), {"plugin_id":each.id, "cluster_id":each.cluster_uuid}))),
  "outputs":body.pipelines[pipeline_name].plugins.outputs,
  "inputs":body.pipelines[pipeline_name].plugins.inputs,
  "filters":body.pipelines[pipeline_name].plugins.filters,
  "codecs":body.pipelines[pipeline_name].plugins.codecs,
  "host":{
    "name":body.name,
    "address":body.http_address,
  }
})
```

Maybe also consider pulling out `body.pipelines[pipeline_name].plugins` once, before the evaluation here.

(Oops, missed the initial `with`, but you get the idea.)
"ms":input.events.queue_push_duration_in_millis | ||
} | ||
} | ||
}.drop_empty() |
Check to see that you still get what you want with only the final `drop_empty` (L87) in the expression.
"ms":filter.events.duration_in_millis | ||
} | ||
} | ||
}.drop_empty() |
Similar here.
packages/logstash/data_stream/plugins/agent/stream/stream.yml.hbs (Outdated; resolved)
I think I've addressed most of the comments in the original reviews - apart from the simplified
Using the `get()` method instead of `get_request(url).do_request()` enables the use of basic auth by simply specifying `auth.basic` in the yaml definition, allowing a whole bunch of code to be removed from the script and only the passwords to be redacted, rather than using a blanket redaction.
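For reference, a minimal sketch of what a simplified stream template could look like once `get()` and `auth.basic` are in place (the variable names follow the earlier snippets; the real program flattens the response further rather than emitting the whole body as a single event):

```yaml
config_version: "2"
interval: {{period}}
resource.url: "{{url}}/_node/stats?graph=true&vertices=true"
{{#if username}}
auth.basic.user: {{escape_string username}}
{{/if}}
{{#if password}}
auth.basic.password: {{escape_string password}}
{{/if}}
redact:
  fields: ~
program: |
  // The configured basic auth credentials are applied by the client,
  // so no manual Authorization header is needed here.
  get(state.url).as(resp, bytes(resp.Body).decode_json().as(body, {
    "events": [body]
  }))
```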
```
.with({
  "host":{
    "name":body.name,
    "address":body.http_address,
  }
})
.with({
```
You can merge these as well.
"flow":body.pipelines[pipeline_name].flow, | ||
"time":{ | ||
"queue_push_duration":{ | ||
"ms":body.pipelines[pipeline_name]["events"]["queue_push_duration_in_millis"], |
"ms":body.pipelines[pipeline_name]["events"]["queue_push_duration_in_millis"], | |
"ms":body.pipelines[pipeline_name].events.queue_push_duration_in_millis, |
(similar below — quoted bracket indexing is only needed when the key is not a valid CEL token)
```yaml
fields:
  - password
```
This can now be the null case.

Suggested change:
```yaml
fields: ~
```
```yaml
fields:
  - password
```
Suggested change:
```yaml
fields: ~
```
```
.with({
  "es_cluster_id":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), each.cluster_uuid))),
  "es_cluster_id_map":((body.pipelines[pipeline_name].vertices).as(vertices, vertices.map(each, has(each.cluster_uuid), {"plugin_id":each.id, "cluster_id":each.cluster_uuid}))),
  "outputs":body.pipelines[pipeline_name]["plugins"]["outputs"],
```
"outputs":body.pipelines[pipeline_name]["plugins"]["outputs"], | |
"outputs":body.pipelines[pipeline_name].plugins.outputs, |
(similar below)
"plugin":{ | ||
"type":"input", | ||
"input":{ | ||
"elasticsearch.cluster.id":event['es_cluster_id_map'].map(tuple, (tuple.plugin_id == input.id), tuple.cluster_id), |
"elasticsearch.cluster.id":event['es_cluster_id_map'].map(tuple, (tuple.plugin_id == input.id), tuple.cluster_id), | |
"elasticsearch.cluster.id":event.es_cluster_id_map.map(tuple, (tuple.plugin_id == input.id), tuple.cluster_id), |
(similar below)
This is ready for another round.
Minor nits, then LGTM for the CEL code.
packages/logstash/data_stream/pipeline/agent/stream/cel.yml.hbs (Outdated; resolved)
"filter":{ | ||
"id":filter.id, | ||
"name":filter.name, | ||
"elasticsearch.cluster.id":event['es_cluster_id_map'].map(tuple, (tuple.plugin_id == filter.id), tuple.cluster_id), |
"elasticsearch.cluster.id":event['es_cluster_id_map'].map(tuple, (tuple.plugin_id == filter.id), tuple.cluster_id), | |
"elasticsearch.cluster.id":event.es_cluster_id_map..map(tuple, (tuple.plugin_id == filter.id), tuple.cluster_id), |
This is ready for another round @elastic/infra-monitoring-ui. Note that this is part of the serverless initiative to enable Logstash monitoring in the absence of a Stack Monitoring UI. I will be following on this PR with the changes necessary to make this package visible to serverless, which includes changes to the existing integration package.
Result of running: `go run github.com/andrewkroh/fydler@main -fix packages/logstash/data_stream/*/fields/*yml`
LGTM!
First off, I just found this and it's AWESOME! Maybe it's better for me to open a new issue, but just before I do, would it make sense to mark these dashboards with a reference to users needing to switch the integration from Stack Monitoring to Technical Preview? I just ran into this while trying to look at the new dashboards; eventually I found the need to change the integration configuration and now things are working. Thoughts? Is that worth noting on the dashboards?
@IanLee1521 First of all, thanks for trying out the new dashboards in technical preview - we love to get feedback on new features we are developing, and we appreciate you trying them out as we refine them. Any further comments would be gratefully received. I think this should be an issue - as we start to transition to the new way of collecting and presenting data for monitoring, we should be making it clearer how to work with this effectively. I'm on PTO at the moment, but I can make an issue when I get back, unless you would prefer to create one yourself.
@IanLee1521 - I've added an issue here. And thanks again for the feedback!
Proposed commit message
This PR follows on from #7704, and adds data collection to the integration, as well as standard Dashboards for Logstash Pipelines.
The short term goal for this is to enable Logstash to be monitored with Serverless Elasticsearch instances, which do not provide a Stack Monitoring UI, with the long term goal of iterating on this to replace the existing Stack Monitoring UI.
This PR includes 3 new data streams - `node_cel`, `pipeline` and `plugins` - with data collected from the Logstash Metrics API using the CEL input, intended to replace the existing Metricbeat collection. This gives us more flexibility going forward, and should improve our velocity to add new features to the Logstash dashboards straight from the API, without needing to update several code bases.
The CEL data streams will create a single document per node for the `node_cel` data stream, a document per pipeline per node for the `pipeline` data stream, and a document per plugin per pipeline per node for the `plugins` data stream. For this reason, we default to a lower sending cadence for the `plugins` data stream than for the others. Note that the existing data collection from Metricbeat defaults to sending large documents, including significant data for the entire pipeline graph, every 10 seconds.
This integration also includes ingest pipelines to clean up the data from the Logstash flow metrics API, to remove entries we don't necessarily need at this stage, and to remove the string representation of "Infinity".
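For illustration only, one way such a cleanup step could look as an ingest pipeline processor; the field path used here is a hypothetical example, not necessarily one of the fields the shipped pipelines actually target:

```yaml
processors:
  # Drop a flow metric when the API reported it as the JSON string "Infinity".
  # (Hypothetical field path; the real pipelines may handle different fields.)
  - remove:
      field: logstash.pipeline.flow.worker_utilization.last_1_minute
      if: 'ctx.logstash?.pipeline?.flow?.worker_utilization?.last_1_minute == "Infinity"'
      ignore_missing: true
```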
When installing a Logstash integration, users will be presented with an option to collect Metrics for Stack Monitoring or for Technical Preview
(Note: If there was a way to have an either/or option in the UI, that would be very helpful)
The dashboards are described in the Screenshots section.
Checklist
- [ ] I have added an entry to the `changelog.yml` file.
How to test this PR locally
1. Check out this branch in the `integrations/packages/logstash` directory
2. Run `elastic-package build` to build the package
3. Run `elastic-package stack up` to bring up the Elastic stack
4. Install the `Logstash` integration
5. Verify existing data collection with the `Metrics (Stack Monitoring)` option
6. Verify the new data streams and dashboards with the `Metrics (Technical Preview)` option
Related issues
Follows on from #7704
Screenshots
Integration Selection Page
Pipelines Dashboard
This dashboard provides an overview of running pipelines, with a table of per-pipeline details that can be drilled down into a "details view".
Proposed Page
This is intended to replace the existing pipelines page in the Stack monitoring solution
Existing Page
The page includes additional details, including Persistent Queue usage per pipeline, and more visibility into which pipelines may be blocked
Pipeline Details Dashboard
The Pipeline Details dashboard takes a different approach to the pipeline details view in stack monitoring
The page includes some general pipeline information, showing the rate of events flowing through the pipelines and state of persistent queues, broken down per node running the pipeline:
Proposed Page
General Information
It also includes information for each plugin type:
Inputs
Filters
Outputs
Codecs
This has benefits and drawbacks compared with the existing approach:
The benefits are:
Drawbacks:
Existing Dashboard
The existing dashboard contains a representation of the pipeline graph, which we are not easily able to show using standard Kibana Dashboards
And a small pop-up drilldown for each plugin
Note: This will require a follow-up PR to enable for serverless - the existing integration has some compatibility issues that I would like to fold into a subsequent PR