Skip to content

Commit

Permalink
Use native code for secp256k1 operations (andelf#73)
Browse files Browse the repository at this point in the history
* Use native code for secp256k1 operations

No longer use pure-python ecdsa library

* Add one more test
  • Loading branch information
MrNaif2018 authored Feb 2, 2023
1 parent 02af96d commit 9bc7442
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 45 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ classifiers = [
[tool.poetry.dependencies]
python = ">=3.7,<4.0"
base58 = "*"
ecdsa = ">=0.18,<0.19"
coincurve = "*"
eth_abi = ">=4.0.0a,<5.0.0"
httpx = "*"
pycryptodome = "<4"
requests = "*"
httpx = "*"

[tool.poetry.dev-dependencies]
pytest = "*"
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

install_requires = [
'base58',
'ecdsa>=0.18,<0.19',
'coincurve',
'eth_abi>=4.0.0a,<5.0.0',
'httpx',
'pycryptodome<4',
'requests',
'httpx',
]

setup_kwargs = {
Expand Down
14 changes: 14 additions & 0 deletions tests/test_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ def test_signature_sign(signature: Signature, raw_data: bytes, txid: bytes):
assert priv_key.public_key == pub_key


def test_key_derivation():
priv_key = PrivateKey.fromhex(
"fd605fb953fcdabb952be161265a75b8a3ce1c0def2c7db72265f9db9a471be4"
)
assert priv_key.hex() == "fd605fb953fcdabb952be161265a75b8a3ce1c0def2c7db72265f9db9a471be4"
public_key = priv_key.public_key
assert (
public_key.hex() == "ecab6eace957bdb5a50366f449965550b7c30137c77bd429122949eb4"
"a40be06376cce12d2342f9297a25aa186c8eb7b3da65c5923011c503064a3f87943ebfe"
)
assert public_key.to_base58check_address() == "TBDCyrZ1hT1PDDFf2yRABwPrFica5qqPUX"
assert public_key.to_hex_address() == "410d9dee927cc1ea6b6e67f4993fac317826ea0c26"


def test_to_base58check_address():
assert (
to_base58check_address("410000000000000000000000000000000000000000")
Expand Down
128 changes: 87 additions & 41 deletions tronpy/keys/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,55 @@
import ecdsa # type: ignore
from Crypto.Hash import keccak
import hashlib
import base58
from collections.abc import ByteString, Hashable
import random
from typing import Any, Union
from typing import Any, Union, Iterator
from coincurve import PrivateKey as CoincurvePrivateKey, PublicKey as CoincurvePublicKey

from tronpy.exceptions import BadKey, BadSignature, BadAddress

SECPK1_N = 115792089237316195423570985008687907852837564279074904382605163141518161494337


def coerce_low_s(value: int) -> int:
"""Coerce the s component of an ECDSA signature into its low-s form.
See https://bitcoin.stackexchange.com/questions/83408/in-ecdsa-why-is-r-%E2%88%92s-mod-n-complementary-to-r-s
"""
return min(value, -value % SECPK1_N)


def two_int_sequence_encoder(signature_r: int, signature_s: int) -> Iterator[int]:
# Sequence tag
yield 0x30

encoded1 = bytes(_encode_int(signature_r))
encoded2 = bytes(_encode_int(signature_s))

# Sequence length
yield len(encoded1) + len(encoded2)

yield from encoded1
yield from encoded2


def int_to_big_endian(value: int) -> bytes:
return value.to_bytes((value.bit_length() + 7) // 8 or 1, "big")


def _encode_int(primitive: int) -> Iterator[int]:
# Integer tag
yield 0x02

encoded = int_to_big_endian(primitive)
if encoded[0] >= 128:
# Indicate that integer is positive (it always is, but doesn't always need the flag)
yield len(encoded) + 1
yield 0x00
else:
yield len(encoded)

yield from encoded


def keccak256(data: bytes) -> bytes:
hasher = keccak.new(digest_bits=256)
Expand Down Expand Up @@ -208,42 +250,24 @@ def __init__(self, private_key_bytes: bytes):

self._raw_key = private_key_bytes

priv_key = ecdsa.SigningKey.from_string(self._raw_key, curve=ecdsa.SECP256k1)
self.public_key = PublicKey(priv_key.get_verifying_key().to_string())
priv_key = CoincurvePrivateKey(self._raw_key)
self.public_key = PublicKey(priv_key.public_key.format(compressed=False)[1:])

super().__init__()

def sign_msg(self, message: bytes) -> "Signature":
"""Sign a raw message."""
sk = ecdsa.SigningKey.from_string(self._raw_key, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256)
signature = sk.sign_deterministic(message)

# recover address to get rec_id
vks = ecdsa.VerifyingKey.from_public_key_recovery(
signature, message, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256
)
for v, pk in enumerate(vks):
if pk.to_string() == self.public_key:
break

signature += bytes([v])
return Signature(signature)
message_hash = sha256(message)
return self.sign_msg_hash(message_hash)

def sign_msg_hash(self, message_hash: bytes) -> "Signature":
"""Sign a message hash(sha256)."""
sk = ecdsa.SigningKey.from_string(self._raw_key, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256)
signature = sk.sign_digest_deterministic(message_hash)

# recover address to get rec_id
vks = ecdsa.VerifyingKey.from_public_key_recovery_with_digest(
signature, message_hash, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256
private_key_bytes = self.to_bytes()
signature_bytes = CoincurvePrivateKey(private_key_bytes).sign_recoverable(
message_hash,
hasher=None,
)
for v, pk in enumerate(vks):
if pk.to_string() == self.public_key:
break

signature += bytes([v])
return Signature(signature)
return Signature(signature_bytes)

@classmethod
def random(cls) -> "PrivateKey":
Expand Down Expand Up @@ -275,27 +299,49 @@ def __init__(self, signature_bytes: bytes):

def recover_public_key_from_msg(self, message: bytes) -> PublicKey:
"""Recover public key(address) from message and signature."""
vks = ecdsa.VerifyingKey.from_public_key_recovery(
self._raw_signature[:64], message, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256
)
return PublicKey(vks[self.v].to_string())
message_hash = sha256(message)
return self.recover_public_key_from_msg_hash(message_hash)

def recover_public_key_from_msg_hash(self, message_hash: bytes) -> PublicKey:
"""Recover public key(address) from message hash and signature."""
vks = ecdsa.VerifyingKey.from_public_key_recovery_with_digest(
self._raw_signature[:64], message_hash, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256
)
return PublicKey(vks[self.v].to_string())
signature_bytes = self.to_bytes()
try:
public_key_bytes = CoincurvePublicKey.from_signature_and_message(
signature_bytes,
message_hash,
hasher=None,
).format(compressed=False)[1:]
except (ValueError, Exception) as err:
# `coincurve` can raise `ValueError` or `Exception` dependending on
# how the signature is invalid.
raise BadSignature(str(err))
public_key = PublicKey(public_key_bytes)
return public_key

def verify_msg(self, message: bytes, public_key: PublicKey) -> bool:
"""Verify message and signature."""
vk = ecdsa.VerifyingKey.from_string(public_key.to_bytes(), curve=ecdsa.SECP256k1)
return vk.verify(self._raw_signature[:64], message, hashfunc=hashlib.sha256)
message_hash = sha256(message)
return self.verify_msg_hash(message_hash, public_key)

def verify_msg_hash(self, message_hash: bytes, public_key: PublicKey) -> bool:
"""Verify message hash and signature."""
vk = ecdsa.VerifyingKey.from_string(public_key.to_bytes(), curve=ecdsa.SECP256k1)
return vk.verify_digest(self._raw_signature[:64], message_hash)
# coincurve rejects signatures with a high s, so convert to the equivalent low s form
low_s = coerce_low_s(self.s)
der_encoded_signature = bytes(two_int_sequence_encoder(self.r, low_s))
coincurve_public_key = CoincurvePublicKey(b"\x04" + public_key.to_bytes())
return coincurve_public_key.verify(
der_encoded_signature,
message_hash,
hasher=None,
)

@property
def r(self) -> int:
return int.from_bytes(self._raw_signature[:32], "big")

@property
def s(self) -> int:
return int.from_bytes(self._raw_signature[32:64], "big")

@property
def v(self) -> int:
Expand Down

0 comments on commit 9bc7442

Please sign in to comment.