Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions input/system/heroku/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package heroku

import (
"context"
"fmt"
"net/http"
"os"
"sync"
Expand All @@ -15,19 +14,19 @@ func SetupHttpHandlerLogs(ctx context.Context, wg *sync.WaitGroup, opts state.Co
herokuLogStream := make(chan HttpSyslogMessage, state.LogStreamBufferLen)
setupLogTransformer(ctx, wg, servers, herokuLogStream, parsedLogStream, opts, logger)

go func() {
http.HandleFunc("/", util.HttpRedirectToApp)
http.HandleFunc("/logs/", func(w http.ResponseWriter, r *http.Request) {
for _, item := range ReadHerokuPostgresSyslogMessages(r.Body) {
item.Path = r.URL.Path
select {
case herokuLogStream <- item:
// Handed over successfully
default:
fmt.Printf("WARNING: Channel buffer exceeded, skipping message\n")
}
serveMux := http.NewServeMux()
serveMux.HandleFunc("/", util.HttpRedirectToApp)
serveMux.HandleFunc("/logs/", func(w http.ResponseWriter, r *http.Request) {
for _, item := range ReadHerokuPostgresSyslogMessages(r.Body) {
item.Path = r.URL.Path
select {
case herokuLogStream <- item:
// Handed over successfully
default:
logger.PrintInfo("WARNING: Channel buffer exceeded, skipping message\n")
}
})
http.ListenAndServe(":"+os.Getenv("PORT"), nil)
}()
}
})

util.GoServeHTTP(ctx, logger, ":"+os.Getenv("PORT"), serveMux)
}
269 changes: 158 additions & 111 deletions input/system/selfhosted/otel_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,178 @@ package selfhosted
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"

"github.com/pganalyze/collector/config"
"github.com/pganalyze/collector/output/pganalyze_collector"
"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
otlpLogsService "go.opentelemetry.io/proto/otlp/collector/logs/v1"
common "go.opentelemetry.io/proto/otlp/common/v1"
otlpLogs "go.opentelemetry.io/proto/otlp/logs/v1"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

// setupOtelHandler registers an OTLP/HTTP logs endpoint at "/v1/logs" on a
// new ServeMux and serves it on server.Config.LogOtelServer via
// util.GoServeHTTP (which is given ctx; presumably it stops the listener when
// ctx is cancelled — TODO confirm).
//
// The request body is decoded according to its Content-Type header:
// application/x-protobuf or application/json. An unreadable body, any other
// Content-Type, or a decode/encode failure reported by the format-specific
// handler is answered with HTTP 400 and a plain error text; otherwise the
// encoded ExportLogsServiceResponse is written with HTTP 200.
func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) {
otelLogServer := server.Config.LogOtelServer

serveMux := http.NewServeMux()
serveMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) {
// The whole body is buffered because both decoders operate on a byte slice.
// NOTE(review): no size limit is applied here; consider http.MaxBytesReader.
b, err := io.ReadAll(r.Body)
if err != nil {
prefixedLogger.PrintError("OTel log server could not read request body: %s", err)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Could not read request body"))
return
}

// Dispatch on the exact header value: a Content-Type carrying parameters
// (e.g. "application/json; charset=utf-8") falls through to the default case.
// NOTE(review): the response Content-Type is never set explicitly, although
// OTLP/HTTP expects it to mirror the request's encoding.
var resp []byte
switch r.Header.Get("Content-Type") {
case "application/x-protobuf":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you'll also want to specify the content type for the write, which maybe you can do here by doing something like the following?:

w.Header().Set("Content-Type", "application/x-protobuf")

resp, err = handleOtlpLogsRequestProtobuf(b, server, rawLogStream, parsedLogStream, prefixedLogger, opts.VeryVerbose)
case "application/json":
resp, err = handleOtlpLogsRequestJson(b, server, rawLogStream, parsedLogStream, prefixedLogger, opts.VeryVerbose)
default:
err = fmt.Errorf("Unsupported Content-Type, must be application/x-protobuf or application/json")
}

// NOTE(review): per the OTLP spec, failure responses should carry an encoded
// google.rpc.Status in the request's format rather than plain text.
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the specification, we should technically:

  • use Status as a reply
    • we can omit code, but will need message
  • then marshall Status with either proto or protojson, depending on which content type we received

https://opentelemetry.io/docs/specs/otlp/#failures-1

} else {
w.WriteHeader(http.StatusOK)
w.Write(resp)
}
})

