
pipeline disconnect is now initalized by the beater #50721

Draft
khushijain21 wants to merge 14 commits into main from beat-owns-disconnet

@khushijain21
Contributor

@khushijain21 khushijain21 commented May 15, 2026

Proposed commit message

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

How to test this PR locally

Related issues

Use cases

Screenshots

Logs

botelastic bot added the needs_team label (Indicates that the issue/PR needs a Team:* label) May 15, 2026
@botelastic

botelastic Bot commented May 15, 2026

This pull request doesn't have a Team:<team> label.

@github-actions
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)
  • /test : Run the Buildkite pipeline.

@mergify
Contributor

mergify Bot commented May 15, 2026

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @khushijain21? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, you need to add the backport labels for the needed branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@github-actions
Contributor

TL;DR

Most Buildkite failures in this run trace to the shutdown refactor from Pipeline.Close() to Pipeline.Disconnect(ctx): some paths now wait indefinitely (hanging/leaking goroutines), while others use too-short timeouts and drop events before tests can read outputs. The immediate unblock is to restore bounded/consistent pipeline shutdown semantics.

Remediation

  • Reintroduce a bounded default wait at the pipeline layer (or preserve Close() behavior) so Disconnect does not block forever when passed context.Background() / long-lived t.Context().
  • Avoid shortening Filebeat shutdown by default in this PR (filebeat.shutdown_timeout): the current 1s default appears to truncate output in system tests.
  • Keep one canonical shutdown point in libbeat instance lifecycle (publisher close/disconnect), then rerun affected suites:
    • cd libbeat && go test ./publisher/pipeline -run TestClient -count=1
    • cd x-pack/osquerybeat && go test ./osqreceiver -run TestLeak -count=1
    • cd x-pack/filebeat && mage unitTest pythonIntegTest
Investigation details

Root Cause

The PR changes pipeline shutdown contracts and call sites in ways that introduce two failure modes:

  1. Unbounded wait / goroutine leaks

    • libbeat/publisher/pipeline/pipeline.go:228-233 changed shutdown to p.outputController.waitClose(ctx, p.forceCloseQueue) (caller-controlled context) instead of internal timeout behavior.
    • In failing libbeat jobs, stacks show shutdown blocked in output_process.go:204 via Pipeline.Disconnect from TestClient (client_test.go:89), for 10-15 minutes.
    • Osquery leak tests also fail with lingering pipeline goroutines (clientWorker, eventConsumer, queueReader) after shutdown.
  2. Early timeout / dropped events

    • filebeat/config/config.go:66 changes default ShutdownTimeout from 0 to 1 * time.Second.
    • filebeat/beater/filebeat.go:523-533 replaces prior wait logic with context.WithTimeout(..., fb.config.ShutdownTimeout) + fb.pipeline.Disconnect(ctx).
    • Filebeat system tests then fail reading output NDJSON files and show truncated module output counts (100 expected, 10/67 observed), consistent with premature shutdown before full publish/flush.

A related lifecycle change removes central publisher close hooks:

  • libbeat/cmd/instance/beat.go:522-526 removed publisher io.Closer call before beater.Stop().
  • x-pack/libbeat/cmd/instance/beat.go:282-286 removed receiver pipeline wait-close settings.

Evidence

  • Build: https://buildkite.com/elastic/beats/builds/46190
  • Failing jobs/steps:
    • Libbeat unit/integration/fips unit: blocked stacks in pipeline shutdown path
    • x-pack/osquerybeat win/linux unit: TestLeak unexpected goroutines
    • x-pack/filebeat unit/python-integ/win unit: FileNotFoundError on output NDJSON and O365 count mismatch
    • Filebeat check/update: generated file drift (filebeat/filebeat.reference.yml) is also present
  • Key log excerpts:
```
libbeat: github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*processOutputController).closeQueue ... output_process.go:204
libbeat: github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).Disconnect ... pipeline.go:228
libbeat: github.com/elastic/beats/v7/libbeat/publisher/pipeline.TestClient.func1 ... client_test.go:89
```

```
osquerybeat: === FAIL: x-pack/osquerybeat/osqreceiver TestLeak/unhealthy_consumer
oteltest.go:223: found unexpected goroutines
```

```
filebeat: FAILED ... test_http_endpoint_* - FileNotFoundError: .../output/filebeat-20260515.ndjson
filebeat: FAILED ... test_fileset_file_070_o365 - AssertionError: expected 100 events to compare but got 10
filebeat: FAILED ... test_fileset_file_084_o365 - AssertionError: expected 100 events to compare but got 67
```

