Skip to content

Commit

Permalink
OAK-8985 | oak-search-elastic: use bulk API for indexing (Committing …
Browse files Browse the repository at this point in the history
…patch by Fabrizio Fortino)

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1876170 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Nitin Gupta committed Apr 6, 2020
1 parent 653a833 commit 8cc86c4
Show file tree
Hide file tree
Showing 26 changed files with 464 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
import org.apache.jackrabbit.oak.jcr.Jcr;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.query.ElasticsearchIndexProvider;
import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider;
Expand Down Expand Up @@ -61,7 +61,7 @@ public Jcr customize(Oak oak) {
.with(new PropertyIndexEditorProvider())
.with(new NodeTypeIndexProvider())
.with(new PropertyFullTextTest.FullTextPropertyInitialiser("elasticText", of("text"),
ElasticsearchIndexConstants.TYPE_ELASTICSEARCH).nodeScope().analyzed());
ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH).nodeScope().analyzed());
return new Jcr(oak);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@
import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
import org.apache.jackrabbit.oak.jcr.Jcr;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.query.ElasticsearchIndexProvider;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jcr.Repository;
import java.io.File;
Expand Down Expand Up @@ -82,7 +80,7 @@ public Jcr customize(Oak oak) {
.with((new ElasticGlobalInitializer(ELASTIC_GLOBAL_INDEX, storageEnabled)).async("fulltext-async"))
// the WikipediaImporter set a property `title`
.with(new FullTextPropertyInitialiser("elasticTitle", of("title"),
ElasticsearchIndexConstants.TYPE_ELASTICSEARCH).async())
ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH).async())
.withAsyncIndexing("async", 5)
.withAsyncIndexing("fulltext-async", 5);
return new Jcr(oak);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
import org.apache.jackrabbit.oak.jcr.Jcr;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.query.ElasticsearchIndexProvider;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
Expand Down Expand Up @@ -106,7 +106,7 @@ public Jcr customize(Oak oak) {
.with((new ElasticGlobalInitializer(ELASTIC_GLOBAL_INDEX, storageEnabled)).async())
// the WikipediaImporter set a property `title`
.with(new FullTextPropertyInitialiser("elasticTitle", of("title"),
ElasticsearchIndexConstants.TYPE_ELASTICSEARCH).async())
ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH).async())
.withAsyncIndexing("async", 5);
return new Jcr(oak);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
import org.apache.jackrabbit.oak.jcr.Jcr;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.query.ElasticsearchIndexProvider;
import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider;
Expand Down Expand Up @@ -77,7 +77,7 @@ public Jcr customize(Oak oak) {
.with(new PropertyIndexEditorProvider())
.with(new NodeTypeIndexProvider())
.with(new PropertyFullTextTest.FullTextPropertyInitialiser("elasticText", of("text"),
ElasticsearchIndexConstants.TYPE_ELASTICSEARCH));
ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH));
return new Jcr(oak);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.util.IndexHelper;
import org.apache.jackrabbit.oak.spi.lifecycle.RepositoryInitializer;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
Expand Down Expand Up @@ -105,7 +105,7 @@ public void initialize(@NotNull NodeBuilder builder) {
// do nothing
} else {
IndexHelper.newFTIndexDefinition(IndexUtils.getOrCreateOakIndex(builder),
this.name, ElasticsearchIndexConstants.TYPE_ELASTICSEARCH,
this.name, ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH,
propertyTypes, excludes, async, storageEnabled);
}
}
Expand Down
10 changes: 10 additions & 0 deletions oak-search-elastic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,21 @@
</dependency>

<!-- Test Dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-core</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,126 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elasticsearch;

import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.spi.state.NodeState;

import java.util.Locale;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.jackrabbit.oak.plugins.index.search.util.ConfigUtil.getOptionalValue;
import static org.elasticsearch.common.Strings.INVALID_FILENAME_CHARS;

