feat: first test with speculos

This commit is contained in:
Coline
2022-04-08 09:57:12 +02:00
parent 3bb1807858
commit 104bfee2a9
17 changed files with 740 additions and 0 deletions

View File

@@ -215,6 +215,33 @@ The input data is the RLP encoded transaction, without v/r/s present, streamed t
|r|32|
|s|32|
Exemple:
With path `"44'/60'/1'/0/0"`
CLA: E0
INS: 04
P1 : 00 (First transaction block)
P2 : 00
Lc : ?
Le :
- 04 (number BIP 32 derivations)
- 80 00 00 2c
- 80 00 00 3c
- 00 00 00 00
- 00 00 00 00
- RLP chunk
<br />
CLA: E0
INS: 04
P1 : 80 (subsequent transaction block)
P2 : 00
Lc : ?
Le :
- RLP chunk
</details>
<br/>

25
tests/speculos/.gitignore vendored Normal file
View File

@@ -0,0 +1,25 @@
__pycache__/
*.py[cod]
*$py.class
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# generated by pip
pip-wheel-metadata/
# pytest debug logs generated via --debug
pytestdebug.log
.cache
.pytest_cache
.mypy_cache
.coverage
.coverage.*
coverage.xml

39
tests/speculos/README.md Normal file
View File

