
Commit

adding metrics
srrangarajan committed Jul 31, 2023
1 parent 6f3935f commit fde133b
Showing 2 changed files with 52 additions and 28 deletions.
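This commit replaces the transcoder's timer-based compression metric with per-app Spectator DistributionSummary metrics recorded inside Compressor, threading the app name through compress() so the ratio and sizes can be tagged by app and algorithm. Below is a hypothetical caller, not code from this commit; the app name, payload, and algorithm choice are illustrative.

import com.netflix.evcache.util.Compressor;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CompressorExample {
    public static void main(String[] args) throws IOException {
        Compressor compressor = new Compressor();
        // A deliberately repetitive payload so gzip is likely to shrink it.
        byte[] payload = "an easily compressible payload ".repeat(200)
                .getBytes(StandardCharsets.UTF_8);

        // New in this commit: the trailing appName argument, used only for
        // metric tagging ("EVCACHE_SAMPLE" is a made-up name).
        byte[] stored = compressor.compress(payload, "gzip", 3, "EVCACHE_SAMPLE");

        // compress() falls back to returning the original bytes when the
        // ratio is <= 1, so only decompress when compression actually won.
        if (!Arrays.equals(stored, payload)) {
            byte[] restored = compressor.decompress(stored, "gzip");
            System.out.println(Arrays.equals(restored, payload)); // prints true
        }
    }
}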
EVCacheSerializingTranscoder.java
@@ -77,6 +77,7 @@ public class EVCacheSerializingTranscoder extends BaseSerializingTranscoder impl
static final String COMPRESSION = "COMPRESSION_METRIC";

private final TranscoderUtils tu = new TranscoderUtils(true);
private final String appName;
private final Compressor compressor;
private final String compressionAlgo;
private final Integer compressionLevel;
@@ -96,6 +97,7 @@ public EVCacheSerializingTranscoder(String appName) {
public EVCacheSerializingTranscoder(String appName, int max) {
super(max);
compressor = new Compressor();
this.appName = appName;
this.compressionAlgo = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".compression.algo", String.class).orElse("gzip").get();
this.compressionLevel = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".compression.level", Integer.class).orElse(3).get();
}
@@ -207,7 +209,7 @@ public CachedData encode(Object o) {
if (b.length > compressionThreshold) {
byte[] compressed;
try {
compressed = compressor.compress(b, compressionAlgo, compressionLevel);
compressed = compressor.compress(b, compressionAlgo, compressionLevel, appName);
} catch (IOException e) {
getLogger().error("throwing exception in encoding due to compression {}", e);
throw new RuntimeException(e);
@@ -232,20 +234,7 @@ public CachedData encode(Object o) {
getLogger().info("Compression increased the size of %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
}

long compression_ratio = Math.round((double) compressed.length / b.length * 100);
updateTimerWithCompressionRatio(compression_ratio);
}
return new CachedData(flags, b, getMaxSize());
}

private void updateTimerWithCompressionRatio(long ratio_percentage) {
if(timer == null) {
final List<Tag> tagList = new ArrayList<Tag>(1);
tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, compressionAlgo));
timer = EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.COMPRESSION_RATIO, tagList, Duration.ofMillis(100));
};
timer.record(ratio_percentage, TimeUnit.MILLISECONDS);
}

}
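The algorithm and level that reach Compressor.compress are per-app properties read in the constructor above: <appName>.compression.algo (default "gzip") and <appName>.compression.level (default 3). As an illustration only, with a made-up app name, switching an app to zstd at level 5 would mean setting the following in whatever property source backs EVCacheConfig:

MYAPP.compression.algo=zstd
MYAPP.compression.level=5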
evcache-core/src/main/java/com/netflix/evcache/util/Compressor.java (63 changes: 49 additions & 14 deletions)
@@ -8,6 +8,7 @@
import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.DistributionSummary;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
@@ -16,10 +17,10 @@
import org.xerial.snappy.Snappy;
import org.xerial.snappy.SnappyOutputStream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
@@ -28,8 +29,16 @@

public class Compressor {
private static final Logger log = LoggerFactory.getLogger(Compressor.class);
public static final String CLIENT_ALGO = "client.algo";
public static final String CLIENT_COMP_RATIO = "Client_CompressionRatio";
public static final String ORIG_SIZE = "Client_UncompressedSize";
public static final String COMP_SIZE = "Client_CompressedSize";
public static final String APP_NAME = "Client.appName";
public Map<String, DistributionSummary> distributionSummaryCompRatioMap = new ConcurrentHashMap<>();
public Map<String, DistributionSummary> distributionSummaryOrigSizeMap = new ConcurrentHashMap<>();
public Map<String, DistributionSummary> distributionSummaryCompSizeMap = new ConcurrentHashMap<>();
private static Timer timer;
public byte[] compress(byte[] data, String algorithm, int level) throws IOException {
public byte[] compress(byte[] data, String algorithm, int level, String appName) throws IOException {
byte[] compressedData = null;
switch (algorithm) {
case "zstd":
@@ -56,32 +65,54 @@ public byte[] compress(byte[] data, String algorithm, int level) throws IOException {
snappyOut.close();
compressedData = snappyBaos.toByteArray();
break;
default:
case "gzip":
ByteArrayOutputStream gzipBaos = new ByteArrayOutputStream();
GZIPOutputStream gzipOut = new GZIPOutputStream(gzipBaos);
gzipOut.write(data);
gzipOut.close();
compressedData = gzipBaos.toByteArray();
break;
default:
throw new IllegalArgumentException("Invalid compression algorithm: " + algorithm);
}

// higher ratio means better compression and vice-versa
double ratio = (double) data.length / compressedData.length;
System.out.println("compression ratio: " + ratio);
log.info("compression ratio: " + ratio);
if (timer == null) {
final List<Tag> tagList = new ArrayList<Tag>(1);
tagList.add(new BasicTag("repl.algo", algorithm));
timer = EVCacheMetricsFactory.getInstance().getPercentileTimer("repl.ratio", tagList, Duration.ofMillis(100));
log.debug("compression ratio: {} for app: {}", ratio, appName);
final List<Tag> tagList = new ArrayList<Tag>(2);
tagList.add(new BasicTag(CLIENT_ALGO, algorithm));
tagList.add(new BasicTag(APP_NAME, appName));
DistributionSummary distributionSummaryCompRatio = this.distributionSummaryCompRatioMap.get(appName);
if (distributionSummaryCompRatio == null) {
distributionSummaryCompRatio = EVCacheMetricsFactory.getInstance().getDistributionSummary(CLIENT_COMP_RATIO, tagList);
this.distributionSummaryCompRatioMap.put(appName, distributionSummaryCompRatio);
}
distributionSummaryCompRatio.record((long) ratio);

timer.record((long) ratio, TimeUnit.MILLISECONDS);
if (ratio > 1)
final List<Tag> tagList2 = new ArrayList<Tag>(1);
tagList2.add(new BasicTag(APP_NAME, appName));
DistributionSummary distributionSummaryOrigSize =
this.distributionSummaryOrigSizeMap.get(appName);
if (distributionSummaryOrigSize == null) {
distributionSummaryOrigSize = EVCacheMetricsFactory.getInstance().getDistributionSummary(ORIG_SIZE, tagList2);
this.distributionSummaryOrigSizeMap.put(appName, distributionSummaryOrigSize);
}
distributionSummaryOrigSize.record(data.length);

DistributionSummary distributionSummaryCompSize = this.distributionSummaryCompSizeMap.get(appName);
if (distributionSummaryCompSize == null) {
distributionSummaryCompSize = EVCacheMetricsFactory.getInstance().getDistributionSummary(COMP_SIZE, tagList2);
this.distributionSummaryCompSizeMap.put(appName, distributionSummaryCompSize);
}
distributionSummaryCompSize.record(compressedData.length);
if (ratio > 1) {
return compressedData;
}
return data;
}


public static byte[] decompress(byte[] data, String algorithm) throws IOException {
public byte[] decompress(byte[] data, String algorithm) throws IOException {
byte[] decompressedData = null;
int len;
switch (algorithm) {
@@ -97,18 +128,20 @@ public static byte[] decompress(byte[] data, String algorithm) throws IOException {
zstdIn.close();
baos.close();
break;

case "lz4":
LZ4Factory factory = LZ4Factory.fastestInstance();
LZ4SafeDecompressor decompressor = factory.safeDecompressor();
int decompressedLength = decompressor.decompress(data, 0, data.length, null, 0);
decompressedData = new byte[decompressedLength];
decompressor.decompress(data, 0, data.length, decompressedData, 0);
break;

case "snappy":
decompressedData = Snappy.uncompress(data);
break;

default:
case "gzip":
ByteArrayInputStream gzipBais = new ByteArrayInputStream(data);
GZIPInputStream gzipIn = new GZIPInputStream(gzipBais);
ByteArrayOutputStream gzipBaos = new ByteArrayOutputStream();
@@ -120,6 +153,8 @@ public static byte[] decompress(byte[] data, String algorithm) throws IOException {
gzipIn.close();
gzipBaos.close();
break;
default:
throw new IllegalArgumentException("Invalid compression algorithm: " + algorithm);
}
return decompressedData;
}
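A design note on the summary caches above: each lookup is a get followed by a put, so two concurrent calls can both create a summary for the same app (benign if EVCacheMetricsFactory hands back a canonical instance per name, but redundant). The ratio cache is also keyed by appName alone while its tags include the algorithm, so the first algorithm seen for an app determines the tag. A sketch of the same lookup with ConcurrentHashMap.computeIfAbsent, not part of this commit, assuming it sits inside compress() where appName, tagList, and ratio are in scope:

// Hypothetical replacement for the get/put block in compress();
// computeIfAbsent creates each per-app summary at most once.
DistributionSummary compRatioSummary = distributionSummaryCompRatioMap.computeIfAbsent(
        appName,
        app -> EVCacheMetricsFactory.getInstance().getDistributionSummary(CLIENT_COMP_RATIO, tagList));
compRatioSummary.record((long) ratio);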
