Skip to content

Commit

Permalink
[FLINK-5397] [runtime] Do not replace ObjectStreamClass on deserializ…
Browse files Browse the repository at this point in the history
…ation of migration package classes, override resolveClass(...) instead

This closes apache#3050
  • Loading branch information
StefanRRichter authored and StephanEwen committed Jan 5, 2017
1 parent fb48c3b commit 3554c96
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,40 @@ public final class MigrationInstantiationUtil {

public static class ClassLoaderObjectInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {

private static final String ARRAY_PREFIX = "[L";
private static final String FLINK_BASE_PACKAGE = "org.apache.flink.";
private static final String FLINK_MIGRATION_PACKAGE = "org.apache.flink.migration.";

public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
super(in, classLoader);
}

@Override
protected ObjectStreamClass readClassDescriptor()
throws IOException, ClassNotFoundException {
ObjectStreamClass objectStreamClass = super.readClassDescriptor();
String className = objectStreamClass.getName();
if (className.contains("apache.flink.")) {
className = className.replace("apache.flink.", "apache.flink.migration.");
try {
Class<?> clazz = Class.forName(className, false, classLoader);
objectStreamClass = ObjectStreamClass.lookup(clazz);
} catch (Exception ignored) {
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
final String className = desc.getName();

// the flink package may be at position 0 (regular class) or position 2 (array)
final int flinkPackagePos;
if ((flinkPackagePos = className.indexOf(FLINK_BASE_PACKAGE)) == 0 ||
(flinkPackagePos == 2 && className.startsWith(ARRAY_PREFIX)))
{
final String modClassName = flinkPackagePos == 0 ?
FLINK_MIGRATION_PACKAGE + className.substring(FLINK_BASE_PACKAGE.length()) :
ARRAY_PREFIX + FLINK_MIGRATION_PACKAGE + className.substring(2 + FLINK_BASE_PACKAGE.length());

try {
return classLoader != null ?
Class.forName(modClassName, false, classLoader) :
Class.forName(modClassName);
}
catch (ClassNotFoundException ignored) {}
}
return objectStreamClass;

// either a non-Flink class, or not located in the migration package
return super.resolveClass(desc);
}
}

public static <T> T deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
return deserializeObject(new ByteArrayInputStream(bytes), cl);
}
Expand All @@ -78,5 +90,4 @@ public static <T> T deserializeObject(InputStream in, ClassLoader cl) throws IOE
private MigrationInstantiationUtil() {
throw new IllegalAccessError();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) thr
}

@Override
public Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
if (classLoader != null) {
String name = desc.getName();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ protected void executeAndSavepoint(
// FileUtils.moveFile(new File(new URI(path).getPath()), new File(savepointPath));
}

protected void restoreAndExecute(
@SafeVarargs
protected final void restoreAndExecute(
StreamExecutionEnvironment env,
String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception {
Expand Down Expand Up @@ -217,17 +218,14 @@ protected void restoreAndExecute(
for (Tuple2<String, Integer> acc : expectedAccumulators) {
Integer numFinished = (Integer) accumulators.get(acc.f0);
if (numFinished == null) {
System.out.println("NO ACC FOR " + acc);
allDone = false;
break;
}
if (!numFinished.equals(acc.f1)) {
System.out.println("TO LOW FOR ACC" + acc);
allDone = false;
break;
}
}
System.out.println("ACC: " + accumulators);
if (allDone) {
done = true;
break;
Expand Down

0 comments on commit 3554c96

Please sign in to comment.