Skip to content

Commit

Permalink
KUDU-2387: retry exportAuthenticationCredentials and getHiveMetastoreConfig
Browse files Browse the repository at this point in the history

These client calls were not retried when a leader master could not be found.

The fix cribs from RetryRpcErrback and delayedSendRpcToTablet. It's not
possible to reuse those methods directly, as the RPCs involved in these
calls are squirreled away in ConnectToCluster.

Change-Id: Ia534efe776c4b10f45c961a3b279e729dc015ea3
Reviewed-on: http://gerrit.cloudera.org:8080/12099
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <[email protected]>
  • Loading branch information
adembo committed Dec 20, 2018
1 parent b3b008b commit ae98cca
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -809,26 +809,51 @@ public Deferred<KuduTable> openTable(String name) {
*/
@InterfaceStability.Unstable
public Deferred<byte[]> exportAuthenticationCredentials() {
  // No real RPC is sent here. The fake RPC is only a convenient holder for
  // the bookkeeping needed to do exponential backoff on retry.
  final KuduRpc<byte[]> retryState = buildFakeRpc("exportAuthenticationCredentials", null);

  // Capture the Deferred before kicking off the work: callback() or errback()
  // on the RPC resets it, after which getDeferred() would hand back a
  // different, non-triggered Deferred.
  final Deferred<byte[]> result = retryState.getDeferred();
  doExportAuthenticationCredentials(retryState);
  return result;
}

private void doExportAuthenticationCredentials(
final KuduRpc<byte[]> fakeRpc) {
// If we've already connected to the master, use the authentication
// credentials that we received when we connected.
if (hasConnectedToMaster) {
return Deferred.fromResult(
fakeRpc.callback(
securityContext.exportAuthenticationCredentials());
return;
}

// We have no authn data -- connect to the master, which will fetch
// new info.
return getMasterTableLocationsPB(null)
fakeRpc.attempt++;
getMasterTableLocationsPB(null)
.addCallback(new MasterLookupCB(masterTable,
/* partitionKey */ null,
/* requestedBatchSize */ 1))
.addCallback(new Callback<byte[], Object>() {
.addCallback(new Callback<Void, Object>() {
@Override
public byte[] call(Object ignored) {
// Connecting to the cluster should have also fetched the
// authn data.
return securityContext.exportAuthenticationCredentials();
public Void call(Object ignored) {
// Just call ourselves again; we're guaranteed to have the
// authentication credentials.
assert hasConnectedToMaster;
doExportAuthenticationCredentials(fakeRpc);
return null;
}
});
})
.addErrback(new RetryTaskErrback<byte[]>(
fakeRpc, new TimerTask() {
@Override
public void run(final Timeout ignored) {
doExportAuthenticationCredentials(fakeRpc);
}
}));
}

/**
Expand All @@ -838,26 +863,100 @@ public byte[] call(Object ignored) {
@InterfaceAudience.LimitedPrivate("Impala")
@InterfaceStability.Unstable
public Deferred<HiveMetastoreConfig> getHiveMetastoreConfig() {
  // No real RPC is sent here. The fake RPC is only a convenient holder for
  // the bookkeeping needed to do exponential backoff on retry.
  final KuduRpc<HiveMetastoreConfig> retryState = buildFakeRpc("getHiveMetastoreConfig", null);

  // Capture the Deferred before kicking off the work: callback() or errback()
  // on the RPC resets it, after which getDeferred() would hand back a
  // different, non-triggered Deferred.
  final Deferred<HiveMetastoreConfig> result = retryState.getDeferred();
  doGetHiveMetastoreConfig(retryState);
  return result;
}

private void doGetHiveMetastoreConfig(final KuduRpc<HiveMetastoreConfig> fakeRpc) {
// If we've already connected to the master, use the config we received when we connected.
if (hasConnectedToMaster) {
// Take a ref to the HMS config under the lock, but invoke the callback
// chain with the lock released.
HiveMetastoreConfig c;
synchronized (this) {
return Deferred.fromResult(hiveMetastoreConfig);
c = hiveMetastoreConfig;
}
fakeRpc.callback(c);
return;
}

// We have no Metastore config -- connect to the master, which will fetch new info.
return getMasterTableLocationsPB(null)
fakeRpc.attempt++;
getMasterTableLocationsPB(null)
.addCallback(new MasterLookupCB(masterTable,
/* partitionKey */ null,
/* requestedBatchSize */ 1))
.addCallback(new Callback<HiveMetastoreConfig, Object>() {
.addCallback(new Callback<Void, Object>() {
@Override
public HiveMetastoreConfig call(Object ignored) {
// Connecting to the cluster should have also fetched the metastore config.
synchronized (AsyncKuduClient.this) {
return hiveMetastoreConfig;
}
public Void call(Object ignored) {
// Just call ourselves again; we're guaranteed to have the HMS config.
assert hasConnectedToMaster;
doGetHiveMetastoreConfig(fakeRpc);
return null;
}
});
})
.addErrback(new RetryTaskErrback<HiveMetastoreConfig>(
fakeRpc, new TimerTask() {
@Override
public void run(final Timeout ignored) {
doGetHiveMetastoreConfig(fakeRpc);
}
}));
}

/**
 * Errback that retries a generic TimerTask.
 *
 * <p>A RecoverableException causes the task to be rescheduled after a sleep,
 * unless the fake RPC can no longer be retried or the sleep would run past
 * its deadline. Any other exception is fatal and is passed straight to the
 * fake RPC's errback. While a retry is pending the fake RPC's Deferred is not
 * invoked, so the user keeps waiting until the retried task succeeds or fails
 * with a fatal error.
 */
class RetryTaskErrback<R> implements Callback<Void, Exception> {
  private final KuduRpc<R> fakeRpc;
  private final TimerTask retryTask;

  public RetryTaskErrback(KuduRpc<R> fakeRpc,
                          TimerTask retryTask) {
    this.fakeRpc = fakeRpc;
    this.retryTask = retryTask;
  }

  @Override
  public Void call(Exception arg) {
    if (arg instanceof RecoverableException) {
      retryOrGiveUp((RecoverableException) arg);
    } else {
      // Fatal error: report it to whoever is waiting on the fake RPC.
      fakeRpc.errback(arg);
    }
    return null;
  }

  /**
   * Schedules the retry task after a sleep, or gives up when the fake RPC is
   * out of attempts or the sleep would exceed its deadline.
   */
  private void retryOrGiveUp(RecoverableException cause) {
    // The sleep time is computed first because the deadline check needs it.
    final long backoffMillis = getSleepTimeForRpcMillis(fakeRpc);
    final boolean mustGiveUp = cannotRetryRequest(fakeRpc) ||
        fakeRpc.deadlineTracker.wouldSleepingTimeoutMillis(backoffMillis);
    if (mustGiveUp) {
      tooManyAttemptsOrTimeout(fakeRpc, cause); // invokes fakeRpc.Deferred
      return;
    }

    // Record the sleep-and-retry in the RPC's trace, then retry the entire
    // operation once the sleep has elapsed.
    final RpcTraceFrame sleepFrame =
        new RpcTraceFrame.RpcTraceFrameBuilder(
            fakeRpc.method(),
            RpcTraceFrame.Action.SLEEP_THEN_RETRY)
            .callStatus(cause.getStatus())
            .build();
    fakeRpc.addTrace(sleepFrame);
    newTimeout(retryTask, backoffMillis);
  }

  @Override
  public String toString() {
    return "retry task after error";
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import static org.junit.Assert.fail;

import java.io.Closeable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -853,7 +855,7 @@ public void testNoLogSpewOnConnectionRefused() throws Exception {
// Force the client to connect to the masters.
localClient.exportAuthenticationCredentials();
fail("Should have failed to connect.");
} catch (NoLeaderFoundException e) {
} catch (NonRecoverableException e) {
assertTrue("Bad exception string: " + e.getMessage(),
e.getMessage().matches(".*Master config .+ has no leader. " +
"Exceptions received:.*Connection refused.*Connection refused" +
Expand Down Expand Up @@ -1111,4 +1113,44 @@ public Void call() throws Exception {
future.get();
}
}

/**
 * Shared body for the "call during leader election" tests.
 *
 * <p>First checks that the named no-argument {@code KuduClient} method
 * succeeds when invoked right after the leader master is restarted, then
 * checks that it fails with a timed-out status once every master is down.
 *
 * @param clientMethodName name of the public no-argument KuduClient method
 *                         to invoke reflectively
 * @throws Exception if reflection fails or the invoked method fails in the
 *                   leader-restart phase
 */
private void runTestCallDuringLeaderElection(String clientMethodName) throws Exception {
  // This bit of reflection helps us avoid duplicating test code.
  Method methodToInvoke = KuduClient.class.getMethod(clientMethodName);

  for (int i = 0; i < 5; i++) {
    // Close each short-lived client so the test does not leak its resources.
    try (KuduClient cl = new KuduClient.KuduClientBuilder(
        harness.getMasterAddressesAsString()).build()) {
      harness.restartLeaderMaster();

      // There's a good chance that this executes while there's no leader
      // master. It should retry until the leader election completes and a new
      // leader master is elected.
      methodToInvoke.invoke(cl);
    }
  }

  // With all masters down, the call should time out.
  harness.killAllMasterServers();
  try (KuduClient cl = new KuduClient.KuduClientBuilder(
      harness.getMasterAddressesAsString())
      .defaultAdminOperationTimeoutMs(5000) // speed up the test
      .build()) {
    try {
      methodToInvoke.invoke(cl);
      // Without this, the test would pass silently if the call succeeded.
      fail("Expected " + clientMethodName + "() to time out with all masters down");
    } catch (InvocationTargetException ex) {
      // Reflection wraps the real failure; unwrap it before checking.
      assertTrue(ex.getTargetException() instanceof KuduException);
      KuduException realEx = (KuduException) ex.getTargetException();
      assertTrue(realEx.getStatus().isTimedOut());
    }
  }
}

/**
 * Runs the shared leader-election scenario in
 * runTestCallDuringLeaderElection() against the KuduClient method named
 * "exportAuthenticationCredentials".
 */
@Test(timeout = 100000)
public void testExportAuthenticationCredentialsDuringLeaderElection() throws Exception {
runTestCallDuringLeaderElection("exportAuthenticationCredentials");
}

/**
 * Runs the shared leader-election scenario in
 * runTestCallDuringLeaderElection() against the KuduClient method named
 * "getHiveMetastoreConfig".
 */
@Test(timeout = 100000)
public void testGetHiveMetastoreConfigDuringLeaderElection() throws Exception {
runTestCallDuringLeaderElection("getHiveMetastoreConfig");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,6 @@ private void startCluster(Set<Option> opts) throws IOException {
.build();
miniCluster.kinit("test-admin");
client = new KuduClient.KuduClientBuilder(miniCluster.getMasterAddressesAsString()).build();

// TODO(todd): it seems that exportAuthenticationCredentials() doesn't properly retry
// in the case that there is no leader, even though NoLeaderFoundException is a RecoverableException.
// So, we have to use a hack of calling listTabletServers, which _does_ properly retry,
// in order to wait for the masters to elect a leader.
client.listTabletServers();
}

// Add a rule to rerun tests. We use this with Gradle because it doesn't support
Expand Down

0 comments on commit ae98cca

Please sign in to comment.