Skip to content
This repository has been archived by the owner on Jan 12, 2021. It is now read-only.

Commit

Permalink
Serve file list reliably, including transferring data with and withou…
Browse files Browse the repository at this point in the history
…t recursion
  • Loading branch information
codebje committed Mar 3, 2014
1 parent c7140af commit 0d4bb4b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 75 deletions.
101 changes: 27 additions & 74 deletions protocol/src/main/java/net/apnic/rpki/protocol/ProtocolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.util.*;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

import static net.apnic.rpki.protocol.RsyncUtils.write_varlong;

Expand All @@ -20,11 +22,7 @@ class ProtocolImpl implements Protocol {
private Map<String, List<String>> properties;
private int checksumSeed = (int)(System.currentTimeMillis() / 1000);

private final List<FileList> fileLists = new ArrayList<>();
private int completedLists;
private int sentFileCount;
private int fileListNdx;
private int fileListMax;
private FileList fileList;
private int phase;

ProtocolImpl(int version, List<Module> modules) {
Expand All @@ -36,9 +34,7 @@ class ProtocolImpl implements Protocol {
@Override public int getVersion() { return version; }

private static int NDX_DONE = -1;
private static int NDX_FLIST_EOF = -2;
// private static int NDX_DEL_STATS = -3;
private static int NDX_FLIST_OFFSET = -101;

private int previous_positive = -1, previous_negative = 1;
private void writeNdx(MessageSender sender, int ndx) {
int diff;
Expand Down Expand Up @@ -86,61 +82,23 @@ private void writeInt(MessageSender sender, int value) {
public void sendFileList(List<String> paths, MessageSender sender) throws ProtocolError {
if (paths.size() != 1) throw new ProtocolError(ProtocolError.ErrorType.FERROR, "Requesting multiple paths is not supported");

fileLists.clear();
fileListNdx = 0;
completedLists = 0;
sentFileCount = 0;
phase = 0;

for (String path: paths) {
try {
List<FileList> lists = Collections.singletonList(activeModule.getFileList(path, true));
if (properties.containsKey("recurse"))
fileLists.addAll(lists);
else
fileLists.add(lists.get(0));
fileList = activeModule.getFileList(path, properties.containsKey("recurse"));
} catch (NoSuchPathException ex) {
throw new ProtocolError(ProtocolError.ErrorType.FERROR, "the requested path does not exist.");
}
}

FileList fileList = fileLists.get(fileLists.size() - 1);
fileListMax = fileList.getFirstIndex() + fileList.getSize() - 1;

sendExtraFileList(sender);
sender.sendBytes(fileList.getFileListData());
sender.sendByte((byte)0);
}

@Override
public boolean sendExtraFileList(MessageSender sender) {
if (fileListNdx >= fileLists.size()) return true;
LOGGER.debug("Send extra file list: {} sent so far", sentFileCount);

while (sentFileCount < 1000) {
if (fileListNdx >= fileLists.size()) break;
FileList fileList = fileLists.get(fileListNdx);
if (fileListNdx > 0) writeNdx(sender, NDX_FLIST_OFFSET - fileListNdx);
sender.sendBytes(fileList.getFileListData());
sentFileCount += fileList.getSize();
fileListNdx++;
}

if (fileListNdx >= fileLists.size() && properties.containsKey("recurse")) {
LOGGER.debug("Sending NDX_FLIST_EOF");
writeNdx(sender, NDX_FLIST_EOF);

return true;
}

return false;
}

private FileList fileListForIndex(int ndx) {
for (FileList fileList : fileLists) {
int first = fileList.getFirstIndex();
if (ndx >= first - 1 && ndx < first + fileList.getSize())
return fileList;
}
return null;
return true;
}

@Override
Expand All @@ -167,11 +125,10 @@ public void transferFile(TransferAttributes attributes, Checksums checksums, Mes
sender.sendBytes(new byte[] { (byte)(length & 0xff) });
}

FileList fileList = fileListForIndex(attributes.getFileIndex());
if (fileList == null)
if (attributes.getFileIndex() >= fileList.getSize())
throw new ProtocolError(ProtocolError.ErrorType.FERROR,
String.format("RsyncFile-list index %d not in %d - %d [repositoryd]",
attributes.getFileIndex(), 0, fileListMax));
attributes.getFileIndex(), 0, fileList.getSize() - 1));

int position = attributes.getFileIndex() - fileList.getFirstIndex();
RsyncFile file = (position >= 0) ? fileList.getFile(position) : fileList.getRoot();
Expand Down Expand Up @@ -211,26 +168,22 @@ public void transferFile(TransferAttributes attributes, Checksums checksums, Mes

@Override
public boolean completedList(MessageSender sender) throws ProtocolError {
if (++completedLists >= fileLists.size()) {
phase++;
LOGGER.debug("All file lists completed, sender at phase {}", phase);
if (phase == 2) {
writeNdx(sender, NDX_DONE);
LOGGER.debug("Sending transfer statistics");
ByteArrayOutputStream os = new ByteArrayOutputStream();
write_varlong(os, 0, 3); // total_read
write_varlong(os, 0, 3); // total_written
write_varlong(os, 0, 3); // total_size
write_varlong(os, 0, 3); // flist_buildtime
write_varlong(os, 0, 3); // flist_xfertime
sender.sendBytes(os.toByteArray());
return false;
} else if (phase > 2) {
writeNdx(sender, NDX_DONE);
return true;
}
} else {
sentFileCount -= fileLists.get(completedLists - 1).getSize();
phase++;
LOGGER.debug("All file lists completed, sender at phase {}", phase);
if (phase == 2) {
writeNdx(sender, NDX_DONE);
LOGGER.debug("Sending transfer statistics");
ByteArrayOutputStream os = new ByteArrayOutputStream();
write_varlong(os, 0, 3); // total_read
write_varlong(os, 0, 3); // total_written
write_varlong(os, 0, 3); // total_size
write_varlong(os, 0, 3); // flist_buildtime
write_varlong(os, 0, 3); // flist_xfertime
sender.sendBytes(os.toByteArray());
return false;
} else if (phase > 2) {
writeNdx(sender, NDX_DONE);
return true;
}
writeNdx(sender, NDX_DONE);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void run() throws InterruptedException {
* @since 0.9
*/
public static void main(String[] args) throws Exception {
Module simple = new MemoryCachedModule("simple", "simple repo", new FileSystemRepository(
Module simple = new MemoryCachedModule("repository", "simple repo", new FileSystemRepository(
Paths.get(new URI("file:///data/repositoryd/repository"))
));
new RsyncServer(8730, simple).run();
Expand Down

0 comments on commit 0d4bb4b

Please sign in to comment.