diff --git a/safe_cli/operators/hw_accounts/ledger_account.py b/safe_cli/operators/hw_accounts/ledger_account.py index a73a7329..260a3abc 100644 --- a/safe_cli/operators/hw_accounts/ledger_account.py +++ b/safe_cli/operators/hw_accounts/ledger_account.py @@ -1,7 +1,4 @@ -import warnings - from eth_account.datastructures import SignedTransaction -from eth_account.signers.base import BaseAccount from hexbytes import HexBytes from ledgerblue import Dongle from ledgereth import create_transaction, sign_typed_data_draft @@ -9,8 +6,8 @@ from web3.types import TxParams -class LedgerAccount(BaseAccount): - def __init__(self, path, address, dongle: Dongle): +class LedgerAccount: + def __init__(self, path, address): """ Initialize a new ledger account (no private key) @@ -19,64 +16,16 @@ def __init__(self, path, address, dongle: Dongle): """ self._address = address self.path = path - self.dongle = dongle @property def address(self): return self._address - @property - def privateKey(self): - """ - .. CAUTION:: Deprecated for :meth:`~eth_account.signers.local.LocalAccount.key`. - This attribute will be removed in v0.5 - """ - warnings.warn( - "privateKey is deprecated in favor of key", - category=DeprecationWarning, - ) - return None - - @property - def key(self): - """ - Get the private key. - """ - return None - - def encrypt(self, password, kdf=None, iterations=None): - """ - Generate a string with the encrypted key. - - This uses the same structure as in - :meth:`~eth_account.account.Account.encrypt`, but without a private key argument. - """ - # return self._publicapi.encrypt(self.key, password, kdf=kdf, iterations=iterations) - # TODO with ledger - pass - - def signHash(self, domain_hash: bytes, message_hash: bytes): - signed = sign_typed_data_draft(domain_hash, message_hash, dongle=self.dongle) + def signMessage(self, domain_hash: bytes, message_hash: bytes, dongle: Dongle): + signed = sign_typed_data_draft(domain_hash, message_hash, self.path, dongle) return (signed.v, signed.r, signed.s) - def sign_message(self, signable_message): - """ - Generate a string with the encrypted key. - - This uses the same structure as in - :meth:`~eth_account.account.Account.sign_message`, but without a private key argument. - """ - # TODO with ledger - pass - - def signTransaction(self, transaction_dict): - warnings.warn( - "signTransaction is deprecated in favor of sign_transaction", - category=DeprecationWarning, - ) - pass - - def sign_transaction(self, tx: TxParams) -> SignedTransaction: + def sign_transaction(self, tx: TxParams, dongle: Dongle) -> SignedTransaction: signed = create_transaction( destination=tx["to"], amount=tx["value"], @@ -87,7 +36,7 @@ def sign_transaction(self, tx: TxParams) -> SignedTransaction: nonce=tx["nonce"], chain_id=tx["chainId"], sender_path=self.path, - dongle=self.dongle, + dongle=dongle, ) raw_transaction = signed.raw_transaction() return SignedTransaction( @@ -97,6 +46,3 @@ def sign_transaction(self, tx: TxParams) -> SignedTransaction: signed.sender_s, signed.y_parity, ) - - def __bytes__(self): - return self.key diff --git a/safe_cli/operators/hw_accounts/ledger_manager.py b/safe_cli/operators/hw_accounts/ledger_manager.py new file mode 100644 index 00000000..93cf8bc8 --- /dev/null +++ b/safe_cli/operators/hw_accounts/ledger_manager.py @@ -0,0 +1,112 @@ +from enum import Enum +from typing import List, Optional, Set, Tuple + +from eth_typing import ChecksumAddress +from ledgereth.accounts import get_account_by_path +from ledgereth.comms import init_dongle +from ledgereth.constants import DEFAULT_PATH_STRING +from ledgereth.exceptions import LedgerAppNotOpened, LedgerLocked, LedgerNotFound +from prompt_toolkit import HTML, print_formatted_text + +from gnosis.safe.signatures import signature_to_bytes + +from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount + + +class LedgerStatus(Enum): + DISCONNECTED = 0 + LOCKED = 1 # Connected but locked + APP_CLOSED = 2 # Connected, unlocked but app is closed + READY = 3 # Ready to communicate + + +class LedgerManager: + + LEDGER_SEARCH_DEEP = 10 + + def __init__(self): + self.dongle = None + self.accounts: Set[LedgerAccount] = set() + self.connected: bool + + def _print_error_message(self, message: str): + print_formatted_text(HTML(f"{message}")) + + def check_status(self, print_message: bool = False) -> LedgerStatus: + try: + self.dongle = init_dongle(self.dongle) + # Get default derivation to check following status + get_account_by_path(DEFAULT_PATH_STRING) + except LedgerNotFound: + if print_message: + self._print_error_message("Ledger is disconnected") + return LedgerStatus.DISCONNECTED + except LedgerLocked: + if print_message: + self._print_error_message("Ledger is locked") + return LedgerStatus.LOCKED + except LedgerAppNotOpened: + if print_message: + self._print_error_message("Ledger is disconnected") + return LedgerStatus.APP_CLOSED + + return LedgerStatus.READY + + @property + def connected(self) -> bool: + if self.check_status() != LedgerStatus.DISCONNECTED: + return True + return False + + def get_accounts( + self, legacy_account: Optional[bool] = False + ) -> List[Tuple[ChecksumAddress, str]] | None: + """ + :param legacy_account: + :return: a list of tuples with address and derivation path + """ + accounts = [] + if self.check_status(True) != LedgerStatus.READY: + return None + for i in range(self.LEDGER_SEARCH_DEEP): + if legacy_account: + path_string = f"44'/60'/0'/{i}" + else: + path_string = f"44'/60'/{i}'/0/0" + try: + account = get_account_by_path(path_string, self.dongle) + except LedgerLocked as ledger_error: + print(f"Ledger exception: {ledger_error}") + accounts.append((account.address, account.path)) + return accounts + + def add_account(self, derivation_path: str) -> bool: + """ + Add account to ledger manager list + + :param derivation_path: + :return: + """ + if self.check_status(True) != LedgerStatus.READY: + return False + account = get_account_by_path(derivation_path, self.dongle) + self.accounts.add(LedgerAccount(account.path, account.address)) + return True + + def sign_eip712( + self, domain_hash: bytes, message_hash: bytes, account: LedgerAccount + ) -> bytes | None: + """ + Sign eip712 hashes + + :param domain_hash: + :param message_hash: + :param account: ledger account + :return: bytes signature + """ + if self.check_status(True) != LedgerStatus.READY: + return None + + v, r, s = account.signMessage(domain_hash, message_hash, self.dongle) + + return signature_to_bytes(v, r, s) diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py index 859f175a..0aa3f7f9 100644 --- a/safe_cli/operators/safe_operator.py +++ b/safe_cli/operators/safe_operator.py @@ -9,9 +9,6 @@ from eth_typing import ChecksumAddress from eth_utils import ValidationError from hexbytes import HexBytes -from ledgereth import get_account_by_path -from ledgereth.comms import init_dongle -from ledgereth.exceptions import LedgerAppNotOpened, LedgerLocked, LedgerNotFound from packaging import version as semantic_version from prompt_toolkit import HTML, print_formatted_text from web3 import Web3 @@ -36,16 +33,14 @@ from gnosis.safe import InvalidInternalTx, Safe, SafeOperation, SafeTx from gnosis.safe.api import TransactionServiceApi from gnosis.safe.multi_send import MultiSend, MultiSendOperation, MultiSendTx -from gnosis.safe.signatures import signature_to_bytes from safe_cli.ethereum_hd_wallet import get_account_from_words -from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount from safe_cli.safe_addresses import ( get_default_fallback_handler_address, get_safe_contract_address, get_safe_l2_contract_address, ) -from safe_cli.utils import get_erc_20_list, yes_or_no_question +from safe_cli.utils import choose_option_question, get_erc_20_list, yes_or_no_question @dataclasses.dataclass @@ -227,13 +222,19 @@ def __init__(self, address: ChecksumAddress, node_url: str): self.safe_contract_1_1_0 = get_safe_V1_1_1_contract( self.ethereum_client.w3, address=self.address ) - self.accounts: Set[LocalAccount | LedgerAccount] = set() - self.default_sender: Optional[LocalAccount] | LedgerAccount = None + self.accounts: Set[LocalAccount] = set() + self.default_sender: Optional[LocalAccount] = None self.executed_transactions: List[str] = [] self._safe_cli_info: Optional[SafeCliInfo] = None # Cache for SafeCliInfo self.require_all_signatures = ( True # Require all signatures to be present to send a tx ) + try: + from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager + + self.ledger_manager = LedgerManager() + except (ModuleNotFoundError, IOError): + self.ledger_manager = None @cached_property def last_default_fallback_handler_address(self) -> ChecksumAddress: @@ -332,40 +333,30 @@ def load_cli_owners(self, keys: List[str]): print_formatted_text(HTML(f"Cannot load key={key}")) def load_ledger_cli_owners(self): - try: - dongle = init_dongle() - # Search between 10 first accounts - for index in range(10): - path = f"44'/60'/{index}'/0/0" - account = get_account_by_path(path, dongle=dongle) - if account.address in self.safe_cli_info.owners: - sender = LedgerAccount(account.path, account.address, dongle) - self.accounts.add(sender) - balance = self.ethereum_client.get_balance(account.address) - print_formatted_text( - HTML( - f"Loaded account {account.address} " - f'with balance={Web3.from_wei(balance, "ether")} ether' - f"Ledger account cannot be defined as sender" - ) - ) - # TODO add ledger as sender - break - except LedgerNotFound: - print_formatted_text( - HTML("Unable to find Ledger device") - ) - return - except LedgerAppNotOpened: - print_formatted_text( - HTML("Ensure open ethereum app on your ledger") - ) - return - except LedgerLocked: + if not self.ledger_manager: + return None + + ledger_accounts = self.ledger_manager.get_accounts() + if ledger_accounts is None: + return None + + for option, ledger_account in enumerate(ledger_accounts): + address, _ = ledger_account + print_formatted_text(HTML(f"{option} - {address} ")) + + option = choose_option_question( + "Select the owner address", len(ledger_accounts) - 1 + ) + address, derivation_path = ledger_accounts[option] + if self.ledger_manager.add_account(derivation_path): + balance = self.ethereum_client.get_balance(address) print_formatted_text( - HTML("Ensure open ethereum app on your ledger") + HTML( + f"Loaded account {address} " + f'with balance={Web3.from_wei(balance, "ether")} ether' + f"Ledger account cannot be defined as sender" + ) ) - return def unload_cli_owners(self, owners: List[str]): accounts_to_remove: Set[Account] = set() @@ -385,10 +376,15 @@ def unload_cli_owners(self, owners: List[str]): print_formatted_text(HTML("No account was deleted")) def show_cli_owners(self): - if not self.accounts: + accounts = ( + self.accounts | self.ledger_manager.accounts + if self.ledger_manager + else self.accounts + ) + if not accounts: print_formatted_text(HTML("No accounts loaded")) else: - for account in self.accounts: + for account in accounts: print_formatted_text( HTML( f"Account {account.address} loaded" @@ -719,6 +715,28 @@ def print_info(self): ) ) + if not self.ledger_manager: + print_formatted_text( + HTML( + "Ledger=" + "ledgereth library is not installed" + ) + ) + elif self.ledger_manager.connected: + print_formatted_text( + HTML( + "Ledger=" + "Connected" + ) + ) + else: + print_formatted_text( + HTML( + "Ledger=" + "disconnected" + ) + ) + if not self.is_version_updated(): print_formatted_text( HTML( @@ -899,28 +917,6 @@ def batch_safe_txs( else: return safe_tx - def ledger_sign(self, safe_tx: SafeTx, account: LedgerAccount) -> bytes: - """ - {bytes32 r}{bytes32 s}{uint8 v} - :param private_key: - :return: Signature - """ - encode_hash = eip712_encode(safe_tx.eip712_structured_data) - v, r, s = account.signHash(encode_hash[1], encode_hash[2]) - signature = signature_to_bytes(v, r, s) - # Insert signature sorted - if account.address not in safe_tx.signers: - new_owners = safe_tx.signers + [account.address] - new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index( - account.address - ) - safe_tx.signatures = ( - safe_tx.signatures[: 65 * new_owner_pos] - + signature - + safe_tx.signatures[65 * new_owner_pos :] - ) - return safe_tx - # TODO Set sender so we can save gas in that signature def sign_transaction(self, safe_tx: SafeTx) -> SafeTx: permitted_signers = self.get_permitted_signers() @@ -934,16 +930,41 @@ def sign_transaction(self, safe_tx: SafeTx) -> SafeTx: threshold -= 1 if threshold == 0: break + # If still pending required signatures continue with ledger owners + selected_ledger_accounts = [] + if threshold > 0 and self.ledger_manager: + for ledger_account in self.ledger_manager.accounts: + if ledger_account.address in permitted_signers: + selected_ledger_accounts.append(ledger_account) + threshold -= 1 + if threshold == 0: + break if self.require_all_signatures and threshold > 0: raise NotEnoughSignatures(threshold) for selected_account in selected_accounts: - if selected_account.key: - safe_tx.sign(selected_account.key) - else: - safe_tx = self.ledger_sign(safe_tx, selected_account) + safe_tx.sign(selected_account.key) + # Sign with ledger if + for selected_ledger_account in selected_ledger_accounts: + encode_hash = eip712_encode(safe_tx.eip712_structured_data) + signature = self.ledger_manager.sign_eip712( + encode_hash[1], encode_hash[2], selected_ledger_account + ) + if signature: + # TODO refactor on safe_eth_py function insert_signature_sorted + # Insert signature sorted + if selected_ledger_account.address not in safe_tx.signers: + new_owners = safe_tx.signers + [selected_ledger_account.address] + new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index( + selected_ledger_account.address + ) + safe_tx.signatures = ( + safe_tx.signatures[: 65 * new_owner_pos] + + signature + + safe_tx.signatures[65 * new_owner_pos :] + ) return safe_tx @require_tx_service diff --git a/safe_cli/utils.py b/safe_cli/utils.py index b3e96f87..806ef3f9 100644 --- a/safe_cli/utils.py +++ b/safe_cli/utils.py @@ -1,3 +1,4 @@ +import argparse import os from gnosis.eth import EthereumClient @@ -41,3 +42,16 @@ def yes_or_no_question(question: str, default_no: bool = True) -> bool: return False else: return False if default_no else True + + +def choose_option_question( + question: str, number_options: int, default_option: int = 0 +) -> bool: + if "PYTEST_CURRENT_TEST" in os.environ: + return True # Ignore confirmations when running tests + choices = f" [0-{number_options}] default {default_option}:" + reply = str(input(question + choices)).lower().strip() or str(default_option) + option = int(reply) + if option not in range(0, number_options): + argparse.ArgumentTypeError(f"{option} is not between [0-{number_options}}}") + return option