Skip to content

Commit a1a618f

Browse files
committed
Add timeout while synchronizing head blocks
1 parent 07c36ec commit a1a618f

3 files changed

Lines changed: 89 additions & 45 deletions

File tree

sqlchain/chain.go

Lines changed: 83 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -568,51 +568,81 @@ func (c *Chain) produceBlock(now time.Time) (err error) {
568568
return
569569
}
570570

571-
func (c *Chain) syncHead() {
571+
func (c *Chain) syncHead() (err error) {
572572
// Try to fetch if the block of the current turn is not advised yet
573-
if h := c.rt.getNextTurn() - 1; c.rt.getHead().Height < h {
574-
var err error
575-
req := &MuxFetchBlockReq{
576-
Envelope: proto.Envelope{
577-
// TODO(leventeliu): Add fields.
578-
},
579-
DatabaseID: c.databaseID,
580-
FetchBlockReq: FetchBlockReq{
581-
Height: h,
582-
},
573+
h := c.rt.getNextTurn() - 1
574+
if c.rt.getHead().Height >= h {
575+
return
576+
}
577+
578+
var (
579+
peers = c.rt.getPeers()
580+
l = len(peers.Servers)
581+
le = c.logEntryWithHeadState()
582+
583+
child, cancel = context.WithTimeout(c.rt.ctx, c.rt.tick)
584+
wg = &sync.WaitGroup{}
585+
586+
succCount uint32
587+
)
588+
defer func() {
589+
wg.Wait()
590+
cancel()
591+
592+
if succCount == 0 {
593+
// Set error if all RPC calls are failed
594+
err = errors.New("all remote peers are unreachable")
583595
}
584-
resp := &MuxFetchBlockResp{}
585-
peers := c.rt.getPeers()
586-
l := len(peers.Servers)
587-
succ := false
588-
le := c.logEntryWithHeadState()
596+
}()
589597

590-
for i, s := range peers.Servers {
591-
ile := le.WithFields(log.Fields{"remote": fmt.Sprintf("[%d/%d] %s", i, l, s)})
592-
if s != c.rt.getServer() {
593-
if err = c.cl.CallNode(
594-
s, route.SQLCFetchBlock.String(), req, resp,
595-
); err != nil || resp.Block == nil {
596-
ile.WithError(err).Debug("failed to fetch block from peer")
597-
} else {
598-
select {
599-
case c.blocks <- resp.Block:
600-
case <-c.rt.ctx.Done():
601-
err = c.rt.ctx.Err()
602-
le.WithError(err).Info("abort head block synchronizing")
603-
return
604-
}
605-
ile.Debug("fetch block from remote peer successfully")
606-
succ = true
607-
break
598+
for i, s := range peers.Servers {
599+
// Skip local server
600+
if s == c.rt.getServer() {
601+
continue
602+
}
603+
604+
wg.Add(1)
605+
go func(i int, node proto.NodeID) {
606+
defer wg.Done()
607+
var (
608+
ile = le.WithFields(log.Fields{"remote": fmt.Sprintf("[%d/%d] %s", i, l, node)})
609+
req = &MuxFetchBlockReq{
610+
DatabaseID: c.databaseID,
611+
FetchBlockReq: FetchBlockReq{
612+
Height: h,
613+
},
608614
}
615+
resp = &MuxFetchBlockResp{}
616+
)
617+
618+
if err := c.cl.CallNodeWithContext(
619+
child, node, route.SQLCFetchBlock.String(), req, resp,
620+
); err != nil {
621+
ile.WithError(err).Error("failed to fetch block from peer")
622+
return
609623
}
610-
}
611624

612-
if !succ {
613-
le.Debug("cannot get block from any peer")
614-
}
625+
if resp.Block == nil {
626+
atomic.AddUint32(&succCount, 1)
627+
ile.Debug("fetch block request reply: no such block")
628+
return
629+
}
630+
631+
ile.WithFields(log.Fields{
632+
"parent": resp.Block.ParentHash().Short(4),
633+
"hash": resp.Block.BlockHash().Short(4),
634+
}).Debug("fetch block request reply: found block")
635+
select {
636+
case c.blocks <- resp.Block:
637+
atomic.AddUint32(&succCount, 1)
638+
case <-child.Done():
639+
le.WithError(child.Err()).Info("abort head block synchronizing")
640+
return
641+
}
642+
}(i, s)
615643
}
644+
645+
return
616646
}
617647

618648
// runCurrentTurn does the check and runs block producing if its my turn.
@@ -656,7 +686,9 @@ func (c *Chain) mainCycle(ctx context.Context) {
656686
c.logEntry().WithError(ctx.Err()).Info("abort main cycle")
657687
return
658688
default:
659-
c.syncHead()
689+
if err := c.syncHead(); err != nil {
690+
c.logEntry().WithError(err).Error("failed to sync head")
691+
}
660692
if t, d := c.rt.nextTick(); d > 0 {
661693
time.Sleep(d)
662694
} else {
@@ -667,7 +699,7 @@ func (c *Chain) mainCycle(ctx context.Context) {
667699
}
668700

669701
// sync synchronizes blocks and queries from the other peers.
670-
func (c *Chain) sync() {
702+
func (c *Chain) sync() (err error) {
671703
le := c.logEntry()
672704
le.Debug("synchronizing chain state")
673705
defer func() {
@@ -686,10 +718,14 @@ func (c *Chain) sync() {
686718
break
687719
}
688720
for c.rt.getNextTurn() <= height {
689-
c.syncHead()
721+
if err = c.syncHead(); err != nil {
722+
le.WithError(err).Errorf("failed to sync block at height %d", height)
723+
return
724+
}
690725
c.rt.setNextTurn()
691726
}
692727
}
728+
return
693729
}
694730

695731
func (c *Chain) processBlocks(ctx context.Context) {
@@ -792,11 +828,15 @@ func (c *Chain) processBlocks(ctx context.Context) {
792828
}
793829

794830
// Start starts the main process of the sql-chain.
795-
func (c *Chain) Start() {
831+
func (c *Chain) Start() (err error) {
796832
c.rt.goFunc(c.processBlocks)
797-
c.sync()
833+
if err = c.sync(); err != nil {
834+
_ = c.Stop()
835+
return
836+
}
798837
c.rt.goFunc(c.mainCycle)
799838
c.rt.startService(c)
839+
return
800840
}
801841

802842
// Stop stops the main process of the sql-chain.

sqlchain/chain_test.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -275,7 +275,9 @@ func TestMultiChain(t *testing.T) {
275275

276276
// Start all chain instances
277277
for _, v := range chains {
278-
v.chain.Start()
278+
if err = v.chain.Start(); err != nil {
279+
t.Fatalf("error occurred: %v", err)
280+
}
279281
defer func(c *Chain) {
280282
// Stop chain main process before exit
281283
_ = c.Stop()

worker/db.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -176,7 +176,9 @@ func NewDatabase(cfg *DBConfig, peers *proto.Peers,
176176
if db.chain, err = sqlchain.NewChain(chainCfg); err != nil {
177177
return
178178
}
179-
db.chain.Start()
179+
if err = db.chain.Start(); err != nil {
180+
return
181+
}
180182

181183
// init kayak config
182184
kayakWalPath := filepath.Join(cfg.DataDir, KayakWalFileName)

0 commit comments

Comments
 (0)