util.GoServeHTTP(ctx, prefixedLogger, otelLogServer, serveMux)
}

// handleOtlpLogsRequestProtobuf decodes a binary-protobuf OTLP logs request
// body, passes the records to handleOtlpLogsRequest, and returns the resulting
// ExportLogsServiceResponse encoded as binary protobuf.
//
// If the body cannot be decoded, the detailed error is logged and a generic
// error is returned for the caller to report to the client. When veryVerbose
// is set, the decoded data is additionally logged as JSON.
func handleOtlpLogsRequestProtobuf(b []byte, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, veryVerbose bool) (resp []byte, err error) {
// The body is decoded into LogsData rather than ExportLogsServiceRequest;
// presumably this relies on both messages carrying resource_logs as field 1
// — TODO confirm.
logsData := &otlpLogs.LogsData{}
if err = proto.Unmarshal(b, logsData); err != nil {
prefixedLogger.PrintError("OTel log server could not unmarshal request body, expected binary OTLP Protobuf format: %s", err)
err = fmt.Errorf("Could not unmarshal Protobuf request body")
return
}

if veryVerbose {
// This err shadows the named return, so a failure to render the debug
// output does not fail the request.
// NOTE(review): encoding/json does not follow the protobuf JSON mapping;
// protojson would render the same shape the JSON endpoint accepts.
jsonData, err := json.Marshal(logsData)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you dropped the indent thingy (json.MarshalIndent(logsData, "", " ")), why? Maybe you thought that it'll be too many lines...?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I've actually found the pretty-printed version harder to work with in practice - to actually analyze it you'd typically want to copy it out, and the pretty printing makes that harder to do.

That said, I think I'll revert to pretty printing, since it seems that's what we're doing for the other --very-verbose cases, and its better to be consistent here. I can also see the pretty printing working better when we're on a customer call to review a problem.

if err != nil {
prefixedLogger.PrintVerbose("OTel log server failed to convert protobuf to JSON: %v", err)
} else {
prefixedLogger.PrintVerbose("OTel log server received Protobuf log data in the following format:\n")
prefixedLogger.PrintVerbose(string(jsonData))
}
}

response := handleOtlpLogsRequest(logsData, server, rawLogStream, parsedLogStream)

// An error from encoding the response is returned to the caller as-is.
return proto.Marshal(response)
}

// handleOtlpLogsRequestJson decodes an OTLP/JSON logs request body, passes
// the records to handleOtlpLogsRequest, and returns the resulting
// ExportLogsServiceResponse encoded with protojson.
//
// If the body cannot be decoded, the error and the full received body are
// logged and a generic error is returned for the caller to report to the
// client. When veryVerbose is set, the raw body is logged as received.
func handleOtlpLogsRequestJson(b []byte, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, veryVerbose bool) (resp []byte, err error) {
logsData := &otlpLogs.LogsData{}
if err = protojson.Unmarshal(b, logsData); err != nil {
// NOTE(review): this logs the entire request body at error level, which
// may be large.
prefixedLogger.PrintError("OTel log server could not unmarshal request body, JSON does not match expected format: %s\n received body: %s", err, string(b))
err = fmt.Errorf("Could not unmarshal JSON request body")
return
}

if veryVerbose {
// The body is logged exactly as sent (not re-indented).
prefixedLogger.PrintVerbose("OTel log server received JSON log data in the following format:\n")
prefixedLogger.PrintVerbose(string(b))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit similar to above, this was previously pretty JSON but now it's a compact one. Is this intentional?

}

response := handleOtlpLogsRequest(logsData, server, rawLogStream, parsedLogStream)

// An error from encoding the response is returned to the caller as-is.
return protojson.Marshal(response)
}

