Skip to content

Commit

Permalink
Merge pull request hyperledger#743 from Toktar/task-1327-req-idr-to-t…
Browse files Browse the repository at this point in the history
…xn-change

[WIP][INDY-1327] migration of key-value in ReqIdrToTxn
  • Loading branch information
ashcherbakov authored Jun 6, 2018
2 parents cc0d4fa + 2712e2b commit 7f25f89
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 15 deletions.
105 changes: 96 additions & 9 deletions data/migrations/deb/1_3_428_to_1_3_429.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,31 @@
import sys
import tarfile
import traceback
import copy
from _sha256 import sha256

from common.serializers.serialization import ledger_txn_serializer
from common.serializers.serialization import ledger_txn_serializer, serialize_msg_for_signing
from indy_common.config_helper import NodeConfigHelper
from indy_common.config_util import getConfig
from plenum.common.txn_util import transform_to_new_format
from indy_common.constants import CONFIG_LEDGER_ID
from indy_node.server.config_req_handler import ConfigReqHandler
from indy_node.server.domain_req_handler import DomainReqHandler
from indy_node.server.pool_req_handler import PoolRequestHandler
from plenum.common.constants import TXN_TYPE, TXN_PAYLOAD, \
TXN_PAYLOAD_METADATA, TXN_PAYLOAD_METADATA_DIGEST, POOL_LEDGER_ID, DOMAIN_LEDGER_ID
from plenum.common.txn_util import transform_to_new_format, get_from, get_req_id, get_payload_data, \
get_protocol_version, get_seq_no, get_type
from plenum.common.types import f, OPERATION
from plenum.persistence.req_id_to_txn import ReqIdrToTxn
from storage.helper import initKeyValueStorage
from storage.kv_store_rocksdb_int_keys import KeyValueStorageRocksdbIntKeys
from stp_core.common.log import getlogger

logger = getlogger()

ENV_FILE_PATH = "/etc/indy/indy.env"
ledger_types = ['pool', 'domain', 'config']
config = getConfig()


def set_own_perm(usr, dir):
Expand Down Expand Up @@ -54,10 +67,58 @@ def get_node_name():
return node_name


def get_ledger_id_by_txn_type(txn_type):

def get_types_for_req_handler(req_handler):
return list(req_handler.write_types) + list(req_handler.query_types)

if txn_type in get_types_for_req_handler(PoolRequestHandler):
return POOL_LEDGER_ID
elif txn_type in get_types_for_req_handler(DomainReqHandler):
return DOMAIN_LEDGER_ID
elif txn_type in get_types_for_req_handler(ConfigReqHandler):
return CONFIG_LEDGER_ID
else:
logger.error("Unknown txn_type: {}".format(txn_type))
logger.error("Cannot write txn into SeqNoDB, because cannot define ledger_id")
sys.exit(1)


def migrate_txn_log(db_dir, db_name):

def put_into_seq_no_db(txn):
# If there is no reqId, then it's genesis txn
if get_req_id(txn) is None:
return
txn_new = copy.deepcopy(txn)
operation = get_payload_data(txn_new)
operation[TXN_TYPE] = get_type(txn_new)
dct = {
f.IDENTIFIER.nm: get_from(txn_new),
f.REQ_ID.nm: get_req_id(txn_new),
OPERATION: operation,
}
if get_protocol_version(txn_new) is not None:
dct[f.PROTOCOL_VERSION.nm] = get_protocol_version(txn_new)
digest = sha256(serialize_msg_for_signing(dct)).hexdigest().encode()
seq_no = get_seq_no(txn_new).encode()
ledger_id = get_ledger_id_by_txn_type(operation[TXN_TYPE])
line_to_record = str(ledger_id) + ReqIdrToTxn.delimiter + str(seq_no)
dest_seq_no_db_storage.put(digest, line_to_record)
return digest

new_db_name = db_name + '_new'
old_path = os.path.join(db_dir, db_name)
new_path = os.path.join(db_dir, new_db_name)
new_seqno_db_name = config.seqNoDbName + '_new'
try:
dest_seq_no_db_storage = initKeyValueStorage(config.reqIdToTxnStorage,
db_dir,
new_seqno_db_name)
except Exception:
logger.error(traceback.print_exc())
logger.error("Could not open new seq_no_db storage")
return False

