Skip to content

Commit

Permalink
reimplement transaction management
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Dec 10, 2024
1 parent 7c07bdf commit ffc5fff
Show file tree
Hide file tree
Showing 15 changed files with 1,075 additions and 860 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates
RUN update-ca-certificates
ENV PATH="$PATH:/app"
COPY --from=build-env /app/* /app
CMD ["./spamoor"]
ENTRYPOINT ["./spamoor"]
107 changes: 49 additions & 58 deletions scenarios/blob-combined/blob_combined.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blobcombined

import (
"context"
"fmt"
"math/big"
"math/rand"
Expand Down Expand Up @@ -256,74 +257,65 @@ func (s *Scenario) sendBlobTx(txIdx uint64, replacementIdx uint64, txNonce uint6
return nil, client, err
}

err = client.SendTransaction(tx)
if err != nil {
return nil, client, err
rebroadcast := 0
if s.options.Rebroadcast > 0 {
rebroadcast = 10
}

s.pendingWGroup.Add(1)
go s.awaitTx(txIdx, tx, client, wallet, replacementIdx)
var awaitConfirmation bool = true
err = s.tester.GetTxPool().SendTransaction(context.Background(), wallet, tx, &txbuilder.SendTransactionOptions{
Client: client,
MaxRebroadcasts: rebroadcast,
RebroadcastInterval: time.Duration(s.options.Rebroadcast) * time.Second,
OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) {
defer func() {
awaitConfirmation = false
if replacementIdx == 0 {
if s.pendingChan != nil {
<-s.pendingChan
}
}
s.pendingWGroup.Done()
}()

return tx, client, nil
}
if err != nil {
s.logger.WithField("client", client.GetName()).Warnf("blob tx %6d.%v: await receipt failed: %v", txIdx+1, replacementIdx, err)
return
}
if receipt == nil {
return
}

func (s *Scenario) awaitTx(txIdx uint64, tx *types.Transaction, client *txbuilder.Client, wallet *txbuilder.Wallet, replacementIdx uint64) {
var awaitConfirmation bool = true
defer func() {
awaitConfirmation = false
if replacementIdx == 0 {
if s.pendingChan != nil {
<-s.pendingChan
effectiveGasPrice := receipt.EffectiveGasPrice
if effectiveGasPrice == nil {
effectiveGasPrice = big.NewInt(0)
}
}
s.pendingWGroup.Done()
}()
if s.options.Replace > 0 && replacementIdx < s.options.MaxReplacements && rand.Intn(100) < 70 {
go s.delayedReplace(txIdx, tx, &awaitConfirmation, replacementIdx)
} else if s.options.Rebroadcast > 0 {
go s.delayedResend(txIdx, tx, &awaitConfirmation, replacementIdx)
}
blobGasPrice := receipt.BlobGasPrice
if blobGasPrice == nil {
blobGasPrice = big.NewInt(0)
}
feeAmount := new(big.Int).Mul(effectiveGasPrice, big.NewInt(int64(receipt.GasUsed)))
totalAmount := new(big.Int).Add(tx.Value(), feeAmount)
wallet.SubBalance(totalAmount)

receipt, blockNum, err := client.AwaitTransaction(tx)
if err != nil {
s.logger.WithField("client", client.GetName()).Warnf("blob tx %6d.%v: await receipt failed: %v", txIdx+1, replacementIdx, err)
return
}
if receipt == nil {
return
}
gweiTotalFee := new(big.Int).Div(feeAmount, big.NewInt(1000000000))
gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000))
gweiBlobFee := new(big.Int).Div(blobGasPrice, big.NewInt(1000000000))

effectiveGasPrice := receipt.EffectiveGasPrice
if effectiveGasPrice == nil {
effectiveGasPrice = big.NewInt(0)
}
blobGasPrice := receipt.BlobGasPrice
if blobGasPrice == nil {
blobGasPrice = big.NewInt(0)
s.logger.WithField("client", client.GetName()).Infof("blob tx %6d.%v confirmed in block #%v! total fee: %v gwei (base: %v, blob: %v)", txIdx+1, replacementIdx, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee)
},
})
if err != nil {
return nil, client, err
}
feeAmount := new(big.Int).Mul(effectiveGasPrice, big.NewInt(int64(receipt.GasUsed)))
totalAmount := new(big.Int).Add(tx.Value(), feeAmount)
wallet.SubBalance(totalAmount)

gweiTotalFee := new(big.Int).Div(feeAmount, big.NewInt(1000000000))
gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000))
gweiBlobFee := new(big.Int).Div(blobGasPrice, big.NewInt(1000000000))

s.logger.WithField("client", client.GetName()).Infof("blob tx %6d.%v confirmed in block #%v! total fee: %v gwei (base: %v, blob: %v)", txIdx+1, replacementIdx, blockNum, gweiTotalFee, gweiBaseFee, gweiBlobFee)
}

func (s *Scenario) delayedResend(txIdx uint64, tx *types.Transaction, awaitConfirmation *bool, replacementIdx uint64) {
for {
time.Sleep(time.Duration(s.options.Rebroadcast) * time.Second)

if !*awaitConfirmation {
break
}
s.pendingWGroup.Add(1)

client := s.tester.GetClient(tester.SelectRandom, 0)
client.SendTransaction(tx)
s.logger.WithField("client", client.GetName()).Debugf("blob tx %6d.%v re-broadcasted.", txIdx+1, replacementIdx)
if s.options.Replace > 0 && replacementIdx < s.options.MaxReplacements && rand.Intn(100) < 70 {
go s.delayedReplace(txIdx, tx, &awaitConfirmation, replacementIdx)
}

return tx, client, nil
}

func (s *Scenario) delayedReplace(txIdx uint64, tx *types.Transaction, awaitConfirmation *bool, replacementIdx uint64) {
Expand All @@ -336,7 +328,6 @@ func (s *Scenario) delayedReplace(txIdx uint64, tx *types.Transaction, awaitConf
replaceTx, client, err := s.sendBlobTx(txIdx, replacementIdx+1, tx.Nonce())
if err != nil {
s.logger.WithField("client", client.GetName()).Warnf("blob tx %6d.%v replacement failed: %v", txIdx+1, replacementIdx+1, err)
s.delayedResend(txIdx, tx, awaitConfirmation, replacementIdx)
return
}
s.logger.WithField("client", client.GetName()).Infof("blob tx %6d.%v sent: %v (%v sidecars)", txIdx+1, replacementIdx+1, replaceTx.Hash().String(), len(tx.BlobTxSidecar().Blobs))
Expand Down
98 changes: 53 additions & 45 deletions scenarios/blob-conflicting/blob_conflicting.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blobconflicting

import (
"context"
"fmt"
"math/big"
"math/rand"
Expand Down Expand Up @@ -246,12 +247,38 @@ func (s *Scenario) sendBlobTx(txIdx uint64) (*types.Transaction, *txbuilder.Clie
return nil, nil, err
}

rebroadcast := 0
if s.options.Rebroadcast > 0 {
rebroadcast = 10
}

// send both tx at exactly the same time
wg := sync.WaitGroup{}
wg.Add(2)
var err1, err2 error
go func() {
err1 = client.SendTransaction(tx1)
err1 = s.tester.GetTxPool().SendTransaction(context.Background(), wallet, tx1, &txbuilder.SendTransactionOptions{
Client: client,
MaxRebroadcasts: rebroadcast,
RebroadcastInterval: time.Duration(s.options.Rebroadcast) * time.Second,
OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) {
defer func() {
if s.pendingChan != nil {
<-s.pendingChan
}
s.pendingWGroup.Done()
}()

if err != nil {
s.logger.WithField("client", client.GetName()).Warnf("error while awaiting tx receipt: %v", err)
return
}

if receipt != nil {
s.processTxReceipt(txIdx, tx, receipt, client, wallet, "blob")
}
},
})
if err1 != nil {
s.logger.WithField("client", client.GetName()).Warnf("error while sending blob tx %v: %v", txIdx, err1)
}
Expand All @@ -260,54 +287,49 @@ func (s *Scenario) sendBlobTx(txIdx uint64) (*types.Transaction, *txbuilder.Clie
go func() {
delay := time.Duration(rand.Int63n(500)) * time.Millisecond
time.Sleep(delay)
err2 = client2.SendTransaction(tx2)
err2 = s.tester.GetTxPool().SendTransaction(context.Background(), wallet, tx2, &txbuilder.SendTransactionOptions{
Client: client2,
MaxRebroadcasts: rebroadcast,
RebroadcastInterval: time.Duration(s.options.Rebroadcast) * time.Second,
OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) {
defer func() {
s.pendingWGroup.Done()
}()

if err != nil {
s.logger.WithField("client", client.GetName()).Warnf("error while awaiting tx receipt: %v", err)
return
}

if receipt != nil {
s.processTxReceipt(txIdx, tx, receipt, client, wallet, "dynfee")
}
},
})
if err2 != nil {
s.logger.WithField("client", client2.GetName()).Warnf("error while sending dynfee tx %v: %v", txIdx, err2)
}
wg.Done()
}()
wg.Wait()

replacementIdx := uint64(0)
errCount := uint64(0)
if err1 == nil {
s.pendingWGroup.Add(1)
go s.awaitTxs(txIdx, tx1, client, wallet, replacementIdx, "blob")
replacementIdx++
errCount++
}
if err2 == nil {
s.pendingWGroup.Add(1)
go s.awaitTxs(txIdx, tx2, client2, wallet, replacementIdx, "dynfee")
replacementIdx++
errCount++
}
if replacementIdx == 0 {
if errCount == 0 {
return nil, nil, err1
}

return tx1, client, nil
}

func (s *Scenario) awaitTxs(txIdx uint64, tx *types.Transaction, client *txbuilder.Client, wallet *txbuilder.Wallet, _ uint64, txLabel string) {
var awaitConfirmation bool = true
defer func() {
awaitConfirmation = false
if s.pendingChan != nil {
<-s.pendingChan
}
s.pendingWGroup.Done()
}()
if s.options.Rebroadcast > 0 {
go s.delayedResend(txIdx, tx, &awaitConfirmation)
}

receipt, blockNum, err := client.AwaitTransaction(tx)
if err != nil {
s.logger.WithField("client", client.GetName()).Warnf("error while awaiting tx receipt: %v", err)
return
}
if receipt == nil {
return
}

func (s *Scenario) processTxReceipt(txIdx uint64, tx *types.Transaction, receipt *types.Receipt, client *txbuilder.Client, wallet *txbuilder.Wallet, txLabel string) {
effectiveGasPrice := receipt.EffectiveGasPrice
if effectiveGasPrice == nil {
effectiveGasPrice = big.NewInt(0)
Expand All @@ -324,19 +346,5 @@ func (s *Scenario) awaitTxs(txIdx uint64, tx *types.Transaction, client *txbuild
gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000))
gweiBlobFee := new(big.Int).Div(blobGasPrice, big.NewInt(1000000000))

s.logger.WithField("client", client.GetName()).Infof(" transaction %d/%v confirmed in block #%v. total fee: %v gwei (base: %v, blob: %v)", txIdx+1, txLabel, blockNum, gweiTotalFee, gweiBaseFee, gweiBlobFee)
}

func (s *Scenario) delayedResend(txIdx uint64, tx *types.Transaction, awaitConfirmation *bool) {
for {
time.Sleep(time.Duration(s.options.Rebroadcast) * time.Second)

if !*awaitConfirmation {
break
}

client := s.tester.GetClient(tester.SelectRandom, 0)
client.SendTransaction(tx)
s.logger.WithField("client", client.GetName()).Infof(" transaction %d re-broadcasted.", txIdx+1)
}
s.logger.WithField("client", client.GetName()).Infof(" transaction %d/%v confirmed in block #%v. total fee: %v gwei (base: %v, blob: %v)", txIdx+1, txLabel, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee)
}
Loading

0 comments on commit ffc5fff

Please sign in to comment.