Skip to content

Commit

Permalink
feat: modify checkServiceAbility
Browse files Browse the repository at this point in the history
feat: support remote subscribeBlock api
  • Loading branch information
Karenlrx committed Jul 7, 2022
1 parent 4c8866d commit 4e5b350
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 79 deletions.
3 changes: 0 additions & 3 deletions api/grpc/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ func (cbs *ChainBrokerService) SendTransactions(ctx context.Context, txs *pb.Mul
}

hash, err := cbs.sendTransaction(tx)
if tx.IsIBTP() {
cbs.logger.Infof("get transaction:appchain is %s, nonce is %d, index is %d", tx.GetFrom().String(), tx.GetNonce(), tx.GetIBTP().Index)
}
if err != nil {
return nil, status.Newf(codes.Internal, "internal handling transaction fail %s", err.Error()).Err()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/coreapi/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (api *FeedAPI) SubscribeNewTxEvent(ch chan<- pb.Transactions) event.Subscri
}

func (api *FeedAPI) SubscribeNewBlockEvent(ch chan<- events.ExecutedEvent) event.Subscription {
return api.bxh.BlockExecutor.SubscribeBlockEvent(ch)
return api.bxh.BlockExecutor.SubscribeBlockEventForRemote(ch)
}

func (api *FeedAPI) SubscribeLogsEvent(ch chan<- []*pb.EvmLog) event.Subscription {
Expand Down
71 changes: 62 additions & 9 deletions internal/executor/contracts/interchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/meshplus/bitxhub-core/boltvm"
service_mgr "github.com/meshplus/bitxhub-core/service-mgr"
Expand Down Expand Up @@ -33,6 +34,13 @@ const (

type InterchainManager struct {
boltvm.Stub
ServiceCache *sync.Map
}

func NewInterchainManager() *InterchainManager {
return &InterchainManager{
ServiceCache: &sync.Map{},
}
}

type BxhValidators struct {
Expand All @@ -54,6 +62,48 @@ func (cs *ChainService) getChainServiceId() string {
return fmt.Sprintf("%s:%s", cs.ChainId, cs.ServiceId)
}

func (x *InterchainManager) GetServiceCache(key string) *boltvm.Response {
servece, ok := x.getServiceCache(key)
if !ok {
return boltvm.Error(boltvm.InterchainInternalErrCode, fmt.Sprintf(string(boltvm.InterchainInternalErrMsg), fmt.Errorf("interchain's serviceCache doesn't store %s", key)))
}
data, err := json.Marshal(&servece)
if err != nil {
return boltvm.Error(boltvm.InterchainInternalErrCode, fmt.Sprintf(string(boltvm.InterchainInternalErrMsg), err.Error()))
}
return boltvm.Success(data)
}

func (x *InterchainManager) getServiceCache(key string) (*service_mgr.Service, bool) {
service := &service_mgr.Service{}
val, ok := x.ServiceCache.Load(key)
if ok {
service = val.(*service_mgr.Service)
}
return service, ok
}

func (x *InterchainManager) InitServiceCache() {
x.ServiceCache = &sync.Map{}
}

//func (x *InterchainManager) SetServiceCache(key string, data []byte) *boltvm.Response {
// service := &service_mgr.Service{}
// err := json.Unmarshal(data, service)
// if err != nil {
// return boltvm.Error(boltvm.InterchainInternalErrCode, fmt.Sprintf(string(boltvm.InterchainInternalErrMsg), err.Error()))
// }
// x.setServiceCache(key, service)
// return boltvm.Success(nil)
//}

func (x *InterchainManager) SetServiceCache(key string, service *service_mgr.Service) {
if x.ServiceCache == nil {
x.ServiceCache = &sync.Map{}
}
x.ServiceCache.Store(key, service)
}

func (x *InterchainManager) Register(chainServiceID string) *boltvm.Response {
bxhID, err := x.getBitXHubID()
if err != nil {
Expand Down Expand Up @@ -534,13 +584,16 @@ func (x *InterchainManager) getBitXHubID() (string, error) {
func (x *InterchainManager) getServiceByID(id string) (*service_mgr.Service, error) {
service := &service_mgr.Service{}

res := x.CrossInvoke(constant.ServiceMgrContractAddr.Address().String(), "GetServiceInfo", pb.String(id))
if !res.Ok {
return nil, fmt.Errorf("can not get service %s info: %s", id, string(res.Result))
}

if err := json.Unmarshal(res.Result, service); err != nil {
return nil, fmt.Errorf("unmarshal service of ID %s: %w", id, err)
service, ok := x.getServiceCache(id)
if !ok {
res := x.CrossInvoke(constant.ServiceMgrContractAddr.Address().String(), "GetServiceInfo", pb.String(id))
if !res.Ok {
return nil, fmt.Errorf("can not get service %s info: %s", id, string(res.Result))
}
if err := json.Unmarshal(res.Result, service); err != nil {
return nil, fmt.Errorf("unmarshal service of ID %s: %w", id, err)
}
x.ServiceCache.LoadOrStore(id, service)
}

return service, nil
Expand All @@ -565,8 +618,8 @@ func (x *InterchainManager) checkBitXHubAvailability(id string) error {
}

func (x *InterchainManager) checkServiceAvailability(chainServiceID string) error {
res := x.CrossInvoke(constant.ServiceMgrContractAddr.Address().String(), "IsAvailable", pb.String(chainServiceID))
if !res.Ok || string(res.Result) == FALSE {
service, err := x.getServiceByID(chainServiceID)
if err != nil || !service.IsAvailable() {
return fmt.Errorf("service %s is not available", chainServiceID)
}

Expand Down
17 changes: 9 additions & 8 deletions internal/executor/contracts/interchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestInterchainManager_Register(t *testing.T) {
o3 := mockStub.EXPECT().Get(service_mgr.ServiceKey(srcChainService.getFullServiceId())).Return(true, data1).Times(2)
gomock.InOrder(o1, o2, o3)

im := &InterchainManager{mockStub}
im := &InterchainManager{mockStub, &sync.Map{}}

mockStub.EXPECT().PostEvent(gomock.Any(), gomock.Any()).AnyTimes()

Expand Down Expand Up @@ -115,7 +116,7 @@ func TestInterchainManager_GetInterchain(t *testing.T) {
o2 := mockStub.EXPECT().Get(service_mgr.ServiceKey(srcChainService.getFullServiceId())).Return(true, data0)
gomock.InOrder(o1, o2)

im := &InterchainManager{mockStub}
im := &InterchainManager{mockStub, &sync.Map{}}

res := im.GetInterchain(srcChainService.getFullServiceId())
assert.False(t, res.Ok)
Expand Down Expand Up @@ -293,7 +294,7 @@ func TestInterchainManager_HandleIBTP(t *testing.T) {
mockStub.EXPECT().GetTxHash().Return(&types.Hash{}).AnyTimes()
mockStub.EXPECT().PostEvent(gomock.Any(), gomock.Any()).AnyTimes()

im := &InterchainManager{mockStub}
im := &InterchainManager{mockStub, &sync.Map{}}
ibtp := &pb.IBTP{}

res := im.HandleIBTP(ibtp)
Expand Down Expand Up @@ -475,7 +476,7 @@ func TestInterchainManager_HandleIBTP(t *testing.T) {
func TestInterchainManager_DeleteInterchain(t *testing.T) {
mockCtl := gomock.NewController(t)
mockStub := mock_stub.NewMockStub(mockCtl)
im := &InterchainManager{mockStub}
im := &InterchainManager{mockStub, &sync.Map{}}

mockStub.EXPECT().Delete(gomock.Any())
mockStub.EXPECT().PostEvent(gomock.Any(), gomock.Any()).AnyTimes()
Expand All @@ -492,7 +493,7 @@ func TestInterchainManager_GetIBTPByID(t *testing.T) {
from := types.NewAddress([]byte{0}).String()

mockStub.EXPECT().Caller().Return(from).AnyTimes()
im := &InterchainManager{mockStub}
im := &InterchainManager{mockStub, &sync.Map{}}

res := im.GetIBTPByID("a", true)
assert.False(t, res.Ok)
Expand All @@ -519,7 +520,7 @@ func TestInterchainManager_GetIBTPByID(t *testing.T) {
func TestInterchainManager_HandleIBTPData(t *testing.T) {
mockCtl := gomock.NewController(t)
mockStub := mock_stub.NewMockStub(mockCtl)
im := &InterchainManager{mockStub}
im := &InterchainManager{mockStub, &sync.Map{}}

srcChainService, dstChainService := mockChainService()
ibtp := &pb.IBTP{
Expand All @@ -540,7 +541,7 @@ func TestInterchainManager_HandleIBTPData(t *testing.T) {
func TestInterchainManager_GetAllServiceIDs(t *testing.T) {
mockCtl := gomock.NewController(t)
mockStub := mock_stub.NewMockStub(mockCtl)
im := &InterchainManager{mockStub}
im := &InterchainManager{mockStub, &sync.Map{}}

mockStub.EXPECT().Query(gomock.Any()).Return(false, nil).Times(1)
res := im.GetAllServiceIDs()
Expand Down Expand Up @@ -625,7 +626,7 @@ func TestInterchainManager_GetAllServiceIDs(t *testing.T) {
// mockStub.EXPECT().PostInterchainEvent(gomock.Any()).AnyTimes()
// mockStub.EXPECT().GetTxHash().Return(&types.Hash{}).AnyTimes()
//
// im := &InterchainManager{mockStub}
// im := &InterchainManager{mockStub, &sync.Map{}}
//
// ibtp := &pb.IBTP{
// From: appchainMethod + "-" + appchainMethod,
Expand Down
69 changes: 65 additions & 4 deletions internal/executor/contracts/service_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,15 @@ func (sm *ServiceManager) Manage(eventTyp, proposalResult, lastStatus, objId str
}
}

if err := sm.postAuditServiceEvent(objId); err != nil {
if err = sm.postAuditServiceEvent(objId); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}

// record updated service in interchain contract cache
if err = sm.postServiceEvent(objId); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err)))
}

return boltvm.Success(nil)
}

Expand Down Expand Up @@ -256,6 +262,11 @@ func (sm *ServiceManager) RegisterService(chainID, serviceID, name, typ, intro s
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}

// record updated service in interchain contract cache
if err = sm.postServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post excutor serviceCache event error: %v", err)))
}

return getGovernanceRet(string(res.Result), []byte(chainServiceID))
}

Expand Down Expand Up @@ -298,6 +309,12 @@ func (sm *ServiceManager) UpdateService(chainServiceID, name, intro, permits, de
if err := sm.postAuditServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}

// record updated service in interchain contract cache
if err = sm.postServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err)))
}

return getGovernanceRet("", nil)
}

Expand Down Expand Up @@ -367,6 +384,11 @@ func (sm *ServiceManager) UpdateService(chainServiceID, name, intro, permits, de
if err := sm.postAuditServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}

// record updated service in interchain contract cache
if err = sm.postServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err)))
}
return getGovernanceRet(string(res.Result), nil)
}

Expand Down Expand Up @@ -423,6 +445,11 @@ func (sm *ServiceManager) basicGovernance(chainServiceID, reason string, permiss
if err := sm.postAuditServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}

// record updated service in interchain contract cache
if err := sm.postServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err)))
}
return getGovernanceRet(string(res.Result), nil)
}