/**
 * {@link IndexDefinition} for indexes of type {@link #TYPE_ELASTICSEARCH}.
 * <p>
 * On top of the base definition it exposes the name of the index on the Elasticsearch cluster
 * (see {@link #getRemoteIndexName()}) and a set of bulk-related settings that are read from the
 * index definition node, each falling back to its {@code *_DEFAULT} constant when the property is absent.
 */
public class ElasticsearchIndexDefinition extends IndexDefinition {

    /** Value of the index {@code type} property identifying Elasticsearch indexes. */
    public static final String TYPE_ELASTICSEARCH = "elasticsearch";

    /** Index definition property holding the value exposed as {@link #bulkActions}. */
    public static final String BULK_ACTIONS = "bulkActions";
    public static final int BULK_ACTIONS_DEFAULT = 250;

    /** Index definition property holding the value exposed as {@link #bulkSizeBytes}. */
    public static final String BULK_SIZE_BYTES = "bulkSizeBytes";
    public static final long BULK_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024; // 2MB

    /** Index definition property holding the value exposed as {@link #bulkFlushIntervalMs}. */
    public static final String BULK_FLUSH_INTERVAL_MS = "bulkFlushIntervalMs";
    public static final long BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000;

    /** Index definition property holding the value exposed as {@link #bulkRetries}. */
    public static final String BULK_RETRIES = "bulkRetries";
    public static final int BULK_RETRIES_DEFAULT = 3;

    /**
     * Index definition property holding the value exposed as {@link #bulkRetriesBackoff}.
     * NOTE(review): the unit is not stated here; presumably milliseconds - confirm against the consumer.
     */
    public static final String BULK_RETRIES_BACKOFF = "bulkRetriesBackoff";
    public static final long BULK_RETRIES_BACKOFF_DEFAULT = 200;

    private static final int MAX_NAME_LENGTH = 255;

    /**
     * Matches any single character contained in {@code INVALID_FILENAME_CHARS}.
     * <p>
     * Every character is quoted on its own and the results are joined as alternatives. Quoting the
     * concatenation of all characters instead would produce one literal that only matches that exact
     * run of characters, so individual invalid characters would never be removed.
     * Compiled once because it is applied to every path element of every index name.
     */
    private static final Pattern INVALID_CHARS_PATTERN = Pattern.compile(INVALID_FILENAME_CHARS
            .stream()
            .map(c -> Pattern.quote(c.toString()))
            .collect(Collectors.joining("|")));

    private final String remoteIndexName;

    public final int bulkActions;
    public final long bulkSizeBytes;
    public final long bulkFlushIntervalMs;
    public final int bulkRetries;
    public final long bulkRetriesBackoff;

