Skip to content

Commit

Permalink
https://issues.apache.org/jira/browse/AMQ-6339
Browse files Browse the repository at this point in the history
Add support for AMQP client to connect using WebSockets.
  • Loading branch information
tabish121 committed Jun 30, 2016
1 parent 83827f2 commit 31c55f7
Show file tree
Hide file tree
Showing 53 changed files with 2,228 additions and 562 deletions.
45 changes: 45 additions & 0 deletions activemq-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,21 @@
<artifactId>activemq-spring</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-http</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-mqtt</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
Expand Down Expand Up @@ -123,6 +138,36 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@

/**
* Interface that defines the API for any AMQP protocol converter ised to
* map AMQP mechanincs to ActiveMQ and back.
* map AMQP mechanics to ActiveMQ and back.
*/
public interface AmqpProtocolConverter {

/**
* A new incoming data packet from the remote peer is handed off to the
* protocol converter for porcessing. The type can vary and be either an
* protocol converter for processing. The type can vary and be either an
* AmqpHeader at the handshake phase or a byte buffer containing the next
* incoming frame data from the remote.
*
Expand Down Expand Up @@ -70,9 +70,9 @@ public interface AmqpProtocolConverter {
* empty frames or closing connections due to remote end being inactive
* for to long.
*
* @returns the amount of milliseconds to wait before performaing another check.
* @returns the amount of milliseconds to wait before performing another check.
*
* @throws IOException if an error occurs on writing heatbeats to the wire.
* @throws IOException if an error occurs on writing heart-beats to the wire.
*/
long keepAlive() throws IOException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;

import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.transport.amqp.AmqpFrameParser.AMQPFrameSink;
import org.apache.activemq.transport.ws.WSTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;

/**
* An AMQP based WebSocket transport implementation.
*/
public class AmqpWSTransport extends TransportSupport implements WSTransport, AMQPFrameSink {

private final AmqpFrameParser frameReader = new AmqpFrameParser(this);
private final URI remoteLocation;

private WSTransportSink outputSink;
private int receiveCounter;
private X509Certificate[] certificates;

/**
* Create a new Transport instance.
*
* @param location
* the remote location where the client connection is from.
* @param wireFormat
* the WireFormat instance that configures this Transport.
*/
public AmqpWSTransport(URI location, WireFormat wireFormat) {
super();

remoteLocation = location;
frameReader.setWireFormat((AmqpWireFormat) wireFormat);
}

@Override
public void setTransportSink(WSTransportSink outputSink) {
this.outputSink = outputSink;
}

@Override
public void oneway(Object command) throws IOException {
if (command instanceof ByteBuffer) {
outputSink.onSocketOutboundBinary((ByteBuffer) command);
} else {
throw new IOException("Unexpected output command.");
}
}

@Override
public String getRemoteAddress() {
return remoteLocation.toASCIIString();
}

@Override
public int getReceiveCounter() {
return receiveCounter;
}

@Override
public X509Certificate[] getPeerCertificates() {
return certificates;
}

@Override
public void setPeerCertificates(X509Certificate[] certificates) {
this.certificates = certificates;
}

@Override
public String getSubProtocol() {
return "amqp";
}

@Override
public WireFormat getWireFormat() {
return frameReader.getWireFormat();
}

@Override
protected void doStop(ServiceStopper stopper) throws Exception {
// Currently nothing needed here since we have no async workers.
}

@Override
protected void doStart() throws Exception {
if (outputSink == null) {
throw new IllegalStateException("Transport started before output sink assigned.");
}

// Currently nothing needed here since we have no async workers.
}

//----- WebSocket event hooks --------------------------------------------//

@Override
public void onWebSocketText(String data) throws IOException {
onException(new IOException("Illegal text content receive on AMQP WebSocket channel."));
}

@Override
public void onWebSocketBinary(ByteBuffer data) throws IOException {
try {
frameReader.parse(data);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}

@Override
public void onWebSocketClosed() throws IOException {
onException(new IOException("Unexpected close of AMQP WebSocket channel."));
}

//----- AMQP Frame Data event hook ---------------------------------------//

@Override
public void onFrame(Object frame) {
doConsume(frame);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Map;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;

/**
* Factory for creating WebSocket aware AMQP Transports.
*/
public class AmqpWSTransportFactory extends TransportFactory implements BrokerServiceAware {

private BrokerService brokerService = null;

@Override
protected String getDefaultWireFormatType() {
return "amqp";
}

@Override
public TransportServer doBind(URI location) throws IOException {
throw new IOException("doBind() method not implemented! No Server over WS implemented.");
}

@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService);

Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");

IntrospectionSupport.setProperties(amqpTransport, options);
IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions);

// Now wrap the filter with the monitor
transport = createInactivityMonitor(amqpTransport, format);
IntrospectionSupport.setProperties(transport, options);

return super.compositeConfigure(transport, format, options);
}

/**
* Factory method to create a new transport
*
* @throws IOException
* @throws UnknownHostException
*/
@Override
protected Transport createTransport(URI location, WireFormat wireFormat) throws MalformedURLException, UnknownHostException, IOException {
return new AmqpWSTransport(location, wireFormat);
}

@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}

protected Transport createInactivityMonitor(AmqpTransportFilter transport, WireFormat format) {
AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
transport.setInactivityMonitor(monitor);
return monitor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ void pumpProtonToSocket() {
while (!done) {
ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) {
LOG.trace("Sending {} bytes out", toWrite.limit());
LOG.trace("Server: Sending {} bytes out", toWrite.limit());
amqpTransport.sendToAmqp(toWrite);
protonTransport.outputConsumed();
} else {
Expand Down Expand Up @@ -356,6 +356,8 @@ public void onAMQPData(Object command) throws Exception {
return;
}

LOG.trace("Server: Received from client: {} bytes", frame.getLength());

while (frame.length > 0) {
try {
int count = protonTransport.input(frame.data, frame.offset, frame.length);
Expand Down Expand Up @@ -386,7 +388,7 @@ private void processProtonEvents() throws Exception {
Event event = null;
while ((event = eventCollector.peek()) != null) {
if (amqpTransport.isTrace()) {
LOG.trace("Processing event: {}", event.getType());
LOG.trace("Server: Processing event: {}", event.getType());
}
switch (event.getType()) {
case CONNECTION_REMOTE_OPEN:
Expand Down Expand Up @@ -484,7 +486,6 @@ public void onResponse(AmqpProtocolConverter converter, Response response) throw

protonConnection.close();
} else {

if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) {
LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout());
protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ public AmqpAuthenticator(AmqpTransport transport, Sasl sasl, BrokerService broke
}

/**
* @return true if the SASL exchange has conpleted, regardless of success.
* @return true if the SASL exchange has completed, regardless of success.
*/
public boolean isDone() {
return sasl.getOutcome() != Sasl.SaslOutcome.PN_SASL_NONE;
}

/**
* @return the list of all SASL mechanisms that are supported curretnly.
* @return the list of all SASL mechanisms that are supported currently.
*/
public String[] getSupportedMechanisms() {
return mechanisms;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AmqpWSTransportFactory
Loading

0 comments on commit 31c55f7

Please sign in to comment.