Skip to content

Commit

Permalink
feat: achieve stable state while receiving transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Georgi Grigorov committed Dec 2, 2024
1 parent 403232a commit 553d315
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ private void handleCatchupResponseMessage(byte[] message, PeerId peerId) {
*/
public void writeHandshakeToStream(Stream stream, PeerId peerId) {
byte[] handshake = new byte[]{
//TODO: Strange source for the node role -> hostConfig looks like better solution
(byte) handshakeBuilder.getBlockAnnounceHandshake().getNodeRole()
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ public void sendHandshake() {
* Sends a neighbour message over the controller stream.
*/
public void sendTransactionsMessage(byte[] encodedTransactionMessage) {
engine.writeTransactionsMessage(stream, stream.remotePeerId(), encodedTransactionMessage);
engine.writeTransactionsMessage(stream, encodedTransactionMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ public TransactionEngine() {
* @param stream stream, where the request was received
*/
public void receiveRequest(byte[] message, Stream stream) {
log.log(Level.INFO, "TRANSACTION REQUEST: ", message);
log.log(Level.INFO, "TRANSACTION REQUEST: " + new String(message));

if (message == null) {
if (message == null || message.length == 0) {
log.log(Level.WARNING,
String.format("Transactions message is null from Peer %s", stream.remotePeerId()));
return;
}
log.log(Level.FINE, "Transaction message length:" + message.length);
log.log(Level.INFO, "Transaction message length:" + message.length);

if (stream.isInitiator()) {
handleInitiatorStreamMessage(message, stream);
Expand All @@ -66,6 +66,8 @@ public void receiveRequest(byte[] message, Stream stream) {
}

private void handleInitiatorStreamMessage(byte[] message, Stream stream) {
log.log(Level.INFO, "handleInitiatorStreamMessage: PeerId " + stream.remotePeerId());

PeerId peerId = stream.remotePeerId();

if (!isHandshake(message)) {
Expand All @@ -75,13 +77,22 @@ private void handleInitiatorStreamMessage(byte[] message, Stream stream) {
}

connectionManager.addTransactionsStream(stream);
log.log(Level.INFO, "Transaction msg: " + new String(message));
log.log(Level.INFO, "Received transactions handshake from " + peerId);
//TODO Replace empty transaction messages once we have validation working.
stream.writeAndFlush(new byte[]{});
// stream.writeAndFlush(message);
log.log(Level.INFO, "Still working");

// if (connectionManager.isTransactionsConnected(peerId)) {
// byte[] transactionsForPeer = transactionProcessor.getTransactionsForPeer(peerId);
// stream.writeAndFlush(transactionsForPeer);
// log.log(Level.INFO, "Still working 1");
// }
// log.log(Level.INFO, "Still working 2");
}

private void handleResponderStreamMessage(byte[] message, Stream stream) {
log.log(Level.INFO, "handleResponderStreamMessage: PeerId" + stream.remotePeerId());

PeerId peerId = stream.remotePeerId();
boolean connectedToPeer = connectionManager.isTransactionsConnected(peerId);

Expand All @@ -92,32 +103,46 @@ private void handleResponderStreamMessage(byte[] message, Stream stream) {
}

if (isHandshake(message)) {
log.log(Level.INFO, "handleResponderStreamMessage handleHandshake: PeerId" + stream.remotePeerId());

handleHandshake(peerId, stream);
} else {
handleTransactionMessage(message, peerId);
log.log(Level.INFO, "handleResponderStreamMessage handleTransactionMessage: PeerId" + stream.remotePeerId());

handleTransactionMessage(message, stream);
}
}

//TODO: Think about this method!
private void handleHandshake(PeerId peerId, Stream stream) {
if (connectionManager.isTransactionsConnected(peerId)) {
log.log(Level.INFO, "Received existing transactions handshake from " + peerId);
stream.close();
} else {
connectionManager.addTransactionsStream(stream);
log.log(Level.INFO, "Received transactions handshake from " + peerId);
}

connectionManager.addTransactionsStream(stream);
log.log(Level.INFO, "Received transactions handshake from " + peerId);

writeHandshakeToStream(stream, peerId);
}

private void handleTransactionMessage(byte[] message, PeerId peerId) {
private void handleTransactionMessage(byte[] message, Stream stream) {
ScaleCodecReader reader = new ScaleCodecReader(message);
ExtrinsicArray transactions = reader.read(new TransactionReader());
log.log(Level.FINE, "Received " + transactions.getExtrinsics().length + " transactions from Peer "
+ peerId);
log.log(Level.INFO, "Received " + transactions.getExtrinsics().length + " transactions from Peer "
+ stream.remotePeerId());

synchronized (LOCK) {
transactionProcessor.handleExternalTransactions(transactions.getExtrinsics(), peerId);
transactionProcessor.handleExternalTransactions(transactions.getExtrinsics(), stream.remotePeerId());
}

//TODO: Broadcast received transaction to other peers if they are valid and the propagate flag is true
// for (Extrinsic extrinsic : transactions.getExtrinsics()) {
// ExtrinsicArray extrinsicArray = new ExtrinsicArray(new Extrinsic[]{extrinsic});
// byte[] scaleMessage = ScaleUtils.Encode.encode(new TransactionWriter(), extrinsicArray);
// //TODO: we should send the transactions to other streams, not the one that is the sender
//// writeTransactionsMessage(stream, scaleMessage);
// }
}

/**
Expand All @@ -128,19 +153,19 @@ private void handleTransactionMessage(byte[] message, PeerId peerId) {
*/
public void writeHandshakeToStream(Stream stream, PeerId peerId) {
byte[] handshake = new byte[]{};

log.log(Level.INFO, "Sending transactions handshake to " + peerId);
stream.writeAndFlush(handshake);
}

/**
* Ï
* Send our Transactions message from {@link WarpSyncState} on a given <b>responder</b> stream.
*
* @param stream <b>responder</b> stream to write the message to
* @param peerId peer to send to
* @param stream <b>responder</b> stream to write the message to
* @param encodedTransactionMessage scale encoded transaction message
*/
public void writeTransactionsMessage(Stream stream, PeerId peerId, byte[] encodedTransactionMessage) {
log.log(Level.INFO, "Sending transaction message to peer " + peerId);
public void writeTransactionsMessage(Stream stream, byte[] encodedTransactionMessage) {
log.log(Level.INFO, "Sending transaction message to peer " + stream.remotePeerId());
stream.writeAndFlush(encodedTransactionMessage);
}

Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/limechain/transaction/TransactionProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
import com.limechain.exception.misc.RuntimeApiVersionException;
import com.limechain.exception.transaction.TransactionValidationException;
import com.limechain.network.PeerMessageCoordinator;
import com.limechain.network.protocol.transaction.scale.TransactionWriter;
import com.limechain.network.protocol.warp.dto.Block;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.runtime.Runtime;
import com.limechain.runtime.version.ApiVersionName;
import com.limechain.storage.block.BlockState;
import com.limechain.transaction.dto.Extrinsic;
import com.limechain.transaction.dto.ExtrinsicArray;
import com.limechain.transaction.dto.TransactionSource;
import com.limechain.transaction.dto.TransactionValidationRequest;
import com.limechain.transaction.dto.TransactionValidationResponse;
import com.limechain.transaction.dto.ValidTransaction;
import com.limechain.utils.scale.ScaleUtils;
import io.emeraldpay.polkaj.types.Hash256;
import io.libp2p.core.PeerId;
import lombok.extern.java.Log;
Expand All @@ -24,6 +27,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Log
@Component
Expand All @@ -39,6 +43,20 @@ public TransactionProcessor(TransactionState transactionState, PeerMessageCoordi
this.blockState = BlockState.getInstance();
}

public byte[] getTransactionsForPeer(PeerId peerId) {
Extrinsic[] extrinsics = Stream.concat(
Stream.of(Arrays.stream(transactionState.pending())
.filter(t -> !t.getIgnore().contains(peerId))
.map(ValidTransaction::getExtrinsic)),
Stream.of(Arrays.stream(transactionState.pendingInPool())
.filter(t -> !t.getIgnore().contains(peerId))
.map(ValidTransaction::getExtrinsic)))
.toArray(Extrinsic[]::new);

ExtrinsicArray extrinsicArray = new ExtrinsicArray(extrinsics);
return ScaleUtils.Encode.encode(new TransactionWriter(), extrinsicArray);
}

public void handleExternalTransactions(Extrinsic[] extrinsics, PeerId peerId) {
for (Extrinsic current : extrinsics) {

Expand Down

0 comments on commit 553d315

Please sign in to comment.