// handleOtlpLogsRequest processes the log records of one decoded OTLP logs
// request and builds the response to return to the client.
//
// There are currently three kinds of log formats we aim to support here:
//
//  1. Plain log messages (unstructured message, body of log record is a string)
//  2. jsonlog encoded as OTel key/value map
//  3. jsonlog wrapped in K8s context (via fluentbit/Vector) as OTel key/value map
//
// Plain messages are sent to rawLogStream as-is, timestamped with the
// record's TimeUnixNano. jsonlog records accepted by transformJsonLogRecord
// are converted by logLineFromJsonlog and sent to parsedLogStream, followed by
// a second item when a detail line is produced. All channel sends are
// blocking (no select/default), so a full channel stalls the request.
//
// Other variants (e.g. csvlog, or plain messages in a K8s context) are
// currently not supported and are ignored. Every ignored record — including
// records with an empty body and jsonlog records excluded by the configured
// K8s filter — is counted in the response's PartialSuccess.RejectedLogRecords;
// PartialSuccess is left unset when nothing was rejected.
func handleOtlpLogsRequest(logsData *otlpLogs.LogsData, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem) *otlpLogsService.ExportLogsServiceResponse {
	var rejectedLogRecords int64
	for _, resourceLogs := range logsData.ResourceLogs {
		for _, scopeLogs := range resourceLogs.ScopeLogs {
			for _, l := range scopeLogs.LogRecords {
				// Generated protobuf getters are nil-safe, so a record without a
				// body ends up in the default (rejected) case.
				kvlist := l.Body.GetKvlistValue()
				switch {
				case kvlist != nil:
					// jsonlog log message
					record := transformJsonLogRecord(kvlist, server.Config)
					if record == nil {
						rejectedLogRecords++
						continue
					}
					logLine, detailLine := logLineFromJsonlog(record, server.GetLogParser())
					parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
					if detailLine != nil {
						parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine}
					}
				case l.Body.GetStringValue() != "":
					// Plain log message
					item := SelfHostedLogStreamItem{}
					item.Line = l.Body.GetStringValue()
					item.OccurredAt = time.Unix(0, int64(l.TimeUnixNano))
					rawLogStream <- item
				default:
					rejectedLogRecords++
				}
			}
		}
	}

	response := &otlpLogsService.ExportLogsServiceResponse{}
	if rejectedLogRecords > 0 {
		response.PartialSuccess = &otlpLogsService.ExportLogsPartialSuccess{RejectedLogRecords: rejectedLogRecords}
	}
	return response
}

// transformJsonLogRecord picks the Postgres jsonlog map out of an OTel
// key/value log body, or returns nil when the body should not be processed.
//
// Two body shapes are recognized:
//   - a wrapper whose "logger" is "postgres" (e.g. CNPG output forwarded by
//     fluentbit or Vector): the nested "record" map is returned, unless a
//     "kubernetes" map is present and skipDueToK8sFilter rejects it;
//   - a bare jsonlog map, i.e. an empty or absent "logger" together with an
//     "error_severity" key: the container itself is returned.
//
// Any other shape yields nil.
func transformJsonLogRecord(recordContainer *common.KeyValueList, config config.ServerConfig) *common.KeyValueList {
	var (
		loggerName       string
		nestedRecord     *common.KeyValueList
		k8sContext       *common.KeyValueList
		hasErrorSeverity bool
	)
	for _, kv := range recordContainer.Values {
		switch kv.Key {
		case "logger":
			loggerName = kv.Value.GetStringValue()
		case "record":
			nestedRecord = kv.Value.GetKvlistValue()
		case "kubernetes":
			k8sContext = kv.Value.GetKvlistValue()
		case "error_severity":
			hasErrorSeverity = true
		}
	}

	switch {
	case loggerName == "postgres":
		// TODO: Support other logger names (this is only tested with CNPG)
		// jsonlog wrapped in K8s context (via fluentbit / Vector)
		if k8sContext != nil && skipDueToK8sFilter(k8sContext, config) {
			return nil
		}
		return nestedRecord
	case loggerName == "" && hasErrorSeverity:
		// simple jsonlog (Postgres jsonlog has error_severity key)
		return recordContainer
	default:
		return nil
	}
}

