xds: add xDS transport custom Dialer support #7586

Open
wants to merge 9 commits into
base: master
from
18 changes: 18 additions & 0 deletions internal/xds/bootstrap/bootstrap.go
@@ -22,9 +22,11 @@ package bootstrap

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"maps"
	"net"
	"net/url"
	"os"
	"slices"
@@ -179,6 +181,7 @@ type ServerConfig struct {
	// credentials and store it here for easy access.
	selectedCreds   ChannelCreds
	credsDialOption grpc.DialOption
	dialerOption    grpc.DialOption

	cleanups []func()
}
@@ -223,6 +226,12 @@ func (sc *ServerConfig) CredsDialOption() grpc.DialOption {
	return sc.credsDialOption
}

// DialerOption returns the first supported Dialer function that specifies how
// to dial the xDS server from the configuration, as a dial option.
Contributor

Can this comment be rephrased to indicate that it is actually the first supported credentials that determine this dialer function? There is really no such thing as the first supported dialer function.

Contributor Author

Done.
Rephrased comment to include that this Dialer function is determined by the first supported credentials from the configuration.

func (sc *ServerConfig) DialerOption() grpc.DialOption {
	return sc.dialerOption
}
Comment on lines +232 to +234
Contributor

Hmm ... I'm thinking if we should just have a method to return a slice of dial options from this type:

func (sc *ServerConfig) DialOptions() []grpc.DialOption {
	// return a slice of dial options that contains the creds dial option and optionally one for the dialer
}

With this approach, the transport can be oblivious to the dial options, and tomorrow if another dial option needs to be added here, then the transport won't have to change.
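
For illustration only, here is a minimal sketch of the shape being suggested. It is not the actual gRPC change: `dialOption` is a stand-in for `grpc.DialOption` so the example builds with the standard library alone, and `serverConfig` only mirrors the two option fields from this diff.

```go
package main

import "fmt"

// dialOption is a stand-in for grpc.DialOption (assumption: a named string
// keeps this sketch free of the gRPC dependency).
type dialOption string

// serverConfig mirrors the two dial option fields on ServerConfig in this PR.
type serverConfig struct {
	credsDialOption dialOption
	dialerOption    dialOption // empty when the creds bundle has no custom dialer
}

// DialOptions returns the creds dial option, plus the dialer option when one
// was derived from the selected credentials.
func (sc *serverConfig) DialOptions() []dialOption {
	dopts := []dialOption{sc.credsDialOption}
	if sc.dialerOption != "" {
		dopts = append(dopts, sc.dialerOption)
	}
	return dopts
}

func main() {
	plain := &serverConfig{credsDialOption: "creds"}
	custom := &serverConfig{credsDialOption: "creds", dialerOption: "dialer"}
	fmt.Println(len(plain.DialOptions()), len(custom.DialOptions()))
}
```

With this shape the transport would append whatever the slice contains to its own options, so a future option would only touch the config type.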

Contributor Author

Acknowledged.
I think this is a good abstraction, although it might slightly reduce readability and intent of what each dial option does (only two dial options passed to the transport).

I think this abstraction would be more appropriate in a follow-up PR. It would involve removing the existing func (sc *ServerConfig) CredsDialOption(), introducing the new API above, and refactoring the transport. That is unrelated to the purpose of the current PR, which is to add a custom Dialer used by the xDS transport, and it would dilute this PR's focus and readability.


// Cleanups returns a collection of functions to be called when the xDS client
// for this server is closed. Allows cleaning up resources created specifically
// for this server.
@@ -275,6 +284,12 @@ func (sc *ServerConfig) MarshalJSON() ([]byte, error) {
	return json.Marshal(server)
}

// dialer captures the Dialer method specified via the credentials bundle.
type dialer interface {
	// Dialer specifies how to dial the xDS server.
	Dialer(context.Context, string) (net.Conn, error)
Contributor

Can this method name be more specific? Maybe something like DialXDS or DialXDSServer or something else?

The reason I'm suggesting this is because the API to register a credentials type is public. So, users could register their own credentials using bootstrap.RegisterCredentials, and if they end up having a Dialer method on their creds implementation type (which is maybe doing something else), then, they would be in for a surprise.

Or at least, we should document this in the public API in package xds/bootstrap?

Thoughts @dfawley

Contributor Author

Acknowledged.
Users could definitely have a Dialer method on their creds bundle implementation, so this is a valid concern. We have no knowledge of, or control over, what users implement. Using a less common name than Dialer would greatly reduce this risk.

I think DialXDS would be appropriate and not too verbose, although this would introduce slight repetition as the XDS suffix is already implied (per the Repetition section of the Google Go Style Decisions guide).

What are your thoughts @dfawley? If everyone is in agreement, then I will make the change.
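
To make the concern concrete, here is a small sketch of how an optional-interface type assertion can match by accident. The `dialer` interface mirrors the one in this diff; `xdsDialer`, `userBundle`, and the two helper functions are hypothetical names used only for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
)

// dialer mirrors the unexported interface added in this diff.
type dialer interface {
	Dialer(context.Context, string) (net.Conn, error)
}

// xdsDialer is a hypothetical, more specific alternative using one of the
// names floated in the review.
type xdsDialer interface {
	DialXDS(context.Context, string) (net.Conn, error)
}

// userBundle is a hypothetical user creds type whose Dialer method was
// written for an unrelated purpose.
type userBundle struct{}

func (userBundle) Dialer(context.Context, string) (net.Conn, error) {
	return nil, errors.New("not meant for dialing the xDS server")
}

// matchesDialer reports whether b would be picked up by the generic name.
func matchesDialer(b any) bool { _, ok := b.(dialer); return ok }

// matchesXDSDialer reports whether b would be picked up by the specific name.
func matchesXDSDialer(b any) bool { _, ok := b.(xdsDialer); return ok }

func main() {
	b := userBundle{}
	fmt.Println(matchesDialer(b), matchesXDSDialer(b))
}
```

The generic name matches the unrelated method, while the specific name does not, which is the surprise the reviewer describes.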

}

// UnmarshalJSON takes the json data (a server) and unmarshals it to the struct.
func (sc *ServerConfig) UnmarshalJSON(data []byte) error {
	server := serverConfigJSON{}
@@ -298,6 +313,9 @@ func (sc *ServerConfig) UnmarshalJSON(data []byte) error {
		}
		sc.selectedCreds = cc
		sc.credsDialOption = grpc.WithCredentialsBundle(bundle)
		if d, ok := bundle.(dialer); ok {
			sc.dialerOption = grpc.WithContextDialer(d.Dialer)
		}
		sc.cleanups = append(sc.cleanups, cancel)
		break
	}
158 changes: 158 additions & 0 deletions test/xds/xds_client_custom_dialer_test.go
@@ -0,0 +1,158 @@
/*
 *
 * Copyright 2024 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package xds_test

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"testing"

	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/stubserver"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/internal/testutils/xds/e2e"
	internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/xds/bootstrap"

	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

const testDialerCredsBuilderName = "test_dialer_creds"

var (
	mgmtServerAddress  string
	customDialerCalled bool
)

func init() {
	bootstrap.RegisterCredentials(&testDialerCredsBuilder{})
}

// testDialerCredsBuilder implements the `Credentials` interface defined in
// package `xds/bootstrap` and encapsulates an insecure credential with a
// custom Dialer that specifies how to dial the xDS server.
type testDialerCredsBuilder struct{}

func (t *testDialerCredsBuilder) Build(json.RawMessage) (credentials.Bundle, func(), error) {
	return &testDialerCredsBundle{}, func() {}, nil
}

func (t *testDialerCredsBuilder) Name() string {
	return testDialerCredsBuilderName
}

// testDialerCredsBundle implements the `Bundle` interface defined in package
// `credentials` and encapsulates an insecure credential with a custom Dialer
// that specifies how to dial the xDS server.
type testDialerCredsBundle struct{}

func (t *testDialerCredsBundle) TransportCredentials() credentials.TransportCredentials {
	return insecure.NewCredentials()
}

func (t *testDialerCredsBundle) PerRPCCredentials() credentials.PerRPCCredentials {
	return nil
}

func (t *testDialerCredsBundle) NewWithMode(string) (credentials.Bundle, error) {
	return &testDialerCredsBundle{}, nil
}

// Dialer specifies how to dial the xDS management server.
func (t *testDialerCredsBundle) Dialer(context.Context, string) (net.Conn, error) {
	customDialerCalled = true
	// Create a pass-through connection (no-op) to the xDS management server.
	return net.Dial("tcp", mgmtServerAddress)
}

func (s) TestClientCustomDialerFromCredentialsBundle(t *testing.T) {
	customDialerCalled = false
Member

Can this be a field in the creds bundle instead of a global? I think we should be able to do the RegisterCredentials here instead of in an init.

Contributor

Same for the mgmtServerAddress global. Or if you want to be a little fancy, it can be passed through the bootstrap config, and testDialerCredsBuilder.Build can parse it and pass it to the testDialerCredsBundle that it builds.

Contributor

And the testDialerCredsBuilder needs to have a channel on which it sends the testDialerCredsBundle that it creates. That way, the test can get access to the creds bundle and can check if the custom dialer was called.

Contributor Author

Done.
Removed all global variables (customDialerCalled and mgmtServerAddress) in this test.

  • The mgmtServerAddress is passed through the bootstrap config where testDialerCredsBuilder.Build unmarshals the JSON config and passes it to the testDialerCredsBundle.
  • Added a chan struct{} to testDialerCredsBuilder and testDialerCredsBundle such that the test can check whether the custom dialer has been called. Followed Go Tip 79 for one-time signaling and simply used close(chan) along with select statements. Furthermore, added a case for ctx.Done() in the select to gracefully handle the case if the dial failed due to timeout.

Called bootstrap.RegisterCredentials() within this test and removed init().


	// Start an xDS management server.
	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

	// Create bootstrap configuration pointing to the above management server.
	nodeID := uuid.New().String()
	bc, err := internalbootstrap.NewContentsForTesting(internalbootstrap.ConfigOptionsForTesting{
		Servers: []byte(fmt.Sprintf(`[{
			"server_uri": %q,
			"channel_creds": [{"type": %q}]
		}]`, mgmtServer.Address, testDialerCredsBuilderName)),
		Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
	})
	if err != nil {
		t.Fatalf("Failed to create bootstrap configuration: %v", err)
	}

	// Set the management server address to be used by the custom dialer.
	mgmtServerAddress = mgmtServer.Address

	// Create an xDS resolver with the above bootstrap configuration.
	var resolverBuilder resolver.Builder
	if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
		resolverBuilder, err = newResolver.(func([]byte) (resolver.Builder, error))(bc)
		if err != nil {
			t.Fatalf("Failed to create xDS resolver for testing: %v", err)
		}
	}

	// Spin up a test backend.
	server := stubserver.StartTestService(t, nil)
	defer server.Stop()

	// Configure client side xDS resources on the management server.
	const serviceName = "my-service-client-side-xds"
	resources := e2e.DefaultClientResources(e2e.ResourceParams{
		DialTarget: serviceName,
		NodeID:     nodeID,
		Host:       "localhost",
		Port:       testutils.ParsePort(t, server.Address),
		SecLevel:   e2e.SecurityLevelNone,
	})
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Create a ClientConn and make a successful RPC. The insecure transport
	// credentials passed to grpc.NewClient are the credentials for data plane
	// communication with the test backend.
	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder))
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()

	client := testgrpc.NewTestServiceClient(cc)
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
Member

I don't think (?) this should require WaitForReady -- we shouldn't be expecting connections to fail as we start the server before the client.

Contributor Author

Done.
Removed grpc.WaitForReady(true) call option. It makes sense as the server is indeed started before the client.
Tested with thread sanitizer and ran test 100 times and ensured there are no flakes.

		t.Fatalf("EmptyCall() failed: %v", err)
	}

	if !customDialerCalled {
		t.Fatalf("xDS client transport custom dialer called = false, want true")
	}
}
3 changes: 3 additions & 0 deletions xds/internal/xdsclient/transport/transport.go
@@ -202,6 +202,9 @@ func New(opts Options) (*Transport, error) {
			Timeout: 20 * time.Second,
		}),
	}
	if dialerOpts := opts.ServerCfg.DialerOption(); dialerOpts != nil {
		dopts = append(dopts, dialerOpts)
	}
	grpcNewClient := transportinternal.GRPCNewClient.(func(string, ...grpc.DialOption) (*grpc.ClientConn, error))
	cc, err := grpcNewClient(opts.ServerCfg.ServerURI(), dopts...)
	if err != nil {
83 changes: 81 additions & 2 deletions xds/internal/xdsclient/transport/transport_test.go
@@ -18,10 +18,16 @@
package transport_test

import (
	"context"
	"encoding/json"
	"net"
	"testing"

	"google.golang.org/grpc"
-	"google.golang.org/grpc/internal/xds/bootstrap"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap"
	"google.golang.org/grpc/xds/bootstrap"
	"google.golang.org/grpc/xds/internal/xdsclient/transport"
	"google.golang.org/grpc/xds/internal/xdsclient/transport/internal"

@@ -39,7 +45,7 @@ func (s) TestNewWithGRPCDial(t *testing.T) {
	internal.GRPCNewClient = customDialer
	defer func() { internal.GRPCNewClient = oldDial }()

-	serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: "server-address"})
	serverCfg, err := internalbootstrap.ServerConfigForTesting(internalbootstrap.ServerConfigTestingOptions{URI: "server-address"})
	if err != nil {
		t.Fatalf("Failed to create server config for testing: %v", err)
	}
@@ -82,3 +88,76 @@ func (s) TestNewWithGRPCDial(t *testing.T) {
		t.Fatalf("transport.New(%+v) custom dialer called = true, want false", opts)
	}
}

const testDialerCredsBuilderName = "test_dialer_creds"

func init() {
	bootstrap.RegisterCredentials(&testDialerCredsBuilder{})
}

// testDialerCredsBuilder implements the `Credentials` interface defined in
// package `xds/bootstrap` and encapsulates an insecure credential with a
// custom Dialer that specifies how to dial the xDS server.
type testDialerCredsBuilder struct{}

func (t *testDialerCredsBuilder) Build(json.RawMessage) (credentials.Bundle, func(), error) {
	return &testDialerCredsBundle{}, func() {}, nil
}

func (t *testDialerCredsBuilder) Name() string {
	return testDialerCredsBuilderName
}

// testDialerCredsBundle implements the `Bundle` interface defined in package
// `credentials` and encapsulates an insecure credential with a custom Dialer
// that specifies how to dial the xDS server.
type testDialerCredsBundle struct{}
Contributor

Would embedding a credentials.Bundle here, and setting it to insecure.NewBundle from testDialerCredsBuilder.Build, help to get rid of the implementation of the three methods below?

Contributor Author

@danielzhaotongliu Sep 18, 2024

Done.
Great find and suggestion!
Embedded credentials.Bundle for the struct in both transport_test.go and xds_client_custom_dialer_test.go.
This follows from Go Tip 7 and eliminated unnecessary boilerplate methods and improved readability.
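
A minimal sketch of the embedding idea follows, using stand-in types so it builds without gRPC. The `bundle` interface is a reduced, hypothetical version of `credentials.Bundle`, and `insecureBundle` plays the role of the value returned by `insecure.NewBundle`.

```go
package main

import "fmt"

// bundle is a reduced stand-in for credentials.Bundle (assumption: two
// methods are enough to show method promotion).
type bundle interface {
	TransportSecurity() string
	NewWithMode(mode string) (bundle, error)
}

// insecureBundle plays the role of the value returned by insecure.NewBundle.
type insecureBundle struct{}

func (insecureBundle) TransportSecurity() string { return "insecure" }

func (insecureBundle) NewWithMode(string) (bundle, error) { return insecureBundle{}, nil }

// dialerBundle embeds bundle, so the embedded methods are promoted and only
// the dialer-specific method has to be written by hand.
type dialerBundle struct {
	bundle
}

func (dialerBundle) Dialer(addr string) string { return "dialing " + addr }

func main() {
	var b bundle = dialerBundle{bundle: insecureBundle{}}
	fmt.Println(b.TransportSecurity())

	// The promoted NewWithMode returns the inner bundle, which has no Dialer.
	nb, _ := b.NewWithMode("any")
	_, keepsDialer := nb.(interface{ Dialer(string) string })
	fmt.Println(keepsDialer)
}
```

One thing to keep in mind with this pattern is that the promoted `NewWithMode` hands back the embedded value, so a bundle produced that way no longer carries the custom dialer.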


func (t *testDialerCredsBundle) TransportCredentials() credentials.TransportCredentials {
	return insecure.NewCredentials()
}

func (t *testDialerCredsBundle) PerRPCCredentials() credentials.PerRPCCredentials {
	return nil
}

func (t *testDialerCredsBundle) NewWithMode(string) (credentials.Bundle, error) {
	return &testDialerCredsBundle{}, nil
}

func (t *testDialerCredsBundle) Dialer(context.Context, string) (net.Conn, error) {
	return nil, nil
}

func (s) TestNewWithDialerFromCredentialsBundle(t *testing.T) {
Contributor

I don't think this really tests whether the transport is actually passing the new dial option to grpc.Dial. It is not possible to check the exact dial options being passed because dial options are implemented as functions, and it is not possible to compare them. But at least, we should check if the call to grpc.Dial or grpc.NewClient from the transport passes the expected number of dial options.

Contributor Author

Done.
Updated this test so that it first overrides internal.GRPCNewClient with a custom pass-through wrapper around grpc.NewClient that records the number of dial options it receives. The override is reset to the original afterwards so that tests are not order-dependent. The test then verifies that the number of dial options passed to the custom grpc.NewClient is 3. Also added a comment explaining why it is 3.
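
The override-and-count approach can be sketched as follows. This is an illustration under stated assumptions, not the test from the PR: `newClient` stands in for the `internal.GRPCNewClient` hook, `dialOption` and `clientConn` stand in for the gRPC types, and `newTransport` is a hypothetical caller that passes the three options the transport would build here (creds, keepalive, and the custom dialer).

```go
package main

import "fmt"

// dialOption and clientConn stand in for grpc.DialOption and *grpc.ClientConn.
type dialOption func()

type clientConn struct{ target string }

// newClient plays the role of the overridable internal.GRPCNewClient hook.
var newClient = func(target string, _ ...dialOption) (*clientConn, error) {
	return &clientConn{target: target}, nil
}

// newTransport is a hypothetical caller that passes three dial options:
// creds, keepalive, and the custom dialer.
func newTransport(target string) (*clientConn, error) {
	opts := []dialOption{func() {}, func() {}, func() {}}
	return newClient(target, opts...)
}

// countDialOptions swaps in a pass-through hook, records how many options it
// receives, and restores the original hook before returning.
func countDialOptions(target string) (int, error) {
	orig := newClient
	defer func() { newClient = orig }()

	n := -1
	newClient = func(t string, opts ...dialOption) (*clientConn, error) {
		n = len(opts)
		return orig(t, opts...)
	}
	_, err := newTransport(target)
	return n, err
}

func main() {
	n, err := countDialOptions("server-address")
	fmt.Println(n, err)
}
```

Counting is a weaker check than comparing the options themselves, but as noted in the review, dial options are functions and cannot be compared directly.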

	serverCfg, err := internalbootstrap.ServerConfigForTesting(internalbootstrap.ServerConfigTestingOptions{
		URI:          "trafficdirector.googleapis.com:443",
		ChannelCreds: []internalbootstrap.ChannelCreds{{Type: testDialerCredsBuilderName}},
	})
	if err != nil {
		t.Fatalf("Failed to create server config for testing: %v", err)
	}
	if serverCfg.DialerOption() == nil {
		t.Fatalf("Dialer for xDS transport in server config for testing is nil, want non-nil")
	}
	// Create a new transport.
	opts := transport.Options{
		ServerCfg: serverCfg,
		NodeProto: &v3corepb.Node{},
		OnRecvHandler: func(update transport.ResourceUpdate, onDone func()) error {
			onDone()
			return nil
		},
		OnErrorHandler: func(error) {},
		OnSendHandler:  func(*transport.ResourceSendInfo) {},
	}
	c, err := transport.New(opts)
	defer func() {
		if c != nil {
			c.Close()
		}
	}()
	if err != nil {
		t.Fatalf("transport.New(%v) failed: %v", opts, err)
	}
}