@@ -568,51 +568,81 @@ func (c *Chain) produceBlock(now time.Time) (err error) {
568568 return
569569}
570570
571- func (c * Chain ) syncHead () {
571+ func (c * Chain ) syncHead () ( err error ) {
572572 // Try to fetch if the block of the current turn is not advised yet
573- if h := c .rt .getNextTurn () - 1 ; c .rt .getHead ().Height < h {
574- var err error
575- req := & MuxFetchBlockReq {
576- Envelope : proto.Envelope {
577- // TODO(leventeliu): Add fields.
578- },
579- DatabaseID : c .databaseID ,
580- FetchBlockReq : FetchBlockReq {
581- Height : h ,
582- },
573+ h := c .rt .getNextTurn () - 1
574+ if c .rt .getHead ().Height >= h {
575+ return
576+ }
577+
578+ var (
579+ peers = c .rt .getPeers ()
580+ l = len (peers .Servers )
581+ le = c .logEntryWithHeadState ()
582+
583+ child , cancel = context .WithTimeout (c .rt .ctx , c .rt .tick )
584+ wg = & sync.WaitGroup {}
585+
586+ succCount uint32
587+ )
588+ defer func () {
589+ wg .Wait ()
590+ cancel ()
591+
592+ if succCount == 0 {
593+ // Set error if all RPC calls are failed
594+ err = errors .New ("all remote peers are unreachable" )
583595 }
584- resp := & MuxFetchBlockResp {}
585- peers := c .rt .getPeers ()
586- l := len (peers .Servers )
587- succ := false
588- le := c .logEntryWithHeadState ()
596+ }()
589597
590- for i , s := range peers .Servers {
591- ile := le .WithFields (log.Fields {"remote" : fmt .Sprintf ("[%d/%d] %s" , i , l , s )})
592- if s != c .rt .getServer () {
593- if err = c .cl .CallNode (
594- s , route .SQLCFetchBlock .String (), req , resp ,
595- ); err != nil || resp .Block == nil {
596- ile .WithError (err ).Debug ("failed to fetch block from peer" )
597- } else {
598- select {
599- case c .blocks <- resp .Block :
600- case <- c .rt .ctx .Done ():
601- err = c .rt .ctx .Err ()
602- le .WithError (err ).Info ("abort head block synchronizing" )
603- return
604- }
605- ile .Debug ("fetch block from remote peer successfully" )
606- succ = true
607- break
598+ for i , s := range peers .Servers {
599+ // Skip local server
600+ if s == c .rt .getServer () {
601+ continue
602+ }
603+
604+ wg .Add (1 )
605+ go func (i int , node proto.NodeID ) {
606+ defer wg .Done ()
607+ var (
608+ ile = le .WithFields (log.Fields {"remote" : fmt .Sprintf ("[%d/%d] %s" , i , l , node )})
609+ req = & MuxFetchBlockReq {
610+ DatabaseID : c .databaseID ,
611+ FetchBlockReq : FetchBlockReq {
612+ Height : h ,
613+ },
608614 }
615+ resp = & MuxFetchBlockResp {}
616+ )
617+
618+ if err := c .cl .CallNodeWithContext (
619+ child , node , route .SQLCFetchBlock .String (), req , resp ,
620+ ); err != nil {
621+ ile .WithError (err ).Error ("failed to fetch block from peer" )
622+ return
609623 }
610- }
611624
612- if ! succ {
613- le .Debug ("cannot get block from any peer" )
614- }
625+ if resp .Block == nil {
626+ atomic .AddUint32 (& succCount , 1 )
627+ ile .Debug ("fetch block request reply: no such block" )
628+ return
629+ }
630+
631+ ile .WithFields (log.Fields {
632+ "parent" : resp .Block .ParentHash ().Short (4 ),
633+ "hash" : resp .Block .BlockHash ().Short (4 ),
634+ }).Debug ("fetch block request reply: found block" )
635+ select {
636+ case c .blocks <- resp .Block :
637+ atomic .AddUint32 (& succCount , 1 )
638+ case <- child .Done ():
639+ le .WithError (child .Err ()).Info ("abort head block synchronizing" )
640+ return
641+ }
642+ }(i , s )
615643 }
644+
645+ return
616646}
617647
618648// runCurrentTurn does the check and runs block producing if its my turn.
@@ -656,7 +686,9 @@ func (c *Chain) mainCycle(ctx context.Context) {
656686 c .logEntry ().WithError (ctx .Err ()).Info ("abort main cycle" )
657687 return
658688 default :
659- c .syncHead ()
689+ if err := c .syncHead (); err != nil {
690+ c .logEntry ().WithError (err ).Error ("failed to sync head" )
691+ }
660692 if t , d := c .rt .nextTick (); d > 0 {
661693 time .Sleep (d )
662694 } else {
@@ -667,7 +699,7 @@ func (c *Chain) mainCycle(ctx context.Context) {
667699}
668700
669701// sync synchronizes blocks and queries from the other peers.
670- func (c * Chain ) sync () {
702+ func (c * Chain ) sync () ( err error ) {
671703 le := c .logEntry ()
672704 le .Debug ("synchronizing chain state" )
673705 defer func () {
@@ -686,10 +718,14 @@ func (c *Chain) sync() {
686718 break
687719 }
688720 for c .rt .getNextTurn () <= height {
689- c .syncHead ()
721+ if err = c .syncHead (); err != nil {
722+ le .WithError (err ).Errorf ("failed to sync block at height %d" , height )
723+ return
724+ }
690725 c .rt .setNextTurn ()
691726 }
692727 }
728+ return
693729}
694730
695731func (c * Chain ) processBlocks (ctx context.Context ) {
@@ -792,11 +828,15 @@ func (c *Chain) processBlocks(ctx context.Context) {
792828}
793829
794830// Start starts the main process of the sql-chain.
795- func (c * Chain ) Start () {
831+ func (c * Chain ) Start () ( err error ) {
796832 c .rt .goFunc (c .processBlocks )
797- c .sync ()
833+ if err = c .sync (); err != nil {
834+ _ = c .Stop ()
835+ return
836+ }
798837 c .rt .goFunc (c .mainCycle )
799838 c .rt .startService (c )
839+ return
800840}
801841
802842// Stop stops the main process of the sql-chain.
0 commit comments