From b875eb1f2e0a56d4b9c01127b40d81983e9be896 Mon Sep 17 00:00:00 2001 From: Jonathan Eagles Date: Wed, 4 Oct 2017 16:43:32 -0500 Subject: [PATCH] TEZ-3848. Tez Local mode doesn't localize distributed cache files (Jacob Tolar via jeagles) --- .../app/launcher/LocalContainerLauncher.java | 42 +++- .../app/launcher/TezLocalCacheManager.java | 184 ++++++++++++++++++ 2 files changed, 223 insertions(+), 3 deletions(-) create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index d50b49eb52..9764daaaef 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -18,7 +18,6 @@ package org.apache.tez.dag.app.launcher; - import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; @@ -48,7 +47,6 @@ import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; @@ -100,6 +98,9 @@ public class LocalContainerLauncher extends DagContainerLauncher { runningContainers = new ConcurrentHashMap(); + private final ConcurrentHashMap + cacheManagers = new ConcurrentHashMap<>(); + private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build()); @@ -229,6 +230,10 @@ void sendContainerLaunchFailedMsg(ContainerId containerId, String message) { private void handleLaunchFailed(Throwable t, ContainerId 
containerId) { String message; + + // clean up distributed cache files + cleanupCacheFiles(containerId); + if (t instanceof RejectedExecutionException) { message = "Failed to queue container launch for container Id: " + containerId; } else { @@ -244,10 +249,22 @@ private void launch(ContainerLaunchRequest event) { String tokenIdentifier = context.getApplicationID().toString(); try { TezChild tezChild; + try { int taskCommId = context.getTaskCommunicatorIdentifier(event.getTaskCommunicatorName()); + + Configuration conf = context.getAMConf(); + if (isLocalMode) { + TezLocalCacheManager cacheManager = new TezLocalCacheManager( + event.getContainerLaunchContext().getLocalResources(), + conf + ); + cacheManagers.put(event.getContainerId(), cacheManager); + cacheManager.localize(); + } + tezChild = - createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier, + createTezChild(conf, event.getContainerId(), tokenIdentifier, context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(), ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId).getTaskCommunicator()).getUmbilical(), TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array())); @@ -322,6 +339,9 @@ public void onSuccess(TezChild.ContainerExecutionResult result) { (result.getThrowable() == null ? 
null : result.getThrowable().getMessage()) : result.getErrorMessage(), TaskAttemptEndReason.APPLICATION_ERROR); } + + // clean up distributed cache files + cleanupCacheFiles(containerId); } @Override @@ -341,6 +361,22 @@ public void onFailure(Throwable t) { TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(), "CancellationException", TaskAttemptEndReason.COMMUNICATION_ERROR.CONTAINER_EXITED); } + + // clean up distributed cache files + cleanupCacheFiles(containerId); + } + } + + private void cleanupCacheFiles(ContainerId container) { + if (isLocalMode) { + TezLocalCacheManager manager = cacheManagers.remove(container); + try { + if (manager != null) { + manager.cleanup(); + } + } catch (IOException e) { + LOG.info("Unable to clean up local cache files: ", e); + } } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java new file mode 100644 index 0000000000..80f73aa9aa --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.util.FSDownload; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is responsible for localizing files from the distributed cache for Tez local mode. 
+ */
+public class TezLocalCacheManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezLocalCacheManager.class);
+
+  private final Map<String, LocalResource> resources;
+  private final Configuration conf;
+  private final UserGroupInformation ugi;
+  private final FileContext fileContext;
+  private final java.nio.file.Path tempDir;
+
+  // Keyed by link path, not by LocalResource: link paths are unique (one per resource name),
+  // whereas several names may map to equal LocalResource objects and would overwrite each other.
+  private final Map<Path, ResourceInfo> resourceInfo = new HashMap<>();
+
+  /**
+   * Creates a manager for the given resources and a fresh temp directory, under the current
+   * working directory, to download them into.
+   *
+   * @param resources resources to localize, keyed by the name of the link to create for each
+   * @param conf configuration handed to the downloader
+   * @throws IOException if initialization fails, e.g. the temp directory cannot be created
+   */
+  public TezLocalCacheManager(Map<String, LocalResource> resources, Configuration conf)
+      throws IOException {
+    this.ugi = UserGroupInformation.getCurrentUser();
+    this.fileContext = FileContext.getLocalFSFileContext();
+    this.resources = resources;
+    this.conf = conf;
+    this.tempDir = Files.createTempDirectory(Paths.get("."), "tez-local-cache");
+  }
+
+  /**
+   * Localize this instance's resources by downloading and symlinking them.
+   *
+   * <p>Each distinct resource is downloaded once into its own directory below the temp
+   * directory; every resource name then gets a symlink in the current working directory
+   * pointing at the downloaded copy.
+   *
+   * @throws IOException when an error occurs in download or link, or when the calling thread
+   *                     is interrupted while waiting for a download
+   * @throws IllegalArgumentException if a resource is of type PATTERN
+   */
+  public void localize() throws IOException {
+    String absPath = Paths.get(".").toAbsolutePath().normalize().toString();
+    Path cwd = fileContext.makeQualified(new Path(absPath));
+    ExecutorService threadPool = null;
+
+    try {
+      // construct new threads with helpful names
+      ThreadFactory threadFactory = new ThreadFactoryBuilder()
+          .setNameFormat("TezLocalCacheManager Downloader #%d")
+          .build();
+      threadPool = Executors.newCachedThreadPool(threadFactory);
+
+      // one download per distinct resource, even when several names refer to it
+      Map<LocalResource, Future<Path>> downloads = new HashMap<>();
+
+      // start all fetches
+      for (Map.Entry<String, LocalResource> entry : resources.entrySet()) {
+        String resourceName = entry.getKey();
+        LocalResource resource = entry.getValue();
+
+        if (resource.getType() == LocalResourceType.PATTERN) {
+          throw new IllegalArgumentException("Resource type PATTERN not supported.");
+        }
+
+        Future<Path> downloadedPath = downloads.get(resource);
+        if (downloadedPath == null) {
+          // submit task to download the object
+          java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
+          Path dest = new Path(downloadDir.toAbsolutePath().toString());
+          FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
+          downloadedPath = threadPool.submit(downloader);
+          downloads.put(resource, downloadedPath);
+        }
+
+        // linkPath is the path we want to symlink the file/directory into
+        Path linkPath = new Path(cwd, resourceName);
+        ResourceInfo info = new ResourceInfo(resource, downloadedPath, linkPath);
+        ResourceInfo previous = resourceInfo.put(linkPath, info);
+        if (previous != null && previous.linkCreated) {
+          // a link made by an earlier localize() call is still ours to clean up
+          info.linkCreated = true;
+        }
+      }
+
+      // Link each file
+      for (ResourceInfo info : resourceInfo.values()) {
+        Path targetPath;
+
+        try {
+          // this blocks on the download completing
+          targetPath = info.downloadPath.get();
+        } catch (InterruptedException e) {
+          // keep the interrupt visible to callers further up the stack
+          Thread.currentThread().interrupt();
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e);
+        }
+
+        if (createSymlink(targetPath, info.linkPath)) {
+          info.linkCreated = true;
+          LOG.info("Localized file: {} as {}", info.resource, info.linkPath);
+        } else {
+          LOG.warn("Failed to create symlink: {} <- {}", targetPath, info.linkPath);
+        }
+      }
+    } finally {
+      if (threadPool != null) {
+        threadPool.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Clean up any symlinks and temp files that were created.
+   *
+   * <p>Only links created by this instance are removed; a file that already existed at a link
+   * path (and therefore prevented the link from being created) is left untouched.
+   *
+   * @throws IOException when an error occurs in cleanup
+   */
+  public void cleanup() throws IOException {
+    for (ResourceInfo info : resourceInfo.values()) {
+      if (info.linkCreated && fileContext.util().exists(info.linkPath)) {
+        fileContext.delete(info.linkPath, true);
+        info.linkCreated = false;
+      }
+    }
+
+    Path temp = new Path(tempDir.toString());
+    if (fileContext.util().exists(temp)) {
+      fileContext.delete(temp, true);
+    }
+  }
+
+  /**
+   * Create a symlink at {@code link} pointing to {@code target}.
+   *
+   * @param target path the link should point to
+   * @param link path at which the link is created
+   * @return true if the link was created; false if something already exists at {@code link}
+   *         or symbolic links are not supported
+   * @throws IOException if checking for or creating the link fails
+   */
+  private boolean createSymlink(Path target, Path link) throws IOException {
+    LOG.info("Creating symlink: {} <- {}", target, link);
+    String targetPath = target.toUri().getPath();
+    String linkPath = link.toUri().getPath();
+
+    if (fileContext.util().exists(link)) {
+      LOG.warn("File already exists at symlink path: {}", link);
+      return false;
+    }
+
+    try {
+      Files.createSymbolicLink(Paths.get(linkPath), Paths.get(targetPath));
+      return true;
+    } catch (UnsupportedOperationException e) {
+      LOG.warn("Unable to create symlink {} <- {}: UnsupportedOperationException", target, link);
+      return false;
+    }
+  }
+
+  /**
+   * Wrapper to keep track of a resource, its pending download, and its link path
+   */
+  private static class ResourceInfo {
+    final LocalResource resource;
+    final Future<Path> downloadPath;
+    final Path linkPath;
+
+    // set once this instance has created the symlink at linkPath; volatile because
+    // NOTE(review): localize() and cleanup() may be called from different threads - confirm
+    volatile boolean linkCreated;
+
+    ResourceInfo(LocalResource resource, Future<Path> downloadPath, Path linkPath) {
+      this.resource = resource;
+      this.downloadPath = downloadPath;
+      this.linkPath = linkPath;
+    }
+  }
+}