Skip to content

Commit

Permalink
Merge branch 'feature/zstd'
Browse files Browse the repository at this point in the history
  • Loading branch information
dtzWill committed Jul 21, 2018
2 parents bc53371 + 0a2c7cd commit 41fd452
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 31 deletions.
2 changes: 2 additions & 0 deletions Makefile.config.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ ENABLE_S3 = @ENABLE_S3@
HAVE_SODIUM = @HAVE_SODIUM@
HAVE_READLINE = @HAVE_READLINE@
HAVE_BROTLI = @HAVE_BROTLI@
HAVE_ZSTD = @HAVE_ZSTD@
HAVE_SECCOMP = @HAVE_SECCOMP@
LIBCURL_LIBS = @LIBCURL_LIBS@
OPENSSL_LIBS = @OPENSSL_LIBS@
Expand All @@ -16,6 +17,7 @@ SODIUM_LIBS = @SODIUM_LIBS@
LIBLZMA_LIBS = @LIBLZMA_LIBS@
SQLITE3_LIBS = @SQLITE3_LIBS@
LIBBROTLI_LIBS = @LIBBROTLI_LIBS@
LIBZSTD_LIBS = @LIBZSTD_LIBS@
bash = @bash@
bindir = @bindir@
brotli = @brotli@
Expand Down
9 changes: 8 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,17 @@ AC_CHECK_LIB([lzma], [lzma_stream_encoder_mt],
# Look for libbrotli{enc,dec}, optional dependencies
PKG_CHECK_MODULES([LIBBROTLI], [libbrotlienc libbrotlidec],
[AC_DEFINE([HAVE_BROTLI], [1], [Whether to use libbrotli.])
CXXFLAGS="$LIBBROTLI_CFLAGS $CXXFLAGS"]
CXXFLAGS="$LIBBROTLI_CFLAGS $CXXFLAGS"
have_brotli=1], [have_brotli=])
AC_SUBST(HAVE_BROTLI, [$have_brotli])

# Look for libzstd, optional dependency
PKG_CHECK_MODULES([LIBZSTD], [libzstd],
[AC_DEFINE([HAVE_ZSTD], [1], [Whether to use libzstd.])
CXXFLAGS="$LIBZSTD_CFLAGS $CXXFLAGS"
have_zstd=1], [have_zstd=])
AC_SUBST(HAVE_ZSTD, [$have_zstd])

# Look for libseccomp, required for Linux sandboxing.
if test "$sys_name" = linux; then
AC_ARG_ENABLE([seccomp-sandboxing],
Expand Down
2 changes: 1 addition & 1 deletion release-common.nix
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ rec {

preConfigure = ":"; # override normal 'preConfigure', not needed when building from git
}))
bzip2 xz brotli
bzip2 xz brotli zstd
openssl pkgconfig sqlite boehmgc
boost

Expand Down
3 changes: 2 additions & 1 deletion src/libstore/binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, const ref<std::str
/* Compress the NAR. */
narInfo->compression = compression;
auto now1 = std::chrono::steady_clock::now();
auto narCompressed = compress(compression, *nar, parallelCompression);
auto narCompressed = compress(compression, *nar, parallelCompression, compressionLevel);
auto now2 = std::chrono::steady_clock::now();
narInfo->fileHash = hashString(htSHA256, *narCompressed);
narInfo->fileSize = narCompressed->size();
Expand All @@ -165,6 +165,7 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, const ref<std::str
+ (compression == "xz" ? ".xz" :
compression == "bzip2" ? ".bz2" :
compression == "br" ? ".br" :
compression == "zstd" ? ".zst" :
"");
if (repair || !fileExists(narInfo->url)) {
stats.narWrite++;
Expand Down
6 changes: 5 additions & 1 deletion src/libstore/binary-cache-store.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ public:
const Setting<Path> secretKeyFile{this, "", "secret-key", "path to secret key used to sign the binary cache"};
const Setting<Path> localNarCache{this, "", "local-nar-cache", "path to a local cache of NARs"};
const Setting<bool> parallelCompression{this, false, "parallel-compression",
"enable multi-threading compression, available for xz only currently"};
"enable multi-threading compression for NARs, available for xz only currently"};
const Setting<int> compressionLevel{this, -1, "compression-level",
"specify 'preset level' of compression to be used with NARs: "
"meaning and accepted range of values depends on compression method selected, "
"other than -1 which we reserve to indicate Nix defaults should be used"};

