xds: add xDS transport custom Dialer support #7586
base: master
Changes from 5 commits
354e088
b5d173c
d187d39
6a82444
ad20fd0
a8ff4fb
3f564dd
65feb3c
14cbf57
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,9 +22,11 @@ package bootstrap | |
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"maps" | ||
"net" | ||
"net/url" | ||
"os" | ||
"slices" | ||
|
@@ -179,6 +181,7 @@ type ServerConfig struct { | |
// credentials and store it here for easy access. | ||
selectedCreds ChannelCreds | ||
credsDialOption grpc.DialOption | ||
dialerOption grpc.DialOption | ||
|
||
cleanups []func() | ||
} | ||
|
@@ -223,6 +226,12 @@ func (sc *ServerConfig) CredsDialOption() grpc.DialOption { | |
return sc.credsDialOption | ||
} | ||
|
||
// DialerOption returns the first supported Dialer function that specifies how | ||
// to dial the xDS server from the configuration, as a dial option. | ||
func (sc *ServerConfig) DialerOption() grpc.DialOption { | ||
return sc.dialerOption | ||
} | ||
Comment on lines +232 to +234

Review comment: Hmm ... I'm wondering whether we should just have a method on this type that returns a slice of dial options:

    func (sc *ServerConfig) DialOptions() []grpc.DialOption {
        // return a slice of dial options that contains the creds dial option and optionally one for the dialer
    }

With this approach, the transport can be oblivious to the dial options, and if another dial option needs to be added here tomorrow, the transport won't have to change.

Reply: Acknowledged. I think this abstraction would be appropriate in a follow-up PR as it would involve removing the existing […]
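The suggestion in this thread can be sketched roughly as follows. This is only an illustration and not code from the PR: `dialOption` and `namedOption` are hypothetical stand-ins for `grpc.DialOption` so that the sketch builds with the standard library alone, and `serverConfig` mirrors only the two fields discussed here.

```go
package main

import "fmt"

// dialOption is a hypothetical stand-in for grpc.DialOption.
type dialOption interface{ name() string }

// namedOption is a trivial dialOption used only for this sketch.
type namedOption string

func (o namedOption) name() string { return string(o) }

// serverConfig mirrors only the two fields relevant to this thread.
type serverConfig struct {
	credsDialOption dialOption // set once credentials are selected
	dialerOption    dialOption // nil when the bundle has no custom dialer
}

// DialOptions returns every dial option the transport should apply. The
// dialer option is included only when it was populated.
func (sc *serverConfig) DialOptions() []dialOption {
	opts := []dialOption{sc.credsDialOption}
	if sc.dialerOption != nil {
		opts = append(opts, sc.dialerOption)
	}
	return opts
}

// optionNames is a small helper for inspecting the result.
func optionNames(sc *serverConfig) []string {
	var names []string
	for _, o := range sc.DialOptions() {
		names = append(names, o.name())
	}
	return names
}

func main() {
	withDialer := &serverConfig{
		credsDialOption: namedOption("creds"),
		dialerOption:    namedOption("dialer"),
	}
	withoutDialer := &serverConfig{credsDialOption: namedOption("creds")}
	fmt.Println(optionNames(withDialer))
	fmt.Println(optionNames(withoutDialer))
}
```

With a method shaped like this, the caller would spread the returned slice into the client constructor and would not need to know which individual options exist.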
||
|
||
// Cleanups returns a collection of functions to be called when the xDS client | ||
// for this server is closed. Allows cleaning up resources created specifically | ||
// for this server. | ||
|
@@ -275,6 +284,12 @@ func (sc *ServerConfig) MarshalJSON() ([]byte, error) { | |
return json.Marshal(server) | ||
} | ||
|
||
// dialer captures the Dialer method specified via the credentials bundle. | ||
type dialer interface { | ||
// Dialer specifies how to dial the xDS server. | ||
Dialer(context.Context, string) (net.Conn, error) | ||
Review comment: Can this method name be more specific? Maybe something like […]. The reason I'm suggesting this is that the API to register a credentials type is public, so users could register their own credentials using […]. Or at least, we should document this in the public API in package […]. Thoughts, @dfawley?

Reply: Acknowledged. I think […]. What are your thoughts, @dfawley? If everyone is in agreement, then I will make the change.
||
} | ||
|
||
// UnmarshalJSON takes the json data (a server) and unmarshals it to the struct. | ||
func (sc *ServerConfig) UnmarshalJSON(data []byte) error { | ||
server := serverConfigJSON{} | ||
|
@@ -298,6 +313,9 @@ func (sc *ServerConfig) UnmarshalJSON(data []byte) error { | |
} | ||
sc.selectedCreds = cc | ||
sc.credsDialOption = grpc.WithCredentialsBundle(bundle) | ||
if d, ok := bundle.(dialer); ok { | ||
sc.dialerOption = grpc.WithContextDialer(d.Dialer) | ||
} | ||
sc.cleanups = append(sc.cleanups, cancel) | ||
break | ||
} | ||
|
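The `bundle.(dialer)` check in `UnmarshalJSON` is an instance of probing a value for an optional capability with a type assertion. A minimal standalone sketch of that pattern is below. The `bundle` interface and both bundle types are hypothetical stand-ins (the real code works with `credentials.Bundle`), and the dial function returns an error instead of opening a connection.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
)

// bundle is a hypothetical stand-in for credentials.Bundle.
type bundle interface{ Name() string }

// dialer is the optional capability probed for with a type assertion.
type dialer interface {
	Dialer(context.Context, string) (net.Conn, error)
}

// plainBundle does not provide a custom dialer.
type plainBundle struct{}

func (plainBundle) Name() string { return "plain" }

// dialingBundle provides a custom dialer in addition to being a bundle.
type dialingBundle struct{}

func (dialingBundle) Name() string { return "dialing" }

var errSketch = errors.New("sketch: no real connection is made")

func (dialingBundle) Dialer(context.Context, string) (net.Conn, error) {
	return nil, errSketch
}

// customDialer returns the bundle's dial function, or nil when the bundle
// does not implement the optional dialer interface.
func customDialer(b bundle) func(context.Context, string) (net.Conn, error) {
	if d, ok := b.(dialer); ok {
		return d.Dialer
	}
	return nil
}

// hasCustomDialer reports whether a dial function was found.
func hasCustomDialer(b bundle) bool { return customDialer(b) != nil }

func main() {
	for _, b := range []bundle{plainBundle{}, dialingBundle{}} {
		fmt.Printf("%s: custom dialer = %v\n", b.Name(), hasCustomDialer(b))
	}
}
```

One consequence of this pattern is that any registered bundle with a method of exactly this name and signature is picked up, which appears to be the concern behind the request in the review for a more specific method name.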
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
/* | ||
* | ||
* Copyright 2024 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
package xds_test | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"net" | ||
"testing" | ||
|
||
"github.com/google/uuid" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/credentials/insecure" | ||
"google.golang.org/grpc/internal" | ||
"google.golang.org/grpc/internal/stubserver" | ||
"google.golang.org/grpc/internal/testutils" | ||
"google.golang.org/grpc/internal/testutils/xds/e2e" | ||
internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap" | ||
"google.golang.org/grpc/resolver" | ||
"google.golang.org/grpc/xds/bootstrap" | ||
|
||
testgrpc "google.golang.org/grpc/interop/grpc_testing" | ||
testpb "google.golang.org/grpc/interop/grpc_testing" | ||
) | ||
|
||
const testDialerCredsBuilderName = "test_dialer_creds" | ||
|
||
var ( | ||
mgmtServerAddress string | ||
customDialerCalled bool | ||
) | ||
|
||
func init() { | ||
bootstrap.RegisterCredentials(&testDialerCredsBuilder{}) | ||
} | ||
|
||
// testDialerCredsBuilder implements the `Credentials` interface defined in | ||
// package `xds/bootstrap` and encapsulates an insecure credential with a | ||
// custom Dialer that specifies how to dial the xDS server. | ||
type testDialerCredsBuilder struct{} | ||
|
||
func (t *testDialerCredsBuilder) Build(json.RawMessage) (credentials.Bundle, func(), error) { | ||
return &testDialerCredsBundle{}, func() {}, nil | ||
} | ||
|
||
func (t *testDialerCredsBuilder) Name() string { | ||
return testDialerCredsBuilderName | ||
} | ||
|
||
// testDialerCredsBundle implements the `Bundle` interface defined in package | ||
// `credentials` and encapsulates an insecure credential with a custom Dialer | ||
// that specifies how to dial the xDS server. | ||
type testDialerCredsBundle struct{} | ||
|
||
func (t *testDialerCredsBundle) TransportCredentials() credentials.TransportCredentials { | ||
return insecure.NewCredentials() | ||
} | ||
|
||
func (t *testDialerCredsBundle) PerRPCCredentials() credentials.PerRPCCredentials { | ||
return nil | ||
} | ||
|
||
func (t *testDialerCredsBundle) NewWithMode(string) (credentials.Bundle, error) { | ||
return &testDialerCredsBundle{}, nil | ||
} | ||
|
||
// Dialer specifies how to dial the xDS management server. | ||
func (t *testDialerCredsBundle) Dialer(context.Context, string) (net.Conn, error) { | ||
customDialerCalled = true | ||
// Create a pass-through connection (no-op) to the xDS management server. | ||
return net.Dial("tcp", mgmtServerAddress) | ||
} | ||
|
||
func (s) TestClientCustomDialerFromCredentialsBundle(t *testing.T) { | ||
customDialerCalled = false | ||
Review comment: Can this be a field in the creds bundle instead of a global? I think we should be able to do the […]

Review comment: Same for the […]

Review comment: And the […]

Reply: Done. Called […]
||
|
||
// Start an xDS management server. | ||
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) | ||
|
||
// Create bootstrap configuration pointing to the above management server. | ||
nodeID := uuid.New().String() | ||
bc, err := internalbootstrap.NewContentsForTesting(internalbootstrap.ConfigOptionsForTesting{ | ||
Servers: []byte(fmt.Sprintf(`[{ | ||
"server_uri": %q, | ||
"channel_creds": [{"type": %q}] | ||
}]`, mgmtServer.Address, testDialerCredsBuilderName)), | ||
Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)), | ||
}) | ||
if err != nil { | ||
t.Fatalf("Failed to create bootstrap configuration: %v", err) | ||
} | ||
|
||
// Set the management server address to be used by the custom dialer. | ||
mgmtServerAddress = mgmtServer.Address | ||
|
||
// Create an xDS resolver with the above bootstrap configuration. | ||
var resolverBuilder resolver.Builder | ||
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil { | ||
resolverBuilder, err = newResolver.(func([]byte) (resolver.Builder, error))(bc) | ||
if err != nil { | ||
t.Fatalf("Failed to create xDS resolver for testing: %v", err) | ||
} | ||
} | ||
|
||
// Spin up a test backend. | ||
server := stubserver.StartTestService(t, nil) | ||
defer server.Stop() | ||
|
||
// Configure client side xDS resources on the management server. | ||
const serviceName = "my-service-client-side-xds" | ||
resources := e2e.DefaultClientResources(e2e.ResourceParams{ | ||
DialTarget: serviceName, | ||
NodeID: nodeID, | ||
Host: "localhost", | ||
Port: testutils.ParsePort(t, server.Address), | ||
SecLevel: e2e.SecurityLevelNone, | ||
}) | ||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
if err := mgmtServer.Update(ctx, resources); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
// Create a ClientConn and make a successful RPC. The insecure transport credentials passed to | ||
// grpc.NewClient are the credentials for data plane communication with the test backend. | ||
cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder)) | ||
if err != nil { | ||
t.Fatalf("failed to dial local test server: %v", err) | ||
} | ||
defer cc.Close() | ||
|
||
client := testgrpc.NewTestServiceClient(cc) | ||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { | ||
Review comment: I don't think (?) this should require […]

Reply: Done.
||
t.Fatalf("EmptyCall() failed: %v", err) | ||
} | ||
|
||
if !customDialerCalled { | ||
t.Fatalf("xDS client transport custom dialer called = false, want true") | ||
} | ||
} |
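The review on this test asks for the "called" flag and the management server address to live on the credentials bundle rather than in package-level variables. One way that could look is sketched below. Everything here is an assumption for illustration: `recordingBundle`, `pipeDial`, and `exercise` are hypothetical names, and the underlying dial is injected and backed by `net.Pipe` so the sketch runs without opening sockets.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"sync/atomic"
)

// dialFunc has the signature accepted by grpc.WithContextDialer.
type dialFunc func(ctx context.Context, addr string) (net.Conn, error)

// recordingBundle is a hypothetical test double. It keeps the dial target and
// the "was I called" flag as fields instead of package-level variables.
type recordingBundle struct {
	target string      // address to connect to, ignoring the one passed in
	called atomic.Bool // set once Dialer runs
	dial   dialFunc    // underlying dial, injectable for tests
}

// Dialer records the call and connects to the configured target.
func (b *recordingBundle) Dialer(ctx context.Context, _ string) (net.Conn, error) {
	b.called.Store(true)
	return b.dial(ctx, b.target)
}

// pipeDial stands in for a real network dial and returns one end of an
// in-memory pipe.
func pipeDial(context.Context, string) (net.Conn, error) {
	client, server := net.Pipe()
	server.Close()
	return client, nil
}

// exercise dials once through the bundle and reports the flag before and
// after the call.
func exercise() (bool, bool, error) {
	b := &recordingBundle{target: "127.0.0.1:0", dial: pipeDial}
	before := b.called.Load()
	conn, err := b.Dialer(context.Background(), "ignored")
	if err != nil {
		return before, b.called.Load(), err
	}
	defer conn.Close()
	return before, b.called.Load(), nil
}

func main() {
	before, after, err := exercise()
	fmt.Println(before, after, err)
}
```

Because the state is per bundle, separate tests would not need to reset shared variables, and the atomic flag keeps the read in the test safe even if the dial happens on another goroutine.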
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,10 +18,16 @@ | |
package transport_test | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"net" | ||
"testing" | ||
|
||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/internal/xds/bootstrap" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/credentials/insecure" | ||
internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap" | ||
"google.golang.org/grpc/xds/bootstrap" | ||
"google.golang.org/grpc/xds/internal/xdsclient/transport" | ||
"google.golang.org/grpc/xds/internal/xdsclient/transport/internal" | ||
|
||
|
@@ -39,7 +45,7 @@ func (s) TestNewWithGRPCDial(t *testing.T) { | |
internal.GRPCNewClient = customDialer | ||
defer func() { internal.GRPCNewClient = oldDial }() | ||
|
||
- serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: "server-address"}) | ||
+ serverCfg, err := internalbootstrap.ServerConfigForTesting(internalbootstrap.ServerConfigTestingOptions{URI: "server-address"}) | ||
if err != nil { | ||
t.Fatalf("Failed to create server config for testing: %v", err) | ||
} | ||
|
@@ -82,3 +88,76 @@ func (s) TestNewWithGRPCDial(t *testing.T) { | |
t.Fatalf("transport.New(%+v) custom dialer called = true, want false", opts) | ||
} | ||
} | ||
|
||
const testDialerCredsBuilderName = "test_dialer_creds" | ||
|
||
func init() { | ||
bootstrap.RegisterCredentials(&testDialerCredsBuilder{}) | ||
} | ||
|
||
// testDialerCredsBuilder implements the `Credentials` interface defined in | ||
// package `xds/bootstrap` and encapsulates an insecure credential with a | ||
// custom Dialer that specifies how to dial the xDS server. | ||
type testDialerCredsBuilder struct{} | ||
|
||
func (t *testDialerCredsBuilder) Build(json.RawMessage) (credentials.Bundle, func(), error) { | ||
return &testDialerCredsBundle{}, func() {}, nil | ||
} | ||
|
||
func (t *testDialerCredsBuilder) Name() string { | ||
return testDialerCredsBuilderName | ||
} | ||
|
||
// testDialerCredsBundle implements the `Bundle` interface defined in package | ||
// `credentials` and encapsulates an insecure credential with a custom Dialer | ||
// that specifies how to dial the xDS server. | ||
type testDialerCredsBundle struct{} | ||
Review comment: Would embedding an […]

Reply: Done.
||
|
||
func (t *testDialerCredsBundle) TransportCredentials() credentials.TransportCredentials { | ||
return insecure.NewCredentials() | ||
} | ||
|
||
func (t *testDialerCredsBundle) PerRPCCredentials() credentials.PerRPCCredentials { | ||
return nil | ||
} | ||
|
||
func (t *testDialerCredsBundle) NewWithMode(string) (credentials.Bundle, error) { | ||
return &testDialerCredsBundle{}, nil | ||
} | ||
|
||
func (t *testDialerCredsBundle) Dialer(context.Context, string) (net.Conn, error) { | ||
return nil, nil | ||
} | ||
|
||
func (s) TestNewWithDialerFromCredentialsBundle(t *testing.T) { | ||
Review comment: I don't think this really tests whether the transport is actually passing the new dial option to […]

Reply: Done.
||
serverCfg, err := internalbootstrap.ServerConfigForTesting(internalbootstrap.ServerConfigTestingOptions{ | ||
URI: "trafficdirector.googleapis.com:443", | ||
ChannelCreds: []internalbootstrap.ChannelCreds{{Type: testDialerCredsBuilderName}}, | ||
}) | ||
if err != nil { | ||
t.Fatalf("Failed to create server config for testing: %v", err) | ||
} | ||
if serverCfg.DialerOption() == nil { | ||
t.Fatalf("Dialer for xDS transport in server config for testing is nil, want non-nil") | ||
} | ||
// Create a new transport. | ||
opts := transport.Options{ | ||
ServerCfg: serverCfg, | ||
NodeProto: &v3corepb.Node{}, | ||
OnRecvHandler: func(update transport.ResourceUpdate, onDone func()) error { | ||
onDone() | ||
return nil | ||
}, | ||
OnErrorHandler: func(error) {}, | ||
OnSendHandler: func(*transport.ResourceSendInfo) {}, | ||
} | ||
c, err := transport.New(opts) | ||
defer func() { | ||
if c != nil { | ||
c.Close() | ||
} | ||
}() | ||
if err != nil { | ||
t.Fatalf("transport.New(%v) failed: %v", opts, err) | ||
} | ||
} |
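The review comment on this test points out that checking `DialerOption()` for non-nil does not show that the transport forwards the option. A possible way to check forwarding, in the spirit of the `internal.GRPCNewClient` override used earlier in this file, is to swap in a hook that captures the options it receives. The sketch below uses hypothetical stand-ins throughout (`option`, `client`, `newClient`, `newTransport`) and is not the change that was eventually made in the PR.

```go
package main

import "fmt"

// option and client are hypothetical stand-ins for grpc.DialOption and
// *grpc.ClientConn.
type option string

type client struct{ target string }

// newClient is an overridable hook, playing the role that
// internal.GRPCNewClient plays in the test file above.
var newClient = func(target string, opts ...option) (*client, error) {
	return &client{target: target}, nil
}

// newTransport stands in for transport.New. It forwards the creds option and,
// when one is set, the dialer option.
func newTransport(target string, creds, dialer option) (*client, error) {
	opts := []option{creds}
	if dialer != "" {
		opts = append(opts, dialer)
	}
	return newClient(target, opts...)
}

// capturedOptions overrides the hook, builds a transport, and returns the
// options the hook observed. The original hook is restored on return.
func capturedOptions(creds, dialer option) ([]option, error) {
	orig := newClient
	defer func() { newClient = orig }()

	var got []option
	newClient = func(target string, opts ...option) (*client, error) {
		got = append(got, opts...)
		return orig(target, opts...)
	}
	if _, err := newTransport("server-address", creds, dialer); err != nil {
		return nil, err
	}
	return got, nil
}

// containsOption reports whether want is among opts.
func containsOption(opts []option, want option) bool {
	for _, o := range opts {
		if o == want {
			return true
		}
	}
	return false
}

func main() {
	got, err := capturedOptions("creds", "dialer")
	fmt.Println(got, err)
}
```

A test built this way fails if the transport stops forwarding the dialer option, which a nil check on the server config alone would not catch.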
Review comment: Can this comment be rephrased to indicate that it is actually the first supported credentials that determine this dialer function? There is really no such thing as the first supported dialer function.

Reply: Done. Rephrased the comment to say that this Dialer function is determined by the first supported credentials from the configuration.
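The distinction raised in this thread can be made concrete with a small sketch of the selection step: the first supported entry in `channel_creds` wins, and the dialer (if any) comes from that entry only. The registry, `credsBuilder`, and `selectCreds` below are hypothetical simplifications and not the bootstrap package's actual types.

```go
package main

import "fmt"

// channelCreds mirrors one entry of the "channel_creds" list in the bootstrap
// configuration.
type channelCreds struct{ Type string }

// credsBuilder is a hypothetical registry entry. hasDialer says whether the
// bundle it builds carries a custom dialer.
type credsBuilder struct{ hasDialer bool }

// defaultRegistry returns the credential types this sketch treats as supported.
func defaultRegistry() map[string]credsBuilder {
	return map[string]credsBuilder{
		"insecure":          {hasDialer: false},
		"test_dialer_creds": {hasDialer: true},
	}
}

// selectCreds returns the first configured entry whose type is registered.
// Later entries are never consulted, so the dialer is determined by the
// selected credentials, not by the first entry that happens to have a dialer.
func selectCreds(configured []channelCreds, registry map[string]credsBuilder) (string, bool, bool) {
	for _, cc := range configured {
		b, found := registry[cc.Type]
		if !found {
			continue // unsupported type: try the next entry
		}
		return cc.Type, b.hasDialer, true
	}
	return "", false, false
}

func main() {
	reg := defaultRegistry()
	fmt.Println(selectCreds([]channelCreds{{"unknown"}, {"test_dialer_creds"}, {"insecure"}}, reg))
	fmt.Println(selectCreds([]channelCreds{{"insecure"}, {"test_dialer_creds"}}, reg))
}
```

In the second call the dialer-capable entry is listed but never reached, so no custom dialer would be used.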