Skip to content

Commit

Permalink
compressor: add QAT support
Browse files Browse the repository at this point in the history
This patch adds new QATzip plugin to support QAT for compression.

QATZip is a user space library which builds on top of the Intel
QAT (QuickAssist Technology) user space library, to provide extended
accelerated compression and decompression services by offloading the
actual compression and decompression request(s) to the hardware
QAT accelerators, which are more efficient in terms of cost and power
than general purpose CPUs for those specific compute-intensive
workloads.

Based on QAT accelerators, QATZip can support several compression
algorithm, including deflate, snappy, lz4, etc..

Signed-off-by: Qiaowei Ren <[email protected]>
  • Loading branch information
qwren committed Apr 18, 2018
1 parent c643ffd commit 9f3965a
Show file tree
Hide file tree
Showing 18 changed files with 367 additions and 7 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ endif()

option(WITH_BLUEFS "libbluefs library" OFF)

option(WITH_QATZIP "Enable QATZIP" OFF)
if(WITH_QATZIP)
find_package(qatzip REQUIRED)
set(HAVE_QATZIP ${QATZIP_FOUND})
endif(WITH_QATZIP)

# needs mds and? XXX
option(WITH_LIBCEPHFS "libcephfs client library" ON)

Expand Down
17 changes: 17 additions & 0 deletions cmake/modules/Findqatzip.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# - Find Qatzip
# Find the qatzip compression library and includes
#
# QATZIP_INCLUDE_DIR - where to find qatzip.h, etc.
# QATZIP_LIBRARIES - List of libraries when using qatzip.
# QATZIP_FOUND - True if qatzip found.

find_path(QATZIP_INCLUDE_DIR NAMES qatzip.h)

find_library(QATZIP_LIBRARIES NAMES qatzip)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(qatzip DEFAULT_MSG QATZIP_LIBRARIES QATZIP_INCLUDE_DIR)

mark_as_advanced(
QATZIP_LIBRARIES
QATZIP_INCLUDE_DIR)
3 changes: 3 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,9 @@ endif()
if(NOT WITH_SYSTEM_BOOST)
list(APPEND ceph_common_deps ${ZLIB_LIBRARIES})
endif()
if(HAVE_QATZIP)
list(APPEND ceph_common_deps ${QATZIP_LIBRARIES})
endif()

