Skip to content

Commit

Permalink
Merge pull request #31 from derekpierre/refinement
Browse files Browse the repository at this point in the history
Fault Tolerance Refinements
  • Loading branch information
derekpierre authored Apr 16, 2024
2 parents 2109e3e + 63e2859 commit 6ee59df
Show file tree
Hide file tree
Showing 17 changed files with 1,381 additions and 770 deletions.
8 changes: 4 additions & 4 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ verify_ssl = true
name = "pypi"

[packages]
web3 = "*"
web3 = "<=6.15.1"
twisted = "*"
eth-account = "<0.9"
eth-account = "<=0.10.0"
eth-typing = "<4,>=3.5.2"
eth-utils = "<3,>=2.3.1"
eth-abi = "<5,>=4.2.1"
eth-keys = "<0.5,>=0.4.0"
python-statemachine = "*"
rlp = "<4,>=3.0.0"
urllib3 = "<2,>=1.26.16"
urllib3 = "<=2.2.0"

[dev-packages]
# testing
eth-ape = "<7"
eth-ape = "*"
pytest = "*"
pytest-mock = "*"
pytest-twisted = "*"
Expand Down
865 changes: 468 additions & 397 deletions Pipfile.lock

Large diffs are not rendered by default.

23 changes: 8 additions & 15 deletions atxm/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum

from web3.types import PendingTx, TxReceipt
from web3.types import PendingTx


class Fault(Enum):
Expand All @@ -13,20 +13,21 @@ class Fault(Enum):
# Strategy has been running for too long
TIMEOUT = "timeout"

# Transaction reverted
REVERT = "revert"

# Something went wrong
ERROR = "error"

# ...
INSUFFICIENT_FUNDS = "insufficient_funds"


class InsufficientFunds(Exception):
"""raised when a transaction exceeds the spending cap"""


class RPCException(Exception):
def __init__(self, error_code: int, error_message: str):
self.error_code = error_code
self.error_message = error_message
super().__init__(f"RPC Error [{error_code}]: {error_message}")


class TransactionFaulted(Exception):
"""Raised when a transaction has been faulted."""

Expand All @@ -35,11 +36,3 @@ def __init__(self, tx: PendingTx, fault: Fault, message: str):
self.fault = fault
self.message = message
super().__init__(message)


class TransactionReverted(TransactionFaulted):
"""Raised when a transaction has been reverted."""

def __init__(self, tx: PendingTx, receipt: TxReceipt, message: str):
self.receipt = receipt
super().__init__(tx=tx, fault=Fault.REVERT, message=message)
143 changes: 72 additions & 71 deletions atxm/machine.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
from copy import copy, deepcopy
from typing import List, Optional
from typing import List, Optional, Union

from eth_account.signers.local import LocalAccount
from eth_utils import ValidationError
from statemachine import State, StateMachine
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
from web3 import Web3
from web3.exceptions import Web3Exception
from web3.types import TxParams

