Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 23, 2025
1 parent 1847c56 commit 02b5bb5
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class FileTransferClientImpl implements FileTransferClient {
private final NodeInfo nodeInfo;
private final TargetNodeSetFactory targetNodeSetFactory;
private final WebTargetFactory webTargetFactory;
private final SequentialFileStore fileStore;
private final PartDestination partDestination;
private final SecurityContext securityContext;

@Inject
Expand All @@ -53,14 +53,14 @@ public FileTransferClientImpl(final Provider<PlanBConfig> configProvider,
final NodeInfo nodeInfo,
@Nullable final TargetNodeSetFactory targetNodeSetFactory,
@Nullable final WebTargetFactory webTargetFactory,
final SequentialFileStore fileStore,
final PartDestination partDestination,
final SecurityContext securityContext) {
this.configProvider = configProvider;
this.nodeService = nodeService;
this.nodeInfo = nodeInfo;
this.targetNodeSetFactory = targetNodeSetFactory;
this.webTargetFactory = webTargetFactory;
this.fileStore = fileStore;
this.partDestination = partDestination;
this.securityContext = securityContext;
}

Expand Down Expand Up @@ -99,9 +99,12 @@ public void storePart(final FileDescriptor fileDescriptor,
// Send the data to all nodes.
for (final String nodeName : targetNodes) {
if (NodeCallUtil.shouldExecuteLocally(nodeInfo, nodeName)) {
// Allow file move if the only target is the local node.
final boolean allowMove = targetNodes.size() == 1;
storePartLocally(
fileDescriptor,
path);
path,
allowMove);
} else {
storePartRemotely(
nodeName,
Expand All @@ -117,8 +120,9 @@ public void storePart(final FileDescriptor fileDescriptor,
}

private void storePartLocally(final FileDescriptor fileDescriptor,
final Path path) throws IOException {
fileStore.add(fileDescriptor, path);
final Path path,
final boolean allowMove) throws IOException {
partDestination.receiveLocalPart(fileDescriptor, path, allowMove);
}

private void storePartRemotely(final String targetNode,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,30 @@
package stroom.planb.impl.data;

import stroom.planb.impl.db.StatePaths;
import stroom.security.api.SecurityContext;
import stroom.util.io.FileUtil;
import stroom.util.io.StreamUtil;
import stroom.util.shared.PermissionException;
import stroom.util.string.StringIdUtil;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;

@Singleton
public class FileTransferServiceImpl implements FileTransferService {

private final SequentialFileStore fileStore;
private final PartDestination partReceiver;
private final SecurityContext securityContext;
private final ShardManager shardManager;

private final Path receiveDir;
private final AtomicLong receiveId = new AtomicLong();

@Inject
public FileTransferServiceImpl(final SequentialFileStore fileStore,
public FileTransferServiceImpl(final PartDestination partReceiver,
final SecurityContext securityContext,
final ShardManager shardManager,
final StatePaths statePaths) {
this.fileStore = fileStore;
final ShardManager shardManager) {
this.partReceiver = partReceiver;
this.securityContext = securityContext;
this.shardManager = shardManager;

// Create the receive directory.
receiveDir = statePaths.getReceiveDir();
FileUtil.ensureDirExists(receiveDir);
if (!FileUtil.deleteContents(receiveDir)) {
throw new RuntimeException("Unable to delete contents of: " + FileUtil.getCanonicalPath(receiveDir));
}
}

/**
Expand Down Expand Up @@ -87,15 +71,6 @@ public void receivePart(final long createTime,
final String fileHash,
final String fileName,
final InputStream inputStream) throws IOException {
if (!securityContext.isProcessingUser()) {
throw new PermissionException(securityContext.getUserRef(), "Only processing users can use this resource");
}

final FileDescriptor fileDescriptor = new FileDescriptor(createTime, metaId, fileHash);
final String receiveFileName = StringIdUtil.idToString(receiveId.incrementAndGet()) +
SequentialFile.ZIP_EXTENSION;
final Path receiveFile = receiveDir.resolve(receiveFileName);
StreamUtil.streamToFile(inputStream, receiveFile);
fileStore.add(fileDescriptor, receiveFile);
partReceiver.receiveRemotePart(createTime, metaId, fileHash, fileName, inputStream);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package stroom.planb.impl.data;

import stroom.planb.impl.db.StatePaths;
import stroom.security.api.SecurityContext;
import stroom.util.io.FileUtil;
import stroom.util.io.StreamUtil;
import stroom.util.shared.PermissionException;
import stroom.util.string.StringIdUtil;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;

@Singleton
public class PartDestination {

private final SequentialFileStore fileStore;
private final SecurityContext securityContext;

private final Path receiveDir;
private final AtomicLong receiveId = new AtomicLong();

@Inject
public PartDestination(final SequentialFileStore fileStore,
final SecurityContext securityContext,
final StatePaths statePaths) {
this.fileStore = fileStore;
this.securityContext = securityContext;

// Create the receive directory.
receiveDir = statePaths.getReceiveDir();
FileUtil.ensureDirExists(receiveDir);
if (!FileUtil.deleteContents(receiveDir)) {
throw new RuntimeException("Unable to delete contents of: " + FileUtil.getCanonicalPath(receiveDir));
}
}

/**
* Receive a part file to add to an existing shard.
*
* @param createTime
* @param metaId
* @param fileHash
* @param fileName
* @param inputStream
* @throws IOException
*/
public void receiveRemotePart(final long createTime,
final long metaId,
final String fileHash,
final String fileName,
final InputStream inputStream) throws IOException {
if (!securityContext.isProcessingUser()) {
throw new PermissionException(securityContext.getUserRef(), "Only processing users can use this resource");
}

final FileDescriptor fileDescriptor = new FileDescriptor(createTime, metaId, fileHash);
final String receiveFileName = StringIdUtil.idToString(receiveId.incrementAndGet()) +
SequentialFile.ZIP_EXTENSION;
final Path receiveFile = receiveDir.resolve(receiveFileName);
StreamUtil.streamToFile(inputStream, receiveFile);
fileStore.add(fileDescriptor, receiveFile);
}

public void receiveLocalPart(final FileDescriptor fileDescriptor,
final Path sourcePath,
final boolean allowMove)
throws IOException {
if (allowMove) {
// If we allow move then we can allow the file store to move the file directly into the store.
fileStore.add(fileDescriptor, sourcePath);

} else {
// Otherwise we need to copy the file to a temporary location first before it can be moved into the store.
final String receiveFileName = StringIdUtil.idToString(receiveId.incrementAndGet()) +
SequentialFile.ZIP_EXTENSION;
final Path receiveFile = receiveDir.resolve(receiveFileName);
Files.copy(sourcePath, receiveFile);
fileStore.add(fileDescriptor, receiveFile);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void close() throws IOException {
try {
FileUtil.deleteDir(dir);
if (zipFile != null) {
FileUtil.delete(zipFile);
Files.deleteIfExists(zipFile);
}
} catch (final Exception e) {
LOGGER.error(e.getMessage(), e);
Expand Down

0 comments on commit 02b5bb5

Please sign in to comment.