set_source_files_properties(${CMAKE_SOURCE_DIR}/src/ceph_ver.c
${CMAKE_SOURCE_DIR}/src/common/version.cc
Expand Down
2 changes: 2 additions & 0 deletions src/common/legacy_config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ OPTION(xio_max_send_inline, OPT_INT) // xio maximum threshold to send inline
OPTION(compressor_zlib_isal, OPT_BOOL)
OPTION(compressor_zlib_level, OPT_INT) //regular zlib compression level, not applicable to isa-l optimized version

OPTION(qat_compressor_enabled, OPT_BOOL)

OPTION(plugin_crypto_accelerator, OPT_STR)

OPTION(mempool_debug, OPT_BOOL)
Expand Down
4 changes: 4 additions & 0 deletions src/common/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,10 @@ std::vector<Option> get_global_options() {
.set_default(5)
.set_description(""),

Option("qat_compressor_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(false)
.set_description("enable qat acceleration support for compression"),

Option("plugin_crypto_accelerator", Option::TYPE_STR, Option::LEVEL_ADVANCED)
.set_default("crypto_isal")
.set_description(""),
Expand Down
8 changes: 7 additions & 1 deletion src/compressor/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@

set(compressor_srcs
set(compressor_srcs
Compressor.cc)
if (HAVE_QATZIP)
list(APPEND compressor_srcs QatAccel.cc)
endif()
add_library(compressor_objs OBJECT ${compressor_srcs})

## compressor plugins
Expand Down Expand Up @@ -38,6 +41,9 @@ add_custom_target(compressor_plugins DEPENDS
if(WITH_EMBEDDED)
include(MergeStaticLibraries)
add_library(cephd_compressor_base STATIC ${compressor_srcs})
if(HAVE_QATZIP)
target_link_libraries(cephd_compressor_base ${QATZIP_LIBRARIES})
endif()
set_target_properties(cephd_compressor_base PROPERTIES COMPILE_DEFINITIONS BUILDING_FOR_EMBEDDED)
set(cephd_compressor_libs
cephd_compressor_base
Expand Down
8 changes: 8 additions & 0 deletions src/compressor/Compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#include "include/assert.h" // boost clobbers this
#include "include/buffer.h"
#include "include/int_types.h"
#ifdef HAVE_QATZIP
#include "QatAccel.h"
#endif

class Compressor;
typedef std::shared_ptr<Compressor> CompressorRef;
Expand Down Expand Up @@ -65,6 +68,11 @@ class Compressor {
COMP_FORCE ///< compress always
};

#ifdef HAVE_QATZIP
bool qat_enabled;
QatAccel qat_accel;
#endif

static std::string get_comp_alg_name(int a);
static boost::optional<CompressionAlgorithm> get_comp_alg_type(const std::string &s);

Expand Down
141 changes: 141 additions & 0 deletions src/compressor/QatAccel.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Intel Corporation
*
* Author: Qiaowei Ren <[email protected]>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#include "QatAccel.h"

/* Estimate data expansion after decompression */
static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200};

QatAccel::~QatAccel() {
if (NULL != session.internal) {
qzTeardownSession(&session);
qzClose(&session);
}
}

bool QatAccel::init(const std::string &alg) {
QzSessionParams_T params = {(QzHuffmanHdr_T)0,};
int rc;

rc = qzGetDefaults(&params);
if (rc != QZ_OK)
return false;
params.direction = QZ_DIR_BOTH;
if (alg == "snappy")
params.comp_algorithm = QZ_SNAPPY;
else if (alg == "zlib")
params.comp_algorithm = QZ_DEFLATE;
else if (alg == "lz4")
params.comp_algorithm = QZ_LZ4;
else
return false;

rc = qzSetDefaults(&params);
if (rc != QZ_OK)
return false;

rc = qzInit(&session, QZ_SW_BACKUP_DEFAULT);
if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW)
return false;

rc = qzSetupSession(&session, &params);
if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW ) {
qzTeardownSession(&session);
qzClose(&session);
return false;
}

return true;
}

int QatAccel::compress(const bufferlist &in, bufferlist &out) {
for (auto &i : in.buffers()) {
const unsigned char* c_in = (unsigned char*) i.c_str();
unsigned int len = i.length();
unsigned int out_len = qzMaxCompressedLength(len);

bufferptr ptr = buffer::create_page_aligned(out_len);
int rc = qzCompress(&session, c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1);
if (rc != QZ_OK)
return -1;
out.append(ptr, 0, out_len);
}

return 0;
}

int QatAccel::decompress(const bufferlist &in, bufferlist &out) {
bufferlist::iterator i = const_cast<bufferlist&>(in).begin();
return decompress(i, in.length(), out);
}

int QatAccel::decompress(bufferlist::iterator &p,
size_t compressed_len,
bufferlist &dst) {
unsigned int ratio_idx = 0;
bool read_more = false;
bool joint = false;
int rc = 0;
bufferlist tmp;
size_t remaining = MIN(p.get_remaining(), compressed_len);

while (remaining) {
if (p.end()) {
return -1;
}

bufferptr cur_ptr = p.get_current_ptr();
unsigned int len = cur_ptr.length();
if (joint) {
if (read_more)
tmp.append(cur_ptr.c_str(), len);
len = tmp.length();
}
unsigned int out_len = len * expansion_ratio[ratio_idx];
bufferptr ptr = buffer::create_page_aligned(out_len);

if (joint)
rc = qzDecompress(&session, (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
else
rc = qzDecompress(&session, (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
if (rc == QZ_DATA_ERROR) {
if (!joint) {
tmp.append(cur_ptr.c_str(), cur_ptr.length());
p.advance(remaining);
joint = true;
}
read_more = true;
continue;
} else if (rc == QZ_BUF_ERROR) {
if (ratio_idx == std::size(expansion_ratio))
return -1;
if (joint)
read_more = false;
ratio_idx++;
continue;
} else if (rc != QZ_OK) {
return -1;
} else {
ratio_idx = 0;
joint = false;
read_more = false;
}

p.advance(remaining);
remaining -= len;
dst.append(ptr, 0, out_len);
}

return 0;
}
35 changes: 35 additions & 0 deletions src/compressor/QatAccel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Intel Corporation
*
* Author: Qiaowei Ren <[email protected]>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef CEPH_QATACCEL_H
#define CEPH_QATACCEL_H

#include <qatzip.h>
#include "include/buffer.h"

class QatAccel {
QzSession_T session;

public:
QatAccel() : session({0}) {}
~QatAccel();

bool init(const std::string &alg);

int compress(const bufferlist &in, bufferlist &out);
int decompress(const bufferlist &in, bufferlist &out);
int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &dst);
};

#endif
2 changes: 1 addition & 1 deletion src/compressor/lz4/CompressionPluginLZ4.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class CompressionPluginLZ4 : public CompressionPlugin {

int factory(CompressorRef *cs, std::ostream *ss) override {
if (compressor == 0) {
LZ4Compressor *interface = new LZ4Compressor();
LZ4Compressor *interface = new LZ4Compressor(cct);
compressor = CompressorRef(interface);
}
*cs = compressor;
Expand Down
22 changes: 21 additions & 1 deletion src/compressor/lz4/LZ4Compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,26 @@
#include "compressor/Compressor.h"
#include "include/buffer.h"
#include "include/encoding.h"
#include "common/config.h"
#include "common/Tub.h"


class LZ4Compressor : public Compressor {
public:
LZ4Compressor() : Compressor(COMP_ALG_LZ4, "lz4") {}
LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") {
#ifdef HAVE_QATZIP
if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
qat_enabled = true;
else
qat_enabled = false;
#endif
}

int compress(const bufferlist &src, bufferlist &dst) override {
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.compress(src, dst);
#endif
bufferptr outptr = buffer::create_page_aligned(
LZ4_compressBound(src.length()));
LZ4_stream_t lz4_stream;
Expand Down Expand Up @@ -60,13 +72,21 @@ class LZ4Compressor : public Compressor {
}

int decompress(const bufferlist &src, bufferlist &dst) override {
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.decompress(src, dst);
#endif
bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
return decompress(i, src.length(), dst);
}

int decompress(bufferlist::iterator &p,
size_t compressed_len,
bufferlist &dst) override {
#ifdef HAVE_QATZIP
if (qat_enabled)
return qat_accel.decompress(p, compressed_len, dst);
#endif
uint32_t count;
std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs;
decode(count, p);
Expand Down
2 changes: 1 addition & 1 deletion src/compressor/snappy/CompressionPluginSnappy.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class CompressionPluginSnappy : public CompressionPlugin {
std::ostream *ss) override
{
if (compressor == 0) {
SnappyCompressor *interface = new SnappyCompressor();
SnappyCompressor *interface = new SnappyCompressor(cct);
compressor = CompressorRef(interface);
}
*cs = compressor;
Expand Down
Loading

0 comments on commit 9f3965a

Please sign in to comment.