Skip to content

Commit 3fd90d3

Browse files
author
Zeqing Guo
authored
Merge pull request #186 from CovenantSQL/feature/startup
Speed up BPs at genesis startup
2 parents 973c3d2 + 1cd5912 commit 3fd90d3

6 files changed

Lines changed: 80 additions & 68 deletions

File tree

blockproducer/chain.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ func (c *Chain) advanceNextHeight(now time.Time, d time.Duration) {
395395
}
396396
// Normally, a block producing should start right after the new period, but more time may also
397397
// elapse since the last block synchronizing.
398-
if elapsed > c.tick { // TODO(leventeliu): add threshold config for `elapsed`.
398+
if elapsed+c.tick > c.period { // TODO(leventeliu): add threshold config for `elapsed`.
399399
log.WithFields(log.Fields{
400400
"advanced_height": c.getNextHeight(),
401401
"using_timestamp": now.Format(time.RFC3339Nano),
@@ -630,29 +630,27 @@ func (c *Chain) syncCurrentHead(ctx context.Context) (ok bool) {
630630
}
631631
resp = &types.FetchBlockResp{}
632632
)
633+
var le = log.WithFields(log.Fields{
634+
"local": c.peerInfo(),
635+
"remote": id,
636+
"height": h,
637+
})
633638
if err = c.cl.CallNodeWithContext(
634639
cld, id, route.MCCFetchBlock.String(), req, resp,
635640
); err != nil {
636-
log.WithFields(log.Fields{
637-
"local": c.peerInfo(),
638-
"remote": id,
639-
"height": h,
640-
}).WithError(err).Warn("failed to fetch block")
641+
le.WithError(err).Warn("failed to fetch block")
641642
atomic.AddUint32(&unreachable, 1)
642643
return
643644
}
644-
var fields = log.Fields{
645-
"local": c.peerInfo(),
646-
"remote": id,
647-
"height": h,
648-
}
649-
defer log.WithFields(fields).Debug("fetch block request reply")
650645
if resp.Block == nil {
646+
le.Debug("fetch block request reply: no such block")
651647
return
652648
}
653649
// Push new block from other peers
654-
fields["parent"] = resp.Block.ParentHash().Short(4)
655-
fields["hash"] = resp.Block.BlockHash().Short(4)
650+
le.WithFields(log.Fields{
651+
"parent": resp.Block.ParentHash().Short(4),
652+
"hash": resp.Block.BlockHash().Short(4),
653+
}).Debug("fetch block request reply: found block")
656654
select {
657655
case c.pendingBlocks <- resp.Block:
658656
case <-cld.Done():

blockproducer/rpc.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,31 @@ func WaitDatabaseCreation(
188188
}
189189
}
190190

191+
// WaitBPChainService waits until BP chain service is ready.
192+
func WaitBPChainService(ctx context.Context, period time.Duration) (err error) {
193+
var (
194+
ticker = time.NewTicker(period)
195+
req = &types.FetchBlockReq{
196+
Height: 0, // Genesis block
197+
}
198+
resp = &types.FetchTxBillingResp{}
199+
)
200+
defer ticker.Stop()
201+
for {
202+
select {
203+
case <-ticker.C:
204+
if err = rpc.RequestBP(
205+
route.MCCFetchBlock.String(), req, resp,
206+
); err == nil || !strings.Contains(err.Error(), "can't find service") {
207+
return
208+
}
209+
case <-ctx.Done():
210+
err = ctx.Err()
211+
return
212+
}
213+
}
214+
}
215+
191216
// Create allocates new database.
192217
func Create(
193218
meta types.ResourceMeta,

cmd/cql-fuse/block_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,15 @@ func initTestDB() (*sql.DB, func()) {
245245
return nil, stopNodes
246246
}
247247

248+
// wait for chain service
249+
var ctx1, cancel1 = context.WithTimeout(context.Background(), 1*time.Minute)
250+
defer cancel1()
251+
err = bp.WaitBPChainService(ctx1, 3*time.Second)
252+
if err != nil {
253+
log.Errorf("wait for chain service failed: %v", err)
254+
return nil, stopNodes
255+
}
256+
248257
// create
249258
meta := client.ResourceMeta{}
250259
meta.Node = 1
@@ -266,9 +275,9 @@ func initTestDB() (*sql.DB, func()) {
266275
}
267276

268277
// wait for creation
269-
var ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
270-
defer cancel()
271-
err = bp.WaitDatabaseCreation(ctx, proto.DatabaseID(dsnCfg.DatabaseID), db, 3*time.Second)
278+
var ctx2, cancel2 = context.WithTimeout(context.Background(), 1*time.Minute)
279+
defer cancel2()
280+
err = bp.WaitDatabaseCreation(ctx2, proto.DatabaseID(dsnCfg.DatabaseID), db, 3*time.Second)
272281
if err != nil {
273282
log.Errorf("wait for creation failed: %v", err)
274283
return nil, stopNodes

cmd/cql-observer/observation_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -252,20 +252,18 @@ func TestFullProcess(t *testing.T) {
252252

253253
Convey("test full process", t, func() {
254254
var (
255-
err error
256-
cliPriv *asymmetric.PrivateKey
257-
addr, addr2 proto.AccountAddress
258-
dsn, dsn2 string
259-
cfg, cfg2 *client.Config
260-
dbID, dbID2 string
261-
ctx, ctx2 context.Context
262-
ccl, ccl2 context.CancelFunc
255+
err error
256+
cliPriv *asymmetric.PrivateKey
257+
addr, addr2 proto.AccountAddress
258+
dsn, dsn2 string
259+
cfg, cfg2 *client.Config
260+
dbID, dbID2 string
261+
ctx1, ctx2, ctx3 context.Context
262+
ccl1, ccl2, ccl3 context.CancelFunc
263263
)
264264
startNodes()
265265
defer stopNodes()
266266

267-
time.Sleep(10 * time.Second)
268-
269267
err = client.Init(FJ(testWorkingDir, "./observation/node_c/config.yaml"), []byte(""))
270268
So(err, ShouldBeNil)
271269

@@ -280,6 +278,12 @@ func TestFullProcess(t *testing.T) {
280278
FJ(testWorkingDir, "./observation/node_miner_1/private.key"), []byte{})
281279
So(err, ShouldBeNil)
282280

281+
// wait until bp chain service is ready
282+
ctx1, ccl1 = context.WithTimeout(context.Background(), 1*time.Minute)
283+
defer ccl1()
284+
err = bp.WaitBPChainService(ctx1, 3*time.Second)
285+
So(err, ShouldBeNil)
286+
283287
// create
284288
_, dsn, err = bp.Create(types.ResourceMeta{
285289
TargetMiners: []proto.AccountAddress{addr},
@@ -295,9 +299,9 @@ func TestFullProcess(t *testing.T) {
295299
cfg, err = client.ParseDSN(dsn)
296300
So(err, ShouldBeNil)
297301
dbID = cfg.DatabaseID
298-
ctx, ccl = context.WithTimeout(context.Background(), 5*time.Minute)
299-
defer ccl()
300-
err = bp.WaitDatabaseCreation(ctx, proto.DatabaseID(dbID), db, 3*time.Second)
302+
ctx2, ccl2 = context.WithTimeout(context.Background(), 5*time.Minute)
303+
defer ccl2()
304+
err = bp.WaitDatabaseCreation(ctx2, proto.DatabaseID(dbID), db, 3*time.Second)
301305
So(err, ShouldBeNil)
302306

303307
_, err = db.Exec("CREATE TABLE test (test int)")
@@ -367,9 +371,9 @@ func TestFullProcess(t *testing.T) {
367371
So(err, ShouldBeNil)
368372
dbID2 = cfg2.DatabaseID
369373
So(dbID, ShouldNotResemble, dbID2)
370-
ctx2, ccl2 = context.WithTimeout(context.Background(), 5*time.Minute)
371-
defer ccl2()
372-
err = bp.WaitDatabaseCreation(ctx2, proto.DatabaseID(dbID2), db2, 3*time.Second)
374+
ctx3, ccl3 = context.WithTimeout(context.Background(), 5*time.Minute)
375+
defer ccl3()
376+
err = bp.WaitDatabaseCreation(ctx3, proto.DatabaseID(dbID2), db2, 3*time.Second)
373377
So(err, ShouldBeNil)
374378

375379
_, err = db2.Exec("CREATE TABLE test (test int)")

cmd/cqld/bootstrap.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ func runNode(nodeID proto.NodeID, listenAddr string) (err error) {
8585
return
8686
}
8787

88+
// start server
89+
go func() {
90+
server.Serve()
91+
}()
92+
defer func() {
93+
server.Listener.Close()
94+
server.Stop()
95+
}()
96+
8897
// init storage
8998
log.Info("init storage")
9099
var st *LocalStorage
@@ -146,15 +155,6 @@ func runNode(nodeID proto.NodeID, listenAddr string) (err error) {
146155
log.Info(conf.StartSucceedMessage)
147156
//go periodicPingBlockProducer()
148157

149-
// start server
150-
go func() {
151-
server.Serve()
152-
}()
153-
defer func() {
154-
server.Listener.Close()
155-
server.Stop()
156-
}()
157-
158158
signalCh := make(chan os.Signal, 1)
159159
signal.Notify(
160160
signalCh,

cmd/cqld/cqld_test.go

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ package main
2020

2121
import (
2222
"context"
23-
"strings"
2423
"syscall"
2524
"testing"
2625
"time"
2726

27+
bp "github.com/CovenantSQL/CovenantSQL/blockproducer"
2828
"github.com/CovenantSQL/CovenantSQL/conf"
2929
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
3030
"github.com/CovenantSQL/CovenantSQL/route"
@@ -34,30 +34,6 @@ import (
3434
. "github.com/smartystreets/goconvey/convey"
3535
)
3636

37-
func waitBPChainService(ctx context.Context, period time.Duration) (err error) {
38-
var (
39-
ticker = time.NewTicker(period)
40-
req = &types.FetchBlockReq{
41-
Height: 0, // Genesis block
42-
}
43-
resp = &types.FetchTxBillingResp{}
44-
)
45-
defer ticker.Stop()
46-
for {
47-
select {
48-
case <-ticker.C:
49-
if err = rpc.RequestBP(
50-
route.MCCFetchBlock.String(), req, resp,
51-
); err == nil || strings.Contains(err.Error(), "can't find service") {
52-
return
53-
}
54-
case <-ctx.Done():
55-
err = ctx.Err()
56-
return
57-
}
58-
}
59-
}
60-
6137
func TestCQLD(t *testing.T) {
6238
Convey("Test cqld 3BPs", t, func() {
6339
var (
@@ -85,7 +61,7 @@ func TestCQLD(t *testing.T) {
8561
// Wait BP chain service to be ready
8662
ctx2, ccl2 = context.WithTimeout(context.Background(), 30*time.Second)
8763
defer ccl2()
88-
err = waitBPChainService(ctx2, 3*time.Second)
64+
err = bp.WaitBPChainService(ctx2, 3*time.Second)
8965
So(err, ShouldBeNil)
9066

9167
// Wait for block producing

0 commit comments

Comments
 (0)