Skip to content

Commit

Permalink
Misc improvements to dynamic reqiest endpoints used by ibeans
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.codehaus.org/mule/branches/mule-3.x@19333 bf997673-6b11-0410-b953-e057580c5b09
  • Loading branch information
rossmason committed Sep 3, 2010
1 parent 59df214 commit 5d4f4cd
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.endpoint.EndpointBuilder;
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.endpoint.MalformedEndpointException;
import org.mule.api.transformer.Transformer;
import org.mule.api.transport.PropertyScope;
import org.mule.config.endpoint.AnnotatedEndpointData;
import org.mule.config.i18n.CoreMessages;
import org.mule.transport.AbstractConnector;
import org.mule.util.UriParamFilter;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -48,13 +50,24 @@ public class CallRequestEndpoint extends DynamicRequestEndpoint
//determined
private static List<Transformer> transformers = new ArrayList<Transformer>();
private static List<Transformer> responseTransformers = new ArrayList<Transformer>();

private UriParamFilter filter = new UriParamFilter();

public CallRequestEndpoint(MuleContext context, AnnotatedEndpointData epData)
public CallRequestEndpoint(MuleContext context, AnnotatedEndpointData epData) throws MalformedEndpointException
{
super(createInboundEndpoint(context, epData), epData.getAddress());
super( context, createInboundBuilder(context, epData), epData.getAddress());
}

private static InboundEndpoint createInboundEndpoint(MuleContext context, AnnotatedEndpointData epData)
@Override
protected void validateUriTemplate(String uri) throws MalformedEndpointException
{
if (uri.indexOf(":") > uri.indexOf(parser.getStyle().getPrefix()))
{
throw new MalformedEndpointException(CoreMessages.dynamicEndpointsMustSpecifyAScheme(), uri);
}
}

