Skip to content

Commit

Permalink
Fix ignored MQTT integration tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Posi-Paka authored and ashvayka committed Jan 6, 2018
1 parent e49c713 commit ca7ffd9
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public final class PluginProcessingContext implements PluginContext {
private static final Executor executor = Executors.newSingleThreadExecutor();
public static final String CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "Customer user is not allowed to perform this operation!";
public static final String SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "System administrator is not allowed to perform this operation!";
public static final String DEVICE_WITH_REQUESTED_ID_NOT_FOUND = "Device with requested id wasn't found!";

private final SharedPluginProcessingContext pluginCtx;
private final Optional<PluginApiCallSecurityContext> securityCtx;
Expand Down Expand Up @@ -309,7 +310,7 @@ private void validateDevice(final PluginApiCallSecurityContext ctx, EntityId ent
ListenableFuture<Device> deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId()));
Futures.addCallback(deviceFuture, getCallback(callback, device -> {
if (device == null) {
return ValidationResult.entityNotFound("Device with requested id wasn't found!");
return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
} else {
if (!device.getTenantId().equals(ctx.getTenantId())) {
return ValidationResult.accessDenied("Device doesn't belong to the current Tenant!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public abstract class AbstractControllerTest {
protected static final String CUSTOMER_USER_EMAIL = "[email protected]";
private static final String CUSTOMER_USER_PASSWORD = "customer";

/** See {@link org.springframework.test.web.servlet.DefaultMvcResult#getAsyncResult(long)}
* and {@link org.springframework.mock.web.MockAsyncContext#getTimeout()}
*/
private static final long DEFAULT_TIMEOUT = -1L;

protected MediaType contentType = new MediaType(MediaType.APPLICATION_JSON.getType(),
MediaType.APPLICATION_JSON.getSubtype(),
Charset.forName("utf8"));
Expand Down Expand Up @@ -336,15 +341,19 @@ protected <T> T doPost(String urlTemplate, Class<T> responseClass, String... par
}

protected <T> T doPost(String urlTemplate, T content, Class<T> responseClass, ResultMatcher resultMatcher, String... params) throws Exception {
return readResponse(doPost(urlTemplate, params).andExpect(resultMatcher), responseClass);
return readResponse(doPost(urlTemplate, content, params).andExpect(resultMatcher), responseClass);
}

protected <T> T doPost(String urlTemplate, T content, Class<T> responseClass, String... params) throws Exception {
return readResponse(doPost(urlTemplate, content, params).andExpect(status().isOk()), responseClass);
}

protected <T> T doPostAsync(String urlTemplate, T content, Class<T> responseClass, ResultMatcher resultMatcher, String... params) throws Exception {
return readResponse(doPostAsync(urlTemplate, content, params).andExpect(resultMatcher), responseClass);
return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseClass);
}

protected <T> T doPostAsync(String urlTemplate, T content, Class<T> responseClass, ResultMatcher resultMatcher, Long timeout, String... params) throws Exception {
return readResponse(doPostAsync(urlTemplate, content, timeout, params).andExpect(resultMatcher), responseClass);
}

protected <T> T doDelete(String urlTemplate, Class<T> responseClass, String... params) throws Exception {
Expand All @@ -366,12 +375,13 @@ protected <T> ResultActions doPost(String urlTemplate, T content, String... para
return mockMvc.perform(postRequest);
}

protected <T> ResultActions doPostAsync(String urlTemplate, T content, String... params) throws Exception {
protected <T> ResultActions doPostAsync(String urlTemplate, T content, Long timeout, String... params) throws Exception {
MockHttpServletRequestBuilder postRequest = post(urlTemplate);
setJwtToken(postRequest);
String json = json(content);
postRequest.contentType(contentType).content(json);
MvcResult result = mockMvc.perform(postRequest).andReturn();
result.getAsyncResult(timeout);
return mockMvc.perform(asyncDispatch(result));
}

Expand All @@ -384,8 +394,8 @@ protected ResultActions doDelete(String urlTemplate, String... params) throws Ex

protected void populateParams(MockHttpServletRequestBuilder request, String... params) {
if (params != null && params.length > 0) {
Assert.assertEquals(params.length % 2, 0);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<String, String>();
Assert.assertEquals(0, params.length % 2);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
for (int i = 0; i < params.length; i += 2) {
paramsMap.add(params[i], params[i + 1]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@
*/
package org.thingsboard.server.mqtt.rpc;

import java.util.Arrays;

import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.junit.*;
import org.springframework.http.HttpStatus;
import org.springframework.web.client.HttpClientErrorException;
import org.thingsboard.server.actors.plugin.PluginProcessingContext;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.service.DaoNoSqlTest;

import java.util.UUID;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand All @@ -42,15 +44,19 @@
public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractControllerTest {

private static final String MQTT_URL = "tcp://localhost:1883";
private static final String FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED = "HttpClientErrorException expected, but not encountered";
private static final Long TIME_TO_HANDLE_REQUEST = 500L;

private Tenant savedTenant;
private User tenantAdmin;
private Long asyncContextTimeoutToUseRpcPlugin;


@Before
public void beforeTest() throws Exception {
loginSysAdmin();

asyncContextTimeoutToUseRpcPlugin = getAsyncContextTimeoutToUseRpcPlugin();

Tenant tenant = new Tenant();
tenant.setTitle("My tenant");
savedTenant = doPost("/api/tenant", tenant, Tenant.class);
Expand All @@ -70,8 +76,7 @@ public void beforeTest() throws Exception {
public void afterTest() throws Exception {
loginSysAdmin();
if (savedTenant != null) {
doDelete("/api/tenant/" + savedTenant.getId().getId().toString())
.andExpect(status().isOk());
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk());
}
}

Expand Down Expand Up @@ -102,7 +107,6 @@ public void testServerMqttOneWayRpc() throws Exception {
}

@Test
@Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 408 but was: 200
public void testServerMqttOneWayRpcDeviceOffline() throws Exception {
Device device = new Device();
device.setName("Test One-Way Server-Side RPC Device Offline");
Expand All @@ -115,29 +119,19 @@ public void testServerMqttOneWayRpcDeviceOffline() throws Exception {

String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
String deviceId = savedDevice.getId().getId().toString();
try {
doPost("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(408));
Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED);
} catch (HttpClientErrorException e) {
log.error(e.getMessage(), e);
Assert.assertEquals(HttpStatus.REQUEST_TIMEOUT, e.getStatusCode());
Assert.assertEquals("408 null", e.getMessage());
}

doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(),
asyncContextTimeoutToUseRpcPlugin);
}

@Test
@Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 400 (404?) but was: 401
public void testServerMqttOneWayRpcDeviceDoesNotExist() throws Exception {
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
String nonExistentDeviceId = UUID.randomUUID().toString();
try {
doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, status().is(400));
Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED);
} catch (HttpClientErrorException e) {
log.error(e.getMessage(), e);
Assert.assertEquals(HttpStatus.BAD_REQUEST, e.getStatusCode());
Assert.assertEquals("400 null", e.getMessage());
}
String nonExistentDeviceId = UUIDs.timeBased().toString();

String result = doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class,
status().isNotFound());
Assert.assertEquals(PluginProcessingContext.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result);
}

@Test
Expand Down Expand Up @@ -168,7 +162,6 @@ public void testServerMqttTwoWayRpc() throws Exception {
}

@Test
@Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 408 but was: 200
public void testServerMqttTwoWayRpcDeviceOffline() throws Exception {
Device device = new Device();
device.setName("Test Two-Way Server-Side RPC Device Offline");
Expand All @@ -181,29 +174,19 @@ public void testServerMqttTwoWayRpcDeviceOffline() throws Exception {

String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
String deviceId = savedDevice.getId().getId().toString();
try {
doPost("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().is(408));
Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED);
} catch (HttpClientErrorException e) {
log.error(e.getMessage(), e);
Assert.assertEquals(HttpStatus.REQUEST_TIMEOUT, e.getStatusCode());
Assert.assertEquals("408 null", e.getMessage());
}

doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(),
asyncContextTimeoutToUseRpcPlugin);
}

@Test
@Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 400 (404?) but was: 401
public void testServerMqttTwoWayRpcDeviceDoesNotExist() throws Exception {
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
String nonExistentDeviceId = UUID.randomUUID().toString();
try {
doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, status().is(400));
Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED);
} catch (HttpClientErrorException e) {
log.error(e.getMessage(), e);
Assert.assertEquals(HttpStatus.BAD_REQUEST, e.getStatusCode());
Assert.assertEquals("400 null", e.getMessage());
}
String nonExistentDeviceId = UUIDs.timeBased().toString();

String result = doPostAsync("/api/plugins/rpc/twoway/" + nonExistentDeviceId, setGpioRequest, String.class,
status().isNotFound());
Assert.assertEquals(PluginProcessingContext.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result);
}

private Device getSavedDevice(Device device) throws Exception {
Expand All @@ -214,6 +197,13 @@ private DeviceCredentials getDeviceCredentials(Device savedDevice) throws Except
return doGet("/api/device/" + savedDevice.getId().getId().toString() + "/credentials", DeviceCredentials.class);
}

private Long getAsyncContextTimeoutToUseRpcPlugin() throws Exception {
TextPageData<PluginMetaData> plugins = doGetTyped("/api/plugin/system?limit=1&textSearch=system rpc plugin",
new TypeReference<TextPageData<PluginMetaData>>(){});
Long systemRpcPluginTimeout = plugins.getData().iterator().next().getConfiguration().get("defaultTimeout").asLong();
return systemRpcPluginTimeout + TIME_TO_HANDLE_REQUEST;
}

private static class TestMqttCallback implements MqttCallback {

private final MqttAsyncClient client;
Expand All @@ -228,10 +218,10 @@ public void connectionLost(Throwable throwable) {

@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception {
log.info("Message Arrived: " + mqttMessage.getPayload().toString());
log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload()));
MqttMessage message = new MqttMessage();
String responseTopic = requestTopic.replace("request", "response");
message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes());
message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8"));
client.publish(responseTopic, message);
}

Expand Down

0 comments on commit ca7ffd9

Please sign in to comment.