func logLineFromJsonlog(record *common.KeyValueList, logParser state.LogParser) (state.LogLine, *state.LogLine) {
var logLine state.LogLine
Expand Down Expand Up @@ -72,7 +222,7 @@ func logLineFromJsonlog(record *common.KeyValueList, logParser state.LogParser)
return logLine, nil
}

func skipDueToK8sFilter(kubernetes *common.KeyValueList, server *state.Server, prefixedLogger *util.Logger) bool {
func skipDueToK8sFilter(kubernetes *common.KeyValueList, config config.ServerConfig) bool {
var k8sPodName string
var k8sNamespaceName string

Expand All @@ -91,120 +241,17 @@ func skipDueToK8sFilter(kubernetes *common.KeyValueList, server *state.Server, p
}
}

if server.Config.LogOtelK8SPod != "" {
if server.Config.LogOtelK8SPodNamespace != "" && server.Config.LogOtelK8SPodNamespace != k8sNamespaceName {
if config.LogOtelK8SPod != "" {
if config.LogOtelK8SPodNamespace != "" && config.LogOtelK8SPodNamespace != k8sNamespaceName {
return true
}
if server.Config.LogOtelK8SPodName != k8sPodName {
if config.LogOtelK8SPodName != k8sPodName {
return true
}
}

if server.Config.LogOtelK8SLabels != "" {
return util.CheckLabelSelectorMismatch(k8sLabels, server.Config.LogOtelK8SLabelSelectors)
if config.LogOtelK8SLabels != "" {
return util.CheckLabelSelectorMismatch(k8sLabels, config.LogOtelK8SLabelSelectors)
}
return false
}

// otelV1LogHandler serves one OTLP/HTTP logs export request.
//
// The body is decoded as OTLP JSON when the Content-Type header is exactly
// "application/json", and as binary protobuf otherwise. Plain-string log
// bodies are sent to rawLogStream. Postgres jsonlog bodies are parsed and sent
// to parsedLogStream; they are either bare (identified by an "error_severity"
// key) or wrapped with logger "postgres", a "record" map and optional
// Kubernetes metadata (e.g. by fluentbit or Vector). Wrapped records rejected
// by skipDueToK8sFilter or lacking a "record" map, empty bodies, and all other
// record shapes are ignored.
//
// If the body cannot be read or decoded, the error is logged and the client
// receives HTTP 400, so an exporter does not treat the batch as delivered.
// On success nothing is written, so net/http replies 200 with an empty body.
func otelV1LogHandler(w http.ResponseWriter, r *http.Request, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) {
	logParser := server.GetLogParser()

	b, err := io.ReadAll(r.Body)
	if err != nil {
		prefixedLogger.PrintError("OTel log server could not read request body: %s", err)
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte("Could not read request body"))
		return
	}

	logsData := &otlpLogs.LogsData{}
	if r.Header.Get("Content-Type") == "application/json" {
		if err := protojson.Unmarshal(b, logsData); err != nil {
			prefixedLogger.PrintError("OTel log server could not unmarshal request body, JSON does not match expected format: %s\n received body: %s", err, string(b))
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte("Could not unmarshal JSON request body"))
			return
		}
	} else if err := proto.Unmarshal(b, logsData); err != nil {
		prefixedLogger.PrintError("OTel log server could not unmarshal request body, expected binary OTLP Protobuf format: %s", err)
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte("Could not unmarshal Protobuf request body"))
		return
	}

	if opts.VeryVerbose {
		jsonData, err := json.MarshalIndent(logsData, "", " ")
		if err != nil {
			prefixedLogger.PrintVerbose("OTel log server failed to convert protobuf to JSON: %v", err)
		} else {
			// Only print the dump when it was actually produced.
			prefixedLogger.PrintVerbose("OTel log server received log data in the following format:\n")
			prefixedLogger.PrintVerbose(string(jsonData))
		}
	}

	// Loop variables are named so they do not shadow the *http.Request r.
	for _, resourceLogs := range logsData.ResourceLogs {
		for _, scopeLogs := range resourceLogs.ScopeLogs {
			for _, l := range scopeLogs.LogRecords {
				kvlist := l.Body.GetKvlistValue()
				if kvlist == nil {
					// plain log message (empty bodies are ignored)
					if line := l.Body.GetStringValue(); line != "" {
						item := SelfHostedLogStreamItem{}
						item.Line = line
						item.OccurredAt = time.Unix(0, int64(l.TimeUnixNano))
						rawLogStream <- item
					}
					continue
				}

				var logger string
				var record, kubernetes *common.KeyValueList
				hasErrorSeverity := false
				for _, v := range kvlist.Values {
					switch v.Key {
					case "logger":
						logger = v.Value.GetStringValue()
					case "record":
						record = v.Value.GetKvlistValue()
					case "kubernetes":
						kubernetes = v.Value.GetKvlistValue()
					case "error_severity":
						hasErrorSeverity = true
					}
				}

				var jsonlog *common.KeyValueList
				switch {
				// TODO: Support other logger names (this is only tested with CNPG)
				case logger == "postgres":
					// jsonlog wrapped in K8s context (via fluentbit / Vector).
					// Apply the K8s filter before parsing so excluded records are
					// never parsed, and skip wrappers without a "record" map
					// rather than handing nil to logLineFromJsonlog.
					if record == nil || (kubernetes != nil && skipDueToK8sFilter(kubernetes, server, prefixedLogger)) {
						continue
					}
					jsonlog = record
				case logger == "" && hasErrorSeverity:
					// simple jsonlog (Postgres jsonlog has error_severity key)
					jsonlog = kvlist
				default:
					continue
				}

				logLine, detailLine := logLineFromJsonlog(jsonlog, logParser)
				parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine}
				if detailLine != nil {
					parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine}
				}
			}
		}
	}
}

