Skip to content

Commit

Permalink
fix: in case of error, propagate back error code of edgeHub (Azure#4929)
Browse files Browse the repository at this point in the history
instead of using 0 (which is a result of uninitialized status property)
  • Loading branch information
vipeller authored Jun 3, 2021
1 parent bc56061 commit 8250d87
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter
using System.Collections.Generic;
using System.Dynamic;
using System.IO;
using System.Net;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
Expand Down Expand Up @@ -306,13 +307,33 @@ void HandleDirectMethodCall(string id, string method, string rid, byte[] payload
{
// TODO acquire the response timeout from the message
var callingTask = this.edgeHub.InvokeMethodAsync(rid, new DirectMethodRequest(id, method, payload, TimeSpan.FromMinutes(1)));
callingTask.ContinueWith(
async response =>
_ = callingTask.ContinueWith(
async responseTask =>
{
var status = response.IsCompletedSuccessfully ? response.Result.Status : 500; // TODO check the correct status code to return
var topic = string.Format(DirectMethodResponseTemplate, id, response.Result.Status, rid);
// DirectMethodResponse has a 'Status' and 'HttpStatusCode'. If everything is fine, 'Status' contains
// the response of the device and HttpStatusCode is 200. In this case we pass back the response of the
// device. If something went wrong, then HttpStatusCode is an error (typically 404). In this case we
// pass back that value. The value 500/InternalServerError is just a fallback, edgeHub is supposed to
// handle errors, so 500 always should be overwritten.
var responseCode = Convert.ToInt32(HttpStatusCode.InternalServerError);
var responseData = this.emptyArray;
if (responseTask.IsCompletedSuccessfully)
{
var response = responseTask.Result;
responseData = response.Data ?? responseData;

if (response.HttpStatusCode == HttpStatusCode.OK)
{
responseCode = response.Status;
}
else
{
responseCode = Convert.ToInt32(response.HttpStatusCode);
}
}

await this.SendUpstreamMessageAsync(RpcCmdPub, topic, response.Result.Data ?? this.emptyArray);
var topic = string.Format(DirectMethodResponseTemplate, id, responseCode, rid);
await this.SendUpstreamMessageAsync(RpcCmdPub, topic, responseData);
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.MqttBrokerAdapter.Test
{
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Moq;
using Newtonsoft.Json;
using Newtonsoft.Json.Bson;
using Newtonsoft.Json.Serialization;
using Xunit;

public class BrokeredCloudProxyDispatcherTest
{
[Fact]
public async Task SuccessfulMethodCallReturnsMethodResult()
{
var resultTopic = await this.DirectMethodCall(new DirectMethodResponse("123", new byte[] { 1, 2, 3 }, 201));
Assert.Contains("/res/201/", resultTopic);
}

[Fact]
public async Task FailedMethodCallReturnsHttpStatus()
{
var resultTopic = await this.DirectMethodCall(new DirectMethodResponse(new Exception("some error"), System.Net.HttpStatusCode.NotFound));
Assert.Contains("/res/404/", resultTopic);
}

public async Task<string> DirectMethodCall(DirectMethodResponse response)
{
var connector = new Mock<IMqttBrokerConnector>();
var edgeHub = new Mock<IEdgeHub>();

var lastPayload = default(byte[]);

var milestone = new SemaphoreSlim(0, 1);

connector.Setup(c => c.SendAsync(It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>()))
.Callback<string, byte[], bool>(
(_, p, __) =>
{
lastPayload = p;
milestone.Release();
})
.Returns(() => Task.FromResult(true));

edgeHub.Setup(e => e.InvokeMethodAsync(It.IsAny<string>(), It.IsAny<DirectMethodRequest>()))
.Returns(() => Task.FromResult(response));

var sut = new BrokeredCloudProxyDispatcher();

sut.BindEdgeHub(edgeHub.Object);
sut.SetConnector(connector.Object);

await sut.HandleAsync(new MqttPublishInfo("$downstream/dev_a/mod_1/methods/post/test/?$rid=123", Encoding.UTF8.GetBytes("{ \"test\":\"data\"}")));

await milestone.WaitAsync();

var packet = GetRpcPacket(lastPayload);

return packet.Topic;
}

RpcPacket GetRpcPacket(byte[] payload)
{
var packet = default(RpcPacket);
using (var reader = new BsonDataReader(new MemoryStream(payload)))
{
var serializer = new JsonSerializer
{
ContractResolver = new DefaultContractResolver
{
NamingStrategy = new CamelCaseNamingStrategy()
}
};

packet = serializer.Deserialize<RpcPacket>(reader);
}
return packet;
}
}
}

0 comments on commit 8250d87

Please sign in to comment.