Skip to content

Commit

Permalink
clusterimpl: stop using gs.switchTo on child policy config update
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Aug 28, 2024
1 parent 3eb0145 commit 0f5b421
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 2 deletions.
104 changes: 104 additions & 0 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package clusterimpl

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"

Expand All @@ -40,6 +42,7 @@ import (
"google.golang.org/grpc/internal/xds"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
Expand Down Expand Up @@ -820,6 +823,107 @@ func (s) TestUpdateLRSServer(t *testing.T) {
}
}

// TestChildPolicyUpdatedSynchronouslyOnConfigUpdate verifies that the child
// policy has already received its configuration by the time the call to
// UpdateClientConnState on the balancer under test returns.
func (s) TestChildPolicyUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
	fakeXDSClient := fakeclient.NewClient()

	clientConn := testutils.NewBalancerClientConn(t)
	b := balancer.Get(Name).Build(clientConn, balancer.BuildOptions{})
	defer b.Close()

	// Register a stub child policy that, whenever it is handed a config
	// update, pushes an error picker to its parent and records that the
	// update was seen.
	const childPolicyName = "stubBalancer-ChildPolicyUpdatedSynchronouslyOnConfigUpdate"
	var childSawUpdate atomic.Bool
	onChildUpdate := func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
		errPicker := base.NewErrPicker(errors.New("dummy error picker"))
		bd.ClientConn.UpdateState(balancer.State{Picker: errPicker})
		childSawUpdate.Store(true)
		return nil
	}
	stub.Register(childPolicyName, stub.BalancerFuncs{UpdateClientConnState: onChildUpdate})

	lrsOpts := bootstrap.ServerConfigTestingOptions{
		URI:          "trafficdirector.googleapis.com:443",
		ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
	}
	lrsServerCfg, err := bootstrap.ServerConfigForTesting(lrsOpts)
	if err != nil {
		t.Fatalf("Failed to create LRS server config for testing: %v", err)
	}

	lbCfg := &LBConfig{
		Cluster:             testClusterName,
		EDSServiceName:      testServiceName,
		LoadReportingServer: lrsServerCfg,
		ChildPolicy:         &internalserviceconfig.BalancerConfig{Name: childPolicyName},
	}
	update := balancer.ClientConnState{
		ResolverState:  xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, fakeXDSClient),
		BalancerConfig: lbCfg,
	}
	if err := b.UpdateClientConnState(update); err != nil {
		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
	}

	// The flag is checked immediately, with no waiting, so this only passes
	// if the child was updated before the call above returned.
	if !childSawUpdate.Load() {
		t.Fatal("Child policy update did not happen synchronously on receipt of configuration update.")
	}
}

// TestFailToParseChildPolicyConfig verifies that UpdateClientConnState on the
// balancer under test returns an error when the child policy's ParseConfig
// rejects its configuration, and that the returned error carries the child's
// parse error text.
func (s) TestFailToParseChildPolicyConfig(t *testing.T) {
	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
	xdsC := fakeclient.NewClient()

	builder := balancer.Get(Name)
	cc := testutils.NewBalancerClientConn(t)
	b := builder.Build(cc, balancer.BuildOptions{})
	defer b.Close()

	// Create a stub balancer which throws error from ParseConfig.
	//
	// The stub is registered under a name unique to this test. Reusing the
	// name of another test's stub would replace that stub in the global
	// balancer registry with this always-failing one.
	const parseConfigError = "failed to parse config"
	const childPolicyName = "stubBalancer-FailToParseChildPolicyConfig"
	stub.Register(childPolicyName, stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			bd.ClientConn.UpdateState(balancer.State{
				Picker: base.NewErrPicker(errors.New("dummy error picker")),
			})
			return nil
		},
		ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
			return nil, errors.New(parseConfigError)
		},
	})

	testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
		URI:          "trafficdirector.googleapis.com:443",
		ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
	})
	if err != nil {
		t.Fatalf("Failed to create LRS server config for testing: %v", err)
	}
	err = b.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
		BalancerConfig: &LBConfig{
			Cluster:             testClusterName,
			EDSServiceName:      testServiceName,
			LoadReportingServer: testLRSServerConfig,
			ChildPolicy: &internalserviceconfig.BalancerConfig{
				Name: childPolicyName,
			},
		},
	})

	// Report the error actually returned so a failure can be diagnosed
	// without re-running the test.
	if err == nil || !strings.Contains(err.Error(), parseConfigError) {
		t.Errorf("UpdateClientConnState() returned error: %v, want error containing %q", err, parseConfigError)
	}
}

func assertString(f func() (string, error)) string {
s, err := f()
if err != nil {
Expand Down
18 changes: 16 additions & 2 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,22 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
}

if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
if err := b.child.SwitchTo(bb); err != nil {
return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err)
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
strCfg, err := newConfig.ChildPolicy.MarshalJSON()
if err != nil {
return fmt.Errorf("error marshaling config: %v", err)

Check warning on line 253 in xds/internal/balancer/clusterimpl/clusterimpl.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterimpl/clusterimpl.go#L253

Added line #L253 was not covered by tests
}
r := json.RawMessage(strCfg)
sc, err := gracefulswitch.ParseConfig(r)
if err != nil {
return fmt.Errorf("error parsing child config: %v", err)
}
if err := b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: sc,
}); err != nil {
return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err)
}
}
}
b.config = newConfig
Expand Down

0 comments on commit 0f5b421

Please sign in to comment.