diff --git a/comp-file/src/main/java/org/jumpmind/metl/core/runtime/component/BinaryFileWriter.java b/comp-file/src/main/java/org/jumpmind/metl/core/runtime/component/BinaryFileWriter.java index 7a882a18b..270ef03aa 100644 --- a/comp-file/src/main/java/org/jumpmind/metl/core/runtime/component/BinaryFileWriter.java +++ b/comp-file/src/main/java/org/jumpmind/metl/core/runtime/component/BinaryFileWriter.java @@ -16,6 +16,8 @@ public class BinaryFileWriter extends AbstractFileWriter { public static final String TYPE = "Binary File Writer"; + IDirectory streamable; + @Override public void start() { init(); @@ -24,15 +26,15 @@ public void start() { @Override public void handle(Message inputMessage, ISendMessageCallback callback, boolean unitOfWorkBoundaryReached) { - IDirectory streamable = (IDirectory) getResourceReference(); + streamable = (IDirectory) getResourceReference(); if (inputMessage instanceof BinaryMessage) { BinaryMessage message = (BinaryMessage) inputMessage; String fileName = getFileName(inputMessage); if (!append) { - streamable.delete(fileName); + streamable.delete(fileName, false); } - OutputStream fos = streamable.getOutputStream(fileName, mustExist); + OutputStream fos = streamable.getOutputStream(fileName, mustExist, false, false); try { fos.write(message.getPayload()); @@ -54,5 +56,10 @@ public void handle(Message inputMessage, ISendMessageCallback callback, boolean public boolean supportsStartupMessages() { return false; } + + @Override + public void stop() { + streamable.close(); + } } diff --git a/resource-core/src/main/java/org/jumpmind/metl/core/runtime/resource/SftpDirectory.java b/resource-core/src/main/java/org/jumpmind/metl/core/runtime/resource/SftpDirectory.java index 36cd7fb7d..6c4791572 100644 --- a/resource-core/src/main/java/org/jumpmind/metl/core/runtime/resource/SftpDirectory.java +++ b/resource-core/src/main/java/org/jumpmind/metl/core/runtime/resource/SftpDirectory.java @@ -203,8 +203,9 @@ private boolean fileExists(ChannelSftp sftp, String filePath) throws SftpExcepti @Override public void close() { + close(true); } - + @Override public void close(boolean success) { Session session = threadSession.get(); @@ -220,12 +221,12 @@ public void close(boolean success) { ChannelSftp channel = entry.getValue(); if (channel != null) { channel.disconnect(); - channels.remove(entry.getKey()); } } + channels.clear(); threadChannels.set(null); } - } + } @Override public boolean requiresContentLength() { @@ -283,6 +284,11 @@ protected ChannelSftp openConnectedChannel() throws JSchException { Session session = openSession(); ChannelSftp channel = (ChannelSftp) session.openChannel("sftp"); channel.connect(); + try { + channel.cd(basePath); + } catch (SftpException e) { + throw new IoException(e); + } return channel; } @@ -306,6 +312,11 @@ protected ChannelSftp openConnectedChannel(int channelId) throws JSchException { } if (!channel.isConnected()) { channel.connect(); + try { + channel.cd(basePath); + } catch (SftpException e) { + throw new IoException(e); + } } channels.put(channelId, channel); threadChannels.set(channels); @@ -482,7 +493,6 @@ public OutputStream getOutputStream(String relativePath, boolean mustExist, bool session = openSession(); // Get a reusable channel if the session is not auto closed. sftp = (closeSession) ? openConnectedChannel() : openConnectedChannel(CHANNEL_OUT); - sftp.cd(basePath); createRelativePathDirectoriesIfNecessary(sftp, relativePath, mustExist); return new CloseableOutputStream(sftp.put(relativePath, ChannelSftp.OVERWRITE), session, sftp, closeSession); } catch (Exception e) {