    /**
     * Creates the definition and reads the bulk settings from {@code defn}.
     *
     * @param root      the root node state, passed through to {@link IndexDefinition}
     * @param defn      the index definition node state
     * @param indexPath the path of the index definition
     */
    public ElasticsearchIndexDefinition(NodeState root, NodeState defn, String indexPath) {
        super(root, getIndexDefinitionState(defn), determineIndexFormatVersion(defn), determineUniqueId(defn), indexPath);
        this.remoteIndexName = setupIndexName();

        this.bulkActions = getOptionalValue(defn, BULK_ACTIONS, BULK_ACTIONS_DEFAULT);
        this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES, BULK_SIZE_BYTES_DEFAULT);
        this.bulkFlushIntervalMs = getOptionalValue(defn, BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT);
        this.bulkRetries = getOptionalValue(defn, BULK_RETRIES, BULK_RETRIES_DEFAULT);
        this.bulkRetriesBackoff = getOptionalValue(defn, BULK_RETRIES_BACKOFF, BULK_RETRIES_BACKOFF_DEFAULT);
    }

    /**
     * Returns the index identifier on the Elasticsearch cluster. Notice this can be different from the value returned
     * from {@code getIndexName}.
     * @return the Elasticsearch index identifier
     */
    public String getRemoteIndexName() {
        return remoteIndexName;
    }

    /**
     * Computes the remote index name from the index path and the current reindex count.
     */
    private String setupIndexName() {
        // TODO: implement advanced remote index name strategy that takes into account multiple tenants and re-index process
        return getESSafeIndexName(getIndexPath() + "-" + getReindexCount());
    }

    /**
     * Derives an Elasticsearch index name from an index path: at most the first three path elements are
     * considered, any element equal to {@code oak:index} is dropped, each remaining element is sanitized
     * with {@link #getESSafeName(String)} and the results are joined with {@code _}.
     * <ul>
     *     <li>{@code abc} becomes {@code abc}</li>
     *     <li>{@code /oak:index/abc} becomes {@code abc}</li>
     * </ul>
     * <p>
     * The resulting name is truncated to {@code MAX_NAME_LENGTH} characters.
     * <p>
     * NOTE(review): an earlier version of this comment claimed {@code xy:abc} becomes {@code xyabc}; that
     * only holds if {@code ':'} is part of {@code INVALID_FILENAME_CHARS} - confirm.
     *
     * @param indexPath the path to derive the name from
     * @return the sanitized, possibly truncated, index name
     */
    private static String getESSafeIndexName(String indexPath) {
        String name = StreamSupport
                .stream(PathUtils.elements(indexPath).spliterator(), false)
                .limit(3) //Max 3 nodeNames including oak:index which is the immediate parent for any indexPath
                .filter(p -> !"oak:index".equals(p))
                .map(ElasticsearchIndexDefinition::getESSafeName)
                .collect(Collectors.joining("_"));

        if (name.length() > MAX_NAME_LENGTH) {
            name = name.substring(0, MAX_NAME_LENGTH);
        }
        return name;
    }

    /**
     * Converts {@code suggestedIndexName} to an Elasticsearch safe index name by removing every character
     * contained in {@code INVALID_FILENAME_CHARS} and lower-casing the result.
     * Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
     *
     * @param suggestedIndexName the candidate name
     * @return the sanitized name
     */
    private static String getESSafeName(String suggestedIndexName) {
        // Locale.ROOT keeps the generated name identical regardless of the JVM default locale
        return INVALID_CHARS_PATTERN.matcher(suggestedIndexName).replaceAll("").toLowerCase(Locale.ROOT);
    }

    /**
     * Class to help with {@link ElasticsearchIndexDefinition} creation.
     * The built object represents the index definition only without the node structure.
     */
    public static class Builder extends IndexDefinition.Builder {
        @Override
        public ElasticsearchIndexDefinition build() {
            // safe as long as createInstance below is the factory used by super.build()
            return (ElasticsearchIndexDefinition) super.build();
        }

        @Override
        public ElasticsearchIndexDefinition.Builder reindex() {
            super.reindex();
            return this;
        }

        @Override
        protected IndexDefinition createInstance(NodeState indexDefnStateToUse) {
            return new ElasticsearchIndexDefinition(root, indexDefnStateToUse, indexPath);
        }
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,15 @@ private void registerIndexProvider(BundleContext bundleContext) {
ElasticsearchIndexProvider indexProvider = new ElasticsearchIndexProvider(elasticsearchConnection);

Dictionary<String, Object> props = new Hashtable<>();
props.put("type", ElasticsearchIndexConstants.TYPE_ELASTICSEARCH);
props.put("type", ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH);
regs.add(bundleContext.registerService(QueryIndexProvider.class.getName(), indexProvider, props));
}

private void registerIndexEditor(BundleContext bundleContext) {
ElasticsearchIndexEditorProvider editorProvider = new ElasticsearchIndexEditorProvider(elasticsearchConnection, extractedTextCache);

Dictionary<String, Object> props = new Hashtable<>();
props.put("type", ElasticsearchIndexConstants.TYPE_ELASTICSEARCH);
props.put("type", ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH);
regs.add(bundleContext.registerService(IndexEditorProvider.class.getName(), editorProvider, props));
// oakRegs.add(registerMBean(whiteboard,
// TextExtractionStatsMBean.class,
Expand Down
Loading

0 comments on commit 8cc86c4

Please sign in to comment.