Skip to content

Commit

Permalink
code changes according reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
Lovenish Goyal committed Mar 19, 2016
1 parent 50938e8 commit 83199b3
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -31,6 +31,7 @@ public abstract class TimestampedMessageParser extends MessageParser implements

private static final long HOUR_IN_MILLIS = 3600L * 1000L;
private static final long DAY_IN_MILLIS = 3600L * 24 * 1000L;
private static final long MINUTE_IN_MILLIS = 60L * 1000L;

/*
* IMPORTANT
Expand All @@ -41,13 +42,19 @@ public abstract class TimestampedMessageParser extends MessageParser implements
private final SimpleDateFormat mHrFormatter;
private final SimpleDateFormat mDtHrFormatter;
private final int mFinalizerDelaySeconds;
private final SimpleDateFormat mDtHrMinFormatter;
private final SimpleDateFormat mMinFormatter;

private final boolean mUsingHourly;
private final boolean mUsingMinutely;


public TimestampedMessageParser(SecorConfig config) {
super(config);
mUsingHourly = usingHourly(config);
mUsingMinutely = usingMinutely(config);
LOG.info("UsingHourly: {}", mUsingHourly);
LOG.info("UsingMin: {}", mUsingMinutely);
mFinalizerDelaySeconds = config.getFinalizerDelaySeconds();
LOG.info("FinalizerDelaySeconds: {}", mFinalizerDelaySeconds);

Expand All @@ -57,14 +64,20 @@ public TimestampedMessageParser(SecorConfig config) {
mHrFormatter.setTimeZone(config.getTimeZone());
mDtHrFormatter = new SimpleDateFormat("yyyy-MM-dd-HH");
mDtHrFormatter.setTimeZone(config.getTimeZone());
mDtHrMinFormatter = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
mDtHrMinFormatter.setTimeZone(config.getTimeZone());
mMinFormatter = new SimpleDateFormat("mm");
mMinFormatter.setTimeZone(config.getTimeZone());
}

public abstract long extractTimestampMillis(final Message message) throws Exception;

/**
 * Reads the {@code partitioner.granularity.hour} flag from the given configuration.
 *
 * @param config configuration object to query
 * @return whatever {@code config.getBoolean} yields for that key when called with
 *         {@code false} as its second argument
 */
static boolean usingHourly(SecorConfig config) {
    final String hourlyKey = "partitioner.granularity.hour";
    // Exactly one lookup on the config object, same as before.
    final boolean hourlyEnabled = config.getBoolean(hourlyKey, false);
    return hourlyEnabled;
}

/**
 * Reads the {@code partitioner.granularity.minute} flag from the given configuration.
 *
 * @param config configuration object to query
 * @return whatever {@code config.getBoolean} yields for that key when called with
 *         {@code false} as its second argument
 */
static boolean usingMinutely(SecorConfig config) {
    final String minutelyKey = "partitioner.granularity.minute";
    // Exactly one lookup on the config object, same as before.
    final boolean minutelyEnabled = config.getBoolean(minutelyKey, false);
    return minutelyEnabled;
}

