Skip to content

Commit

Permalink
Merge pull request #941 from meshplus/fix/inter-bitxhub-transaction-u…
Browse files Browse the repository at this point in the history
…pdate

fix: support inter-bitxhub transaction
  • Loading branch information
jiuhuche120 authored Oct 31, 2022
2 parents 4050bf6 + 8bb8ea9 commit 6a59779
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 29 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/magiconair/properties v1.8.5
github.com/meshplus/bitxhub-core v1.3.1-0.20220927021127-645700e66121
github.com/meshplus/bitxhub-kit v1.2.1-0.20221010033511-7093f5492564
github.com/meshplus/bitxhub-model v1.2.1-0.20220815081129-609df840b044
github.com/meshplus/bitxhub-model v1.2.1-0.20221011093115-0a1f9264c58c
github.com/meshplus/consensus v0.0.0-20211228075008-5f469b198531
github.com/meshplus/eth-kit v0.0.0-20221020152328-caeec38b9a34
github.com/meshplus/go-libp2p-cert v0.0.0-20210125114242-7d9ed2eaaccd
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -814,8 +814,9 @@ github.com/meshplus/bitxhub-kit v1.2.1-0.20221010033511-7093f5492564 h1:cipj0umW
github.com/meshplus/bitxhub-kit v1.2.1-0.20221010033511-7093f5492564/go.mod h1:S8qKlxfXRFS9HQgXyD4r+o1/0u6EJOQoMZyXtlEM8vM=
github.com/meshplus/bitxhub-model v1.0.0-rc3/go.mod h1:ZCctQIYTlE3vJ8Lhkrgs9bWwNA+Dw4JzojOSIzLVU6E=
github.com/meshplus/bitxhub-model v1.2.1-0.20220803022708-9ab7a71abdbf/go.mod h1:sPko8fD+G3MYOnH/44Ju4T4jD/tlEU3IJDIJ4hjrzxg=
github.com/meshplus/bitxhub-model v1.2.1-0.20220815081129-609df840b044 h1:uzj0wLXYzG1NvKlRdiVTHl+ikqbkL8+AzwmFK3b7OHA=
github.com/meshplus/bitxhub-model v1.2.1-0.20220815081129-609df840b044/go.mod h1:KIevoE50DZC7Tghvf7368FQ7hRBmK/4HWqiQbFrQlog=
github.com/meshplus/bitxhub-model v1.2.1-0.20221011093115-0a1f9264c58c h1:FdV/mEbNWLm6Cjq89jjHOygJhAupVwNNz0WH/kjqgNQ=
github.com/meshplus/bitxhub-model v1.2.1-0.20221011093115-0a1f9264c58c/go.mod h1:KIevoE50DZC7Tghvf7368FQ7hRBmK/4HWqiQbFrQlog=
github.com/meshplus/consensus v0.0.0-20211228075008-5f469b198531 h1:lU2XI7vWdapTLJfObY3A+e+a3rvHbnT0pHN9LEghwRk=
github.com/meshplus/consensus v0.0.0-20211228075008-5f469b198531/go.mod h1:2Sv1u1sOFta9dAO6OuHyxLaN1Z6/AdrIol99qi5ZZ0k=
github.com/meshplus/eth-kit v0.0.0-20221020152328-caeec38b9a34 h1:VFoV/CQXI93Z5upAgHupSfEXxAcXNGAX2G8FMi/sLTw=
Expand Down
85 changes: 59 additions & 26 deletions internal/executor/contracts/interchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,13 @@ func (x *InterchainManager) checkIBTP(ibtp *pb.IBTP) (*pb.Interchain, bool, *bol

interchain, _ := x.getInterchain(srcChainService.getFullServiceId())

if pb.IBTP_REQUEST == ibtp.Category() {
// if src chain service is from appchain registered in current bitxhub, check service index
isNotification, err := x.checkTxStatusForSourceBxh(ibtp)
if err != nil {
return nil, isBatch, nil, boltvm.BError(boltvm.InterchainInternalErrCode, fmt.Sprintf(string(boltvm.InterchainInternalErrMsg), err.Error()))
}

if pb.IBTP_REQUEST == ibtp.Category() && !isNotification {
// if src chain service is from appchain registered in current bitxhub && not notification for src chain service rollback, check service index
if srcChainService.IsLocal {
if err := x.checkSourceAvailability(srcChainService); err != nil {
return nil, isBatch, nil, err
Expand Down Expand Up @@ -286,7 +291,7 @@ func (x *InterchainManager) checkIBTP(ibtp *pb.IBTP) (*pb.Interchain, bool, *bol
}
// }
}
} else if ibtp.Category() == pb.IBTP_RESPONSE {
} else if ibtp.Category() == pb.IBTP_RESPONSE || isNotification {
// Situation which need to check the index
// - Bitxhub service:dstService == nil
// - The dst service needs to be invoked sequentially:dstService.Ordered
Expand Down Expand Up @@ -377,7 +382,9 @@ func (x *InterchainManager) ProcessIBTP(ibtp *pb.IBTP, interchain *pb.Interchain
srcChainService, _ := x.parseChainService(ibtp.From)
dstChainService, _ := x.parseChainService(ibtp.To)

if pb.IBTP_REQUEST == ibtp.Category() {
isNotification, _ := x.checkTxStatusForSourceBxh(ibtp)

if pb.IBTP_REQUEST == ibtp.Category() && !isNotification {
if interchain.InterchainCounter == nil {
x.Logger().Info("interchain counter is nil, make one")
interchain.InterchainCounter = make(map[string]uint64)
Expand Down Expand Up @@ -408,7 +415,7 @@ func (x *InterchainManager) ProcessIBTP(ibtp *pb.IBTP, interchain *pb.Interchain
x.SetObject(IndexReceiptMapKey(getIBTPID(ibtp.From, ibtp.To, ibtp.Index)), x.GetTxHash())

result := true
if ibtp.Type == pb.IBTP_RECEIPT_FAILURE {
if ibtp.Type == pb.IBTP_RECEIPT_FAILURE || isNotification {
result = false
}
x.CrossInvoke(constant.ServiceMgrContractAddr.Address().String(), "RecordInvokeService",
Expand Down Expand Up @@ -465,34 +472,36 @@ func (x *InterchainManager) beginTransaction(ibtp *pb.IBTP, isFailed bool) (*pb.
bxhID0, _, _ := ibtp.ParseFrom()
bxhID1, _, _ := ibtp.ParseTo()
timeoutHeight := uint64(ibtp.TimeoutHeight)
// TODO: disable transaction management for inter-bitxhub transaction temporarily
if bxhID0 != bxhID1 {
timeoutHeight = 0
if !isFailed {
sourceFailed, err := x.checkTxStatusForTargetBxh(ibtp)
if err != nil {
return nil, err
}
isFailed = sourceFailed
}
}

res := boltvm.Success(nil)
if ibtp.Group == nil {
res = x.CrossInvoke(constant.TransactionMgrContractAddr.Address().String(), "Begin", pb.String(txId), pb.Uint64(timeoutHeight), pb.Bool(isFailed))
if !res.Ok {
return nil, fmt.Errorf(string(res.Result))
}
} else {
count := uint64(len(ibtp.Group.Keys) + 1)
globalID, err := genGlobalTxID(ibtp)
if bxhID0 != bxhID1 {
currentBxhId, err := x.getBitXHubID()
if err != nil {
return nil, err
}
res = x.CrossInvoke(constant.TransactionMgrContractAddr.Address().String(), "BeginMultiTXs", pb.String(globalID), pb.String(ibtp.ID()), pb.Uint64(timeoutHeight), pb.Bool(isFailed), pb.Uint64(count))
if currentBxhId == bxhID0 {
timeoutHeight = 0
}
res = x.CrossInvoke(constant.TransactionMgrContractAddr.Address().String(), "BeginInterBitXHub", pb.String(txId), pb.Uint64(timeoutHeight), pb.Bytes(ibtp.Extra), pb.Bool(isFailed))
if !res.Ok {
return nil, fmt.Errorf(string(res.Result))
}
} else {
if ibtp.Group == nil {
res = x.CrossInvoke(constant.TransactionMgrContractAddr.Address().String(), "Begin", pb.String(txId), pb.Uint64(timeoutHeight), pb.Bool(isFailed))
if !res.Ok {
return nil, fmt.Errorf(string(res.Result))
}
} else {
count := uint64(len(ibtp.Group.Keys) + 1)
globalID, err := genGlobalTxID(ibtp)
if err != nil {
return nil, err
}
res = x.CrossInvoke(constant.TransactionMgrContractAddr.Address().String(), "BeginMultiTXs", pb.String(globalID), pb.String(ibtp.ID()), pb.Uint64(timeoutHeight), pb.Bool(isFailed), pb.Uint64(count))
if !res.Ok {
return nil, fmt.Errorf(string(res.Result))
}
}
}

change := pb.StatusChange{}
Expand Down Expand Up @@ -730,6 +739,30 @@ func (x *InterchainManager) GetAllServiceIDs() *boltvm.Response {
}
}

// checkTxStatusForSourceBxh check whether ibtp is the notification for source chain rollback
// - BEGIN_FAIL: dst chain service not existed, notify src chain rollback
// - BEGIN_ROLLBACK: dst chain don't receive receipt after timeout height, notify src & dst chain rollback
// - only happen in inter-bitxhub, ibtp type is always IBTP_INTERCHAIN
func (x *InterchainManager) checkTxStatusForSourceBxh(ibtp *pb.IBTP) (bool, error) {
sourceBxhId, _, _ := ibtp.ParseFrom()
targetBxhId, _, _ := ibtp.ParseTo()
if sourceBxhId == targetBxhId || ibtp.Category() == pb.IBTP_RESPONSE {
return false, nil
}
var txHash string
ok := x.GetObject(IndexMapKey(ibtp.ID()), &txHash)
if ok {
proof := &pb.BxhProof{}
if err := proof.Unmarshal(ibtp.Extra); err != nil {
return false, fmt.Errorf("unmarshal proof from dst bitxhub for IBTP %s failed: %s", ibtp.ID(), err.Error())
}
if proof.TxStatus == pb.TransactionStatus_BEGIN_FAILURE || proof.TxStatus == pb.TransactionStatus_BEGIN_ROLLBACK {
return true, nil
}
}
return false, nil
}

func (x *InterchainManager) checkTxStatusForTargetBxh(ibtp *pb.IBTP) (bool, error) {
targetBxhID, _, _ := ibtp.ParseTo()
curBxhID, err := x.getBitXHubID()
Expand Down
12 changes: 11 additions & 1 deletion internal/executor/contracts/interchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,16 @@ func TestInterchainManager_HandleIBTP(t *testing.T) {
return boltvm.Success(data)
}).AnyTimes()

mockStub.EXPECT().CrossInvoke(gomock.Eq(constant.TransactionMgrContractAddr.Address().String()), gomock.Eq("BeginInterBitXHub"), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(addr string, method string, args ...*pb.Arg) *boltvm.Response {
change := pb.StatusChange{
PrevStatus: -1,
CurStatus: pb.TransactionStatus_BEGIN,
}
data, _ := change.Marshal()
return boltvm.Success(data)
}).AnyTimes()

ibtp.From = srcChainService.getFullServiceId()
ibtp.To = unavailableDstServiceFullServiceID
ibtp.Index = 1
Expand All @@ -360,7 +370,7 @@ func TestInterchainManager_HandleIBTP(t *testing.T) {

ibtp.Index = 1
ibtp.To = unavailableBitxhubServiceID
mockStub.EXPECT().PostInterchainEvent(map[string]*pb.EventWrapper{srcChainService.ChainId: event, DEFAULT_UNION_PIER_ID: event}).MaxTimes(1)
mockStub.EXPECT().PostInterchainEvent(map[string]*pb.EventWrapper{DEFAULT_UNION_PIER_ID: event}).MaxTimes(1)
res = im.HandleIBTP(ibtp)
assert.True(t, res.Ok)

Expand Down
74 changes: 74 additions & 0 deletions internal/executor/contracts/transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
TransactionEvent_FAILURE TransactionEvent = "failure"
TransactionEvent_SUCCESS TransactionEvent = "success"
TransactionEvent_ROLLBACK TransactionEvent = "rollback"
TransactionEvent_DstROLLBACK TransactionEvent = "dst_rollback"
TransactionEvent_DstFAILURE TransactionEvent = "dst_failure"
TransactionState_INIT = "init"
)

Expand All @@ -53,6 +55,11 @@ var receipt2EventM = map[int32]TransactionEvent{
int32(pb.IBTP_RECEIPT_ROLLBACK): TransactionEvent_ROLLBACK,
}

var txStatus2EventM = map[int32]TransactionEvent{
int32(pb.TransactionStatus_BEGIN_FAILURE): TransactionEvent_DstFAILURE,
int32(pb.TransactionStatus_BEGIN_ROLLBACK): TransactionEvent_DstROLLBACK,
}

func (t *TransactionManager) BeginMultiTXs(globalID, ibtpID string, timeoutHeight uint64, isFailed bool, count uint64) *boltvm.Response {
if bxhErr := t.checkCurrentCaller(); bxhErr != nil {
return boltvm.Error(bxhErr.Code, string(bxhErr.Msg))
Expand Down Expand Up @@ -152,6 +159,69 @@ func (t *TransactionManager) Begin(txId string, timeoutHeight uint64, isFailed b
return boltvm.Success(data)
}

// BeginInterBitXHub transaction management for inter-bitxhub transaction
// - if curStatus == BEGIN && txStatus == BEGIN_FAIL, then change curStatus to FAIL and notify src chain
// - if curStatus == BEGIN && txStatus == BEGIN_ROLLBACK, then change curStatus to ROLLBACK and notify src chain
// - isFailed note that whether dstBitXHub is available
func (t *TransactionManager) BeginInterBitXHub(txId string, timeoutHeight uint64, proof []byte, isFailed bool) *boltvm.Response {
if bxhErr := t.checkCurrentCaller(); bxhErr != nil {
return boltvm.Error(bxhErr.Code, string(bxhErr.Msg))
}

change := pb.StatusChange{}
var record pb.TransactionRecord
ok := t.GetObject(TxInfoKey(txId), &record)
if ok {
bxhProof := &pb.BxhProof{}
if err := bxhProof.Unmarshal(proof); err != nil {
return boltvm.Error(boltvm.TransactionStateErrCode, fmt.Sprintf(string(boltvm.TransactionInternalErrMsg), fmt.Sprintf("unmarshal proof from dst BitXHub for ibtp %s failed: %s", txId, err.Error())))
}
txStatus := int32(bxhProof.TxStatus)
change.PrevStatus = record.Status
if err := t.setFSM(&record.Status, txStatus2EventM[txStatus]); err != nil {
return boltvm.Error(boltvm.TransactionStateErrCode, fmt.Sprintf(string(boltvm.TransactionStateErrMsg), fmt.Sprintf("transaction %s with state %v get unexpected receipt %v", txId, record.Status, txStatus)))
}
change.CurStatus = record.Status

recordData, err := record.Marshal()
if err != nil {
return boltvm.Error(boltvm.TransactionInternalErrCode, fmt.Sprintf(string(boltvm.TransactionInternalErrMsg), err.Error()))
}
t.Add(TxInfoKey(txId), recordData)
} else {
record := pb.TransactionRecord{
Status: pb.TransactionStatus_BEGIN,
Height: t.GetCurrentHeight() + timeoutHeight,
}

if timeoutHeight == 0 || timeoutHeight >= math.MaxUint64-t.GetCurrentHeight() {
record.Height = math.MaxUint64
}

if isFailed {
record.Status = pb.TransactionStatus_BEGIN_FAILURE
}

recordData, err := record.Marshal()
if err != nil {
return boltvm.Error(boltvm.TransactionInternalErrCode, fmt.Sprintf(string(boltvm.TransactionInternalErrMsg), err.Error()))
}
t.Add(TxInfoKey(txId), recordData)

change = pb.StatusChange{
PrevStatus: -1,
CurStatus: record.Status,
}
}

data, err := change.Marshal()
if err != nil {
return boltvm.Error(boltvm.TransactionInternalErrCode, fmt.Sprintf(string(boltvm.TransactionInternalErrMsg), err.Error()))
}

return boltvm.Success(data)
}

func (t *TransactionManager) Report(txId string, result int32) *boltvm.Response {
if bxhErr := t.checkCurrentCaller(); bxhErr != nil {
return boltvm.Error(bxhErr.Code, string(bxhErr.Msg))
Expand Down Expand Up @@ -263,6 +333,8 @@ func (t *TransactionManager) setFSM(state *pb.TransactionStatus, event Transacti
{Name: TransactionEvent_SUCCESS.String(), Src: []string{pb.TransactionStatus_BEGIN.String()}, Dst: pb.TransactionStatus_SUCCESS.String()},
{Name: TransactionEvent_FAILURE.String(), Src: []string{pb.TransactionStatus_BEGIN.String(), pb.TransactionStatus_BEGIN_FAILURE.String()}, Dst: pb.TransactionStatus_FAILURE.String()},
{Name: TransactionEvent_ROLLBACK.String(), Src: []string{pb.TransactionStatus_BEGIN_ROLLBACK.String()}, Dst: pb.TransactionStatus_ROLLBACK.String()},
{Name: TransactionEvent_DstFAILURE.String(), Src: []string{pb.TransactionStatus_BEGIN.String()}, Dst: pb.TransactionStatus_FAILURE.String()},
{Name: TransactionEvent_DstROLLBACK.String(), Src: []string{pb.TransactionStatus_BEGIN.String()}, Dst: pb.TransactionStatus_ROLLBACK.String()},
},
fsm.Callbacks{
TransactionEvent_BEGIN.String(): callbackFunc,
Expand All @@ -271,6 +343,8 @@ func (t *TransactionManager) setFSM(state *pb.TransactionStatus, event Transacti
TransactionEvent_SUCCESS.String(): callbackFunc,
TransactionEvent_FAILURE.String(): callbackFunc,
TransactionEvent_ROLLBACK.String(): callbackFunc,
TransactionEvent_DstFAILURE.String(): callbackFunc,
TransactionEvent_DstROLLBACK.String(): callbackFunc,
},
)

Expand Down

0 comments on commit 6a59779

Please sign in to comment.