Skip to content

Commit

Permalink
Merge pull request pinterest#338 from tygrash/master
Browse files Browse the repository at this point in the history
Added timestamp in Message class and populating it while extracting o…
  • Loading branch information
HenryCaiHaiying authored May 18, 2017
2 parents 872a1ee + fbeb3af commit 9cf87fc
Show file tree
Hide file tree
Showing 22 changed files with 215 additions and 107 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ Secor is distributed under [Apache License, Version 2.0](http://www.apache.org/l
* [Rakuten](http://techblog.rakuten.co.jp/)
* [Appsflyer](https://www.appsflyer.com)
* [Wego](https://www.wego.com)
* [GO_JEK](http://gojekengineering.com/)
* [GO-JEK](http://gojekengineering.com/)

## Help

Expand Down
5 changes: 5 additions & 0 deletions src/main/config/secor.common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ kafka.dual.commit.enabled=true
# Possible values: "zookeeper" to read offset from zookeeper or "kafka" to read offset from kafka consumer topic
kafka.offsets.storage=zookeeper

# Parameter which tells whether to extract the Kafka message timestamp. This value should be chosen in the case of 0.10.x Kafka
# brokers. Default value is false.
kafka.useTimestamp=false

# Secor generation is a version that should be incremented during non-backwards-compatible
# Secor releases. Generation number is one of the components of generated log file names.
# Generation number makes sure that outputs of different Secor versions are isolated.
Expand Down Expand Up @@ -404,3 +408,4 @@ parquet.validation=false
secor.orc.message.schema.*=struct<a:int\,b:int\,c:struct<d:int\,e:string>\,f:array<string>\,g:int>
# Below config used for defining ORC schema provider class name. User can use the custom implementation for orc schema provider
secor.orc.schema.provider=com.pinterest.secor.util.orc.schema.DefaultORCSchemaProvider

6 changes: 5 additions & 1 deletion src/main/java/com/pinterest/secor/common/KafkaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,12 @@ private Message getMessage(TopicPartition topicPartition, long offset,
payloadBytes = new byte[payload.limit()];
payload.get(payloadBytes);
}
Long timestamp = null;
if (mConfig.useKafkaTimestamp()) {
timestamp = messageAndOffset.message().timestamp();
}
return new Message(topicPartition.getTopic(), topicPartition.getPartition(),
messageAndOffset.offset(), keyBytes, payloadBytes);
messageAndOffset.offset(), keyBytes, payloadBytes, timestamp);
}

private SimpleConsumer createConsumer(String host, int port, String clientName) {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ public String getOffsetsStorage() {
return getString("kafka.offsets.storage");
}

public boolean useKafkaTimestamp() {
return getBoolean("kafka.useTimestamp", false);
}

public int getGeneration() {
return getInt("secor.generation");
}
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/com/pinterest/secor/message/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@ public class Message {
private long mOffset;
private byte[] mKafkaKey;
private byte[] mPayload;
private Long mTimestamp;

protected String fieldsToString() {
return "topic='" + mTopic + '\'' +
", kafkaPartition=" + mKafkaPartition +
", offset=" + mOffset +
", kafkaKey=" + new String(mKafkaKey) +
", payload=" + new String(mPayload);
", payload=" + new String(mPayload) +
", timestamp=" + mTimestamp;
}

@Override
public String toString() {
return "Message{" + fieldsToString() + '}';
}

public Message(String topic, int kafkaPartition, long offset, byte[] kafkaKey, byte[] payload) {
public Message(String topic, int kafkaPartition, long offset, byte[] kafkaKey, byte[] payload, Long timestamp) {
mTopic = topic;
mKafkaPartition = kafkaPartition;
mOffset = offset;
Expand All @@ -60,6 +62,7 @@ public Message(String topic, int kafkaPartition, long offset, byte[] kafkaKey, b
if (mPayload == null) {
mPayload = EMPTY_BYTES;
}
mTimestamp = timestamp;
}

public String getTopic() {
Expand All @@ -82,6 +85,10 @@ public byte[] getPayload() {
return mPayload;
}

public Long getTimestamp() {
return mTimestamp;
}

public void write(OutputStream output) throws IOException {
output.write(mPayload);
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/pinterest/secor/message/ParsedMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public String toString() {
}

public ParsedMessage(String topic, int kafkaPartition, long offset, byte[] kafkaKey, byte[] payload,
String[] mPartitions) {
super(topic, kafkaPartition, offset, kafkaKey, payload);
String[] mPartitions, long timestamp) {
super(topic, kafkaPartition, offset, kafkaKey, payload, timestamp);
this.mPartitions = mPartitions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,4 @@ public String[] extractPartitions(Message message) {

return result;
}

}
6 changes: 3 additions & 3 deletions src/main/java/com/pinterest/secor/parser/MessageParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;
import net.minidev.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.minidev.json.JSONObject;
import net.minidev.json.JSONValue;

import java.util.regex.Pattern;

// TODO(pawel): should we offer a multi-message parser capable of parsing multiple types of
Expand Down Expand Up @@ -60,7 +60,7 @@ public ParsedMessage parse(Message message) throws Exception {
String[] partitions = extractPartitions(message);
return new ParsedMessage(message.getTopic(), message.getKafkaPartition(),
message.getOffset(), message.getKafkaKey(),
message.getPayload(), partitions);
message.getPayload(), partitions, message.getTimestamp());
}

public abstract String[] extractPartitions(Message payload) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package com.pinterest.secor.parser;

import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
import org.slf4j.Logger;
Expand Down Expand Up @@ -58,12 +56,15 @@ public abstract class TimestampedMessageParser extends MessageParser implements
protected final boolean mUsingHourly;
protected final boolean mUsingMinutely;

protected final boolean mUseKafkaTimestamp;


public TimestampedMessageParser(SecorConfig config) {
super(config);

mUsingHourly = usingHourly(config);
mUsingMinutely = usingMinutely(config);
mUseKafkaTimestamp = useKafkaTimestamp(config);
mDtFormat = usingDateFormat(config);
mHrFormat = usingHourFormat(config);
mMinFormat = usingMinuteFormat(config);
Expand Down Expand Up @@ -91,7 +92,6 @@ public TimestampedMessageParser(SecorConfig config) {

mDtHrMinFormatter = new SimpleDateFormat(mDtFormat+ "-" + mHrFormat + "-" + mMinFormat);
mDtHrMinFormatter.setTimeZone(config.getTimeZone());

}

static boolean usingHourly(SecorConfig config) {
Expand Down Expand Up @@ -126,6 +126,10 @@ static String usingMinutePrefix(SecorConfig config) {
return config.getString("partitioner.granularity.minute.prefix", "min=");
}

static boolean useKafkaTimestamp(SecorConfig config) {
return config.useKafkaTimestamp();
}

protected static long toMillis(final long timestamp) {
final long nanosecondDivider = (long) Math.pow(10, 9 + 9);
final long microsecondDivider = (long) Math.pow(10, 9 + 6);
Expand All @@ -145,6 +149,10 @@ protected static long toMillis(final long timestamp) {

public abstract long extractTimestampMillis(final Message message) throws Exception;

public long getTimestampMillis(Message message) throws Exception {
return (mUseKafkaTimestamp) ? toMillis(message.getTimestamp()) : extractTimestampMillis(message);
}

protected String[] generatePartitions(long timestampMillis, boolean usingHourly, boolean usingMinutely)
throws Exception {
Date date = new Date(timestampMillis);
Expand Down Expand Up @@ -172,14 +180,14 @@ protected long parsePartitions(String[] partitions) throws Exception {
@Override
public String[] extractPartitions(Message message) throws Exception {
// Date constructor takes milliseconds since epoch.
long timestampMillis = extractTimestampMillis(message);
long timestampMillis = getTimestampMillis(message);
return generatePartitions(timestampMillis, mUsingHourly, mUsingMinutely);
}

private long getFinalizedTimestampMillis(Message lastMessage,
Message committedMessage) throws Exception {
long lastTimestamp = extractTimestampMillis(lastMessage);
long committedTimestamp = extractTimestampMillis(committedMessage);
long lastTimestamp = getTimestampMillis(lastMessage);
long committedTimestamp = getTimestampMillis(committedMessage);
long now = System.currentTimeMillis();
if (lastTimestamp == committedTimestamp &&
(now - lastTimestamp) > mFinalizerDelaySeconds * 1000) {
Expand Down Expand Up @@ -216,8 +224,8 @@ public String[] getFinalizedUptoPartitions(List<Message> lastMessages,
minMillis -= mFinalizerDelaySeconds * 1000L;
LOG.info("adjusted millis {}", minMillis);
return generatePartitions(minMillis, mUsingHourly, mUsingMinutely);

}

@Override
public String[] getPreviousPartitions(String[] partitions) throws Exception {
long millis = parsePartitions(partitions);
Expand Down Expand Up @@ -269,4 +277,4 @@ public String[] getPreviousPartitions(String[] partitions) throws Exception {
}
return generatePartitions(millis, usingHourly, usingMinutely);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public Message read() {
MessageAndMetadata<byte[], byte[]> kafkaMessage = mIterator.next();
Message message = new Message(kafkaMessage.topic(), kafkaMessage.partition(),
kafkaMessage.offset(), kafkaMessage.key(),
kafkaMessage.message());
kafkaMessage.message(), kafkaMessage.timestamp());
TopicPartition topicPartition = new TopicPartition(message.getTopic(),
message.getKafkaPartition());
updateAccessTime(topicPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private String metricName(String key) {

private long getTimestamp(Message message) throws Exception {
if (mMessageParser instanceof TimestampedMessageParser) {
return ((TimestampedMessageParser)mMessageParser).extractTimestampMillis(message);
return ((TimestampedMessageParser)mMessageParser).getTimestampMillis(message);
} else {
return -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ public class LogFilePathTest extends TestCase {
".10_0_00000000000000000100.crc";

private LogFilePath mLogFilePath;
private long timestamp;

@Override
protected void setUp() throws Exception {
super.setUp();
mLogFilePath = new LogFilePath(PREFIX, TOPIC, PARTITIONS, GENERATION, KAFKA_PARTITION,
LAST_COMMITTED_OFFSET, "");
timestamp = System.currentTimeMillis();
}

public void testConstructFromMessage() throws Exception {
ParsedMessage message = new ParsedMessage(TOPIC, KAFKA_PARTITION, 1000, null,
"some_payload".getBytes(), PARTITIONS);
"some_payload".getBytes(), PARTITIONS, timestamp);
LogFilePath logFilePath = new LogFilePath(PREFIX, GENERATION, LAST_COMMITTED_OFFSET,
message, "");
assertEquals(PATH, logFilePath.getLogFilePath());
Expand Down
4 changes: 1 addition & 3 deletions src/test/java/com/pinterest/secor/message/MessageTest.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package com.pinterest.secor.message;

import static org.junit.Assert.*;

import org.junit.Test;

public class MessageTest {

@Test
public void testNullPayload() {
Message message = new Message("testTopic", 0, 123, null, null);
Message message = new Message("testTopic", 0, 123, null, null, null);
System.out.println(message);

// no assert necessary, just making sure it does not throw a
Expand Down
Loading

0 comments on commit 9cf87fc

Please sign in to comment.