# open new and old ledgers
try:
Expand All @@ -77,17 +138,24 @@ def migrate_txn_log(db_dir, db_name):
# put values from old ledger to the new one
try:
for key, val in src_storage.iterator():
key = key.decode()
val = ledger_txn_serializer.deserialize(val)
new_val = transform_to_new_format(txn=val, seq_no=int(key))
digest = put_into_seq_no_db(new_val)
# add digest into txn
if get_req_id(new_val):
new_val[TXN_PAYLOAD][TXN_PAYLOAD_METADATA][TXN_PAYLOAD_METADATA_DIGEST] = digest
new_val = ledger_txn_serializer.serialize(new_val)
dest_storage.put(key, new_val)

except Exception:
logger.error(traceback.print_exc())
logger.error("Could not put key/value to the new ledger '{}'".format(db_name))
return False

src_storage.close()
dest_storage.close()
dest_seq_no_db_storage.close()

# Remove old ledger
try:
Expand All @@ -106,12 +174,29 @@ def migrate_txn_log(db_dir, db_name):
logger.error("Could not rename temporary new ledger from '{}' to '{}'"
.format(new_path, old_path))
return False

set_own_perm("indy", old_path)
try:
set_own_perm("indy", old_path)
except Exception:
pass

return True


def rename_seq_no_db(db_dir):
old_seqno_path = os.path.join(db_dir, config.seqNoDbName)
new_seqno_db_name = config.seqNoDbName + '_new'
new_seqno_path = os.path.join(db_dir, new_seqno_db_name)
try:
shutil.move(new_seqno_path, old_seqno_path)
except Exception:
logger.error(traceback.print_exc())
logger.error("Could not rename temporary new seq_no_db from '{}' to '{}'"
.format(new_seqno_path, old_seqno_path))
return False

set_own_perm("indy", old_seqno_path)


def migrate_txn_logs(ledger_dir):
# Migrate transaction logs, they use integer keys
for ledger_type in ledger_types:
Expand Down Expand Up @@ -144,7 +229,7 @@ def migrate_states(ledger_dir):
return True


def migrate_ts_store(ledger_dir, config):
def migrate_ts_store(ledger_dir):
# just remove, since state root hash may be changed
for ledger_type in ledger_types:
db = os.path.join(ledger_dir, config.stateTsDbName)
Expand All @@ -153,7 +238,7 @@ def migrate_ts_store(ledger_dir, config):
return True


def migrate_bls_signature_store(ledger_dir, config):
def migrate_bls_signature_store(ledger_dir):
# just remove, since state root hash may be changed
for ledger_type in ledger_types:
db = os.path.join(ledger_dir, config.stateSignatureDbName)
Expand All @@ -178,7 +263,6 @@ def migrate_all():
logger.error("Could not get node name")
return False

config = getConfig()
config_helper = NodeConfigHelper(node_name, config)

ledger_dir = config_helper.ledger_dir
Expand All @@ -196,6 +280,9 @@ def migrate_all():
logger.error("Txn log migration from old to new format failed!")
return False

# Rename new seq_no_db into old
rename_seq_no_db(ledger_dir)

# 3. migrate hash store
if migrate_hash_stores(ledger_dir):
logger.info("All hash stores migrated successfully from old to new transaction format")
Expand All @@ -211,14 +298,14 @@ def migrate_all():
return False

# 5. migrate ts store
if migrate_ts_store(ledger_dir, config):
if migrate_ts_store(ledger_dir):
logger.info("Timestamp store migrated successfully from old to new transaction format")
else:
logger.error("Timestamp store migration from old to new format failed!")
return False

