Skip to content

Commit

Permalink
NIFI-7540: Fix TestListenSMTP and TestListFile on macOS build environ…
Browse files Browse the repository at this point in the history
…ment (apache#4341)

* NIFI-7540: Fix TestListenSMTP and TestListFile on macOS build environment

This also fixes NIFI-4760.

* NIFI-7540: Remove duplicate mail.smtp.starttls.enable from TestListenSMTP

Signed-off-by: Andy LoPresto <[email protected]>
  • Loading branch information
jfrazee authored Jun 17, 2020
1 parent 8b1a23a commit c18b27a
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package org.apache.nifi.remote.io.socket;

import java.io.IOException;
import java.net.Socket;
import java.net.ServerSocket;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
*
*/
public class NetworkUtils {

/**
Expand All @@ -43,4 +44,34 @@ public final static int availablePort() {
}
}
}

public final static boolean isListening(final String hostname, final int port) {
try (final Socket s = new Socket(hostname, port)) {
return s.isConnected();
} catch (final Exception ignore) {}
return false;
}

public final static boolean isListening(final String hostname, final int port, final int timeoutMillis) {
Boolean result = false;

final ExecutorService executor = Executors.newSingleThreadExecutor();
try {
result = executor.submit(() -> {
while(!isListening(hostname, port)) {
try {
Thread.sleep(100);
} catch (final Exception ignore) {}
}
return true;
}).get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (final Exception ignore) {} finally {
try {
executor.shutdown();
} catch (final Exception ignore) {}
}

return (result != null && result);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
<version>1.5.6</version>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.subethamail</groupId>
Expand Down Expand Up @@ -125,4 +125,3 @@
</dependency>
</dependencies>
</project>

Original file line number Diff line number Diff line change
Expand Up @@ -18,90 +18,75 @@

import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.mail.Email;
import org.apache.commons.mail.SimpleEmail;
import java.util.Properties;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;

import org.junit.Test;

public class TestListenSMTP {

private ScheduledExecutorService executor;

@Before
public void before() {
this.executor = Executors.newScheduledThreadPool(2);
}

@After
public void after() {
this.executor.shutdown();
}

@Test
public void validateSuccessfulInteraction() throws Exception {
int port = NetworkUtils.availablePort();
public void testListenSMTP() throws Exception {
final ListenSMTP processor = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(processor);

TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
final int port = NetworkUtils.availablePort();
runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");

runner.assertValid();
runner.run(5, false);
runner.run(1, false);

assertTrue(String.format("expected server listening on %s:%d", "localhost", port), NetworkUtils.isListening("localhost", port, 5000));

final Properties config = new Properties();
config.put("mail.smtp.host", "localhost");
config.put("mail.smtp.port", String.valueOf(port));
config.put("mail.smtp.connectiontimeout", "5000");
config.put("mail.smtp.timeout", "5000");
config.put("mail.smtp.writetimeout", "5000");

final Session session = Session.getInstance(config);
session.setDebug(true);

final int numMessages = 5;
CountDownLatch latch = new CountDownLatch(numMessages);

this.executor.schedule(() -> {
for (int i = 0; i < numMessages; i++) {
try {
Email email = new SimpleEmail();
email.setHostName("localhost");
email.setSmtpPort(port);
email.setFrom("[email protected]");
email.setSubject("This is a test");
email.setMsg("MSG-" + i);
email.addTo("[email protected]");
email.send();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}
}, 1500, TimeUnit.MILLISECONDS);

boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
for (int i = 0; i < numMessages; i++) {
final Message email = new MimeMessage(session);
email.setFrom(new InternetAddress("[email protected]"));
email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("[email protected]"));
email.setSubject("This is a test");
email.setText("MSG-" + i);
Transport.send(email);
}

runner.shutdown();
assertTrue(complete);
runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, numMessages);
}

@Test
public void validateSuccessfulInteractionWithTls() throws Exception {
// TODO: Setting system properties without cleaning/restoring at the end of a test is an anti-pattern and can have side effects
System.setProperty("mail.smtp.ssl.trust", "*");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/keystore.jks");
System.setProperty("javax.net.ssl.keyStorePassword", "passwordpassword");
int port = NetworkUtils.availablePort();

TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
public void testListenSMTPwithTLS() throws Exception {
final ListenSMTP processor = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(processor);

final int port = NetworkUtils.availablePort();
runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");

// Setup the SSL Context
SSLContextService sslContextService = new StandardRestrictedSSLContextService();
final SSLContextService sslContextService = new StandardRestrictedSSLContextService();
runner.addControllerService("ssl-context", sslContextService);
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/truststore.jks");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "passwordpassword");
Expand All @@ -116,80 +101,78 @@ public void validateSuccessfulInteractionWithTls() throws Exception {
runner.setProperty(ListenSMTP.CLIENT_AUTH, SslContextFactory.ClientAuth.NONE.name());
runner.assertValid();

int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
runner.run(messageCount, false);

this.executor.schedule(() -> {
for (int i = 0; i < messageCount; i++) {
try {
Email email = new SimpleEmail();
email.setHostName("localhost");
email.setSmtpPort(port);
email.setFrom("[email protected]");
email.setSubject("This is a test");
email.setMsg("MSG-" + i);
email.addTo("[email protected]");

// Enable STARTTLS but ignore the cert
email.setStartTLSEnabled(true);
email.setStartTLSRequired(true);
email.setSSLCheckServerIdentity(false);
email.send();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}
}, 1500, TimeUnit.MILLISECONDS);

boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
runner.run(1, false);

assertTrue(String.format("expected server listening on %s:%d", "localhost", port), NetworkUtils.isListening("localhost", port, 5000));

final Properties config = new Properties();
config.put("mail.smtp.host", "localhost");
config.put("mail.smtp.port", String.valueOf(port));
config.put("mail.smtp.auth", "false");
config.put("mail.smtp.starttls.enable", "true");
config.put("mail.smtp.starttls.required", "true");
config.put("mail.smtp.ssl.trust", "*");
config.put("mail.smtp.connectiontimeout", "5000");
config.put("mail.smtp.timeout", "5000");
config.put("mail.smtp.writetimeout", "5000");

final Session session = Session.getInstance(config);
session.setDebug(true);

final int numMessages = 5;
for (int i = 0; i < numMessages; i++) {
final Message email = new MimeMessage(session);
email.setFrom(new InternetAddress("[email protected]"));
email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("[email protected]"));
email.setSubject("This is a test");
email.setText("MSG-" + i);
Transport.send(email);
}

runner.shutdown();
assertTrue(complete);
runner.assertAllFlowFilesTransferred("success", messageCount);
runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, numMessages);
}

@Test
public void validateTooLargeMessage() throws Exception {
int port = NetworkUtils.availablePort();
@Test(expected = MessagingException.class)
public void testListenSMTPwithTooLargeMessage() throws Exception {
final ListenSMTP processor = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(processor);

TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
final int port = NetworkUtils.availablePort();
runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "10 B");

runner.assertValid();
runner.run(1, false);

assertTrue(String.format("expected server listening on %s:%d", "localhost", port), NetworkUtils.isListening("localhost", port, 5000));

final Properties config = new Properties();
config.put("mail.smtp.host", "localhost");
config.put("mail.smtp.port", String.valueOf(port));
config.put("mail.smtp.connectiontimeout", "5000");
config.put("mail.smtp.timeout", "5000");
config.put("mail.smtp.writetimeout", "5000");

final Session session = Session.getInstance(config);
session.setDebug(true);

MessagingException messagingException = null;
try {
final Message email = new MimeMessage(session);
email.setFrom(new InternetAddress("[email protected]"));
email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("[email protected]"));
email.setSubject("This is a test");
email.setText("MSG-0");
Transport.send(email);
} catch (final MessagingException e) {
messagingException = e;
}

int messageCount = 1;
CountDownLatch latch = new CountDownLatch(messageCount);

runner.run(messageCount, false);

this.executor.schedule(() -> {
for (int i = 0; i < messageCount; i++) {
try {
Email email = new SimpleEmail();
email.setHostName("localhost");
email.setSmtpPort(port);
email.setFrom("[email protected]");
email.setSubject("This is a test");
email.setMsg("MSG-" + i);
email.addTo("[email protected]");
email.send();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}
}, 1000, TimeUnit.MILLISECONDS);

boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
runner.shutdown();
assertTrue(complete);
runner.assertAllFlowFilesTransferred("success", 0);
runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, 0);

if (messagingException != null) throw messagingException;
}

}
Loading

0 comments on commit c18b27a

Please sign in to comment.