Skip to content

Commit

Permalink
feat: limit number of downloading peers, rotate on downloading peers
Browse files Browse the repository at this point in the history
  • Loading branch information
t-karaca committed Aug 11, 2024
1 parent c9c0600 commit 2f3433a
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Created by https://www.toptal.com/developers/gitignore/api/macos,linux,windows,intellij+all,visualstudiocode,eclipse,java,gradle,node,git
# Edit at https://www.toptal.com/developers/gitignore?templates=macos,linux,windows,intellij+all,visualstudiocode,eclipse,java,gradle,node,git

*.iso

### Eclipse ###
.metadata
bin/
Expand Down
115 changes: 114 additions & 1 deletion src/main/java/vertx/bittorrent/ClientVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -32,6 +34,7 @@ public class ClientVerticle extends AbstractVerticle {
private NetServer netServer;

private int maxConnections = 50;
private int maxDownloadingPeers = 6;
private SecureRandom random = new SecureRandom();

private long timerId = -1;
Expand Down Expand Up @@ -95,6 +98,9 @@ public void start() throws Exception {
double totalDownloadRate = 0.0;
double totalUploadRate = 0.0;

double totalAvgDownloadRate = 0.0;
double totalAvgUploadRate = 0.0;

for (var connection : connections) {
int deltaBytes = connection.getBytesDownloaded() - connection.getPreviousBytesDownloaded();
int deltaBytesUploaded = connection.getBytesUploaded() - connection.getPreviousBytesUploaded();
Expand All @@ -107,16 +113,23 @@ public void start() throws Exception {

connection.setPreviousBytesDownloaded(connection.getBytesDownloaded());
connection.setPreviousBytesUploaded(connection.getBytesUploaded());

totalAvgDownloadRate += connection.getAverageDownloadRate();
totalAvgUploadRate += connection.getAverageUploadRate();
}

long completedBytes = clientState.getCompletedBytes();
long remainingBytes = clientState.getTorrent().getLength() - completedBytes;
long remainingTime = (long) (remainingBytes / totalAvgDownloadRate);
double downloadedRatio =
completedBytes / (double) clientState.getTorrent().getLength();

String progress = String.format("%.02f", downloadedRatio * 100.0);

long seeding = getSeedingPeersCount();
long leeching = getLeechingPeersCount();
long downloading = getDownloadingPeersCount();

log.info(
"{}% ({} / {}) (↓ {}/s | ↑ {}/s) ({} connected peers, {} seeding, {} leeching)",
progress,
Expand All @@ -128,6 +141,93 @@ public void start() throws Exception {
seeding,
leeching);
});

vertx.setPeriodic(10_000, id -> {
for (var connection : connections) {
connection.keepAlive();
}

int downloadingPeersCount = getDownloadingPeersCount();

if (downloadingPeersCount < maxDownloadingPeers) {
return;
}

// check if we have enough connected peers to rotate on
if (connections.size() > downloadingPeersCount) {

List<PeerConnection> untestedPeers = connections.stream()
.filter(conn -> !conn.isDownloading())
.filter(conn -> conn.getDownloadingDuration() < 10.0)
.filter(this::hasRequiredPieces)
.sorted(Comparator.comparingDouble(PeerConnection::getRemoteUnchokedDuration)
.thenComparingDouble(PeerConnection::getTotalWaitingDuration))
.collect(Collectors.toCollection(ArrayList::new));

List<PeerConnection> testedPeers = connections.stream()
.filter(conn -> !conn.isDownloading())
.filter(conn -> conn.getDownloadingDuration() >= 10.0)
.filter(this::hasRequiredPieces)
.sorted(Comparator.comparingDouble(PeerConnection::getAverageDownloadRate)
.reversed())
.peek(conn -> log.debug(
"[{}] Tested: {}/s, duration: {}",
conn.getPeer(),
ByteFormat.format(conn.getAverageDownloadRate()),
conn.getDownloadingDuration()))
.collect(Collectors.toCollection(ArrayList::new));

List<PeerConnection> seedingPeers = connections.stream()
.filter(conn -> conn.isDownloading())
.filter(conn -> conn.getDownloadingDuration() >= 10.0)
.sorted(Comparator.comparingDouble(PeerConnection::getAverageDownloadRate))
.peek(conn -> log.debug(
"[{}] Monitoring: {}/s, duration: {}",
conn.getPeer(),
ByteFormat.format(conn.getAverageDownloadRate()),
conn.getDownloadingDuration()))
.collect(Collectors.toCollection(ArrayList::new));

int rotatingPeers = Math.max(0, seedingPeers.size() - maxDownloadingPeers + 2);
while (rotatingPeers > 2) {
var conn = seedingPeers.remove(0);
conn.setDownloading(false);
rotatingPeers--;
log.debug(
"[{}] Not interested in peer with download rate: {}/s",
conn.getPeer(),
ByteFormat.format(conn.getAverageDownloadRate()));
}

while (rotatingPeers > 0 && !untestedPeers.isEmpty()) {
var conn = untestedPeers.remove(0);
var seedingConn = seedingPeers.remove(0);
conn.setDownloading(true);
seedingConn.setDownloading(false);
requestNextPiece(conn);
rotatingPeers--;
log.debug("[{}] Testing download speed of peer", conn.getPeer());
}

while (rotatingPeers > 0 && !testedPeers.isEmpty()) {
var seedingConn = seedingPeers.remove(0);
var testedConn = testedPeers.remove(0);

if (testedConn.getAverageDownloadRate() > seedingConn.getAverageDownloadRate()) {
testedConn.setDownloading(true);
requestNextPiece(testedConn);
seedingConn.setDownloading(false);
rotatingPeers--;
log.debug(
"[{}] Switching to faster peer with download rate: {}/s",
testedConn.getPeer(),
ByteFormat.format(testedConn.getAverageDownloadRate()));
} else {
break;
}
}
}
});
}

@Override
Expand Down Expand Up @@ -171,6 +271,12 @@ private int getSeedingPeersCount() {
.count();
}

private int getDownloadingPeersCount() {
return (int) connections.stream()
.filter(conn -> conn.isDownloading() && !conn.isRemoteChoked())
.count();
}

private int getLeechingPeersCount() {
return (int) connections.stream().filter(conn -> !conn.isChoked()).count();
}
Expand Down Expand Up @@ -351,8 +457,15 @@ private void setupPeerConnection(PeerConnection connection) {
}
});

