Skip to content

Commit

Permalink
OAK-6087: Avoid reads from MongoDB primary
Browse files Browse the repository at this point in the history
Automatically use client sessions when running on MongoDB 3.6, otherwise fall back to the previous behaviour.
The feature can be disabled with a system property: -Doak.mongo.clientSession=false
Travis runs an additional build for oak-store-document on a MongoDB replica-set and a readPreference of secondaryPreferred

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1832110 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
mreutegg committed May 23, 2018
1 parent 49c02a9 commit 8acac8b
Show file tree
Hide file tree
Showing 31 changed files with 1,018 additions and 193 deletions.
7 changes: 4 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ env:
- MODULE=oak-jcr PROFILE="-PintegrationTesting" UT="-Dsurefire.skip.ut=true" MONGODB_MODE="--single"
- MODULE=oak-jcr PROFILE="" UT="" MONGODB_MODE="--single"
- MODULE=oak-store-document PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-store-document PROFILE="-PintegrationTesting,replicaset" UT="" MONGODB_MODE="--replicaset"
- MODULE=oak-lucene PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-run PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-it-osgi PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-pojosr PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-upgrade PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-pojosr PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
- MODULE=oak-it-osgi PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single"
install:
- wget -N http://fastdl.mongodb.org/linux/mongodb-linux-x86_64-${MONGODB}.tgz -P $HOME/.mongodb
- tar --skip-old-files -C $HOME/.mongodb -xf $HOME/.mongodb/mongodb-linux-x86_64-${MONGODB}.tgz
Expand Down
9 changes: 9 additions & 0 deletions oak-store-document/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@
</plugins>
</build>

<profiles>
<profile>
<id>replicaset</id>
<properties>
<mongo.url>mongodb://localhost:27017,localhost:27018,localhost:27019/MongoMKDB?readPreference=secondaryPreferred</mongo.url>
</properties>
</profile>
</profiles>

<dependencies>
<!-- Optional OSGi dependencies, used only when running within OSGi -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document.mongo;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import com.mongodb.event.ServerHeartbeatFailedEvent;
import com.mongodb.event.ServerHeartbeatStartedEvent;
import com.mongodb.event.ServerHeartbeatSucceededEvent;
import com.mongodb.event.ServerMonitorListener;

/**
* A composite {@link ServerMonitorListener}.
*/
class CompositeServerMonitorListener implements ServerMonitorListener {

private final List<ServerMonitorListener> listeners = new CopyOnWriteArrayList<>();

void addListener(ServerMonitorListener listener) {
listeners.add(listener);
}

void removeListener(ServerMonitorListener listener) {
listeners.remove(listener);
}

@Override
public void serverHearbeatStarted(ServerHeartbeatStartedEvent event) {
listeners.forEach(l -> l.serverHearbeatStarted(event));
}

@Override
public void serverHeartbeatSucceeded(ServerHeartbeatSucceededEvent event) {
listeners.forEach(l -> l.serverHeartbeatSucceeded(event));
}

@Override
public void serverHeartbeatFailed(ServerHeartbeatFailedEvent event) {
listeners.forEach(l -> l.serverHeartbeatFailed(event));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,25 @@
import org.bson.conversions.Bson;

import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;

public class MongoBlobReferenceIterator extends BlobReferenceIterator {

private final MongoDocumentStore documentStore;

public MongoBlobReferenceIterator(DocumentNodeStore nodeStore, MongoDocumentStore documentStore) {
public MongoBlobReferenceIterator(DocumentNodeStore nodeStore,
MongoDocumentStore documentStore) {
super(nodeStore);
this.documentStore = documentStore;
}

@Override
public Iterator<NodeDocument> getIteratorOverDocsWithBinaries() {
Bson query = Filters.eq(NodeDocument.HAS_BINARY_FLAG, NodeDocument.HAS_BINARY_VAL);
// TODO It currently prefers secondary. Would that be Ok?
// TODO It currently uses the configured read preference. Would that be Ok?
MongoCursor<BasicDBObject> cursor = documentStore.getDBCollection(NODES)
.withReadPreference(documentStore.getConfiguredReadPreference(NODES))
.find(query).iterator();

return CloseableIterator.wrap(transform(cursor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.mongodb.ReadPreference;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import org.apache.jackrabbit.oak.commons.StringUtils;
Expand All @@ -42,7 +43,6 @@
import com.mongodb.client.model.Filters;

import static com.mongodb.ReadPreference.primary;
import static com.mongodb.ReadPreference.secondaryPreferred;
import static java.util.stream.StreamSupport.stream;
import static org.bson.codecs.configuration.CodecRegistries.fromCodecs;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
Expand All @@ -68,6 +68,7 @@ public class MongoBlobStore extends CachingBlobStore {
fromCodecs(new MongoBlobCodec())
);

private final ReadPreference defaultReadPreference;
private final MongoCollection<MongoBlob> blobCollection;
private long minLastModified;

Expand All @@ -86,6 +87,7 @@ public MongoBlobStore(MongoDatabase db, long cacheSize) {
// space allocated for a record to the next power of two
// (there is an overhead per record, let's assume it is 1 KB at most)
setBlockSize(2 * 1024 * 1024 - 1024);
defaultReadPreference = db.getReadPreference();
blobCollection = initBlobCollection(db);
}

Expand Down Expand Up @@ -184,8 +186,12 @@ private MongoCollection<MongoBlob> initBlobCollection(MongoDatabase db) {
.noneMatch(COLLECTION_BLOBS::equals)) {
db.createCollection(COLLECTION_BLOBS);
}
// override the read preference configured with the MongoDB URI
// and use the primary as default. Reading a blob will still
// try a secondary first and then fallback to the primary.
return db.getCollection(COLLECTION_BLOBS, MongoBlob.class)
.withCodecRegistry(CODEC_REGISTRY);
.withCodecRegistry(CODEC_REGISTRY)
.withReadPreference(primary());
}

private MongoCollection<MongoBlob> getBlobCollection() {
Expand All @@ -196,10 +202,9 @@ private MongoBlob getBlob(String id, long lastMod) {
Bson query = getBlobQuery(id, lastMod);
Bson fields = new BasicDBObject(MongoBlob.KEY_DATA, 1);

// try the secondary first
// TODO add a configuration option for whether to try reading from secondary
// try with default read preference first, may be from secondary
List<MongoBlob> result = new ArrayList<>(1);
getBlobCollection().withReadPreference(secondaryPreferred()).find(query)
getBlobCollection().withReadPreference(defaultReadPreference).find(query)
.projection(fields).into(result);
if (result.isEmpty()) {
// not found in the secondary: try the primary
Expand Down Expand Up @@ -244,15 +249,14 @@ public long countDeleteChunks(List<String> chunkIds, long maxLastModifiedTime) t
@Override
public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
Bson fields = new BasicDBObject(MongoBlob.KEY_ID, 1);
Bson hint = new BasicDBObject("$hint", fields);

Bson query = new Document();
if (maxLastModifiedTime != 0 && maxLastModifiedTime != -1) {
query = Filters.lte(MongoBlob.KEY_LAST_MOD, maxLastModifiedTime);
}

final MongoCursor<MongoBlob> cur = getBlobCollection().find(query)
.projection(fields).modifiers(hint).iterator();
.projection(fields).hint(fields).iterator();

//TODO The cursor needs to be closed
return new AbstractIterator<String>() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.document.mongo;

import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -56,6 +55,7 @@ public abstract class MongoDocumentNodeStoreBuilderBase<T extends MongoDocumentN
private boolean socketKeepAlive = true;
private MongoStatus mongoStatus;
private long maxReplicationLagMillis = TimeUnit.HOURS.toMillis(6);
private boolean clientSessionDisabled = false;

/**
* Uses the given information to connect to to MongoDB as backend
Expand All @@ -70,21 +70,19 @@ public abstract class MongoDocumentNodeStoreBuilderBase<T extends MongoDocumentN
* any database name given in the {@code uri}.
* @param blobCacheSizeMB the blob cache size in MB.
* @return this
* @throws UnknownHostException if one of the hosts given in the URI
* is unknown.
*/
public T setMongoDB(@Nonnull String uri,
@Nonnull String name,
int blobCacheSizeMB)
throws UnknownHostException {
int blobCacheSizeMB) {
this.mongoUri = uri;

MongoClusterListener listener = new MongoClusterListener();
CompositeServerMonitorListener serverMonitorListener = new CompositeServerMonitorListener();
MongoClientOptions.Builder options = MongoConnection.getDefaultBuilder();
options.addClusterListener(listener);
options.addServerMonitorListener(serverMonitorListener);
options.socketKeepAlive(socketKeepAlive);
MongoClient client = new MongoClient(new MongoClientURI(uri, options));
MongoStatus status = new MongoStatus(client, name, listener);
MongoStatus status = new MongoStatus(client, name);
serverMonitorListener.addListener(status);
MongoDatabase db = client.getDatabase(name);
if (!MongoConnection.hasWriteConcern(uri)) {
db = db.withWriteConcern(MongoConnection.getDefaultWriteConcern(client));
Expand Down Expand Up @@ -167,6 +165,26 @@ public boolean isSocketKeepAlive() {
return socketKeepAlive;
}

/**
* Disables the use of a client session available with MongoDB 3.6 and
* newer. By default the MongoDocumentStore will use a client session if
* available. That is, when connected to MongoDB 3.6 and newer.
*
* @param b whether to disable the use of a client session.
* @return this
*/
public T setClientSessionDisabled(boolean b) {
this.clientSessionDisabled = b;
return thisBuilder();
}

/**
* @return whether the use of a client session is disabled.
*/
boolean isClientSessionDisabled() {
return clientSessionDisabled;
}

public T setMaxReplicationLag(long duration, TimeUnit unit){
maxReplicationLagMillis = unit.toMillis(duration);
return thisBuilder();
Expand Down
Loading

0 comments on commit 8acac8b

Please sign in to comment.