Expand Down Expand Up @@ -453,13 +480,17 @@ func (sm *ServiceManager) PauseChainService(chainID string) *boltvm.Response {

// 3. pause services
for _, chainServiceID := range idList {
if err := sm.pauseService(chainServiceID); err != nil {
if err = sm.pauseService(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("pause service %s err: %v", chainServiceID, err)))
}

if err := sm.postAuditServiceEvent(chainServiceID); err != nil {
if err = sm.postAuditServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}
// record updated service in interchain contract cache
if err = sm.postServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err)))
}
}

return getGovernanceRet("", nil)
Expand Down Expand Up @@ -519,6 +550,10 @@ func (sm *ServiceManager) UnPauseChainService(chainID string) *boltvm.Response {
if err := sm.postAuditServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}
// record updated service in interchain contract cache
if err = sm.postServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err)))
}
}

return getGovernanceRet("", nil)
Expand Down Expand Up @@ -585,6 +620,10 @@ func (sm *ServiceManager) ClearChainService(chainID string) *boltvm.Response {
if err := sm.postAuditServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}
// record updated service in interchain contract cache
if err = sm.postServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err)))
}
}

return getGovernanceRet("", nil)
Expand Down Expand Up @@ -647,6 +686,10 @@ func (sm *ServiceManager) EvaluateService(chainServiceID, desc string, score flo
if err := sm.postAuditServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}
// record updated service in interchain contract cache
if err := sm.postServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err)))
}
return getGovernanceRet("", nil)
}

