Skip to content

Commit

Permalink
[hotfix][network] Remove unused EventSerializer#isEvent method
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Oct 29, 2020
1 parent e2dd0b6 commit efe588e
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,48 +110,6 @@ else if (eventClass == CancelCheckpointMarker.class) {
}
}

/**
* Identifies whether the given buffer encodes the given event. Custom events are not supported.
*
* <p><strong>Pre-condition</strong>: This buffer must encode some event!</p>
*
* @param buffer the buffer to peak into
* @param eventClass the expected class of the event type
* @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
*/
private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass) throws IOException {
if (buffer.remaining() < 4) {
throw new IOException("Incomplete event");
}

final int bufferPos = buffer.position();
final ByteOrder bufferOrder = buffer.order();
buffer.order(ByteOrder.BIG_ENDIAN);

try {
int type = buffer.getInt();

if (eventClass.equals(EndOfPartitionEvent.class)) {
return type == END_OF_PARTITION_EVENT;
} else if (eventClass.equals(CheckpointBarrier.class)) {
return type == CHECKPOINT_BARRIER_EVENT;
} else if (eventClass.equals(EndOfSuperstepEvent.class)) {
return type == END_OF_SUPERSTEP_EVENT;
} else if (eventClass.equals(EndOfChannelStateEvent.class)) {
return type == END_OF_CHANNEL_STATE_EVENT;
} else if (eventClass.equals(CancelCheckpointMarker.class)) {
return type == CANCEL_CHECKPOINT_MARKER_EVENT;
} else {
throw new UnsupportedOperationException("Unsupported eventClass = " + eventClass);
}
}
finally {
buffer.order(bufferOrder);
// restore the original position in the buffer (recall: we only peak into it!)
buffer.position(bufferPos);
}
}

public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {
if (buffer.remaining() < 4) {
throw new IOException("Incomplete event");
Expand Down Expand Up @@ -315,15 +273,4 @@ public static BufferConsumer toBufferConsumer(AbstractEvent event, boolean hasPr
public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
return fromSerializedEvent(buffer.getNioBufferReadable(), classLoader);
}

/**
* Identifies whether the given buffer encodes the given event. Custom events are not supported.
*
* @param buffer the buffer to peak into
* @param eventClass the expected class of the event type
* @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
*/
public static boolean isEvent(Buffer buffer, Class<?> eventClass) throws IOException {
return !buffer.isBuffer() && isEvent(buffer.getNioBufferReadable(), eventClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Tests for the {@link EventSerializer}.
Expand Down Expand Up @@ -100,84 +98,4 @@ public void testToBuffer() throws IOException {
}
}
}

/**
* Tests {@link EventSerializer#isEvent(Buffer, Class)}
* whether it peaks into the buffer only, i.e. after the call, the buffer
* is still de-serializable.
*/
@Test
public void testIsEventPeakOnly() throws Exception {
final Buffer serializedEvent = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE, false);
try {
final ClassLoader cl = getClass().getClassLoader();
assertTrue(
EventSerializer.isEvent(serializedEvent, EndOfPartitionEvent.class));
EndOfPartitionEvent event = (EndOfPartitionEvent) EventSerializer
.fromBuffer(serializedEvent, cl);
assertEquals(EndOfPartitionEvent.INSTANCE, event);
} finally {
serializedEvent.recycleBuffer();
}
}

/**
* Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
* the correct answer for various encoded event buffers.
*/
@Test
public void testIsEvent() throws Exception {
Class[] expectedClasses = Arrays.stream(events)
.map(AbstractEvent::getClass)
.toArray(Class[]::new);

for (AbstractEvent evt : events) {
for (Class<?> expectedClass: expectedClasses) {
if (expectedClass.equals(TestTaskEvent.class)) {
try {
checkIsEvent(evt, expectedClass);
fail("This should fail");
}
catch (UnsupportedOperationException ex) {
// expected
}
}
else if (evt.getClass().equals(expectedClass)) {
assertTrue(checkIsEvent(evt, expectedClass));
} else {
assertFalse(checkIsEvent(evt, expectedClass));
}
}
}
}

/**
* Returns the result of
* {@link EventSerializer#isEvent(Buffer, Class)} on a buffer
* that encodes the given <tt>event</tt>.
*
* @param event the event to encode
* @param eventClass the event class to check against
*
* @return whether {@link EventSerializer#isEvent(Buffer, Class)}
* thinks the encoded buffer matches the class
*/
private boolean checkIsEvent(
AbstractEvent event,
Class<?> eventClass) throws IOException {

final boolean unprioritizedIsEvent = isEvent(event, eventClass, false);
final boolean prioritizedIsEvent = isEvent(event, eventClass, true);
assertEquals(unprioritizedIsEvent, prioritizedIsEvent);
return unprioritizedIsEvent;
}

private boolean isEvent(AbstractEvent event, Class<?> eventClass, boolean hasPriority) throws IOException {
final Buffer serializedEvent = EventSerializer.toBuffer(event, hasPriority);
try {
return EventSerializer.isEvent(serializedEvent, eventClass);
} finally {
serializedEvent.recycleBuffer();
}
}
}

0 comments on commit efe588e

Please sign in to comment.