@@ -0,0 +1,39 @@
# Speculos functional tests
These tests are implemented in Python with the `SpeculosClient` interface which allows easy execution on the [Speculos](https://github.com/LedgerHQ/speculos) emulator.
## Requirements
- [python >= 3.8](https://www.python.org/downloads/)
- [pip](https://pip.pypa.io/en/stable/installation/)
### Dependencies
Python dependencies are listed in [requirements.txt](requirements.txt)
```shell
python3 -m pip install --extra-index-url https://test.pypi.org/simple/ -r requirements.txt
```
> The extra index allows to fetch the latest version of Speculos.
## Usage
Given the requirements are installed, just do:
```
pytest tests/speculos/
```
## Tests by APDU
you will find the list of apdu [here](../../doc/apdu.md)
- Get
- GET APP CONFIGURATIOn
- [X] Simple test
- GET ETH PUBLIC ADDRESS
- [X] Test get key of coin (Ether, Dai)
- [ ] Test get key of coin (Ether, Dai) with display
- [ ] Test without chain code
- GET ETH2 PUBLIC KEY
- [ ] Test get key
- [ ] Test get key with display

View File

@@ -0,0 +1,107 @@
import struct
from typing import Tuple
from speculos.client import SpeculosClient, ApduException
from boilerplate_client.boilerplate_cmd_builder import BoilerplateCommandBuilder, InsType
from boilerplate_client.exception import DeviceException
from boilerplate_client.transaction import Transaction
class BoilerplateCommand:
def __init__(self,
client: SpeculosClient,
debug: bool = False) -> None:
self.client = client
self.builder = BoilerplateCommandBuilder(debug=debug)
self.debug = debug
def get_configuration(self) -> Tuple[int, int, int, int]:
try:
response = self.client._apdu_exchange(
self.builder.get_configuration()
) # type: int, bytes
except ApduException as error:
raise DeviceException(error_code=error.sw, ins=InsType.INS_GET_VERSION)
# response = MAJOR (1) || MINOR (1) || PATCH (1)
assert len(response) == 4
info, major, minor, patch = struct.unpack(
"BBBB",
response
) # type: int, int, int
return info, major, minor, patch
def get_public_key(self, bip32_path: str, display: bool = False) -> Tuple[bytes, bytes, bytes]:
try:
response = self.client._apdu_exchange(
self.builder.get_public_key(bip32_path=bip32_path,
display=display)
) # type: int, bytes
except ApduException as error:
raise DeviceException(error_code=error.sw, ins=InsType.INS_GET_PUBLIC_KEY)
# response = pub_key_len (1) ||
# pub_key (var) ||
# chain_code_len (1) ||
# chain_code (var)
offset: int = 0
pub_key_len: int = response[offset]
offset += 1
uncompressed_addr_len: bytes = response[offset:offset + pub_key_len]
offset += pub_key_len
eth_addr_len: int = response[offset]
offset += 1
eth_addr: bytes = response[offset:offset + eth_addr_len]
offset += eth_addr_len
chain_code: bytes = response[offset:]
assert len(response) == 1 + pub_key_len + 1 + eth_addr_len + 32 # 32 -> chain_code_len
return uncompressed_addr_len, eth_addr, chain_code
def sign_tx(self, bip32_path: str, transaction: Transaction) -> Tuple[int, bytes]:
sw: int
response: bytes = b""
for is_last, chunk in self.builder.sign_tx(bip32_path=bip32_path, transaction=transaction):
if is_last:
with self.client.apdu_exchange_nowait(cla=chunk[0], ins=chunk[1],
p1=chunk[2], p2=chunk[3],
data=chunk[5:]) as exchange:
# Review Transaction
self.client.press_and_release('right')
# Address 1/3, 2/3, 3/3
self.client.press_and_release('right')
self.client.press_and_release('right')
self.client.press_and_release('right')
# Amount
self.client.press_and_release('right')
# Approve
self.client.press_and_release('both')
response = exchange.receive()
else:
response = self.client._apdu_exchange(chunk)
print(response)
# response = der_sig_len (1) ||
# der_sig (var) ||
# v (1)
offset: int = 0
der_sig_len: int = response[offset]
offset += 1
der_sig: bytes = response[offset:offset + der_sig_len]
offset += der_sig_len
v: int = response[offset]
offset += 1
assert len(response) == 1 + der_sig_len + 1
return v, der_sig

View File

@@ -0,0 +1,187 @@
import enum
import logging
import struct
from typing import List, Tuple, Union, Iterator, cast
from boilerplate_client.transaction import Transaction
from boilerplate_client.utils import bip32_path_from_string
MAX_APDU_LEN: int = 255
def chunkify(data: bytes, chunk_len: int) -> Iterator[Tuple[bool, bytes]]:
size: int = len(data)
if size <= chunk_len:
yield True, data
return
chunk: int = size // chunk_len
remaining: int = size % chunk_len
offset: int = 0
for i in range(chunk):
yield False, data[offset:offset + chunk_len]
offset += chunk_len
if remaining:
yield True, data[offset:]
class InsType(enum.IntEnum):
INS_GET_PUBLIC_KEY = 0x02
INS_SIGN_TX = 0x04
INS_GET_CONFIGURATION = 0x06
class BoilerplateCommandBuilder:
"""APDU command builder for the Boilerplate application.
Parameters
----------
debug: bool
Whether you want to see logging or not.
Attributes
----------
debug: bool
Whether you want to see logging or not.
"""
CLA: int = 0xE0
def __init__(self, debug: bool = False):
"""Init constructor."""
self.debug = debug
def serialize(self,
cla: int,
ins: Union[int, enum.IntEnum],
p1: int = 0,
p2: int = 0,
cdata: bytes = b"") -> bytes:
"""Serialize the whole APDU command (header + data).
Parameters
----------
cla : int
Instruction class: CLA (1 byte)
ins : Union[int, IntEnum]
Instruction code: INS (1 byte)
p1 : int
Instruction parameter 1: P1 (1 byte).
p2 : int
Instruction parameter 2: P2 (1 byte).
cdata : bytes
Bytes of command data.
Returns
-------
bytes
Bytes of a complete APDU command.
"""
ins = cast(int, ins.value) if isinstance(ins, enum.IntEnum) else cast(int, ins)
header: bytes = struct.pack("BBBBB",
cla,
ins,
p1,
p2,
len(cdata)) # add Lc to APDU header
if self.debug:
logging.info("header: %s", header.hex())
logging.info("cdata: %s", cdata.hex())
return header + cdata
def get_configuration(self) -> bytes:
"""Command builder for GET_CONFIGURATON
Returns
-------
bytes
APDU command for GET_CONFIGURATON
"""
return self.serialize(cla=self.CLA,
ins=InsType.INS_GET_CONFIGURATION,
p1=0x00,
p2=0x00,
cdata=b"")
def get_public_key(self, bip32_path: str, display: bool = False) -> bytes:
"""Command builder for GET_PUBLIC_KEY.
Parameters
----------
bip32_path: str
String representation of BIP32 path.
display : bool
Whether you want to display the address on the device.
Returns
-------
bytes
APDU command for GET_PUBLIC_KEY.
"""
bip32_paths: List[bytes] = bip32_path_from_string(bip32_path)
cdata: bytes = b"".join([
len(bip32_paths).to_bytes(1, byteorder="big"),
*bip32_paths
])
return self.serialize(cla=self.CLA,
ins=InsType.INS_GET_PUBLIC_KEY,
p1=0x01 if display else 0x00,
p2=0x01,
cdata=cdata)
def sign_tx(self, bip32_path: str, transaction: Transaction) -> Iterator[Tuple[bool, bytes]]:
"""Command builder for INS_SIGN_TX.
Parameters
----------
bip32_path : str
String representation of BIP32 path.
transaction : Transaction
Representation of the transaction to be signed.
Yields
-------
bytes
APDU command chunk for INS_SIGN_TX.
"""
bip32_paths: List[bytes] = bip32_path_from_string(bip32_path)
cdata: bytes = b"".join([
len(bip32_paths).to_bytes(1, byteorder="big"),
*bip32_paths
])
yield False, self.serialize(cla=self.CLA,
ins=InsType.INS_SIGN_TX,
p1=0x00,
p2=0x00,
cdata=cdata)
tx: bytes = transaction.serialize()
for i, (is_last, chunk) in enumerate(chunkify(tx, MAX_APDU_LEN)):
if is_last:
yield True, self.serialize(cla=self.CLA,
ins=InsType.INS_SIGN_TX,
p1=0x00,
p2=0x00,
cdata=chunk)
return
else:
yield False, self.serialize(cla=self.CLA,
ins=InsType.INS_SIGN_TX,
p1=0x00,
p2=0x00,
cdata=chunk)

View File

@@ -0,0 +1,35 @@
from .device_exception import DeviceException
from .errors import (UnknownDeviceError,
DenyError,
WrongP1P2Error,
WrongDataLengthError,
InsNotSupportedError,
ClaNotSupportedError,
WrongResponseLengthError,
DisplayBip32PathFailError,
DisplayAddressFailError,
DisplayAmountFailError,
WrongTxLengthError,
TxParsingFailError,
TxHashFail,
BadStateError,
SignatureFailError)
__all__ = [
"DeviceException",
"DenyError",
"UnknownDeviceError",
"WrongP1P2Error",
"WrongDataLengthError",
"InsNotSupportedError",
"ClaNotSupportedError",
"WrongResponseLengthError",
"DisplayBip32PathFailError",
"DisplayAddressFailError",
"DisplayAmountFailError",
"WrongTxLengthError",
"TxParsingFailError",
"TxHashFail",
"BadStateError",
"SignatureFailError"
]

View File

@@ -0,0 +1,38 @@
import enum
from typing import Dict, Any, Union
from .errors import *
class DeviceException(Exception): # pylint: disable=too-few-public-methods
exc: Dict[int, Any] = {
0x6985: DenyError,
0x6A86: WrongP1P2Error,
0x6A87: WrongDataLengthError,
0x6D00: InsNotSupportedError,
0x6E00: ClaNotSupportedError,
0xB000: WrongResponseLengthError,
0xB001: DisplayBip32PathFailError,
0xB002: DisplayAddressFailError,
0xB003: DisplayAmountFailError,
0xB004: WrongTxLengthError,
0xB005: TxParsingFailError,
0xB006: TxHashFail,
0xB007: BadStateError,
0xB008: SignatureFailError
}
def __new__(cls,
error_code: int,
ins: Union[int, enum.IntEnum, None] = None,
message: str = ""
) -> Any:
error_message: str = (f"Error in {ins!r} command"
if ins else "Error in command")
if error_code in DeviceException.exc:
return DeviceException.exc[error_code](hex(error_code),
error_message,
message)
return UnknownDeviceError(hex(error_code), error_message, message)

View File

@@ -0,0 +1,58 @@
class UnknownDeviceError(Exception):
pass
class DenyError(Exception):
pass
class WrongP1P2Error(Exception):
pass
class WrongDataLengthError(Exception):
pass
class InsNotSupportedError(Exception):
pass
class ClaNotSupportedError(Exception):
pass
class WrongResponseLengthError(Exception):
pass
class DisplayBip32PathFailError(Exception):
pass
class DisplayAddressFailError(Exception):
pass
class DisplayAmountFailError(Exception):
pass
class WrongTxLengthError(Exception):
pass
class TxParsingFailError(Exception):
pass
class TxHashFail(Exception):
pass
class BadStateError(Exception):
pass
class SignatureFailError(Exception):
pass

View File

@@ -0,0 +1,47 @@
from io import BytesIO
from typing import Union
from boilerplate_client.utils import (read, read_uint, read_varint,
write_varint, UINT64_MAX)
class TransactionError(Exception):
pass
class Transaction:
def __init__(self, nonce: int, to: Union[str, bytes], value: int, memo: str) -> None:
self.nonce: int = nonce
self.to: bytes = bytes.fromhex(to[2:]) if isinstance(to, str) else to
self.value: int = value
self.memo: bytes = memo.encode("ascii")
if not (0 <= self.nonce <= UINT64_MAX):
raise TransactionError(f"Bad nonce: '{self.nonce}'!")
if not (0 <= self.value <= UINT64_MAX):
raise TransactionError(f"Bad value: '{self.value}'!")
if len(self.to) != 20:
raise TransactionError(f"Bad address: '{self.to}'!")
def serialize(self) -> bytes:
return b"".join([
self.nonce.to_bytes(8, byteorder="big"),
self.to,
self.value.to_bytes(8, byteorder="big"),
write_varint(len(self.memo)),
self.memo
])
@classmethod
def from_bytes(cls, hexa: Union[bytes, BytesIO]):
buf: BytesIO = BytesIO(hexa) if isinstance(hexa, bytes) else hexa
nonce: int = read_uint(buf, 64, byteorder="big")
to: bytes = read(buf, 20)
value: int = read_uint(buf, 64, byteorder="big")
memo_len: int = read_varint(buf)
memo: str = read(buf, memo_len).decode("ascii")
return cls(nonce=nonce, to=to, value=value, memo=memo)

View File

@@ -0,0 +1,75 @@
from io import BytesIO
from typing import List, Optional, Literal
UINT64_MAX: int = 18446744073709551615
UINT32_MAX: int = 4294967295
UINT16_MAX: int = 65535
def bip32_path_from_string(path: str) -> List[bytes]:
splitted_path: List[str] = path.split("/")
if not splitted_path:
raise Exception(f"BIP32 path format error: '{path}'")
if "m" in splitted_path and splitted_path[0] == "m":
splitted_path = splitted_path[1:]
return [int(p).to_bytes(4, byteorder="big") if "'" not in p
else (0x80000000 | int(p[:-1])).to_bytes(4, byteorder="big")
for p in splitted_path]
def write_varint(n: int) -> bytes:
if n < 0xFC:
return n.to_bytes(1, byteorder="little")
if n <= UINT16_MAX:
return b"\xFD" + n.to_bytes(2, byteorder="little")
if n <= UINT32_MAX:
return b"\xFE" + n.to_bytes(4, byteorder="little")
if n <= UINT64_MAX:
return b"\xFF" + n.to_bytes(8, byteorder="little")
raise ValueError(f"Can't write to varint: '{n}'!")
def read_varint(buf: BytesIO,
prefix: Optional[bytes] = None) -> int:
b: bytes = prefix if prefix else buf.read(1)
if not b:
raise ValueError(f"Can't read prefix: '{b}'!")
n: int = {b"\xfd": 2, b"\xfe": 4, b"\xff": 8}.get(b, 1) # default to 1
b = buf.read(n) if n > 1 else b
if len(b) != n:
raise ValueError("Can't read varint!")
return int.from_bytes(b, byteorder="little")
def read(buf: BytesIO, size: int) -> bytes:
b: bytes = buf.read(size)
if len(b) < size:
raise ValueError(f"Cant read {size} bytes in buffer!")
return b
def read_uint(buf: BytesIO,
bit_len: int,
byteorder: Literal['big', 'little'] = 'little') -> int:
size: int = bit_len // 8
b: bytes = buf.read(size)
if len(b) < size:
raise ValueError(f"Can't read u{bit_len} in buffer!")
return int.from_bytes(b, byteorder)

View File

@@ -0,0 +1,27 @@
from collections import namedtuple
from pathlib import Path
import pytest
from speculos.client import SpeculosClient
from boilerplate_client.boilerplate_cmd import BoilerplateCommand
SCRIPT_DIR = Path(__file__).absolute().parent
API_URL = "http://127.0.0.1:5000"
@pytest.fixture(scope="session")
def client():
file_path = SCRIPT_DIR.parent.parent / "bin" / "app.elf"
args = ['--model', 'nanos', '--display', 'qt', '--sdk', '2.1']
with SpeculosClient(app=str(file_path), args=args) as client:
yield client
@pytest.fixture(scope="session")
def cmd(client):
yield BoilerplateCommand(
client=client,
debug=True
)

View File

@@ -0,0 +1,5 @@
speculos
pytest>=6.1.1,<7.0.0
ledgercomm>=1.1.0,<1.2.0
ecdsa>=0.16.1,<0.17.0
pysha3>=1.0.0,<2.0.0

20
tests/speculos/setup.cfg Normal file
View File

@@ -0,0 +1,20 @@
[tool:pytest]
addopts = --strict-markers
[pylint]
disable = C0114, # missing-module-docstring
C0115, # missing-class-docstring
C0116, # missing-function-docstring
C0103, # invalid-name
R0801, # duplicate-code
R0913 # too-many-arguments
extension-pkg-whitelist=hid
[pycodestyle]
max-line-length = 90
[mypy-hid.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True

View File

@@ -0,0 +1,3 @@
def test_configuration(cmd):
assert cmd.get_configuration() == (14, 1, 9, 17)

View File

@@ -0,0 +1,37 @@
from pickle import TRUE
from typing import Tuple
def test_get_public_key(cmd):
# ETHER COIN
uncompressed_addr_len, eth_addr, chain_code = cmd.get_public_key(
bip32_path="44'/60'/1'/0/0",
display=False
) # type: bytes, bytes, bytes
print("HERE", uncompressed_addr_len)
assert len(uncompressed_addr_len) == 65
assert len(eth_addr) == 40
assert len(chain_code) == 32
assert uncompressed_addr_len == b'\x04\xea\x02&\x91\xc7\x87\x00\xd2\xc3\xa0\xc7E\xbe\xa4\xf2\xb8\xe5\xe3\x13\x97j\x10B\xf6\xa1Vc\\\xb2\x05\xda\x1a\xcb\xfe\x04*\nZ\x89eyn6"E\x89\x0eT\xbd-\xbex\xec\x1e\x18df\xf2\xe9\xd0\xf5\xd5\xd8\xdf'
assert eth_addr == b'463e4e114AA57F54f2Fd2C3ec03572C6f75d84C2'
assert chain_code == b'\xaf\x89\xcd)\xea${8I\xec\xc80\xc2\xc8\x94\\e1\xd6P\x87\x07?\x9f\xd09\x00\xa0\xea\xa7\x96\xc8'
# DAI COIN
uncompressed_addr_len, eth_addr, chain_code = cmd.get_public_key(
bip32_path="44'/700'/1'/0/0",
display=False
) # type: bytes, bytes, bytes
print("HERE2", uncompressed_addr_len)
assert len(uncompressed_addr_len) == 65
assert len(eth_addr) == 40
assert len(chain_code) == 32
assert uncompressed_addr_len == b'\x04V\x8a\x15\xdc\xed\xc8[\x16\x17\x8d\xaf\xcax\x91v~{\x9c\x06\xba\xaa\xde\xf4\xe7\x9f\x86\x1d~\xed)\xdc\n8\x9c\x84\xf01@E\x13]\xd7~6\x8e\x8e\xabb-\xad\xcdo\xc3Fw\xb7\xc8y\xdbQ/\xc3\xe5\x18'
assert eth_addr == b'Ba9A9aED0a1AbBE1da1155F64e73e57Af7995880'
assert chain_code == b'4\xaa\x95\xf4\x02\x12\x12-T\x155\x86\xed\xc5\x0b\x1d8\x81\xae\xce\xbd\x1a\xbbv\x9a\xc7\xd5\x1a\xd0KT\xe4'

View File

@@ -0,0 +1,10 @@
#from warnings import catch_warnings
#
#import boilerplate_client
#
#
#def test_version(cmd):
# try:
# cmd.get_version()
# except boilerplate_client.exception.errors.InsNotSupportedError as error:
# assert error.args[0] == '0x6d00'