Skip to content

Commit

Permalink
fixing client connection settings, simplifying ingest handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jprante committed Dec 23, 2015
1 parent 224ddc3 commit 4c8ae3c
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 552 deletions.
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ about what happened.

| Release date | JDBC Importer version | Elasticsearch version |
| -------------| ----------------------| ----------------------|
| Dec 23 2015 | 2.1.1.1 | 2.1.1 |
| Dec 22 2015 | 2.1.1.0 | 2.1.1 |
| Dec 23 2015 | 2.1.1.2 | 2.1.1 |
| Nov 29 2015 | 2.1.0.0 | 2.1.0 |
| Oct 29 2015 | 2.0.0.1 | 2.0.0 |
| Oct 28 2015 | 2.0.0.0 | 2.0.0 |
Expand All @@ -54,9 +53,9 @@ about what happened.

## Quick links

JDBC importer 2.1.1.1
JDBC importer 2.1.1.2

`http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.1.1.1/elasticsearch-jdbc-2.1.1.1-dist.zip`
`http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.1.1.2/elasticsearch-jdbc-2.1.1.2-dist.zip`

## Installation

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>org.xbib.elasticsearch.importer</groupId>
<artifactId>elasticsearch-jdbc</artifactId>
<version>2.1.1.1</version>
<version>2.1.1.2</version>

<packaging>jar</packaging>

Expand Down
20 changes: 0 additions & 20 deletions src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,6 @@ public interface Sink<C extends Context> {
*/
Sink<C> setContext(C context);

Sink setIngestFactory(IngestFactory ingestFactory);

IngestFactory getIngestFactory();

/**
* Set index settings
*
* @param indexSettings the index settings
* @return this sink
*/
Sink setIndexSettings(Settings indexSettings);

/**
* Set index type mappings
*
* @param typeMapping the index type mappings
* @return this sink
*/
Sink setTypeMapping(Map<String, String> typeMapping);

/**
* Executed before source fetch
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -170,6 +169,7 @@ public void execute() throws Exception {
}

@Override
@SuppressWarnings("unchecked")
public void beforeFetch() throws Exception {
logger.debug("before fetch");
Sink sink = createSink();
Expand All @@ -188,7 +188,7 @@ public void fetch() throws Exception {
getSource().fetch();
} catch (Throwable e) {
setThrowable(e);
logger.error(e.getMessage(), e);
logger.error("at fetch: " + e.getMessage(), e);
}
}

Expand All @@ -200,13 +200,13 @@ public void afterFetch() throws Exception {
getSource().afterFetch();
} catch (Throwable e) {
setThrowable(e);
logger.error(e.getMessage(), e);
logger.error("after fetch: " + e.getMessage(), e);
}
try {
getSink().afterFetch();
} catch (Throwable e) {
setThrowable(e);
logger.error(e.getMessage(), e);
logger.error("after fetch: " + e.getMessage(), e);
}
}

Expand All @@ -220,14 +220,14 @@ public void shutdown() {
try {
source.shutdown();
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("source shutdown: " + e.getMessage(), e);
}
}
if (sink != null) {
try {
sink.shutdown();
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("sink shutdown: " + e.getMessage(), e);
}
}
logger.info("shutdown completed");
Expand Down Expand Up @@ -293,22 +293,10 @@ protected S createSource() {
protected Sink createSink() throws IOException {
Sink sink = StrategyLoader.newSink(strategy());
logger.info("found sink class {}", sink);
String index = settings.get("index", "jdbc");
String type = settings.get("type", "jdbc");
sink.setIndex(index).setType(type);
Map<String,Object> map = settings.getAsStructuredMap();
if (map.containsKey("index_settings")) {
Settings loadedSettings = settings.getAsSettings("index_settings");
sink.setIndexSettings(loadedSettings);
}
if (map.containsKey("type_mapping")) {
XContentBuilder builder = jsonBuilder()
.map(settings.getAsSettings("type_mapping").getAsStructuredMap());
sink.setTypeMapping(Collections.singletonMap(type, builder.string()));
}
return sink;
}

@SuppressWarnings("unchecked")
protected void prepareContext(S source, Sink sink) throws IOException {
Map<String, Object> params = settings.getAsStructuredMap();
List<SQLCommand> sql = SQLCommand.parse(params);
Expand Down
Loading

0 comments on commit 4c8ae3c

Please sign in to comment.