5 changes: 0 additions & 5 deletions consensus/consensus_service.go
@@ -556,11 +556,6 @@ func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
return consensus.setCurBlockViewID(viewID)
}

// SetCurBlockViewID set the current view ID
func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
}

// SetViewChangingID set the current view change ID
func (consensus *Consensus) SetViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
2 changes: 1 addition & 1 deletion consensus/quorum/quorom_test.go
@@ -629,7 +629,7 @@ func TestCIdentities_NthNextValidatorFailedEdgeCase2(t *testing.T) {
case <-done:
t.Error("Expected a timeout, but successfully calculated next leader")

case <-time.After(5 * time.Second):
case <-time.After(1 * time.Second):
t.Log("Test timed out, possible infinite loop")
}
}
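The shorter 1-second window keeps the suite fast while still catching what this test guards against: the next-leader calculation runs in a goroutine that closes a done channel, and the select above only passes when that channel stays silent for the full timeout. A sketch of the hidden setup above the select, under the assumption it follows the usual goroutine-plus-channel pattern (decider, slots and leaderKey are illustrative names, not copied from the file):

done := make(chan struct{})
go func() {
	defer close(done)
	// the call under test; in this edge case it can spin forever
	_, _ = decider.NthNextValidator(slots, &leaderKey, 1)
}()

The select shown in the diff then treats a closed done channel as a failure and the 1-second timer firing as the expected outcome.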
15 changes: 7 additions & 8 deletions consensus/quorum/quorum.go
@@ -79,7 +79,8 @@ type ParticipantTracker interface {
NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
FirstParticipant(shardingconfig.Instance) *bls.PublicKeyWrapper
NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error)
FirstParticipant() *bls.PublicKeyWrapper
UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper)
}

@@ -202,20 +203,18 @@ func (s *cIdentities) IndexOf(pubKey bls.SerializedPublicKey) int {
}

// NthNext returns the Nth next pubkey; next can be a negative number
func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
found := false

func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error) {
idx := s.IndexOf(pubKey.Bytes)
if idx != -1 {
found = true
if idx == -1 {
return nil, errors.Errorf("pubKey not found %x", pubKey.Bytes)
}
numNodes := int(s.ParticipantsCount())
// sanity check to avoid out of bound access
if numNodes <= 0 || numNodes > len(s.publicKeys) {
numNodes = len(s.publicKeys)
}
idx = (idx + next) % numNodes
return found, &s.publicKeys[idx]
return &s.publicKeys[idx], nil
}
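The old signature reported a missing key only through its bool result and still returned a key computed from index -1, so a stale or mistyped leader key silently degraded into an arbitrary rotation. With the error return the failure has to be handled at the call site. A minimal caller sketch, assuming a populated value implementing the updated ParticipantTracker:

// Sketch only: how a caller migrates to the error-returning NthNext.
// "tracker" stands for any implementation of the updated ParticipantTracker.
func rotateOnce(tracker quorum.ParticipantTracker, current *bls.PublicKeyWrapper) (*bls.PublicKeyWrapper, error) {
	next, err := tracker.NthNext(current, 1)
	if err != nil {
		// current is not a participant; previously this surfaced only as a
		// false bool while a key derived from index -1 was still returned
		return nil, errors.WithMessage(err, "cannot rotate leader")
	}
	return next, nil
}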

// NthNextValidatorV2 returns the Nth next pubkey nodes, but from another validator.
@@ -314,7 +313,7 @@ func (s *cIdentities) NthNextHmy(instance shardingconfig.Instance, pubKey *bls.P
}

// FirstParticipant returns the first participant of the shard
func (s *cIdentities) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper {
func (s *cIdentities) FirstParticipant() *bls.PublicKeyWrapper {
return &s.publicKeys[0]
}

10 changes: 8 additions & 2 deletions consensus/quorum/thread_safe_decider.go
@@ -56,6 +56,12 @@ func (a threadSafeDeciderImpl) NthNextValidator(slotList shard.SlotList, pubKey
return a.decider.NthNextValidator(slotList, pubKey, next)
}

func (a threadSafeDeciderImpl) NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.NthNext(pubKey, next)
}

func (a threadSafeDeciderImpl) NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
@@ -68,10 +74,10 @@ func (a threadSafeDeciderImpl) NthNextHmy(instance shardingconfig.Instance, pubk
return a.decider.NthNextHmy(instance, pubkey, next)
}

func (a threadSafeDeciderImpl) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper {
func (a threadSafeDeciderImpl) FirstParticipant() *bls.PublicKeyWrapper {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.FirstParticipant(instance)
return a.decider.FirstParticipant()
}
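The new NthNext wrapper and the simplified FirstParticipant (which no longer needs a sharding instance, since it always hands back the first participant) take the same mutex as UpdateParticipants. That is what makes leader rotation safe to run while an epoch transition rewrites the participant list. A sketch of the interleaving the lock serializes; decider, leaderKey and newKeys are placeholders, not values from this file:

var wg sync.WaitGroup
wg.Add(2)
go func() { // view-change path: reads the participant list
	defer wg.Done()
	_, _ = decider.NthNext(leaderKey, 1)
}()
go func() { // epoch-transition path: replaces the participant list
	defer wg.Done()
	decider.UpdateParticipants(newKeys, nil)
}()
wg.Wait()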

func (a threadSafeDeciderImpl) UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper) {
78 changes: 73 additions & 5 deletions consensus/view_change.go
@@ -61,6 +61,11 @@ func (pm *State) SetCurBlockViewID(viewID uint64) uint64 {
return viewID
}

// setCurBlockViewID sets the current view ID
func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
}

// GetViewChangingID return the current view changing id
// It is meaningful during view change mode
func (pm *State) GetViewChangingID() uint64 {
@@ -140,10 +145,27 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
return nextViewID, viewChangeDuration
}

// getNextLeaderKey uniquely determine who is the leader for given viewID
// It reads the current leader's pubkey based on the blockchain data and returns
// getNextLeaderKeySkipSameAddress uniquely determines the leader for the given viewID.
// It receives the committee and returns
// the next leader based on the gap between the view change's viewID and the last
// known view ID of the block.
func (consensus *Consensus) getNextLeaderKeySkipSameAddress(viewID uint64, committee *shard.Committee) *bls.PublicKeyWrapper {
gap := 1

cur := consensus.getCurBlockViewID()
if viewID > cur {
gap = int(viewID - cur)
}
// default to the current leader's pubkey; it is returned unchanged if the lookup below fails
leaderPubKey := consensus.getLeaderPubKey()
rs, err := viewChangeNextValidator(consensus.decider, gap, committee.Slots, leaderPubKey)
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[getNextLeaderKeySkipSameAddress] viewChangeNextValidator failed")
return leaderPubKey
}
return rs
}
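The gap is simply how far the requested viewID has run ahead of the view recorded for the last committed block, so each additional failed view change pushes the candidate one distinct-address validator further along, and any lookup error falls back to the unchanged current leader key. Worked numbers, purely illustrative:

cur := uint64(100)    // view of the last committed block
viewID := uint64(103) // view being changed to after three failed leaders in a row
gap := 1
if viewID > cur {
	gap = int(viewID - cur) // gap == 3: ask for the 3rd next validator with a distinct address
}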

func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Committee) *bls.PublicKeyWrapper {
gap := 1

@@ -182,7 +204,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
// it can still sync with other validators.
if curHeader.IsLastBlockInEpoch() {
consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch")
lastLeaderPubKey = consensus.decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch))
lastLeaderPubKey = consensus.decider.FirstParticipant()
}
}
}
@@ -231,6 +253,46 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
return next
}

type nthNext interface {
NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error)
}
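Taking this one-method interface instead of the full quorum decider keeps viewChangeNextValidator easy to unit-test. The tests below use the real decider, but a stub like the following (hypothetical, not part of this PR) would also satisfy it:

// fakeNthNext rotates through a fixed key slice; it exists only to show the
// seam the narrow interface opens up for tests.
type fakeNthNext struct {
	keys []bls.PublicKeyWrapper
}

func (f fakeNthNext) NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error) {
	for i := range f.keys {
		if f.keys[i].Bytes == pubKey.Bytes {
			idx := ((i+next)%len(f.keys) + len(f.keys)) % len(f.keys) // wrap around, tolerating negative next
			return &f.keys[idx], nil
		}
	}
	return nil, errors.Errorf("pubKey not found %x", pubKey.Bytes)
}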

func viewChangeNextValidator(decider nthNext, gap int, slots shard.SlotList, lastLeaderPubKey *bls.PublicKeyWrapper) (*bls.PublicKeyWrapper, error) {
if gap > 1 {
current, err := decider.NthNext(
lastLeaderPubKey,
gap-1)
if err != nil {
return nil, errors.WithMessagef(err, "NthNext failed, gap %d", gap)
}

publicToAddress := make(map[bls.SerializedPublicKey]common.Address)
for _, slot := range slots {
publicToAddress[slot.BLSPublicKey] = slot.EcdsaAddress
}

for i := 0; i < len(slots); i++ {
gap = gap + i
next, err := decider.NthNext(
lastLeaderPubKey,
gap)
if err != nil {
return nil, errors.New("current leader not found")
}

if publicToAddress[current.Bytes] != publicToAddress[next.Bytes] {
return next, nil
}
}
} else {
next, err := decider.NthNext(
lastLeaderPubKey,
gap)
return next, errors.WithMessagef(err, "NthNext failed, gap %d", gap)
}
return nil, errors.New("current leader not found")
}
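Two things are easy to miss here. First, the reference point for "a different address" is the gap-1'th candidate returned by the initial NthNext call, not the last leader itself. Second, the loop advances its offset cumulatively (gap = gap + i), so the probed offsets are gap, gap+1, gap+3, gap+6, and so on; that is why the five-slot, two-address test below expects key index 1 for gap 3 (offsets 3, 4, then 6 mod 5) rather than index 0. A simplified sketch of the underlying rule, using a plain one-step scan instead of the PR's cumulative offset, and assuming the slot order matches the decider's participant order:

// Not the PR's code: a compact restatement of "next leader, skipping keys
// that share the reference candidate's ECDSA address".
func nextDistinctAddress(slots shard.SlotList, lastLeader *bls.PublicKeyWrapper, gap int) (*bls.PublicKeyWrapper, error) {
	n := len(slots)
	if n == 0 {
		return nil, errors.New("empty slot list")
	}
	idx := -1
	for i := range slots {
		if slots[i].BLSPublicKey == lastLeader.Bytes {
			idx = i
			break
		}
	}
	if idx == -1 {
		return nil, errors.New("last leader not found in slot list")
	}
	if gap <= 1 {
		key := bls.PublicKeyWrapper{Bytes: slots[(idx+gap)%n].BLSPublicKey}
		return &key, nil
	}
	ref := slots[(idx+gap-1)%n].EcdsaAddress // the gap-1'th candidate supplies the reference address
	for i := 0; i < n; i++ {
		cand := slots[(idx+gap+i)%n]
		if cand.EcdsaAddress != ref {
			key := bls.PublicKeyWrapper{Bytes: cand.BLSPublicKey}
			return &key, nil
		}
	}
	return nil, errors.New("all committee keys share one address")
}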

func createTimeout() map[TimeoutType]*utils.Timeout {
timeouts := make(map[TimeoutType]*utils.Timeout)
timeouts[timeoutConsensus] = utils.NewTimeout(phaseDuration)
@@ -250,7 +312,8 @@ func (consensus *Consensus) startViewChange() {
consensus.current.SetMode(ViewChanging)
nextViewID, duration := consensus.getNextViewID()
consensus.setViewChangingID(nextViewID)
epoch := consensus.Blockchain().CurrentHeader().Epoch()
currentHeader := consensus.Blockchain().CurrentHeader()
epoch := currentHeader.Epoch()
ss, err := consensus.Blockchain().ReadShardState(epoch)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to read shard state")
@@ -267,7 +330,12 @@ func (consensus *Consensus) startViewChange() {
// against the consensus.LeaderPubKey variable.
// Ideally, we shall use another variable to keep track of the
// leader pubkey in viewchange mode
consensus.setLeaderPubKey(consensus.getNextLeaderKey(nextViewID, committee))
c := consensus.Blockchain().Config()
if c.IsLeaderRotationV2Epoch(currentHeader.Epoch()) {
consensus.setLeaderPubKey(consensus.getNextLeaderKeySkipSameAddress(nextViewID, committee))
} else {
consensus.setLeaderPubKey(consensus.getNextLeaderKey(nextViewID, committee))
}

consensus.getLogger().Warn().
Uint64("nextViewID", nextViewID).
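The skip-same-address selection is gated on the chain config: only when the header's epoch is at or past the leader-rotation-V2 activation does startViewChange call getNextLeaderKeySkipSameAddress; otherwise the existing getNextLeaderKey path is unchanged. Epoch gates of this shape are conventionally a nil check plus a comparison; a hedged sketch, with LeaderRotationV2Epoch as an assumed field name rather than something taken from this diff:

// Assumed shape only; the real method lives in internal/params and may differ.
func (c *ChainConfig) IsLeaderRotationV2Epoch(epoch *big.Int) bool {
	return c.LeaderRotationV2Epoch != nil && c.LeaderRotationV2Epoch.Cmp(epoch) <= 0
}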
140 changes: 138 additions & 2 deletions consensus/view_change_test.go
@@ -1,13 +1,17 @@
package consensus

import (
"math/big"
"testing"

"github.com/harmony-one/harmony/crypto/bls"

"github.com/ethereum/go-ethereum/common"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/crypto/bls"
harmony_bls "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/shard"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBasicViewChanging(t *testing.T) {
@@ -118,3 +122,135 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {

assert.Equal(t, nextKey, &wrappedBLSKeys[1])
}

func TestViewChangeNextValidator(t *testing.T) {
decider := quorum.NewDecider(quorum.SuperMajorityVote, shard.BeaconChainShardID)
assert.Equal(t, int64(0), decider.ParticipantsCount())
wrappedBLSKeys := []bls.PublicKeyWrapper{}

const keyCount = 5
for i := 0; i < keyCount; i++ {
blsKey := harmony_bls.RandPrivateKey()
blsPubKey := harmony_bls.WrapperFromPrivateKey(blsKey)
wrappedBLSKeys = append(wrappedBLSKeys, *blsPubKey.Pub)
}

decider.UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{})
assert.EqualValues(t, keyCount, decider.ParticipantsCount())

t.Run("check_different_address_for_validators", func(t *testing.T) {
var (
rs *bls.PublicKeyWrapper
err error
slots []shard.Slot
)
for i := 0; i < keyCount; i++ {
slot := shard.Slot{
EcdsaAddress: common.BigToAddress(big.NewInt(int64(i))),
BLSPublicKey: wrappedBLSKeys[i].Bytes,
}
slots = append(slots, slot)
}

rs, err = viewChangeNextValidator(decider, 0, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[0], rs)

rs, err = viewChangeNextValidator(decider, 1, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[1], rs)

rs, err = viewChangeNextValidator(decider, 2, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[2], rs)

// and no panic or error for future 1k gaps
for i := 0; i < 1000; i++ {
_, err = viewChangeNextValidator(decider, i, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
}
})

// we can't find the next validator because all validators share the same address
t.Run("same_address_for_all_validators", func(t *testing.T) {
var (
rs *bls.PublicKeyWrapper
err error
slots []shard.Slot
)
for i := 0; i < keyCount; i++ {
slot := shard.Slot{
EcdsaAddress: common.BytesToAddress([]byte("one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z3")),
BLSPublicKey: wrappedBLSKeys[i].Bytes,
}
slots = append(slots, slot)
}

rs, err = viewChangeNextValidator(decider, 0, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[0], rs)

rs, err = viewChangeNextValidator(decider, 1, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[1], rs)

// error because all validators belong to the same address
_, err = viewChangeNextValidator(decider, 2, slots, &wrappedBLSKeys[0])
require.Error(t, err)

// all of them return an error; there is no way to recover
for i := 2; i < 1000; i++ {
_, err = viewChangeNextValidator(decider, i, slots, &wrappedBLSKeys[0])
require.Errorf(t, err, "error because all validators belong same address %d", i)
}
})

// five validators spread across two addresses: rotation should land on a key owned by a different address
t.Run("check_5_validators_2_addrs", func(t *testing.T) {
// each Slot pairs a node's ECDSA address with one of its BLS public keys
var (
addr1 = common.BytesToAddress([]byte("one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z3"))
addr2 = common.BytesToAddress([]byte("one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z4"))
rs *bls.PublicKeyWrapper
err error
)
slots := []shard.Slot{
{
EcdsaAddress: addr1,
BLSPublicKey: wrappedBLSKeys[0].Bytes,
},
{
EcdsaAddress: addr1,
BLSPublicKey: wrappedBLSKeys[1].Bytes,
},
{
EcdsaAddress: addr2,
BLSPublicKey: wrappedBLSKeys[2].Bytes,
},
{
EcdsaAddress: addr2,
BLSPublicKey: wrappedBLSKeys[3].Bytes,
},
{
EcdsaAddress: addr2,
BLSPublicKey: wrappedBLSKeys[4].Bytes,
},
}

rs, err = viewChangeNextValidator(decider, 0, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[0], rs)

rs, err = viewChangeNextValidator(decider, 1, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[1], rs)

rs, err = viewChangeNextValidator(decider, 2, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[2], rs)

rs, err = viewChangeNextValidator(decider, 3, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[1], rs)
})
}
2 changes: 1 addition & 1 deletion internal/chain/engine.go
@@ -6,11 +6,11 @@ import (
"sort"
"time"

bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/numeric"

bls2 "github.com/harmony-one/bls/ffi/go/bls"
blsvrf "github.com/harmony-one/harmony/crypto/vrf/bls"

"github.com/ethereum/go-ethereum/common"