forked from apache/mina
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
e82fd79
commit df9203c
Showing
8 changed files
with
890 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
166 changes: 166 additions & 0 deletions
166
coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
* | ||
*/ | ||
package org.apache.mina.coap.retry; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.apache.mina.api.AbstractIoFilter; | ||
import org.apache.mina.api.IoFilter; | ||
import org.apache.mina.api.IoSession; | ||
import org.apache.mina.coap.CoapMessage; | ||
import org.apache.mina.filterchain.ReadFilterChainController; | ||
import org.apache.mina.filterchain.WriteFilterChainController; | ||
import org.apache.mina.session.WriteRequest; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* An {@link IoFilter} in charge of messages retransmissions. | ||
* | ||
* <p> | ||
* In case of messages to be sent to the client, the filter retransmits the <i>Confirmable</i> message at exponentially | ||
* increasing intervals, until it receives an acknowledgment (or <i>Reset</i> message), or runs out of attempts. | ||
* </p> | ||
* | ||
* <p> | ||
* In case of received <i>Confirmable</i> messages, the filter keeps track of the acknowledged transmissions in order to | ||
* avoid multiple processing of duplicated messages. | ||
* </p> | ||
*/ | ||
public class CoapRetryFilter extends AbstractIoFilter { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(CoapRetryFilter.class); | ||
|
||
/** The executor in charge of scheduling the retransmissions */ | ||
private ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor(); | ||
|
||
/** The confirmable messages waiting to be acknowledged */ | ||
private Map<Integer, CoapTransmission> inFlight = new ConcurrentHashMap<>(); | ||
|
||
/** The list of processed messages used to handle duplicate copies of Confirmable messages */ | ||
private ExpiringMap<Integer, CoapMessage> processed = new ExpiringMap<Integer, CoapMessage>(); | ||
|
||
public CoapRetryFilter() { | ||
processed.start(); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public void messageReceived(IoSession session, Object in, ReadFilterChainController controller) { | ||
LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session); | ||
|
||
CoapMessage coapMsg = (CoapMessage) in; | ||
|
||
switch (coapMsg.getType()) { | ||
case NON_CONFIRMABLE: | ||
// non confirmable message, let's move to the next filter | ||
controller.callReadNextFilter(coapMsg); | ||
break; | ||
case CONFIRMABLE: | ||
// check if this is a duplicate of a message already processed | ||
CoapMessage ack = processed.get(coapMsg.requestId()); | ||
if (ack != null) { | ||
// stop the filter chain and send again the ack since it was probably lost | ||
LOGGER.debug("Duplicated messages detected for ID {}", coapMsg.requestId()); | ||
controller.callWriteMessageForRead(ack); | ||
} else { | ||
controller.callReadNextFilter(coapMsg); | ||
} | ||
|
||
break; | ||
case ACK: | ||
case RESET: | ||
CoapTransmission t = inFlight.get(coapMsg.requestId()); | ||
if (t != null) { | ||
// cancel the scheduled retransmission | ||
t.getRetryFuture().cancel(false); | ||
inFlight.remove(coapMsg.requestId()); | ||
} | ||
controller.callReadNextFilter(coapMsg); | ||
break; | ||
} | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public void messageWriting(final IoSession session, final WriteRequest message, | ||
WriteFilterChainController controller) { | ||
LOGGER.debug("Processing a MESSAGE_WRITING for session {}", session); | ||
|
||
final CoapMessage coapMsg = (CoapMessage) message.getMessage(); | ||
final Integer coapMsgId = (Integer) coapMsg.requestId(); | ||
|
||
switch (coapMsg.getType()) { | ||
|
||
case NON_CONFIRMABLE: | ||
controller.callWriteNextFilter(message); | ||
break; | ||
case RESET: | ||
case ACK: | ||
// let's keep track of the message to avoid processing it again in case of duplicate copy. | ||
processed.put(coapMsgId, coapMsg); | ||
|
||
controller.callWriteNextFilter(message); | ||
break; | ||
|
||
case CONFIRMABLE: | ||
// initialize a transmission if this is not a retry | ||
CoapTransmission t = inFlight.get(coapMsgId); | ||
if (t == null) { | ||
t = new CoapTransmission(coapMsg); | ||
inFlight.put(coapMsgId, t); | ||
} | ||
|
||
// schedule a retry | ||
ScheduledFuture<?> future = retryExecutor.schedule(new Runnable() { | ||
|
||
@Override | ||
public void run() { | ||
CoapTransmission t = inFlight.get(coapMsgId); | ||
|
||
// send again the message if the maximum number of attempts is not reached | ||
if (t != null && t.timeout()) { | ||
LOGGER.debug("Retry for message with ID {}", coapMsgId); | ||
session.write(coapMsg); | ||
} else { | ||
// abort transmission | ||
LOGGER.debug("No more retry for message with ID {}", coapMsgId); | ||
} | ||
} | ||
}, t.getNextTimeout(), TimeUnit.MILLISECONDS); | ||
|
||
t.setRetryFuture(future); | ||
|
||
// move to the next filter | ||
controller.callWriteNextFilter(message); | ||
break; | ||
} | ||
|
||
} | ||
} |
107 changes: 107 additions & 0 deletions
107
coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
* | ||
*/ | ||
package org.apache.mina.coap.retry; | ||
|
||
import java.util.Random; | ||
import java.util.concurrent.ScheduledFuture; | ||
|
||
import org.apache.mina.coap.CoapMessage; | ||
|
||
/** | ||
* A transmission is a wrapper of a <i>Confirmable</i> {@link CoapMessage} carrying additional data used to ensure a | ||
* reliable communication. | ||
* | ||
* <p> | ||
* Basically, retransmission is controlled by two things : a timeout and retransmission counter. | ||
* </p> | ||
*/ | ||
public class CoapTransmission { | ||
|
||
/** Default value of the initial timeout - in milliseconds */ | ||
private static final long ACK_TIMEOUT = 2000L; | ||
|
||
/** Default value of the random factor used to compute the initial timeout */ | ||
private static final float ACK_RANDOM_FACTOR = 1.5F; | ||
|
||
/** Default value of the maximum number of retransmissions */ | ||
private static final int MAX_RETRANSMIT = 4; | ||
|
||
/** | ||
* The CoAP message waiting to be acknowledged | ||
*/ | ||
private CoapMessage message; | ||
|
||
/** | ||
* The future in charge of the retransmission when the timeout is reached. It is needed to keep track of this future | ||
* to be able to cancel it when the expected acknowledgment is received | ||
*/ | ||
private ScheduledFuture<?> retryFuture; | ||
|
||
/** | ||
* The number of transmission retry | ||
*/ | ||
private int transmissionCount; | ||
|
||
/** | ||
* the timeout in millisecond before the next retransmission | ||
*/ | ||
private long nextTimeout; | ||
|
||
public CoapTransmission(CoapMessage message) { | ||
this.message = message; | ||
|
||
this.transmissionCount = 0; | ||
|
||
// the initial timeout is set to a random duration between ACK_TIMEOUT and (ACK_TIMEOUT * ACK_RANDOM_FACTOR) | ||
this.nextTimeout = ACK_TIMEOUT + new Random().nextInt((int) ((ACK_RANDOM_FACTOR - 1.0F) * ACK_TIMEOUT)); | ||
} | ||
|
||
/** | ||
* This method is called when a timeout is triggered for this transmission. | ||
* | ||
* @return <code>true</code> if the message must be retransmitted and <code>false</code> if the transmission attempt | ||
* must be canceled | ||
*/ | ||
public boolean timeout() { | ||
if (transmissionCount < MAX_RETRANSMIT) { | ||
this.nextTimeout = this.nextTimeout * 2; | ||
this.transmissionCount++; | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
public CoapMessage getMessage() { | ||
return message; | ||
} | ||
|
||
public ScheduledFuture<?> getRetryFuture() { | ||
return retryFuture; | ||
} | ||
|
||
public void setRetryFuture(ScheduledFuture<?> retryFuture) { | ||
this.retryFuture = retryFuture; | ||
} | ||
|
||
public long getNextTimeout() { | ||
return nextTimeout; | ||
} | ||
|
||
} |
Oops, something went wrong.