```
filebeat check/update: Error: some files are not up-to-date ... Modified: [filebeat/filebeat.reference.yml]
```

Verification

  • Not run in this detective workflow (read-only analysis from Buildkite logs + PR diff).

Follow-up

If you want, I can post a minimal patch suggestion scoped to shutdown semantics only (pipeline + filebeat timeout default) to reduce risk and quickly confirm causality.


What is this? | From workflow: PR Buildkite Detective

Give us feedback! React with 🚀 if perfect, 👍 if helpful, 👎 if not.

@github-actions
Contributor

github-actions Bot commented May 15, 2026

@github-actions
Contributor

github-actions Bot commented May 15, 2026

✅ Vale Linting Results

No issues found on modified lines!


The Vale linter checks documentation changes against the Elastic Docs style guide.

To use Vale locally or report issues, refer to the Elastic style guide for Vale.

```go
	// and shut down at this point, so all events that will be acknowledged
	// already have been. However for the "once" option supported by the
	// log input, events may still be active.
	if *once {
```
Contributor Author

We don't need special handling for the `once` case because pipeline disconnection is now initialized by the beater instead of libbeat.

Comment thread filebeat/config/config.go
```diff
 		Bbolt: bboltst.DefaultConfig(),
 	},
-	ShutdownTimeout: 0,
+	ShutdownTimeout: 1 * time.Second,
```
Contributor Author

@khushijain21 khushijain21 May 15, 2026

1s was the default time the pipeline always waited before shutting down. See:

```go
settings := pipeline.Settings{
	// Since now publisher is closed on Stop, we want to give some
	// time to ack any pending events by default to avoid
	// changing on stop behavior too much.
	WaitClose:      time.Second,
	Processors:     b.processors,
	InputQueueSize: b.InputQueueSize,
}
```

khushijain21 changed the title from "beat owns disconnet" to "pipeline disconnect is now initalized by the beater" May 15, 2026
@github-actions
Contributor

TL;DR

golangci-lint failed again in workflow run 25926070515 across all matrix jobs (ubuntu, macos, windows) due to lint violations in files touched by this PR. Immediate action: fix the reported findings (especially errcheck/forbidigo/staticcheck/unused/misspell) and rerun lint.

Remediation

  • Fix the currently reported violations in touched files, prioritizing repeated blockers: unchecked Disconnect/removeLogFile errors (errcheck), forbidden logger/path globals (forbidigo), misspelling acknowleged, SA1019 deprecations, ST1005 capitalized error strings, goimports formatting in heartbeat/config/config_test.go, and unused symbols.
  • Re-run lint locally with whole-file scope before pushing (same behavior as CI): golangci-lint run --timeout=30m --whole-files (or the repo’s equivalent lint target), then retrigger the workflow.
Investigation details

Root Cause

The run fails in step golangci-lint for every matrix job because lint checks report multiple violations in PR-touched files. This is a lint-gate failure rather than an infrastructure issue.

Evidence

  • Workflow: https://github.com/elastic/beats/actions/runs/25926070515
  • Job/step:
    • lint (macos-latest) → golangci-lint
    • lint (ubuntu-latest) → golangci-lint
    • lint (windows-latest) → golangci-lint
  • Key log excerpt:
```
##[error]libbeat/monitoring/report/elasticsearch/elasticsearch.go:209:23: Error return value of `r.pipeline.Disconnect` is not checked (errcheck)
##[error]libbeat/cmd/instance/beat.go:188:5: use of `logp.NewLogger` forbidden ... (forbidigo)
##[error]heartbeat/config/config_test.go:24:1: File is not properly formatted (goimports)
##[error]packetbeat/beater/packetbeat.go:206:93: `acknowleged` is a misspelling of `acknowledge` (misspell)
##[error]x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go:79:10: ST1005: error strings should not be capitalized (staticcheck)
##[error]x-pack/libbeat/cmd/instance/beat.go:40:7: const receiverPublisherCloseTimeout is unused (unused)
##[error]issues found
```

Validation

  • Not run in this detective workflow (read-only log analysis).

Follow-up

  • I checked the most recent prior PR Actions Detective comment on this PR: it was for a different failure (check-docs generated-file drift), so this golangci-lint report is materially new.

What is this? | From workflow: PR Actions Detective

Labels

needs_team Indicates that the issue/PR needs a Team:* label