protected static long toMillis(final long timestamp) {
final long nanosecondDivider = (long) Math.pow(10, 9 + 9);
final long millisecondDivider = (long) Math.pow(10, 9 + 3);
Expand All @@ -79,12 +92,17 @@ protected static long toMillis(final long timestamp) {
return timestampMillis;
}

protected String[] generatePartitions(long timestampMillis, boolean usingHourly)
throws Exception {
public abstract long extractTimestampMillis(final Message message) throws Exception;

protected String[] generatePartitions(long timestampMillis, boolean usingHourly, boolean usingMinutely)
throws Exception {
Date date = new Date(timestampMillis);
String dt = "dt=" + mDtFormatter.format(date);
String hr = "hr=" + mHrFormatter.format(date);
if (usingHourly) {
String min = "min=" + mMinFormatter.format(date);
if (usingMinutely) {
return new String[]{dt, hr, min};
} else if (usingHourly) {
return new String[]{dt, hr};
} else {
return new String[]{dt};
Expand All @@ -94,16 +112,17 @@ protected String[] generatePartitions(long timestampMillis, boolean usingHourly)
/**
 * Converts partition strings back into a timestamp in milliseconds.
 *
 * Each entry is split on "=" and the text after the first "=" is used:
 * entry 0 supplies the date, entry 1 (when present) the hour, and entry 2
 * (when present) the minute. A missing hour or minute is replaced by "00".
 *
 * @param partitions partition strings; entry 0 is read unconditionally, so the
 *                   array must hold at least one "key=value" element
 * @return {@code getTime()} of the date parsed from the joined values
 * @throws Exception declared by the signature; the formatter {@code parse} calls
 *                   are the visible operations that can reject malformed text
 */
protected long parsePartitions(String[] partitions) throws Exception {
String dtValue = partitions[0].split("=")[1];
String hrValue = partitions.length > 1 ? partitions[1].split("=")[1] : "00";
// Day-hour text, parsed with the day-hour formatter.
String value = dtValue + "-" + hrValue;
Date date = mDtHrFormatter.parse(value);
String minValue = partitions.length > 2 ? partitions[2].split("=")[1] : "00";
// Day-hour-minute text, parsed with the day-hour-minute formatter.
String value = dtValue + "-" + hrValue + "-" + minValue;
Date date = mDtHrMinFormatter.parse(value);
return date.getTime();
}

/**
 * Produces the partition strings for a message from its extracted timestamp.
 *
 * @param message message handed to {@code extractTimestampMillis}
 * @return the array returned by {@code generatePartitions} for that timestamp
 *         and this parser's granularity flags
 * @throws Exception propagated from timestamp extraction or partition generation
 */
@Override
public String[] extractPartitions(Message message) throws Exception {
// Date constructor takes milliseconds since epoch.
long timestampMillis = extractTimestampMillis(message);
return generatePartitions(timestampMillis, mUsingHourly);
return generatePartitions(timestampMillis, mUsingHourly, mUsingMinutely);
}

private long getFinalizedTimestampMillis(Message lastMessage,
Expand All @@ -112,7 +131,7 @@ private long getFinalizedTimestampMillis(Message lastMessage,
long committedTimestamp = extractTimestampMillis(committedMessage);
long now = System.currentTimeMillis();
if (lastTimestamp == committedTimestamp &&
(now - lastTimestamp) > mFinalizerDelaySeconds * 1000) {
(now - lastTimestamp) > mFinalizerDelaySeconds * 1000) {
LOG.info("No new message coming, use the current time: " + now);
return now;
}
Expand All @@ -122,39 +141,58 @@ private long getFinalizedTimestampMillis(Message lastMessage,
/**
 * Computes the partition up to which data is considered finalized.
 *
 * For each index, the last and committed messages are passed to
 * {@code getFinalizedTimestampMillis}; the smallest resulting value is kept,
 * reduced by {@code mFinalizerDelaySeconds} (converted to milliseconds), and
 * turned into partition strings via {@code generatePartitions}.
 *
 * @param lastMessages      messages paired by index with {@code committedMessages}
 * @param committedMessages messages paired by index with {@code lastMessages}
 * @return the generated partition strings, or {@code null} when either list is
 *         {@code null} or no timestamp below {@code Long.MAX_VALUE} was found
 * @throws Exception propagated from timestamp extraction or partition generation
 */
@Override
public String[] getFinalizedUptoPartitions(List<Message> lastMessages,
List<Message> committedMessages) throws Exception {

// Missing input is logged and reported to the caller as null rather than thrown.
if (lastMessages == null || committedMessages == null) {
LOG.error("Either: {} and {} is null", lastMessages,
committedMessages);
LOG.error("Either: {} and {} is null", lastMessages, committedMessages);
return null;
}
// Equal sizes are only checked when JVM assertions are enabled.
assert lastMessages.size() == committedMessages.size();

// Track the smallest finalized timestamp across all index pairs.
long minMillis = Long.MAX_VALUE;
for (int i = 0; i < lastMessages.size(); i++) {
long millis = getFinalizedTimestampMillis(lastMessages.get(i),
committedMessages.get(i));
long millis = getFinalizedTimestampMillis(lastMessages.get(i), committedMessages.get(i));
if (millis < minMillis) {
LOG.info("partition {}, time {}", i, millis);
minMillis = millis;
}
}
// Still at the sentinel: no pair produced a smaller timestamp (e.g. empty lists).
if (minMillis == Long.MAX_VALUE) {
LOG.error("No valid timestamps among messages: {} and {}", lastMessages,
committedMessages);
LOG.error("No valid timestamps among messages: {} and {}", lastMessages, committedMessages);
return null;
}

// add the safety lag for late-arrival messages
minMillis -= mFinalizerDelaySeconds * 1000L;
LOG.info("adjusted millis {}", minMillis);
return generatePartitions(minMillis, mUsingHourly);
}
return generatePartitions(minMillis, mUsingHourly, mUsingMinutely);

}
@Override
public String[] getPreviousPartitions(String[] partitions) throws Exception {
long millis = parsePartitions(partitions);
boolean usingHourly = mUsingHourly;
if (mUsingHourly && millis % DAY_IN_MILLIS == 0) {
boolean usingMinutely = mUsingMinutely;

if (mUsingMinutely && millis % HOUR_IN_MILLIS == 0) {
if (partitions.length == 3) {
usingMinutely = false;
if (millis % DAY_IN_MILLIS == 0) {
millis -= DAY_IN_MILLIS;
} else {
millis -= HOUR_IN_MILLIS;
usingHourly = true;
}
} else if (partitions.length == 2) {
millis += HOUR_IN_MILLIS;
millis -= MINUTE_IN_MILLIS;
usingMinutely = true;
} else {
millis += DAY_IN_MILLIS;
millis -= HOUR_IN_MILLIS;
usingMinutely = false;
usingHourly = true;
}
} else if (mUsingHourly && millis % DAY_IN_MILLIS == 0) {
// On the day boundary, if the current partition is [dt=07-07, hr=00], the previous
// one is dt=07-06; If the current one is [dt=07-06], the previous one is
// [dt=07-06, hr=23]
Expand All @@ -163,7 +201,7 @@ public String[] getPreviousPartitions(String[] partitions) throws Exception {
// dt=07-07, hr=00
// dt=07-06
// dt=07-06, hr=23
if (partitions.length == 2 ) {
if (partitions.length == 2) {
usingHourly = false;
millis -= DAY_IN_MILLIS;
} else {
Expand All @@ -173,9 +211,11 @@ public String[] getPreviousPartitions(String[] partitions) throws Exception {
}
} else {
long delta = mUsingHourly ? HOUR_IN_MILLIS : DAY_IN_MILLIS;
if (mUsingMinutely) {
delta = MINUTE_IN_MILLIS;
}
millis -= delta;
}
return generatePartitions(millis, usingHourly);
return generatePartitions(millis, usingHourly, usingMinutely);
}
}

}
162 changes: 161 additions & 1 deletion src/test/java/com/pinterest/secor/parser/JsonMessageParserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,5 +255,165 @@ public void testHourlyGetFinalizedUptoPartitions() throws Exception {
assertEquals(expectedPartition, retrievedPartition);
}
}

/**
 * Exercises minute-granularity partitioning end to end.
 *
 * First checks that {@code getFinalizedUptoPartitions} yields a three-level
 * partition (dt, hr, min) for the fixture messages. Then starts at
 * [dt=2014-07-20, hr=01, min=00] and repeatedly calls
 * {@code getPreviousPartitions}, expecting this order:
 * the hour partition, its minutes 59..00, then (at a day boundary) the
 * previous day partition, that day's last hour, its minutes, and so on.
 */
@Test
public void testMinutelyGetFinalizedUptoPartitions() throws Exception {
    Mockito.when(TimestampedMessageParser.usingMinutely(mConfig)).thenReturn(true);
    JsonMessageParser jsonMessageParser = new JsonMessageParser(mConfig);

    List<Message> lastMessages = new ArrayList<Message>();
    lastMessages.add(mMessageWithSecondsTimestamp);
    List<Message> committedMessages = new ArrayList<Message>();
    committedMessages.add(mMessageWithMillisTimestamp);
    String[] uptoPartitions = jsonMessageParser.getFinalizedUptoPartitions(lastMessages, committedMessages);
    assertEquals(3, uptoPartitions.length);
    assertEquals("dt=2014-07-21", uptoPartitions[0]);
    assertEquals("hr=01", uptoPartitions[1]);
    assertEquals("min=51", uptoPartitions[2]);

    // Start the backwards walk from an hour boundary, using a fresh array
    // instead of overwriting the one returned by the parser.
    String[] partitions = new String[] { "dt=2014-07-20", "hr=01", "min=00" };

    // Expected sequence, built from its three repeating runs (125 entries in total).
    List<String[]> expectedPartitions = new ArrayList<String[]>();
    addHourThenMinutesDescending(expectedPartitions, "dt=2014-07-20", "hr=00");
    expectedPartitions.add(new String[] { "dt=2014-07-19" });
    addHourThenMinutesDescending(expectedPartitions, "dt=2014-07-19", "hr=23");
    expectedPartitions.add(new String[] { "dt=2014-07-19", "hr=22" });
    expectedPartitions.add(new String[] { "dt=2014-07-19", "hr=22", "min=59" });

    // One step per expected entry; the step index in the message pinpoints a mismatch.
    for (int i = 0; i < expectedPartitions.size(); i++) {
        String[] previous = jsonMessageParser.getPreviousPartitions(partitions);
        assertEquals("step " + i,
                Arrays.asList(expectedPartitions.get(i)),
                Arrays.asList(previous));
        partitions = previous;
    }
}

/**
 * Appends the [dt, hr] partition followed by its sixty [dt, hr, min=NN]
 * partitions in descending minute order (59 down to 00).
 */
private static void addHourThenMinutesDescending(List<String[]> out, String dt, String hr) {
    out.add(new String[] { dt, hr });
    for (int minute = 59; minute >= 0; minute--) {
        // Zero-pad by hand so the text never depends on the default locale.
        String min = (minute < 10 ? "min=0" : "min=") + minute;
        out.add(new String[] { dt, hr, min });
    }
}
}

0 comments on commit 83199b3

Please sign in to comment.