-
Notifications
You must be signed in to change notification settings - Fork 77
Rework OTel logs handler to be spec compliant, refactor for clarity #751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,28 +3,178 @@ package selfhosted | |
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "strconv" | ||
| "time" | ||
|
|
||
| "github.com/pganalyze/collector/config" | ||
| "github.com/pganalyze/collector/output/pganalyze_collector" | ||
| "github.com/pganalyze/collector/state" | ||
| "github.com/pganalyze/collector/util" | ||
| otlpLogsService "go.opentelemetry.io/proto/otlp/collector/logs/v1" | ||
| common "go.opentelemetry.io/proto/otlp/common/v1" | ||
| otlpLogs "go.opentelemetry.io/proto/otlp/logs/v1" | ||
| "google.golang.org/protobuf/encoding/protojson" | ||
| "google.golang.org/protobuf/proto" | ||
| ) | ||
|
|
||
| // There currently are three kinds of log formats we aim to support here: | ||
| func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) { | ||
| otelLogServer := server.Config.LogOtelServer | ||
|
|
||
| serveMux := http.NewServeMux() | ||
| serveMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) { | ||
| b, err := io.ReadAll(r.Body) | ||
| if err != nil { | ||
| prefixedLogger.PrintError("OTel log server could not read request body: %s", err) | ||
| w.WriteHeader(http.StatusBadRequest) | ||
| w.Write([]byte("Could not read request body")) | ||
| return | ||
| } | ||
|
|
||
| var resp []byte | ||
| switch r.Header.Get("Content-Type") { | ||
| case "application/x-protobuf": | ||
| resp, err = handleOtlpLogsRequestProtobuf(b, server, rawLogStream, parsedLogStream, prefixedLogger, opts.VeryVerbose) | ||
| case "application/json": | ||
| resp, err = handleOtlpLogsRequestJson(b, server, rawLogStream, parsedLogStream, prefixedLogger, opts.VeryVerbose) | ||
| default: | ||
| err = fmt.Errorf("Unsupported Content-Type, must be application/x-protobuf or application/json") | ||
| } | ||
|
|
||
| if err != nil { | ||
| w.WriteHeader(http.StatusBadRequest) | ||
| w.Write([]byte(err.Error())) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to the specification, we should technically:
|
||
| } else { | ||
| w.WriteHeader(http.StatusOK) | ||
| w.Write(resp) | ||
| } | ||
| }) | ||
|
|
||
| util.GoServeHTTP(ctx, prefixedLogger, otelLogServer, serveMux) | ||
| } | ||
|
|
||
| func handleOtlpLogsRequestProtobuf(b []byte, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, veryVerbose bool) (resp []byte, err error) { | ||
| logsData := &otlpLogs.LogsData{} | ||
| if err = proto.Unmarshal(b, logsData); err != nil { | ||
| prefixedLogger.PrintError("OTel log server could not unmarshal request body, expected binary OTLP Protobuf format: %s", err) | ||
| err = fmt.Errorf("Could not unmarshal Protobuf request body") | ||
| return | ||
| } | ||
|
|
||
| if veryVerbose { | ||
| jsonData, err := json.Marshal(logsData) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like you dropped the indent thingy (
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I've actually found the pretty-printed version harder to work with in practice - to actually analyze it you'd typically want to copy it out, and the pretty printing makes that harder to do. That said, I think I'll revert to pretty printing, since it seems that's what we're doing for the other |
||
| if err != nil { | ||
| prefixedLogger.PrintVerbose("OTel log server failed to convert protobuf to JSON: %v", err) | ||
| } else { | ||
| prefixedLogger.PrintVerbose("OTel log server received Protobuf log data in the following format:\n") | ||
| prefixedLogger.PrintVerbose(string(jsonData)) | ||
| } | ||
| } | ||
|
|
||
| response := handleOtlpLogsRequest(logsData, server, rawLogStream, parsedLogStream) | ||
|
|
||
| return proto.Marshal(response) | ||
| } | ||
|
|
||
| func handleOtlpLogsRequestJson(b []byte, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, veryVerbose bool) (resp []byte, err error) { | ||
| logsData := &otlpLogs.LogsData{} | ||
| if err = protojson.Unmarshal(b, logsData); err != nil { | ||
| prefixedLogger.PrintError("OTel log server could not unmarshal request body, JSON does not match expected format: %s\n received body: %s", err, string(b)) | ||
| err = fmt.Errorf("Could not unmarshal JSON request body") | ||
| return | ||
| } | ||
|
|
||
| if veryVerbose { | ||
| prefixedLogger.PrintVerbose("OTel log server received JSON log data in the following format:\n") | ||
| prefixedLogger.PrintVerbose(string(b)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit similar to above, this was previously pretty JSON but now it's a compact one. Is this intentional? |
||
| } | ||
|
|
||
| response := handleOtlpLogsRequest(logsData, server, rawLogStream, parsedLogStream) | ||
|
|
||
| return protojson.Marshal(response) | ||
| } | ||
|
|
||
| // handleOtlpLogsRequest - Takes one or more OTLP log records and processes them | ||
| // | ||
| // There are currently three kinds of log formats we aim to support here: | ||
| // | ||
| // 1. Plain log messages (unstructured message, body of log record is a string) | ||
| // 2. jsonlog encoded as OTel key/value map | ||
| // 3. jsonlog wrapped in K8s context (via fluentbit) as OTel key/value map | ||
| // 3. jsonlog wrapped in K8s context (via fluentbit/Vector) as OTel key/value map | ||
| // | ||
| // Other variants (e.g. csvlog, or plain messages in a K8s context) are currently | ||
| // not supported and will be ignored. | ||
| func handleOtlpLogsRequest(logsData *otlpLogs.LogsData, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem) *otlpLogsService.ExportLogsServiceResponse { | ||
| var rejectedLogRecords int64 | ||
| for _, r := range logsData.ResourceLogs { | ||
| for _, s := range r.ScopeLogs { | ||
| for _, l := range s.LogRecords { | ||
| if l.Body.GetKvlistValue() != nil { | ||
| // jsonlog log message | ||
| record := transformJsonLogRecord(l.Body.GetKvlistValue(), server.Config) | ||
| if record != nil { | ||
| logLine, detailLine := logLineFromJsonlog(record, server.GetLogParser()) | ||
| parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} | ||
| if detailLine != nil { | ||
| parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine} | ||
| } | ||
| } else { | ||
| rejectedLogRecords++ | ||
| } | ||
| } else if l.Body.GetStringValue() != "" { | ||
| // Plain log message | ||
| item := SelfHostedLogStreamItem{} | ||
| item.Line = l.Body.GetStringValue() | ||
| item.OccurredAt = time.Unix(0, int64(l.TimeUnixNano)) | ||
| rawLogStream <- item | ||
| } else { | ||
| rejectedLogRecords++ | ||
| } | ||
| } | ||
| } | ||
| } | ||
| response := &otlpLogsService.ExportLogsServiceResponse{} | ||
| if rejectedLogRecords > 0 { | ||
| response.PartialSuccess = &otlpLogsService.ExportLogsPartialSuccess{RejectedLogRecords: rejectedLogRecords} | ||
| } | ||
| return response | ||
| } | ||
|
|
||
| func transformJsonLogRecord(recordContainer *common.KeyValueList, config config.ServerConfig) *common.KeyValueList { | ||
| var logger string | ||
| var record *common.KeyValueList | ||
| var kubernetes *common.KeyValueList | ||
| hasErrorSeverity := false | ||
| for _, v := range recordContainer.Values { | ||
| if v.Key == "logger" { | ||
| logger = v.Value.GetStringValue() | ||
| } | ||
| if v.Key == "record" { | ||
| record = v.Value.GetKvlistValue() | ||
| } | ||
| if v.Key == "kubernetes" { | ||
| kubernetes = v.Value.GetKvlistValue() | ||
| } | ||
| if v.Key == "error_severity" { | ||
| hasErrorSeverity = true | ||
| } | ||
| } | ||
| // TODO: Support other logger names (this is only tested with CNPG) | ||
| if logger == "postgres" { | ||
| // jsonlog wrapped in K8s context (via fluentbit / Vector) | ||
| if kubernetes != nil && skipDueToK8sFilter(kubernetes, config) { | ||
| return nil | ||
| } | ||
| return record | ||
| } else if logger == "" && hasErrorSeverity { | ||
| // simple jsonlog (Postgres jsonlog has error_severity key) | ||
| return recordContainer | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func logLineFromJsonlog(record *common.KeyValueList, logParser state.LogParser) (state.LogLine, *state.LogLine) { | ||
| var logLine state.LogLine | ||
|
|
@@ -72,7 +222,7 @@ func logLineFromJsonlog(record *common.KeyValueList, logParser state.LogParser) | |
| return logLine, nil | ||
| } | ||
|
|
||
| func skipDueToK8sFilter(kubernetes *common.KeyValueList, server *state.Server, prefixedLogger *util.Logger) bool { | ||
| func skipDueToK8sFilter(kubernetes *common.KeyValueList, config config.ServerConfig) bool { | ||
| var k8sPodName string | ||
| var k8sNamespaceName string | ||
|
|
||
|
|
@@ -91,120 +241,17 @@ func skipDueToK8sFilter(kubernetes *common.KeyValueList, server *state.Server, p | |
| } | ||
| } | ||
|
|
||
| if server.Config.LogOtelK8SPod != "" { | ||
| if server.Config.LogOtelK8SPodNamespace != "" && server.Config.LogOtelK8SPodNamespace != k8sNamespaceName { | ||
| if config.LogOtelK8SPod != "" { | ||
| if config.LogOtelK8SPodNamespace != "" && config.LogOtelK8SPodNamespace != k8sNamespaceName { | ||
| return true | ||
| } | ||
| if server.Config.LogOtelK8SPodName != k8sPodName { | ||
| if config.LogOtelK8SPodName != k8sPodName { | ||
| return true | ||
| } | ||
| } | ||
|
|
||
| if server.Config.LogOtelK8SLabels != "" { | ||
| return util.CheckLabelSelectorMismatch(k8sLabels, server.Config.LogOtelK8SLabelSelectors) | ||
| if config.LogOtelK8SLabels != "" { | ||
| return util.CheckLabelSelectorMismatch(k8sLabels, config.LogOtelK8SLabelSelectors) | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| func otelV1LogHandler(w http.ResponseWriter, r *http.Request, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) { | ||
| logParser := server.GetLogParser() | ||
| b, err := io.ReadAll(r.Body) | ||
|
|
||
| if err != nil { | ||
| prefixedLogger.PrintError("OTel log server could not read request body") | ||
| return | ||
| } | ||
|
|
||
| logsData := &otlpLogs.LogsData{} | ||
| contentType := r.Header.Get("Content-Type") | ||
| switch contentType { | ||
| case "application/json": | ||
| if err := protojson.Unmarshal(b, logsData); err != nil { | ||
| prefixedLogger.PrintError("OTel log server could not unmarshal request body, JSON does not match expected format: %s\n received body: %s", err, string(b)) | ||
| return | ||
| } | ||
| default: | ||
| if err := proto.Unmarshal(b, logsData); err != nil { | ||
| prefixedLogger.PrintError("OTel log server could not unmarshal request body, expected binary OTLP Protobuf format: %s", err) | ||
| return | ||
| } | ||
| } | ||
|
|
||
| if opts.VeryVerbose { | ||
| jsonData, err := json.MarshalIndent(logsData, "", " ") | ||
| if err != nil { | ||
| prefixedLogger.PrintVerbose("OTel log server failed to convert protobuf to JSON: %v", err) | ||
| } | ||
| prefixedLogger.PrintVerbose("OTel log server received log data in the following format:\n") | ||
| prefixedLogger.PrintVerbose(string(jsonData)) | ||
| } | ||
|
|
||
| for _, r := range logsData.ResourceLogs { | ||
| for _, s := range r.ScopeLogs { | ||
| for _, l := range s.LogRecords { | ||
| var logger string | ||
| var record *common.KeyValueList | ||
| var kubernetes *common.KeyValueList | ||
| hasErrorSeverity := false | ||
| if l.Body.GetKvlistValue() != nil { | ||
| for _, v := range l.Body.GetKvlistValue().Values { | ||
| if v.Key == "logger" { | ||
| logger = v.Value.GetStringValue() | ||
| } | ||
| if v.Key == "record" { | ||
| record = v.Value.GetKvlistValue() | ||
| } | ||
| if v.Key == "kubernetes" { | ||
| kubernetes = v.Value.GetKvlistValue() | ||
| } | ||
| if v.Key == "error_severity" { | ||
| hasErrorSeverity = true | ||
| } | ||
| } | ||
| // TODO: Support other logger names (this is only tested with CNPG) | ||
| if logger == "postgres" { | ||
| // jsonlog wrapped in K8s context (via fluentbit / Vector) | ||
| logLine, detailLine := logLineFromJsonlog(record, logParser) | ||
| if kubernetes != nil && skipDueToK8sFilter(kubernetes, server, prefixedLogger) { | ||
| continue | ||
| } | ||
|
|
||
| parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} | ||
| if detailLine != nil { | ||
| parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine} | ||
| } | ||
| } else if logger == "" && hasErrorSeverity { | ||
| // simple jsonlog (Postgres jsonlog has error_severity key) | ||
| logLine, detailLine := logLineFromJsonlog(l.Body.GetKvlistValue(), logParser) | ||
| parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} | ||
| if detailLine != nil { | ||
| parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine} | ||
| } | ||
| } | ||
| } else if l.Body.GetStringValue() != "" { | ||
| // plain log message | ||
| item := SelfHostedLogStreamItem{} | ||
| item.Line = l.Body.GetStringValue() | ||
| item.OccurredAt = time.Unix(0, int64(l.TimeUnixNano)) | ||
| rawLogStream <- item | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) { | ||
| otelLogServer := server.Config.LogOtelServer | ||
|
|
||
| serverMux := http.NewServeMux() | ||
| serverMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) { | ||
| otelV1LogHandler(w, r, server, rawLogStream, parsedLogStream, prefixedLogger, opts) | ||
| }) | ||
|
|
||
| go func() { | ||
| err := http.ListenAndServe(otelLogServer, serverMux) | ||
| if err != nil { | ||
| prefixedLogger.PrintError("Error starting OTel log server on %s: %v\n", otelLogServer, err) | ||
| } | ||
| }() | ||
| } | ||
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you'll also want to specify the content type for the write, which maybe you can do here by doing something like the following?: