Skip to content

Commit

Permalink
Backed out changeset 96dab44be3ba (bug 1351673) for Android build bus…
Browse files Browse the repository at this point in the history
…tage

CLOSED TREE

MozReview-Commit-ID: 4XV7Be9AXmB
  • Loading branch information
philor committed Sep 21, 2017
1 parent 4e22312 commit 2bb722e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package org.mozilla.gecko.sync;

import org.mozilla.gecko.background.common.log.Logger;

/**
* A little class to allow us to maintain a count of extant
* things (in our case, callbacks that need to fire), and
* some work that we want done when that count hits 0.
*
* @author rnewman
*
*/
public class DelayedWorkTracker {
private static final String LOG_TAG = "DelayedWorkTracker";
protected Runnable workItem = null;
protected int outstandingCount = 0;

public int incrementOutstanding() {
Logger.trace(LOG_TAG, "Incrementing outstanding.");
synchronized(this) {
return ++outstandingCount;
}
}
public int decrementOutstanding() {
Logger.trace(LOG_TAG, "Decrementing outstanding.");
Runnable job = null;
int count;
synchronized(this) {
if ((count = --outstandingCount) == 0 &&
workItem != null) {
job = workItem;
workItem = null;
} else {
return count;
}
}
job.run();
// In case it's changed.
return getOutstandingOperations();
}
public int getOutstandingOperations() {
synchronized(this) {
return outstandingCount;
}
}
public void delayWorkItem(Runnable item) {
Logger.trace(LOG_TAG, "delayWorkItem.");
boolean runnableNow = false;
synchronized(this) {
Logger.trace(LOG_TAG, "outstandingCount: " + outstandingCount);
if (outstandingCount == 0) {
runnableNow = true;
} else {
if (workItem != null) {
throw new IllegalStateException("Work item already set!");
}
workItem = item;
}
}
if (runnableNow) {
Logger.trace(LOG_TAG, "Running item now.");
item.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.DelayedWorkTracker;
import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
Expand All @@ -28,8 +29,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -59,6 +58,7 @@ public class BatchingDownloader {
private static final String DEFAULT_SORT_ORDER = "index";

private final RepositorySession repositorySession;
private final DelayedWorkTracker workTracker = new DelayedWorkTracker();
private final Uri baseCollectionUri;
private final long fetchDeadline;
private final boolean allowMultipleBatches;
Expand All @@ -73,8 +73,6 @@ public class BatchingDownloader {
protected final Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
/* @GuardedBy("this") */ private String lastModified;

private final ExecutorService taskQueue = Executors.newSingleThreadExecutor();

public BatchingDownloader(
AuthHeaderProvider authHeaderProvider,
Uri baseCollectionUri,
Expand All @@ -93,7 +91,7 @@ public BatchingDownloader(
}

@VisibleForTesting
/* package-private */ static String flattenIDs(String[] guids) {
protected static String flattenIDs(String[] guids) {
// Consider using Utils.toDelimitedString if and when the signature changes
// to Collection<String> guids.
if (guids.length == 0) {
Expand All @@ -112,23 +110,18 @@ public BatchingDownloader(
}

@VisibleForTesting
protected void fetchWithParameters(final long newer,
final long batchLimit,
final boolean full,
final String sort,
final String ids,
final SyncStorageCollectionRequest request,
final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
protected void fetchWithParameters(long newer,
long batchLimit,
boolean full,
String sort,
String ids,
SyncStorageCollectionRequest request,
RepositorySessionFetchRecordsDelegate fetchRecordsDelegate)
throws URISyntaxException, UnsupportedEncodingException {
taskQueue.execute(new Runnable() {
@Override
public void run() {
request.delegate = new BatchingDownloaderDelegate(BatchingDownloader.this, fetchRecordsDelegate, request,
newer, batchLimit, full, sort, ids);
pending.add(request);
request.get();
}
});
request.delegate = new BatchingDownloaderDelegate(this, fetchRecordsDelegate, request,
newer, batchLimit, full, sort, ids);
this.pending.add(request);
request.get();
}

@VisibleForTesting
Expand Down Expand Up @@ -222,10 +215,10 @@ public void onFetchCompleted(SyncStorageResponse response,
Logger.warn(LOG_TAG, "Failed to reset resume context while completing a batch");
}

taskQueue.execute(new Runnable() {
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "onFetchCompleted running.");
Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
fetchRecordsDelegate.onFetchCompleted();
}
});
Expand All @@ -247,10 +240,9 @@ public void run() {

// We need to make another batching request!
// Let the delegate know that a batch fetch just completed before we proceed.
// Beware that while this operation will run after every call to onFetchedRecord returned,
// it's not guaranteed that the 'sink' session actually processed all of the fetched records.
// See Bug https://bugzilla.mozilla.org/show_bug.cgi?id=1351673#c28 for details.
taskQueue.execute(new Runnable() {
// This operation needs to run after every call to onFetchedRecord for this batch has been
// processed, hence the delayWorkItem call.
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Running onBatchCompleted.");
Expand All @@ -273,10 +265,10 @@ public void run() {
if (!this.stateProvider.commit()) {
Logger.warn(LOG_TAG, "Failed to commit repository state while handling request creation error");
}
taskQueue.execute(new Runnable() {
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "onFetchCompleted running.");
Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
fetchRecordsDelegate.onFetchFailed(e);
}
});
Expand Down Expand Up @@ -312,7 +304,7 @@ private void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetch
}
}

taskQueue.execute(new Runnable() {
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Running onFetchFailed.");
Expand All @@ -323,6 +315,8 @@ public void run() {

public void onFetchedRecord(CryptoRecord record,
RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
this.workTracker.incrementOutstanding();

try {
fetchRecordsDelegate.onFetchedRecord(record);
// NB: changes to stateProvider are committed in either onFetchCompleted or handleFetchFailed.
Expand All @@ -332,6 +326,8 @@ public void onFetchedRecord(CryptoRecord record,
} catch (Exception ex) {
Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
throw new RuntimeException(ex);
} finally {
this.workTracker.decrementOutstanding();
}
}

Expand Down Expand Up @@ -366,7 +362,7 @@ private static boolean mayProceedWithBatching(long deadline) {
}

@VisibleForTesting
/* package-private */ static URI buildCollectionURI(Uri baseCollectionUri, boolean full, long newer, long limit, String sort, String ids, String offset) throws URISyntaxException {
public static URI buildCollectionURI(Uri baseCollectionUri, boolean full, long newer, long limit, String sort, String ids, String offset) throws URISyntaxException {
Uri.Builder uriBuilder = baseCollectionUri.buildUpon();

if (full) {
Expand Down

0 comments on commit 2bb722e

Please sign in to comment.