private static EndpointBuilder createInboundBuilder(MuleContext context, AnnotatedEndpointData epData)
{
try
{
Expand All @@ -64,18 +77,31 @@ private static InboundEndpoint createInboundEndpoint(MuleContext context, Annota
builder.setName(epData.getName());
builder.setProperties(epData.getProperties() == null ? new HashMap() : epData.getProperties());

return builder.buildInboundEndpoint();
return builder;
}
catch (MuleException e)
{
throw new RuntimeException(e);
}
}

@Override
protected String parseURIString(String uri, MuleMessage message)
{
//We do additional processing here to parse the URI template
Map<String, Object> props = getPropertiesForTemplate(message);

String newUriString = parser.parse(props, uri);
//Remove optional params completely if null
newUriString = filter.filterParamsByValue(newUriString, CallOutboundEndpoint.NULL_PARAM);

return super.parseURIString(newUriString, message);
}

@Override
protected Map<String, Object> getPropertiesForTemplate(MuleMessage message)
{
Map<String, Object> props = (Map) message.removeProperty(CHANNEL.URI_PARAM_PROPERTIES, PropertyScope.INVOCATION);
Map<String, Object> props = (Map) message.findPropertyInAnyScope(CHANNEL.URI_PARAM_PROPERTIES, null);
if (props == null)
{
throw new IllegalStateException(CHANNEL.URI_PARAM_PROPERTIES + " not set on message");
Expand All @@ -90,7 +116,7 @@ public List getTransformers()
{
try
{
transformers.addAll(getLocalConnector().getDefaultInboundTransformers(this));
transformers.addAll(((AbstractConnector)getConnector()).getDefaultInboundTransformers(this));
for (Transformer tran : transformers)
{
tran.setEndpoint(this);
Expand All @@ -113,7 +139,7 @@ public List getResponseTransformers()
{
try
{
responseTransformers.addAll(getLocalConnector().getDefaultResponseTransformers(this));
responseTransformers.addAll(((AbstractConnector)getConnector()).getDefaultResponseTransformers(this));
for (Transformer tran : responseTransformers)
{
tran.setEndpoint(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,43 @@
package org.mule.module.ibeans.spi.support;

import org.mule.DefaultMuleMessage;
import org.mule.MessageExchangePattern;
import org.mule.api.MessagingException;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.MuleRuntimeException;
import org.mule.api.config.MuleProperties;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.endpoint.EndpointBuilder;
import org.mule.api.endpoint.EndpointURI;
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.endpoint.MalformedEndpointException;
import org.mule.api.expression.ExpressionManager;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.transport.Connector;
import org.mule.config.i18n.CoreMessages;
import org.mule.endpoint.DefaultInboundEndpoint;
import org.mule.endpoint.DynamicOutboundEndpoint;
import org.mule.endpoint.DynamicURIInboundEndpoint;
import org.mule.endpoint.MuleEndpointURI;
import org.mule.transport.AbstractConnector;
import org.mule.endpoint.URIBuilder;
import org.mule.transport.service.TransportFactory;
import org.mule.util.BeanUtils;
import org.mule.transport.service.TransportFactoryException;
import org.mule.util.TemplateParser;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* A dynamic request endpoint is used in conjunction with the {@link org.ibeans.annotation.Call} annotation when there are no {@link org.ibeans.annotation.param.Payload},
* {@link org.ibeans.annotation.param.PayloadParam} or {@link org.ibeans.annotation.param.HeaderParam} annotations
* A dynamic request endpoint is used in conjunction with the {@link org.ibeans.annotation.Call} annotation when there are no {@link org.ibeans.annotation.param.Body},
* {@link org.ibeans.annotation.param.BodyParam} or {@link org.ibeans.annotation.param.HeaderParam} annotations
* on a method and allows a dynamic {@link org.mule.api.endpoint.InboundEndpoint} to be created. This endpoint is then used via the Mule {@link org.mule.api.transport.MessageRequester}
* interface to make a specific request to a transport for a message.
*/
Expand All @@ -47,18 +59,29 @@ public class DynamicRequestEndpoint extends DynamicURIInboundEndpoint
protected transient final Log logger = LogFactory.getLog(DynamicRequestEndpoint.class);
private static final long serialVersionUID = 8861985949279708638L;

protected String uri;
protected TemplateParser parser = TemplateParser.createCurlyBracesStyleParser();

//Need a local read-write intance
protected AbstractConnector localConnector;
/**
* The URI template used to construct the actual URI to send the message to.
*/
protected String uriTemplate;

protected TemplateParser parser = TemplateParser.createCurlyBracesStyleParser();
private EndpointBuilder builder;

public DynamicRequestEndpoint(InboundEndpoint endpoint, String uri)
public DynamicRequestEndpoint(MuleContext muleContext, EndpointBuilder builder, String uriTemplate) throws MalformedEndpointException
{
super(endpoint);
this.uri = uri;
this.localConnector = (AbstractConnector) endpoint.getConnector();
super(new NullInboundEndpoint(muleContext));
this.builder = builder;
this.uriTemplate = uriTemplate;
validateUriTemplate(uriTemplate);
}

protected void validateUriTemplate(String uri) throws MalformedEndpointException
{
if (uri.indexOf(":") > uri.indexOf(ExpressionManager.DEFAULT_EXPRESSION_PREFIX))
{
throw new MalformedEndpointException(CoreMessages.dynamicEndpointsMustSpecifyAScheme(), uri);
}
}

protected Map<String, Object> getPropertiesForTemplate(MuleMessage message)
Expand All @@ -74,24 +97,24 @@ protected Map<String, Object> getPropertiesForTemplate(MuleMessage message)
return props;
}

protected EndpointURI getEndpointURIForMessage(MuleMessage message) throws MessagingException
protected EndpointURI getEndpointURIForMessage(MuleEvent event) throws MessagingException
{
if (logger.isDebugEnabled())
{
logger.debug("Uri before parsing is: " + uri);
logger.debug("Uri before parsing is: " + uriTemplate);
}

Map<String, Object> props = getPropertiesForTemplate(message);
Map<String, Object> props = getPropertiesForTemplate(event.getMessage());

String newUriString = parser.parse(props, uri);
String newUriString = parser.parse(props, uriTemplate);
Object evalParam = props.get(EVAL_PARAM_PROPERTY);
if (evalParam != null)
{
newUriString = this.getMuleContext().getExpressionManager().parse(newUriString, new DefaultMuleMessage(evalParam, getMuleContext()), true);
newUriString = parseURIString(newUriString, new DefaultMuleMessage(evalParam, getMuleContext()));
}
else
{
newUriString = this.getMuleContext().getExpressionManager().parse(newUriString, message, true);
newUriString = parseURIString(newUriString, event.getMessage());
}
if (logger.isDebugEnabled())
{
Expand All @@ -102,58 +125,44 @@ protected EndpointURI getEndpointURIForMessage(MuleMessage message) throws Messa
{
setEndpointURI(new MuleEndpointURI(newUriString, getMuleContext()));

if (!getLocalConnector().supportsProtocol(getEndpointURI().getScheme()))
if (!newUriString.startsWith(getEndpointURI().getScheme()))
{
throw new MessagingException(CoreMessages.schemeCannotChangeForRouter(
this.getEndpointURI().getScheme(), getEndpointURI().getScheme()), message);
this.getEndpointURI().getScheme(), getEndpointURI().getScheme()), event);
}
getEndpointURI().initialise();
return getEndpointURI();
}
catch (Exception e)
{
throw new MessagingException(
CoreMessages.templateCausedMalformedEndpoint(uri, newUriString),
message, e);
CoreMessages.templateCausedMalformedEndpoint(uriTemplate, newUriString),
event, e);
}

}

@Override
public Connector getConnector()
protected String parseURIString(String uri, MuleMessage message)
{
try
{
return getLocalConnector();
}
catch (MuleException e)
{
throw new MuleRuntimeException(e.getI18nMessage(), e);
}
return this.getMuleContext().getExpressionManager().parse(uri, message, true);
}


protected AbstractConnector getLocalConnector() throws MuleException
public MuleMessage request(long timeout, MuleEvent event) throws Exception
{
if (localConnector == null)
EndpointURI uri = getEndpointURIForMessage(event);

if (endpoint instanceof NullInboundEndpoint)
{
localConnector = (AbstractConnector) new TransportFactory(getMuleContext()).createConnector(getEndpointURI());
getMuleContext().getRegistry().registerConnector(localConnector);
//This allows connector properties to be set as properties on the endpoint
BeanUtils.populateWithoutFail(localConnector, this.getProperties(), false);
builder.setURIBuilder(new URIBuilder(uri));
endpoint = builder.buildInboundEndpoint();
}
return localConnector;
}
InboundEndpoint inboundEndpoint = new DynamicURIInboundEndpoint(endpoint, uri);

public MuleMessage request(long timeout, MuleMessage message) throws Exception
{
EndpointURI uri = getEndpointURIForMessage(message);
DynamicURIInboundEndpoint inboundEndpoint = new DynamicURIInboundEndpoint(this, uri);
if (message.getInvocationProperty(MuleProperties.MULE_CREDENTIALS_PROPERTY) != null)
if (event.getMessage().getInvocationProperty(MuleProperties.MULE_CREDENTIALS_PROPERTY) != null)
{
inboundEndpoint.getProperties().put(MuleProperties.MULE_CREDENTIALS_PROPERTY, message.getInvocationProperty(MuleProperties.MULE_CREDENTIALS_PROPERTY));
inboundEndpoint.getProperties().put(MuleProperties.MULE_CREDENTIALS_PROPERTY, event.getMessage().getInvocationProperty(MuleProperties.MULE_CREDENTIALS_PROPERTY));
}
return getLocalConnector().request(inboundEndpoint, timeout);
return super.request(timeout);
}

@Override
Expand All @@ -174,15 +183,55 @@ public boolean equals(Object o)

DynamicRequestEndpoint that = (DynamicRequestEndpoint) o;

return !(uri != null ? !uri.equals(that.uri) : that.uri != null);
return !(uriTemplate != null ? !uriTemplate.equals(that.uriTemplate) : that.uriTemplate != null);

}

@Override
public int hashCode()
{
int result = 0;
result = 31 * result + (uri != null ? uri.hashCode() : 0);
result = 31 * result + (uriTemplate != null ? uriTemplate.hashCode() : 0);
return result;
}
}

protected static class NullInboundEndpoint extends DefaultInboundEndpoint implements InboundEndpoint
{
NullInboundEndpoint(MuleContext muleContext)
{
super(createDynamicConnector(muleContext), null, null, new HashMap(), null, true, MessageExchangePattern.ONE_WAY, 0, "started", null, null, muleContext, null, null, null, null, true, null);
}

@Override
public MessageProcessor createMessageProcessorChain(FlowConstruct flowContruct) throws MuleException
{
throw new UnsupportedOperationException("createMessageProcessorChain");
}

public List<String> getResponseProperties()
{
return Collections.emptyList();
}

@SuppressWarnings("unused")
public MuleEvent process(MuleEvent event) throws MuleException
{
throw new UnsupportedOperationException("process");
}

static Connector createDynamicConnector(MuleContext muleContext)
{
try
{
return new TransportFactory(muleContext).createConnector(DynamicOutboundEndpoint.DYNAMIC_URI_PLACEHOLDER);
}
catch (TransportFactoryException e)
{
//This should never happen
throw new MuleRuntimeException(e);
}
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
package org.mule.module.ibeans.spi.support;

import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MessagingException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleMessage;
import org.mule.config.i18n.CoreMessages;
import org.mule.transport.NullPayload;

import org.ibeans.api.channel.CHANNEL;

Expand All @@ -31,8 +34,12 @@ public MuleEvent process(MuleEvent event) throws MessagingException
int timeout = event.getMessage().getInboundProperty(CHANNEL.TIMEOUT, event.getMuleContext().getConfiguration().getDefaultResponseTimeout());
if (inboundEndpoint instanceof DynamicRequestEndpoint)
{
return new DefaultMuleEvent(((DynamicRequestEndpoint) inboundEndpoint).request(timeout,
event.getMessage()), event);
MuleMessage message =((DynamicRequestEndpoint) inboundEndpoint).request(timeout, event);
if(message == null)
{
message = new DefaultMuleMessage(NullPayload.getInstance(), event.getMuleContext());
}
return new DefaultMuleEvent(message, event);
}
else
{
Expand All @@ -43,7 +50,7 @@ public MuleEvent process(MuleEvent event) throws MessagingException
}
catch (Exception e)
{
throw new MessagingException(CoreMessages.failedToInvoke("inboundEndpoint.request()"), event.getMessage(), e);
throw new MessagingException(CoreMessages.failedToInvoke("inboundEndpoint.request()"), event, e);
}
}
}

0 comments on commit 5d4f4cd

Please sign in to comment.