diff --git a/.gitignore b/.gitignore index 146df8d..c000231 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # Created by https://www.toptal.com/developers/gitignore/api/macos,linux,windows,intellij+all,visualstudiocode,eclipse,java,gradle,node,git # Edit at https://www.toptal.com/developers/gitignore?templates=macos,linux,windows,intellij+all,visualstudiocode,eclipse,java,gradle,node,git +*.iso + ### Eclipse ### .metadata bin/ diff --git a/src/main/java/vertx/bittorrent/ClientVerticle.java b/src/main/java/vertx/bittorrent/ClientVerticle.java index 9f364da..73bd3ab 100644 --- a/src/main/java/vertx/bittorrent/ClientVerticle.java +++ b/src/main/java/vertx/bittorrent/ClientVerticle.java @@ -10,9 +10,11 @@ import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -32,6 +34,7 @@ public class ClientVerticle extends AbstractVerticle { private NetServer netServer; private int maxConnections = 50; + private int maxDownloadingPeers = 6; private SecureRandom random = new SecureRandom(); private long timerId = -1; @@ -95,6 +98,9 @@ public void start() throws Exception { double totalDownloadRate = 0.0; double totalUploadRate = 0.0; + double totalAvgDownloadRate = 0.0; + double totalAvgUploadRate = 0.0; + for (var connection : connections) { int deltaBytes = connection.getBytesDownloaded() - connection.getPreviousBytesDownloaded(); int deltaBytesUploaded = connection.getBytesUploaded() - connection.getPreviousBytesUploaded(); @@ -107,9 +113,14 @@ public void start() throws Exception { connection.setPreviousBytesDownloaded(connection.getBytesDownloaded()); connection.setPreviousBytesUploaded(connection.getBytesUploaded()); + + totalAvgDownloadRate += connection.getAverageDownloadRate(); + totalAvgUploadRate += connection.getAverageUploadRate(); } long completedBytes = clientState.getCompletedBytes(); + long remainingBytes = clientState.getTorrent().getLength() - completedBytes; + long remainingTime = (long) (remainingBytes / totalAvgDownloadRate); double downloadedRatio = completedBytes / (double) clientState.getTorrent().getLength(); @@ -117,6 +128,8 @@ public void start() throws Exception { long seeding = getSeedingPeersCount(); long leeching = getLeechingPeersCount(); + long downloading = getDownloadingPeersCount(); + log.info( "{}% ({} / {}) (↓ {}/s | ↑ {}/s) ({} connected peers, {} seeding, {} leeching)", progress, @@ -128,6 +141,93 @@ public void start() throws Exception { seeding, leeching); }); + + vertx.setPeriodic(10_000, id -> { + for (var connection : connections) { + connection.keepAlive(); + } + + int downloadingPeersCount = getDownloadingPeersCount(); + + if (downloadingPeersCount < maxDownloadingPeers) { + return; + } + + // check if we have enough connected peers to rotate on + if (connections.size() > downloadingPeersCount) { + + List untestedPeers = connections.stream() + .filter(conn -> !conn.isDownloading()) + .filter(conn -> conn.getDownloadingDuration() < 10.0) + .filter(this::hasRequiredPieces) + .sorted(Comparator.comparingDouble(PeerConnection::getRemoteUnchokedDuration) + .thenComparingDouble(PeerConnection::getTotalWaitingDuration)) + .collect(Collectors.toCollection(ArrayList::new)); + + List testedPeers = connections.stream() + .filter(conn -> !conn.isDownloading()) + .filter(conn -> conn.getDownloadingDuration() >= 10.0) + .filter(this::hasRequiredPieces) + .sorted(Comparator.comparingDouble(PeerConnection::getAverageDownloadRate) + .reversed()) + .peek(conn -> log.debug( + "[{}] Tested: {}/s, duration: {}", + conn.getPeer(), + ByteFormat.format(conn.getAverageDownloadRate()), + conn.getDownloadingDuration())) + .collect(Collectors.toCollection(ArrayList::new)); + + List seedingPeers = connections.stream() + .filter(conn -> conn.isDownloading()) + .filter(conn -> conn.getDownloadingDuration() >= 10.0) + .sorted(Comparator.comparingDouble(PeerConnection::getAverageDownloadRate)) + .peek(conn -> log.debug( + "[{}] Monitoring: {}/s, duration: {}", + conn.getPeer(), + ByteFormat.format(conn.getAverageDownloadRate()), + conn.getDownloadingDuration())) + .collect(Collectors.toCollection(ArrayList::new)); + + int rotatingPeers = Math.max(0, seedingPeers.size() - maxDownloadingPeers + 2); + while (rotatingPeers > 2) { + var conn = seedingPeers.remove(0); + conn.setDownloading(false); + rotatingPeers--; + log.debug( + "[{}] Not interested in peer with download rate: {}/s", + conn.getPeer(), + ByteFormat.format(conn.getAverageDownloadRate())); + } + + while (rotatingPeers > 0 && !untestedPeers.isEmpty()) { + var conn = untestedPeers.remove(0); + var seedingConn = seedingPeers.remove(0); + conn.setDownloading(true); + seedingConn.setDownloading(false); + requestNextPiece(conn); + rotatingPeers--; + log.debug("[{}] Testing download speed of peer", conn.getPeer()); + } + + while (rotatingPeers > 0 && !testedPeers.isEmpty()) { + var seedingConn = seedingPeers.remove(0); + var testedConn = testedPeers.remove(0); + + if (testedConn.getAverageDownloadRate() > seedingConn.getAverageDownloadRate()) { + testedConn.setDownloading(true); + requestNextPiece(testedConn); + seedingConn.setDownloading(false); + rotatingPeers--; + log.debug( + "[{}] Switching to faster peer with download rate: {}/s", + testedConn.getPeer(), + ByteFormat.format(testedConn.getAverageDownloadRate())); + } else { + break; + } + } + } + }); } @Override @@ -171,6 +271,12 @@ private int getSeedingPeersCount() { .count(); } + private int getDownloadingPeersCount() { + return (int) connections.stream() + .filter(conn -> conn.isDownloading() && !conn.isRemoteChoked()) + .count(); + } + private int getLeechingPeersCount() { return (int) connections.stream().filter(conn -> !conn.isChoked()).count(); } @@ -351,8 +457,15 @@ private void setupPeerConnection(PeerConnection connection) { } }); + connection.onChoked(v -> { + connection.setDownloading(false); + }); + connection.onUnchoked(v -> { - requestNextPiece(connection); + if (getDownloadingPeersCount() < maxDownloadingPeers) { + connection.setDownloading(true); + requestNextPiece(connection); + } }); connection.onClosed(v -> connections.remove(connection)); diff --git a/src/main/java/vertx/bittorrent/PeerConnection.java b/src/main/java/vertx/bittorrent/PeerConnection.java index 6a9a9b8..87264a0 100644 --- a/src/main/java/vertx/bittorrent/PeerConnection.java +++ b/src/main/java/vertx/bittorrent/PeerConnection.java @@ -70,10 +70,15 @@ public class PeerConnection { @Getter @Setter private float downloadRate = 0.0f; + private long downloadStartedAt = -1; + private long downloadingDuration = 0; private int currentRequestCount = 0; private int requestLimit = 6; + @Getter + private boolean downloading = false; + private Map pieceStates = new HashMap<>(); private Handler handshakeHandler; @@ -112,6 +117,26 @@ public PeerConnection(NetSocket socket, ClientState clientState, Peer peer) { }); } + public double getAverageDownloadRate() { + double duration = getDownloadingDuration(); + + if (duration == 0.0) { + return 0.0; + } + + return bytesDownloaded / duration; + } + + public double getDownloadingDuration() { + long duration = downloadingDuration; + + if (downloadStartedAt != -1) { + duration += (System.currentTimeMillis() - downloadStartedAt); + } + + return duration / 1000.0; + } + public boolean isPieceRequested(int index) { return pieceStates.containsKey(index); } @@ -174,6 +199,19 @@ public Future close() { return socket.close(); } + public void setDownloading(boolean downloading) { + if (downloading && !this.downloading) { + downloadStartedAt = System.currentTimeMillis(); + } + + if (!downloading && this.downloading) { + downloadingDuration += (System.currentTimeMillis() - downloadStartedAt); + downloadStartedAt = -1; + } + + this.downloading = downloading; + } + public void handshake() { if (!handshakeSent) { handshakeSent = true; @@ -234,7 +272,7 @@ public void requestPiece(int pieceIndex) { } private boolean canRequest() { - if (remoteChoked || pieceStates.isEmpty()) { + if (!downloading || remoteChoked || pieceStates.isEmpty()) { return false; }