private:

Expand Down
194 changes: 170 additions & 24 deletions src/libutil/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,37 @@
#include <brotli/encode.h>
#endif // HAVE_BROTLI

#if HAVE_ZSTD
#include <zstd.h>
// Not set by earlier versions, so be sure it's set
#ifndef ZSTD_CLEVEL_DEFAULT
# define ZSTD_CLEVEL_DEFAULT 3
#endif
#endif

#include <algorithm>
#include <iostream>

namespace nix {

static const size_t bufSize = 32 * 1024;

static const int COMPRESSION_LEVEL_DEFAULT = -1;

static unsigned checkLevel(unsigned min, unsigned max, unsigned methodDefault, std::string method, int level) {
if (level == COMPRESSION_LEVEL_DEFAULT)
return methodDefault;
if (level < 0)
throw CompressionError("compression level must be a non-negative integer");

unsigned l = static_cast<unsigned>(level);
if (min <= l && l <= max)
return l;

throw CompressionError("requested compression level '%u' not valid for method '%s': must be [%u,%u] (default=%u)",
l, method, min, max, methodDefault);
}

static void decompressNone(Source & source, Sink & sink)
{
std::vector<unsigned char> buf(bufSize);
Expand Down Expand Up @@ -199,6 +224,48 @@ static void decompressBrotli(Source & source, Sink & sink)
#endif // HAVE_BROTLI
}

#if HAVE_ZSTD
static void decompressZstd(Source & source, Sink & sink)
{
auto *s = ZSTD_createDStream();
if (!s)
throw CompressionError("unable to initialize zstd decoder");

Finally free([s]() { ZSTD_freeDStream(s); });

size_t toRead = ZSTD_initDStream(s);
if (ZSTD_isError(toRead))
throw CompressionError("unable to initialize zstd streaming decoder");

std::vector<uint8_t> inbuf(ZSTD_DStreamInSize());
std::vector<uint8_t> outbuf(ZSTD_DStreamOutSize());

while (true) {
checkInterrupt();

size_t read = 0;
try {
read = source.read(inbuf.data(), toRead);
} catch (EndOfFile &) {
// *** This is different than others! ***
// We're done if no more input
return;
}

ZSTD_inBuffer input{inbuf.data(), read, 0};

while (input.pos < input.size) {
checkInterrupt();
ZSTD_outBuffer output{outbuf.data(), outbuf.size(), 0};
toRead = ZSTD_decompressStream(s, &output, &input);
if (ZSTD_isError(toRead))
throw CompressionError("error while decompressing zstd data");
sink(outbuf.data(), output.pos);
}
}
}
#endif // HAVE_ZSTD

ref<std::string> decompress(const std::string & method, const std::string & in)
{
StringSource source(in);
Expand All @@ -217,14 +284,21 @@ void decompress(const std::string & method, Source & source, Sink & sink)
return decompressBzip2(source, sink);
else if (method == "br")
return decompressBrotli(source, sink);
#if HAVE_ZSTD
else if (method == "zstd")
return decompressZstd(source, sink);
#endif
else
throw UnknownCompressionMethod("unknown compression method '%s'", method);
}

struct NoneSink : CompressionSink
{
Sink & nextSink;
NoneSink(Sink & nextSink) : nextSink(nextSink) { }
NoneSink(Sink & nextSink, int level = COMPRESSION_LEVEL_DEFAULT) : nextSink(nextSink) {
if (level != COMPRESSION_LEVEL_DEFAULT)
printError("Warning: requested compression level '%d' not supported by compression method 'none'", level);
}
void finish() override { flush(); }
void write(const unsigned char * data, size_t len) override { nextSink(data, len); }
};
Expand All @@ -237,18 +311,18 @@ struct XzSink : CompressionSink
bool finished = false;

template <typename F>
XzSink(Sink & nextSink, F&& initEncoder) : nextSink(nextSink) {
lzma_ret ret = initEncoder();
XzSink(Sink & nextSink, F&& initEncoder, int level = COMPRESSION_LEVEL_DEFAULT) : nextSink(nextSink) {
lzma_ret ret = initEncoder(checkLevel(0, 9, LZMA_PRESET_DEFAULT, "xz", level));
if (ret != LZMA_OK)
throw CompressionError("unable to initialise lzma encoder");
// FIXME: apply the x86 BCJ filter?

strm.next_out = outbuf;
strm.avail_out = sizeof(outbuf);
}
XzSink(Sink & nextSink) : XzSink(nextSink, [this]() {
return lzma_easy_encoder(&strm, 6, LZMA_CHECK_CRC64);
}) {}
XzSink(Sink & nextSink, int level = COMPRESSION_LEVEL_DEFAULT) : XzSink(nextSink, [this](unsigned level) {
return lzma_easy_encoder(&strm, level, LZMA_CHECK_CRC64);
}, level) {}

~XzSink()
{
Expand Down Expand Up @@ -305,11 +379,11 @@ struct XzSink : CompressionSink
#ifdef HAVE_LZMA_MT
struct ParallelXzSink : public XzSink
{
ParallelXzSink(Sink &nextSink) : XzSink(nextSink, [this]() {
ParallelXzSink(Sink &nextSink, int level) : XzSink(nextSink, [this](unsigned level) {
lzma_mt mt_options = {};
mt_options.flags = 0;
mt_options.timeout = 300; // Using the same setting as the xz cmd line
mt_options.preset = LZMA_PRESET_DEFAULT;
mt_options.preset = level;
mt_options.filters = NULL;
mt_options.check = LZMA_CHECK_CRC64;
mt_options.threads = lzma_cputhreads();
Expand All @@ -319,7 +393,7 @@ struct ParallelXzSink : public XzSink
// FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the
// number of threads.
return lzma_stream_encoder_mt(&strm, &mt_options);
}) {}
}, level) {}
};
#endif

Expand All @@ -330,10 +404,11 @@ struct BzipSink : CompressionSink
bz_stream strm;
bool finished = false;

BzipSink(Sink & nextSink) : nextSink(nextSink)
BzipSink(Sink & nextSink, int level = COMPRESSION_LEVEL_DEFAULT) : nextSink(nextSink)
{
memset(&strm, 0, sizeof(strm));
int ret = BZ2_bzCompressInit(&strm, 9, 0, 30);
auto l = checkLevel(1, 9, 9, "bzip2", level);
int ret = BZ2_bzCompressInit(&strm, l, 0, 30);
if (ret != BZ_OK)
throw CompressionError("unable to initialise bzip2 encoder");

Expand Down Expand Up @@ -433,9 +508,11 @@ struct LambdaCompressionSink : CompressionSink

struct BrotliCmdSink : LambdaCompressionSink
{
BrotliCmdSink(Sink& nextSink)
: LambdaCompressionSink(nextSink, [](const std::string& data) {
return runProgram(BROTLI, true, {}, data);
BrotliCmdSink(Sink& nextSink, int level = COMPRESSION_LEVEL_DEFAULT)
: LambdaCompressionSink(nextSink, [level](const std::string& data) {
// Hard-code values from brotli manpage
std::string levelArg = fmt("-%u", checkLevel(0, 11, 11, "brotli", level));
return runProgram(BROTLI, true, {levelArg}, data);
})
{
}
Expand All @@ -449,11 +526,16 @@ struct BrotliSink : CompressionSink
BrotliEncoderState *state;
bool finished = false;

BrotliSink(Sink & nextSink) : nextSink(nextSink)
BrotliSink(Sink & nextSink, int level = COMPRESSION_LEVEL_DEFAULT) : nextSink(nextSink)
{
state = BrotliEncoderCreateInstance(nullptr, nullptr, nullptr);
if (!state)
throw CompressionError("unable to initialise brotli encoder");

if (!BrotliEncoderSetParameter(state, BROTLI_PARAM_QUALITY,
checkLevel(BROTLI_MIN_QUALITY, BROTLI_MAX_QUALITY,
BROTLI_DEFAULT_QUALITY, "brotli", level)))
throw CompressionError("failure setting requested compression level for brotli encoder");
}

~BrotliSink()
Expand Down Expand Up @@ -530,36 +612,100 @@ struct BrotliSink : CompressionSink
};
#endif // HAVE_BROTLI

ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel)
#if HAVE_ZSTD
struct ZstdSink: CompressionSink
{
Sink & nextSink;
std::vector<uint8_t> outbuf;
ZSTD_CStream *state;

bool finished = false;

ZstdSink(Sink & nextSink, int level = COMPRESSION_LEVEL_DEFAULT)
: nextSink(nextSink), outbuf(std::max<size_t>(ZSTD_CStreamOutSize(), BUFSIZ))
{
state = ZSTD_createCStream();
if (!state)
throw CompressionError("unable to initialise zstd encoder");

auto r = ZSTD_initCStream(state, checkLevel(1, 19, ZSTD_CLEVEL_DEFAULT, "zstd", level));
if (ZSTD_isError(r))
throw CompressionError("unable to initialise zstd encoder");
}

~ZstdSink()
{
ZSTD_freeCStream(state);
}

void finish() override
{
flush();
assert(!finished);

ZSTD_outBuffer output{outbuf.data(), outbuf.size(), 0};

auto r = ZSTD_endStream(state, &output);
if (r > 0)
throw CompressionError("zstd not flushed, bytes remaining: %zd", r);
else if (ZSTD_isError(r))
throw CompressionError("error finish'ing zstd stream");

nextSink(outbuf.data(), output.pos);
}

void write(const unsigned char * data, size_t len) override
{
ZSTD_inBuffer input{data, len, 0};
while (input.pos < input.size) {
checkInterrupt();
ZSTD_outBuffer output{outbuf.data(), outbuf.size(), 0};
auto r = ZSTD_compressStream(state, &output, &input);
if (ZSTD_isError(r))
throw CompressionError("error compressing with zstd");
// (zstd suggests amount that should be 'read' next,
// which we ignore since we can't make use of it currently)

nextSink(outbuf.data(), output.pos);
}
}
};
#endif // HAVE_ZSTD

ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel, int level)
{
if (parallel) {
#ifdef HAVE_LZMA_MT
if (method == "xz")
return make_ref<ParallelXzSink>(nextSink);
return make_ref<ParallelXzSink>(nextSink, level);
#endif
printMsg(lvlError, format("Warning: parallel compression requested but not supported for method '%1%', falling back to single-threaded compression") % method);
}

if (method == "none")
return make_ref<NoneSink>(nextSink);
return make_ref<NoneSink>(nextSink, level);
else if (method == "xz")
return make_ref<XzSink>(nextSink);
return make_ref<XzSink>(nextSink, level);
else if (method == "bzip2")
return make_ref<BzipSink>(nextSink);
return make_ref<BzipSink>(nextSink, level);
else if (method == "br")
#if HAVE_BROTLI
return make_ref<BrotliSink>(nextSink);
return make_ref<BrotliSink>(nextSink, level);
#else
return make_ref<BrotliCmdSink>(nextSink);
return make_ref<BrotliCmdSink>(nextSink, level);
#endif
#if HAVE_ZSTD
else if (method == "zstd")
return make_ref<ZstdSink>(nextSink, level);
#endif
else
throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
}

ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel, int level)
{
StringSink ssink;
auto sink = makeCompressionSink(method, ssink, parallel);
auto sink = makeCompressionSink(method, ssink, parallel, level);
(*sink)(in);
sink->finish();
return ssink.s;
Expand Down
Loading

0 comments on commit 41fd452

Please sign in to comment.