Modified EIP712 function to take dictionaries directly instead of filenames
This commit is contained in:
@@ -75,7 +75,7 @@ class EthAppClient:
|
||||
pass
|
||||
return self._send(chunks[-1])
|
||||
|
||||
def eip712_sign_new(self, bip32_path: str, verbose: bool):
|
||||
def eip712_sign_new(self, bip32_path: str):
|
||||
return self._send(self._cmd_builder.eip712_sign_new(bip32_path))
|
||||
|
||||
def eip712_sign_legacy(self,
|
||||
|
||||
@@ -3,6 +3,7 @@ import json
|
||||
import re
|
||||
import signal
|
||||
import sys
|
||||
import copy
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
from ledger_app_clients.ethereum import keychain
|
||||
@@ -297,13 +298,6 @@ def send_filtering_show_field(display_name):
|
||||
pass
|
||||
|
||||
|
||||
def read_filtering_file(filtering_file_path: str):
|
||||
data_json = None
|
||||
with open(filtering_file_path) as data:
|
||||
data_json = json.load(data)
|
||||
return data_json
|
||||
|
||||
|
||||
def prepare_filtering(filtr_data, message):
|
||||
global filtering_paths
|
||||
|
||||
@@ -355,62 +349,61 @@ def disable_autonext():
|
||||
signal.setitimer(signal.ITIMER_REAL, 0, 0)
|
||||
|
||||
|
||||
def process_file(aclient: EthAppClient,
|
||||
input_file_path: str,
|
||||
filtering_file_path: Optional[str] = None,
|
||||
def process_data(aclient: EthAppClient,
|
||||
data_json: dict,
|
||||
filters: Optional[dict] = None,
|
||||
autonext: Optional[Callable] = None) -> bool:
|
||||
global sig_ctx
|
||||
global app_client
|
||||
global autonext_handler
|
||||
|
||||
# deepcopy because this function modifies the dict
|
||||
data_json = copy.deepcopy(data_json)
|
||||
app_client = aclient
|
||||
with open(input_file_path, "r") as data:
|
||||
data_json = json.load(data)
|
||||
domain_typename = "EIP712Domain"
|
||||
message_typename = data_json["primaryType"]
|
||||
types = data_json["types"]
|
||||
domain = data_json["domain"]
|
||||
message = data_json["message"]
|
||||
domain_typename = "EIP712Domain"
|
||||
message_typename = data_json["primaryType"]
|
||||
types = data_json["types"]
|
||||
domain = data_json["domain"]
|
||||
message = data_json["message"]
|
||||
|
||||
if autonext:
|
||||
autonext_handler = autonext
|
||||
signal.signal(signal.SIGALRM, next_timeout)
|
||||
if autonext:
|
||||
autonext_handler = autonext
|
||||
signal.signal(signal.SIGALRM, next_timeout)
|
||||
|
||||
if filtering_file_path:
|
||||
init_signature_context(types, domain)
|
||||
filtr = read_filtering_file(filtering_file_path)
|
||||
if filters:
|
||||
init_signature_context(types, domain)
|
||||
|
||||
# send types definition
|
||||
for key in types.keys():
|
||||
with app_client.eip712_send_struct_def_struct_name(key):
|
||||
pass
|
||||
for f in types[key]:
|
||||
(f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \
|
||||
send_struct_def_field(f["type"], f["name"])
|
||||
# send types definition
|
||||
for key in types.keys():
|
||||
with app_client.eip712_send_struct_def_struct_name(key):
|
||||
pass
|
||||
for f in types[key]:
|
||||
(f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \
|
||||
send_struct_def_field(f["type"], f["name"])
|
||||
|
||||
if filtering_file_path:
|
||||
with app_client.eip712_filtering_activate():
|
||||
pass
|
||||
prepare_filtering(filtr, message)
|
||||
if filters:
|
||||
with app_client.eip712_filtering_activate():
|
||||
pass
|
||||
prepare_filtering(filters, message)
|
||||
|
||||
# send domain implementation
|
||||
with app_client.eip712_send_struct_impl_root_struct(domain_typename):
|
||||
enable_autonext()
|
||||
disable_autonext()
|
||||
if not send_struct_impl(types, domain, domain_typename):
|
||||
return False
|
||||
# send domain implementation
|
||||
with app_client.eip712_send_struct_impl_root_struct(domain_typename):
|
||||
enable_autonext()
|
||||
disable_autonext()
|
||||
if not send_struct_impl(types, domain, domain_typename):
|
||||
return False
|
||||
|
||||
if filtering_file_path:
|
||||
if filtr and "name" in filtr:
|
||||
send_filtering_message_info(filtr["name"], len(filtering_paths))
|
||||
else:
|
||||
send_filtering_message_info(domain["name"], len(filtering_paths))
|
||||
if filters:
|
||||
if filters and "name" in filters:
|
||||
send_filtering_message_info(filters["name"], len(filtering_paths))
|
||||
else:
|
||||
send_filtering_message_info(domain["name"], len(filtering_paths))
|
||||
|
||||
# send message implementation
|
||||
with app_client.eip712_send_struct_impl_root_struct(message_typename):
|
||||
enable_autonext()
|
||||
disable_autonext()
|
||||
if not send_struct_impl(types, message, message_typename):
|
||||
return False
|
||||
# send message implementation
|
||||
with app_client.eip712_send_struct_impl_root_struct(message_typename):
|
||||
enable_autonext()
|
||||
disable_autonext()
|
||||
if not send_struct_impl(types, message, message_typename):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
@@ -9,6 +9,7 @@ from ragger.backend import BackendInterface
|
||||
from ragger.firmware import Firmware
|
||||
from ragger.navigator import Navigator, NavInsID
|
||||
from typing import List
|
||||
import json
|
||||
|
||||
import ledger_app_clients.ethereum.response_parser as ResponseParser
|
||||
from ledger_app_clients.ethereum.client import EthAppClient
|
||||
@@ -92,10 +93,14 @@ def test_eip712_new(firmware: Firmware,
|
||||
else:
|
||||
test_path = "%s/%s" % (input_file.parent, "-".join(input_file.stem.split("-")[:-1]))
|
||||
conf_file = "%s.ini" % (test_path)
|
||||
filter_file = None
|
||||
|
||||
filters = None
|
||||
if filtering:
|
||||
filter_file = "%s-filter.json" % (test_path)
|
||||
try:
|
||||
with open("%s-filter.json" % (test_path)) as f:
|
||||
filters = json.load(f)
|
||||
except (IOError, json.decoder.JSONDecodeError) as e:
|
||||
pytest.skip("Filter file error: %s" % (e.strerror))
|
||||
|
||||
config = ConfigParser()
|
||||
config.read(conf_file)
|
||||
@@ -106,15 +111,15 @@ def test_eip712_new(firmware: Firmware,
|
||||
assert "r" in config["signature"]
|
||||
assert "s" in config["signature"]
|
||||
|
||||
if not filtering or Path(filter_file).is_file():
|
||||
if verbose:
|
||||
settings_toggle(firmware, navigator, [SettingID.VERBOSE_EIP712])
|
||||
if verbose:
|
||||
settings_toggle(firmware, navigator, [SettingID.VERBOSE_EIP712])
|
||||
|
||||
assert InputData.process_file(app_client,
|
||||
input_file,
|
||||
filter_file,
|
||||
with open(input_file) as file:
|
||||
assert InputData.process_data(app_client,
|
||||
json.load(file),
|
||||
filters,
|
||||
partial(autonext, firmware, navigator))
|
||||
with app_client.eip712_sign_new(BIP32_PATH, verbose):
|
||||
with app_client.eip712_sign_new(BIP32_PATH):
|
||||
# tight on timing, needed by the CI otherwise might fail sometimes
|
||||
time.sleep(0.5)
|
||||
|
||||
@@ -130,8 +135,6 @@ def test_eip712_new(firmware: Firmware,
|
||||
navigator.navigate(moves)
|
||||
v, r, s = ResponseParser.signature(app_client.response().data)
|
||||
|
||||
assert v == bytes.fromhex(config["signature"]["v"])
|
||||
assert r == bytes.fromhex(config["signature"]["r"])
|
||||
assert s == bytes.fromhex(config["signature"]["s"])
|
||||
else:
|
||||
pytest.skip("No filter file found")
|
||||
assert v == bytes.fromhex(config["signature"]["v"])
|
||||
assert r == bytes.fromhex(config["signature"]["r"])
|
||||
assert s == bytes.fromhex(config["signature"]["s"])
|
||||
|
||||
Reference in New Issue
Block a user