# 6. migrate bls signature xtore
if migrate_bls_signature_store(ledger_dir, config):
if migrate_bls_signature_store(ledger_dir):
logger.info("BLS signature store migrated successfully from old to new transaction format")
else:
logger.error("BLS signature store migration from old to new format failed!")
Expand Down
27 changes: 22 additions & 5 deletions indy_node/test/state_proof/test_state_proofs_for_get_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,15 @@ def test_state_proofs_for_get_attr(request_handler):
raw_attribute = '{"last_name":"Anderson"}'
seq_no = 0
txn_time = int(time.time())
identifier = "6ouriXMZkLeHsuXrN1X1fd"
txn = {
TXN_TYPE: ATTRIB,
TARGET_NYM: nym,
RAW: raw_attribute,
}
txn = append_txn_metadata(reqToTxn(Request(operation=txn)),
txn = append_txn_metadata(reqToTxn(Request(operation=txn,
protocolVersion=CURRENT_PROTOCOL_VERSION,
identifier=identifier)),
seq_no=seq_no, txn_time=txn_time)
request_handler._addAttr(txn)
request_handler.state.commit()
Expand Down Expand Up @@ -135,6 +138,7 @@ def test_state_proofs_for_get_claim_def(request_handler):

seq_no = 0
txn_time = int(time.time())
identifier = "6ouriXMZkLeHsuXrN1X1fd"

schema_seqno = 0
signature_type = 'CL'
Expand All @@ -146,7 +150,9 @@ def test_state_proofs_for_get_claim_def(request_handler):
REF: schema_seqno,
DATA: key_components
}
txn = append_txn_metadata(reqToTxn(Request(operation=txn)),
txn = append_txn_metadata(reqToTxn(Request(operation=txn,
protocolVersion=CURRENT_PROTOCOL_VERSION,
identifier=identifier)),
seq_no=seq_no, txn_time=txn_time)
txn = append_payload_metadata(txn, frm=nym)

Expand Down Expand Up @@ -184,6 +190,7 @@ def test_state_proofs_for_get_schema(request_handler):

seq_no = 0
txn_time = int(time.time())
identifier = "6ouriXMZkLeHsuXrN1X1fd"

schema_name = "schema_a"
schema_version = "1.0"
Expand All @@ -194,7 +201,9 @@ def test_state_proofs_for_get_schema(request_handler):
TXN_TYPE: SCHEMA,
DATA: data,
}
txn = append_txn_metadata(reqToTxn(Request(operation=txn)),
txn = append_txn_metadata(reqToTxn(Request(operation=txn,
protocolVersion=CURRENT_PROTOCOL_VERSION,
identifier=identifier)),
seq_no=seq_no, txn_time=txn_time)
txn = append_payload_metadata(txn, frm=nym)

Expand Down Expand Up @@ -228,6 +237,8 @@ def test_state_proofs_for_get_schema(request_handler):

def prep_multi_sig(request_handler, nym, role, verkey, seq_no):
txn_time = int(time.time())
identifier = "6ouriXMZkLeHsuXrN1X1fd"

# Adding nym
data = {
f.IDENTIFIER.nm: nym,
Expand All @@ -236,7 +247,9 @@ def prep_multi_sig(request_handler, nym, role, verkey, seq_no):
f.SEQ_NO.nm: seq_no,
TXN_TIME: txn_time,
}
txn = append_txn_metadata(reqToTxn(Request(operation=data)),
txn = append_txn_metadata(reqToTxn(Request(operation=data,
protocolVersion=CURRENT_PROTOCOL_VERSION,
identifier=identifier)),
seq_no=seq_no, txn_time=txn_time)
txn = append_payload_metadata(txn, frm=nym)
request_handler.updateNym(nym, txn)
Expand Down Expand Up @@ -303,6 +316,8 @@ def test_no_state_proofs_if_protocol_version_less(request_handler):
nym = 'Gw6pDLhcBcoQesN72qfotTgFa7cbuqZpkX3Xo6pLhPhv'
role = "2"
verkey = "~7TYfekw4GUagBnBVCqPjiC"
identifier = "6ouriXMZkLeHsuXrN1X1fd"

seq_no = 0
txn_time = int(time.time())
# Adding nym
Expand All @@ -313,7 +328,9 @@ def test_no_state_proofs_if_protocol_version_less(request_handler):
f.SEQ_NO.nm: seq_no,
TXN_TIME: txn_time,
}
txn = append_txn_metadata(reqToTxn(Request(operation=data)),
txn = append_txn_metadata(reqToTxn(Request(operation=data,
protocolVersion=CURRENT_PROTOCOL_VERSION,
identifier=identifier)),
seq_no=seq_no, txn_time=txn_time)
txn = append_payload_metadata(txn, frm=nym)
request_handler.updateNym(nym, txn)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
data_files=[(
(BASE_DIR, ['data/nssm_original.exe'])
)],
install_requires=['indy-plenum-dev==1.2.382',
install_requires=['indy-plenum-dev==1.2.388',
'indy-anoncreds-dev==1.0.32',
'python-dateutil',
'timeout-decorator==0.4.0'],
Expand Down

0 comments on commit 7f25f89

Please sign in to comment.