connection.onChoked(v -> {
connection.setDownloading(false);
});

connection.onUnchoked(v -> {
requestNextPiece(connection);
if (getDownloadingPeersCount() < maxDownloadingPeers) {
connection.setDownloading(true);
requestNextPiece(connection);
}
});

connection.onClosed(v -> connections.remove(connection));
Expand Down
40 changes: 39 additions & 1 deletion src/main/java/vertx/bittorrent/PeerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,15 @@ public class PeerConnection {
@Getter
@Setter
private float downloadRate = 0.0f;
private long downloadStartedAt = -1;
private long downloadingDuration = 0;

private int currentRequestCount = 0;
private int requestLimit = 6;

@Getter
private boolean downloading = false;

private Map<Integer, PieceState> pieceStates = new HashMap<>();

private Handler<HandshakeMessage> handshakeHandler;
Expand Down Expand Up @@ -112,6 +117,26 @@ public PeerConnection(NetSocket socket, ClientState clientState, Peer peer) {
});
}

public double getAverageDownloadRate() {
double duration = getDownloadingDuration();

if (duration == 0.0) {
return 0.0;
}

return bytesDownloaded / duration;
}

public double getDownloadingDuration() {
long duration = downloadingDuration;

if (downloadStartedAt != -1) {
duration += (System.currentTimeMillis() - downloadStartedAt);
}

return duration / 1000.0;
}

public boolean isPieceRequested(int index) {
return pieceStates.containsKey(index);
}
Expand Down Expand Up @@ -174,6 +199,19 @@ public Future<Void> close() {
return socket.close();
}

public void setDownloading(boolean downloading) {
if (downloading && !this.downloading) {
downloadStartedAt = System.currentTimeMillis();
}

if (!downloading && this.downloading) {
downloadingDuration += (System.currentTimeMillis() - downloadStartedAt);
downloadStartedAt = -1;
}

this.downloading = downloading;
}

public void handshake() {
if (!handshakeSent) {
handshakeSent = true;
Expand Down Expand Up @@ -234,7 +272,7 @@ public void requestPiece(int pieceIndex) {
}

private boolean canRequest() {
if (remoteChoked || pieceStates.isEmpty()) {
if (!downloading || remoteChoked || pieceStates.isEmpty()) {
return false;
}

Expand Down

0 comments on commit 2f3433a

Please sign in to comment.