from atxm.exceptions import (
Fault,
InsufficientFunds,
TransactionFaulted,
TransactionReverted,
)
from atxm.strategies import AsyncTxStrategy, TimeoutStrategy
from atxm.tracker import _TxTracker
from atxm.tx import (
AsyncTx,
FutureTx,
PendingTx,
TxHash,
Expand All @@ -29,6 +25,7 @@
_get_average_blocktime,
_get_confirmations,
_get_receipt,
_process_send_raw_transaction_exception,
_is_recoverable_send_tx_error,
_make_tx_params,
fire_hook,
Expand Down Expand Up @@ -92,8 +89,8 @@ class _Machine(StateMachine):
_BLOCK_INTERVAL = 20 # ~20 blocks
_BLOCK_SAMPLE_SIZE = 10_000 # blocks

# max requeues/retries
_MAX_REDO_ATTEMPTS = 3
# max retries (broadcast or active)
_MAX_RETRY_ATTEMPTS = 3

class LogObserver:
"""StateMachine observer for logging information about state/transitions."""
Expand Down Expand Up @@ -225,7 +222,7 @@ def _stop(self):

def _wake(self) -> None:
"""Runs the looping call immediately."""
log.info("[wake] running looping call now.")
self.log.info("[wake] running looping call now.")
if self._task.running:
# TODO instead of stopping/starting, can you set interval to 0
# and call reset to have looping call immediately?
Expand All @@ -235,7 +232,7 @@ def _wake(self) -> None:

def _sleep(self) -> None:
if self._task.running:
log.info("[sleep] sleeping")
self.log.info("[sleep] sleeping")
self._stop()

#
Expand All @@ -246,38 +243,31 @@ def __handle_active_transaction(self) -> None:
"""
Handles the currently tracked pending transaction.
The 4 possible outcomes for the pending ("active") transaction in one cycle:
The 3 possible outcomes for the pending ("active") transaction in one cycle:
1. paused
2. reverted (fault)
3. finalized
4. strategize: retry, do nothing and wait, or fault
2. finalized (successful or reverted)
3. still pending: strategize and retry, do nothing and wait, or fault
Returns True if the next queued transaction can be broadcasted right now.
"""

pending_tx = self._tx_tracker.pending

try:
receipt = _get_receipt(w3=self.w3, pending_tx=pending_tx)

# Outcome 2: the pending transaction was reverted (final error)
except TransactionReverted as e:
self._tx_tracker.fault(fault_error=e)
return
receipt = _get_receipt(w3=self.w3, pending_tx=pending_tx)

# Outcome 3: pending transaction is finalized (final success)
# Outcome 2: pending transaction is finalized (final success)
if receipt:
final_txhash = receipt["transactionHash"]
confirmations = _get_confirmations(w3=self.w3, tx=pending_tx)
self.log.info(
f"[finalized] Transaction #atx-{pending_tx.id} has been finalized "
f"with {confirmations} confirmation(s) txhash: {final_txhash.hex()}"
f"[finalized] Transaction #atx-{pending_tx.id} with txhash: {final_txhash.hex()} "
f"and status {receipt['status']} has been finalized with {confirmations} confirmation(s)"
)
self._tx_tracker.finalize_active_tx(receipt=receipt)
return

# Outcome 4: re-strategize the pending transaction
# Outcome 3: re-strategize the pending transaction
self.__strategize()

#
Expand All @@ -291,29 +281,18 @@ def __get_signer(self, address: str) -> LocalAccount:
raise ValueError(f"Signer {address} not found")
return signer

def __fire(self, tx: AsyncTx, msg: str) -> TxHash:
def __fire(self, tx: Union[FutureTx, PendingTx], msg: str) -> TxHash:
"""
Signs and broadcasts a transaction, handling RPC errors
and internal state changes.
On success, returns the `PendingTx` object.
On failure, returns None.
Morphs a `FutureTx` into a `PendingTx` and advances it
into the active transaction slot if broadcast is successful.
"""
signer: LocalAccount = self.__get_signer(tx.params["from"])
try:
txhash = self.w3.eth.send_raw_transaction(
signer.sign_transaction(tx.params).rawTransaction
)
except ValidationError as e:
# special case for insufficient funds
if "Sender does not have enough" in str(e):
# TODO raised exception should be handled in some way #13.
raise InsufficientFunds

raise e
except Exception as e:
raise _process_send_raw_transaction_exception(e)

self.log.info(
f"[{msg}] fired transaction #atx-{tx.id}|{tx.params['nonce']}|{txhash.hex()}"
Expand Down Expand Up @@ -341,7 +320,7 @@ def __strategize(self) -> None:

if not params_updated:
# mandatory default timeout strategy prevents this from being a forever wait
log.info(
self.log.info(
f"[wait] strategies made no suggested updates to "
f"pending tx #{_active_copy.id} - skipping retry round"
)
Expand All @@ -352,33 +331,39 @@ def __strategize(self) -> None:

try:
txhash = self.__fire(tx=_active_copy, msg=_names)
except InsufficientFunds:
# special case re-raise insufficient funds (for now)
# TODO #13
# TODO should the following also be done?
# self._tx_tracker.update_failed_retry_attempt(_active_copy)
raise
except (ValidationError, Web3Exception, ValueError) as e:
self._tx_tracker.update_failed_retry_attempt(_active_copy)
except InsufficientFunds as e:
# special case
self.log.error(
f"[insufficient funds] transaction #atx-{_active_copy.id}|{_active_copy.params['nonce']} "
f"failed because of insufficient funds - {e}"
)
# get hook from actual pending object (not a deep copy)
hook = self._tx_tracker.pending.on_insufficient_funds
fire_hook(hook, _active_copy, e)
return
except Exception as e:
self._tx_tracker.update_active_after_failed_strategy_update(_active_copy)
self.__handle_retry_failure(_active_copy, e)
return

_active_copy.txhash = txhash
self._tx_tracker.update_after_retry(_active_copy)
self._tx_tracker.update_active_after_successful_strategy_update(_active_copy)

pending_tx = self._tx_tracker.pending
self.log.info(f"[retry] transaction #{pending_tx.id} has been re-broadcasted")
self.log.info(
f"[retry] transaction #{pending_tx.id} has been re-broadcasted with updated params"
)
if pending_tx.on_broadcast:
fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx)

def __handle_retry_failure(self, attempted_tx: PendingTx, e: Exception):
log.warn(
self.log.warn(
f"[retry] transaction #atx-{attempted_tx.id}|{attempted_tx.params['nonce']} "
f"failed with updated params - {str(e)}; retry again next round"
)

if attempted_tx.retries >= self._MAX_REDO_ATTEMPTS:
log.error(
if attempted_tx.retries >= self._MAX_RETRY_ATTEMPTS:
self.log.error(
f"[retry] transaction #atx-{attempted_tx.id}|{attempted_tx.params['nonce']} "
f"failed for { attempted_tx.retries} attempts; tx will no longer be retried"
)
Expand All @@ -395,7 +380,7 @@ def __broadcast(self):
Attempts to broadcast the next `FutureTx` in the queue.
If the broadcast is not successful, it is re-queued.
"""
future_tx = self._tx_tracker.pop() # popleft
future_tx = self._tx_tracker.head() # don't pop until successful
future_tx.params = _make_tx_params(future_tx.params)

# update nonce as necessary
Expand All @@ -411,12 +396,16 @@ def __broadcast(self):

try:
txhash = self.__fire(tx=future_tx, msg="broadcast")
except InsufficientFunds:
# special case re-raise insufficient funds (for now)
# TODO #13
raise
except (ValidationError, Web3Exception, ValueError) as e:
# either requeue OR fail and move on to subsequent txs
except InsufficientFunds as e:
# special case
self.log.error(
f"[insufficient funds] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed because of insufficient funds - {e}"
)
fire_hook(future_tx.on_insufficient_funds, future_tx, e)
return
except Exception as e:
# notify user of failure and have them decide
self.__handle_broadcast_failure(future_tx, e)
return

Expand All @@ -428,30 +417,31 @@ def __broadcast(self):
def __handle_broadcast_failure(self, future_tx: FutureTx, e: Exception):
is_broadcast_failure = False
if _is_recoverable_send_tx_error(e):
if future_tx.requeues >= self._MAX_REDO_ATTEMPTS:
if future_tx.retries >= self._MAX_RETRY_ATTEMPTS:
is_broadcast_failure = True
log.error(
self.log.error(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed for {future_tx.requeues} attempts; tx will not be requeued"
f"failed after {future_tx.retries} retry attempts"
)
else:
log.warn(
self.log.warn(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed - {str(e)}; requeueing tx"
f"failed - {e}; tx will be retried"
)
self._tx_tracker.requeue(future_tx)
self._tx_tracker.increment_broadcast_retries(future_tx)
else:
# non-recoverable
is_broadcast_failure = True
log.error(
self.log.error(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"has non-recoverable failure - {str(e)}; tx will not be requeued"
f"has non-recoverable failure - {e}"
)

if is_broadcast_failure:
hook = future_tx.on_broadcast_failure
if hook:
fire_hook(hook, future_tx, e)
# since reporting failure; reset retry count
self._tx_tracker.reset_broadcast_retries(future_tx)

fire_hook(future_tx.on_broadcast_failure, future_tx, e)

#
# Monitoring
Expand All @@ -460,8 +450,10 @@ def __monitor_finalized(self) -> None:
"""Follow-up on finalized transactions for a little while."""
if not self._tx_tracker.finalized:
return

current_block = self.w3.eth.block_number
for tx in self._tx_tracker.finalized.copy():
confirmations = _get_confirmations(w3=self.w3, tx=tx)
confirmations = current_block - tx.block_number
if confirmations >= self._TRACKING_CONFIRMATIONS:
if tx in self._tx_tracker.finalized:
self._tx_tracker.finalized.remove(tx)
Expand Down Expand Up @@ -520,3 +512,12 @@ def queue_transaction(
self._wake()

return tx

def remove_queued_transaction(self, tx: FutureTx):
"""
Removes a queued transaction; useful when an existing queued transaction has broadcast
failures, or a queued transaction is no longer necessary.
Returns true if transaction was present and removed, false otherwise.
"""
return self._tx_tracker.remove_queued_tx(tx)
Loading

0 comments on commit 6ee59df

Please sign in to comment.