Skip to content

Commit

Permalink
Merge pull request vitessio#6088 from HubSpot/upstream-unsync-conn
Browse files Browse the repository at this point in the history
Remove synchronizing VTGateConnection on itself
  • Loading branch information
harshit-gangal authored Apr 23, 2020
2 parents 61d4cf7 + 5f65ee6 commit 228e6fe
Showing 1 changed file with 56 additions and 60 deletions.
116 changes: 56 additions & 60 deletions java/client/src/main/java/io/vitess/client/VTGateConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,29 +89,27 @@ public VTGateConnection(RpcClient client) {
*/
public SQLFuture<Cursor> execute(Context ctx, String query, @Nullable Map<String, ?> bindVars,
final VTSession vtSession) throws SQLException {
synchronized (this) {
vtSession.checkCallIsAllowed("execute");
ExecuteRequest.Builder requestBuilder = ExecuteRequest.newBuilder()
.setQuery(Proto.bindQuery(checkNotNull(query), bindVars))
.setSession(vtSession.getSession());

if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}

SQLFuture<Cursor> call = new SQLFuture<>(
transformAsync(client.execute(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteResponse response) throws Exception {
vtSession.setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
}, directExecutor()));
vtSession.setLastCall(call);
return call;
vtSession.checkCallIsAllowed("execute");
ExecuteRequest.Builder requestBuilder = ExecuteRequest.newBuilder()
.setQuery(Proto.bindQuery(checkNotNull(query), bindVars))
.setSession(vtSession.getSession());

if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}

SQLFuture<Cursor> call = new SQLFuture<>(
transformAsync(client.execute(ctx, requestBuilder.build()),
new AsyncFunction<ExecuteResponse, Cursor>() {
@Override
public ListenableFuture<Cursor> apply(ExecuteResponse response) throws Exception {
vtSession.setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.<Cursor>immediateFuture(new SimpleCursor(response.getResult()));
}
}, directExecutor()));
vtSession.setLastCall(call);
return call;
}

/**
Expand Down Expand Up @@ -148,45 +146,43 @@ public SQLFuture<List<CursorWithError>> executeBatch(Context ctx, List<String> q
public SQLFuture<List<CursorWithError>> executeBatch(Context ctx, List<String> queryList,
@Nullable List<Map<String, ?>> bindVarsList, boolean asTransaction, final VTSession vtSession)
throws SQLException {
synchronized (this) {
vtSession.checkCallIsAllowed("executeBatch");
List<Query.BoundQuery> queries = new ArrayList<>();

if (null != bindVarsList && bindVarsList.size() != queryList.size()) {
throw new SQLDataException(
"Size of SQL Query list does not match the bind variables list");
}

for (int i = 0; i < queryList.size(); ++i) {
queries.add(i, Proto.bindQuery(checkNotNull(queryList.get(i)),
bindVarsList == null ? null : bindVarsList.get(i)));
}

Vtgate.ExecuteBatchRequest.Builder requestBuilder =
Vtgate.ExecuteBatchRequest.newBuilder()
.addAllQueries(checkNotNull(queries))
.setSession(vtSession.getSession())
.setAsTransaction(asTransaction);

if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}

SQLFuture<List<CursorWithError>> call = new SQLFuture<>(
transformAsync(client.executeBatch(ctx, requestBuilder.build()),
new AsyncFunction<Vtgate.ExecuteBatchResponse, List<CursorWithError>>() {
@Override
public ListenableFuture<List<CursorWithError>> apply(
Vtgate.ExecuteBatchResponse response) throws Exception {
vtSession.setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.immediateFuture(
Proto.fromQueryResponsesToCursorList(response.getResultsList()));
}
}, directExecutor()));
vtSession.setLastCall(call);
return call;
vtSession.checkCallIsAllowed("executeBatch");
List<Query.BoundQuery> queries = new ArrayList<>();

if (null != bindVarsList && bindVarsList.size() != queryList.size()) {
throw new SQLDataException(
"Size of SQL Query list does not match the bind variables list");
}

for (int i = 0; i < queryList.size(); ++i) {
queries.add(i, Proto.bindQuery(checkNotNull(queryList.get(i)),
bindVarsList == null ? null : bindVarsList.get(i)));
}

Vtgate.ExecuteBatchRequest.Builder requestBuilder =
Vtgate.ExecuteBatchRequest.newBuilder()
.addAllQueries(checkNotNull(queries))
.setSession(vtSession.getSession())
.setAsTransaction(asTransaction);

if (ctx.getCallerId() != null) {
requestBuilder.setCallerId(ctx.getCallerId());
}

SQLFuture<List<CursorWithError>> call = new SQLFuture<>(
transformAsync(client.executeBatch(ctx, requestBuilder.build()),
new AsyncFunction<Vtgate.ExecuteBatchResponse, List<CursorWithError>>() {
@Override
public ListenableFuture<List<CursorWithError>> apply(
Vtgate.ExecuteBatchResponse response) throws Exception {
vtSession.setSession(response.getSession());
Proto.checkError(response.getError());
return Futures.immediateFuture(
Proto.fromQueryResponsesToCursorList(response.getResultsList()));
}
}, directExecutor()));
vtSession.setLastCall(call);
return call;
}

/**
Expand Down

0 comments on commit 228e6fe

Please sign in to comment.