[clean] Linter / typing fixes

This commit is contained in:
Lucas PASCAL
2023-07-28 15:41:17 +02:00
parent e5c82d910e
commit 54b979186d
11 changed files with 117 additions and 73 deletions

View File

@@ -17,7 +17,7 @@ jobs:
uses: actions/checkout@v3
- run: pip install flake8
- name: Flake8 lint Python code
run: find client/src/ -type f -name '*.py' -exec flake8 --max-line-length=120 '{}' '+'
run: (cd client && find src/ -type f -name '*.py' -exec flake8 --max-line-length=120 '{}' '+')
mypy:
name: Type checking
@@ -27,7 +27,7 @@ jobs:
uses: actions/checkout@v3
- run: pip install mypy
- name: Mypy type checking
run: mypy client/src
run: (cd client && mypy src/)
build:
name: Building the package

12
client/CHANGELOG.md Normal file
View File

@@ -0,0 +1,12 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.1] - 2023-08-07
### Added
- Initial version

View File

@@ -10,12 +10,12 @@ This package is deployed:
- on `pypi.org` for the stable version. This version will work with the
application available on the `master` branch.
```bash
pip install ledger_app_clients.ethereum`
pip install ledger_app_clients.ethereum
```
- on `test.pypi.org` for the rolling release. This verison will work with the
- on `test.pypi.org` for the rolling release. This version will work with the
application code on the `develop` branch.
```bash
pip install --extra-index-url https://test.pypi.org/simple/ ledger_app_clients.ethereum`
pip install --extra-index-url https://test.pypi.org/simple/ ledger_app_clients.ethereum
```
### Installation from sources

View File

@@ -40,6 +40,5 @@ version = {attr = "ledger_app_clients.ethereum.__version__"}
[project.urls]
Home = "https://github.com/LedgerHQ/app-ethereum"
# [tool.setuptools_scm]
# write_to = "ledgerwallet/__version__.py"
# local_scheme = "no-local-version"
[tool.mypy]
ignore_missing_imports = true

View File

@@ -2,6 +2,7 @@ import rlp
from enum import IntEnum
from ragger.backend import BackendInterface
from ragger.utils import RAPDU
from typing import List, Optional, Union
from .command_builder import CommandBuilder
from .eip712 import EIP712FieldType
@@ -13,14 +14,15 @@ WEI_IN_ETH = 1e+18
class StatusWord(IntEnum):
OK = 0x9000
ERROR_NO_INFO = 0x6a00
INVALID_DATA = 0x6a80
INSUFFICIENT_MEMORY = 0x6a84
INVALID_INS = 0x6d00
INVALID_P1_P2 = 0x6b00
OK = 0x9000
ERROR_NO_INFO = 0x6a00
INVALID_DATA = 0x6a80
INSUFFICIENT_MEMORY = 0x6a84
INVALID_INS = 0x6d00
INVALID_P1_P2 = 0x6b00
CONDITION_NOT_SATISFIED = 0x6985
REF_DATA_NOT_FOUND = 0x6a88
REF_DATA_NOT_FOUND = 0x6a88
class DOMAIN_NAME_TAG(IntEnum):
STRUCTURE_TYPE = 0x01
@@ -39,10 +41,10 @@ class EthAppClient:
self._client = client
self._cmd_builder = CommandBuilder()
def _send(self, payload: bytearray):
def _send(self, payload: bytes):
return self._client.exchange_async_raw(payload)
def response(self) -> RAPDU:
def response(self) -> Optional[RAPDU]:
return self._client._last_async_response
def eip712_send_struct_def_struct_name(self, name: str):
@@ -52,7 +54,7 @@ class EthAppClient:
field_type: EIP712FieldType,
type_name: str,
type_size: int,
array_levels: [],
array_levels: List,
key_name: str):
return self._send(self._cmd_builder.eip712_send_struct_def_struct_field(
field_type,
@@ -68,7 +70,7 @@ class EthAppClient:
return self._send(self._cmd_builder.eip712_send_struct_impl_array(size))
def eip712_send_struct_impl_struct_field(self, raw_value: bytes):
chunks = self._cmd_builder.eip712_send_struct_impl_struct_field(raw_value)
chunks = self._cmd_builder.eip712_send_struct_impl_struct_field(bytearray(raw_value))
for chunk in chunks[:-1]:
with self._send(chunk):
pass
@@ -102,7 +104,7 @@ class EthAppClient:
to: bytes,
amount: float,
chain_id: int):
data = list()
data: List[Union[int, bytes]] = list()
data.append(nonce)
data.append(gas_price)
data.append(gas_limit)
@@ -123,12 +125,12 @@ class EthAppClient:
return self._send(self._cmd_builder.get_challenge())
def provide_domain_name(self, challenge: int, name: str, addr: bytes):
payload = format_tlv(DOMAIN_NAME_TAG.STRUCTURE_TYPE, 3) # TrustedDomainName
payload = format_tlv(DOMAIN_NAME_TAG.STRUCTURE_TYPE, 3) # TrustedDomainName
payload += format_tlv(DOMAIN_NAME_TAG.STRUCTURE_VERSION, 1)
payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_KEY_ID, 0) # test key
payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_ALGO, 1) # secp256k1
payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_KEY_ID, 0) # test key
payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_ALGO, 1) # secp256k1
payload += format_tlv(DOMAIN_NAME_TAG.CHALLENGE, challenge)
payload += format_tlv(DOMAIN_NAME_TAG.COIN_TYPE, 0x3c) # ETH in slip-44
payload += format_tlv(DOMAIN_NAME_TAG.COIN_TYPE, 0x3c) # ETH in slip-44
payload += format_tlv(DOMAIN_NAME_TAG.DOMAIN_NAME, name)
payload += format_tlv(DOMAIN_NAME_TAG.ADDRESS, addr)
payload += format_tlv(DOMAIN_NAME_TAG.SIGNATURE,

View File

@@ -1,7 +1,7 @@
import struct
from enum import IntEnum
from ragger.bip import pack_derivation_path
from typing import Iterator
from typing import List
from .eip712 import EIP712FieldType
@@ -41,7 +41,7 @@ class CommandBuilder:
ins: InsType,
p1: int,
p2: int,
cdata: bytearray = bytes()) -> bytes:
cdata: bytes = bytes()) -> bytes:
header = bytearray()
header.append(self._CLA)
@@ -67,24 +67,24 @@ class CommandBuilder:
field_type: EIP712FieldType,
type_name: str,
type_size: int,
array_levels: [],
array_levels: List,
key_name: str) -> bytes:
data = bytearray()
typedesc = 0
typedesc |= (len(array_levels) > 0) << 7
typedesc |= (type_size != None) << 6
typedesc |= (type_size is not None) << 6
typedesc |= field_type
data.append(typedesc)
if field_type == EIP712FieldType.CUSTOM:
data.append(len(type_name))
data += self._string_to_bytes(type_name)
if type_size != None:
if type_size is not None:
data.append(type_size)
if len(array_levels) > 0:
data.append(len(array_levels))
for level in array_levels:
data.append(0 if level == None else 1)
if level != None:
data.append(0 if level is None else 1)
if level is not None:
data.append(level)
data.append(len(key_name))
data += self._string_to_bytes(key_name)
@@ -107,7 +107,7 @@ class CommandBuilder:
P2Type.ARRAY,
data)
def eip712_send_struct_impl_struct_field(self, data: bytearray) -> Iterator[bytes]:
def eip712_send_struct_impl_struct_field(self, data: bytearray) -> List[bytes]:
chunks = list()
# Add a 16-bit integer with the data's byte length (network byte order)
data_w_length = bytearray()
@@ -193,7 +193,7 @@ class CommandBuilder:
def provide_domain_name(self, tlv_payload: bytes) -> list[bytes]:
chunks = list()
payload = struct.pack(">H", len(tlv_payload))
payload = struct.pack(">H", len(tlv_payload))
payload += tlv_payload
p1 = 1
while len(payload) > 0:

View File

@@ -3,7 +3,7 @@ import json
import re
import signal
import sys
from typing import Callable
from typing import Any, Callable, Dict, List, Optional
from ledger_app_clients.ethereum import keychain
from ledger_app_clients.ethereum.client import EthAppClient, EIP712FieldType
@@ -11,11 +11,16 @@ from ledger_app_clients.ethereum.client import EthAppClient, EIP712FieldType
# global variables
app_client: EthAppClient = None
filtering_paths = None
current_path = list()
sig_ctx = {}
filtering_paths: Dict = {}
current_path: List[str] = list()
sig_ctx: Dict[str, Any] = {}
autonext_handler: Callable = None
def default_handler():
raise RuntimeError("Uninitialized handler")
autonext_handler: Callable = default_handler
# From a string typename, extract the type and all the array depth
@@ -55,29 +60,34 @@ def get_typesize(typename):
return (typename, typesize)
def parse_int(typesize):
return (EIP712FieldType.INT, int(typesize / 8))
def parse_uint(typesize):
return (EIP712FieldType.UINT, int(typesize / 8))
def parse_address(typesize):
return (EIP712FieldType.ADDRESS, None)
def parse_bool(typesize):
return (EIP712FieldType.BOOL, None)
def parse_string(typesize):
return (EIP712FieldType.STRING, None)
def parse_bytes(typesize):
if typesize != None:
if typesize is not None:
return (EIP712FieldType.FIX_BYTES, typesize)
return (EIP712FieldType.DYN_BYTES, None)
# set functions for each type
parsing_type_functions = {};
parsing_type_functions = {}
parsing_type_functions["int"] = parse_int
parsing_type_functions["uint"] = parse_uint
parsing_type_functions["address"] = parse_address
@@ -86,7 +96,6 @@ parsing_type_functions["string"] = parse_string
parsing_type_functions["bytes"] = parse_bytes
def send_struct_def_field(typename, keyname):
type_enum = None
@@ -108,7 +117,6 @@ def send_struct_def_field(typename, keyname):
return (typename, type_enum, typesize, array_lvls)
def encode_integer(value, typesize):
data = bytearray()
@@ -122,9 +130,9 @@ def encode_integer(value, typesize):
if value == 0:
data.append(0)
else:
if value < 0: # negative number, send it as unsigned
if value < 0: # negative number, send it as unsigned
mask = 0
for i in range(typesize): # make a mask as big as the typesize
for i in range(typesize): # make a mask as big as the typesize
mask = (mask << 8) | 0xff
value &= mask
while value > 0:
@@ -133,42 +141,51 @@ def encode_integer(value, typesize):
data.reverse()
return data
def encode_int(value, typesize):
return encode_integer(value, typesize)
def encode_uint(value, typesize):
return encode_integer(value, typesize)
def encode_hex_string(value, size):
data = bytearray()
value = value[2:] # skip 0x
value = value[2:] # skip 0x
byte_idx = 0
while byte_idx < size:
data.append(int(value[(byte_idx * 2):(byte_idx * 2 + 2)], 16))
byte_idx += 1
return data
def encode_address(value, typesize):
return encode_hex_string(value, 20)
def encode_bool(value, typesize):
return encode_integer(value, typesize)
def encode_string(value, typesize):
data = bytearray()
for char in value:
data.append(ord(char))
return data
def encode_bytes_fix(value, typesize):
return encode_hex_string(value, typesize)
def encode_bytes_dyn(value, typesize):
# length of the value string
# - the length of 0x (2)
# / by the length of one byte in a hex string (2)
return encode_hex_string(value, int((len(value) - 2) / 2))
# set functions for each type
encoding_functions = {}
encoding_functions[EIP712FieldType.INT] = encode_int
@@ -180,7 +197,6 @@ encoding_functions[EIP712FieldType.FIX_BYTES] = encode_bytes_fix
encoding_functions[EIP712FieldType.DYN_BYTES] = encode_bytes_dyn
def send_struct_impl_field(value, field):
# Something wrong happened if this triggers
if isinstance(value, list) or (field["enum"] == EIP712FieldType.CUSTOM):
@@ -188,7 +204,6 @@ def send_struct_impl_field(value, field):
data = encoding_functions[field["enum"]](value, field["typesize"])
if filtering_paths:
path = ".".join(current_path)
if path in filtering_paths.keys():
@@ -199,8 +214,7 @@ def send_struct_impl_field(value, field):
disable_autonext()
def evaluate_field(structs, data, field, lvls_left, new_level = True):
def evaluate_field(structs, data, field, lvls_left, new_level=True):
array_lvls = field["array_lvls"]
if new_level:
@@ -215,7 +229,7 @@ def evaluate_field(structs, data, field, lvls_left, new_level = True):
return False
current_path.pop()
idx += 1
if array_lvls[lvls_left - 1] != None:
if array_lvls[lvls_left - 1] is not None:
if array_lvls[lvls_left - 1] != idx:
print("Mismatch in array size! Got %d, expected %d\n" %
(idx, array_lvls[lvls_left - 1]),
@@ -232,7 +246,6 @@ def evaluate_field(structs, data, field, lvls_left, new_level = True):
return True
def send_struct_impl(structs, data, structname):
# Check if it is a struct we don't known
if structname not in structs.keys():
@@ -244,6 +257,7 @@ def send_struct_impl(structs, data, structname):
return False
return True
# ledgerjs doesn't actually sign anything, and instead uses already pre-computed signatures
def send_filtering_message_info(display_name: str, filters_count: int):
global sig_ctx
@@ -262,6 +276,7 @@ def send_filtering_message_info(display_name: str, filters_count: int):
enable_autonext()
disable_autonext()
# ledgerjs doesn't actually sign anything, and instead uses already pre-computed signatures
def send_filtering_show_field(display_name):
global sig_ctx
@@ -281,12 +296,14 @@ def send_filtering_show_field(display_name):
with app_client.eip712_filtering_show_field(display_name, sig):
pass
def read_filtering_file(domain, message, filtering_file_path):
def read_filtering_file(filtering_file_path: str):
data_json = None
with open(filtering_file_path) as data:
data_json = json.load(data)
return data_json
def prepare_filtering(filtr_data, message):
global filtering_paths
@@ -295,12 +312,14 @@ def prepare_filtering(filtr_data, message):
else:
filtering_paths = {}
def handle_optional_domain_values(domain):
if "chainId" not in domain.keys():
domain["chainId"] = 0
if "verifyingContract" not in domain.keys():
domain["verifyingContract"] = "0x0000000000000000000000000000000000000000"
def init_signature_context(types, domain):
global sig_ctx
@@ -314,7 +333,7 @@ def init_signature_context(types, domain):
for i in range(8):
sig_ctx["chainid"].append(chainid & (0xff << (i * 8)))
sig_ctx["chainid"].reverse()
schema_str = json.dumps(types).replace(" ","")
schema_str = json.dumps(types).replace(" ", "")
schema_hash = hashlib.sha224(schema_str.encode())
sig_ctx["schema_hash"] = bytearray.fromhex(schema_hash.hexdigest())
@@ -322,22 +341,24 @@ def init_signature_context(types, domain):
def next_timeout(_signum: int, _frame):
autonext_handler()
def enable_autonext():
seconds = 1/4
if app_client._client.firmware.device == 'stax': # Stax Speculos is slow
if app_client._client.firmware.device == 'stax': # Stax Speculos is slow
interval = seconds * 3
else:
interval = seconds
signal.setitimer(signal.ITIMER_REAL, seconds, interval)
def disable_autonext():
signal.setitimer(signal.ITIMER_REAL, 0, 0)
def process_file(aclient: EthAppClient,
input_file_path: str,
filtering_file_path = None,
autonext: Callable = None) -> bool:
filtering_file_path: Optional[str] = None,
autonext: Optional[Callable] = None) -> bool:
global sig_ctx
global app_client
global autonext_handler
@@ -357,7 +378,7 @@ def process_file(aclient: EthAppClient,
if filtering_file_path:
init_signature_context(types, domain)
filtr = read_filtering_file(domain, message, filtering_file_path)
filtr = read_filtering_file(filtering_file_path)
# send types definition
for key in types.keys():
@@ -365,7 +386,7 @@ def process_file(aclient: EthAppClient,
pass
for f in types[key]:
(f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \
send_struct_def_field(f["type"], f["name"])
send_struct_def_field(f["type"], f["name"])
if filtering_file_path:
with app_client.eip712_filtering_activate():

View File

@@ -3,6 +3,7 @@ import hashlib
from ecdsa import SigningKey
from ecdsa.util import sigencode_der
from enum import Enum, auto
from typing import Dict
# Private key PEM files have to be named the same (lowercase) as their corresponding enum entries
@@ -11,14 +12,17 @@ class Key(Enum):
CAL = auto()
DOMAIN_NAME = auto()
_keys: dict[Key, SigningKey] = dict()
_keys: Dict[Key, SigningKey] = dict()
# Open the corresponding PEM file and load its key in the global dict
def _init_key(key: Key):
global _keys
with open("%s/keychain/%s.pem" % (os.path.dirname(__file__), key.name.lower())) as pem_file:
_keys[key] = SigningKey.from_pem(pem_file.read(), hashlib.sha256)
assert (key in _keys) and (_keys[key] != None)
assert (key in _keys) and (_keys[key] is not None)
# Generate a SECP256K1 signature of the given data with the given key
def sign_data(key: Key, data: bytes) -> bytes:

View File

@@ -9,6 +9,7 @@ def signature(data: bytes) -> tuple[bytes, bytes, bytes]:
return v, r, s
def challenge(data: bytes) -> int:
assert len(data) == 4
return int.from_bytes(data, "big")

View File

@@ -1,6 +1,7 @@
from enum import Enum, auto
from ragger.firmware import Firmware
from ragger.navigator import Navigator, NavInsID, NavIns
from typing import List, Union
class SettingID(Enum):
@@ -10,6 +11,7 @@ class SettingID(Enum):
VERBOSE_EIP712 = auto()
VERBOSE_ENS = auto()
def get_device_settings(device: str) -> list[SettingID]:
if device == "nanos":
return [
@@ -27,19 +29,22 @@ def get_device_settings(device: str) -> list[SettingID]:
]
return []
settings_per_page = 3
def get_setting_position(device: str, setting: NavInsID) -> tuple[int, int]:
screen_height = 672 # px
header_height = 85 # px
footer_height = 124 # px
def get_setting_position(device: str, setting: Union[NavInsID, SettingID]) -> tuple[int, int]:
screen_height = 672 # px
header_height = 85 # px
footer_height = 124 # px
usable_height = screen_height - (header_height + footer_height)
setting_height = usable_height // settings_per_page
index_in_page = get_device_settings(device).index(setting) % settings_per_page
index_in_page = get_device_settings(device).index(SettingID(setting)) % settings_per_page
return 350, header_height + (setting_height * index_in_page) + (setting_height // 2)
def settings_toggle(fw: Firmware, nav: Navigator, to_toggle: list[SettingID]):
moves = list()
moves: List[Union[NavIns, NavInsID]] = list()
settings = get_device_settings(fw.device)
# Assume the app is on the home page
if fw.device.startswith("nano"):
@@ -49,7 +54,7 @@ def settings_toggle(fw: Firmware, nav: Navigator, to_toggle: list[SettingID]):
if setting in to_toggle:
moves += [NavInsID.BOTH_CLICK]
moves += [NavInsID.RIGHT_CLICK]
moves += [NavInsID.BOTH_CLICK] # Back
moves += [NavInsID.BOTH_CLICK] # Back
else:
moves += [NavInsID.USE_CASE_HOME_SETTINGS]
moves += [NavInsID.USE_CASE_SETTINGS_NEXT]

View File

@@ -1,4 +1,5 @@
from typing import Any
from typing import Union
def der_encode(value: int) -> bytes:
# max() to have minimum length of 1
@@ -7,16 +8,15 @@ def der_encode(value: int) -> bytes:
value_bytes = (0x80 | len(value_bytes)).to_bytes(1, 'big') + value_bytes
return value_bytes
def format_tlv(tag: int, value: Any) -> bytes:
def format_tlv(tag: int, value: Union[int, str, bytes]) -> bytes:
if isinstance(value, int):
# max() to have minimum length of 1
value = value.to_bytes(max(1, (value.bit_length() + 7) // 8), 'big')
elif isinstance(value, str):
value = value.encode()
if not isinstance(value, bytes):
print("Unhandled TLV formatting for type : %s" % (type(value)))
return None
assert isinstance(value, bytes), f"Unhandled TLV formatting for type : {type(value)}"
tlv = bytearray()
tlv += der_encode(tag)