// setupOtelHandler registers otelV1LogHandler for OTLP/HTTP log exports at
// "/v1/logs" and serves it on server.Config.LogOtelServer.
//
// The original version ignored ctx and ran a bare http.ListenAndServe
// goroutine that was never stopped. Serving is now delegated to
// util.GoServeHTTP, the same helper the other HTTP listeners use, which
// receives ctx. Presumably it shuts the server down on ctx cancellation and
// reports listen errors through prefixedLogger — TODO confirm.
func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) {
	otelLogServer := server.Config.LogOtelServer

	serveMux := http.NewServeMux()
	serveMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) {
		otelV1LogHandler(w, r, server, rawLogStream, parsedLogStream, prefixedLogger, opts)
	})

	util.GoServeHTTP(ctx, prefixedLogger, otelLogServer, serveMux)
}
2 changes: 1 addition & 1 deletion runner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func Run(ctx context.Context, wg *sync.WaitGroup, opts state.CollectionOpts, log
SetupLogCollection(ctx, wg, servers, opts, logger, hasAnyHeroku, hasAnyGoogleCloudSQL, hasAnyAzureDatabase, hasAnyTembo)
} else if util.IsHeroku() {
// Even if logs are deactivated, Heroku still requires us to have a functioning web server
util.SetupHttpHandlerDummy()
util.SetupHttpHandlerDummy(ctx, logger)
}

if hasAnyActivityEnabled {
Expand Down
22 changes: 0 additions & 22 deletions util/dummy_http_handler.go

This file was deleted.

Loading
Loading