Revert "Merge pull request linkedin#402 from pralabhkumar/ipso_changes"
This reverts commit 2a81e40, reversing
changes made to dd31ad5.
varunsaxena committed Oct 16, 2018
1 parent c19425b commit 89919f7
Showing 22 changed files with 61 additions and 1,433 deletions.
6 changes: 0 additions & 6 deletions app-conf/HeuristicConf.xml
@@ -320,12 +320,6 @@
</params>
</heuristic>

<heuristic>
<applicationtype>mapreduce</applicationtype>
<heuristicname>MapReduceConfiguration</heuristicname>
<classname>com.linkedin.drelephant.mapreduce.heuristics.ConfigurationHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpeed</viewname>
</heuristic>

<!-- SPARK HEURISTICS -->

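For context, each <heuristic> entry in HeuristicConf.xml wires an application type to the heuristic class that Dr. Elephant instantiates and to the help view rendered for it, so removing the block above unregisters the MapReduceConfiguration heuristic. The rough skeleton below shows the general shape of a class such a <classname> entry points at, modelled on the apply() and getHeuristicConfData() methods visible later in this commit; it is a sketch, not the actual ConfigurationHeuristic source, and imports from the Dr. Elephant codebase are omitted.

// Sketch only: general shape of a heuristic registered via <classname> in HeuristicConf.xml.
// The class name and the reported detail are illustrative, not taken from this commit.
public class ConfigurationHeuristicSketch implements Heuristic<MapReduceApplicationData> {
  private final HeuristicConfigurationData _heuristicConfData;

  public ConfigurationHeuristicSketch(HeuristicConfigurationData heuristicConfData) {
    _heuristicConfData = heuristicConfData;
  }

  @Override
  public HeuristicConfigurationData getHeuristicConfData() {
    return _heuristicConfData;
  }

  @Override
  public HeuristicResult apply(MapReduceApplicationData data) {
    if (!data.getSucceeded()) {
      return null;
    }
    // Report a configuration value as a result detail; real heuristics also compute a severity.
    HeuristicResult result = new HeuristicResult(_heuristicConfData.getClassName(),
        _heuristicConfData.getHeuristicName(), Severity.NONE, 0);
    result.addResultDetail("mapreduce.map.memory.mb",
        data.getConf().getProperty("mapreduce.map.memory.mb"));
    return result;
  }
}
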
@@ -1,74 +1,7 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.mapreduce.heuristics;

public class CommonConstantsHeuristic {

public static final String MAPPER_SPEED = "Mapper Speed";
public static final String TOTAL_INPUT_SIZE_IN_MB = "Total input size in MB";

public enum UtilizedParameterKeys {
AVG_PHYSICAL_MEMORY("Avg Physical Memory (MB)"),
MAX_PHYSICAL_MEMORY("Max Physical Memory (MB)"),
MIN_PHYSICAL_MEMORY("Min Physical Memory (MB)"),
AVG_VIRTUAL_MEMORY("Avg Virtual Memory (MB)"),
MAX_VIRTUAL_MEMORY("Max Virtual Memory (MB)"),
MIN_VIRTUAL_MEMORY("Min Virtual Memory (MB)"),
AVG_TOTAL_COMMITTED_HEAP_USAGE_MEMORY("Avg Total Committed Heap Usage Memory (MB)"),
MAX_TOTAL_COMMITTED_HEAP_USAGE_MEMORY("Max Total Committed Heap Usage Memory (MB)"),
MIN_TOTAL_COMMITTED_HEAP_USAGE_MEMORY("Min Total Committed Heap Usage Memory (MB)");
private String value;

UtilizedParameterKeys(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}

public enum ParameterKeys {
MAPPER_MEMORY_HADOOP_CONF("mapreduce.map.memory.mb"),
MAPPER_HEAP_HADOOP_CONF("mapreduce.map.java.opts"),
SORT_BUFFER_HADOOP_CONF("mapreduce.task.io.sort.mb"),
SORT_FACTOR_HADOOP_CONF("mapreduce.task.io.sort.factor"),
SORT_SPILL_HADOOP_CONF("mapreduce.map.sort.spill.percent"),
REDUCER_MEMORY_HADOOP_CONF("mapreduce.reduce.memory.mb"),
REDUCER_HEAP_HADOOP_CONF("mapreduce.reduce.java.opts"),
SPLIT_SIZE_HADOOP_CONF("mapreduce.input.fileinputformat.split.maxsize"),
CHILD_HEAP_SIZE_HADOOP_CONF("mapred.child.java.opts"),
PIG_SPLIT_SIZE_HADOOP_CONF("pig.maxCombinedSplitSize"),
MAPPER_MEMORY_HEURISTICS_CONF("Mapper Memory"),
MAPPER_HEAP_HEURISTICS_CONF("Mapper Heap"),
REDUCER_MEMORY_HEURISTICS_CONF("Reducer Memory"),
REDUCER_HEAP_HEURISTICS_CONF("Reducer heap"),
SORT_BUFFER_HEURISTICS_CONF("Sort Buffer"),
SORT_FACTOR_HEURISTICS_CONF("Sort Factor"),
SORT_SPILL_HEURISTICS_CONF("Sort Spill"),
SPLIT_SIZE_HEURISTICS_CONF("Split Size"),
PIG_MAX_SPLIT_SIZE_HEURISTICS_CONF("Pig Max Split Size");
private String value;
ParameterKeys(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}
public static final String MAPPER_SPEED="Mapper Speed";
public static final String TOTAL_INPUT_SIZE_IN_MB="Total input size in MB";
}
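For reference, the enums removed above let heuristics look up both the display labels and the Hadoop property names through a single accessor, which is what the hunks further down in this commit replace with hard-coded strings again. A minimal illustration of how those accessors were used (the surrounding heuristic code is omitted):

// Illustrative only: resolving labels and property names via the removed enums.
String label = CommonConstantsHeuristic.UtilizedParameterKeys.AVG_PHYSICAL_MEMORY.getValue();
// -> "Avg Physical Memory (MB)", used as a key in result.addResultDetail(...)
String property = CommonConstantsHeuristic.ParameterKeys.MAPPER_MEMORY_HADOOP_CONF.getValue();
// -> "mapreduce.map.memory.mb", looked up in the job configuration
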

This file was deleted.

@@ -33,8 +33,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

import static com.linkedin.drelephant.mapreduce.heuristics.CommonConstantsHeuristic.UtilizedParameterKeys.*;


/**
* This heuristic deals with the efficiency of container size
@@ -61,7 +59,8 @@ private long getContainerMemDefaultMBytes() {
String strValue = paramMap.get(CONTAINER_MEM_DEFAULT_MB);
try {
return Long.valueOf(strValue);
} catch (NumberFormatException e) {
}
catch (NumberFormatException e) {
logger.warn(CONTAINER_MEM_DEFAULT_MB + ": expected number [" + strValue + "]");
}
}
@@ -81,7 +80,7 @@ private void loadParameters() {

long containerMemDefaultBytes = getContainerMemDefaultMBytes() * FileUtils.ONE_MB;
logger.info(heuristicName + " will use " + CONTAINER_MEM_DEFAULT_MB + " with the following threshold setting: "
+ containerMemDefaultBytes);
+ containerMemDefaultBytes);

double[] confMemoryLimits = Utils.getParam(paramMap.get(CONTAINER_MEM_SEVERITY), memoryLimits.length);
if (confMemoryLimits != null) {
@@ -111,7 +110,7 @@ public HeuristicConfigurationData getHeuristicConfData() {
@Override
public HeuristicResult apply(MapReduceApplicationData data) {

if (!data.getSucceeded()) {
if(!data.getSucceeded()) {
return null;
}

@@ -123,13 +122,14 @@ public HeuristicResult apply(MapReduceApplicationData data) {
containerMem = Long.parseLong(containerSizeStr);
} catch (NumberFormatException e0) {
// Some job has a string var like "${VAR}" for this config.
if (containerSizeStr.startsWith("$")) {
String realContainerConf =
containerSizeStr.substring(containerSizeStr.indexOf("{") + 1, containerSizeStr.indexOf("}"));
if(containerSizeStr.startsWith("$")) {
String realContainerConf = containerSizeStr.substring(containerSizeStr.indexOf("{")+1,
containerSizeStr.indexOf("}"));
String realContainerSizeStr = data.getConf().getProperty(realContainerConf);
try {
containerMem = Long.parseLong(realContainerSizeStr);
} catch (NumberFormatException e1) {
}
catch (NumberFormatException e1) {
logger.warn(realContainerConf + ": expected number [" + realContainerSizeStr + "]");
}
} else {
@@ -146,40 +146,26 @@ public HeuristicResult apply(MapReduceApplicationData data) {
List<Long> taskPMems = new ArrayList<Long>();
List<Long> taskVMems = new ArrayList<Long>();
List<Long> runtimesMs = new ArrayList<Long>();
List<Long> taskHeapUsages = new ArrayList<Long>();
long taskPMin = Long.MAX_VALUE, taskVMin = Long.MAX_VALUE, taskHUMin = Long.MAX_VALUE;
long taskPMax = 0, taskVMax = 0, taskHUMax = 0;
long taskPMin = Long.MAX_VALUE;
long taskPMax = 0;
for (MapReduceTaskData task : tasks) {
if (task.isTimeAndCounterDataPresent()) {
runtimesMs.add(task.getTotalRunTimeMs());
long taskPMem = task.getCounters().get(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES);
long taskVMem = task.getCounters().get(MapReduceCounterData.CounterName.VIRTUAL_MEMORY_BYTES);
long taskHeapUsage = task.getCounters().get(MapReduceCounterData.CounterName.COMMITTED_HEAP_BYTES);
taskPMems.add(taskPMem);
taskPMin = Math.min(taskPMin, taskPMem);
taskPMax = Math.max(taskPMax, taskPMem);
taskVMin = Math.min(taskVMin, taskVMem);
taskVMax = Math.max(taskVMax, taskVMem);
taskHUMin = Math.min(taskHUMin, taskHeapUsage);
taskHUMax = Math.max(taskHUMax, taskHeapUsage);
taskVMems.add(taskVMem);
taskPMems.add(taskPMem);
taskHeapUsages.add(taskHeapUsage);
}
}

if (taskPMin == Long.MAX_VALUE) {
if(taskPMin == Long.MAX_VALUE) {
taskPMin = 0;
}
if (taskVMin == Long.MAX_VALUE) {
taskVMin = 0;
}
if (taskHUMin == Long.MAX_VALUE) {
taskHUMin = 0;
}

long taskPMemAvg = Statistics.average(taskPMems);
long taskVMemAvg = Statistics.average(taskVMems);
long taskHeapUsageAvg = Statistics.average(taskHeapUsages);
long averageTimeMs = Statistics.average(runtimesMs);

Severity severity;
@@ -189,51 +175,37 @@ public HeuristicResult apply(MapReduceApplicationData data) {
severity = getTaskMemoryUtilSeverity(taskPMemAvg, containerMem);
}

HeuristicResult result =
new HeuristicResult(_heuristicConfData.getClassName(), _heuristicConfData.getHeuristicName(), severity,
Utils.getHeuristicScore(severity, tasks.length));
HeuristicResult result = new HeuristicResult(_heuristicConfData.getClassName(),
_heuristicConfData.getHeuristicName(), severity, Utils.getHeuristicScore(severity, tasks.length));

result.addResultDetail("Number of tasks", Integer.toString(tasks.length));
result.addResultDetail("Avg task runtime", Statistics.readableTimespan(averageTimeMs));
result.addResultDetail(AVG_PHYSICAL_MEMORY.getValue(),
Long.toString(taskPMemAvg / FileUtils.ONE_MB));
result.addResultDetail(MAX_PHYSICAL_MEMORY.getValue(),
Long.toString(taskPMax / FileUtils.ONE_MB));
result.addResultDetail(MIN_PHYSICAL_MEMORY.getValue(),
Long.toString(taskPMin / FileUtils.ONE_MB));
result.addResultDetail(AVG_VIRTUAL_MEMORY.getValue(),
Long.toString(taskVMemAvg / FileUtils.ONE_MB));
result.addResultDetail(MAX_VIRTUAL_MEMORY.getValue(),
Long.toString(taskVMax / FileUtils.ONE_MB));
result.addResultDetail(MIN_VIRTUAL_MEMORY.getValue(),
Long.toString(taskVMin / FileUtils.ONE_MB));
result.addResultDetail(AVG_TOTAL_COMMITTED_HEAP_USAGE_MEMORY.getValue(),
Long.toString(taskHeapUsageAvg / FileUtils.ONE_MB));
result.addResultDetail(MAX_TOTAL_COMMITTED_HEAP_USAGE_MEMORY.getValue(),
Long.toString(taskHUMax / FileUtils.ONE_MB));
result.addResultDetail(MIN_TOTAL_COMMITTED_HEAP_USAGE_MEMORY.getValue(),
Long.toString(taskHUMin / FileUtils.ONE_MB));
result.addResultDetail("Avg Physical Memory (MB)", Long.toString(taskPMemAvg / FileUtils.ONE_MB));
result.addResultDetail("Max Physical Memory (MB)", Long.toString(taskPMax / FileUtils.ONE_MB));
result.addResultDetail("Min Physical Memory (MB)", Long.toString(taskPMin / FileUtils.ONE_MB));
result.addResultDetail("Avg Virtual Memory (MB)", Long.toString(taskVMemAvg / FileUtils.ONE_MB));
result.addResultDetail("Requested Container Memory", FileUtils.byteCountToDisplaySize(containerMem));

return result;
}

private Severity getTaskMemoryUtilSeverity(long taskMemAvg, long taskMemMax) {
double ratio = ((double) taskMemAvg) / taskMemMax;
double ratio = ((double)taskMemAvg) / taskMemMax;
Severity sevRatio = getMemoryRatioSeverity(ratio);
// Severity is reduced if the requested container memory is close to default
Severity sevMax = getContainerMemorySeverity(taskMemMax);

return Severity.min(sevRatio, sevMax);
}


private Severity getContainerMemorySeverity(long taskMemMax) {
return Severity.getSeverityAscending(taskMemMax, memoryLimits[0], memoryLimits[1], memoryLimits[2],
memoryLimits[3]);
return Severity.getSeverityAscending(
taskMemMax, memoryLimits[0], memoryLimits[1], memoryLimits[2], memoryLimits[3]);
}

private Severity getMemoryRatioSeverity(double ratio) {
return Severity.getSeverityDescending(ratio, memRatioLimits[0], memRatioLimits[1], memRatioLimits[2],
memRatioLimits[3]);
return Severity.getSeverityDescending(
ratio, memRatioLimits[0], memRatioLimits[1], memRatioLimits[2], memRatioLimits[3]);
}
}
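The severity logic that survives the revert is unchanged in substance: the tasks' average physical memory is divided by the requested container memory, that ratio is mapped to a severity through descending thresholds (memRatioLimits), and the result is capped by an ascending-threshold severity on the requested memory itself via Severity.min(sevRatio, sevMax), so a low utilisation ratio only matters when the request is large. A rough, self-contained sketch of that thresholding follows; the limit values are placeholders, not the ones configured for this heuristic.

// Rough sketch of the ratio/threshold logic kept by this revert. Threshold values are
// made up for illustration; the real ones come from the heuristic's configured parameters.
public class MemorySeveritySketch {
  // Lower utilisation ratio => higher severity (descending thresholds).
  private static final double[] RATIO_LIMITS = {0.6, 0.5, 0.4, 0.3};
  // Larger requested container => higher severity (ascending thresholds, in MB).
  private static final long[] MEM_LIMITS_MB = {1024, 1536, 2048, 2560};

  static int descendingSeverity(double value, double[] limits) {
    int severity = 0;
    for (double limit : limits) {
      if (value <= limit) severity++;   // 0 = NONE ... 4 = CRITICAL
    }
    return severity;
  }

  static int ascendingSeverity(long value, long[] limits) {
    int severity = 0;
    for (long limit : limits) {
      if (value >= limit) severity++;
    }
    return severity;
  }

  public static void main(String[] args) {
    long avgTaskPhysicalMemMb = 700;      // hypothetical average task usage
    long requestedContainerMemMb = 2048;  // hypothetical mapreduce.map.memory.mb
    double ratio = (double) avgTaskPhysicalMemMb / requestedContainerMemMb;
    int severity = Math.min(descendingSeverity(ratio, RATIO_LIMITS),
        ascendingSeverity(requestedContainerMemMb, MEM_LIMITS_MB));
    System.out.println("severity = " + severity);  // mirrors Severity.min(sevRatio, sevMax)
  }
}
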
12 changes: 0 additions & 12 deletions app/com/linkedin/drelephant/tuning/AutoTuningAPIHelper.java
@@ -116,12 +116,10 @@ private void setMaxAllowedMetricIncreasePercentage(TuningInput tuningInput) {
*/
private void setTuningAlgorithm(TuningInput tuningInput) throws IllegalArgumentException {
//Todo: Handle algorithm version later
logger.info(" Optimization Algorithm " + tuningInput.getOptimizationAlgo());
TuningAlgorithm tuningAlgorithm = TuningAlgorithm.find.select("*")
.where()
.eq(TuningAlgorithm.TABLE.jobType, tuningInput.getJobType())
.eq(TuningAlgorithm.TABLE.optimizationMetric, tuningInput.getOptimizationMetric())
.eq(TuningAlgorithm.TABLE.optimizationAlgo, tuningInput.getOptimizationAlgo())
.findUnique();
if (tuningAlgorithm == null) {
throw new IllegalArgumentException(
@@ -439,7 +437,6 @@ private void insertParamSet(JobDefinition job, TuningAlgorithm tuningAlgorithm,
jobSuggestedParamSet.isParamSetBest = false;
jobSuggestedParamSet.save();
insertParameterValues(jobSuggestedParamSet, paramValueMap);
intializeOptimizationAlgoPrerequisite(tuningAlgorithm, jobSuggestedParamSet);
logger.debug("Default parameter set inserted for job: " + job.jobName);
}

@@ -460,15 +457,6 @@ private void insertParameterValues(JobSuggestedParamSet jobSuggestedParamSet, Ma
}
}

private void intializeOptimizationAlgoPrerequisite(TuningAlgorithm tuningAlgorithm,
JobSuggestedParamSet jobSuggestedParamSet) {
logger.info("Inserting parameter constraint " + tuningAlgorithm.optimizationAlgo.name());
AutoTuningOptimizeManager manager = OptimizationAlgoFactory.getOptimizationAlogrithm(tuningAlgorithm);
if (manager != null) {
manager.intializePrerequisite(tuningAlgorithm, jobSuggestedParamSet);
}
}

/**
* Inserts parameter value in database
* @param jobSuggestedParamSet Parameter set to which the parameter belongs