Skip to content

Commit

Permalink
Bug fixes and minor updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomasz Bak committed Jun 3, 2015
1 parent 4e10b9a commit 7ac504a
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.client.apache4.ApacheHttpClient4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Tomasz Bak
*/
public abstract class JerseyEurekaHttpClient implements EurekaHttpClient {

private static final Logger logger = LoggerFactory.getLogger(JerseyEurekaHttpClient.class);

protected final String serviceUrl;

protected JerseyEurekaHttpClient(String serviceUrl) {
Expand All @@ -27,15 +32,19 @@ public HttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
WebResource webResource = getJerseyApacheClient().resource(serviceUrl).path(urlPath);
addExtraHeaders(webResource);
response = webResource
Builder resourceBuilder = getJerseyApacheClient().resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return HttpResponse.responseWith(response.getStatus());
} finally {
if (logger.isDebugEnabled()) {
logger.debug("[register] Jersey HTTP POST {} with instance {}; statusCode={}", urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
Expand All @@ -44,14 +53,17 @@ public HttpResponse<Void> register(InstanceInfo info) {

@Override
public HttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + "/" + id;
ClientResponse response = null;
try {
String urlPath = "apps/" + appName + "/" + id;
WebResource webResource = getJerseyApacheClient().resource(serviceUrl).path(urlPath);
addExtraHeaders(webResource);
response = webResource.delete(ClientResponse.class);
Builder resourceBuilder = getJerseyApacheClient().resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder.delete(ClientResponse.class);
return HttpResponse.responseWith(response.getStatus());
} finally {
if (logger.isDebugEnabled()) {
logger.debug("[cancel] Jersey HTTP DELETE {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
Expand All @@ -60,25 +72,23 @@ public HttpResponse<Void> cancel(String appName, String id) {

@Override
public HttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
String urlPath = "apps/" + appName + "/" + id;
WebResource webResource = getJerseyApacheClient().resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
addExtraHeaders(webResource);
response = webResource.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
InstanceInfo infoFromPeer = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
infoFromPeer = response.getEntity(InstanceInfo.class);

}
return HttpResponse.responseWith(response.getStatus(), infoFromPeer);
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
return HttpResponse.responseWith(response.getStatus());
} finally {
if (logger.isDebugEnabled()) {
logger.debug("[heartbeat] Jersey HTTP PUT {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
Expand All @@ -87,16 +97,21 @@ public HttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, Insta

@Override
public HttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info) {
String urlPath = "apps/" + appName + "/" + id + "/status";
ClientResponse response = null;
try {
String urlPath = "apps/" + appName + "/" + id + "/status";
WebResource webResource = getJerseyApacheClient().resource(serviceUrl)
Builder requestBuilder = getJerseyApacheClient().resource(serviceUrl)
.path(urlPath)
.queryParam("value", newStatus.name())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
response = webResource.put(ClientResponse.class);
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString())
.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.put(ClientResponse.class);
return HttpResponse.responseWith(response.getStatus());
} finally {
if (logger.isDebugEnabled()) {
logger.debug("[statusUpdate] Jersey HTTP PUT {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
Expand All @@ -105,16 +120,20 @@ public HttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus

@Override
public HttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info) {
String urlPath = "apps/" + appName + '/' + id + "/status";
ClientResponse response = null;
try {
String urlPath = "apps/" + appName + '/' + id + "/status";
WebResource webResource = getJerseyApacheClient().resource(serviceUrl)
Builder requestBuilder = getJerseyApacheClient().resource(serviceUrl)
.path(urlPath)
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
addExtraHeaders(webResource);
response = webResource.delete(ClientResponse.class);
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString())
.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.delete(ClientResponse.class);
return HttpResponse.responseWith(response.getStatus());
} finally {
if (logger.isDebugEnabled()) {
logger.debug("[statusOverrideDelete] Jersey HTTP DELETE {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
Expand All @@ -123,19 +142,22 @@ public HttpResponse<Void> deleteStatusOverride(String appName, String id, Instan

@Override
public HttpResponse<InstanceInfo> getInstance(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
String urlPath = "apps/" + appName + '/' + id;
WebResource webResource = getJerseyApacheClient().resource(serviceUrl).path(urlPath);
addExtraHeaders(webResource);
response = webResource.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Builder requestBuilder = getJerseyApacheClient().resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

InstanceInfo infoFromPeer = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
infoFromPeer = response.getEntity(InstanceInfo.class);
}
return HttpResponse.responseWith(response.getStatus(), infoFromPeer);
} finally {
if (logger.isDebugEnabled()) {
logger.debug("[getInstance] Jersey HTTP GET {}; statusCode=", urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
Expand All @@ -147,7 +169,7 @@ public void shutdown() {
getJerseyApacheClient().destroy();
}

protected void addExtraHeaders(WebResource webResource) {
protected void addExtraHeaders(Builder webResource) {
// No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ public double getRenewalPercentThreshold() {
namespace + "renewalPercentThreshold", 0.85).get();
}

@Override
public boolean shouldEnableReplicatedRequestCompression() {
return configInstance.getBooleanProperty(
namespace + "enableReplicatedRequestCompression", false).get();
}

@Override
public int getNumberOfReplicationRetries() {
return configInstance.getIntProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ public interface EurekaServerConfig {
*/
int getPeerEurekaNodesUpdateIntervalMs();

/**
* If set to true, the replicated data send in the request will be always compressed.
* This does not define response path, which is driven by "Accept-Encoding" header.
*/
boolean shouldEnableReplicatedRequestCompression();

/**
* Get the number of times the replication events should be retried with
* peers.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.netflix.eureka.cluster;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.discovery.EurekaIdentityHeaderFilter;
import com.netflix.discovery.shared.EurekaJerseyClient;
import com.netflix.discovery.shared.EurekaJerseyClient.JerseyClient;
Expand All @@ -17,6 +20,7 @@
import com.netflix.eureka.resources.ASGResource.ASGStatus;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
import com.sun.jersey.client.apache4.ApacheHttpClient4;
import org.slf4j.Logger;
Expand Down Expand Up @@ -61,7 +65,7 @@ public JerseyReplicationClient(EurekaServerConfig config, String serviceUrl) {
config.getPeerNodeConnectionIdleTimeoutSeconds());
}
jerseyApacheClient = jerseyClient.getClient();
jerseyApacheClient.addFilter(new GZIPContentEncodingFilter(true));
jerseyApacheClient.addFilter(new GZIPContentEncodingFilter(config.shouldEnableReplicatedRequestCompression()));
} catch (Throwable e) {
throw new RuntimeException("Cannot Create new Replica Node :" + name, e);
}
Expand All @@ -82,10 +86,44 @@ protected ApacheHttpClient4 getJerseyApacheClient() {
}

@Override
protected void addExtraHeaders(WebResource webResource) {
protected void addExtraHeaders(Builder webResource) {
webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
}

/**
* Compared to regular heartbeat, in the replication channel the server may return a more up to date
* instance copy.
*/
@Override
public HttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
WebResource webResource = getJerseyApacheClient().resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
InstanceInfo infoFromPeer = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
infoFromPeer = response.getEntity(InstanceInfo.class);
}
return HttpResponse.responseWith(response.getStatus(), infoFromPeer);
} finally {
if (logger.isDebugEnabled()) {
logger.debug("[heartbeat] Jersey HTTP PUT {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}

@Override
public HttpResponse<Void> statusUpdate(String asgName, ASGStatus newStatus) {
ClientResponse response = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
import org.junit.Test;
import org.mockserver.client.server.MockServerClient;
import org.mockserver.junit.MockServerRule;
import org.mockserver.model.Header;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockserver.model.Header.header;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;

Expand Down Expand Up @@ -56,6 +56,7 @@ public void testRegistrationReplication() throws Exception {
serverMockClient.when(
request()
.withMethod("POST")
.withHeader(header(PeerEurekaNode.HEADER_REPLICATION, "true"))
.withPath("/eureka/v2/apps/" + instanceInfo.getAppName())
).respond(
response().withStatusCode(200)
Expand All @@ -70,6 +71,7 @@ public void testCancelReplication() throws Exception {
serverMockClient.when(
request()
.withMethod("DELETE")
.withHeader(header(PeerEurekaNode.HEADER_REPLICATION, "true"))
.withPath("/eureka/v2/apps/" + instanceInfo.getAppName() + '/' + instanceInfo.getId())
).respond(
response().withStatusCode(204)
Expand All @@ -84,6 +86,7 @@ public void testHeartbeatReplicationWithNoResponseBody() throws Exception {
serverMockClient.when(
request()
.withMethod("PUT")
.withHeader(header(PeerEurekaNode.HEADER_REPLICATION, "true"))
.withPath("/eureka/v2/apps/" + instanceInfo.getAppName() + '/' + instanceInfo.getId())
).respond(
response().withStatusCode(200)
Expand All @@ -103,12 +106,13 @@ public void testHeartbeatReplicationWithResponseBody() throws Exception {
serverMockClient.when(
request()
.withMethod("PUT")
.withHeader(header(PeerEurekaNode.HEADER_REPLICATION, "true"))
.withPath("/eureka/v2/apps/" + this.instanceInfo.getAppName() + '/' + this.instanceInfo.getId())
).respond(
response()
.withStatusCode(200)
.withHeader(Header.header("Content-Type", MediaType.APPLICATION_JSON))
.withHeader(Header.header("Content-Encoding", "gzip"))
.withHeader(header("Content-Type", MediaType.APPLICATION_JSON))
.withHeader(header("Content-Encoding", "gzip"))
.withBody(responseBody)
);

Expand All @@ -122,6 +126,7 @@ public void testAsgStatusUpdateReplication() throws Exception {
serverMockClient.when(
request()
.withMethod("PUT")
.withHeader(header(PeerEurekaNode.HEADER_REPLICATION, "true"))
.withPath("/eureka/v2/asg/" + instanceInfo.getASGName() + "/status")
).respond(
response().withStatusCode(200)
Expand All @@ -136,6 +141,7 @@ public void testStatusUpdateReplication() throws Exception {
serverMockClient.when(
request()
.withMethod("PUT")
.withHeader(header(PeerEurekaNode.HEADER_REPLICATION, "true"))
.withPath("/eureka/v2/apps/" + instanceInfo.getAppName() + '/' + instanceInfo.getId() + "/status")
).respond(
response().withStatusCode(200)
Expand All @@ -150,6 +156,7 @@ public void testDeleteStatusOverrideReplication() throws Exception {
serverMockClient.when(
request()
.withMethod("DELETE")
.withHeader(header(PeerEurekaNode.HEADER_REPLICATION, "true"))
.withPath("/eureka/v2/apps/" + instanceInfo.getAppName() + '/' + instanceInfo.getId() + "/status")
).respond(
response().withStatusCode(204)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void testHeartbeat() throws Exception {
jerseyEurekaClient.register(instanceInfo);

// Now send heartbeat
HttpResponse<InstanceInfo> heartBeatResponse = jerseyEurekaClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
HttpResponse<InstanceInfo> heartBeatResponse = jerseyReplicationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);

assertThat(heartBeatResponse.getStatusCode(), is(equalTo(200)));
assertThat(heartBeatResponse.getEntity(), is(nullValue()));
Expand All @@ -135,7 +135,7 @@ public void testMissedHeartbeat() throws Exception {
InstanceInfo instanceInfo = instanceInfoIt.next();

// Now send heartbeat
HttpResponse<InstanceInfo> heartBeatResponse = jerseyEurekaClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
HttpResponse<InstanceInfo> heartBeatResponse = jerseyReplicationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);

assertThat(heartBeatResponse.getStatusCode(), is(equalTo(404)));
}
Expand Down
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Thu Jan 08 14:57:15 PST 2015
#Mon Jun 01 21:52:12 PDT 2015
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.2.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.2.1-all.zip

0 comments on commit 7ac504a

Please sign in to comment.