Stax Ragger tests
This commit is contained in:
@@ -2,14 +2,9 @@ from enum import IntEnum, auto
|
||||
from typing import Optional
|
||||
from ragger.backend import BackendInterface
|
||||
from ragger.utils import RAPDU
|
||||
from ragger.navigator import NavInsID, NavIns, NanoNavigator
|
||||
from .command_builder import EthereumCmdBuilder
|
||||
from .setting import SettingType, SettingImpl
|
||||
from .command_builder import CommandBuilder
|
||||
from .eip712 import EIP712FieldType
|
||||
from .response_parser import EthereumRespParser
|
||||
from .tlv import format_tlv
|
||||
import signal
|
||||
import time
|
||||
from pathlib import Path
|
||||
import keychain
|
||||
import rlp
|
||||
@@ -41,58 +36,19 @@ class DOMAIN_NAME_TAG(IntEnum):
|
||||
ADDRESS = 0x22
|
||||
|
||||
|
||||
class EthereumClient:
|
||||
_settings: dict[SettingType, SettingImpl] = {
|
||||
SettingType.BLIND_SIGNING: SettingImpl(
|
||||
[ "nanos", "nanox", "nanosp" ]
|
||||
),
|
||||
SettingType.DEBUG_DATA: SettingImpl(
|
||||
[ "nanos", "nanox", "nanosp" ]
|
||||
),
|
||||
SettingType.NONCE: SettingImpl(
|
||||
[ "nanos", "nanox", "nanosp" ]
|
||||
),
|
||||
SettingType.VERBOSE_EIP712: SettingImpl(
|
||||
[ "nanox", "nanosp" ]
|
||||
),
|
||||
SettingType.VERBOSE_ENS: SettingImpl(
|
||||
[ "nanox", "nanosp" ]
|
||||
)
|
||||
}
|
||||
_click_delay = 1/4
|
||||
_eip712_filtering = False
|
||||
|
||||
def __init__(self, client: BackendInterface, golden_run: bool):
|
||||
class EthAppClient:
|
||||
def __init__(self, client: BackendInterface):
|
||||
self._client = client
|
||||
self._chain_id = 1
|
||||
self._cmd_builder = EthereumCmdBuilder()
|
||||
self._resp_parser = EthereumRespParser()
|
||||
self._nav = NanoNavigator(client, client.firmware, golden_run)
|
||||
signal.signal(signal.SIGALRM, self._click_signal_timeout)
|
||||
for setting in self._settings.values():
|
||||
setting.value = False
|
||||
self._cmd_builder = CommandBuilder()
|
||||
|
||||
def _send(self, payload: bytearray):
|
||||
return self._client.exchange_async_raw(payload)
|
||||
|
||||
def _recv(self) -> RAPDU:
|
||||
def response(self) -> RAPDU:
|
||||
return self._client._last_async_response
|
||||
|
||||
def _click_signal_timeout(self, _signum: int, _frame):
|
||||
self._client.right_click()
|
||||
|
||||
def _enable_click_until_response(self):
|
||||
signal.setitimer(signal.ITIMER_REAL,
|
||||
self._click_delay,
|
||||
self._click_delay)
|
||||
|
||||
def _disable_click_until_response(self):
|
||||
signal.setitimer(signal.ITIMER_REAL, 0, 0)
|
||||
|
||||
def eip712_send_struct_def_struct_name(self, name: str):
|
||||
with self._send(self._cmd_builder.eip712_send_struct_def_struct_name(name)):
|
||||
pass
|
||||
return self._recv().status == 0x9000
|
||||
return self._send(self._cmd_builder.eip712_send_struct_def_struct_name(name))
|
||||
|
||||
def eip712_send_struct_def_struct_field(self,
|
||||
field_type: EIP712FieldType,
|
||||
@@ -100,98 +56,45 @@ class EthereumClient:
|
||||
type_size: int,
|
||||
array_levels: [],
|
||||
key_name: str):
|
||||
with self._send(self._cmd_builder.eip712_send_struct_def_struct_field(
|
||||
field_type,
|
||||
type_name,
|
||||
type_size,
|
||||
array_levels,
|
||||
key_name)):
|
||||
pass
|
||||
return self._recv()
|
||||
return self._send(self._cmd_builder.eip712_send_struct_def_struct_field(
|
||||
field_type,
|
||||
type_name,
|
||||
type_size,
|
||||
array_levels,
|
||||
key_name))
|
||||
|
||||
def eip712_send_struct_impl_root_struct(self, name: str):
|
||||
with self._send(self._cmd_builder.eip712_send_struct_impl_root_struct(name)):
|
||||
self._enable_click_until_response()
|
||||
self._disable_click_until_response()
|
||||
return self._recv()
|
||||
return self._send(self._cmd_builder.eip712_send_struct_impl_root_struct(name))
|
||||
|
||||
def eip712_send_struct_impl_array(self, size: int):
|
||||
with self._send(self._cmd_builder.eip712_send_struct_impl_array(size)):
|
||||
pass
|
||||
return self._recv()
|
||||
return self._send(self._cmd_builder.eip712_send_struct_impl_array(size))
|
||||
|
||||
def eip712_send_struct_impl_struct_field(self, raw_value: bytes):
|
||||
for apdu in self._cmd_builder.eip712_send_struct_impl_struct_field(raw_value):
|
||||
with self._send(apdu):
|
||||
self._enable_click_until_response()
|
||||
self._disable_click_until_response()
|
||||
assert self._recv().status == 0x9000
|
||||
chunks = self._cmd_builder.eip712_send_struct_impl_struct_field(raw_value)
|
||||
for chunk in chunks[:-1]:
|
||||
with self._send(chunk):
|
||||
pass
|
||||
return self._send(chunks[-1])
|
||||
|
||||
def eip712_sign_new(self, bip32_path: str):
|
||||
with self._send(self._cmd_builder.eip712_sign_new(bip32_path)):
|
||||
time.sleep(0.5) # tight on timing, needed by the CI otherwise might fail sometimes
|
||||
if not self._settings[SettingType.VERBOSE_EIP712].value and \
|
||||
not self._eip712_filtering: # need to skip the message hash
|
||||
self._client.right_click()
|
||||
self._client.right_click()
|
||||
self._client.both_click() # approve signature
|
||||
resp = self._recv()
|
||||
assert resp.status == 0x9000
|
||||
return self._resp_parser.sign(resp.data)
|
||||
def eip712_sign_new(self, bip32_path: str, verbose: bool):
|
||||
return self._send(self._cmd_builder.eip712_sign_new(bip32_path))
|
||||
|
||||
def eip712_sign_legacy(self,
|
||||
bip32_path: str,
|
||||
domain_hash: bytes,
|
||||
message_hash: bytes):
|
||||
with self._send(self._cmd_builder.eip712_sign_legacy(bip32_path,
|
||||
domain_hash,
|
||||
message_hash)):
|
||||
self._client.right_click() # sign typed message screen
|
||||
for _ in range(2): # two hashes (domain + message)
|
||||
if self._client.firmware.device == "nanos":
|
||||
screens_per_hash = 4
|
||||
else:
|
||||
screens_per_hash = 2
|
||||
for _ in range(screens_per_hash):
|
||||
self._client.right_click()
|
||||
self._client.both_click() # approve signature
|
||||
|
||||
resp = self._recv()
|
||||
|
||||
assert resp.status == 0x9000
|
||||
return self._resp_parser.sign(resp.data)
|
||||
|
||||
def settings_set(self, new_values: dict[SettingType, bool]):
|
||||
# Go to settings
|
||||
for _ in range(2):
|
||||
self._client.right_click()
|
||||
self._client.both_click()
|
||||
|
||||
for enum in self._settings.keys():
|
||||
if self._client.firmware.device in self._settings[enum].devices:
|
||||
if enum in new_values.keys():
|
||||
if new_values[enum] != self._settings[enum].value:
|
||||
self._client.both_click()
|
||||
self._settings[enum].value = new_values[enum]
|
||||
self._client.right_click()
|
||||
self._client.both_click()
|
||||
return self._send(self._cmd_builder.eip712_sign_legacy(bip32_path,
|
||||
domain_hash,
|
||||
message_hash))
|
||||
|
||||
def eip712_filtering_activate(self):
|
||||
with self._send(self._cmd_builder.eip712_filtering_activate()):
|
||||
pass
|
||||
self._eip712_filtering = True
|
||||
assert self._recv().status == 0x9000
|
||||
return self._send(self._cmd_builder.eip712_filtering_activate())
|
||||
|
||||
def eip712_filtering_message_info(self, name: str, filters_count: int, sig: bytes):
|
||||
with self._send(self._cmd_builder.eip712_filtering_message_info(name, filters_count, sig)):
|
||||
self._enable_click_until_response()
|
||||
self._disable_click_until_response()
|
||||
assert self._recv().status == 0x9000
|
||||
return self._send(self._cmd_builder.eip712_filtering_message_info(name, filters_count, sig))
|
||||
|
||||
def eip712_filtering_show_field(self, name: str, sig: bytes):
|
||||
with self._send(self._cmd_builder.eip712_filtering_show_field(name, sig)):
|
||||
pass
|
||||
assert self._recv().status == 0x9000
|
||||
return self._send(self._cmd_builder.eip712_filtering_show_field(name, sig))
|
||||
|
||||
def send_fund(self,
|
||||
bip32_path: str,
|
||||
@@ -200,8 +103,7 @@ class EthereumClient:
|
||||
gas_limit: int,
|
||||
to: bytes,
|
||||
amount: float,
|
||||
chain_id: int,
|
||||
screenshot_collection: str = None):
|
||||
chain_id: int):
|
||||
data = list()
|
||||
data.append(nonce)
|
||||
data.append(gas_price)
|
||||
@@ -213,27 +115,14 @@ class EthereumClient:
|
||||
data.append(bytes())
|
||||
data.append(bytes())
|
||||
|
||||
for chunk in self._cmd_builder.sign(bip32_path, rlp.encode(data)):
|
||||
chunks = self._cmd_builder.sign(bip32_path, rlp.encode(data))
|
||||
for chunk in chunks[:-1]:
|
||||
with self._send(chunk):
|
||||
nav_ins = NavIns(NavInsID.RIGHT_CLICK)
|
||||
final_ins = [ NavIns(NavInsID.BOTH_CLICK) ]
|
||||
target_text = "and send"
|
||||
if screenshot_collection:
|
||||
self._nav.navigate_until_text_and_compare(nav_ins,
|
||||
final_ins,
|
||||
target_text,
|
||||
ROOT_SCREENSHOT_PATH,
|
||||
screenshot_collection)
|
||||
else:
|
||||
self._nav.navigate_until_text(nav_ins,
|
||||
final_ins,
|
||||
target_text)
|
||||
pass
|
||||
return self._send(chunks[-1])
|
||||
|
||||
def get_challenge(self) -> int:
|
||||
with self._send(self._cmd_builder.get_challenge()):
|
||||
pass
|
||||
resp = self._recv()
|
||||
return self._resp_parser.challenge(resp.data)
|
||||
def get_challenge(self):
|
||||
return self._send(self._cmd_builder.get_challenge())
|
||||
|
||||
def provide_domain_name(self, challenge: int, name: str, addr: bytes):
|
||||
payload = format_tlv(DOMAIN_NAME_TAG.STRUCTURE_TYPE, 3) # TrustedDomainName
|
||||
@@ -247,6 +136,8 @@ class EthereumClient:
|
||||
payload += format_tlv(DOMAIN_NAME_TAG.SIGNATURE,
|
||||
keychain.sign_data(keychain.Key.DOMAIN_NAME, payload))
|
||||
|
||||
for chunk in self._cmd_builder.provide_domain_name(payload):
|
||||
chunks = self._cmd_builder.provide_domain_name(payload)
|
||||
for chunk in chunks[:-1]:
|
||||
with self._send(chunk):
|
||||
pass
|
||||
return self._send(chunks[-1])
|
||||
|
||||
@@ -29,7 +29,7 @@ class P2Type(IntEnum):
|
||||
FILTERING_CONTRACT_NAME = 0x0f
|
||||
FILTERING_FIELD_NAME = 0xff
|
||||
|
||||
class EthereumCmdBuilder:
|
||||
class CommandBuilder:
|
||||
_CLA: int = 0xE0
|
||||
|
||||
def _serialize(self,
|
||||
@@ -103,6 +103,7 @@ class EthereumCmdBuilder:
|
||||
data)
|
||||
|
||||
def eip712_send_struct_impl_struct_field(self, data: bytearray) -> Iterator[bytes]:
|
||||
chunks = list()
|
||||
# Add a 16-bit integer with the data's byte length (network byte order)
|
||||
data_w_length = bytearray()
|
||||
data_w_length.append((len(data) & 0xff00) >> 8)
|
||||
@@ -110,11 +111,12 @@ class EthereumCmdBuilder:
|
||||
data_w_length += data
|
||||
while len(data_w_length) > 0:
|
||||
p1 = P1Type.PARTIAL_SEND if len(data_w_length) > 0xff else P1Type.COMPLETE_SEND
|
||||
yield self._serialize(InsType.EIP712_SEND_STRUCT_IMPL,
|
||||
p1,
|
||||
P2Type.STRUCT_FIELD,
|
||||
data_w_length[:0xff])
|
||||
chunks.append(self._serialize(InsType.EIP712_SEND_STRUCT_IMPL,
|
||||
p1,
|
||||
P2Type.STRUCT_FIELD,
|
||||
data_w_length[:0xff]))
|
||||
data_w_length = data_w_length[0xff:]
|
||||
return chunks
|
||||
|
||||
def eip712_sign_new(self, bip32_path: str) -> bytes:
|
||||
data = pack_derivation_path(bip32_path)
|
||||
@@ -167,29 +169,33 @@ class EthereumCmdBuilder:
|
||||
P2Type.FILTERING_FIELD_NAME,
|
||||
self._eip712_filtering_send_name(name, sig))
|
||||
|
||||
def sign(self, bip32_path: str, rlp_data: bytes) -> Iterator[bytes]:
|
||||
def sign(self, bip32_path: str, rlp_data: bytes) -> list[bytes]:
|
||||
apdus = list()
|
||||
payload = pack_derivation_path(bip32_path)
|
||||
payload += rlp_data
|
||||
p1 = P1Type.SIGN_FIRST_CHUNK
|
||||
while len(payload) > 0:
|
||||
yield self._serialize(InsType.SIGN,
|
||||
p1,
|
||||
0x00,
|
||||
payload[:0xff])
|
||||
apdus.append(self._serialize(InsType.SIGN,
|
||||
p1,
|
||||
0x00,
|
||||
payload[:0xff]))
|
||||
payload = payload[0xff:]
|
||||
p1 = P1Type.SIGN_SUBSQT_CHUNK
|
||||
return apdus
|
||||
|
||||
def get_challenge(self) -> bytes:
|
||||
return self._serialize(InsType.GET_CHALLENGE, 0x00, 0x00)
|
||||
|
||||
def provide_domain_name(self, tlv_payload: bytes) -> bytes:
|
||||
def provide_domain_name(self, tlv_payload: bytes) -> list[bytes]:
|
||||
chunks = list()
|
||||
payload = struct.pack(">H", len(tlv_payload))
|
||||
payload += tlv_payload
|
||||
p1 = 1
|
||||
while len(payload) > 0:
|
||||
yield self._serialize(InsType.PROVIDE_DOMAIN_NAME,
|
||||
p1,
|
||||
0x00,
|
||||
payload[:0xff])
|
||||
chunks.append(self._serialize(InsType.PROVIDE_DOMAIN_NAME,
|
||||
p1,
|
||||
0x00,
|
||||
payload[:0xff]))
|
||||
payload = payload[0xff:]
|
||||
p1 = 0
|
||||
return chunks
|
||||
|
||||
@@ -1,18 +1,14 @@
|
||||
class EthereumRespParser:
|
||||
def sign(self, data: bytes):
|
||||
assert len(data) == (1 + 32 + 32)
|
||||
def signature(data: bytes) -> tuple[bytes, bytes, bytes]:
|
||||
assert len(data) == (1 + 32 + 32)
|
||||
|
||||
v = data[0:1]
|
||||
data = data[1:]
|
||||
v = data[0:1]
|
||||
data = data[1:]
|
||||
r = data[0:32]
|
||||
data = data[32:]
|
||||
s = data[0:32]
|
||||
|
||||
r = data[0:32]
|
||||
data = data[32:]
|
||||
return v, r, s
|
||||
|
||||
s = data[0:32]
|
||||
data = data[32:]
|
||||
|
||||
return v, r, s
|
||||
|
||||
def challenge(self, data: bytes) -> int:
|
||||
assert len(data) == 4
|
||||
return int.from_bytes(data, "big")
|
||||
def challenge(data: bytes) -> int:
|
||||
assert len(data) == 4
|
||||
return int.from_bytes(data, "big")
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
from enum import IntEnum, auto
|
||||
from typing import List
|
||||
|
||||
class SettingType(IntEnum):
|
||||
BLIND_SIGNING = 0,
|
||||
DEBUG_DATA = auto()
|
||||
NONCE = auto()
|
||||
VERBOSE_EIP712 = auto()
|
||||
VERBOSE_ENS = auto()
|
||||
|
||||
class SettingImpl:
|
||||
devices: List[str]
|
||||
value: bool
|
||||
|
||||
def __init__(self, devs: List[str]):
|
||||
self.devices = devs
|
||||
62
tests/ragger/app/settings.py
Normal file
62
tests/ragger/app/settings.py
Normal file
@@ -0,0 +1,62 @@
|
||||
from enum import Enum, auto
|
||||
from typing import List
|
||||
from ragger.firmware import Firmware
|
||||
from ragger.navigator import Navigator, NavInsID, NavIns
|
||||
|
||||
class SettingID(Enum):
|
||||
BLIND_SIGNING = auto()
|
||||
DEBUG_DATA = auto()
|
||||
NONCE = auto()
|
||||
VERBOSE_EIP712 = auto()
|
||||
VERBOSE_ENS = auto()
|
||||
|
||||
def get_device_settings(device: str) -> list[SettingID]:
|
||||
if device == "nanos":
|
||||
return [
|
||||
SettingID.BLIND_SIGNING,
|
||||
SettingID.DEBUG_DATA,
|
||||
SettingID.NONCE
|
||||
]
|
||||
if (device == "nanox") or (device == "nanosp") or (device == "stax"):
|
||||
return [
|
||||
SettingID.BLIND_SIGNING,
|
||||
SettingID.DEBUG_DATA,
|
||||
SettingID.NONCE,
|
||||
SettingID.VERBOSE_EIP712,
|
||||
SettingID.VERBOSE_ENS
|
||||
]
|
||||
return []
|
||||
|
||||
settings_per_page = 3
|
||||
|
||||
def get_setting_position(device: str, setting: NavInsID) -> tuple[int, int]:
|
||||
screen_height = 672 # px
|
||||
header_height = 85 # px
|
||||
footer_height = 124 # px
|
||||
usable_height = screen_height - (header_height + footer_height)
|
||||
setting_height = usable_height // settings_per_page
|
||||
index_in_page = get_device_settings(device).index(setting) % settings_per_page
|
||||
return 350, header_height + (setting_height * index_in_page) + (setting_height // 2)
|
||||
|
||||
def settings_toggle(fw: Firmware, nav: Navigator, to_toggle: list[SettingID]):
|
||||
moves = list()
|
||||
settings = get_device_settings(fw.device)
|
||||
# Assume the app is on the home page
|
||||
if fw.device.startswith("nano"):
|
||||
moves += [NavInsID.RIGHT_CLICK] * 2
|
||||
moves += [NavInsID.BOTH_CLICK]
|
||||
for setting in settings:
|
||||
if setting in to_toggle:
|
||||
moves += [NavInsID.BOTH_CLICK]
|
||||
moves += [NavInsID.RIGHT_CLICK]
|
||||
moves += [NavInsID.BOTH_CLICK] # Back
|
||||
else:
|
||||
moves += [NavInsID.USE_CASE_HOME_SETTINGS]
|
||||
for setting in settings:
|
||||
setting_idx = settings.index(setting)
|
||||
if (setting_idx > 0) and (setting_idx % settings_per_page) == 0:
|
||||
moves += [NavInsID.USE_CASE_SETTINGS_NEXT]
|
||||
if setting in to_toggle:
|
||||
moves += [NavIns(NavInsID.TOUCH, get_setting_position(fw.device, setting))]
|
||||
moves += [NavInsID.EXIT_HEADER_TAP]
|
||||
nav.navigate(moves)
|
||||
@@ -1,12 +1,4 @@
|
||||
import pytest
|
||||
from ragger.conftest import configuration
|
||||
from ragger.backend import BackendInterface
|
||||
from app.client import EthereumClient
|
||||
|
||||
# This final fixture will return the properly configured app client, to be used in tests
|
||||
@pytest.fixture
|
||||
def app_client(backend: BackendInterface, golden_run: bool) -> EthereumClient:
|
||||
return EthereumClient(backend, golden_run)
|
||||
|
||||
# Pull all features from the base ragger conftest using the overridden configuration
|
||||
pytest_plugins = ("ragger.conftest.base_conftest", )
|
||||
|
||||
@@ -4,15 +4,19 @@ import json
|
||||
import sys
|
||||
import re
|
||||
import hashlib
|
||||
from app.client import EthereumClient, EIP712FieldType
|
||||
from app.client import EthAppClient, EIP712FieldType
|
||||
import keychain
|
||||
from typing import Callable
|
||||
import signal
|
||||
|
||||
# global variables
|
||||
app_client: EthereumClient = None
|
||||
app_client: EthAppClient = None
|
||||
filtering_paths = None
|
||||
current_path = list()
|
||||
sig_ctx = {}
|
||||
|
||||
autonext_handler: Callable = None
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -97,11 +101,12 @@ def send_struct_def_field(typename, keyname):
|
||||
type_enum = EIP712FieldType.CUSTOM
|
||||
typesize = None
|
||||
|
||||
app_client.eip712_send_struct_def_struct_field(type_enum,
|
||||
typename,
|
||||
typesize,
|
||||
array_lvls,
|
||||
keyname)
|
||||
with app_client.eip712_send_struct_def_struct_field(type_enum,
|
||||
typename,
|
||||
typesize,
|
||||
array_lvls,
|
||||
keyname):
|
||||
pass
|
||||
return (typename, type_enum, typesize, array_lvls)
|
||||
|
||||
|
||||
@@ -191,7 +196,9 @@ def send_struct_impl_field(value, field):
|
||||
if path in filtering_paths.keys():
|
||||
send_filtering_show_field(filtering_paths[path])
|
||||
|
||||
app_client.eip712_send_struct_impl_struct_field(data)
|
||||
with app_client.eip712_send_struct_impl_struct_field(data):
|
||||
enable_autonext()
|
||||
disable_autonext()
|
||||
|
||||
|
||||
|
||||
@@ -201,7 +208,8 @@ def evaluate_field(structs, data, field, lvls_left, new_level = True):
|
||||
if new_level:
|
||||
current_path.append(field["name"])
|
||||
if len(array_lvls) > 0 and lvls_left > 0:
|
||||
app_client.eip712_send_struct_impl_array(len(data))
|
||||
with app_client.eip712_send_struct_impl_array(len(data)):
|
||||
pass
|
||||
idx = 0
|
||||
for subdata in data:
|
||||
current_path.append("[]")
|
||||
@@ -252,7 +260,9 @@ def send_filtering_message_info(display_name: str, filters_count: int):
|
||||
to_sign.append(ord(char))
|
||||
|
||||
sig = keychain.sign_data(keychain.Key.CAL, to_sign)
|
||||
app_client.eip712_filtering_message_info(display_name, filters_count, sig)
|
||||
with app_client.eip712_filtering_message_info(display_name, filters_count, sig):
|
||||
enable_autonext()
|
||||
disable_autonext()
|
||||
|
||||
# ledgerjs doesn't actually sign anything, and instead uses already pre-computed signatures
|
||||
def send_filtering_show_field(display_name):
|
||||
@@ -270,7 +280,8 @@ def send_filtering_show_field(display_name):
|
||||
for char in display_name:
|
||||
to_sign.append(ord(char))
|
||||
sig = keychain.sign_data(keychain.Key.CAL, to_sign)
|
||||
app_client.eip712_filtering_show_field(display_name, sig)
|
||||
with app_client.eip712_filtering_show_field(display_name, sig):
|
||||
pass
|
||||
|
||||
def read_filtering_file(domain, message, filtering_file_path):
|
||||
data_json = None
|
||||
@@ -309,9 +320,25 @@ def init_signature_context(types, domain):
|
||||
schema_hash = hashlib.sha224(schema_str.encode())
|
||||
sig_ctx["schema_hash"] = bytearray.fromhex(schema_hash.hexdigest())
|
||||
|
||||
def process_file(aclient: EthereumClient, input_file_path: str, filtering_file_path = None) -> bool:
|
||||
|
||||
def next_timeout(_signum: int, _frame):
|
||||
autonext_handler()
|
||||
|
||||
def enable_autonext():
|
||||
delay = 1/4
|
||||
signal.setitimer(signal.ITIMER_REAL, delay, delay)
|
||||
|
||||
def disable_autonext():
|
||||
signal.setitimer(signal.ITIMER_REAL, 0, 0)
|
||||
|
||||
|
||||
def process_file(aclient: EthAppClient,
|
||||
input_file_path: str,
|
||||
filtering_file_path = None,
|
||||
autonext: Callable = None) -> bool:
|
||||
global sig_ctx
|
||||
global app_client
|
||||
global autonext_handler
|
||||
|
||||
app_client = aclient
|
||||
with open(input_file_path, "r") as data:
|
||||
@@ -322,23 +349,31 @@ def process_file(aclient: EthereumClient, input_file_path: str, filtering_file_p
|
||||
domain = data_json["domain"]
|
||||
message = data_json["message"]
|
||||
|
||||
if autonext:
|
||||
autonext_handler = autonext
|
||||
signal.signal(signal.SIGALRM, next_timeout)
|
||||
|
||||
if filtering_file_path:
|
||||
init_signature_context(types, domain)
|
||||
filtr = read_filtering_file(domain, message, filtering_file_path)
|
||||
|
||||
# send types definition
|
||||
for key in types.keys():
|
||||
app_client.eip712_send_struct_def_struct_name(key)
|
||||
with app_client.eip712_send_struct_def_struct_name(key):
|
||||
pass
|
||||
for f in types[key]:
|
||||
(f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \
|
||||
send_struct_def_field(f["type"], f["name"])
|
||||
|
||||
if filtering_file_path:
|
||||
app_client.eip712_filtering_activate()
|
||||
with app_client.eip712_filtering_activate():
|
||||
pass
|
||||
prepare_filtering(filtr, message)
|
||||
|
||||
# send domain implementation
|
||||
app_client.eip712_send_struct_impl_root_struct(domain_typename)
|
||||
with app_client.eip712_send_struct_impl_root_struct(domain_typename):
|
||||
enable_autonext()
|
||||
disable_autonext()
|
||||
if not send_struct_impl(types, domain, domain_typename):
|
||||
return False
|
||||
|
||||
@@ -349,7 +384,9 @@ def process_file(aclient: EthereumClient, input_file_path: str, filtering_file_p
|
||||
send_filtering_message_info(domain["name"], len(filtering_paths))
|
||||
|
||||
# send message implementation
|
||||
app_client.eip712_send_struct_impl_root_struct(message_typename)
|
||||
with app_client.eip712_send_struct_impl_root_struct(message_typename):
|
||||
enable_autonext()
|
||||
disable_autonext()
|
||||
if not send_struct_impl(types, message, message_typename):
|
||||
return False
|
||||
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
import pytest
|
||||
from ragger.error import ExceptionRAPDU
|
||||
from app.client import EthereumClient, StatusWord
|
||||
from app.setting import SettingType
|
||||
from ragger.firmware import Firmware
|
||||
from ragger.backend import BackendInterface
|
||||
from ragger.navigator import Navigator, NavInsID
|
||||
from app.client import EthAppClient, StatusWord, ROOT_SCREENSHOT_PATH
|
||||
from app.settings import SettingID, settings_toggle
|
||||
import app.response_parser as ResponseParser
|
||||
import struct
|
||||
|
||||
# Values used across all tests
|
||||
@@ -20,108 +24,182 @@ AMOUNT = 1.22
|
||||
def verbose(request) -> bool:
|
||||
return request.param
|
||||
|
||||
def common(app_client: EthereumClient) -> int:
|
||||
def common(app_client: EthAppClient) -> int:
|
||||
if app_client._client.firmware.device == "nanos":
|
||||
pytest.skip("Not supported on LNS")
|
||||
return app_client.get_challenge()
|
||||
with app_client.get_challenge():
|
||||
pass
|
||||
return ResponseParser.challenge(app_client.response().data)
|
||||
|
||||
|
||||
def test_send_fund(app_client: EthereumClient, verbose: bool):
|
||||
def test_send_fund(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator,
|
||||
test_name: str,
|
||||
verbose: bool):
|
||||
app_client = EthAppClient(backend)
|
||||
challenge = common(app_client)
|
||||
|
||||
if verbose:
|
||||
app_client.settings_set({
|
||||
SettingType.VERBOSE_ENS: True
|
||||
})
|
||||
settings_toggle(firmware, navigator, [SettingID.VERBOSE_ENS])
|
||||
|
||||
app_client.provide_domain_name(challenge, NAME, ADDR)
|
||||
with app_client.provide_domain_name(challenge, NAME, ADDR):
|
||||
pass
|
||||
|
||||
app_client.send_fund(BIP32_PATH,
|
||||
NONCE,
|
||||
GAS_PRICE,
|
||||
GAS_LIMIT,
|
||||
ADDR,
|
||||
AMOUNT,
|
||||
CHAIN_ID,
|
||||
"domain_name_verbose_" + str(verbose))
|
||||
with app_client.send_fund(BIP32_PATH,
|
||||
NONCE,
|
||||
GAS_PRICE,
|
||||
GAS_LIMIT,
|
||||
ADDR,
|
||||
AMOUNT,
|
||||
CHAIN_ID):
|
||||
moves = list()
|
||||
if firmware.device.startswith("nano"):
|
||||
moves += [ NavInsID.RIGHT_CLICK ] * 4
|
||||
if verbose:
|
||||
moves += [ NavInsID.RIGHT_CLICK ]
|
||||
moves += [ NavInsID.BOTH_CLICK ]
|
||||
else:
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2
|
||||
if verbose:
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_TAP ]
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
|
||||
navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH,
|
||||
"domain_name_verbose_" + str(verbose),
|
||||
moves)
|
||||
|
||||
def test_send_fund_wrong_challenge(app_client: EthereumClient):
|
||||
|
||||
def test_send_fund_wrong_challenge(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator):
|
||||
app_client = EthAppClient(backend)
|
||||
caught = False
|
||||
challenge = common(app_client)
|
||||
|
||||
try:
|
||||
app_client.provide_domain_name(~challenge & 0xffffffff, NAME, ADDR)
|
||||
with app_client.provide_domain_name(~challenge & 0xffffffff, NAME, ADDR):
|
||||
pass
|
||||
except ExceptionRAPDU as e:
|
||||
assert e.status == StatusWord.INVALID_DATA
|
||||
else:
|
||||
assert False # An exception should have been raised
|
||||
|
||||
def test_send_fund_wrong_addr(app_client: EthereumClient):
|
||||
|
||||
def test_send_fund_wrong_addr(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator,
|
||||
test_name: str):
|
||||
app_client = EthAppClient(backend)
|
||||
challenge = common(app_client)
|
||||
|
||||
app_client.provide_domain_name(challenge, NAME, ADDR)
|
||||
with app_client.provide_domain_name(challenge, NAME, ADDR):
|
||||
pass
|
||||
|
||||
addr = bytearray(ADDR)
|
||||
addr.reverse()
|
||||
|
||||
app_client.send_fund(BIP32_PATH,
|
||||
NONCE,
|
||||
GAS_PRICE,
|
||||
GAS_LIMIT,
|
||||
addr,
|
||||
AMOUNT,
|
||||
CHAIN_ID,
|
||||
"domain_name_wrong_addr")
|
||||
with app_client.send_fund(BIP32_PATH,
|
||||
NONCE,
|
||||
GAS_PRICE,
|
||||
GAS_LIMIT,
|
||||
addr,
|
||||
AMOUNT,
|
||||
CHAIN_ID):
|
||||
moves = list()
|
||||
if firmware.device.startswith("nano"):
|
||||
moves += [ NavInsID.RIGHT_CLICK ] * 4
|
||||
moves += [ NavInsID.BOTH_CLICK ]
|
||||
else:
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
|
||||
navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH,
|
||||
"domain_name_wrong_addr",
|
||||
moves)
|
||||
|
||||
def test_send_fund_non_mainnet(app_client: EthereumClient):
|
||||
|
||||
def test_send_fund_non_mainnet(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator,
|
||||
test_name: str):
|
||||
app_client = EthAppClient(backend)
|
||||
challenge = common(app_client)
|
||||
|
||||
app_client.provide_domain_name(challenge, NAME, ADDR)
|
||||
with app_client.provide_domain_name(challenge, NAME, ADDR):
|
||||
pass
|
||||
|
||||
app_client.send_fund(BIP32_PATH,
|
||||
NONCE,
|
||||
GAS_PRICE,
|
||||
GAS_LIMIT,
|
||||
ADDR,
|
||||
AMOUNT,
|
||||
5,
|
||||
"domain_name_non_mainnet")
|
||||
with app_client.send_fund(BIP32_PATH,
|
||||
NONCE,
|
||||
GAS_PRICE,
|
||||
GAS_LIMIT,
|
||||
ADDR,
|
||||
AMOUNT,
|
||||
5):
|
||||
moves = list()
|
||||
if firmware.device.startswith("nano"):
|
||||
moves += [ NavInsID.RIGHT_CLICK ] * 5
|
||||
moves += [ NavInsID.BOTH_CLICK ]
|
||||
else:
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 3
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
|
||||
navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH,
|
||||
"domain_name_non_mainnet",
|
||||
moves)
|
||||
|
||||
def test_send_fund_domain_too_long(app_client: EthereumClient):
|
||||
|
||||
def test_send_fund_domain_too_long(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator):
|
||||
app_client = EthAppClient(backend)
|
||||
challenge = common(app_client)
|
||||
|
||||
try:
|
||||
app_client.provide_domain_name(challenge, "ledger" + "0"*25 + ".eth", ADDR)
|
||||
with app_client.provide_domain_name(challenge, "ledger" + "0"*25 + ".eth", ADDR):
|
||||
pass
|
||||
except ExceptionRAPDU as e:
|
||||
assert e.status == StatusWord.INVALID_DATA
|
||||
else:
|
||||
assert False # An exception should have been raised
|
||||
|
||||
def test_send_fund_domain_invalid_character(app_client: EthereumClient):
|
||||
|
||||
def test_send_fund_domain_invalid_character(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator):
|
||||
app_client = EthAppClient(backend)
|
||||
challenge = common(app_client)
|
||||
|
||||
try:
|
||||
app_client.provide_domain_name(challenge, "l\xe8dger.eth", ADDR)
|
||||
with app_client.provide_domain_name(challenge, "l\xe8dger.eth", ADDR):
|
||||
pass
|
||||
except ExceptionRAPDU as e:
|
||||
assert e.status == StatusWord.INVALID_DATA
|
||||
else:
|
||||
assert False # An exception should have been raised
|
||||
|
||||
def test_send_fund_uppercase(app_client: EthereumClient):
|
||||
|
||||
def test_send_fund_uppercase(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator):
|
||||
app_client = EthAppClient(backend)
|
||||
challenge = common(app_client)
|
||||
|
||||
try:
|
||||
app_client.provide_domain_name(challenge, NAME.upper(), ADDR)
|
||||
with app_client.provide_domain_name(challenge, NAME.upper(), ADDR):
|
||||
pass
|
||||
except ExceptionRAPDU as e:
|
||||
assert e.status == StatusWord.INVALID_DATA
|
||||
else:
|
||||
assert False # An exception should have been raised
|
||||
|
||||
def test_send_fund_domain_non_ens(app_client: EthereumClient):
|
||||
|
||||
def test_send_fund_domain_non_ens(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator):
|
||||
app_client = EthAppClient(backend)
|
||||
challenge = common(app_client)
|
||||
|
||||
try:
|
||||
app_client.provide_domain_name(challenge, "ledger.hte", ADDR)
|
||||
with app_client.provide_domain_name(challenge, "ledger.hte", ADDR):
|
||||
pass
|
||||
except ExceptionRAPDU as e:
|
||||
assert e.status == StatusWord.INVALID_DATA
|
||||
else:
|
||||
|
||||
@@ -2,10 +2,17 @@ import pytest
|
||||
import os
|
||||
import fnmatch
|
||||
from typing import List
|
||||
from app.client import EthereumClient, SettingType
|
||||
from ragger.firmware import Firmware
|
||||
from ragger.backend import BackendInterface
|
||||
from ragger.navigator import Navigator, NavInsID
|
||||
from app.client import EthAppClient
|
||||
from app.settings import SettingID, settings_toggle
|
||||
from eip712 import InputData
|
||||
from pathlib import Path
|
||||
from configparser import ConfigParser
|
||||
import app.response_parser as ResponseParser
|
||||
from functools import partial
|
||||
import time
|
||||
|
||||
BIP32_PATH = "m/44'/60'/0'/0/0"
|
||||
|
||||
@@ -30,19 +37,52 @@ def filtering(request) -> bool:
|
||||
return request.param
|
||||
|
||||
|
||||
def test_eip712_legacy(app_client: EthereumClient):
|
||||
v, r, s = app_client.eip712_sign_legacy(
|
||||
BIP32_PATH,
|
||||
bytes.fromhex('6137beb405d9ff777172aa879e33edb34a1460e701802746c5ef96e741710e59'),
|
||||
bytes.fromhex('eb4221181ff3f1a83ea7313993ca9218496e424604ba9492bb4052c03d5c3df8')
|
||||
)
|
||||
def test_eip712_legacy(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator):
|
||||
app_client = EthAppClient(backend)
|
||||
with app_client.eip712_sign_legacy(
|
||||
BIP32_PATH,
|
||||
bytes.fromhex('6137beb405d9ff777172aa879e33edb34a1460e701802746c5ef96e741710e59'),
|
||||
bytes.fromhex('eb4221181ff3f1a83ea7313993ca9218496e424604ba9492bb4052c03d5c3df8')):
|
||||
moves = list()
|
||||
if firmware.device.startswith("nano"):
|
||||
moves += [ NavInsID.RIGHT_CLICK ]
|
||||
if firmware.device == "nanos":
|
||||
screens_per_hash = 4
|
||||
else:
|
||||
screens_per_hash = 2
|
||||
moves += [ NavInsID.RIGHT_CLICK ] * screens_per_hash * 2
|
||||
moves += [ NavInsID.BOTH_CLICK ]
|
||||
else:
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
|
||||
navigator.navigate(moves)
|
||||
|
||||
v, r, s = ResponseParser.signature(app_client.response().data)
|
||||
|
||||
assert v == bytes.fromhex("1c")
|
||||
assert r == bytes.fromhex("ea66f747173762715751c889fea8722acac3fc35db2c226d37a2e58815398f64")
|
||||
assert s == bytes.fromhex("52d8ba9153de9255da220ffd36762c0b027701a3b5110f0a765f94b16a9dfb55")
|
||||
|
||||
def test_eip712_new(app_client: EthereumClient, input_file: Path, verbose: bool, filtering: bool):
|
||||
if app_client._client.firmware.device == "nanos":
|
||||
|
||||
def autonext(fw: Firmware, nav: Navigator):
|
||||
moves = list()
|
||||
if fw.device.startswith("nano"):
|
||||
moves = [ NavInsID.RIGHT_CLICK ]
|
||||
else:
|
||||
moves = [ NavInsID.USE_CASE_REVIEW_TAP ]
|
||||
nav.navigate(moves)
|
||||
|
||||
|
||||
def test_eip712_new(firmware: Firmware,
|
||||
backend: BackendInterface,
|
||||
navigator: Navigator,
|
||||
input_file: Path,
|
||||
verbose: bool,
|
||||
filtering: bool):
|
||||
app_client = EthAppClient(backend)
|
||||
if firmware.device == "nanos":
|
||||
pytest.skip("Not supported on LNS")
|
||||
else:
|
||||
test_path = "%s/%s" % (input_file.parent, "-".join(input_file.stem.split("-")[:-1]))
|
||||
@@ -63,12 +103,25 @@ def test_eip712_new(app_client: EthereumClient, input_file: Path, verbose: bool,
|
||||
|
||||
if not filtering or Path(filter_file).is_file():
|
||||
if verbose:
|
||||
app_client.settings_set({
|
||||
SettingType.VERBOSE_EIP712: True
|
||||
})
|
||||
settings_toggle(firmware, navigator, [SettingID.VERBOSE_EIP712])
|
||||
|
||||
assert InputData.process_file(app_client, input_file, filter_file) == True
|
||||
v, r, s = app_client.eip712_sign_new(BIP32_PATH)
|
||||
assert InputData.process_file(app_client,
|
||||
input_file,
|
||||
filter_file,
|
||||
partial(autonext, firmware, navigator)) == True
|
||||
with app_client.eip712_sign_new(BIP32_PATH, verbose):
|
||||
time.sleep(0.5) # tight on timing, needed by the CI otherwise might fail sometimes
|
||||
moves = list()
|
||||
if firmware.device.startswith("nano"):
|
||||
if not verbose and not filtering: # need to skip the message hash
|
||||
moves = [ NavInsID.RIGHT_CLICK ] * 2
|
||||
moves += [ NavInsID.BOTH_CLICK ]
|
||||
else:
|
||||
if not verbose and not filtering: # need to skip the message hash
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_TAP ]
|
||||
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
|
||||
navigator.navigate(moves)
|
||||
v, r, s = ResponseParser.signature(app_client.response().data)
|
||||
#print("[signature]")
|
||||
#print("v = %s" % (v.hex()))
|
||||
#print("r = %s" % (r.hex()))
|
||||
|
||||
Reference in New Issue
Block a user