From 723daba657c64645b3bae7a381aecc765e1190a7 Mon Sep 17 00:00:00 2001
From: moisses89 <7888669+moisses89@users.noreply.github.com>
Date: Mon, 23 Oct 2023 10:23:08 +0200
Subject: [PATCH] Add ledger manager
---
.../operators/hw_accounts/ledger_account.py | 66 +-------
.../operators/hw_accounts/ledger_manager.py | 112 +++++++++++++
safe_cli/operators/safe_operator.py | 157 ++++++++++--------
safe_cli/utils.py | 14 ++
4 files changed, 221 insertions(+), 128 deletions(-)
create mode 100644 safe_cli/operators/hw_accounts/ledger_manager.py
diff --git a/safe_cli/operators/hw_accounts/ledger_account.py b/safe_cli/operators/hw_accounts/ledger_account.py
index a73a7329..260a3abc 100644
--- a/safe_cli/operators/hw_accounts/ledger_account.py
+++ b/safe_cli/operators/hw_accounts/ledger_account.py
@@ -1,7 +1,4 @@
-import warnings
-
from eth_account.datastructures import SignedTransaction
-from eth_account.signers.base import BaseAccount
from hexbytes import HexBytes
from ledgerblue import Dongle
from ledgereth import create_transaction, sign_typed_data_draft
@@ -9,8 +6,8 @@
from web3.types import TxParams
-class LedgerAccount(BaseAccount):
- def __init__(self, path, address, dongle: Dongle):
+class LedgerAccount:
+ def __init__(self, path, address):
"""
Initialize a new ledger account (no private key)
@@ -19,64 +16,16 @@ def __init__(self, path, address, dongle: Dongle):
"""
self._address = address
self.path = path
- self.dongle = dongle
@property
def address(self):
return self._address
- @property
- def privateKey(self):
- """
- .. CAUTION:: Deprecated for :meth:`~eth_account.signers.local.LocalAccount.key`.
- This attribute will be removed in v0.5
- """
- warnings.warn(
- "privateKey is deprecated in favor of key",
- category=DeprecationWarning,
- )
- return None
-
- @property
- def key(self):
- """
- Get the private key.
- """
- return None
-
- def encrypt(self, password, kdf=None, iterations=None):
- """
- Generate a string with the encrypted key.
-
- This uses the same structure as in
- :meth:`~eth_account.account.Account.encrypt`, but without a private key argument.
- """
- # return self._publicapi.encrypt(self.key, password, kdf=kdf, iterations=iterations)
- # TODO with ledger
- pass
-
- def signHash(self, domain_hash: bytes, message_hash: bytes):
- signed = sign_typed_data_draft(domain_hash, message_hash, dongle=self.dongle)
+ def signMessage(self, domain_hash: bytes, message_hash: bytes, dongle: Dongle):
+ signed = sign_typed_data_draft(domain_hash, message_hash, self.path, dongle)
return (signed.v, signed.r, signed.s)
- def sign_message(self, signable_message):
- """
- Generate a string with the encrypted key.
-
- This uses the same structure as in
- :meth:`~eth_account.account.Account.sign_message`, but without a private key argument.
- """
- # TODO with ledger
- pass
-
- def signTransaction(self, transaction_dict):
- warnings.warn(
- "signTransaction is deprecated in favor of sign_transaction",
- category=DeprecationWarning,
- )
- pass
-
- def sign_transaction(self, tx: TxParams) -> SignedTransaction:
+ def sign_transaction(self, tx: TxParams, dongle: Dongle) -> SignedTransaction:
signed = create_transaction(
destination=tx["to"],
amount=tx["value"],
@@ -87,7 +36,7 @@ def sign_transaction(self, tx: TxParams) -> SignedTransaction:
nonce=tx["nonce"],
chain_id=tx["chainId"],
sender_path=self.path,
- dongle=self.dongle,
+ dongle=dongle,
)
raw_transaction = signed.raw_transaction()
return SignedTransaction(
@@ -97,6 +46,3 @@ def sign_transaction(self, tx: TxParams) -> SignedTransaction:
signed.sender_s,
signed.y_parity,
)
-
- def __bytes__(self):
- return self.key
diff --git a/safe_cli/operators/hw_accounts/ledger_manager.py b/safe_cli/operators/hw_accounts/ledger_manager.py
new file mode 100644
index 00000000..93cf8bc8
--- /dev/null
+++ b/safe_cli/operators/hw_accounts/ledger_manager.py
@@ -0,0 +1,112 @@
+from enum import Enum
+from typing import List, Optional, Set, Tuple
+
+from eth_typing import ChecksumAddress
+from ledgereth.accounts import get_account_by_path
+from ledgereth.comms import init_dongle
+from ledgereth.constants import DEFAULT_PATH_STRING
+from ledgereth.exceptions import LedgerAppNotOpened, LedgerLocked, LedgerNotFound
+from prompt_toolkit import HTML, print_formatted_text
+
+from gnosis.safe.signatures import signature_to_bytes
+
+from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount
+
+
+class LedgerStatus(Enum):
+ DISCONNECTED = 0
+ LOCKED = 1 # Connected but locked
+ APP_CLOSED = 2 # Connected, unlocked but app is closed
+ READY = 3 # Ready to communicate
+
+
+class LedgerManager:
+
+ LEDGER_SEARCH_DEEP = 10
+
+ def __init__(self):
+ self.dongle = None
+ self.accounts: Set[LedgerAccount] = set()
+ self.connected: bool
+
+ def _print_error_message(self, message: str):
+ print_formatted_text(HTML(f"{message}"))
+
+ def check_status(self, print_message: bool = False) -> LedgerStatus:
+ try:
+ self.dongle = init_dongle(self.dongle)
+ # Get default derivation to check following status
+ get_account_by_path(DEFAULT_PATH_STRING)
+ except LedgerNotFound:
+ if print_message:
+ self._print_error_message("Ledger is disconnected")
+ return LedgerStatus.DISCONNECTED
+ except LedgerLocked:
+ if print_message:
+ self._print_error_message("Ledger is locked")
+ return LedgerStatus.LOCKED
+ except LedgerAppNotOpened:
+ if print_message:
+ self._print_error_message("Ledger is disconnected")
+ return LedgerStatus.APP_CLOSED
+
+ return LedgerStatus.READY
+
+ @property
+ def connected(self) -> bool:
+ if self.check_status() != LedgerStatus.DISCONNECTED:
+ return True
+ return False
+
+ def get_accounts(
+ self, legacy_account: Optional[bool] = False
+ ) -> List[Tuple[ChecksumAddress, str]] | None:
+ """
+ :param legacy_account:
+ :return: a list of tuples with address and derivation path
+ """
+ accounts = []
+ if self.check_status(True) != LedgerStatus.READY:
+ return None
+ for i in range(self.LEDGER_SEARCH_DEEP):
+ if legacy_account:
+ path_string = f"44'/60'/0'/{i}"
+ else:
+ path_string = f"44'/60'/{i}'/0/0"
+ try:
+ account = get_account_by_path(path_string, self.dongle)
+ except LedgerLocked as ledger_error:
+ print(f"Ledger exception: {ledger_error}")
+ accounts.append((account.address, account.path))
+ return accounts
+
+ def add_account(self, derivation_path: str) -> bool:
+ """
+ Add account to ledger manager list
+
+ :param derivation_path:
+ :return:
+ """
+ if self.check_status(True) != LedgerStatus.READY:
+ return False
+ account = get_account_by_path(derivation_path, self.dongle)
+ self.accounts.add(LedgerAccount(account.path, account.address))
+ return True
+
+ def sign_eip712(
+ self, domain_hash: bytes, message_hash: bytes, account: LedgerAccount
+ ) -> bytes | None:
+ """
+ Sign eip712 hashes
+
+ :param domain_hash:
+ :param message_hash:
+ :param account: ledger account
+ :return: bytes signature
+ """
+ if self.check_status(True) != LedgerStatus.READY:
+ return None
+
+ v, r, s = account.signMessage(domain_hash, message_hash, self.dongle)
+
+ return signature_to_bytes(v, r, s)
diff --git a/safe_cli/operators/safe_operator.py b/safe_cli/operators/safe_operator.py
index 859f175a..0aa3f7f9 100644
--- a/safe_cli/operators/safe_operator.py
+++ b/safe_cli/operators/safe_operator.py
@@ -9,9 +9,6 @@
from eth_typing import ChecksumAddress
from eth_utils import ValidationError
from hexbytes import HexBytes
-from ledgereth import get_account_by_path
-from ledgereth.comms import init_dongle
-from ledgereth.exceptions import LedgerAppNotOpened, LedgerLocked, LedgerNotFound
from packaging import version as semantic_version
from prompt_toolkit import HTML, print_formatted_text
from web3 import Web3
@@ -36,16 +33,14 @@
from gnosis.safe import InvalidInternalTx, Safe, SafeOperation, SafeTx
from gnosis.safe.api import TransactionServiceApi
from gnosis.safe.multi_send import MultiSend, MultiSendOperation, MultiSendTx
-from gnosis.safe.signatures import signature_to_bytes
from safe_cli.ethereum_hd_wallet import get_account_from_words
-from safe_cli.operators.hw_accounts.ledger_account import LedgerAccount
from safe_cli.safe_addresses import (
get_default_fallback_handler_address,
get_safe_contract_address,
get_safe_l2_contract_address,
)
-from safe_cli.utils import get_erc_20_list, yes_or_no_question
+from safe_cli.utils import choose_option_question, get_erc_20_list, yes_or_no_question
@dataclasses.dataclass
@@ -227,13 +222,19 @@ def __init__(self, address: ChecksumAddress, node_url: str):
self.safe_contract_1_1_0 = get_safe_V1_1_1_contract(
self.ethereum_client.w3, address=self.address
)
- self.accounts: Set[LocalAccount | LedgerAccount] = set()
- self.default_sender: Optional[LocalAccount] | LedgerAccount = None
+ self.accounts: Set[LocalAccount] = set()
+ self.default_sender: Optional[LocalAccount] = None
self.executed_transactions: List[str] = []
self._safe_cli_info: Optional[SafeCliInfo] = None # Cache for SafeCliInfo
self.require_all_signatures = (
True # Require all signatures to be present to send a tx
)
+ try:
+ from safe_cli.operators.hw_accounts.ledger_manager import LedgerManager
+
+ self.ledger_manager = LedgerManager()
+ except (ModuleNotFoundError, IOError):
+ self.ledger_manager = None
@cached_property
def last_default_fallback_handler_address(self) -> ChecksumAddress:
@@ -332,40 +333,30 @@ def load_cli_owners(self, keys: List[str]):
print_formatted_text(HTML(f"Cannot load key={key}"))
def load_ledger_cli_owners(self):
- try:
- dongle = init_dongle()
- # Search between 10 first accounts
- for index in range(10):
- path = f"44'/60'/{index}'/0/0"
- account = get_account_by_path(path, dongle=dongle)
- if account.address in self.safe_cli_info.owners:
- sender = LedgerAccount(account.path, account.address, dongle)
- self.accounts.add(sender)
- balance = self.ethereum_client.get_balance(account.address)
- print_formatted_text(
- HTML(
- f"Loaded account {account.address} "
- f'with balance={Web3.from_wei(balance, "ether")} ether'
- f"Ledger account cannot be defined as sender"
- )
- )
- # TODO add ledger as sender
- break
- except LedgerNotFound:
- print_formatted_text(
- HTML("Unable to find Ledger device")
- )
- return
- except LedgerAppNotOpened:
- print_formatted_text(
- HTML("Ensure open ethereum app on your ledger")
- )
- return
- except LedgerLocked:
+ if not self.ledger_manager:
+ return None
+
+ ledger_accounts = self.ledger_manager.get_accounts()
+ if ledger_accounts is None:
+ return None
+
+ for option, ledger_account in enumerate(ledger_accounts):
+ address, _ = ledger_account
+ print_formatted_text(HTML(f"{option} - {address} "))
+
+ option = choose_option_question(
+ "Select the owner address", len(ledger_accounts) - 1
+ )
+ address, derivation_path = ledger_accounts[option]
+ if self.ledger_manager.add_account(derivation_path):
+ balance = self.ethereum_client.get_balance(address)
print_formatted_text(
- HTML("Ensure open ethereum app on your ledger")
+ HTML(
+ f"Loaded account {address} "
+ f'with balance={Web3.from_wei(balance, "ether")} ether'
+ f"Ledger account cannot be defined as sender"
+ )
)
- return
def unload_cli_owners(self, owners: List[str]):
accounts_to_remove: Set[Account] = set()
@@ -385,10 +376,15 @@ def unload_cli_owners(self, owners: List[str]):
print_formatted_text(HTML("No account was deleted"))
def show_cli_owners(self):
- if not self.accounts:
+ accounts = (
+ self.accounts | self.ledger_manager.accounts
+ if self.ledger_manager
+ else self.accounts
+ )
+ if not accounts:
print_formatted_text(HTML("No accounts loaded"))
else:
- for account in self.accounts:
+ for account in accounts:
print_formatted_text(
HTML(
f"Account {account.address} loaded"
@@ -719,6 +715,28 @@ def print_info(self):
)
)
+ if not self.ledger_manager:
+ print_formatted_text(
+ HTML(
+ "Ledger="
+ "ledgereth library is not installed"
+ )
+ )
+ elif self.ledger_manager.connected:
+ print_formatted_text(
+ HTML(
+ "Ledger="
+ "Connected"
+ )
+ )
+ else:
+ print_formatted_text(
+ HTML(
+ "Ledger="
+ "disconnected"
+ )
+ )
+
if not self.is_version_updated():
print_formatted_text(
HTML(
@@ -899,28 +917,6 @@ def batch_safe_txs(
else:
return safe_tx
- def ledger_sign(self, safe_tx: SafeTx, account: LedgerAccount) -> bytes:
- """
- {bytes32 r}{bytes32 s}{uint8 v}
- :param private_key:
- :return: Signature
- """
- encode_hash = eip712_encode(safe_tx.eip712_structured_data)
- v, r, s = account.signHash(encode_hash[1], encode_hash[2])
- signature = signature_to_bytes(v, r, s)
- # Insert signature sorted
- if account.address not in safe_tx.signers:
- new_owners = safe_tx.signers + [account.address]
- new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index(
- account.address
- )
- safe_tx.signatures = (
- safe_tx.signatures[: 65 * new_owner_pos]
- + signature
- + safe_tx.signatures[65 * new_owner_pos :]
- )
- return safe_tx
-
# TODO Set sender so we can save gas in that signature
def sign_transaction(self, safe_tx: SafeTx) -> SafeTx:
permitted_signers = self.get_permitted_signers()
@@ -934,16 +930,41 @@ def sign_transaction(self, safe_tx: SafeTx) -> SafeTx:
threshold -= 1
if threshold == 0:
break
+ # If still pending required signatures continue with ledger owners
+ selected_ledger_accounts = []
+ if threshold > 0 and self.ledger_manager:
+ for ledger_account in self.ledger_manager.accounts:
+ if ledger_account.address in permitted_signers:
+ selected_ledger_accounts.append(ledger_account)
+ threshold -= 1
+ if threshold == 0:
+ break
if self.require_all_signatures and threshold > 0:
raise NotEnoughSignatures(threshold)
for selected_account in selected_accounts:
- if selected_account.key:
- safe_tx.sign(selected_account.key)
- else:
- safe_tx = self.ledger_sign(safe_tx, selected_account)
+ safe_tx.sign(selected_account.key)
+ # Sign with ledger if
+ for selected_ledger_account in selected_ledger_accounts:
+ encode_hash = eip712_encode(safe_tx.eip712_structured_data)
+ signature = self.ledger_manager.sign_eip712(
+ encode_hash[1], encode_hash[2], selected_ledger_account
+ )
+ if signature:
+ # TODO refactor on safe_eth_py function insert_signature_sorted
+ # Insert signature sorted
+ if selected_ledger_account.address not in safe_tx.signers:
+ new_owners = safe_tx.signers + [selected_ledger_account.address]
+ new_owner_pos = sorted(new_owners, key=lambda x: int(x, 16)).index(
+ selected_ledger_account.address
+ )
+ safe_tx.signatures = (
+ safe_tx.signatures[: 65 * new_owner_pos]
+ + signature
+ + safe_tx.signatures[65 * new_owner_pos :]
+ )
return safe_tx
@require_tx_service
diff --git a/safe_cli/utils.py b/safe_cli/utils.py
index b3e96f87..806ef3f9 100644
--- a/safe_cli/utils.py
+++ b/safe_cli/utils.py
@@ -1,3 +1,4 @@
+import argparse
import os
from gnosis.eth import EthereumClient
@@ -41,3 +42,16 @@ def yes_or_no_question(question: str, default_no: bool = True) -> bool:
return False
else:
return False if default_no else True
+
+
+def choose_option_question(
+ question: str, number_options: int, default_option: int = 0
+) -> bool:
+ if "PYTEST_CURRENT_TEST" in os.environ:
+ return True # Ignore confirmations when running tests
+ choices = f" [0-{number_options}] default {default_option}:"
+ reply = str(input(question + choices)).lower().strip() or str(default_option)
+ option = int(reply)
+ if option not in range(0, number_options):
+ argparse.ArgumentTypeError(f"{option} is not between [0-{number_options}}}")
+ return option