diff --git a/api/grpc/transaction.go b/api/grpc/transaction.go index a555b6904..5717f5c81 100644 --- a/api/grpc/transaction.go +++ b/api/grpc/transaction.go @@ -44,9 +44,6 @@ func (cbs *ChainBrokerService) SendTransactions(ctx context.Context, txs *pb.Mul } hash, err := cbs.sendTransaction(tx) - if tx.IsIBTP() { - cbs.logger.Infof("get transaction:appchain is %s, nonce is %d, index is %d", tx.GetFrom().String(), tx.GetNonce(), tx.GetIBTP().Index) - } if err != nil { return nil, status.Newf(codes.Internal, "internal handling transaction fail %s", err.Error()).Err() } diff --git a/internal/coreapi/feed.go b/internal/coreapi/feed.go index 913df8e83..723e7eef4 100644 --- a/internal/coreapi/feed.go +++ b/internal/coreapi/feed.go @@ -16,7 +16,7 @@ func (api *FeedAPI) SubscribeNewTxEvent(ch chan<- pb.Transactions) event.Subscri } func (api *FeedAPI) SubscribeNewBlockEvent(ch chan<- events.ExecutedEvent) event.Subscription { - return api.bxh.BlockExecutor.SubscribeBlockEvent(ch) + return api.bxh.BlockExecutor.SubscribeBlockEventForRemote(ch) } func (api *FeedAPI) SubscribeLogsEvent(ch chan<- []*pb.EvmLog) event.Subscription { diff --git a/internal/executor/contracts/interchain.go b/internal/executor/contracts/interchain.go index 609b209a0..32adebda5 100755 --- a/internal/executor/contracts/interchain.go +++ b/internal/executor/contracts/interchain.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "github.com/meshplus/bitxhub-core/boltvm" service_mgr "github.com/meshplus/bitxhub-core/service-mgr" @@ -33,6 +34,13 @@ const ( type InterchainManager struct { boltvm.Stub + ServiceCache *sync.Map +} + +func NewInterchainManager() *InterchainManager { + return &InterchainManager{ + ServiceCache: &sync.Map{}, + } } type BxhValidators struct { @@ -54,6 +62,48 @@ func (cs *ChainService) getChainServiceId() string { return fmt.Sprintf("%s:%s", cs.ChainId, cs.ServiceId) } +func (x *InterchainManager) GetServiceCache(key string) *boltvm.Response { + servece, ok := x.getServiceCache(key) + if !ok { + return boltvm.Error(boltvm.InterchainInternalErrCode, fmt.Sprintf(string(boltvm.InterchainInternalErrMsg), fmt.Errorf("interchain's serviceCache doesn't store %s", key))) + } + data, err := json.Marshal(&servece) + if err != nil { + return boltvm.Error(boltvm.InterchainInternalErrCode, fmt.Sprintf(string(boltvm.InterchainInternalErrMsg), err.Error())) + } + return boltvm.Success(data) +} + +func (x *InterchainManager) getServiceCache(key string) (*service_mgr.Service, bool) { + service := &service_mgr.Service{} + val, ok := x.ServiceCache.Load(key) + if ok { + service = val.(*service_mgr.Service) + } + return service, ok +} + +func (x *InterchainManager) InitServiceCache() { + x.ServiceCache = &sync.Map{} +} + +//func (x *InterchainManager) SetServiceCache(key string, data []byte) *boltvm.Response { +// service := &service_mgr.Service{} +// err := json.Unmarshal(data, service) +// if err != nil { +// return boltvm.Error(boltvm.InterchainInternalErrCode, fmt.Sprintf(string(boltvm.InterchainInternalErrMsg), err.Error())) +// } +// x.setServiceCache(key, service) +// return boltvm.Success(nil) +//} + +func (x *InterchainManager) SetServiceCache(key string, service *service_mgr.Service) { + if x.ServiceCache == nil { + x.ServiceCache = &sync.Map{} + } + x.ServiceCache.Store(key, service) +} + func (x *InterchainManager) Register(chainServiceID string) *boltvm.Response { bxhID, err := x.getBitXHubID() if err != nil { @@ -534,13 +584,16 @@ func (x *InterchainManager) getBitXHubID() (string, error) { func (x *InterchainManager) getServiceByID(id string) (*service_mgr.Service, error) { service := &service_mgr.Service{} - res := x.CrossInvoke(constant.ServiceMgrContractAddr.Address().String(), "GetServiceInfo", pb.String(id)) - if !res.Ok { - return nil, fmt.Errorf("can not get service %s info: %s", id, string(res.Result)) - } - - if err := json.Unmarshal(res.Result, service); err != nil { - return nil, fmt.Errorf("unmarshal service of ID %s: %w", id, err) + service, ok := x.getServiceCache(id) + if !ok { + res := x.CrossInvoke(constant.ServiceMgrContractAddr.Address().String(), "GetServiceInfo", pb.String(id)) + if !res.Ok { + return nil, fmt.Errorf("can not get service %s info: %s", id, string(res.Result)) + } + if err := json.Unmarshal(res.Result, service); err != nil { + return nil, fmt.Errorf("unmarshal service of ID %s: %w", id, err) + } + x.ServiceCache.LoadOrStore(id, service) } return service, nil @@ -565,8 +618,8 @@ func (x *InterchainManager) checkBitXHubAvailability(id string) error { } func (x *InterchainManager) checkServiceAvailability(chainServiceID string) error { - res := x.CrossInvoke(constant.ServiceMgrContractAddr.Address().String(), "IsAvailable", pb.String(chainServiceID)) - if !res.Ok || string(res.Result) == FALSE { + service, err := x.getServiceByID(chainServiceID) + if err != nil || !service.IsAvailable() { return fmt.Errorf("service %s is not available", chainServiceID) } diff --git a/internal/executor/contracts/interchain_test.go b/internal/executor/contracts/interchain_test.go index 56a69aae3..0b30273be 100644 --- a/internal/executor/contracts/interchain_test.go +++ b/internal/executor/contracts/interchain_test.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "sync" "testing" "github.com/golang/mock/gomock" @@ -65,7 +66,7 @@ func TestInterchainManager_Register(t *testing.T) { o3 := mockStub.EXPECT().Get(service_mgr.ServiceKey(srcChainService.getFullServiceId())).Return(true, data1).Times(2) gomock.InOrder(o1, o2, o3) - im := &InterchainManager{mockStub} + im := &InterchainManager{mockStub, &sync.Map{}} mockStub.EXPECT().PostEvent(gomock.Any(), gomock.Any()).AnyTimes() @@ -115,7 +116,7 @@ func TestInterchainManager_GetInterchain(t *testing.T) { o2 := mockStub.EXPECT().Get(service_mgr.ServiceKey(srcChainService.getFullServiceId())).Return(true, data0) gomock.InOrder(o1, o2) - im := &InterchainManager{mockStub} + im := &InterchainManager{mockStub, &sync.Map{}} res := im.GetInterchain(srcChainService.getFullServiceId()) assert.False(t, res.Ok) @@ -293,7 +294,7 @@ func TestInterchainManager_HandleIBTP(t *testing.T) { mockStub.EXPECT().GetTxHash().Return(&types.Hash{}).AnyTimes() mockStub.EXPECT().PostEvent(gomock.Any(), gomock.Any()).AnyTimes() - im := &InterchainManager{mockStub} + im := &InterchainManager{mockStub, &sync.Map{}} ibtp := &pb.IBTP{} res := im.HandleIBTP(ibtp) @@ -475,7 +476,7 @@ func TestInterchainManager_HandleIBTP(t *testing.T) { func TestInterchainManager_DeleteInterchain(t *testing.T) { mockCtl := gomock.NewController(t) mockStub := mock_stub.NewMockStub(mockCtl) - im := &InterchainManager{mockStub} + im := &InterchainManager{mockStub, &sync.Map{}} mockStub.EXPECT().Delete(gomock.Any()) mockStub.EXPECT().PostEvent(gomock.Any(), gomock.Any()).AnyTimes() @@ -492,7 +493,7 @@ func TestInterchainManager_GetIBTPByID(t *testing.T) { from := types.NewAddress([]byte{0}).String() mockStub.EXPECT().Caller().Return(from).AnyTimes() - im := &InterchainManager{mockStub} + im := &InterchainManager{mockStub, &sync.Map{}} res := im.GetIBTPByID("a", true) assert.False(t, res.Ok) @@ -519,7 +520,7 @@ func TestInterchainManager_GetIBTPByID(t *testing.T) { func TestInterchainManager_HandleIBTPData(t *testing.T) { mockCtl := gomock.NewController(t) mockStub := mock_stub.NewMockStub(mockCtl) - im := &InterchainManager{mockStub} + im := &InterchainManager{mockStub, &sync.Map{}} srcChainService, dstChainService := mockChainService() ibtp := &pb.IBTP{ @@ -540,7 +541,7 @@ func TestInterchainManager_HandleIBTPData(t *testing.T) { func TestInterchainManager_GetAllServiceIDs(t *testing.T) { mockCtl := gomock.NewController(t) mockStub := mock_stub.NewMockStub(mockCtl) - im := &InterchainManager{mockStub} + im := &InterchainManager{mockStub, &sync.Map{}} mockStub.EXPECT().Query(gomock.Any()).Return(false, nil).Times(1) res := im.GetAllServiceIDs() @@ -625,7 +626,7 @@ func TestInterchainManager_GetAllServiceIDs(t *testing.T) { // mockStub.EXPECT().PostInterchainEvent(gomock.Any()).AnyTimes() // mockStub.EXPECT().GetTxHash().Return(&types.Hash{}).AnyTimes() // -// im := &InterchainManager{mockStub} +// im := &InterchainManager{mockStub, &sync.Map{}} // // ibtp := &pb.IBTP{ // From: appchainMethod + "-" + appchainMethod, diff --git a/internal/executor/contracts/service_manager.go b/internal/executor/contracts/service_manager.go index 73c1f6991..34a18425b 100644 --- a/internal/executor/contracts/service_manager.go +++ b/internal/executor/contracts/service_manager.go @@ -194,9 +194,15 @@ func (sm *ServiceManager) Manage(eventTyp, proposalResult, lastStatus, objId str } } - if err := sm.postAuditServiceEvent(objId); err != nil { + if err = sm.postAuditServiceEvent(objId); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + + // record updated service in interchain contract cache + if err = sm.postServiceEvent(objId); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err))) + } + return boltvm.Success(nil) } @@ -256,6 +262,11 @@ func (sm *ServiceManager) RegisterService(chainID, serviceID, name, typ, intro s return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + // record updated service in interchain contract cache + if err = sm.postServiceEvent(chainServiceID); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post excutor serviceCache event error: %v", err))) + } + return getGovernanceRet(string(res.Result), []byte(chainServiceID)) } @@ -298,6 +309,12 @@ func (sm *ServiceManager) UpdateService(chainServiceID, name, intro, permits, de if err := sm.postAuditServiceEvent(chainServiceID); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + + // record updated service in interchain contract cache + if err = sm.postServiceEvent(chainServiceID); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err))) + } + return getGovernanceRet("", nil) } @@ -367,6 +384,11 @@ func (sm *ServiceManager) UpdateService(chainServiceID, name, intro, permits, de if err := sm.postAuditServiceEvent(chainServiceID); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + + // record updated service in interchain contract cache + if err = sm.postServiceEvent(chainServiceID); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err))) + } return getGovernanceRet(string(res.Result), nil) } @@ -423,6 +445,11 @@ func (sm *ServiceManager) basicGovernance(chainServiceID, reason string, permiss if err := sm.postAuditServiceEvent(chainServiceID); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + + // record updated service in interchain contract cache + if err := sm.postServiceEvent(chainServiceID); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err))) + } return getGovernanceRet(string(res.Result), nil) } @@ -453,13 +480,17 @@ func (sm *ServiceManager) PauseChainService(chainID string) *boltvm.Response { // 3. pause services for _, chainServiceID := range idList { - if err := sm.pauseService(chainServiceID); err != nil { + if err = sm.pauseService(chainServiceID); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("pause service %s err: %v", chainServiceID, err))) } - if err := sm.postAuditServiceEvent(chainServiceID); err != nil { + if err = sm.postAuditServiceEvent(chainServiceID); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + // record updated service in interchain contract cache + if err = sm.postServiceEvent(chainServiceID); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err))) + } } return getGovernanceRet("", nil) @@ -519,6 +550,10 @@ func (sm *ServiceManager) UnPauseChainService(chainID string) *boltvm.Response { if err := sm.postAuditServiceEvent(chainServiceID); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + // record updated service in interchain contract cache + if err = sm.postServiceEvent(chainServiceID); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err))) + } } return getGovernanceRet("", nil) @@ -585,6 +620,10 @@ func (sm *ServiceManager) ClearChainService(chainID string) *boltvm.Response { if err := sm.postAuditServiceEvent(chainServiceID); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + // record updated service in interchain contract cache + if err = sm.postServiceEvent(chainServiceID); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err))) + } } return getGovernanceRet("", nil) @@ -647,6 +686,10 @@ func (sm *ServiceManager) EvaluateService(chainServiceID, desc string, score flo if err := sm.postAuditServiceEvent(chainServiceID); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + // record updated service in interchain contract cache + if err := sm.postServiceEvent(chainServiceID); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err))) + } return getGovernanceRet("", nil) } @@ -716,9 +759,14 @@ func (sm *ServiceManager) RecordInvokeService(fullServiceID, fromFullServiceID s "result": result, }).Info("record invoke service") - if err := sm.postAuditServiceEvent(chainServiceID); err != nil { + if err = sm.postAuditServiceEvent(chainServiceID); err != nil { return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err))) } + + // record updated service in interchain contract cache + if err = sm.postServiceEvent(chainServiceID); err != nil { + return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err))) + } return boltvm.Success(nil) } @@ -952,3 +1000,16 @@ func (sm *ServiceManager) postAuditServiceEvent(chainServiceID string) error { return nil } + +// postServiceEvent record changed service in executor's interchain contract cache +func (sm *ServiceManager) postServiceEvent(chainServiceID string) error { + sm.ServiceManager.Persister = sm.Stub + service := &servicemgr.Service{} + ok := sm.GetObject(servicemgr.ServiceKey(chainServiceID), service) + if !ok { + return fmt.Errorf("not found service %s", chainServiceID) + } + + sm.PostEvent(pb.Event_SERVICE, service) + return nil +} diff --git a/internal/executor/contracts/service_manager_test.go b/internal/executor/contracts/service_manager_test.go index f01b9268b..d9b1786c4 100644 --- a/internal/executor/contracts/service_manager_test.go +++ b/internal/executor/contracts/service_manager_test.go @@ -26,6 +26,7 @@ func TestServiceManager_ManageRegister(t *testing.T) { chainServiceRegisteringID := fmt.Sprintf("%s:%s", services[4].ChainID, services[4].ServiceID) logger := log.NewWithModule("contracts") + //mockStub.EXPECT().PostServiceEvent(gomock.Any()).Return(nil).AnyTimes() mockStub.EXPECT().Logger().Return(logger).AnyTimes() mockStub.EXPECT().GetObject(service_mgr.ServiceKey(chainServiceID), gomock.Any()).Return(false).Times(1) mockStub.EXPECT().GetObject(service_mgr.ServiceKey(chainServiceID), gomock.Any()).SetArg(1, *services[0]).Return(true).AnyTimes() @@ -179,9 +180,10 @@ func TestServiceManager_RegisterService(t *testing.T) { checkAppchainErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(1) checkInfoErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(4) submitErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(1) + cacheReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1) - okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).Return(false).AnyTimes() - gomock.InOrder(governancePreErrReq, checkAppchainErrReq, checkInfoErrReq, submitErrReq, okReq) + okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).Return(false).Times(1) + gomock.InOrder(governancePreErrReq, checkAppchainErrReq, checkInfoErrReq, submitErrReq, okReq, cacheReq) mockStub.EXPECT().CrossInvoke(constant.AppchainMgrContractAddr.Address().String(), "IsAvailable", gomock.Any()).Return(boltvm.Success([]byte(FALSE))).Times(1) mockStub.EXPECT().CrossInvoke(constant.AppchainMgrContractAddr.Address().String(), "IsAvailable", gomock.Any()).Return(boltvm.Success([]byte(TRUE))).AnyTimes() @@ -247,7 +249,7 @@ func TestServiceManager_UpdateService(t *testing.T) { submitErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1) changeStatusErrReq1 := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1) changeStatusErrReq2 := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(1) - okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(2) + okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(3) gomock.InOrder(governancePreErrReq, checkPermissionErrReq, checkInfoErrReq, updateErrReq, updateErrReq2, submitErrReq, changeStatusErrReq1, changeStatusErrReq2, okReq) mockStub.EXPECT().CurrentCaller().Return(noAdminAddr).Times(1) @@ -309,7 +311,7 @@ func TestServiceManager_LogoutService(t *testing.T) { submitErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1) changeStatusErrReq1 := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1) changeStatusErrReq2 := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(1) - okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(2) + okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(3) gomock.InOrder(governancePreErrReq, checkPermissionErrReq, submitErrReq, changeStatusErrReq1, changeStatusErrReq2, okReq) mockStub.EXPECT().CurrentCaller().Return(noAdminAddr).Times(1) diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 412a5f569..3c903b072 100755 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -2,6 +2,7 @@ package executor import ( "context" + "encoding/json" "fmt" "math/big" "sync" @@ -10,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/meshplus/bitxhub-core/agency" + service_mgr "github.com/meshplus/bitxhub-core/service-mgr" "github.com/meshplus/bitxhub-core/validator" "github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-model/constant" @@ -37,23 +39,25 @@ var _ Executor = (*BlockExecutor)(nil) // BlockExecutor executes block from order type BlockExecutor struct { - client *appchain.Client - ledger *ledger.Ledger - logger logrus.FieldLogger - blockC chan *BlockWrapper - preBlockC chan *pb.CommitEvent - persistC chan *ledger.BlockData - ibtpVerify proof.Verify - validationEngine validator.Engine - currentHeight uint64 - currentBlockHash *types.Hash - txsExecutor agency.TxsExecutor - blockFeed event.Feed - logsFeed event.Feed - nodeFeed event.Feed - auditFeed event.Feed - ctx context.Context - cancel context.CancelFunc + client *appchain.Client + ledger *ledger.Ledger + logger logrus.FieldLogger + blockC chan *BlockWrapper + preBlockC chan *pb.CommitEvent + persistC chan *ledger.BlockData + ibtpVerify proof.Verify + validationEngine validator.Engine + interchainManager *contracts.InterchainManager + currentHeight uint64 + currentBlockHash *types.Hash + txsExecutor agency.TxsExecutor + blockFeed event.Feed + blockFeedForRemote event.Feed + logsFeed event.Feed + nodeFeed event.Feed + auditFeed event.Feed + ctx context.Context + cancel context.CancelFunc evm *vm.EVM evmChainCfg *params.ChainConfig @@ -80,23 +84,24 @@ func New(chainLedger *ledger.Ledger, logger logrus.FieldLogger, client *appchain ctx, cancel := context.WithCancel(context.Background()) blockExecutor := &BlockExecutor{ - client: client, - ledger: chainLedger, - logger: logger, - ctx: ctx, - cancel: cancel, - blockC: make(chan *BlockWrapper, blockChanNumber), - preBlockC: make(chan *pb.CommitEvent, blockChanNumber), - persistC: make(chan *ledger.BlockData, persistChanNumber), - ibtpVerify: ibtpVerify, - validationEngine: ibtpVerify.ValidationEngine(), - currentHeight: chainLedger.GetChainMeta().Height, - currentBlockHash: chainLedger.GetChainMeta().BlockHash, - evmChainCfg: newEVMChainCfg(config), - config: *config, - bxhGasPrice: gasPrice, - gasLimit: config.GasLimit, - lock: &sync.Mutex{}, + client: client, + ledger: chainLedger, + logger: logger, + ctx: ctx, + cancel: cancel, + blockC: make(chan *BlockWrapper, blockChanNumber), + preBlockC: make(chan *pb.CommitEvent, blockChanNumber), + persistC: make(chan *ledger.BlockData, persistChanNumber), + ibtpVerify: ibtpVerify, + validationEngine: ibtpVerify.ValidationEngine(), + currentHeight: chainLedger.GetChainMeta().Height, + currentBlockHash: chainLedger.GetChainMeta().BlockHash, + evmChainCfg: newEVMChainCfg(config), + interchainManager: &contracts.InterchainManager{}, + config: *config, + bxhGasPrice: gasPrice, + gasLimit: config.GasLimit, + lock: &sync.Mutex{}, } for _, admin := range config.Genesis.Admins { @@ -110,6 +115,26 @@ func New(chainLedger *ledger.Ledger, logger logrus.FieldLogger, client *appchain return blockExecutor, nil } +func (exec *BlockExecutor) loadServiceCache() error { + ok, value := exec.ledger.Copy().QueryByPrefix(constant.ServiceMgrContractAddr.Address(), service_mgr.ServicePrefix) + if !ok { + exec.logger.Debug("loadServiceCache return nil") + return nil + } + + for _, data := range value { + service := &service_mgr.Service{} + if err := json.Unmarshal(data, service); err != nil { + exec.logger.Errorf("unmarshal service error:%v", err) + return fmt.Errorf("unmarshal service error:%v", err) + } + chainServiceID := fmt.Sprintf("%s:%s", service.ChainID, service.ServiceID) + exec.interchainManager.SetServiceCache(chainServiceID, service) + exec.logger.WithFields(logrus.Fields{"key": chainServiceID, "service": service}).Infof("succefful store service fron leader") + } + return nil +} + // Start starts executor func (exec *BlockExecutor) Start() error { go exec.listenExecuteEvent() @@ -118,6 +143,11 @@ func (exec *BlockExecutor) Start() error { go exec.persistData() + err := exec.loadServiceCache() + if err != nil { + return fmt.Errorf("executor load serviceCache from ledger failed, err:%s", err) + } + exec.logger.WithFields(logrus.Fields{ "height": exec.currentHeight, "hash": exec.currentBlockHash.String(), @@ -146,6 +176,11 @@ func (exec *BlockExecutor) SubscribeBlockEvent(ch chan<- events.ExecutedEvent) e return exec.blockFeed.Subscribe(ch) } +// SubscribeBlockEvent registers a subscription of ExecutedEvent. +func (exec *BlockExecutor) SubscribeBlockEventForRemote(ch chan<- events.ExecutedEvent) event.Subscription { + return exec.blockFeedForRemote.Subscribe(ch) +} + func (exec *BlockExecutor) SubscribeLogsEvent(ch chan<- []*pb.EvmLog) event.Subscription { return exec.logsFeed.Subscribe(ch) } diff --git a/internal/executor/executor_test.go b/internal/executor/executor_test.go index a9a9772ec..54bb0ea8e 100644 --- a/internal/executor/executor_test.go +++ b/internal/executor/executor_test.go @@ -176,6 +176,8 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) { } evs = append(evs, ev, ev2, ev3) + stateLedger.EXPECT().Copy().Return(stateLedger).AnyTimes() + stateLedger.EXPECT().QueryByPrefix(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() chainLedger.EXPECT().GetChainMeta().Return(chainMeta).AnyTimes() stateLedger.EXPECT().Events(gomock.Any()).Return(evs).AnyTimes() stateLedger.EXPECT().Commit(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() @@ -301,14 +303,14 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) { go listenBlock(&wg, done, ch) // send blocks to executor - commitEvent1 := mockCommitEvent(uint64(1), nil) + commitEvent1 := mockCommitEvent(uint64(2), nil) transactions := make([]pb.Transaction, 0) for _, tx := range txs { transactions = append(transactions, tx) } - commitEvent2 := mockCommitEvent(uint64(2), transactions) + commitEvent2 := mockCommitEvent(uint64(3), transactions) exec.ExecuteBlock(commitEvent1) exec.ExecuteBlock(commitEvent2) diff --git a/internal/executor/handle.go b/internal/executor/handle.go index 715b1049a..12a7bca2c 100755 --- a/internal/executor/handle.go +++ b/internal/executor/handle.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/meshplus/bitxhub-core/agency" + servicemgr "github.com/meshplus/bitxhub-core/service-mgr" "github.com/meshplus/bitxhub-kit/types" "github.com/meshplus/bitxhub-model/constant" "github.com/meshplus/bitxhub-model/pb" @@ -54,6 +55,13 @@ func (exec *BlockExecutor) processExecuteEvent(blockWrapper *BlockWrapper) *ledg current := time.Now() block := blockWrapper.block + // check executor handle the right block + if block.BlockHeader.Number != exec.currentHeight+1 { + exec.logger.WithFields(logrus.Fields{"block height": block.BlockHeader.Number, + "matchedHeight": exec.currentHeight + 1}).Warning("current block height is not matched") + return nil + } + for _, tx := range block.Transactions.Transactions { txHashList = append(txHashList, tx.GetHash()) } @@ -62,7 +70,6 @@ func (exec *BlockExecutor) processExecuteEvent(blockWrapper *BlockWrapper) *ledg exec.evm = newEvm(block.Height(), uint64(block.BlockHeader.Timestamp), exec.evmChainCfg, exec.ledger.StateLedger, exec.ledger.ChainLedger, exec.admins[0]) exec.ledger.PrepareBlock(block.BlockHash, block.Height()) receipts := exec.txsExecutor.ApplyTransactions(block.Transactions.Transactions, blockWrapper.invalidTx) - applyTxsDuration.Observe(float64(time.Since(current)) / float64(time.Second)) exec.logger.WithFields(logrus.Fields{ "time": time.Since(current), @@ -387,6 +394,15 @@ func (exec *BlockExecutor) applyTx(index int, tx pb.Transaction, invalidReason a for nodeID, _ := range auditRelatedObjInfo.RelatedNodeIDList { relatedNodeIDList[nodeID] = []byte{} } + case pb.Event_SERVICE: + serviceInfo := &servicemgr.Service{} + err := json.Unmarshal(ev.Data, &serviceInfo) + if err != nil { + panic(err) + } + chainServiceID := fmt.Sprintf("%s:%s", serviceInfo.ChainID, serviceInfo.ServiceID) + exec.logger.WithFields(logrus.Fields{"key": chainServiceID, "service": serviceInfo}).Debug("record service change in cache") + exec.interchainManager.SetServiceCache(chainServiceID, serviceInfo) } } if auditDataUpdate { @@ -422,6 +438,11 @@ func (exec *BlockExecutor) postBlockEvent(block *pb.Block, interchainMeta *pb.In InterchainMeta: interchainMeta, TxHashList: txHashList, }) + go exec.blockFeedForRemote.Send(events.ExecutedEvent{ + Block: block, + InterchainMeta: interchainMeta, + TxHashList: txHashList, + }) } func (exec *BlockExecutor) postLogsEvent(receipts []*pb.Receipt) { @@ -502,7 +523,7 @@ func (exec *BlockExecutor) applyBxhTransaction(i int, tx *pb.BxhTransaction, inv if tx.IsIBTP() { ctx := vm.NewContext(tx, uint64(i), nil, exec.currentHeight, exec.ledger, exec.logger) instance := boltvm.New(ctx, exec.validationEngine, exec.evm, exec.getContracts(opt)) - ret, err := instance.HandleIBTP(tx.GetIBTP()) + ret, err := instance.HandleIBTP(tx.GetIBTP(), exec.interchainManager) return ret, GasBVMTx, err } @@ -727,7 +748,7 @@ func (exec *BlockExecutor) setTimeoutRollback(height uint64) error { } for _, id := range list { - if isGlobalID(id) { + if exec.isGlobalID(id) { if err := exec.setGlobalTxStatus(id, pb.TransactionStatus_BEGIN_ROLLBACK); err != nil { return fmt.Errorf("set global tx status of id %s: %w", id, err) } @@ -978,19 +999,19 @@ func (exec *BlockExecutor) getTimeoutIBTPsMap(height uint64) (map[string][]strin timeoutIBTPsMap := make(map[string][]string) for _, value := range timeoutList { - if isGlobalID(value) { + if exec.isGlobalID(value) { txInfo, err := exec.getTxInfoByGlobalID(value) if err != nil { return nil, err } for id := range txInfo.ChildTxInfo { - if err := addTxIdToTimeoutIBTPsMap(timeoutIBTPsMap, id, bxhID); err != nil { + if err := exec.addTxIdToTimeoutIBTPsMap(timeoutIBTPsMap, id, bxhID); err != nil { return nil, err } } } else { - if err := addTxIdToTimeoutIBTPsMap(timeoutIBTPsMap, value, bxhID); err != nil { + if err := exec.addTxIdToTimeoutIBTPsMap(timeoutIBTPsMap, value, bxhID); err != nil { return nil, err } } @@ -999,9 +1020,9 @@ func (exec *BlockExecutor) getTimeoutIBTPsMap(height uint64) (map[string][]strin return timeoutIBTPsMap, nil } -func addTxIdToTimeoutIBTPsMap(timeoutIBTPsMap map[string][]string, txId string, bitXHubID string) error { +func (exec *BlockExecutor) addTxIdToTimeoutIBTPsMap(timeoutIBTPsMap map[string][]string, txId string, bitXHubID string) error { listArray := strings.Split(txId, "-") - bxhID, chainID, _, err := parseChainServiceID(listArray[0]) + bxhID, chainID, _, err := exec.parseChainServiceID(listArray[0]) if err != nil { return err } @@ -1019,7 +1040,7 @@ func addTxIdToTimeoutIBTPsMap(timeoutIBTPsMap map[string][]string, txId string, return nil } -func parseChainServiceID(id string) (string, string, string, error) { +func (exec *BlockExecutor) parseChainServiceID(id string) (string, string, string, error) { splits := strings.Split(id, ":") if len(splits) != 3 { @@ -1029,7 +1050,7 @@ func parseChainServiceID(id string) (string, string, string, error) { return splits[0], splits[1], splits[2], nil } -func isGlobalID(id string) bool { +func (exec *BlockExecutor) isGlobalID(id string) bool { return !strings.Contains(id, "-") } @@ -1055,7 +1076,7 @@ func (exec *BlockExecutor) filterValidTx(receipts []*pb.Receipt) (map[string]boo } func (exec *BlockExecutor) isDstChainFromBxh(to string, bxhId string) bool { - _, chainId, _, _ := parseChainServiceID(to) + _, chainId, _, _ := exec.parseChainServiceID(to) if chainId == bxhId { return true } diff --git a/internal/executor/types.go b/internal/executor/types.go index f22082be7..70745b8f7 100644 --- a/internal/executor/types.go +++ b/internal/executor/types.go @@ -23,6 +23,9 @@ type Executor interface { // SubscribeBlockEvent SubscribeBlockEvent(chan<- events.ExecutedEvent) event.Subscription + // SubscribeBlockEventForRemote + SubscribeBlockEventForRemote(chan<- events.ExecutedEvent) event.Subscription + // SubscribeLogEvent SubscribeLogsEvent(chan<- []*pb.EvmLog) event.Subscription diff --git a/pkg/vm/boltvm/boltvm.go b/pkg/vm/boltvm/boltvm.go index d896804ca..08486bd5a 100755 --- a/pkg/vm/boltvm/boltvm.go +++ b/pkg/vm/boltvm/boltvm.go @@ -43,14 +43,14 @@ func (bvm *BoltVM) Run(input []byte, _ uint64) (ret []byte, gasUsed uint64, err return bvm.InvokeBVM(bvm.ctx.Callee.String(), input) } -func (bvm *BoltVM) HandleIBTP(ibtp *pb.IBTP) (ret []byte, err error) { +func (bvm *BoltVM) HandleIBTP(ibtp *pb.IBTP, con *contracts.InterchainManager) (ret []byte, err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("%v", e) } }() - con := &contracts.InterchainManager{} + //con := &contracts.InterchainManager{} con.Stub = &BoltStubImpl{ bvm: bvm, ctx: bvm.ctx, diff --git a/pkg/vm/boltvm/boltvm_test.go b/pkg/vm/boltvm/boltvm_test.go index 406fb83de..141daf5be 100644 --- a/pkg/vm/boltvm/boltvm_test.go +++ b/pkg/vm/boltvm/boltvm_test.go @@ -116,7 +116,7 @@ func TestBoltVM_Run(t *testing.T) { ctxInterchain := vm.NewContext(txInterchain, 1, nil, 100, mockLedger, log.NewWithModule("vm")) boltVMInterchain := New(ctxInterchain, mockEngine, nil, cons) ibtp := mockIBTP(t, 1, pb.IBTP_INTERCHAIN) - _, err = boltVMInterchain.HandleIBTP(ibtp) + _, err = boltVMInterchain.HandleIBTP(ibtp, &contracts.InterchainManager{}) require.NotNil(t, err) }