Skip to content

Commit

Permalink
NIFI-3541: Add local network interface capability to site-to-site cli…
Browse files Browse the repository at this point in the history
…ent and remote group and ports
  • Loading branch information
markap14 authored and apiri committed Mar 6, 2017
1 parent 000414e commit 9e68f02
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -168,6 +169,7 @@ public static class Builder implements Serializable {
private int batchCount;
private long batchSize;
private long batchNanos;
private InetAddress localAddress;
private SiteToSiteTransportProtocol transportProtocol = SiteToSiteTransportProtocol.RAW;
private HttpProxy httpProxy;

Expand Down Expand Up @@ -198,6 +200,7 @@ public Builder fromConfig(final SiteToSiteClientConfig config) {
this.batchCount = config.getPreferredBatchCount();
this.batchSize = config.getPreferredBatchSize();
this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
this.localAddress = config.getLocalAddress();
this.httpProxy = config.getHttpProxy();

return this;
Expand All @@ -223,12 +226,31 @@ public Builder url(final String url) {
}

/**
* <p>Specifies the URLs of the remote NiFi instance.</p>
* <p>If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
* nodes will be automatically load balanced across the different nodes.</p>
* <p>
* Specifies the local address to use when communicating with the remote NiFi instance.
* </p>
*
* @param localAddress the local address to use, or <code>null</code> to use <code>anyLocal</code> address.
* @return the builder
*/
public Builder localAddress(final InetAddress localAddress) {
this.localAddress = localAddress;
return this;
}

/**
* <p>
* Specifies the URLs of the remote NiFi instance.
* </p>
* <p>
* If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
* nodes will be automatically load balanced across the different nodes.
* </p>
*
* <p>Multiple urls provide better connectivity with a NiFi cluster, able to connect
* to the target cluster at long as one of the specified urls is accessible.</p>
* <p>
* Multiple urls provide better connectivity with a NiFi cluster, able to connect
* to the target cluster at long as one of the specified urls is accessible.
* </p>
*
* @param urls urls of remote instance
* @return the builder
Expand Down Expand Up @@ -717,6 +739,7 @@ class StandardSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializ
private final long batchSize;
private final long batchNanos;
private final HttpProxy httpProxy;
private final InetAddress localAddress;

// some serialization frameworks require a default constructor
private StandardSiteToSiteClientConfig() {
Expand All @@ -740,6 +763,7 @@ private StandardSiteToSiteClientConfig() {
this.batchNanos = 0;
this.transportProtocol = null;
this.httpProxy = null;
this.localAddress = null;
}

private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) {
Expand All @@ -766,6 +790,7 @@ private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) {
this.batchNanos = builder.batchNanos;
this.transportProtocol = builder.getTransportProtocol();
this.httpProxy = builder.getHttpProxy();
this.localAddress = builder.localAddress;
}

@Override
Expand Down Expand Up @@ -931,5 +956,10 @@ public SiteToSiteTransportProtocol getTransportProtocol() {
public HttpProxy getHttpProxy() {
return httpProxy;
}

@Override
public InetAddress getLocalAddress() {
return localAddress;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.File;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand All @@ -36,6 +37,7 @@ public interface SiteToSiteClientConfig extends Serializable {
* for backward compatibility for implementations that does not expect multiple URLs.
* {@link #getUrls()} should be used instead then should support multiple URLs when making requests.
*/
@Deprecated
String getUrl();

/**
Expand Down Expand Up @@ -171,4 +173,9 @@ public interface SiteToSiteClientConfig extends Serializable {
*/
HttpProxy getHttpProxy();

/**
* @return the InetAddress to bind to for the local address when creating a socket, or
* {@code null} to bind to the {@code anyLocal} address.
*/
InetAddress getLocalAddress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,55 @@
*/
package org.apache.nifi.remote.util;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
Expand Down Expand Up @@ -87,53 +136,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;

public class SiteToSiteRestApiClient implements Closeable {

private static final String EVENT_CATEGORY = "Site-to-Site";
Expand All @@ -160,6 +162,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private CloseableHttpAsyncClient httpAsyncClient;

private boolean compress = false;
private InetAddress localAddress = null;
private long requestExpirationMillis = 0;
private int serverTransactionTtl = 0;
private int batchCount = 0;
Expand Down Expand Up @@ -239,6 +242,10 @@ private void setupRequestConfig() {
.setConnectTimeout(connectTimeoutMillis)
.setSocketTimeout(readTimeoutMillis);

if (localAddress != null) {
requestConfigBuilder.setLocalAddress(localAddress);
}

if (proxy != null) {
requestConfigBuilder.setProxy(proxy.getHttpHost());
}
Expand Down Expand Up @@ -916,6 +923,8 @@ private void startExtendingTtl(final String transactionUrl, final Closeable stre
extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator;
extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis;
extendingApiClient.readTimeoutMillis = this.readTimeoutMillis;
extendingApiClient.localAddress = this.localAddress;

final int extendFrequency = serverTransactionTtl / 2;

ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> {
Expand Down Expand Up @@ -1197,10 +1206,12 @@ public void setBaseUrl(final String baseUrl) {

public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
setupRequestConfig();
}

public void setReadTimeoutMillis(final int readTimeoutMillis) {
this.readTimeoutMillis = readTimeoutMillis;
setupRequestConfig();
}

public static String getFirstUrl(final String clusterUrlStr) {
Expand Down Expand Up @@ -1336,6 +1347,10 @@ private void setBaseUrl(final String scheme, final String host, final int port,
public void setCompress(final boolean compress) {
this.compress = compress;
}

public void setLocalAddress(final InetAddress localAddress) {
this.localAddress = localAddress;
}

public void setRequestExpirationMillis(final long requestExpirationMillis) {
if (requestExpirationMillis < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@
*/
package org.apache.nifi.controller;

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.authorization.Resource;
Expand All @@ -36,22 +52,6 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static java.util.Objects.requireNonNull;

public abstract class AbstractPort implements Port {

public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
package org.apache.nifi.groups;

import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;

import java.net.InetAddress;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable {

@Override
String getIdentifier();

String getTargetUri();
Expand Down Expand Up @@ -154,6 +158,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
*/
String getAuthorizationIssue();

/**
* Validates the current configuration, returning ValidationResults for any
* invalid configuration parameter.
*
* @return Collection of validation result objects for any invalid findings
* only. If the collection is empty then the component is valid. Guaranteed
* non-null
*/
Collection<ValidationResult> validate();

/**
* @return the {@link EventReporter} that can be used to report any notable
* events
Expand All @@ -180,6 +194,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable

void setProxyPassword(String proxyPassword);

void setNetworkInterface(String interfaceName);

String getNetworkInterface();

/**
* Returns the InetAddress that the will this instance will bind to when communicating with a
* remote NiFi instance, or <code>null</code> if no specific address has been specified
*/
InetAddress getLocalAddress();

/**
* Initiates a task in the remote process group to re-initialize, as a
* result of clustering changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public RemoteGroupPort(String id, String name, ProcessGroup processGroup, Connec

public abstract TransferDirection getTransferDirection();

@Override
public abstract boolean isUseCompression();

public abstract void setUseCompression(boolean useCompression);
Expand Down
Loading

0 comments on commit 9e68f02

Please sign in to comment.