Skip to content

Commit

Permalink
Add zip mode
Browse files Browse the repository at this point in the history
There's a 170 GB zip file of Rocket League replays. It's annoying to
need to unzip the file in order to run rrrocket.

By adding zip support into rrrocket, it's easier to unzip only problematic
replays.

To activate zip mode, pass a single zip file input.
  • Loading branch information
nickbabcock committed Apr 29, 2024
1 parent ef6d900 commit 2ff2783
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 2 deletions.
100 changes: 100 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ boxcars = "0.9.12"
glob = "0.3"
either = "1"
memmap = "0.7"
zip = { version = "1", default-features = false, features = ["deflate-zlib-ng"] }

[dev-dependencies]
assert_cmd = "2"
Expand Down
Binary file added assets/replays.zip
Binary file not shown.
118 changes: 116 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use rayon::iter::ParallelBridge;
use rayon::prelude::*;
use serde::Serialize;
use std::fs::{self, OpenOptions};
use std::io::prelude::*;
use std::io::{self, BufWriter};
use std::io::{prelude::*, Cursor};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, sync_channel};
use std::thread;

/// Parses Rocket League replay files and outputs JSON with decoded information
#[derive(Parser, Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -244,9 +246,109 @@ fn serialize<W: Write>(pretty: bool, writer: W, replay: &Replay) -> anyhow::Resu
res.map_err(|e| e.into())
}

fn zip(file_path: &Path, opt: &Opt) -> anyhow::Result<()> {
let parallelism = std::thread::available_parallelism()
.map(|x| x.get().max(2))
.unwrap_or(2);

let (tx, rx) = sync_channel(parallelism - 1);
let (return_buf, receive_buf) = channel::<Vec<u8>>();

let f = fs::File::open(file_path)?;
let mmap = unsafe { memmap::MmapOptions::new().map(&f)? };
let zipreader = Cursor::new(&mmap);
let mut archive = zip::ZipArchive::new(zipreader)?;

thread::scope(|scope| {
scope.spawn(move || {
for i in 0..archive.len() {
let zipfile = archive
.by_index(i)
.with_context(|| format!("unable to retrieve zip index: {}", i));

let mut zipfile = match zipfile {
Ok(zipfile) if zipfile.is_file() => zipfile,
Ok(_) => continue,
Err(e) => {
if tx.send(Err(e)).is_err() {
return;
}
continue;
}
};

let name = String::from(zipfile.name());
let mut buf = if let Ok(mut existing_buf) = receive_buf.try_recv() {
existing_buf.resize(zipfile.size() as usize, 0);
existing_buf
} else {
vec![0u8; zipfile.size() as usize]
};

let result = zipfile
.read_exact(&mut buf)
.with_context(|| format!("unable to inflate: {}", name))
.map(|_| (name, buf));

// If the other end hung up stop processing.
if tx.send(result).is_err() {
return;
}
}
});

let data = rx.into_iter().par_bridge().map(|args| {
let (name, data) = args?;
let result = parse_replay(opt, &data).with_context(|| format!("{}: FAILED", name));
let _ = return_buf.send(data);

Ok((name, result?))
});

if opt.dry_run {
data.for_each(|result: anyhow::Result<_>| match result {
Ok((name, _replay)) => println!("Parsed {}", name),
Err(e) => eprintln!("Failed {:?}", e),
})
} else {
data.for_each_with(
Vec::with_capacity(50 * 1000 * 1000),
|mut out, result| match result {
Ok((name, replay)) => {
out.clear();
let rep = RocketReplay {
file: &PathBuf::from(&name),
replay,
};

let replay_json = serde_json::to_writer(&mut out, &rep);
if let Err(e) = replay_json {
eprintln!("Could not serialize replay: {} {}", name, e);
return;
}
let mut lock = io::stdout().lock();
let _ = lock.write_all(out);
let _ = lock.write_all(b"\n");
}
Err(e) => eprintln!("Failed {:?}", e),
},
)
}
});

Ok(())
}

fn run() -> anyhow::Result<()> {
let opt = Opt::parse();
if opt.multiple {

let enter_zip_mode = opt
.input
.first()
.is_some_and(|x| x.extension().and_then(|ext| ext.to_str()) == Some("zip"));
if enter_zip_mode {
zip(&opt.input[0], &opt)
} else if opt.multiple {
parse_multiple_replays(&opt)
} else if opt.input.len() > 1 {
bail!("Expected one input file when --multiple is not specified");
Expand Down Expand Up @@ -419,4 +521,16 @@ mod tests {
)
.stdout(predicate::str::contains("\n").count(2));
}

/// Runs the `rrrocket` binary in dry-run mode against the bundled zip archive
/// and checks that it exits successfully and reports a replay from inside the
/// archive as parsed.
#[test]
fn test_zip() {
    let reports_parsed_replay = predicate::str::contains(r#"Parsed replays/1ec9.replay"#);

    let mut rrrocket = Command::cargo_bin("rrrocket").unwrap();
    let outcome = rrrocket
        .args(&["-n", "--dry-run", "assets/replays.zip"])
        .assert();

    outcome.success().stdout(reports_parsed_replay);
}
}

0 comments on commit 2ff2783

Please sign in to comment.