[SPARK-16129][CORE][SQL] Eliminate direct use of commons-lang classes in favor of commons-lang3

## What changes were proposed in this pull request?

Replace use of `commons-lang` with `commons-lang3` and forbid the former via scalastyle; remove use of `NotImplementedException` from `commons-lang` in favor of the JDK's `UnsupportedOperationException`.
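
A minimal sketch of the substitution applied across the files below (illustrative only, not Spark code; `ExceptionSwapSketch` and `decodeUnsupported` are made-up names):

```scala
object ExceptionSwapSketch {
  // Before (commons-lang 2, now forbidden by the new scalastyle rule):
  //   import org.apache.commons.lang.NotImplementedException
  //   throw new NotImplementedException("Unimplemented type: " + dataType)
  //
  // After: no third-party import is needed; the JDK exception is thrown instead.
  def decodeUnsupported(dataType: String): Nothing =
    throw new UnsupportedOperationException("Unimplemented type: " + dataType)
}
```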

## How was this patch tested?

Jenkins tests

Author: Sean Owen <[email protected]>

Closes apache#13843 from srowen/SPARK-16129.
srowen committed Jun 24, 2016
1 parent f4fd743 commit 158af16
Showing 13 changed files with 44 additions and 48 deletions.
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,6 @@ import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.generic.Growable
@@ -34,7 +33,7 @@ import scala.reflect.{classTag, ClassTag}
import scala.util.control.NonFatal

import com.google.common.collect.MapMaker
import org.apache.commons.lang.SerializationUtils
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -334,7 +333,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
override protected def childValue(parent: Properties): Properties = {
// Note: make a clone such that changes in the parent properties aren't reflected in
// those of the children threads, which has confusing semantics (SPARK-10563).
SerializationUtils.clone(parent).asInstanceOf[Properties]
SerializationUtils.clone(parent)
}
override protected def initialValue(): Properties = new Properties()
}
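
The `asInstanceOf[Properties]` cast can be dropped because Commons Lang 3's `SerializationUtils.clone` is generic (roughly `<T extends Serializable> T clone(T)`), whereas the Lang 2 version returned `Object`. A tiny sketch of the resulting call site under that assumption (`CloneSketch` and `cloneProps` are made-up names):

```scala
import java.util.Properties

import org.apache.commons.lang3.SerializationUtils

object CloneSketch {
  def cloneProps(parent: Properties): Properties =
    // The clone is already statically typed as Properties; no cast is required.
    SerializationUtils.clone(parent)
}
```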
6 changes: 6 additions & 0 deletions scalastyle-config.xml
@@ -210,6 +210,12 @@ This file is divided into 3 sections:
scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage>
</check>

<check customId="commonslang2" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
<parameters><parameter name="regex">org\.apache\.commons\.lang\.</parameter></parameters>
<customMessage>Use Commons Lang 3 classes (package org.apache.commons.lang3.*) instead
of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
</check>

<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
<parameters>
<parameter name="groups">java,scala,3rdParty,spark</parameter>
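
For reference, the kind of import the new `commonslang2` check is meant to flag, next to the Lang 3 form it allows (a hypothetical snippet, not part of this patch; `AbbreviateExample` is a made-up name):

```scala
// import org.apache.commons.lang.StringUtils  // matches org\.apache\.commons\.lang\. -> scalastyle error
import org.apache.commons.lang3.StringUtils     // "lang3." does not match the regex, so it passes

object AbbreviateExample {
  def shortLabel(s: String): String = StringUtils.abbreviate(s, 20)
}
```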
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.commons.lang.StringUtils
import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
@@ -23,7 +23,7 @@ import scala.collection.Map
import scala.collection.mutable.Stack
import scala.reflect.ClassTag

import org.apache.commons.lang.ClassUtils
import org.apache.commons.lang3.ClassUtils
import org.json4s.JsonAST._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
@@ -19,7 +19,6 @@

import java.io.IOException;

import org.apache.commons.lang.NotImplementedException;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -228,7 +227,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
break;

@@ -239,7 +238,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
break;

@@ -262,7 +261,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
}
} else {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
break;
case BINARY:
@@ -293,12 +292,12 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
column.putByteArray(i, v.getBytes());
}
} else {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
break;

default:
throw new NotImplementedException("Unsupported type: " + descriptor.getType());
throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
}
}

@@ -327,7 +326,7 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOExce
defColumn.readShorts(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
}

@@ -360,7 +359,7 @@ private void readDoubleBatch(int rowId, int num, ColumnVector column) throws IOE
defColumn.readDoubles(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
}

@@ -381,7 +380,7 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOE
}
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
}

@@ -417,7 +416,7 @@ private void readFixedLenByteArrayBatch(int rowId, int num,
}
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
}

@@ -459,13 +458,13 @@ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) thr
@SuppressWarnings("deprecation")
Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
}
this.dataColumn = new VectorizedRleValuesReader();
this.useDictionary = true;
} else {
if (dataEncoding != Encoding.PLAIN) {
throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
}
this.dataColumn = new VectorizedPlainValuesReader();
this.useDictionary = false;
@@ -485,7 +484,7 @@ private void readPageV1(DataPageV1 page) throws IOException {

// Initialize the decoders.
if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
}
int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
this.defColumn = new VectorizedRleValuesReader(bitWidth);
@@ -20,7 +20,6 @@
import java.math.BigInteger;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.NotImplementedException;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.api.Binary;

@@ -100,7 +99,7 @@ protected Array(ColumnVector data) {

@Override
public ArrayData copy() {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}

// TODO: this is extremely expensive.
@@ -171,7 +170,7 @@ public Object[] array() {
}
}
} else {
throw new NotImplementedException("Type " + dt);
throw new UnsupportedOperationException("Type " + dt);
}
return list;
}
@@ -181,15 +180,15 @@ public Object[] array() {

@Override
public boolean getBoolean(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}

@Override
public byte getByte(int ordinal) { return data.getByte(offset + ordinal); }

@Override
public short getShort(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}

@Override
@@ -200,7 +199,7 @@ public short getShort(int ordinal) {

@Override
public float getFloat(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}

@Override
@@ -240,12 +239,12 @@ public ArrayData getArray(int ordinal) {

@Override
public MapData getMap(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}

@Override
public Object get(int ordinal, DataType dataType) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
}

@@ -562,7 +561,7 @@ private Array getByteArray(int rowId) {
* Returns the value for rowId.
*/
public MapData getMap(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}

/**
@@ -23,8 +23,6 @@
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang.NotImplementedException;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -112,7 +110,7 @@ public static Object toPrimitiveJavaArray(ColumnVector.Array array) {
}
return result;
} else {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}
}

@@ -161,7 +159,7 @@ private static void appendValue(ColumnVector dst, DataType t, Object o) {
} else if (t instanceof DateType) {
dst.appendInt(DateTimeUtils.fromJavaDate((Date)o));
} else {
throw new NotImplementedException("Type " + t);
throw new UnsupportedOperationException("Type " + t);
}
}
}
@@ -19,8 +19,6 @@
import java.math.BigDecimal;
import java.util.*;

import org.apache.commons.lang.NotImplementedException;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
@@ -166,7 +164,7 @@ public InternalRow copy() {

@Override
public boolean anyNull() {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}

@Override
@@ -227,12 +225,12 @@ public ArrayData getArray(int ordinal) {

@Override
public MapData getMap(int ordinal) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}

@Override
public Object get(int ordinal, DataType dataType) {
throw new NotImplementedException();
throw new UnsupportedOperationException();
}

@Override
@@ -258,7 +256,7 @@ public void update(int ordinal, Object value) {
setDecimal(ordinal, Decimal.apply((BigDecimal) value, t.precision(), t.scale()),
t.precision());
} else {
throw new NotImplementedException("Datatype not supported " + dt);
throw new UnsupportedOperationException("Datatype not supported " + dt);
}
}
}
@@ -430,7 +428,7 @@ public int numValidRows() {
*/
public void setColumn(int ordinal, ColumnVector column) {
if (column instanceof OffHeapColumnVector) {
throw new NotImplementedException("Need to ref count columns.");
throw new UnsupportedOperationException("Need to ref count columns.");
}
columns[ordinal] = column;
}
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.commons.lang.StringUtils
import org.apache.commons.lang3.StringUtils

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar

import scala.collection.JavaConverters._

import org.apache.commons.lang.StringUtils
import org.apache.commons.lang3.StringUtils

import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
@@ -29,7 +29,7 @@
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
@@ -26,7 +26,7 @@ import scala.collection.mutable.Queue
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.commons.lang.SerializationUtils
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -579,8 +579,7 @@ class StreamingContext private[streaming] (
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
savedProperties.set(SerializationUtils.clone(
sparkContext.localProperties.get()).asInstanceOf[Properties])
savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
scheduler.start()
}
state = StreamingContextState.ACTIVE
@@ -17,13 +17,12 @@

package org.apache.spark.streaming.scheduler

import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import scala.collection.JavaConverters._
import scala.util.Failure

import org.apache.commons.lang.SerializationUtils
import org.apache.commons.lang3.SerializationUtils

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
@@ -219,8 +218,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
def run() {
val oldProps = ssc.sparkContext.getLocalProperties
try {
ssc.sparkContext.setLocalProperties(
SerializationUtils.clone(ssc.savedProperties.get()).asInstanceOf[Properties])
ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
val formattedTime = UIUtils.formatBatchTime(
job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"