Skip to content

Commit

Permalink
changes to support s3/s4 URI in Ivory
Browse files Browse the repository at this point in the history
  • Loading branch information
shaikidris committed Dec 17, 2012
1 parent 653cc42 commit 8d0b860
Show file tree
Hide file tree
Showing 19 changed files with 101 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private FileSystem getFileSystem(

FileSystem fs;
try {
fs = new Path(ClusterHelper.getHdfsUrl(cluster))
fs = new Path(ClusterHelper.getStoageUrl(cluster))
.getFileSystem(new Configuration());
} catch (IOException e) {
throw new IvoryException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private ClusterHelper() {}

public static Configuration getConfiguration(Cluster cluster) {
Configuration conf = new Configuration();
conf.set("fs.default.name", getHdfsUrl(cluster));
conf.set("fs.default.name", getStoageUrl(cluster));
conf.set("mapred.job.tracker", getMREndPoint(cluster));
if(cluster.getProperties() != null)
for(Property prop:cluster.getProperties().getProperties()) {
Expand All @@ -46,11 +46,11 @@ public static String getOozieUrl(Cluster cluster) {
return getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint();
}

public static String getHdfsUrl(Cluster cluster) {
public static String getStoageUrl(Cluster cluster) {
return getNormalizedUrl(cluster, Interfacetype.WRITE);
}

public static String getReadOnlyHdfsUrl(Cluster cluster) {
public static String getReadOnlyStorageUrl(Cluster cluster) {
return getNormalizedUrl(cluster, Interfacetype.READONLY);
}

Expand Down Expand Up @@ -84,7 +84,7 @@ private static String getNormalizedUrl(Cluster cluster, Interfacetype type) {
}

public static String getCompleteLocation(Cluster cluster, String locationKey) {
return getHdfsUrl(cluster) + "/" + getLocation(cluster, locationKey);
return getStoageUrl(cluster) + "/" + getLocation(cluster, locationKey);
}

public static String getLocation(Cluster cluster, String locationKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.ivory.entity.ClusterHelper;
import org.apache.ivory.entity.store.StoreAccessException;
import org.apache.ivory.entity.v0.EntityType;
Expand All @@ -37,19 +38,19 @@ public ClusterEntityParser() {
@Override
public void validate(Cluster cluster) throws StoreAccessException,
ValidationException {
if (!ClusterHelper.getHdfsUrl(cluster).startsWith("hdfs://")) {
if (new Path(ClusterHelper.getStoageUrl(cluster)).toUri().getScheme()==null) {
throw new ValidationException(
"Cannot get valid nameNode from write interface of cluster: "
"Cannot get valid scheme for namenode from write interface of cluster: "
+ cluster.getName());
}
try {
Configuration conf = new Configuration();
conf.set("fs.default.name", ClusterHelper.getHdfsUrl(cluster));
conf.set("fs.default.name", ClusterHelper.getStoageUrl(cluster));
conf.setInt("ipc.client.connect.max.retries", 10);
FileSystem.get(conf);
} catch (Exception e) {
throw new ValidationException("Invalid HDFS server or port:"
+ ClusterHelper.getHdfsUrl(cluster), e);
+ ClusterHelper.getStoageUrl(cluster), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ private void validateHDFSpaths(Process process, String clusterName) throws Ivory
private String getNameNode(Cluster cluster, String clusterName) throws ValidationException {
// cluster should never be null as it is validated while submitting
// feeds.
if (!ClusterHelper.getHdfsUrl(cluster).startsWith("hdfs://")) {
throw new ValidationException("Cannot get valid nameNode from write interface of cluster: " + clusterName);
if (new Path(ClusterHelper.getStoageUrl(cluster)).toUri().getScheme()==null) {
throw new ValidationException("Cannot get valid nameNode scheme from write interface of cluster: " + clusterName);
}
return ClusterHelper.getHdfsUrl(cluster);
return ClusterHelper.getStoageUrl(cluster);
}

private void validateProcessValidity(Date start, Date end) throws IvoryException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster
SYNCDATASET outputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);

inputDataset.setUriTemplate(new Path(ClusterHelper
.getHdfsUrl(srcCluster), FeedHelper.getLocation(feed,
.getStoageUrl(srcCluster), FeedHelper.getLocation(feed,
LocationType.DATA,srcCluster.getName()).getPath()).toString());
outputDataset.setUriTemplate(getHDFSPath(FeedHelper.getLocation(
feed, LocationType.DATA, trgCluster.getName()).getPath()));
Expand Down
24 changes: 24 additions & 0 deletions feed/src/main/resources/config/workflow/replication-workflow.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>fs.s4.impl</name>
<value>com.inmobi.grid.fs.s4fs.NativeS4FileSystem</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
Expand All @@ -43,6 +51,7 @@
<arg>-out</arg><arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
<arg>-paths</arg><arg>${ivoryInPaths}</arg>
<arg>-ivoryInputFeeds</arg><arg>${ivoryInputFeeds}</arg>
<file>${wf:conf("ivory.libpath")}/s4fs-0.1-SNAPSHOT.jar</file>
<capture-output />
</java>
<ok to="replication" />
Expand All @@ -53,6 +62,10 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>fs.s4.impl</name>
<value>com.inmobi.grid.fs.s4fs.NativeS4FileSystem</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
Expand All @@ -72,6 +85,7 @@
<arg>-sourcePaths</arg><arg>${distcpSourcePaths}</arg>
<arg>-targetPath</arg><arg>${distcpTargetPaths}</arg>
<file>${wf:conf("ivory.libpath")}/hadoop-distcp.jar</file>
<file>${wf:conf("ivory.libpath")}/s4fs-0.1-SNAPSHOT.jar</file>
</java>
<ok to="succeeded-post-processing"/>
<error to="failed-post-processing"/>
Expand All @@ -81,6 +95,10 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>fs.s4.impl</name>
<value>com.inmobi.grid.fs.s4fs.NativeS4FileSystem</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
Expand Down Expand Up @@ -118,6 +136,7 @@
<file>${wf:conf("ivory.libpath")}/json-simple.jar</file>
<file>${wf:conf("ivory.libpath")}/oozie-client.jar</file>
<file>${wf:conf("ivory.libpath")}/spring-jms.jar</file>
<file>${wf:conf("ivory.libpath")}/s4fs-0.1-SNAPSHOT.jar</file>
</java>
<ok to="end" />
<error to="fail" />
Expand All @@ -127,6 +146,10 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>fs.s4.impl</name>
<value>com.inmobi.grid.fs.s4fs.NativeS4FileSystem</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
Expand Down Expand Up @@ -164,6 +187,7 @@
<file>${wf:conf("ivory.libpath")}/json-simple.jar</file>
<file>${wf:conf("ivory.libpath")}/oozie-client.jar</file>
<file>${wf:conf("ivory.libpath")}/spring-jms.jar</file>
<file>${wf:conf("ivory.libpath")}/s4fs-0.1-SNAPSHOT.jar</file>
</java>
<ok to="fail" />
<error to="fail" />
Expand Down
10 changes: 10 additions & 0 deletions feed/src/main/resources/config/workflow/retention-workflow.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>fs.s4.impl</name>
<value>com.inmobi.grid.fs.s4fs.NativeS4FileSystem</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
Expand All @@ -38,6 +42,7 @@
<arg>-timeZone</arg><arg>${timeZone}</arg>
<arg>-frequency</arg><arg>${frequency}</arg>
<arg>-logFile</arg><arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
<file>${wf:conf("ivory.libpath")}/s4fs-0.1-SNAPSHOT.jar</file>
</java>
<ok to="jms-messaging"/>
<error to="fail"/>
Expand All @@ -48,6 +53,10 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>fs.s4.impl</name>
<value>com.inmobi.grid.fs.s4fs.NativeS4FileSystem</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
Expand Down Expand Up @@ -81,6 +90,7 @@
<file>${wf:conf("ivory.libpath")}/json-simple.jar</file>
<file>${wf:conf("ivory.libpath")}/oozie-client.jar</file>
<file>${wf:conf("ivory.libpath")}/spring-jms.jar</file>
<file>${wf:conf("ivory.libpath")}/s4fs-0.1-SNAPSHOT.jar</file>
</java>
<ok to="end" />
<error to="fail" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testReplicationCoords() throws IvoryException {
Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
Assert.assertEquals("input-dataset", inputDataset.getName());
Assert.assertEquals(
ClusterHelper.getHdfsUrl(srcCluster)
ClusterHelper.getStoageUrl(srcCluster)
+ "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
inputDataset.getUriTemplate());

Expand Down
5 changes: 3 additions & 2 deletions oozie/src/main/java/org/apache/ivory/logging/LogMover.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ public int run(String[] arguments) throws Exception {

if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())) {
// if replication wf
copyTTlogs(args, fs, path, jobInfo.getActions().get(2));
copyOozieLog(client, fs, path, jobInfo.getId());
copyTTlogs(args, fs, path, jobInfo.getActions().get(2));
} else {
// if process wf
String subflowId = jobInfo.getExternalId();
copyOozieLog(client, fs, path, subflowId);
WorkflowJob subflowInfo = client.getJobInfo(subflowId);
List<WorkflowAction> actions = subflowInfo.getActions();
for (WorkflowAction action : actions) {
Expand All @@ -101,7 +102,7 @@ public int run(String[] arguments) throws Exception {
+ action.getName());
}
}
copyOozieLog(client, fs, path, subflowId);

}

} catch (Exception e) {
Expand Down
13 changes: 7 additions & 6 deletions oozie/src/main/java/org/apache/ivory/logging/LogProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Instance populateLogUrls(Entity entity, Instance instance,
String resolvedRunId = "-";
try {
FileSystem fs = FileSystem.get(
new Path(ClusterHelper.getHdfsUrl(clusterObj)).toUri(),
new Path(ClusterHelper.getStoageUrl(clusterObj)).toUri(),
new Configuration());
resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance,
runId);
Expand All @@ -74,7 +74,7 @@ public String getResolvedRunId(FileSystem fs, Cluster cluster,
Entity entity, Instance instance, String runId)
throws IvoryException, IOException {
if (StringUtils.isEmpty(runId)) {
Path jobPath = new Path(ClusterHelper.getHdfsUrl(cluster),
Path jobPath = new Path(ClusterHelper.getStoageUrl(cluster),
EntityUtil.getLogPath(cluster, entity) + "/job-"
+ EntityUtil.UTCtoURIDate(instance.instance) + "/*");

Expand All @@ -88,7 +88,7 @@ public String getResolvedRunId(FileSystem fs, Cluster cluster,
return "-";
}
} else {
Path jobPath = new Path(ClusterHelper.getHdfsUrl(cluster),
Path jobPath = new Path(ClusterHelper.getStoageUrl(cluster),
EntityUtil.getLogPath(cluster, entity) + "/job-"
+ EntityUtil.UTCtoURIDate(instance.instance) + "/"
+ getFormatedRunId(runId));
Expand All @@ -106,7 +106,7 @@ private Instance populateActionLogUrls(FileSystem fs, Cluster cluster,
Entity entity, Instance instance, String formatedRunId)
throws IvoryException, OozieClientException, IOException {

Path actionPaths = new Path(ClusterHelper.getHdfsUrl(cluster),
Path actionPaths = new Path(ClusterHelper.getStoageUrl(cluster),
EntityUtil.getLogPath(cluster, entity) + "/job-"
+ EntityUtil.UTCtoURIDate(instance.instance) + "/"
+ formatedRunId + "/*");
Expand All @@ -117,7 +117,7 @@ private Instance populateActionLogUrls(FileSystem fs, Cluster cluster,
for (FileStatus file : actions) {
Path filePath = file.getPath();
String dfsBrowserUrl = getDFSbrowserUrl(
ClusterHelper.getHdfsUrl(cluster),
ClusterHelper.getStoageUrl(cluster),
EntityUtil.getLogPath(cluster, entity) + "/job-"
+ EntityUtil.UTCtoURIDate(instance.instance) + "/"
+ formatedRunId, file.getPath().getName());
Expand Down Expand Up @@ -147,7 +147,8 @@ private String getActionStatus(String fileName) {

private String getDFSbrowserUrl(String hdfsPath, String logPath,
String fileName) throws IvoryException {
String httpUrl = hdfsPath.replaceAll("hdfs://", "http://").replaceAll(
String scheme = new Path(hdfsPath).toUri().getScheme();
String httpUrl = hdfsPath.replaceAll(scheme+"://", "http://").replaceAll(
":[0-9]+", ":50070");
return new Path(httpUrl, "/data/"+logPath + "/" + fileName).toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static void pushLibsToHDFS(String path, Cluster cluster, IvoryPathFilter
fs = FileSystem.get(conf);
} catch (Exception e) {
throw new IvoryException("Unable to connect to HDFS: "
+ ClusterHelper.getHdfsUrl(cluster));
+ ClusterHelper.getStoageUrl(cluster));
}
Path clusterPath = new Path(path);
if(!fs.exists(clusterPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected Properties createAppProperties(String clusterName, Path bundlePath) th
.getProperties());
}
properties.setProperty(OozieWorkflowEngine.NAME_NODE,
ClusterHelper.getHdfsUrl(cluster));
ClusterHelper.getStoageUrl(cluster));
properties.setProperty(OozieWorkflowEngine.JOB_TRACKER,
ClusterHelper.getMREndPoint(cluster));
properties.setProperty(OozieClient.BUNDLE_APP_PATH,
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@
<artifactId>slf4j-simple</artifactId>
<version>1.2</version>
</dependency>

<dependency>
<groupId>com.inmobi.grid</groupId>
<artifactId>s4fs</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,10 @@ private SYNCDATASET createDataSet(String feedName, Cluster cluster, String datas

SYNCDATASET syncdataset = new SYNCDATASET();
syncdataset.setName(datasetName);
syncdataset.setUriTemplate("${nameNode}"
+ FeedHelper.getLocation(feed, locationType,
cluster.getName()).getPath());
String locPath = FeedHelper.getLocation(feed, locationType,
cluster.getName()).getPath();
syncdataset.setUriTemplate(new Path(locPath).toUri().getScheme()!=null?locPath:"${nameNode}"
+ locPath);
syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");

org.apache.ivory.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ private void addOptionalInputProperties(Properties properties, Input in, String
properties.put(inName + ".end_of_duration", Timeunit.NONE.name());
properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
properties.put(inName + ".done-flag", "notused");
properties.put(inName + ".uri-template", "${nameNode}" + FeedHelper.getLocation(feed, LocationType.DATA).getPath().replace('$', '%'));
String locPath = FeedHelper.getLocation(feed, LocationType.DATA).getPath().replace('$', '%');
properties.put(inName + ".uri-template", new Path(locPath).toUri().getScheme()!=null?locPath:"${nameNode}"+locPath);
properties.put(inName + ".start-instance", in.getStart());
properties.put(inName + ".end-instance", in.getEnd());
}
Expand Down
Loading

0 comments on commit 8d0b860

Please sign in to comment.