Skip to content

Commit

Permalink
fix(node): host startup is not optimized (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
JeremyPansier authored Dec 22, 2022
1 parent 2035a7d commit e7f12e8
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 56 deletions.
1 change: 0 additions & 1 deletion src/node/clock/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ type Engine interface {
Start()
Stop()
Do()
Wait()
}
65 changes: 25 additions & 40 deletions src/node/clock/tick/engine.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package tick

import (
"github.com/my-cloud/ruthenium/src/log"
"github.com/my-cloud/ruthenium/src/node/clock"
"sync"
"time"

"github.com/my-cloud/ruthenium/src/node/clock"
)

type Engine struct {
Expand All @@ -17,19 +16,15 @@ type Engine struct {
skippedOccurrences int
started bool
requested bool

waitGroup *sync.WaitGroup
logger log.Logger
}

func NewEngine(function func(timestamp int64), watch clock.Watch, timer time.Duration, occurrences int, skippedOccurrences int, logger log.Logger) *Engine {
func NewEngine(function func(timestamp int64), watch clock.Watch, timer time.Duration, occurrences int, skippedOccurrences int) *Engine {
subTimer := timer
if occurrences > 0 {
subTimer = time.Duration(timer.Nanoseconds() / int64(occurrences))
}
ticker := time.NewTicker(subTimer)
var waitGroup sync.WaitGroup
return &Engine{function, watch, subTimer, ticker, occurrences, skippedOccurrences, false, false, &waitGroup, logger}
return &Engine{function, watch, subTimer, ticker, occurrences, skippedOccurrences, false, false}
}

func (engine *Engine) Do() {
Expand All @@ -41,21 +36,17 @@ func (engine *Engine) Do() {
deadline := parsedStartDate.Sub(startTime)
engine.ticker.Reset(deadline)
engine.requested = true
engine.waitGroup.Add(1)
go func() {
defer engine.waitGroup.Done()
<-engine.ticker.C
now := engine.watch.Now().Round(engine.timer)
engine.function(now.UnixNano())
engine.requested = false
if engine.started {
newParsedStartDate := now.Add(engine.timer)
newDeadline := newParsedStartDate.Sub(now)
engine.ticker.Reset(newDeadline)
} else {
engine.ticker.Stop()
}
}()
<-engine.ticker.C
now := engine.watch.Now().Round(engine.timer)
engine.function(now.UnixNano())
engine.requested = false
if engine.started {
newParsedStartDate := now.Add(engine.timer)
newDeadline := newParsedStartDate.Sub(now)
engine.ticker.Reset(newDeadline)
} else {
engine.ticker.Stop()
}
}

func (engine *Engine) Start() {
Expand All @@ -69,28 +60,22 @@ func (engine *Engine) Start() {
engine.ticker.Reset(deadline)
<-engine.ticker.C
engine.ticker.Reset(engine.timer)
go func() {
for {
for i := 0; i < engine.occurrences; i++ {
if i >= engine.skippedOccurrences {
if !engine.started {
engine.ticker.Stop()
return
}
now := engine.watch.Now().Round(engine.timer)
engine.function(now.UnixNano())
for {
for i := 0; i < engine.occurrences; i++ {
if i < engine.occurrences-engine.skippedOccurrences {
if !engine.started {
engine.ticker.Stop()
return
}
<-engine.ticker.C
now := engine.watch.Now().Round(engine.timer)
engine.function(now.UnixNano())
}
<-engine.ticker.C
}
}()
}
}

func (engine *Engine) Stop() {
engine.started = false
engine.ticker.Reset(time.Nanosecond)
}

func (engine *Engine) Wait() {
engine.waitGroup.Wait()
}
9 changes: 5 additions & 4 deletions src/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"flag"
"fmt"
"time"

"github.com/my-cloud/ruthenium/src/config"
"github.com/my-cloud/ruthenium/src/encryption"
"github.com/my-cloud/ruthenium/src/environment"
Expand All @@ -14,7 +16,6 @@ import (
"github.com/my-cloud/ruthenium/src/node/protocol/poh"
"github.com/my-cloud/ruthenium/src/node/protocol/validation"
"github.com/my-cloud/ruthenium/src/node/protocol/verification"
"time"
)

const (
Expand Down Expand Up @@ -51,11 +52,11 @@ func main() {
logger.Fatal(fmt.Errorf("failed to create synchronizer: %w", err).Error())
}
synchronizationTimer := time.Second * synchronizationIntervalInSeconds
synchronizationEngine := tick.NewEngine(synchronizer.Synchronize, watch, synchronizationTimer, 1, 0, logger)
synchronizationEngine := tick.NewEngine(synchronizer.Synchronize, watch, synchronizationTimer, 1, 0)
blockchain := verification.NewBlockchain(registry, validationTimer, synchronizer, logger)
pool := validation.NewTransactionsPool(blockchain, registry, wallet.Address(), settings.GenesisAmount, validationTimer, watch, logger)
validationEngine := tick.NewEngine(pool.Validate, watch, validationTimer, 1, 0, logger)
verificationEngine := tick.NewEngine(blockchain.Verify, watch, validationTimer, verificationsCountPerValidation, 1, logger)
validationEngine := tick.NewEngine(pool.Validate, watch, validationTimer, 1, 0)
verificationEngine := tick.NewEngine(blockchain.Verify, watch, validationTimer, verificationsCountPerValidation, 1)
serverFactory := gp2p.NewServerFactory()
server, err := serverFactory.CreateServer(int(*port))
if err != nil {
Expand Down
19 changes: 12 additions & 7 deletions src/node/network/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"context"
"fmt"

gp2p "github.com/leprosus/golang-p2p"
"github.com/my-cloud/ruthenium/src/log"
"github.com/my-cloud/ruthenium/src/node/clock"
Expand Down Expand Up @@ -51,11 +52,15 @@ func (host *Host) Run() error {
}

func (host *Host) startBlockchain() {
host.logger.Info("updating the blockchain...")
host.synchronizationEngine.Do()
host.synchronizationEngine.Wait()
host.synchronizationEngine.Start()
host.validationEngine.Start()
host.verificationEngine.Start()
host.logger.Info("neighbors are synchronized")
go host.synchronizationEngine.Start()
host.verificationEngine.Do()
host.logger.Info("the blockchain is now up to date")
host.validationEngine.Do()
go host.validationEngine.Start()
go host.verificationEngine.Start()
}

func (host *Host) handle(_ context.Context, req gp2p.Data) (res gp2p.Data, err error) {
Expand All @@ -72,11 +77,11 @@ func (host *Host) handle(_ context.Context, req gp2p.Data) (res gp2p.Data, err e
case GetTransactionsRequest:
res = host.getTransactions()
case MineRequest:
host.validationEngine.Do()
go host.validationEngine.Do()
case StartMiningRequest:
host.validationEngine.Start()
go host.validationEngine.Start()
case StopMiningRequest:
host.validationEngine.Stop()
go host.validationEngine.Stop()
default:
unknownRequest = true
}
Expand Down
8 changes: 4 additions & 4 deletions test/node/clock/tick/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@ package tick

import (
"fmt"
"testing"
"time"

"github.com/my-cloud/ruthenium/src/node/clock/tick"
"github.com/my-cloud/ruthenium/test"
"github.com/my-cloud/ruthenium/test/node/clock/clocktest"
"testing"
"time"
)

func Test_Do(t *testing.T) {
// Arrange
watchMock := new(clocktest.WatchMock)
watchMock.NowFunc = func() time.Time { return time.Unix(0, 0) }
var calls int
engine := tick.NewEngine(func(int64) { calls++ }, watchMock, 1, 0, 0, nil)
engine := tick.NewEngine(func(int64) { calls++ }, watchMock, 1, 0, 0)

// Act
engine.Do()

// Assert
engine.Wait()
test.Assert(t, calls == 1, fmt.Sprintf("The function is called %d times whereas it should be called once.", calls))
}

0 comments on commit e7f12e8

Please sign in to comment.