Skip to content

Commit

Permalink
Merge pull request Netflix#241 from Netflix/ISSUE-240
Browse files Browse the repository at this point in the history
IndexInfoBuilder interface extended
  • Loading branch information
metacret committed Apr 2, 2015
2 parents 7c9e0b1 + d2b7d26 commit 5b95e58
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.netflix.suro.message.Message;
import com.netflix.suro.sink.DataConverter;
Expand Down Expand Up @@ -128,4 +130,45 @@ public long getTimestamp() {
return null;
}
}

@Override
public String getActionMetadata(IndexInfo info) {
if (!Strings.isNullOrEmpty(info.getId())) {
return String.format(
"{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }",
info.getIndex(), info.getType(), info.getId());
} else {
return String.format(
"{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\"} }",
info.getIndex(), info.getType());
}
}

@Override
public String getSource(IndexInfo info) throws JsonProcessingException {
if (info.getSource() instanceof Map) {
return jsonMapper.writeValueAsString(info.getSource());
} else {
return info.getSource().toString();
}
}

@Override
public String getIndexUri(IndexInfo info) {
return info.getId() != null ?
String.format(
"/%s/%s/%s",
info.getIndex(),
info.getType(),
info.getId()) :
String.format(
"/%s/%s/",
info.getIndex(),
info.getType());
}

@Override
public String getCommand() {
return "create";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -206,18 +205,10 @@ private String createIndexRequest(Message m) {
try {

StringBuilder sb = new StringBuilder();
if (info.getId() != null) {
sb.append(String.format(
"{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }",
info.getIndex(), info.getType(), info.getId()));
} else {
sb.append(String.format(
"{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\"} }",
info.getIndex(), info.getType()));
}
sb.append(indexInfo.getActionMetadata(info));
sb.append('\n');

sb.append(getSource(info));
sb.append(indexInfo.getSource(info));
sb.append('\n');

return sb.toString();
Expand All @@ -232,13 +223,7 @@ private String createIndexRequest(Message m) {
}
}

private String getSource(IndexInfo info) throws JsonProcessingException {
if (info.getSource() instanceof Map) {
return jsonMapper.writeValueAsString(info.getSource());
} else {
return info.getSource().toString();
}
}


@Override
protected void write(List<Message> msgList) throws IOException {
Expand Down Expand Up @@ -282,7 +267,7 @@ public void run() {
List items = (List) result.get("items");
for (int i = 0; i < items.size(); ++i) {
String routingKey = request.second().get(i).getRoutingKey();
Map<String, Object> resPerMessage = (Map) ((Map) (items.get(i))).get("create");
Map<String, Object> resPerMessage = (Map) ((Map) (items.get(i))).get(indexInfo.getCommand());
if (isFailed(resPerMessage) && !getErrorMessage(resPerMessage).contains("DocumentAlreadyExistsException")) {
log.error("Failed with: " + resPerMessage.get("error"));
Servo.getCounter(
Expand Down Expand Up @@ -335,33 +320,17 @@ private boolean isFailed(Map<String, Object> resPerMessage) {
return (int)resPerMessage.get("status") / 100 != 2;
}

@VisibleForTesting
protected void recover(Message message) throws Exception {
public void recover(Message message) throws Exception {
IndexInfo info = indexInfo.create(message);

String uri = info.getId() != null ?
String.format(
"/%s/%s/%s",
info.getIndex(),
info.getType(),
info.getId()) :
String.format(
"/%s/%s/",
info.getIndex(),
info.getType());

String entity = info.getSource() instanceof Map ?
jsonMapper.writeValueAsString(info.getSource()) :
info.getSource().toString();

HttpResponse response = null;
try {
response = client.executeWithLoadBalancer(
HttpRequest.newBuilder()
.verb(HttpRequest.Verb.POST)
.setRetriable(true)
.uri(uri)
.entity(entity)
.uri(indexInfo.getIndexUri(info))
.entity(indexInfo.getSource(info))
.build());
if (response.getStatus() / 100 != 2) {
Servo.getCounter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@
})
public interface IndexInfoBuilder {
IndexInfo create(Message msg);
String getActionMetadata(IndexInfo info);
String getSource(IndexInfo info) throws Exception;
String getIndexUri(IndexInfo info);
String getCommand();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@

public enum TimestampSlice {
ts_millisecond {
String get(long ts) {
public String get(long ts) {
return Long.toString(ts);
}
},
ts_second {
String get(long ts) {
public String get(long ts) {
return Long.toString(ts / 1000);
}
},
ts_minute {
String get(long ts) {
public String get(long ts) {
return Long.toString(ts / 60000);
}
};

abstract String get(long ts);
public abstract String get(long ts);
}

0 comments on commit 5b95e58

Please sign in to comment.