Skip to content

Commit

Permalink
Adds long compression methods (apache#3148)
Browse files Browse the repository at this point in the history
* add read

* update deprecated guava calls

* add write and vsizeserde

* add benchmark

* separate encoding and compression

* add header and reformat

* update doc

* address PR comment

* fix buffer order

* generate benchmark files

* separate encoding strategy and format

* fix benchmark

* modify supplier write to channel

* add float NONE handling

* address PR comment

* address PR comment 2
  • Loading branch information
acslk authored and gianm committed Aug 30, 2016
1 parent 4e91330 commit c4e8440
Show file tree
Hide file tree
Showing 69 changed files with 4,662 additions and 2,493 deletions.
6 changes: 6 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,9 @@ This product contains conjunctive normal form conversion code and a variance agg
* https://github.com/apache/hive/blob/branch-2.0/LICENSE (Apache License, Version 2.0)
* HOMEPAGE:
* https://github.com/apache/hive

This product contains variable length long deserialization code adapted from Apache Lucene
* LICENSE:
* https://github.com/apache/lucene-solr/blob/master/lucene/LICENSE.txt (Apache License, Version 2.0)
* HOMEPAGE:
* https://github.com/apache/lucene-solr
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.benchmark;

// Run FloatCompressionBenchmarkFileGenerator to generate the required files before running this benchmark

import com.google.common.base.Supplier;
import com.google.common.io.Files;
import io.druid.segment.data.CompressedFloatsIndexedSupplier;
import io.druid.segment.data.IndexedFloats;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class FloatCompressionBenchmark
{
  // Directory containing the pre-generated compressed column files.
  // Run FloatCompressionBenchmarkFileGenerator first to create them.
  @Param("floatCompress/")
  private static String dirPath;

  // Value distribution used when the file was generated.
  @Param({"enumerate", "zipfLow", "zipfHigh", "sequential", "uniform"})
  private static String file;

  // Compression strategy suffix of the file under test.
  @Param({"lz4", "none"})
  private static String strategy;

  private Random rand;
  private Supplier<IndexedFloats> supplier;

  /**
   * Memory-maps the compressed column file named {@code file + "-" + strategy}
   * and builds the supplier the benchmark methods read from.
   */
  @Setup
  public void setup() throws Exception
  {
    File dir = new File(dirPath);
    File compFile = new File(dir, file + "-" + strategy);
    rand = new Random();
    ByteBuffer buffer = Files.map(compFile);
    supplier = CompressedFloatsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder());
  }

  /**
   * Reads every value in order, summing into a local so the reads cannot be
   * dead-code-eliminated; the sum is handed to the {@link Blackhole}.
   */
  @Benchmark
  public void readContinuous(Blackhole bh) throws IOException
  {
    IndexedFloats indexedFloats = supplier.get();
    int count = indexedFloats.size();
    float sum = 0;
    for (int i = 0; i < count; i++) {
      sum += indexedFloats.get(i);
    }
    bh.consume(sum);
    indexedFloats.close();
  }

  /**
   * Reads values with a random stride of 1..2000. The stride is offset by 1
   * because {@code rand.nextInt(2000)} alone can return 0, which would re-read
   * the same index and inflate the iteration count without advancing.
   */
  @Benchmark
  public void readSkipping(Blackhole bh) throws IOException
  {
    IndexedFloats indexedFloats = supplier.get();
    int count = indexedFloats.size();
    float sum = 0;
    for (int i = 0; i < count; i += rand.nextInt(2000) + 1) {
      sum += indexedFloats.get(i);
    }
    bh.consume(sum);
    indexedFloats.close();
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.benchmark;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSink;
import io.druid.benchmark.datagen.BenchmarkColumnSchema;
import io.druid.benchmark.datagen.BenchmarkColumnValueGenerator;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.FloatSupplierSerializer;
import io.druid.segment.data.TmpFileIOPeon;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FloatCompressionBenchmarkFileGenerator
{
  /** Number of rows written to each generated data file. */
  public static final int ROW_NUM = 5000000;

  /** Compression strategies for which one compressed file per distribution is produced. */
  public static final List<CompressedObjectStrategy.CompressionStrategy> compressions =
      ImmutableList.of(
          CompressedObjectStrategy.CompressionStrategy.LZ4,
          CompressedObjectStrategy.CompressionStrategy.NONE
      );

  private static String dirPath = "floatCompress/";

  /**
   * Generates the input files consumed by FloatCompressionBenchmark: one
   * plain-text data file per value distribution, then one compressed file per
   * (distribution, compression strategy) pair.
   *
   * @param args optional; {@code args[0]} overrides the output directory
   */
  public static void main(String[] args) throws IOException, URISyntaxException
  {
    if (args.length >= 1) {
      dirPath = args[0];
    }

    BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated("", ValueType.FLOAT, true, 1, 0d,
        ImmutableList.<Object>of(
            0f,
            1.1f,
            2.2f,
            3.3f,
            4.4f
        ),
        ImmutableList.of(
            0.95,
            0.001,
            0.0189,
            0.03,
            0.0001
        )
    );
    BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf(
        "",
        ValueType.FLOAT,
        true,
        1,
        0d,
        -1,
        1000,
        1d
    );
    BenchmarkColumnSchema zipfHighSchema = BenchmarkColumnSchema.makeZipf(
        "",
        ValueType.FLOAT,
        true,
        1,
        0d,
        -1,
        1000,
        3d
    );
    BenchmarkColumnSchema sequentialSchema = BenchmarkColumnSchema.makeSequential(
        "",
        ValueType.FLOAT,
        true,
        1,
        0d,
        1470187671,
        2000000000
    );
    BenchmarkColumnSchema uniformSchema = BenchmarkColumnSchema.makeContinuousUniform(
        "",
        ValueType.FLOAT,
        true,
        1,
        0d,
        0,
        1000
    );

    Map<String, BenchmarkColumnValueGenerator> generators = new HashMap<>();
    generators.put("enumerate", new BenchmarkColumnValueGenerator(enumeratedSchema, 1));
    generators.put("zipfLow", new BenchmarkColumnValueGenerator(zipfLowSchema, 1));
    generators.put("zipfHigh", new BenchmarkColumnValueGenerator(zipfHighSchema, 1));
    generators.put("sequential", new BenchmarkColumnValueGenerator(sequentialSchema, 1));
    generators.put("uniform", new BenchmarkColumnValueGenerator(uniformSchema, 1));

    File dir = new File(dirPath);
    // mkdirs() (not mkdir()) so a nested output path is created in full.
    dir.mkdirs();

    // create data files using BenchmarkColumnValueGenerator, one value per line
    for (Map.Entry<String, BenchmarkColumnValueGenerator> entry : generators.entrySet()) {
      final File dataFile = new File(dir, entry.getKey());
      dataFile.delete();
      // Values are plain ASCII digits; the charset is pinned anyway so the
      // files do not depend on the platform default encoding.
      try (Writer writer = new BufferedWriter(
          new OutputStreamWriter(new FileOutputStream(dataFile), StandardCharsets.UTF_8)
      )) {
        for (int i = 0; i < ROW_NUM; i++) {
          writer.write((Float) entry.getValue().generateRowValue() + "\n");
        }
      }
    }

    // create compressed files using all combinations of CompressionStrategy and FloatEncoding provided
    for (Map.Entry<String, BenchmarkColumnValueGenerator> entry : generators.entrySet()) {
      for (CompressedObjectStrategy.CompressionStrategy compression : compressions) {
        String name = entry.getKey() + "-" + compression.toString();
        System.out.print(name + ": ");
        File compFile = new File(dir, name);
        compFile.delete();
        File dataFile = new File(dir, entry.getKey());

        TmpFileIOPeon iopeon = new TmpFileIOPeon(true);
        FloatSupplierSerializer writer = CompressionFactory.getFloatSerializer(
            iopeon,
            "float",
            ByteOrder.nativeOrder(),
            compression
        );

        // Both the reader and the output channel are managed by
        // try-with-resources so neither leaks if the other fails to open;
        // the peon's temp files are always cleaned up in the finally.
        try (
            BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(dataFile), StandardCharsets.UTF_8)
            );
            FileChannel output = FileChannel.open(
                compFile.toPath(),
                StandardOpenOption.CREATE_NEW,
                StandardOpenOption.WRITE
            )
        ) {
          writer.open();
          String line;
          while ((line = br.readLine()) != null) {
            writer.add(Float.parseFloat(line));
          }
          // The serializer consolidates into an in-memory sink; the resulting
          // bytes are then flushed to the target file in one write.
          final ByteArrayOutputStream baos = new ByteArrayOutputStream();
          writer.closeAndConsolidate(
              new ByteSink()
              {
                @Override
                public OutputStream openStream() throws IOException
                {
                  return baos;
                }
              }
          );
          output.write(ByteBuffer.wrap(baos.toByteArray()));
        }
        finally {
          iopeon.cleanup();
        }
        System.out.print(compFile.length() / 1024 + "\n");
      }
    }
  }
}
Loading

0 comments on commit c4e8440

Please sign in to comment.