Skip to content
This repository has been archived by the owner on Jan 18, 2023. It is now read-only.

Commit

Permalink
[DP-313] Dont drop messages on rebalance (#2)
Browse files Browse the repository at this point in the history
* Add FormatConvertingUploadManager

* [DP-313] Dont drop messages on rebalance

* Address PR feedback

* Remove some println's
  • Loading branch information
jmbutter authored Aug 15, 2019
1 parent 2b666be commit 2bc2a1b
Show file tree
Hide file tree
Showing 6 changed files with 406 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ public String getUploadManagerClass() {
return getString("secor.upload.manager.class");
}

public String getInnerUploadManagerClass() {
return getString("secor.inner.upload.manager.class");
}

public String getMessageTransformerClass(){
return getString("secor.message.transformer.class");
}
Expand Down Expand Up @@ -572,6 +576,10 @@ public String getFileReaderWriterFactory() {
return getString("secor.file.reader.writer.factory");
}

public String getDestinationFileReaderWriterFactory() {
return getString("secor.file.destination.reader.writer.factory");
}

public String getFileReaderDelimiter(){
String readerDelimiter = getString("secor.file.reader.Delimiter");
if (readerDelimiter.length() > 1) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.secor.uploader;

import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.io.FileReader;
import com.pinterest.secor.io.FileWriter;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.uploader.UploadManager;
import com.pinterest.secor.util.CompressionUtil;
import com.pinterest.secor.util.FileUtil;
import com.pinterest.secor.util.IdUtil;
import com.pinterest.secor.util.ReflectionUtil;

import java.io.File;
import java.nio.file.Paths;

import org.apache.hadoop.io.compress.CompressionCodec;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Manages uploads using the configured upload manager
* Converts the local files to the specified format before upload
*
* @author Jason Butterfield ([email protected])
*/
public class FormatConvertingUploadManager extends UploadManager {
private static final Logger LOG = LoggerFactory.getLogger(FormatConvertingUploadManager.class);

private UploadManager mUploadManager;

public FormatConvertingUploadManager(SecorConfig config) throws Exception {
super(config);
mUploadManager = createUploadManager();
}

@Override
public TempFileUploadHandle<Handle<?>> upload(LogFilePath localPath) throws Exception {
// convert the file from the internal format to the external format
LogFilePath convertedFilePath = convertFile(localPath);

// The TempFileUploadHandle will delete the temp file after it resolves
return new TempFileUploadHandle(mUploadManager.upload(convertedFilePath), convertedFilePath);
}

private LogFilePath convertFile(LogFilePath srcPath) throws Exception {
FileReader reader = null;
FileWriter writer = null;
int copiedMessages = 0;

String localConvertedPrefix = Paths.get(mConfig.getLocalPath(), IdUtil.getLocalMessageDir(), "convertedForUpload").toString();
LogFilePath convertedFilePath = srcPath.withPrefix(localConvertedPrefix);

try {
CompressionCodec codec = null;
String extension = "";
if (mConfig.getCompressionCodec() != null && !mConfig.getCompressionCodec().isEmpty()) {
codec = CompressionUtil.createCompressionCodec(mConfig.getCompressionCodec());
extension = codec.getDefaultExtension();
}

reader = createReader(srcPath, codec);
writer = createWriter(convertedFilePath, codec);

KeyValue keyVal;
while ((keyVal = reader.next()) != null) {
writer.write(keyVal);
copiedMessages++;
}
} finally {
if (reader != null) {
reader.close();
}
if (writer != null) {
writer.close();
}
}

LOG.info("converted {} messages from {} to {}",
copiedMessages,
srcPath.getLogFilePath(),
convertedFilePath.getLogFilePath()
);

return convertedFilePath;
}

/**
* This method is intended to make mocking easier in tests.
* @param srcPath source Path
* @param codec compression codec
* @return FileReader created file reader
* @throws Exception on error
*/
protected FileReader createReader(LogFilePath srcPath, CompressionCodec codec) throws Exception {
return ReflectionUtil.createFileReader(
mConfig.getFileReaderWriterFactory(),
srcPath,
codec,
mConfig
);
}

/**
* This method is intended to make mocking easier in tests.
* @param dstPath destination Path
* @param codec compression codec
* @return FileWriter created file writer
* @throws Exception on error
*/
protected FileWriter createWriter(LogFilePath dstPath, CompressionCodec codec) throws Exception {
FileWriter writer = ReflectionUtil.createFileWriter(
mConfig.getDestinationFileReaderWriterFactory(),
dstPath,
codec,
mConfig
);
FileUtil.deleteOnExit(dstPath.getLogFilePath());
FileUtil.deleteOnExit(dstPath.getLogFileCrcPath());
return writer;
}

/**
* This method is intended to make mocking easier in tests.
* @return UploadManager created upload manager
* @throws Exception on error
*/
public UploadManager createUploadManager() throws Exception {
return ReflectionUtil.createUploadManager(mConfig.getInnerUploadManagerClass(), mConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.secor.uploader;

import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.uploader.Handle;
import com.pinterest.secor.util.FileUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Wraps an Upload of a temp file
* Deletes the temp file after the
* wrapped Handle.get() resolves
*
* @author Jason Butterfield ([email protected])
*/
public class TempFileUploadHandle<T> implements Handle<T> {
private static final Logger LOG = LoggerFactory.getLogger(TempFileUploadHandle.class);

private Handle<T> mHandle;
private LogFilePath mPath;

public TempFileUploadHandle(Handle<T> h, LogFilePath srcPath) {
mHandle = h;
mPath = srcPath;
}

public T get() throws Exception {
T result = null;
try {
result = mHandle.get();
} finally {
FileUtil.delete(mPath.getLogFilePath());
FileUtil.delete(mPath.getLogFileCrcPath());
LOG.debug("deleting temp files {} and {}",
mPath.getLogFilePath(),
mPath.getLogFileCrcPath()
);
}

return result;
}

public LogFilePath getTempFilePath() {
return mPath;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.secor.uploader;

import com.google.common.io.Files;

import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.io.FileReader;
import com.pinterest.secor.io.FileWriter;
import com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory;
import com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.uploader.FormatConvertingUploadManager;
import com.pinterest.secor.uploader.S3UploadManager;
import com.pinterest.secor.uploader.S3UploadHandle;
import com.pinterest.secor.uploader.Handle;
import com.pinterest.secor.uploader.TempFileUploadHandle;
import com.pinterest.secor.uploader.TestHandle;
import com.pinterest.secor.uploader.TestUploadManager;
import com.pinterest.secor.uploader.UploadManager;
import com.pinterest.secor.util.FileUtil;

import java.io.IOException;
import java.io.File;
import java.util.HashSet;

import junit.framework.TestCase;

import org.apache.hadoop.io.compress.CompressionCodec;
import org.joda.time.DateTime;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

/**
* Tests logic of converting files from
* one format to another prior to upload
*
* @author Jason Butterfield ([email protected])
*/
@RunWith(PowerMockRunner.class)
public class FormatConvertingUploadManagerTest extends TestCase {

public void testUpload() throws Exception {
SecorConfig mConfig = Mockito.mock(SecorConfig.class);
Mockito.when(mConfig.getFileReaderWriterFactory()).thenReturn("com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory");
Mockito.when(mConfig.getDestinationFileReaderWriterFactory()).thenReturn("com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory");
Mockito.when(mConfig.getInnerUploadManagerClass()).thenReturn("com.pinterest.secor.uploader.TestUploadManager");

FormatConvertingUploadManager unspiedUploadManager = new FormatConvertingUploadManager(mConfig);
FormatConvertingUploadManager uploadManager = Mockito.spy(unspiedUploadManager);

PowerMockito.whenNew(FormatConvertingUploadManager.class).withArguments(mConfig).thenReturn(uploadManager);

SequenceFileReaderWriterFactory sequenceFileFactory = new SequenceFileReaderWriterFactory();
LogFilePath tempLogFilePath = new LogFilePath(Files.createTempDir().toString(),
"test-topic",
new String[]{"part-1"},
0,
1,
23232,
".log"
);

FileWriter writer = sequenceFileFactory.BuildFileWriter(tempLogFilePath, null);

KeyValue kv1 = new KeyValue(23232, "value1".getBytes());
KeyValue kv2 = new KeyValue(23233, "value2".getBytes());
writer.write(kv1);
writer.write(kv2);
writer.close();

TempFileUploadHandle<?> tempFileUploadHandle = uploadManager.upload(tempLogFilePath);
LogFilePath convertedFilePath = tempFileUploadHandle.getTempFilePath();

// test that the converted file contains the correct format
DelimitedTextFileReaderWriterFactory textFileFactory = new DelimitedTextFileReaderWriterFactory();
FileReader reader = textFileFactory.BuildFileReader(convertedFilePath, null);
KeyValue kvout = reader.next();
assertEquals(kv1.getOffset(), kvout.getOffset());
assertArrayEquals(kv1.getValue(), kvout.getValue());

kvout = reader.next();
assertEquals(kv2.getOffset(), kvout.getOffset());
assertArrayEquals(kv2.getValue(), kvout.getValue());
reader.close();

// Test that it deletes the file after resolving the handle
tempFileUploadHandle.get();
assertEquals(false, (new File(convertedFilePath.getLogFilePath()).exists()));
assertEquals(false, (new File(convertedFilePath.getLogFileCrcPath()).exists()));
}
}
33 changes: 33 additions & 0 deletions src/test/java/com/pinterest/secor/uploader/TestHandle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.secor.uploader;

import com.pinterest.secor.uploader.Handle;

/**
* Used in tests only
*/
public class TestHandle implements Handle<String> {

public TestHandle() {
}

@Override
public String get() throws Exception {
return "";
}
}
Loading

0 comments on commit 2bc2a1b

Please sign in to comment.