Expand Down Expand Up @@ -716,9 +759,14 @@ func (sm *ServiceManager) RecordInvokeService(fullServiceID, fromFullServiceID s
"result": result,
}).Info("record invoke service")

if err := sm.postAuditServiceEvent(chainServiceID); err != nil {
if err = sm.postAuditServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("post audit service event error: %v", err)))
}

// record updated service in interchain contract cache
if err = sm.postServiceEvent(chainServiceID); err != nil {
return boltvm.Error(boltvm.ServiceInternalErrCode, fmt.Sprintf(string(boltvm.ServiceInternalErrMsg), fmt.Sprintf("invoke Interchain serviceCache error: %v", err)))
}
return boltvm.Success(nil)
}

Expand Down Expand Up @@ -952,3 +1000,16 @@ func (sm *ServiceManager) postAuditServiceEvent(chainServiceID string) error {

return nil
}

// postServiceEvent record changed service in executor's interchain contract cache
func (sm *ServiceManager) postServiceEvent(chainServiceID string) error {
sm.ServiceManager.Persister = sm.Stub
service := &servicemgr.Service{}
ok := sm.GetObject(servicemgr.ServiceKey(chainServiceID), service)
if !ok {
return fmt.Errorf("not found service %s", chainServiceID)
}

sm.PostEvent(pb.Event_SERVICE, service)
return nil
}
10 changes: 6 additions & 4 deletions internal/executor/contracts/service_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestServiceManager_ManageRegister(t *testing.T) {
chainServiceRegisteringID := fmt.Sprintf("%s:%s", services[4].ChainID, services[4].ServiceID)
logger := log.NewWithModule("contracts")

//mockStub.EXPECT().PostServiceEvent(gomock.Any()).Return(nil).AnyTimes()
mockStub.EXPECT().Logger().Return(logger).AnyTimes()
mockStub.EXPECT().GetObject(service_mgr.ServiceKey(chainServiceID), gomock.Any()).Return(false).Times(1)
mockStub.EXPECT().GetObject(service_mgr.ServiceKey(chainServiceID), gomock.Any()).SetArg(1, *services[0]).Return(true).AnyTimes()
Expand Down Expand Up @@ -179,9 +180,10 @@ func TestServiceManager_RegisterService(t *testing.T) {
checkAppchainErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(1)
checkInfoErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(4)
submitErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(1)
cacheReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1)

okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).Return(false).AnyTimes()
gomock.InOrder(governancePreErrReq, checkAppchainErrReq, checkInfoErrReq, submitErrReq, okReq)
okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).Return(false).Times(1)
gomock.InOrder(governancePreErrReq, checkAppchainErrReq, checkInfoErrReq, submitErrReq, okReq, cacheReq)

