Remove 'speculos' tests
This commit is contained in:
66
.github/workflows/ci-workflow.yml
vendored
66
.github/workflows/ci-workflow.yml
vendored
@@ -1,5 +1,5 @@
|
||||
---
|
||||
name: Tests (Speculos and Zemu)
|
||||
name: Tests (Zemu)
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
@@ -71,67 +71,3 @@ jobs:
|
||||
|
||||
- name: Run zemu tests
|
||||
run: cd tests/zemu/ && yarn test
|
||||
|
||||
# =====================================================
|
||||
# SPECULOS TESTS
|
||||
# =====================================================
|
||||
|
||||
|
||||
building_for_e2e_speculos_tests:
|
||||
name: Building binaries for E2E Speculos tests
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
image: ghcr.io/ledgerhq/ledger-app-builder/ledger-app-builder-lite:latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Build testing binaries
|
||||
run: |
|
||||
mkdir tests/speculos/elfs
|
||||
make clean && make -j DEBUG=1 NFT_STAGING_KEY=1 BOLOS_SDK=$NANOS_SDK && mv bin/app.elf tests/speculos/elfs/nanos.elf
|
||||
make clean && make -j DEBUG=1 NFT_STAGING_KEY=1 BOLOS_SDK=$NANOX_SDK && mv bin/app.elf tests/speculos/elfs/nanox.elf
|
||||
make clean && make -j DEBUG=1 NFT_STAGING_KEY=1 BOLOS_SDK=$NANOSP_SDK && mv bin/app.elf tests/speculos/elfs/nanosp.elf
|
||||
|
||||
- name: Upload app binaries
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: e2e_speculos_elfs
|
||||
path: ./tests/speculos/elfs
|
||||
|
||||
|
||||
jobs-e2e-speculos-tests:
|
||||
name: Speculos tests
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
model: ["nanosp", "nanos", "nanox"]
|
||||
|
||||
needs: [building_for_e2e_speculos_tests]
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Clone
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Create tmp folder for artifacts
|
||||
run: mkdir tests/speculos/elfs
|
||||
|
||||
- name: Download app binaries
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
path: tmp/
|
||||
|
||||
- name: Gather elfs
|
||||
run: cp tmp/e2e_speculos_elfs/*.elf tests/speculos/elfs/
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
cd tests/speculos
|
||||
sudo apt-get update && sudo apt-get install -y qemu-user-static
|
||||
pip install -r requirements.txt
|
||||
|
||||
- name: Run speculos tests
|
||||
run: |
|
||||
cd tests/speculos
|
||||
pytest --model ${{ matrix.model }} --path ./elfs/${{ matrix.model }}.elf --display headless
|
||||
|
||||
25
tests/speculos/.gitignore
vendored
25
tests/speculos/.gitignore
vendored
@@ -1,25 +0,0 @@
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# generated by pip
|
||||
pip-wheel-metadata/
|
||||
|
||||
# pytest debug logs generated via --debug
|
||||
pytestdebug.log
|
||||
.cache
|
||||
.pytest_cache
|
||||
.mypy_cache
|
||||
.coverage
|
||||
.coverage.*
|
||||
coverage.xml
|
||||
@@ -1,53 +0,0 @@
|
||||
# Speculos functional tests
|
||||
|
||||
These tests are implemented in Python with the `SpeculosClient` interface which allows easy execution on the [Speculos](https://github.com/LedgerHQ/speculos) emulator.
|
||||
|
||||
## Requirements
|
||||
|
||||
- [python >= 3.8](https://www.python.org/downloads/)
|
||||
- [pip](https://pip.pypa.io/en/stable/installation/)
|
||||
|
||||
### Dependencies
|
||||
|
||||
Python dependencies are listed in [requirements.txt](requirements.txt)
|
||||
|
||||
```shell
|
||||
python3 -m pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
### Compilation app
|
||||
|
||||
Go to the root of the repository:
|
||||
|
||||
```sh
|
||||
make DEBUG=1 NFT_TESTING_KEY=1 BOLOS_SDK=$NANOX_SDK
|
||||
mv bin/app.elf tests/speculos/<some name>.elf
|
||||
```
|
||||
|
||||
Given the requirements are installed, just do (by default command):
|
||||
|
||||
```shell
|
||||
cd tests/speculos/
|
||||
pytest
|
||||
```
|
||||
|
||||
### Custom options
|
||||
|
||||
- **--model:** "nanos", "nanox", "nanosp" | default: "nanos"
|
||||
- **--display:** "qt", "headless" | default: "qt"
|
||||
- **--path:** the path of the binary app | default: path of makefile compilation
|
||||
|
||||
## Example
|
||||
|
||||
With `nanox` binary app:
|
||||
|
||||
```sh
|
||||
# the --path is variable to where you put your binary
|
||||
|
||||
pytest --model nanox --path ./elfs/nanox.elf
|
||||
|
||||
# Execute specific test:
|
||||
pytest --model nanox --path ./elfs/nanox.elf test_pubkey_cmd.py
|
||||
```
|
||||
@@ -1,39 +0,0 @@
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
|
||||
from speculos.client import SpeculosClient
|
||||
|
||||
from ethereum_client.ethereum_cmd import EthereumCommand
|
||||
|
||||
|
||||
SCRIPT_DIR = Path(__file__).absolute().parent
|
||||
API_URL = "http://127.0.0.1:5000"
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
# nanos, nanox, nanosp
|
||||
parser.addoption("--model", action="store", default="nanos")
|
||||
# qt: default, requires a X server
|
||||
# headless: nothing is displayed
|
||||
parser.addoption("--display", action="store", default="qt")
|
||||
|
||||
path: str = SCRIPT_DIR.parent.parent / "bin" / "app.elf"
|
||||
parser.addoption("--path", action="store", default=path)
|
||||
|
||||
@pytest.fixture()
|
||||
def client(pytestconfig):
|
||||
file_path = pytestconfig.getoption("path")
|
||||
model = pytestconfig.getoption("model")
|
||||
|
||||
args = ['--log-level', 'speculos:DEBUG','--model', model, '--display', pytestconfig.getoption("display")]
|
||||
with SpeculosClient(app=str(file_path), args=args) as client:
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def cmd(client, pytestconfig):
|
||||
yield EthereumCommand(
|
||||
client=client,
|
||||
debug=True,
|
||||
model=pytestconfig.getoption("model"),
|
||||
)
|
||||
@@ -1,134 +0,0 @@
|
||||
# Documentation of Ethereum's client test
|
||||
|
||||
```sh
|
||||
.
|
||||
├── conftest.py # Configuration for pytest
|
||||
├── ethereum_client # All utils of client test
|
||||
│ ├── ethereum_cmd_builder.py # Creation of apdu to send
|
||||
│ ├── ethereum_cmd.py # Send Apdu and parsing of response
|
||||
│ ├── exception
|
||||
│ │ ├── device_exception.py
|
||||
│ │ └── errors.py
|
||||
│ ├── plugin.py # Creation of content apdu which manage plugin, erc20Information, provide nft information
|
||||
│ ├── transaction.py # Creation of content apdu which manage personal tx, transaction, eip712
|
||||
│ └── utils.py
|
||||
├── requirements.txt
|
||||
├── screenshots # All screenshot of nanoS,X,SP for compare in tests
|
||||
├── setup.cfg
|
||||
|
||||
# ========= All Tests =========
|
||||
├── test_configuration_cmd.py
|
||||
├── test_eip1559.py
|
||||
├── test_eip191.py
|
||||
├── test_eip2930.py
|
||||
├── test_eip712.py
|
||||
├── test_erc1155.py
|
||||
├── test_erc20information.py
|
||||
├── test_erc721.py
|
||||
├── test_pubkey_cmd.py
|
||||
└── test_sign_cmd.py
|
||||
```
|
||||
|
||||
## Ethereum_client
|
||||
|
||||
### Ethereum_cmd_builder
|
||||
|
||||
```py
|
||||
def chunked(size, source)
|
||||
|
||||
class EthereumCommandBuilder:
|
||||
# Creation of the apdu
|
||||
def get_configuration(self) -> bytes:
|
||||
def set_plugin(self, plugin: Plugin) -> bytes:
|
||||
def provide_nft_information(self, plugin: Plugin) -> bytes:
|
||||
def provide_erc20_token_information(self, info: ERC20Information):
|
||||
def get_public_key(self, bip32_path: str, display: bool = False) -> bytes:
|
||||
def perform_privacy_operation(self, bip32_path: str, display: bool, shared_secret: bool) -> bytes:
|
||||
def simple_sign_tx(self, bip32_path: str, transaction: Transaction) -> bytes:
|
||||
def sign_eip712(self, bip32_path: str, transaction: EIP712) -> bytes:
|
||||
def personal_sign_tx(self, bip32_path: str, transaction: PersonalTransaction) -> Tuple[bool,bytes]:
|
||||
```
|
||||
|
||||
### Ethereum_cmd
|
||||
|
||||
```py
|
||||
class EthereumCommand:
|
||||
# Sending apdu and parsing the response in the right form
|
||||
def get_configuration(self) -> Tuple[int, int, int, int]:
|
||||
def set_plugin(self, plugin: Plugin):
|
||||
def provide_nft_information(self, plugin: Plugin):
|
||||
def provide_erc20_token_information(self, info: ERC20Information):
|
||||
def get_public_key(self, bip32_path: str, result: List, display: bool = False) -> Tuple[bytes, bytes, bytes]:
|
||||
def perform_privacy_operation(self, bip32_path: str, result: List, display: bool = False, shared_secret: bool = False) -> Tuple[bytes, bytes, bytes]:
|
||||
def simple_sign_tx(self, bip32_path: str, transaction: Transaction, result: List = list()) -> None:
|
||||
def sign_eip712(self, bip32_path: str, transaction: EIP712, result: List = list()) -> None:
|
||||
def personal_sign_tx(self, bip32_path: str, transaction: PersonalTransaction, result: List = list()) -> None:
|
||||
# Allows to send an apdu without return of speculos
|
||||
def send_apdu(self, apdu: bytes) -> bytes:
|
||||
# Allows to send an apdu with return of speculos
|
||||
def send_apdu_context(self, apdu: bytes, result: List = list()) -> bytes:
|
||||
```
|
||||
|
||||
### Utils
|
||||
|
||||
```py
|
||||
def save_screenshot(cmd, path: str):
|
||||
def compare_screenshot(cmd, path: str):
|
||||
def parse_sign_response(response : bytes) -> Tuple[bytes, bytes, bytes]:
|
||||
def bip32_path_from_string(path: str) -> List[bytes]:
|
||||
def packed_bip32_path_from_string(path: str) -> bytes:
|
||||
def write_varint(n: int) -> bytes:
|
||||
def read_varint(buf: BytesIO, prefix: Optional[bytes] = None) -> int:
|
||||
def read(buf: BytesIO, size: int) -> bytes:
|
||||
def read_uint(buf: BytesIO, bit_len: int, byteorder: Literal['big', 'little'] = 'little') -> int:
|
||||
```
|
||||
|
||||
## Tests new apdu
|
||||
|
||||
If a new instruction is programmed it will be necessary to create 2 new functions.
|
||||
one in `ethereum_cmd_builder`:
|
||||
|
||||
- Creation of the raw apdu you can find some examples in this same file
|
||||
|
||||
and one in `ethereum_cmd`:
|
||||
|
||||
- Send the apdu to speculos and parse the answer in a `list` named result you can find some examples in this same file
|
||||
|
||||
## Example for write new tests
|
||||
|
||||
To send several apdu and get the return
|
||||
|
||||
```py
|
||||
FIRST = bytes.fromhex("{YourAPDU}")
|
||||
SECOND = bytes.fromhex("{YourAPDU}")
|
||||
|
||||
def test_multiple_raw_apdu(cmd):
|
||||
result: list = []
|
||||
|
||||
cmd.send_apdu(FIRST)
|
||||
with cmd.send_apdu_context(SECOND, result) as ex:
|
||||
sleep(0.5)
|
||||
# Here your code for press button and compare screen if you want
|
||||
|
||||
response: bytes = result[0] # response returning
|
||||
# Here you function to parse response of some code
|
||||
v, r, s = parse_sign_response(response)
|
||||
|
||||
# And here assertion of your tests
|
||||
assert v == 0x25 # 37
|
||||
assert r.hex() == "68ba082523584adbfc31d36d68b51d6f209ce0838215026bf1802a8f17dcdff4"
|
||||
assert s.hex() == "7c92908fa05c8bc86507a3d6a1c8b3c2722ee01c836d89a61df60c1ab0b43fff"
|
||||
```
|
||||
|
||||
To test an error
|
||||
|
||||
```py
|
||||
def test_some_error(cmd):
|
||||
result: list = []
|
||||
|
||||
with pytest.raises(ethereum_client.exception.errors.UnknownDeviceError) as error:
|
||||
# With an function in ethereum_cmd
|
||||
with cmd.send_apdu_context(bytes.fromhex("{YourAPDU}"), result) as ex:
|
||||
pass
|
||||
assert error.args[0] == '0x6a80'
|
||||
```
|
||||
@@ -1,223 +0,0 @@
|
||||
from ast import List
|
||||
from contextlib import contextmanager
|
||||
import struct
|
||||
from time import sleep
|
||||
from typing import Tuple
|
||||
|
||||
from speculos.client import SpeculosClient, ApduException
|
||||
|
||||
from ethereum_client.ethereum_cmd_builder import EthereumCommandBuilder, InsType
|
||||
from ethereum_client.exception import DeviceException
|
||||
from ethereum_client.transaction import EIP712, PersonalTransaction, Transaction
|
||||
from ethereum_client.plugin import ERC20Information, Plugin
|
||||
from ethereum_client.utils import parse_sign_response
|
||||
|
||||
|
||||
class EthereumCommand:
|
||||
def __init__(self,
|
||||
client: SpeculosClient,
|
||||
debug: bool = False,
|
||||
model: str = "nanos") -> None:
|
||||
self.client = client
|
||||
self.builder = EthereumCommandBuilder(debug=debug)
|
||||
self.debug = debug
|
||||
self.model = model
|
||||
|
||||
def get_configuration(self) -> Tuple[int, int, int, int]:
|
||||
try:
|
||||
response = self.client._apdu_exchange(
|
||||
self.builder.get_configuration()
|
||||
) # type: int, bytes
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_GET_VERSION)
|
||||
|
||||
# response = FLAG (1) || MAJOR (1) || MINOR (1) || PATCH (1)
|
||||
assert len(response) == 4
|
||||
|
||||
info, major, minor, patch = struct.unpack(
|
||||
"BBBB",
|
||||
response
|
||||
) # type: int, int, int
|
||||
|
||||
return info, major, minor, patch
|
||||
|
||||
def set_plugin(self, plugin: Plugin):
|
||||
try:
|
||||
self.client._apdu_exchange(
|
||||
self.builder.set_plugin(plugin=plugin)
|
||||
)
|
||||
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_SET_PLUGIN)
|
||||
|
||||
def provide_nft_information(self, plugin: Plugin):
|
||||
try:
|
||||
self.client._apdu_exchange(
|
||||
self.builder.provide_nft_information(plugin=plugin)
|
||||
)
|
||||
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_PROVIDE_NFT_INFORMATION)
|
||||
|
||||
def provide_erc20_token_information(self, info: ERC20Information):
|
||||
try:
|
||||
self.client._apdu_exchange(
|
||||
self.builder.provide_erc20_token_information(info=info)
|
||||
)
|
||||
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_PROVIDE_ERC20)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_public_key(self, bip32_path: str, result: List, display: bool = False) -> Tuple[bytes, bytes, bytes]:
|
||||
try:
|
||||
chunk: bytes = self.builder.get_public_key(bip32_path=bip32_path, display=display)
|
||||
|
||||
with self.client.apdu_exchange_nowait(cla=chunk[0], ins=chunk[1],
|
||||
p1=chunk[2], p2=chunk[3],
|
||||
data=chunk[5:]) as exchange:
|
||||
yield exchange
|
||||
response: bytes = exchange.receive()
|
||||
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_GET_PUBLIC_KEY)
|
||||
|
||||
# response = pub_key_len (1) ||
|
||||
# pub_key (var) ||
|
||||
# chain_code_len (1) ||
|
||||
# chain_code (var)
|
||||
offset: int = 0
|
||||
|
||||
pub_key_len: int = response[offset]
|
||||
offset += 1
|
||||
|
||||
uncompressed_addr_len: bytes = response[offset:offset + pub_key_len]
|
||||
offset += pub_key_len
|
||||
|
||||
eth_addr_len: int = response[offset]
|
||||
offset += 1
|
||||
|
||||
eth_addr: bytes = response[offset:offset + eth_addr_len]
|
||||
offset += eth_addr_len
|
||||
|
||||
chain_code: bytes = response[offset:]
|
||||
|
||||
assert len(response) == 1 + pub_key_len + 1 + eth_addr_len + 32 # 32 -> chain_code_len
|
||||
|
||||
result.append(uncompressed_addr_len)
|
||||
result.append(eth_addr)
|
||||
result.append(chain_code)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def perform_privacy_operation(self, bip32_path: str, result: List, display: bool = False, shared_secret: bool = False) -> Tuple[bytes, bytes, bytes]:
|
||||
try:
|
||||
chunk: bytes = self.builder.perform_privacy_operation(bip32_path=bip32_path, display=display, shared_secret=shared_secret)
|
||||
|
||||
with self.client.apdu_exchange_nowait(cla=chunk[0], ins=chunk[1],
|
||||
p1=chunk[2], p2=chunk[3],
|
||||
data=chunk[5:]) as exchange:
|
||||
yield exchange
|
||||
response: bytes = exchange.receive()
|
||||
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_PERFORM_PRIVACY_OPERATION)
|
||||
|
||||
# response = Public encryption key or shared secret (32)
|
||||
assert len(response) == 32
|
||||
|
||||
result.append(response)
|
||||
|
||||
def send_apdu(self, apdu: bytes) -> bytes:
|
||||
try:
|
||||
self.client.apdu_exchange(cla=apdu[0], ins=apdu[1],
|
||||
p1=apdu[2], p2=apdu[3],
|
||||
data=apdu[5:])
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_SIGN_TX)
|
||||
|
||||
@contextmanager
|
||||
def send_apdu_context(self, apdu: bytes, result: List = list()) -> bytes:
|
||||
try:
|
||||
|
||||
with self.client.apdu_exchange_nowait(cla=apdu[0], ins=apdu[1],
|
||||
p1=apdu[2], p2=apdu[3],
|
||||
data=apdu[5:]) as exchange:
|
||||
yield exchange
|
||||
result.append(exchange.receive())
|
||||
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_SIGN_TX)
|
||||
|
||||
|
||||
|
||||
@contextmanager
|
||||
def simple_sign_tx(self, bip32_path: str, transaction: Transaction, result: List = list()) -> None:
|
||||
try:
|
||||
chunk: bytes = self.builder.simple_sign_tx(bip32_path=bip32_path, transaction=transaction)
|
||||
|
||||
with self.client.apdu_exchange_nowait(cla=chunk[0], ins=chunk[1],
|
||||
p1=chunk[2], p2=chunk[3],
|
||||
data=chunk[5:]) as exchange:
|
||||
yield exchange
|
||||
response: bytes = exchange.receive()
|
||||
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_SIGN_TX)
|
||||
|
||||
# response = V (1) || R (32) || S (32)
|
||||
assert len(response) == 65
|
||||
v, r, s = parse_sign_response(response)
|
||||
|
||||
result.append(v)
|
||||
result.append(r)
|
||||
result.append(s)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def sign_eip712(self, bip32_path: str, transaction: EIP712, result: List = list()) -> None:
|
||||
try:
|
||||
chunk: bytes = self.builder.sign_eip712(bip32_path=bip32_path, transaction=transaction)
|
||||
|
||||
with self.client.apdu_exchange_nowait(cla=chunk[0], ins=chunk[1],
|
||||
p1=chunk[2], p2=chunk[3],
|
||||
data=chunk[5:]) as exchange:
|
||||
yield exchange
|
||||
response: bytes = exchange.receive()
|
||||
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_SIGN_EIP712)
|
||||
|
||||
# response = V (1) || R (32) || S (32)
|
||||
assert len(response) == 65
|
||||
v, r, s = parse_sign_response(response)
|
||||
|
||||
result.append(v)
|
||||
result.append(r)
|
||||
result.append(s)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def personal_sign_tx(self, bip32_path: str, transaction: PersonalTransaction, result: List = list()) -> None:
|
||||
try:
|
||||
for islast_apdu, apdu in self.builder.personal_sign_tx(bip32_path=bip32_path, transaction=transaction):
|
||||
if islast_apdu:
|
||||
with self.client.apdu_exchange_nowait(cla=apdu[0], ins=apdu[1],
|
||||
p1=apdu[2], p2=apdu[3],
|
||||
data=apdu[5:]) as exchange:
|
||||
# the "yield" here allows to wait for a button interaction (click right, left, both)
|
||||
yield exchange
|
||||
response: bytes = exchange.receive()
|
||||
else:
|
||||
self.send_apdu(apdu)
|
||||
|
||||
except ApduException as error:
|
||||
raise DeviceException(error_code=error.sw, ins=InsType.INS_SIGN_TX)
|
||||
|
||||
# response = V (1) || R (32) || S (32)
|
||||
v, r, s = parse_sign_response(response)
|
||||
|
||||
result.append(v)
|
||||
result.append(r)
|
||||
result.append(s)
|
||||
@@ -1,293 +0,0 @@
|
||||
import enum
|
||||
import logging
|
||||
import struct
|
||||
from typing import List, Tuple, Union, Iterator, cast
|
||||
|
||||
from ethereum_client.transaction import EIP712, PersonalTransaction, Transaction
|
||||
from ethereum_client.plugin import ERC20Information, Plugin
|
||||
from ethereum_client.utils import packed_bip32_path_from_string
|
||||
|
||||
MAX_APDU_LEN: int = 255
|
||||
|
||||
def chunked(size, source):
|
||||
for i in range(0, len(source), size):
|
||||
yield source[i:i+size]
|
||||
|
||||
def chunkify(data: bytes, chunk_len: int) -> Iterator[Tuple[bool, bytes]]:
|
||||
size: int = len(data)
|
||||
|
||||
if size <= chunk_len:
|
||||
yield True, data
|
||||
return
|
||||
|
||||
chunk: int = size // chunk_len
|
||||
remaining: int = size % chunk_len
|
||||
offset: int = 0
|
||||
|
||||
for i in range(chunk):
|
||||
yield False, data[offset:offset + chunk_len]
|
||||
offset += chunk_len
|
||||
|
||||
if remaining:
|
||||
yield True, data[offset:]
|
||||
|
||||
class InsType(enum.IntEnum):
|
||||
INS_GET_PUBLIC_KEY = 0x02
|
||||
INS_SIGN_TX = 0x04
|
||||
INS_GET_CONFIGURATION = 0x06
|
||||
INS_SIGN_PERSONAL_TX = 0x08
|
||||
INS_PROVIDE_ERC20 = 0x0A
|
||||
INS_SIGN_EIP712 = 0x0c
|
||||
INS_ETH2_GET_PUBLIC_KEY = 0x0E
|
||||
INS_SET_ETH2_WITHDRAWAL = 0x10
|
||||
INS_SET_EXTERNAL_PLUGIN = 0x12
|
||||
INS_PROVIDE_NFT_INFORMATION = 0x14
|
||||
INS_SET_PLUGIN = 0x16
|
||||
INS_PERFORM_PRIVACY_OPERATION = 0x18
|
||||
|
||||
|
||||
class EthereumCommandBuilder:
|
||||
"""APDU command builder for the Boilerplate application.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
debug: bool
|
||||
Whether you want to see logging or not.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
debug: bool
|
||||
Whether you want to see logging or not.
|
||||
|
||||
"""
|
||||
CLA: int = 0xE0
|
||||
|
||||
def __init__(self, debug: bool = False):
|
||||
"""Init constructor."""
|
||||
self.debug = debug
|
||||
|
||||
def serialize(self,
|
||||
cla: int,
|
||||
ins: Union[int, enum.IntEnum],
|
||||
p1: int = 0,
|
||||
p2: int = 0,
|
||||
cdata: bytes = b"") -> bytes:
|
||||
"""Serialize the whole APDU command (header + data).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cla : int
|
||||
Instruction class: CLA (1 byte)
|
||||
ins : Union[int, IntEnum]
|
||||
Instruction code: INS (1 byte)
|
||||
p1 : int
|
||||
Instruction parameter 1: P1 (1 byte).
|
||||
p2 : int
|
||||
Instruction parameter 2: P2 (1 byte).
|
||||
cdata : bytes
|
||||
Bytes of command data.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bytes
|
||||
Bytes of a complete APDU command.
|
||||
|
||||
"""
|
||||
ins = cast(int, ins.value) if isinstance(ins, enum.IntEnum) else cast(int, ins)
|
||||
|
||||
header: bytes = struct.pack("BBBBB",
|
||||
cla,
|
||||
ins,
|
||||
p1,
|
||||
p2,
|
||||
len(cdata)) # add Lc to APDU header
|
||||
|
||||
if self.debug:
|
||||
logging.info("header: %s", header.hex())
|
||||
logging.info("cdata: %s", cdata.hex())
|
||||
|
||||
return header + cdata
|
||||
|
||||
def get_configuration(self) -> bytes:
|
||||
"""Command builder for GET_CONFIGURATON
|
||||
|
||||
Returns
|
||||
-------
|
||||
bytes
|
||||
APDU command for GET_CONFIGURATON
|
||||
|
||||
"""
|
||||
return self.serialize(cla=self.CLA,
|
||||
ins=InsType.INS_GET_CONFIGURATION,
|
||||
p1=0x00,
|
||||
p2=0x00,
|
||||
cdata=b"")
|
||||
|
||||
def _same_header_builder(self, data: Union[Plugin, ERC20Information], ins: int) -> bytes:
|
||||
return self.serialize(cla=self.CLA,
|
||||
ins=ins,
|
||||
p1=0x00,
|
||||
p2=0x00,
|
||||
cdata=data.serialize())
|
||||
|
||||
def set_plugin(self, plugin: Plugin) -> bytes:
|
||||
return self._same_header_builder(plugin, InsType.INS_SET_PLUGIN)
|
||||
|
||||
def provide_nft_information(self, plugin: Plugin) -> bytes:
|
||||
return self._same_header_builder(plugin, InsType.INS_PROVIDE_NFT_INFORMATION)
|
||||
|
||||
def provide_erc20_token_information(self, info: ERC20Information):
|
||||
return self._same_header_builder(info, InsType.INS_PROVIDE_ERC20)
|
||||
|
||||
def get_public_key(self, bip32_path: str, display: bool = False) -> bytes:
|
||||
"""Command builder for GET_PUBLIC_KEY.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
bip32_path: str
|
||||
String representation of BIP32 path.
|
||||
display : bool
|
||||
Whether you want to display the address on the device.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bytes
|
||||
APDU command for GET_PUBLIC_KEY.
|
||||
|
||||
"""
|
||||
cdata = packed_bip32_path_from_string(bip32_path)
|
||||
|
||||
return self.serialize(cla=self.CLA,
|
||||
ins=InsType.INS_GET_PUBLIC_KEY,
|
||||
p1=0x01 if display else 0x00,
|
||||
p2=0x01,
|
||||
cdata=cdata)
|
||||
|
||||
def perform_privacy_operation(self, bip32_path: str, display: bool, shared_secret: bool) -> bytes:
|
||||
"""Command builder for INS_PERFORM_PRIVACY_OPERATION.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
bip32_path : str
|
||||
String representation of BIP32 path.
|
||||
Third party public key on Curve25519 : 32 bytes
|
||||
Optional if returning the shared secret
|
||||
|
||||
"""
|
||||
cdata = packed_bip32_path_from_string(bip32_path)
|
||||
|
||||
return self.serialize(cla=self.CLA,
|
||||
ins=InsType.INS_PERFORM_PRIVACY_OPERATION,
|
||||
p1=0x01 if display else 0x00,
|
||||
p2=0x01 if shared_secret else 0x00,
|
||||
cdata=cdata)
|
||||
|
||||
|
||||
def simple_sign_tx(self, bip32_path: str, transaction: Transaction) -> bytes:
|
||||
"""Command builder for INS_SIGN_TX.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
bip32_path : str
|
||||
String representation of BIP32 path.
|
||||
transaction : Transaction
|
||||
Representation of the transaction to be signed.
|
||||
|
||||
Yields
|
||||
-------
|
||||
bytes
|
||||
APDU command chunk for INS_SIGN_TX.
|
||||
|
||||
"""
|
||||
cdata = packed_bip32_path_from_string(bip32_path)
|
||||
|
||||
tx: bytes = transaction.serialize()
|
||||
|
||||
cdata = cdata + tx
|
||||
|
||||
return self.serialize(cla=self.CLA,
|
||||
ins=InsType.INS_SIGN_TX,
|
||||
p1=0x00,
|
||||
p2=0x00,
|
||||
cdata=cdata)
|
||||
|
||||
def sign_eip712(self, bip32_path: str, transaction: EIP712) -> bytes:
|
||||
"""Command builder for INS_SIGN_EIP712.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
bip32_path : str
|
||||
String representation of BIP32 path.
|
||||
transaction : EIP712
|
||||
Domain hash -> 32 bytes
|
||||
Message hash -> 32 bytes
|
||||
|
||||
Yields
|
||||
-------
|
||||
bytes
|
||||
APDU command chunk for INS_SIGN_EIP712.
|
||||
|
||||
"""
|
||||
cdata = packed_bip32_path_from_string(bip32_path)
|
||||
|
||||
|
||||
tx: bytes = transaction.serialize()
|
||||
|
||||
cdata = cdata + tx
|
||||
|
||||
return self.serialize(cla=self.CLA,
|
||||
ins=InsType.INS_SIGN_EIP712,
|
||||
p1=0x00,
|
||||
p2=0x00,
|
||||
cdata=cdata)
|
||||
|
||||
def personal_sign_tx(self, bip32_path: str, transaction: PersonalTransaction) -> Tuple[bool,bytes]:
|
||||
"""Command builder for INS_SIGN_PERSONAL_TX.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
bip32_path : str
|
||||
String representation of BIP32 path.
|
||||
transaction : Transaction
|
||||
Representation of the transaction to be signed.
|
||||
|
||||
Yields
|
||||
-------
|
||||
bytes
|
||||
APDU command chunk for INS_SIGN_PERSONAL_TX.
|
||||
|
||||
"""
|
||||
|
||||
cdata = packed_bip32_path_from_string(bip32_path)
|
||||
|
||||
tx: bytes = transaction.serialize()
|
||||
|
||||
cdata = cdata + tx
|
||||
last_chunk = len(cdata) // MAX_APDU_LEN
|
||||
|
||||
# The generator allows to send apdu frames because we can't send an apdu > 255
|
||||
for i, (chunk) in enumerate(chunked(MAX_APDU_LEN, cdata)):
|
||||
if i == 0 and i == last_chunk:
|
||||
yield True, self.serialize(cla=self.CLA,
|
||||
ins=InsType.INS_SIGN_PERSONAL_TX,
|
||||
p1=0x00,
|
||||
p2=0x00,
|
||||
cdata=chunk)
|
||||
elif i == 0:
|
||||
yield False, self.serialize(cla=self.CLA,
|
||||
ins=InsType.INS_SIGN_PERSONAL_TX,
|
||||
p1=0x00,
|
||||
p2=0x00,
|
||||
cdata=chunk)
|
||||
elif i == last_chunk:
|
||||
yield True, self.serialize(cla=self.CLA,
|
||||
ins=InsType.INS_SIGN_PERSONAL_TX,
|
||||
p1=0x80,
|
||||
p2=0x00,
|
||||
cdata=chunk)
|
||||
else:
|
||||
yield False, self.serialize(cla=self.CLA,
|
||||
ins=InsType.INS_SIGN_PERSONAL_TX,
|
||||
p1=0x80,
|
||||
p2=0x00,
|
||||
cdata=chunk)
|
||||
@@ -1,35 +0,0 @@
|
||||
from .device_exception import DeviceException
|
||||
from .errors import (UnknownDeviceError,
|
||||
DenyError,
|
||||
WrongP1P2Error,
|
||||
WrongDataLengthError,
|
||||
InsNotSupportedError,
|
||||
ClaNotSupportedError,
|
||||
WrongResponseLengthError,
|
||||
DisplayBip32PathFailError,
|
||||
DisplayAddressFailError,
|
||||
DisplayAmountFailError,
|
||||
WrongTxLengthError,
|
||||
TxParsingFailError,
|
||||
TxHashFail,
|
||||
BadStateError,
|
||||
SignatureFailError)
|
||||
|
||||
__all__ = [
|
||||
"DeviceException",
|
||||
"DenyError",
|
||||
"UnknownDeviceError",
|
||||
"WrongP1P2Error",
|
||||
"WrongDataLengthError",
|
||||
"InsNotSupportedError",
|
||||
"ClaNotSupportedError",
|
||||
"WrongResponseLengthError",
|
||||
"DisplayBip32PathFailError",
|
||||
"DisplayAddressFailError",
|
||||
"DisplayAmountFailError",
|
||||
"WrongTxLengthError",
|
||||
"TxParsingFailError",
|
||||
"TxHashFail",
|
||||
"BadStateError",
|
||||
"SignatureFailError"
|
||||
]
|
||||
@@ -1,38 +0,0 @@
|
||||
import enum
|
||||
from typing import Dict, Any, Union
|
||||
|
||||
from .errors import *
|
||||
|
||||
|
||||
class DeviceException(Exception): # pylint: disable=too-few-public-methods
|
||||
exc: Dict[int, Any] = {
|
||||
0x6985: DenyError,
|
||||
0x6A86: WrongP1P2Error,
|
||||
0x6A87: WrongDataLengthError,
|
||||
0x6D00: InsNotSupportedError,
|
||||
0x6E00: ClaNotSupportedError,
|
||||
0xB000: WrongResponseLengthError,
|
||||
0xB001: DisplayBip32PathFailError,
|
||||
0xB002: DisplayAddressFailError,
|
||||
0xB003: DisplayAmountFailError,
|
||||
0xB004: WrongTxLengthError,
|
||||
0xB005: TxParsingFailError,
|
||||
0xB006: TxHashFail,
|
||||
0xB007: BadStateError,
|
||||
0xB008: SignatureFailError
|
||||
}
|
||||
|
||||
def __new__(cls,
|
||||
error_code: int,
|
||||
ins: Union[int, enum.IntEnum, None] = None,
|
||||
message: str = ""
|
||||
) -> Any:
|
||||
error_message: str = (f"Error in {ins!r} command"
|
||||
if ins else "Error in command")
|
||||
|
||||
if error_code in DeviceException.exc:
|
||||
return DeviceException.exc[error_code](hex(error_code),
|
||||
error_message,
|
||||
message)
|
||||
|
||||
return UnknownDeviceError(hex(error_code), error_message, message)
|
||||
@@ -1,58 +0,0 @@
|
||||
class UnknownDeviceError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DenyError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class WrongP1P2Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class WrongDataLengthError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InsNotSupportedError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ClaNotSupportedError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class WrongResponseLengthError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DisplayBip32PathFailError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DisplayAddressFailError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DisplayAmountFailError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class WrongTxLengthError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TxParsingFailError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TxHashFail(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class BadStateError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SignatureFailError(Exception):
|
||||
pass
|
||||
@@ -1,70 +0,0 @@
|
||||
import string
|
||||
from typing import Union
|
||||
|
||||
from ethereum_client.utils import write_varint
|
||||
|
||||
class ERC20Information:
|
||||
def __init__(self, erc20_ticker: string , addr: Union[str, bytes], nb_decimals: int, chainID: int, sign: str) -> None:
|
||||
self.erc20_ticker: bytes = bytes.fromhex(erc20_ticker)
|
||||
self.addr: bytes = bytes.fromhex(addr[2:]) if isinstance(addr, str) else addr
|
||||
self.nb_decimals: int = nb_decimals
|
||||
self.chainID: int = chainID
|
||||
self.sign: bytes = bytes.fromhex(sign)
|
||||
|
||||
def serialize(self) -> bytes:
|
||||
return b"".join([
|
||||
write_varint(len(self.erc20_ticker)),
|
||||
self.erc20_ticker,
|
||||
|
||||
self.addr,
|
||||
|
||||
self.nb_decimals.to_bytes(4, byteorder="big"),
|
||||
|
||||
self.chainID.to_bytes(4, byteorder="big"),
|
||||
|
||||
self.sign,
|
||||
])
|
||||
|
||||
class Plugin:
|
||||
"""Plugin class
|
||||
Allows to generate an apdu of the SET_PLUGIN command or PROVIDE_NFT_INFORMATION
|
||||
|
||||
PROVIDE_NFT_INFORMATION
|
||||
----
|
||||
do not define a selector
|
||||
|
||||
"""
|
||||
def __init__(self, type: int, version: int, name: str, addr: Union[str, bytes], selector: int = -1, chainID: int = 1, keyID: int = 0, algorithm: int = 1, sign: str = "") -> None:
|
||||
self.type: int = type
|
||||
self.version: int = version
|
||||
self.name: bytes = bytes(name, 'UTF-8')
|
||||
self.addr: bytes = bytes.fromhex(addr[2:]) if isinstance(addr, str) else addr
|
||||
self.selector: int = selector
|
||||
self.chainID: int = chainID
|
||||
self.keyID: int = keyID
|
||||
self.algorithm: int = algorithm
|
||||
self.sign: bytes = bytes.fromhex(sign)
|
||||
|
||||
def serialize(self) -> bytes:
|
||||
return b"".join([
|
||||
self.type.to_bytes(1, byteorder="big"),
|
||||
|
||||
self.version.to_bytes(1, byteorder="big"),
|
||||
|
||||
write_varint(len(self.name)),
|
||||
self.name,
|
||||
|
||||
self.addr,
|
||||
|
||||
b'' if self.selector == -1 else self.selector.to_bytes(4, byteorder="big"),
|
||||
|
||||
self.chainID.to_bytes(8, byteorder="big"),
|
||||
|
||||
self.keyID.to_bytes(1, byteorder="big"),
|
||||
|
||||
self.algorithm.to_bytes(1, byteorder="big"),
|
||||
|
||||
write_varint(len(self.sign)),
|
||||
self.sign,
|
||||
|
||||
])
|
||||
@@ -1,96 +0,0 @@
|
||||
from typing import Union
|
||||
|
||||
from ethereum_client.utils import write_varint, UINT64_MAX
|
||||
|
||||
|
||||
class TransactionError(Exception):
|
||||
pass
|
||||
|
||||
EIP2930 = 1
|
||||
EIP1559 = 2
|
||||
|
||||
class PersonalTransaction:
|
||||
def __init__(self, msg: Union[str, bytes]) -> None:
|
||||
# If you want to send bytes directly you have to put "0x" before the string
|
||||
if msg[0:2] == "0x":
|
||||
self.msg: bytes = bytes.fromhex(msg[2:])
|
||||
else:
|
||||
self.msg: bytes = bytes(msg, "utf-8")
|
||||
|
||||
def serialize(self) -> bytes:
|
||||
return b"".join([
|
||||
len(self.msg).to_bytes(4, byteorder="big"),
|
||||
self.msg,
|
||||
])
|
||||
|
||||
class Transaction:
|
||||
def __init__(self, txType: int, nonce: int, gasPrice: int, gasLimit: int, to: Union[str, bytes], value: int, data: Union[str, bytes] = "", chainID: int = -1) -> None:
|
||||
self.txType: int = txType
|
||||
self.nonce: int = nonce
|
||||
self.gasPrice: int = gasPrice
|
||||
self.gasLimit: int = gasLimit
|
||||
self.to: bytes = bytes.fromhex(to[2:]) if isinstance(to, str) else to
|
||||
self.value: int = value
|
||||
self.data: bytes = bytes(data, "utf-8")
|
||||
self.chainID = b''
|
||||
|
||||
if not (0 <= self.nonce <= UINT64_MAX):
|
||||
raise TransactionError(f"Bad nonce: '{self.nonce}'!")
|
||||
|
||||
if not (0 <= self.value <= UINT64_MAX):
|
||||
raise TransactionError(f"Bad value: '{self.value}'!")
|
||||
|
||||
if len(self.to) != 20:
|
||||
raise TransactionError(f"Bad address: '{self.to}'!")
|
||||
|
||||
self.lenNonce = int((len(hex(self.nonce)) - 1) / 2)
|
||||
self.lenGP = int((len(hex(self.gasPrice)) - 1) / 2)
|
||||
self.lenGL = int((len(hex(self.gasLimit)) - 1) / 2)
|
||||
self.lenValue = int((len(hex(self.value)) - 1) / 2)
|
||||
|
||||
self.lenChainID = int((len(hex(chainID)) - 1) / 2)
|
||||
|
||||
if chainID != -1:
|
||||
self.chainID = b"".join([
|
||||
b'' if self.lenChainID == 1 else (self.lenChainID + 0x80).to_bytes(1, byteorder="big"),
|
||||
chainID.to_bytes(self.lenChainID, byteorder="big"),
|
||||
write_varint(0 + 0x80),
|
||||
write_varint(0 + 0x80),
|
||||
])
|
||||
|
||||
def serialize(self) -> bytes:
|
||||
return b"".join([
|
||||
self.txType.to_bytes(1, byteorder="big"),
|
||||
|
||||
b'' if self.lenNonce == 1 else write_varint(self.lenNonce + 0x80),
|
||||
self.nonce.to_bytes(self.lenNonce, byteorder="big"),
|
||||
|
||||
write_varint(self.lenGP + 0x80),
|
||||
self.gasPrice.to_bytes(self.lenGP, byteorder="big"),
|
||||
|
||||
write_varint(self.lenGL + 0x80),
|
||||
self.gasLimit.to_bytes(self.lenGL, byteorder="big"),
|
||||
|
||||
write_varint(len(self.to) + 0x80),
|
||||
self.to,
|
||||
|
||||
write_varint(self.lenValue + 0x80),
|
||||
self.value.to_bytes(self.lenValue, byteorder="big"),
|
||||
|
||||
write_varint(len(self.data) + 0x80),
|
||||
self.data,
|
||||
|
||||
self.chainID,
|
||||
|
||||
])
|
||||
|
||||
class EIP712:
|
||||
def __init__(self, domain_hash: str, msg_hash: str) -> None:
|
||||
self.domain_hash = bytes.fromhex(domain_hash)
|
||||
self.msg_hash = bytes.fromhex(msg_hash)
|
||||
|
||||
def serialize(self) -> bytes:
|
||||
return b"".join([
|
||||
self.domain_hash,
|
||||
self.msg_hash
|
||||
])
|
||||
@@ -1,115 +0,0 @@
|
||||
from io import BytesIO
|
||||
from typing import List, Optional, Literal, Tuple
|
||||
import PIL.Image as Image
|
||||
|
||||
import speculos.client
|
||||
|
||||
UINT64_MAX: int = 18446744073709551615
|
||||
UINT32_MAX: int = 4294967295
|
||||
UINT16_MAX: int = 65535
|
||||
|
||||
# Association tableau si écran nanos ou nanox
|
||||
PATH_IMG = {"nanos": "nanos", "nanox": "nanox", "nanosp": "nanox"}
|
||||
|
||||
def save_screenshot(cmd, path: str):
|
||||
screenshot = cmd.client.get_screenshot()
|
||||
img = Image.open(BytesIO(screenshot))
|
||||
img.save(path)
|
||||
|
||||
|
||||
def compare_screenshot(cmd, path: str):
|
||||
screenshot = cmd.client.get_screenshot()
|
||||
assert speculos.client.screenshot_equal(path, BytesIO(screenshot))
|
||||
|
||||
|
||||
def parse_sign_response(response : bytes) -> Tuple[bytes, bytes, bytes]:
|
||||
assert len(response) == 65
|
||||
|
||||
offset: int = 0
|
||||
|
||||
v: bytes = response[offset]
|
||||
offset += 1
|
||||
|
||||
r: bytes = response[offset:offset + 32]
|
||||
offset += 32
|
||||
|
||||
s: bytes = response[offset:]
|
||||
|
||||
return (v, r, s)
|
||||
|
||||
|
||||
def bip32_path_from_string(path: str) -> List[bytes]:
|
||||
splitted_path: List[str] = path.split("/")
|
||||
|
||||
if not splitted_path:
|
||||
raise Exception(f"BIP32 path format error: '{path}'")
|
||||
|
||||
if "m" in splitted_path and splitted_path[0] == "m":
|
||||
splitted_path = splitted_path[1:]
|
||||
|
||||
return [int(p).to_bytes(4, byteorder="big") if "'" not in p
|
||||
else (0x80000000 | int(p[:-1])).to_bytes(4, byteorder="big")
|
||||
for p in splitted_path]
|
||||
|
||||
|
||||
def packed_bip32_path_from_string(path: str) -> bytes:
|
||||
bip32_paths = bip32_path_from_string(path)
|
||||
|
||||
return b"".join([
|
||||
len(bip32_paths).to_bytes(1, byteorder="big"),
|
||||
*bip32_paths
|
||||
])
|
||||
|
||||
|
||||
def write_varint(n: int) -> bytes:
|
||||
if n < 0xFC:
|
||||
return n.to_bytes(1, byteorder="little")
|
||||
|
||||
if n <= UINT16_MAX:
|
||||
return b"\xFD" + n.to_bytes(2, byteorder="little")
|
||||
|
||||
if n <= UINT32_MAX:
|
||||
return b"\xFE" + n.to_bytes(4, byteorder="little")
|
||||
|
||||
if n <= UINT64_MAX:
|
||||
return b"\xFF" + n.to_bytes(8, byteorder="little")
|
||||
|
||||
raise ValueError(f"Can't write to varint: '{n}'!")
|
||||
|
||||
|
||||
def read_varint(buf: BytesIO,
|
||||
prefix: Optional[bytes] = None) -> int:
|
||||
b: bytes = prefix if prefix else buf.read(1)
|
||||
|
||||
if not b:
|
||||
raise ValueError(f"Can't read prefix: '{b}'!")
|
||||
|
||||
n: int = {b"\xfd": 2, b"\xfe": 4, b"\xff": 8}.get(b, 1) # default to 1
|
||||
|
||||
b = buf.read(n) if n > 1 else b
|
||||
|
||||
if len(b) != n:
|
||||
raise ValueError("Can't read varint!")
|
||||
|
||||
return int.from_bytes(b, byteorder="little")
|
||||
|
||||
|
||||
def read(buf: BytesIO, size: int) -> bytes:
|
||||
b: bytes = buf.read(size)
|
||||
|
||||
if len(b) < size:
|
||||
raise ValueError(f"Can't read {size} bytes in buffer!")
|
||||
|
||||
return b
|
||||
|
||||
|
||||
def read_uint(buf: BytesIO,
|
||||
bit_len: int,
|
||||
byteorder: Literal['big', 'little'] = 'little') -> int:
|
||||
size: int = bit_len // 8
|
||||
b: bytes = buf.read(size)
|
||||
|
||||
if len(b) < size:
|
||||
raise ValueError(f"Can't read u{bit_len} in buffer!")
|
||||
|
||||
return int.from_bytes(b, byteorder)
|
||||
@@ -1,5 +0,0 @@
|
||||
speculos
|
||||
pytest>=6.1.1,<7.0.0
|
||||
ledgercomm>=1.1.0,<1.2.0
|
||||
ecdsa>=0.16.1,<0.17.0
|
||||
pysha3>=1.0.0,<2.0.0
|
||||
@@ -1,20 +0,0 @@
|
||||
[tool:pytest]
|
||||
addopts = --strict-markers
|
||||
|
||||
[pylint]
|
||||
disable = C0114, # missing-module-docstring
|
||||
C0115, # missing-class-docstring
|
||||
C0116, # missing-function-docstring
|
||||
C0103, # invalid-name
|
||||
R0801, # duplicate-code
|
||||
R0913 # too-many-arguments
|
||||
extension-pkg-whitelist=hid
|
||||
|
||||
[pycodestyle]
|
||||
max-line-length = 90
|
||||
|
||||
[mypy-hid.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-pytest.*]
|
||||
ignore_missing_imports = True
|
||||
Reference in New Issue
Block a user