From 2acaa4c9d606a108681da894bfc4f5df9234f58d Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Sat, 13 Dec 2025 00:07:12 -0800 Subject: [PATCH 1/3] Introduce new util.GoServeHTTP helper to correctly start HTTP servers Previously we had code that suffered from two main problems: - It used the global "http.HandleFunc" method, instead of specific serverMux structs (this is a problem if multiple servers are running) - It did not respect context cancelation correctly, likely leading to Go routine leaks and other issues when the collector reloads --- input/system/heroku/http_handler.go | 29 ++++++++------- input/system/selfhosted/otel_handler.go | 11 ++---- runner/run.go | 2 +- util/dummy_http_handler.go | 22 ------------ util/http_server.go | 47 +++++++++++++++++++++++++ 5 files changed, 65 insertions(+), 46 deletions(-) delete mode 100644 util/dummy_http_handler.go create mode 100644 util/http_server.go diff --git a/input/system/heroku/http_handler.go b/input/system/heroku/http_handler.go index 7e0eab479..02a88b6ea 100644 --- a/input/system/heroku/http_handler.go +++ b/input/system/heroku/http_handler.go @@ -2,7 +2,6 @@ package heroku import ( "context" - "fmt" "net/http" "os" "sync" @@ -15,19 +14,19 @@ func SetupHttpHandlerLogs(ctx context.Context, wg *sync.WaitGroup, opts state.Co herokuLogStream := make(chan HttpSyslogMessage, state.LogStreamBufferLen) setupLogTransformer(ctx, wg, servers, herokuLogStream, parsedLogStream, opts, logger) - go func() { - http.HandleFunc("/", util.HttpRedirectToApp) - http.HandleFunc("/logs/", func(w http.ResponseWriter, r *http.Request) { - for _, item := range ReadHerokuPostgresSyslogMessages(r.Body) { - item.Path = r.URL.Path - select { - case herokuLogStream <- item: - // Handed over successfully - default: - fmt.Printf("WARNING: Channel buffer exceeded, skipping message\n") - } + serveMux := http.NewServeMux() + serveMux.HandleFunc("/", util.HttpRedirectToApp) + serveMux.HandleFunc("/logs/", func(w http.ResponseWriter, r *http.Request) { + for _, item := range ReadHerokuPostgresSyslogMessages(r.Body) { + item.Path = r.URL.Path + select { + case herokuLogStream <- item: + // Handed over successfully + default: + logger.PrintInfo("WARNING: Channel buffer exceeded, skipping message\n") } - }) - http.ListenAndServe(":"+os.Getenv("PORT"), nil) - }() + } + }) + + util.GoServeHTTP(ctx, logger, ":"+os.Getenv("PORT"), serveMux) } diff --git a/input/system/selfhosted/otel_handler.go b/input/system/selfhosted/otel_handler.go index b79ec1ebd..d27b75ab1 100644 --- a/input/system/selfhosted/otel_handler.go +++ b/input/system/selfhosted/otel_handler.go @@ -196,15 +196,10 @@ func otelV1LogHandler(w http.ResponseWriter, r *http.Request, server *state.Serv func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) { otelLogServer := server.Config.LogOtelServer - serverMux := http.NewServeMux() - serverMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) { + serveMux := http.NewServeMux() + serveMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) { otelV1LogHandler(w, r, server, rawLogStream, parsedLogStream, prefixedLogger, opts) }) - go func() { - err := http.ListenAndServe(otelLogServer, serverMux) - if err != nil { - prefixedLogger.PrintError("Error starting OTel log server on %s: %v\n", otelLogServer, err) - } - }() + util.GoServeHTTP(ctx, prefixedLogger, otelLogServer, serveMux) } diff --git a/runner/run.go b/runner/run.go index d3f47f69a..58958cd3d 100644 --- a/runner/run.go +++ b/runner/run.go @@ -326,7 +326,7 @@ func Run(ctx context.Context, wg *sync.WaitGroup, opts state.CollectionOpts, log SetupLogCollection(ctx, wg, servers, opts, logger, hasAnyHeroku, hasAnyGoogleCloudSQL, hasAnyAzureDatabase, hasAnyTembo) } else if util.IsHeroku() { // Even if logs are deactivated, Heroku still requires us to have a functioning web server - util.SetupHttpHandlerDummy() + util.SetupHttpHandlerDummy(ctx, logger) } if hasAnyActivityEnabled { diff --git a/util/dummy_http_handler.go b/util/dummy_http_handler.go deleted file mode 100644 index c0ba7a02f..000000000 --- a/util/dummy_http_handler.go +++ /dev/null @@ -1,22 +0,0 @@ -package util - -import ( - "net/http" - "os" -) - -func SetupHttpHandlerDummy() { - port, ok := os.LookupEnv("PORT") - if !ok { - port = "5000" - } - go func() { - http.HandleFunc("/", HttpRedirectToApp) - http.ListenAndServe(":"+port, nil) - }() -} - -// HttpRedirectToApp - Provides a HTTP redirect to the pganalyze app -func HttpRedirectToApp(w http.ResponseWriter, r *http.Request) { - http.Redirect(w, r, "https://app.pganalyze.com/", http.StatusFound) -} diff --git a/util/http_server.go b/util/http_server.go new file mode 100644 index 000000000..3bb80f0dc --- /dev/null +++ b/util/http_server.go @@ -0,0 +1,47 @@ +package util + +import ( + "context" + "net" + "net/http" + "os" +) + +func GoServeHTTP(ctx context.Context, logger *Logger, addr string, serveMux *http.ServeMux) { + s := &http.Server{ + BaseContext: func(net.Listener) context.Context { return ctx }, + Addr: addr, + Handler: serveMux, + } + lc := net.ListenConfig{} + l, err := lc.Listen(ctx, "tcp", s.Addr) + if err != nil { + logger.PrintError("Error starting HTTP server on %s: %v\n", addr, err) + return + } + go func() { + err := s.Serve(l) + if err != http.ErrServerClosed { + logger.PrintError("Error running HTTP server on %s: %v\n", addr, err) + } + }() + go func() { + <-ctx.Done() + s.Close() + }() +} + +func SetupHttpHandlerDummy(ctx context.Context, logger *Logger) { + port, ok := os.LookupEnv("PORT") + if !ok { + port = "5000" + } + serveMux := http.NewServeMux() + serveMux.HandleFunc("/", HttpRedirectToApp) + GoServeHTTP(ctx, logger, ":"+port, serveMux) +} + +// HttpRedirectToApp - Provides a HTTP redirect to the pganalyze app +func HttpRedirectToApp(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "https://app.pganalyze.com/", http.StatusFound) +} From e3a74b4d8e0e8889d72cc4e8dc713047bab90395 Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Sat, 13 Dec 2025 00:09:19 -0800 Subject: [PATCH 2/3] Reconnecting socket test: Remove unnecessary if condition This was copied from the ListenAndServe method, but isn't needed because we know that an address is present. --- util/reconnecting_socket_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/util/reconnecting_socket_test.go b/util/reconnecting_socket_test.go index a949335f1..a4bd8ff8e 100644 --- a/util/reconnecting_socket_test.go +++ b/util/reconnecting_socket_test.go @@ -31,11 +31,7 @@ func TestSocketReconnect(t *testing.T) { c.Close() }) s := &http.Server{Addr: "localhost:9123", Handler: serverMux} - addr := s.Addr - if addr == "" { - addr = ":http" - } - ln, err := net.Listen("tcp", addr) + ln, err := net.Listen("tcp", s.Addr) if err != nil { t.Errorf("TestSocketReconnect: failed to start socket: %v", err) cancel() From b414532f0436354e86a5e30bfd6041c07899fa45 Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Sat, 13 Dec 2025 00:13:14 -0800 Subject: [PATCH 3/3] Rework OTel logs handler to be spec compliant, refactor for clarity The OTLP protocol spec requires that we return a response object (encoded based on request content type) that indicates how many of the submitted log records were rejected. Further it requires us to return a 4XX error if we could not process the request. Any error thats not 429 (rate limited) must not be retried by the client, per the spec. In passing, refactor the OTel logs handler to reduce duplication when dealing with very similar yet different cases (jsonlog as the main log record, vs wrapped in Kubernetes context). --- input/system/selfhosted/otel_handler.go | 264 ++++++++----- .../otlp/collector/logs/v1/logs_service.pb.go | 367 ++++++++++++++++++ .../collector/logs/v1/logs_service.pb.gw.go | 171 ++++++++ .../collector/logs/v1/logs_service_grpc.pb.go | 109 ++++++ vendor/modules.txt | 1 + 5 files changed, 806 insertions(+), 106 deletions(-) create mode 100644 vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service.pb.go create mode 100644 vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service.pb.gw.go create mode 100644 vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service_grpc.pb.go diff --git a/input/system/selfhosted/otel_handler.go b/input/system/selfhosted/otel_handler.go index d27b75ab1..45ca9e0e2 100644 --- a/input/system/selfhosted/otel_handler.go +++ b/input/system/selfhosted/otel_handler.go @@ -3,28 +3,178 @@ package selfhosted import ( "context" "encoding/json" + "fmt" "io" "net/http" "strconv" "time" + "github.com/pganalyze/collector/config" "github.com/pganalyze/collector/output/pganalyze_collector" "github.com/pganalyze/collector/state" "github.com/pganalyze/collector/util" + otlpLogsService "go.opentelemetry.io/proto/otlp/collector/logs/v1" common "go.opentelemetry.io/proto/otlp/common/v1" otlpLogs "go.opentelemetry.io/proto/otlp/logs/v1" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) -// There currently are three kinds of log formats we aim to support here: +func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) { + otelLogServer := server.Config.LogOtelServer + + serveMux := http.NewServeMux() + serveMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + prefixedLogger.PrintError("OTel log server could not read request body: %s", err) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Could not read request body")) + return + } + + var resp []byte + switch r.Header.Get("Content-Type") { + case "application/x-protobuf": + resp, err = handleOtlpLogsRequestProtobuf(b, server, rawLogStream, parsedLogStream, prefixedLogger, opts.VeryVerbose) + case "application/json": + resp, err = handleOtlpLogsRequestJson(b, server, rawLogStream, parsedLogStream, prefixedLogger, opts.VeryVerbose) + default: + err = fmt.Errorf("Unsupported Content-Type, must be application/x-protobuf or application/json") + } + + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + } else { + w.WriteHeader(http.StatusOK) + w.Write(resp) + } + }) + + util.GoServeHTTP(ctx, prefixedLogger, otelLogServer, serveMux) +} + +func handleOtlpLogsRequestProtobuf(b []byte, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, veryVerbose bool) (resp []byte, err error) { + logsData := &otlpLogs.LogsData{} + if err = proto.Unmarshal(b, logsData); err != nil { + prefixedLogger.PrintError("OTel log server could not unmarshal request body, expected binary OTLP Protobuf format: %s", err) + err = fmt.Errorf("Could not unmarshal Protobuf request body") + return + } + + if veryVerbose { + jsonData, err := json.Marshal(logsData) + if err != nil { + prefixedLogger.PrintVerbose("OTel log server failed to convert protobuf to JSON: %v", err) + } else { + prefixedLogger.PrintVerbose("OTel log server received Protobuf log data in the following format:\n") + prefixedLogger.PrintVerbose(string(jsonData)) + } + } + + response := handleOtlpLogsRequest(logsData, server, rawLogStream, parsedLogStream) + + return proto.Marshal(response) +} + +func handleOtlpLogsRequestJson(b []byte, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, veryVerbose bool) (resp []byte, err error) { + logsData := &otlpLogs.LogsData{} + if err = protojson.Unmarshal(b, logsData); err != nil { + prefixedLogger.PrintError("OTel log server could not unmarshal request body, JSON does not match expected format: %s\n received body: %s", err, string(b)) + err = fmt.Errorf("Could not unmarshal JSON request body") + return + } + + if veryVerbose { + prefixedLogger.PrintVerbose("OTel log server received JSON log data in the following format:\n") + prefixedLogger.PrintVerbose(string(b)) + } + + response := handleOtlpLogsRequest(logsData, server, rawLogStream, parsedLogStream) + + return protojson.Marshal(response) +} + +// handleOtlpLogsRequest - Takes one or more OTLP log records and processes them +// +// There are currently three kinds of log formats we aim to support here: // // 1. Plain log messages (unstructured message, body of log record is a string) // 2. jsonlog encoded as OTel key/value map -// 3. jsonlog wrapped in K8s context (via fluentbit) as OTel key/value map +// 3. jsonlog wrapped in K8s context (via fluentbit/Vector) as OTel key/value map // // Other variants (e.g. csvlog, or plain messages in a K8s context) are currently // not supported and will be ignored. +func handleOtlpLogsRequest(logsData *otlpLogs.LogsData, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem) *otlpLogsService.ExportLogsServiceResponse { + var rejectedLogRecords int64 + for _, r := range logsData.ResourceLogs { + for _, s := range r.ScopeLogs { + for _, l := range s.LogRecords { + if l.Body.GetKvlistValue() != nil { + // jsonlog log message + record := transformJsonLogRecord(l.Body.GetKvlistValue(), server.Config) + if record != nil { + logLine, detailLine := logLineFromJsonlog(record, server.GetLogParser()) + parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} + if detailLine != nil { + parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine} + } + } else { + rejectedLogRecords++ + } + } else if l.Body.GetStringValue() != "" { + // Plain log message + item := SelfHostedLogStreamItem{} + item.Line = l.Body.GetStringValue() + item.OccurredAt = time.Unix(0, int64(l.TimeUnixNano)) + rawLogStream <- item + } else { + rejectedLogRecords++ + } + } + } + } + response := &otlpLogsService.ExportLogsServiceResponse{} + if rejectedLogRecords > 0 { + response.PartialSuccess = &otlpLogsService.ExportLogsPartialSuccess{RejectedLogRecords: rejectedLogRecords} + } + return response +} + +func transformJsonLogRecord(recordContainer *common.KeyValueList, config config.ServerConfig) *common.KeyValueList { + var logger string + var record *common.KeyValueList + var kubernetes *common.KeyValueList + hasErrorSeverity := false + for _, v := range recordContainer.Values { + if v.Key == "logger" { + logger = v.Value.GetStringValue() + } + if v.Key == "record" { + record = v.Value.GetKvlistValue() + } + if v.Key == "kubernetes" { + kubernetes = v.Value.GetKvlistValue() + } + if v.Key == "error_severity" { + hasErrorSeverity = true + } + } + // TODO: Support other logger names (this is only tested with CNPG) + if logger == "postgres" { + // jsonlog wrapped in K8s context (via fluentbit / Vector) + if kubernetes != nil && skipDueToK8sFilter(kubernetes, config) { + return nil + } + return record + } else if logger == "" && hasErrorSeverity { + // simple jsonlog (Postgres jsonlog has error_severity key) + return recordContainer + } + + return nil +} func logLineFromJsonlog(record *common.KeyValueList, logParser state.LogParser) (state.LogLine, *state.LogLine) { var logLine state.LogLine @@ -72,7 +222,7 @@ func logLineFromJsonlog(record *common.KeyValueList, logParser state.LogParser) return logLine, nil } -func skipDueToK8sFilter(kubernetes *common.KeyValueList, server *state.Server, prefixedLogger *util.Logger) bool { +func skipDueToK8sFilter(kubernetes *common.KeyValueList, config config.ServerConfig) bool { var k8sPodName string var k8sNamespaceName string @@ -91,115 +241,17 @@ func skipDueToK8sFilter(kubernetes *common.KeyValueList, server *state.Server, p } } - if server.Config.LogOtelK8SPod != "" { - if server.Config.LogOtelK8SPodNamespace != "" && server.Config.LogOtelK8SPodNamespace != k8sNamespaceName { + if config.LogOtelK8SPod != "" { + if config.LogOtelK8SPodNamespace != "" && config.LogOtelK8SPodNamespace != k8sNamespaceName { return true } - if server.Config.LogOtelK8SPodName != k8sPodName { + if config.LogOtelK8SPodName != k8sPodName { return true } } - if server.Config.LogOtelK8SLabels != "" { - return util.CheckLabelSelectorMismatch(k8sLabels, server.Config.LogOtelK8SLabelSelectors) + if config.LogOtelK8SLabels != "" { + return util.CheckLabelSelectorMismatch(k8sLabels, config.LogOtelK8SLabelSelectors) } return false } - -func otelV1LogHandler(w http.ResponseWriter, r *http.Request, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) { - logParser := server.GetLogParser() - b, err := io.ReadAll(r.Body) - - if err != nil { - prefixedLogger.PrintError("OTel log server could not read request body") - return - } - - logsData := &otlpLogs.LogsData{} - contentType := r.Header.Get("Content-Type") - switch contentType { - case "application/json": - if err := protojson.Unmarshal(b, logsData); err != nil { - prefixedLogger.PrintError("OTel log server could not unmarshal request body, JSON does not match expected format: %s\n received body: %s", err, string(b)) - return - } - default: - if err := proto.Unmarshal(b, logsData); err != nil { - prefixedLogger.PrintError("OTel log server could not unmarshal request body, expected binary OTLP Protobuf format: %s", err) - return - } - } - - if opts.VeryVerbose { - jsonData, err := json.MarshalIndent(logsData, "", " ") - if err != nil { - prefixedLogger.PrintVerbose("OTel log server failed to convert protobuf to JSON: %v", err) - } - prefixedLogger.PrintVerbose("OTel log server received log data in the following format:\n") - prefixedLogger.PrintVerbose(string(jsonData)) - } - - for _, r := range logsData.ResourceLogs { - for _, s := range r.ScopeLogs { - for _, l := range s.LogRecords { - var logger string - var record *common.KeyValueList - var kubernetes *common.KeyValueList - hasErrorSeverity := false - if l.Body.GetKvlistValue() != nil { - for _, v := range l.Body.GetKvlistValue().Values { - if v.Key == "logger" { - logger = v.Value.GetStringValue() - } - if v.Key == "record" { - record = v.Value.GetKvlistValue() - } - if v.Key == "kubernetes" { - kubernetes = v.Value.GetKvlistValue() - } - if v.Key == "error_severity" { - hasErrorSeverity = true - } - } - // TODO: Support other logger names (this is only tested with CNPG) - if logger == "postgres" { - // jsonlog wrapped in K8s context (via fluentbit / Vector) - logLine, detailLine := logLineFromJsonlog(record, logParser) - if kubernetes != nil && skipDueToK8sFilter(kubernetes, server, prefixedLogger) { - continue - } - - parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} - if detailLine != nil { - parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine} - } - } else if logger == "" && hasErrorSeverity { - // simple jsonlog (Postgres jsonlog has error_severity key) - logLine, detailLine := logLineFromJsonlog(l.Body.GetKvlistValue(), logParser) - parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: logLine} - if detailLine != nil { - parsedLogStream <- state.ParsedLogStreamItem{Identifier: server.Config.Identifier, LogLine: *detailLine} - } - } - } else if l.Body.GetStringValue() != "" { - // plain log message - item := SelfHostedLogStreamItem{} - item.Line = l.Body.GetStringValue() - item.OccurredAt = time.Unix(0, int64(l.TimeUnixNano)) - rawLogStream <- item - } - } - } - } -} - -func setupOtelHandler(ctx context.Context, server *state.Server, rawLogStream chan<- SelfHostedLogStreamItem, parsedLogStream chan state.ParsedLogStreamItem, prefixedLogger *util.Logger, opts state.CollectionOpts) { - otelLogServer := server.Config.LogOtelServer - - serveMux := http.NewServeMux() - serveMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, r *http.Request) { - otelV1LogHandler(w, r, server, rawLogStream, parsedLogStream, prefixedLogger, opts) - }) - - util.GoServeHTTP(ctx, prefixedLogger, otelLogServer, serveMux) -} diff --git a/vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service.pb.go b/vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service.pb.go new file mode 100644 index 000000000..978efdde9 --- /dev/null +++ b/vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service.pb.go @@ -0,0 +1,367 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.21.6 +// source: opentelemetry/proto/collector/logs/v1/logs_service.proto + +package v1 + +import ( + v1 "go.opentelemetry.io/proto/otlp/logs/v1" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ExportLogsServiceRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // An array of ResourceLogs. + // For data coming from a single resource this array will typically contain one + // element. Intermediary nodes (such as OpenTelemetry Collector) that receive + // data from multiple origins typically batch the data before forwarding further and + // in that case this array will contain multiple elements. + ResourceLogs []*v1.ResourceLogs `protobuf:"bytes,1,rep,name=resource_logs,json=resourceLogs,proto3" json:"resource_logs,omitempty"` +} + +func (x *ExportLogsServiceRequest) Reset() { + *x = ExportLogsServiceRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ExportLogsServiceRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExportLogsServiceRequest) ProtoMessage() {} + +func (x *ExportLogsServiceRequest) ProtoReflect() protoreflect.Message { + mi := &file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExportLogsServiceRequest.ProtoReflect.Descriptor instead. +func (*ExportLogsServiceRequest) Descriptor() ([]byte, []int) { + return file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescGZIP(), []int{0} +} + +func (x *ExportLogsServiceRequest) GetResourceLogs() []*v1.ResourceLogs { + if x != nil { + return x.ResourceLogs + } + return nil +} + +type ExportLogsServiceResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The details of a partially successful export request. + // + // If the request is only partially accepted + // (i.e. when the server accepts only parts of the data and rejects the rest) + // the server MUST initialize the `partial_success` field and MUST + // set the `rejected_` with the number of items it rejected. + // + // Servers MAY also make use of the `partial_success` field to convey + // warnings/suggestions to senders even when the request was fully accepted. + // In such cases, the `rejected_` MUST have a value of `0` and + // the `error_message` MUST be non-empty. + // + // A `partial_success` message with an empty value (rejected_ = 0 and + // `error_message` = "") is equivalent to it not being set/present. Senders + // SHOULD interpret it the same way as in the full success case. + PartialSuccess *ExportLogsPartialSuccess `protobuf:"bytes,1,opt,name=partial_success,json=partialSuccess,proto3" json:"partial_success,omitempty"` +} + +func (x *ExportLogsServiceResponse) Reset() { + *x = ExportLogsServiceResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ExportLogsServiceResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExportLogsServiceResponse) ProtoMessage() {} + +func (x *ExportLogsServiceResponse) ProtoReflect() protoreflect.Message { + mi := &file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExportLogsServiceResponse.ProtoReflect.Descriptor instead. +func (*ExportLogsServiceResponse) Descriptor() ([]byte, []int) { + return file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescGZIP(), []int{1} +} + +func (x *ExportLogsServiceResponse) GetPartialSuccess() *ExportLogsPartialSuccess { + if x != nil { + return x.PartialSuccess + } + return nil +} + +type ExportLogsPartialSuccess struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The number of rejected log records. + // + // A `rejected_` field holding a `0` value indicates that the + // request was fully accepted. + RejectedLogRecords int64 `protobuf:"varint,1,opt,name=rejected_log_records,json=rejectedLogRecords,proto3" json:"rejected_log_records,omitempty"` + // A developer-facing human-readable message in English. It should be used + // either to explain why the server rejected parts of the data during a partial + // success or to convey warnings/suggestions during a full success. The message + // should offer guidance on how users can address such issues. + // + // error_message is an optional field. An error_message with an empty value + // is equivalent to it not being set. + ErrorMessage string `protobuf:"bytes,2,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` +} + +func (x *ExportLogsPartialSuccess) Reset() { + *x = ExportLogsPartialSuccess{} + if protoimpl.UnsafeEnabled { + mi := &file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ExportLogsPartialSuccess) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExportLogsPartialSuccess) ProtoMessage() {} + +func (x *ExportLogsPartialSuccess) ProtoReflect() protoreflect.Message { + mi := &file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExportLogsPartialSuccess.ProtoReflect.Descriptor instead. +func (*ExportLogsPartialSuccess) Descriptor() ([]byte, []int) { + return file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescGZIP(), []int{2} +} + +func (x *ExportLogsPartialSuccess) GetRejectedLogRecords() int64 { + if x != nil { + return x.RejectedLogRecords + } + return 0 +} + +func (x *ExportLogsPartialSuccess) GetErrorMessage() string { + if x != nil { + return x.ErrorMessage + } + return "" +} + +var File_opentelemetry_proto_collector_logs_v1_logs_service_proto protoreflect.FileDescriptor + +var file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDesc = []byte{ + 0x0a, 0x38, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, + 0x6c, 0x6f, 0x67, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x6c, 0x6f, 0x67, 0x73, 0x5f, 0x73, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x25, 0x6f, 0x70, 0x65, 0x6e, + 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x6c, 0x6f, 0x67, 0x73, 0x2e, 0x76, + 0x31, 0x1a, 0x26, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x6f, 0x67, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x6c, + 0x6f, 0x67, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x6a, 0x0a, 0x18, 0x45, 0x78, 0x70, + 0x6f, 0x72, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x4e, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x5f, 0x6c, 0x6f, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x6f, + 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x6c, 0x6f, 0x67, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x4c, 0x6f, 0x67, 0x73, 0x22, 0x85, 0x01, 0x0a, 0x19, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, + 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x68, 0x0a, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x5f, 0x73, + 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, + 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x6c, 0x6f, 0x67, + 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x0e, 0x70, + 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x71, 0x0a, + 0x18, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, + 0x61, 0x6c, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x30, 0x0a, 0x14, 0x72, 0x65, 0x6a, + 0x65, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x12, 0x72, 0x65, 0x6a, 0x65, 0x63, 0x74, 0x65, + 0x64, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x32, 0x9d, 0x01, 0x0a, 0x0b, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x8d, 0x01, 0x0a, 0x06, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x3f, 0x2e, 0x6f, 0x70, + 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x6c, 0x6f, 0x67, 0x73, + 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x40, 0x2e, 0x6f, + 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x6c, 0x6f, 0x67, + 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x42, 0x98, 0x01, 0x0a, 0x28, 0x69, 0x6f, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, + 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, + 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x6c, 0x6f, 0x67, 0x73, 0x2e, 0x76, 0x31, 0x42, 0x10, 0x4c, + 0x6f, 0x67, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, + 0x01, 0x5a, 0x30, 0x67, 0x6f, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, + 0x74, 0x72, 0x79, 0x2e, 0x69, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6f, 0x74, 0x6c, + 0x70, 0x2f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x6c, 0x6f, 0x67, 0x73, + 0x2f, 0x76, 0x31, 0xaa, 0x02, 0x25, 0x4f, 0x70, 0x65, 0x6e, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, + 0x74, 0x72, 0x79, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, + 0x74, 0x6f, 0x72, 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x2e, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescOnce sync.Once + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescData = file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDesc +) + +func file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescGZIP() []byte { + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescOnce.Do(func() { + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescData = protoimpl.X.CompressGZIP(file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescData) + }) + return file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDescData +} + +var file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_opentelemetry_proto_collector_logs_v1_logs_service_proto_goTypes = []interface{}{ + (*ExportLogsServiceRequest)(nil), // 0: opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest + (*ExportLogsServiceResponse)(nil), // 1: opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse + (*ExportLogsPartialSuccess)(nil), // 2: opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess + (*v1.ResourceLogs)(nil), // 3: opentelemetry.proto.logs.v1.ResourceLogs +} +var file_opentelemetry_proto_collector_logs_v1_logs_service_proto_depIdxs = []int32{ + 3, // 0: opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest.resource_logs:type_name -> opentelemetry.proto.logs.v1.ResourceLogs + 2, // 1: opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse.partial_success:type_name -> opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess + 0, // 2: opentelemetry.proto.collector.logs.v1.LogsService.Export:input_type -> opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest + 1, // 3: opentelemetry.proto.collector.logs.v1.LogsService.Export:output_type -> opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_opentelemetry_proto_collector_logs_v1_logs_service_proto_init() } +func file_opentelemetry_proto_collector_logs_v1_logs_service_proto_init() { + if File_opentelemetry_proto_collector_logs_v1_logs_service_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ExportLogsServiceRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ExportLogsServiceResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ExportLogsPartialSuccess); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_opentelemetry_proto_collector_logs_v1_logs_service_proto_goTypes, + DependencyIndexes: file_opentelemetry_proto_collector_logs_v1_logs_service_proto_depIdxs, + MessageInfos: file_opentelemetry_proto_collector_logs_v1_logs_service_proto_msgTypes, + }.Build() + File_opentelemetry_proto_collector_logs_v1_logs_service_proto = out.File + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_rawDesc = nil + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_goTypes = nil + file_opentelemetry_proto_collector_logs_v1_logs_service_proto_depIdxs = nil +} diff --git a/vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service.pb.gw.go b/vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service.pb.gw.go new file mode 100644 index 000000000..d34f32d91 --- /dev/null +++ b/vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service.pb.gw.go @@ -0,0 +1,171 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: opentelemetry/proto/collector/logs/v1/logs_service.proto + +/* +Package v1 is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package v1 + +import ( + "context" + "io" + "net/http" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/grpc-ecosystem/grpc-gateway/v2/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +// Suppress "imported and not used" errors +var _ codes.Code +var _ io.Reader +var _ status.Status +var _ = runtime.String +var _ = utilities.NewDoubleArray +var _ = metadata.Join + +func request_LogsService_Export_0(ctx context.Context, marshaler runtime.Marshaler, client LogsServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq ExportLogsServiceRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := client.Export(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_LogsService_Export_0(ctx context.Context, marshaler runtime.Marshaler, server LogsServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq ExportLogsServiceRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := server.Export(ctx, &protoReq) + return msg, metadata, err + +} + +// RegisterLogsServiceHandlerServer registers the http handlers for service LogsService to "mux". +// UnaryRPC :call LogsServiceServer directly. +// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterLogsServiceHandlerFromEndpoint instead. +func RegisterLogsServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux, server LogsServiceServer) error { + + mux.Handle("POST", pattern_LogsService_Export_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/opentelemetry.proto.collector.logs.v1.LogsService/Export", runtime.WithHTTPPathPattern("/v1/logs")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LogsService_Export_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_LogsService_Export_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +// RegisterLogsServiceHandlerFromEndpoint is same as RegisterLogsServiceHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterLogsServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.Dial(endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + + return RegisterLogsServiceHandler(ctx, mux, conn) +} + +// RegisterLogsServiceHandler registers the http handlers for service LogsService to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterLogsServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterLogsServiceHandlerClient(ctx, mux, NewLogsServiceClient(conn)) +} + +// RegisterLogsServiceHandlerClient registers the http handlers for service LogsService +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "LogsServiceClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "LogsServiceClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "LogsServiceClient" to call the correct interceptors. +func RegisterLogsServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux, client LogsServiceClient) error { + + mux.Handle("POST", pattern_LogsService_Export_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/opentelemetry.proto.collector.logs.v1.LogsService/Export", runtime.WithHTTPPathPattern("/v1/logs")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LogsService_Export_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_LogsService_Export_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +var ( + pattern_LogsService_Export_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "logs"}, "")) +) + +var ( + forward_LogsService_Export_0 = runtime.ForwardResponseMessage +) diff --git a/vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service_grpc.pb.go b/vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service_grpc.pb.go new file mode 100644 index 000000000..e1b7c457c --- /dev/null +++ b/vendor/go.opentelemetry.io/proto/otlp/collector/logs/v1/logs_service_grpc.pb.go @@ -0,0 +1,109 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.1.0 +// - protoc v3.21.6 +// source: opentelemetry/proto/collector/logs/v1/logs_service.proto + +package v1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// LogsServiceClient is the client API for LogsService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type LogsServiceClient interface { + // For performance reasons, it is recommended to keep this RPC + // alive for the entire life of the application. + Export(ctx context.Context, in *ExportLogsServiceRequest, opts ...grpc.CallOption) (*ExportLogsServiceResponse, error) +} + +type logsServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewLogsServiceClient(cc grpc.ClientConnInterface) LogsServiceClient { + return &logsServiceClient{cc} +} + +func (c *logsServiceClient) Export(ctx context.Context, in *ExportLogsServiceRequest, opts ...grpc.CallOption) (*ExportLogsServiceResponse, error) { + out := new(ExportLogsServiceResponse) + err := c.cc.Invoke(ctx, "/opentelemetry.proto.collector.logs.v1.LogsService/Export", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// LogsServiceServer is the server API for LogsService service. +// All implementations must embed UnimplementedLogsServiceServer +// for forward compatibility +type LogsServiceServer interface { + // For performance reasons, it is recommended to keep this RPC + // alive for the entire life of the application. + Export(context.Context, *ExportLogsServiceRequest) (*ExportLogsServiceResponse, error) + mustEmbedUnimplementedLogsServiceServer() +} + +// UnimplementedLogsServiceServer must be embedded to have forward compatible implementations. +type UnimplementedLogsServiceServer struct { +} + +func (UnimplementedLogsServiceServer) Export(context.Context, *ExportLogsServiceRequest) (*ExportLogsServiceResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Export not implemented") +} +func (UnimplementedLogsServiceServer) mustEmbedUnimplementedLogsServiceServer() {} + +// UnsafeLogsServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to LogsServiceServer will +// result in compilation errors. +type UnsafeLogsServiceServer interface { + mustEmbedUnimplementedLogsServiceServer() +} + +func RegisterLogsServiceServer(s grpc.ServiceRegistrar, srv LogsServiceServer) { + s.RegisterService(&LogsService_ServiceDesc, srv) +} + +func _LogsService_Export_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ExportLogsServiceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(LogsServiceServer).Export(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/opentelemetry.proto.collector.logs.v1.LogsService/Export", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(LogsServiceServer).Export(ctx, req.(*ExportLogsServiceRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// LogsService_ServiceDesc is the grpc.ServiceDesc for LogsService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var LogsService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "opentelemetry.proto.collector.logs.v1.LogsService", + HandlerType: (*LogsServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Export", + Handler: _LogsService_Export_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "opentelemetry/proto/collector/logs/v1/logs_service.proto", +} diff --git a/vendor/modules.txt b/vendor/modules.txt index a15f4db37..06abbf92b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -555,6 +555,7 @@ go.opentelemetry.io/otel/trace/internal/telemetry go.opentelemetry.io/otel/trace/noop # go.opentelemetry.io/proto/otlp v1.0.0 ## explicit; go 1.17 +go.opentelemetry.io/proto/otlp/collector/logs/v1 go.opentelemetry.io/proto/otlp/collector/trace/v1 go.opentelemetry.io/proto/otlp/common/v1 go.opentelemetry.io/proto/otlp/logs/v1