mockStub.EXPECT().CrossInvoke(constant.AppchainMgrContractAddr.Address().String(), "IsAvailable", gomock.Any()).Return(boltvm.Success([]byte(FALSE))).Times(1)
mockStub.EXPECT().CrossInvoke(constant.AppchainMgrContractAddr.Address().String(), "IsAvailable", gomock.Any()).Return(boltvm.Success([]byte(TRUE))).AnyTimes()
Expand Down Expand Up @@ -247,7 +249,7 @@ func TestServiceManager_UpdateService(t *testing.T) {
submitErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1)
changeStatusErrReq1 := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1)
changeStatusErrReq2 := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(1)
okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(2)
okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(3)
gomock.InOrder(governancePreErrReq, checkPermissionErrReq, checkInfoErrReq, updateErrReq, updateErrReq2, submitErrReq, changeStatusErrReq1, changeStatusErrReq2, okReq)

mockStub.EXPECT().CurrentCaller().Return(noAdminAddr).Times(1)
Expand Down Expand Up @@ -309,7 +311,7 @@ func TestServiceManager_LogoutService(t *testing.T) {
submitErrReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1)
changeStatusErrReq1 := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(1)
changeStatusErrReq2 := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[2]).Return(true).Times(1)
okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(2)
okReq := mockStub.EXPECT().GetObject(gomock.Any(), gomock.Any()).SetArg(1, *services[0]).Return(true).Times(3)
gomock.InOrder(governancePreErrReq, checkPermissionErrReq, submitErrReq, changeStatusErrReq1, changeStatusErrReq2, okReq)

mockStub.EXPECT().CurrentCaller().Return(noAdminAddr).Times(1)
Expand Down
Loading

0 comments on commit 4e5b350

Please sign in to comment.