Skip to content

Commit

Permalink
ZOOKEEPER-1427. Writing to local files is done non-atomically (phunt)
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1362656 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
phunt committed Jul 17, 2012
1 parent 76eda1b commit fb83418
Show file tree
Hide file tree
Showing 7 changed files with 499 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ BUGFIXES:
takes a long time with large datasets - is correlated to dataset size
(fpj and Thawan Kooburat via camille)

ZOOKEEPER-1427. Writing to local files is done non-atomically (phunt)

IMPROVEMENTS:

ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
Expand Down
115 changes: 115 additions & 0 deletions src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.common;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
* This code is originally from HDFS, see the similarly named files there
* in case of bug fixing, history, etc...
*/

/**
* A FileOutputStream that has the property that it will only show up at its
* destination once it has been entirely written and flushed to disk. While
* being written, it will use a .tmp suffix.
*
* When the output stream is closed, it is flushed, fsynced, and will be moved
* into place, overwriting any file that already exists at that location.
*
* <b>NOTE</b>: on Windows platforms, it will not atomically replace the target
* file - instead the target file is deleted before this one is moved into
* place.
*/
public class AtomicFileOutputStream extends FilterOutputStream {
private static final String TMP_EXTENSION = ".tmp";

private final static Logger LOG = LoggerFactory
.getLogger(AtomicFileOutputStream.class);

private final File origFile;
private final File tmpFile;

public AtomicFileOutputStream(File f) throws FileNotFoundException {
// Code unfortunately must be duplicated below since we can't assign
// anything
// before calling super
super(new FileOutputStream(new File(f.getParentFile(), f.getName()
+ TMP_EXTENSION)));
origFile = f.getAbsoluteFile();
tmpFile = new File(f.getParentFile(), f.getName() + TMP_EXTENSION)
.getAbsoluteFile();
}

@Override
public void close() throws IOException {
boolean triedToClose = false, success = false;
try {
flush();
((FileOutputStream) out).getChannel().force(true);

triedToClose = true;
super.close();
success = true;
} finally {
if (success) {
boolean renamed = tmpFile.renameTo(origFile);
if (!renamed) {
// On windows, renameTo does not replace.
if (!origFile.delete() || !tmpFile.renameTo(origFile)) {
throw new IOException(
"Could not rename temporary file " + tmpFile
+ " to " + origFile);
}
}
} else {
if (!triedToClose) {
// If we failed when flushing, try to close it to not leak
// an FD
IOUtils.closeStream(out);
}
// close wasn't successful, try to delete the tmp file
if (!tmpFile.delete()) {
LOG.warn("Unable to delete tmp file " + tmpFile);
}
}
}
}

/**
* Close the atomic file, but do not "commit" the temporary file on top of
* the destination. This should be used if there is a failure in writing.
*/
public void abort() {
try {
super.close();
} catch (IOException ioe) {
LOG.warn("Unable to abort file " + tmpFile, ioe);
}
if (!tmpFile.delete()) {
LOG.warn("Unable to delete tmp file during abort " + tmpFile);
}
}
}
123 changes: 123 additions & 0 deletions src/java/main/org/apache/zookeeper/common/IOUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.common;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;

import org.slf4j.Logger;

/*
* This code is originally from HDFS, see the similarly named files there
* in case of bug fixing, history, etc...
*/

public class IOUtils {
/**
* Closes the stream ignoring {@link IOException}. Must only be called in
* cleaning up from exception handlers.
*
* @param stream
* the Stream to close
*/
public static void closeStream(Closeable stream) {
cleanup(null, stream);
}

/**
* Close the Closeable objects and <b>ignore</b> any {@link IOException} or
* null pointers. Must only be used for cleanup in exception handlers.
*
* @param log
* the log to record problems to at debug level. Can be null.
* @param closeables
* the objects to close
*/
public static void cleanup(Logger log, Closeable... closeables) {
for (Closeable c : closeables) {
if (c != null) {
try {
c.close();
} catch (IOException e) {
if (log != null) {
log.warn("Exception in closing " + c, e);
}
}
}
}
}

/**
* Copies from one stream to another.
*
* @param in
* InputStrem to read from
* @param out
* OutputStream to write to
* @param buffSize
* the size of the buffer
* @param close
* whether or not close the InputStream and OutputStream at the
* end. The streams are closed in the finally clause.
*/
public static void copyBytes(InputStream in, OutputStream out,
int buffSize, boolean close) throws IOException {
try {
copyBytes(in, out, buffSize);
if (close) {
out.close();
out = null;
in.close();
in = null;
}
} finally {
if (close) {
closeStream(out);
closeStream(in);
}
}
}

/**
* Copies from one stream to another.
*
* @param in
* InputStrem to read from
* @param out
* OutputStream to write to
* @param buffSize
* the size of the buffer
*/
public static void copyBytes(InputStream in, OutputStream out, int buffSize)
throws IOException {
PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
byte buf[] = new byte[buffSize];
int bytesRead = in.read(buf);
while (bytesRead >= 0) {
out.write(buf, 0, bytesRead);
if ((ps != null) && ps.checkError()) {
throw new IOException("Unable to write to output stream.");
}
bytesRead = in.read(buf);
}
}

}
28 changes: 24 additions & 4 deletions src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
Expand All @@ -39,6 +38,7 @@
import java.util.Map;
import java.util.Properties;

import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.server.ServerCnxnFactory;
Expand Down Expand Up @@ -1283,16 +1283,36 @@ private long readLongFromFile(String name) throws IOException {

public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";

/**
* Write a long value to disk atomically. Either succeeds or an exception
* is thrown.
* @param name file name to write the long to
* @param value the long value to write to the named file
* @throws IOException if the file cannot be written atomically
*/
private void writeLongToFile(String name, long value) throws IOException {
File file = new File(logFactory.getSnapDir(), name);
FileOutputStream out = new FileOutputStream(file);
AtomicFileOutputStream out = new AtomicFileOutputStream(file);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
boolean aborted = false;
try {
bw.write(Long.toString(value));
bw.flush();
out.getFD().sync();

out.flush();
} catch (IOException e) {
LOG.error("Failed to write new file " + file, e);
// worst case here the tmp file/resources(fd) are not cleaned up
// and the caller will be notified (IOException)
aborted = true;
out.abort();
throw e;
} finally {
bw.close();
if (!aborted) {
// if the close operation (rename) fails we'll get notified.
// worst case the tmp file may still exist
out.close();
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/java/test/config/findbugsExcludeFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,10 @@
</Or>
</Match>

<Match>
<Class name="org.apache.zookeeper.server.quorum.QuorumPeer"/>
<Bug pattern="OS_OPEN_STREAM" />
<Method name="writeLongToFile" />
</Match>

</FindBugsFilter>
Loading

0 comments on commit fb83418

Please sign in to comment.