From ae98ccacc24da9a9b023c902d05f2cbd2543a98b Mon Sep 17 00:00:00 2001 From: Adar Dembo Date: Sat, 15 Dec 2018 17:29:48 -0800 Subject: [PATCH] KUDU-2387: retry exportAuthenticationCredentials and getHiveMetastoreConfig These client calls were not retried when a leader master could not be found. The fix cribs from RetryRpcErrback and delayedSendRpcToTablet. It's not possible to reuse those methods directly, as the RPCs involved in these calls are squirreled away in ConnectToCluster. Change-Id: Ia534efe776c4b10f45c961a3b279e729dc015ea3 Reviewed-on: http://gerrit.cloudera.org:8080/12099 Reviewed-by: Alexey Serbin Tested-by: Kudu Jenkins Reviewed-by: Grant Henke --- .../apache/kudu/client/AsyncKuduClient.java | 133 +++++++++++++++--- .../apache/kudu/client/TestKuduClient.java | 44 +++++- .../org/apache/kudu/client/TestSecurity.java | 6 - 3 files changed, 159 insertions(+), 24 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index b0ed00046b..2807ef5ac2 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -809,26 +809,51 @@ public Deferred openTable(String name) { */ @InterfaceStability.Unstable public Deferred exportAuthenticationCredentials() { + // This is basically just a hacky way to encapsulate the necessary bits to + // properly do exponential backoff on retry; there's no actual "RPC" to send. + KuduRpc fakeRpc = buildFakeRpc("exportAuthenticationCredentials", null); + + // Store the Deferred locally; callback() or errback() on the RPC will + // reset it and we'd return a different, non-triggered Deferred. + Deferred fakeRpcD = fakeRpc.getDeferred(); + doExportAuthenticationCredentials(fakeRpc); + return fakeRpcD; + } + + private void doExportAuthenticationCredentials( + final KuduRpc fakeRpc) { // If we've already connected to the master, use the authentication // credentials that we received when we connected. if (hasConnectedToMaster) { - return Deferred.fromResult( + fakeRpc.callback( securityContext.exportAuthenticationCredentials()); + return; } + // We have no authn data -- connect to the master, which will fetch // new info. - return getMasterTableLocationsPB(null) + fakeRpc.attempt++; + getMasterTableLocationsPB(null) .addCallback(new MasterLookupCB(masterTable, /* partitionKey */ null, /* requestedBatchSize */ 1)) - .addCallback(new Callback() { + .addCallback(new Callback() { @Override - public byte[] call(Object ignored) { - // Connecting to the cluster should have also fetched the - // authn data. - return securityContext.exportAuthenticationCredentials(); + public Void call(Object ignored) { + // Just call ourselves again; we're guaranteed to have the + // authentication credentials. + assert hasConnectedToMaster; + doExportAuthenticationCredentials(fakeRpc); + return null; } - }); + }) + .addErrback(new RetryTaskErrback( + fakeRpc, new TimerTask() { + @Override + public void run(final Timeout ignored) { + doExportAuthenticationCredentials(fakeRpc); + } + })); } /** @@ -838,26 +863,100 @@ public byte[] call(Object ignored) { @InterfaceAudience.LimitedPrivate("Impala") @InterfaceStability.Unstable public Deferred getHiveMetastoreConfig() { + // This is basically just a hacky way to encapsulate the necessary bits to + // properly do exponential backoff on retry; there's no actual "RPC" to send. + KuduRpc fakeRpc = buildFakeRpc("getHiveMetastoreConfig", null); + + // Store the Deferred locally; callback() or errback() on the RPC will + // reset it and we'd return a different, non-triggered Deferred. + Deferred fakeRpcD = fakeRpc.getDeferred(); + doGetHiveMetastoreConfig(fakeRpc); + return fakeRpcD; + } + + private void doGetHiveMetastoreConfig(final KuduRpc fakeRpc) { // If we've already connected to the master, use the config we received when we connected. if (hasConnectedToMaster) { + // Take a ref to the HMS config under the lock, but invoke the callback + // chain with the lock released. + HiveMetastoreConfig c; synchronized (this) { - return Deferred.fromResult(hiveMetastoreConfig); + c = hiveMetastoreConfig; } + fakeRpc.callback(c); + return; } + // We have no Metastore config -- connect to the master, which will fetch new info. - return getMasterTableLocationsPB(null) + fakeRpc.attempt++; + getMasterTableLocationsPB(null) .addCallback(new MasterLookupCB(masterTable, /* partitionKey */ null, /* requestedBatchSize */ 1)) - .addCallback(new Callback() { + .addCallback(new Callback() { @Override - public HiveMetastoreConfig call(Object ignored) { - // Connecting to the cluster should have also fetched the metastore config. - synchronized (AsyncKuduClient.this) { - return hiveMetastoreConfig; - } + public Void call(Object ignored) { + // Just call ourselves again; we're guaranteed to have the HMS config. + assert hasConnectedToMaster; + doGetHiveMetastoreConfig(fakeRpc); + return null; } - }); + }) + .addErrback(new RetryTaskErrback( + fakeRpc, new TimerTask() { + @Override + public void run(final Timeout ignored) { + doGetHiveMetastoreConfig(fakeRpc); + } + })); + } + + /** + * Errback for retrying a generic TimerTask. Retries RecoverableExceptions; + * signals fakeRpc's Deferred on a fatal error. + */ + class RetryTaskErrback implements Callback { + private final KuduRpc fakeRpc; + private final TimerTask retryTask; + + public RetryTaskErrback(KuduRpc fakeRpc, + TimerTask retryTask) { + this.fakeRpc = fakeRpc; + this.retryTask = retryTask; + } + + @Override + public Void call(Exception arg) { + if (!(arg instanceof RecoverableException)) { + fakeRpc.errback(arg); + return null; + } + + // Sleep and retry the entire operation. + RecoverableException ex = (RecoverableException)arg; + long sleepTime = getSleepTimeForRpcMillis(fakeRpc); + if (cannotRetryRequest(fakeRpc) || + fakeRpc.deadlineTracker.wouldSleepingTimeoutMillis(sleepTime)) { + tooManyAttemptsOrTimeout(fakeRpc, ex); // invokes fakeRpc.Deferred + return null; + } + fakeRpc.addTrace( + new RpcTraceFrame.RpcTraceFrameBuilder( + fakeRpc.method(), + RpcTraceFrame.Action.SLEEP_THEN_RETRY) + .callStatus(ex.getStatus()) + .build()); + newTimeout(retryTask, sleepTime); + return null; + + // fakeRpc.Deferred was not invoked; the user continues to wait until + // retryTask succeeds or fails with a fatal error. + } + + @Override + public String toString() { + return "retry task after error"; + } } /** diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index 018d1e6900..4901738156 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -41,6 +41,8 @@ import static org.junit.Assert.fail; import java.io.Closeable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collections; @@ -853,7 +855,7 @@ public void testNoLogSpewOnConnectionRefused() throws Exception { // Force the client to connect to the masters. localClient.exportAuthenticationCredentials(); fail("Should have failed to connect."); - } catch (NoLeaderFoundException e) { + } catch (NonRecoverableException e) { assertTrue("Bad exception string: " + e.getMessage(), e.getMessage().matches(".*Master config .+ has no leader. " + "Exceptions received:.*Connection refused.*Connection refused" + @@ -1111,4 +1113,44 @@ public Void call() throws Exception { future.get(); } } + + private void runTestCallDuringLeaderElection(String clientMethodName) throws Exception { + // This bit of reflection helps us avoid duplicating test code. + Method methodToInvoke = KuduClient.class.getMethod(clientMethodName); + + for (int i = 0; i < 5; i++) { + KuduClient cl = new KuduClient.KuduClientBuilder( + harness.getMasterAddressesAsString()).build(); + harness.restartLeaderMaster(); + + // There's a good chance that this executes while there's no leader + // master. It should retry until the leader election completes and a new + // leader master is elected. + methodToInvoke.invoke(cl); + } + + // With all masters down, exportAuthenticationCredentials() should time out. + harness.killAllMasterServers(); + KuduClient cl = new KuduClient.KuduClientBuilder( + harness.getMasterAddressesAsString()) + .defaultAdminOperationTimeoutMs(5000) // speed up the test + .build(); + try { + methodToInvoke.invoke(cl); + } catch (InvocationTargetException ex) { + assertTrue(ex.getTargetException() instanceof KuduException); + KuduException realEx = (KuduException)ex.getTargetException(); + assertTrue(realEx.getStatus().isTimedOut()); + } + } + + @Test(timeout = 100000) + public void testExportAuthenticationCredentialsDuringLeaderElection() throws Exception { + runTestCallDuringLeaderElection("exportAuthenticationCredentials"); + } + + @Test(timeout = 100000) + public void testGetHiveMetastoreConfigDuringLeaderElection() throws Exception { + runTestCallDuringLeaderElection("getHiveMetastoreConfig"); + } } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java index 66f9e71533..197e57c95e 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java @@ -81,12 +81,6 @@ private void startCluster(Set