Skip to content

Commit dd05d0e

Browse files
(Feature) Java rate limiting (metlo-labs#72)
1 parent 03e5542 commit dd05d0e

File tree

4 files changed

+196
-63
lines changed

4 files changed

+196
-63
lines changed

ingestors/java/spring/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104

105105
<groupId>com.metlo</groupId>
106106
<artifactId>spring</artifactId>
107-
<version>0.2.2</version>
107+
<version>0.3</version>
108108

109109
<dependencies>
110110
<dependency>

ingestors/java/spring/src/main/java/com/metlo/spring/Metlo.java

Lines changed: 49 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,75 @@
11
package com.metlo.spring;
22

3-
import com.google.gson.Gson;
3+
import com.metlo.spring.utils.ContentCachingResponseWrapperWithHeaderNames;
4+
import com.metlo.spring.utils.RateLimitingRequests;
5+
46
import org.springframework.web.filter.OncePerRequestFilter;
57
import org.springframework.web.util.ContentCachingRequestWrapper;
6-
import org.springframework.web.util.ContentCachingResponseWrapper;
78

89
import javax.servlet.FilterChain;
910
import javax.servlet.ServletException;
1011
import javax.servlet.http.HttpServletRequest;
1112
import javax.servlet.http.HttpServletResponse;
1213
import java.io.IOException;
13-
import java.io.OutputStream;
1414
import java.io.UnsupportedEncodingException;
15-
import java.net.HttpURLConnection;
16-
import java.net.URL;
17-
import java.net.URLConnection;
18-
import java.nio.charset.StandardCharsets;
1915
import java.util.*;
20-
import java.util.concurrent.SynchronousQueue;
21-
import java.util.concurrent.ThreadPoolExecutor;
22-
import java.util.concurrent.TimeUnit;
2316
import java.util.function.Function;
2417

18+
2519
public class Metlo extends OncePerRequestFilter {
26-
private static final int DEFAULT_THREAD_POOL_SIZE = 8;
27-
private final static String endpoint = "/api/v1/log-request/single";
28-
private final ThreadPoolExecutor pool;
29-
private final String METLO_KEY;
30-
private final String METLO_ADDR;
20+
private static final int DEFAULT_THREAD_POOL_SIZE = 2;
21+
22+
private static final int DEFAULT_RPS = 10;
23+
private final static String endpoint = "api/v1/log-request/single";
24+
25+
private final Boolean enabled;
26+
27+
private final RateLimitingRequests req;
3128

3229
public Metlo(String host, String api_key) {
33-
this(DEFAULT_THREAD_POOL_SIZE, host, api_key);
30+
this(DEFAULT_THREAD_POOL_SIZE, host, api_key, DEFAULT_RPS);
3431
}
3532

36-
public Metlo(int pool_size, String host, String api_key) {
37-
this.METLO_KEY = api_key;
38-
this.METLO_ADDR = host + Metlo.endpoint;
39-
this.pool = new ThreadPoolExecutor(0, pool_size,
40-
60L, TimeUnit.SECONDS,
41-
new SynchronousQueue<Runnable>());
33+
public Metlo(String host, String api_key, Integer rps) {
34+
this(DEFAULT_THREAD_POOL_SIZE, host, api_key, rps);
4235
}
4336

44-
private void pushRequest(Map<String, Object> data) throws IOException {
45-
URL url = new URL(this.METLO_ADDR);
46-
URLConnection con = url.openConnection();
47-
HttpURLConnection http = (HttpURLConnection) con;
48-
http.setRequestMethod("POST"); // PUT is another valid option
49-
http.setRequestProperty("Authorization", this.METLO_KEY);
50-
http.setDoOutput(true);
51-
Gson gson = new Gson();
52-
String json = gson.toJson(data);
53-
54-
byte[] out = json.getBytes(StandardCharsets.UTF_8);
55-
int length = out.length;
56-
57-
http.setFixedLengthStreamingMode(length);
58-
http.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
59-
http.connect();
60-
try (OutputStream os = http.getOutputStream()) {
61-
os.write(out);
37+
public Metlo(int pool_size, String host, String api_key, Integer rps) {
38+
39+
String METLO_ADDR = host;
40+
if (host.charAt(host.length() - 1) == '/') {
41+
METLO_ADDR += endpoint;
42+
} else {
43+
METLO_ADDR += "/" + endpoint;
44+
}
45+
this.req = new RateLimitingRequests(rps, pool_size, METLO_ADDR, api_key);
46+
47+
String enabled = System.getenv("METLO_ENABLED");
48+
if (enabled != null) {
49+
this.enabled = Boolean.parseBoolean(enabled);
50+
} else {
51+
this.enabled = true;
6252
}
6353
}
6454

55+
6556
@Override
6657
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
6758
throws ServletException, IOException {
59+
6860
ContentCachingRequestWrapper requestWrapper = new ContentCachingRequestWrapper(request);
69-
ContentCachingResponseWrapper responseWrapper = new ContentCachingResponseWrapper(response);
61+
ContentCachingResponseWrapperWithHeaderNames responseWrapper = new ContentCachingResponseWrapperWithHeaderNames(response);
7062

7163
filterChain.doFilter(requestWrapper, responseWrapper);
72-
73-
String requestBody = getStringValue(requestWrapper.getContentAsByteArray(),
74-
request.getCharacterEncoding());
75-
String responseBody = getStringValue(responseWrapper.getContentAsByteArray(),
76-
response.getCharacterEncoding());
77-
78-
this.pool.submit(() -> {
79-
try {
80-
this.pushRequest(createDataBinding(requestWrapper, responseWrapper, requestBody, responseBody));
81-
} catch (Exception e) {
82-
e.printStackTrace();
83-
}
84-
});
85-
responseWrapper.copyBodyToResponse();
64+
if (this.enabled) {
65+
String requestBody = getStringValue(requestWrapper.getContentAsByteArray(),
66+
request.getCharacterEncoding());
67+
String responseBody = getStringValue(responseWrapper.getContentAsByteArray(),
68+
response.getCharacterEncoding());
69+
70+
this.req.send(createDataBinding(requestWrapper, responseWrapper, requestBody, responseBody));
71+
responseWrapper.copyBodyToResponse();
72+
}
8673
}
8774

8875
private String getStringValue(byte[] contentAsByteArray, String characterEncoding) {
@@ -94,7 +81,7 @@ private String getStringValue(byte[] contentAsByteArray, String characterEncodin
9481
return "";
9582
}
9683

97-
private Map<String, Object> createDataBinding(HttpServletRequest request, HttpServletResponse response, String request_body, String response_body) throws Exception {
84+
private Map<String, Object> createDataBinding(HttpServletRequest request, ContentCachingResponseWrapperWithHeaderNames response, String request_body, String response_body) {
9885
Map<String, Object> DATA = new HashMap<String, Object>();
9986
Map<String, Object> REQUEST = new HashMap<>();
10087
Map<String, Object> REQUEST_URL = new HashMap<>();
@@ -146,8 +133,8 @@ private List<Map<String, String>> listParamFormat(Map<String, String[]> map) {
146133
List<Map<String, String>> _formatted_params_ = new ArrayList<>();
147134
for (Map.Entry<String, String[]> entry_raw : map.entrySet()) {
148135
HashMap<String, String> entry = new HashMap<String, String>();
149-
entry.put("Name", entry_raw.getKey());
150-
entry.put("Value", "[" + String.join(",", entry_raw.getValue()) + "]");
136+
entry.put("name", entry_raw.getKey());
137+
entry.put("value", "[" + String.join(",", entry_raw.getValue()) + "]");
151138
_formatted_params_.add(entry);
152139
}
153140
return _formatted_params_;
@@ -159,8 +146,8 @@ private List<Map<String, String>> listRequestHeaderFormat(Enumeration<String> he
159146
String headerName = headerNameIter.next();
160147
String headerValue = headerValueForName.apply(headerName);
161148
Map<String, String> individualHeader = new HashMap<String, String>();
162-
individualHeader.put("Name", headerName);
163-
individualHeader.put("Value", headerValue);
149+
individualHeader.put("name", headerName);
150+
individualHeader.put("value", headerValue);
164151
headers.add(individualHeader);
165152
}
166153
return headers;
@@ -171,8 +158,8 @@ private List<Map<String, String>> listResponseHeaderFormat(Collection<String> he
171158
for (String headerName : headerNames) {
172159
String headerValue = headerValueForName.apply(headerName);
173160
Map<String, String> individualHeader = new HashMap<String, String>();
174-
individualHeader.put("Name", headerName);
175-
individualHeader.put("Value", headerValue);
161+
individualHeader.put("name", headerName);
162+
individualHeader.put("value", headerValue);
176163
headers.add(individualHeader);
177164
}
178165
return headers;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.metlo.spring.utils;
2+
3+
import org.springframework.web.util.ContentCachingResponseWrapper;
4+
5+
import javax.servlet.http.HttpServletResponse;
6+
import java.util.Collections;
7+
import java.util.HashSet;
8+
import java.util.Set;
9+
10+
public class ContentCachingResponseWrapperWithHeaderNames extends ContentCachingResponseWrapper {
11+
/**
12+
* Here to provide a wrapper to HTTPServletResponse so that we can capture more of the headers.
13+
*/
14+
private final Set<String> headerNames = new HashSet<String>();
15+
16+
public ContentCachingResponseWrapperWithHeaderNames(HttpServletResponse delegate) {
17+
super(delegate);
18+
}
19+
20+
public void addHeader(String name, String value) {
21+
super.addHeader(name, value);
22+
headerNames.add(name);
23+
}
24+
25+
public void addDateHeader(String name, long date) {
26+
super.addDateHeader(name, date);
27+
headerNames.add(name);
28+
}
29+
30+
public void addIntHeader(String name, int value) {
31+
super.addIntHeader(name, value);
32+
headerNames.add(name);
33+
}
34+
35+
public void setHeader(String name, String value) {
36+
super.setHeader(name, value);
37+
headerNames.add(name);
38+
}
39+
40+
public void setDateHeader(String name, long date) {
41+
super.setDateHeader(name, date);
42+
headerNames.add(name);
43+
}
44+
45+
public void setIntHeader(String name, int value) {
46+
super.setIntHeader(name, value);
47+
headerNames.add(name);
48+
}
49+
50+
51+
public Set<String> getHeaderNames() {
52+
return Collections.unmodifiableSet(headerNames);
53+
}
54+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.metlo.spring.utils;
2+
3+
import com.google.gson.Gson;
4+
5+
import java.io.IOException;
6+
import java.io.OutputStream;
7+
import java.net.HttpURLConnection;
8+
import java.net.URL;
9+
import java.net.URLConnection;
10+
import java.nio.charset.StandardCharsets;
11+
import java.time.Instant;
12+
import java.util.*;
13+
import java.util.concurrent.LinkedBlockingQueue;
14+
import java.util.concurrent.ThreadPoolExecutor;
15+
import java.util.concurrent.TimeUnit;
16+
17+
public class RateLimitingRequests {
18+
private final Integer rps;
19+
private final String host;
20+
private final String key;
21+
private final ThreadPoolExecutor pool;
22+
private List<Long> ts;
23+
24+
public RateLimitingRequests(Integer rps, Integer pool_size, String host, String api_key) {
25+
this.rps = rps;
26+
this.host = host;
27+
this.key = api_key;
28+
this.ts = Collections.synchronizedList(new ArrayList<Long>());
29+
this.pool = new ThreadPoolExecutor(0, pool_size,
30+
60L, TimeUnit.SECONDS,
31+
new LinkedBlockingQueue<Runnable>());
32+
}
33+
34+
private void pushRequest(Map<String, Object> data) throws IOException {
35+
URL url = new URL(this.host);
36+
URLConnection con = url.openConnection();
37+
HttpURLConnection http = (HttpURLConnection) con;
38+
http.setRequestMethod("POST"); // PUT is another valid option
39+
http.setRequestProperty("Authorization", this.key);
40+
http.setDoOutput(true);
41+
Gson gson = new Gson();
42+
String json = gson.toJson(data);
43+
44+
byte[] out = json.getBytes(StandardCharsets.UTF_8);
45+
int length = out.length;
46+
47+
48+
http.setFixedLengthStreamingMode(length);
49+
http.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
50+
http.connect();
51+
try (OutputStream os = http.getOutputStream()) {
52+
os.write(out);
53+
}
54+
int code = http.getResponseCode();
55+
}
56+
57+
58+
private synchronized Boolean allow() {
59+
60+
List<Long> tmp_ts = Collections.synchronizedList(new ArrayList<Long>());
61+
Long curr = Instant.now().toEpochMilli();
62+
this.ts.forEach((Long x) -> {
63+
// We care about requests in the last second only.
64+
if ((curr - x) <= 1000) {
65+
tmp_ts.add(x);
66+
}
67+
});
68+
69+
this.ts = tmp_ts;
70+
71+
if (this.ts.size() < this.rps) {
72+
this.ts.add(curr);
73+
return true;
74+
}
75+
return false;
76+
77+
}
78+
79+
public void send(Map<String, Object> data) {
80+
if (this.allow()) {
81+
this.pool.submit(() -> {
82+
try {
83+
pushRequest(data);
84+
} catch (Exception e) {
85+
e.printStackTrace();
86+
}
87+
});
88+
}
89+
}
90+
91+
92+
}

0 commit comments

Comments
 (0)