Skip to content

Commit

Permalink
TEZ-3848. Tez Local mode doesn't localize distributed cache files (Ja…
Browse files Browse the repository at this point in the history
…cob Tolar via jeagles)
  • Loading branch information
Jonathan Eagles committed Oct 4, 2017
1 parent 3b2933f commit b875eb1
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.tez.dag.app.launcher;


import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -48,7 +47,6 @@
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
Expand Down Expand Up @@ -100,6 +98,9 @@ public class LocalContainerLauncher extends DagContainerLauncher {
runningContainers =
new ConcurrentHashMap<ContainerId, RunningTaskCallback>();

private final ConcurrentHashMap<ContainerId, TezLocalCacheManager>
cacheManagers = new ConcurrentHashMap<>();

private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());

Expand Down Expand Up @@ -229,6 +230,10 @@ void sendContainerLaunchFailedMsg(ContainerId containerId, String message) {

private void handleLaunchFailed(Throwable t, ContainerId containerId) {
String message;

// clean up distributed cache files
cleanupCacheFiles(containerId);

if (t instanceof RejectedExecutionException) {
message = "Failed to queue container launch for container Id: " + containerId;
} else {
Expand All @@ -244,10 +249,22 @@ private void launch(ContainerLaunchRequest event) {
String tokenIdentifier = context.getApplicationID().toString();
try {
TezChild tezChild;

try {
int taskCommId = context.getTaskCommunicatorIdentifier(event.getTaskCommunicatorName());

Configuration conf = context.getAMConf();
if (isLocalMode) {
TezLocalCacheManager cacheManager = new TezLocalCacheManager(
event.getContainerLaunchContext().getLocalResources(),
conf
);
cacheManagers.put(event.getContainerId(), cacheManager);
cacheManager.localize();
}

tezChild =
createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
createTezChild(conf, event.getContainerId(), tokenIdentifier,
context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId).getTaskCommunicator()).getUmbilical(),
TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
Expand Down Expand Up @@ -322,6 +339,9 @@ public void onSuccess(TezChild.ContainerExecutionResult result) {
(result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
result.getErrorMessage(), TaskAttemptEndReason.APPLICATION_ERROR);
}

// clean up distributed cache files
cleanupCacheFiles(containerId);
}

@Override
Expand All @@ -341,6 +361,22 @@ public void onFailure(Throwable t) {
TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
"CancellationException", TaskAttemptEndReason.COMMUNICATION_ERROR.CONTAINER_EXITED);
}

// clean up distributed cache files
cleanupCacheFiles(containerId);
}
}

private void cleanupCacheFiles(ContainerId container) {
if (isLocalMode) {
TezLocalCacheManager manager = cacheManagers.remove(container);
try {
if (manager != null) {
manager.cleanup();
}
} catch (IOException e) {
LOG.info("Unable to clean up local cache files: ", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.app.launcher;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.util.FSDownload;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is responsible for localizing files from the distributed cache for Tez local mode.
*/
public class TezLocalCacheManager {

private static final Logger LOG = LoggerFactory.getLogger(TezLocalCacheManager.class);

final private Map<String, LocalResource> resources;
final private Configuration conf;
final private UserGroupInformation ugi;
final private FileContext fileContext;
final private java.nio.file.Path tempDir;

final private Map<LocalResource, ResourceInfo> resourceInfo = new HashMap<>();

public TezLocalCacheManager(Map<String, LocalResource> resources, Configuration conf) throws IOException {
this.ugi = UserGroupInformation.getCurrentUser();
this.fileContext = FileContext.getLocalFSFileContext();
this.resources = resources;
this.conf = conf;
this.tempDir = Files.createTempDirectory(Paths.get("."), "tez-local-cache");
}

/**
* Localize this instance's resources by downloading and symlinking them.
*
* @throws IOException when an error occurs in download or link
*/
public void localize() throws IOException {
String absPath = Paths.get(".").toAbsolutePath().normalize().toString();
Path cwd = fileContext.makeQualified(new Path(absPath));
ExecutorService threadPool = null;

try {
// construct new threads with helpful names
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("TezLocalCacheManager Downloader #%d")
.build();
threadPool = Executors.newCachedThreadPool(threadFactory);

// start all fetches
for (Map.Entry<String, LocalResource> entry : resources.entrySet()) {
String resourceName = entry.getKey();
LocalResource resource = entry.getValue();

if (resource.getType() == LocalResourceType.PATTERN) {
throw new IllegalArgumentException("Resource type PATTERN not supported.");
}

// submit task to download the object
java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName);
Path dest = new Path(downloadDir.toAbsolutePath().toString());
FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource);
Future<Path> downloadedPath = threadPool.submit(downloader);

// linkPath is the path we want to symlink the file/directory into
Path linkPath = new Path(cwd, entry.getKey());
resourceInfo.put(resource, new ResourceInfo(downloadedPath, linkPath));
}

// Link each file
for (Map.Entry<LocalResource, ResourceInfo> entry : resourceInfo.entrySet()) {
LocalResource resource = entry.getKey();
ResourceInfo resourceMeta = entry.getValue();

Path linkPath = resourceMeta.linkPath;
Path targetPath;

try {
// this blocks on the download completing
targetPath = resourceMeta.downloadPath.get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}

if (createSymlink(targetPath, linkPath)) {
LOG.info("Localized file: {} as {}", resource, linkPath);
} else {
LOG.warn("Failed to create symlink: {} <- {}", targetPath, linkPath);
}
}
} finally {
if (threadPool != null) {
threadPool.shutdownNow();
}
}
}

/**
* Clean up any symlinks and temp files that were created.
*
* @throws IOException when an error occurs in cleanup
*/
public void cleanup() throws IOException {
for (ResourceInfo info : resourceInfo.values()) {
if (fileContext.util().exists(info.linkPath)) {
fileContext.delete(info.linkPath, true);
}
}

Path temp = new Path(tempDir.toString());
if (fileContext.util().exists(temp)) {
fileContext.delete(temp, true);
}
}

/**
* Create a symlink.
*/
private boolean createSymlink(Path target, Path link) throws IOException {
LOG.info("Creating symlink: {} <- {}", target, link);
String targetPath = target.toUri().getPath();
String linkPath = link.toUri().getPath();

if (fileContext.util().exists(link)) {
LOG.warn("File already exists at symlink path: {}", link);
return false;
} else {
try {
Files.createSymbolicLink(Paths.get(linkPath), Paths.get(targetPath));
return true;
} catch (UnsupportedOperationException e) {
LOG.warn("Unable to create symlink {} <- {}: UnsupportedOperationException", target, link);
return false;
}
}
}

/**
* Wrapper to keep track of download path and link path
*/
private static class ResourceInfo {
final Future<Path> downloadPath;
final Path linkPath;

public ResourceInfo(Future<Path> downloadPath, Path linkPath) {
this.downloadPath = downloadPath;
this.linkPath = linkPath;
}
}
}

0 comments on commit b875eb1

Please sign in to comment.