Skip to content

Commit

Permalink
[FLINK-18236] fix es connector test ElasticsearchSinkTestBase.runElas…
Browse files Browse the repository at this point in the history
…ticsearchSink* verify not right.
  • Loading branch information
liuyongvs authored and rmetzger committed Jun 17, 2020
1 parent c025407 commit 6623ef1
Showing 1 changed file with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,33 +53,31 @@ public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> exte
* Tests that the Elasticsearch sink works properly with json.
*/
public void runElasticsearchSinkTest() throws Exception {
runElasticSearchSinkTest(SourceSinkDataTestKit::getJsonSinkFunction);
runElasticSearchSinkTest("elasticsearch-sink-test-json-index", SourceSinkDataTestKit::getJsonSinkFunction);
}

/**
* Tests that the Elasticsearch sink works properly with cbor.
*/
public void runElasticsearchSinkCborTest() throws Exception {
runElasticSearchSinkTest(SourceSinkDataTestKit::getCborSinkFunction);
runElasticSearchSinkTest("elasticsearch-sink-test-cbor-index", SourceSinkDataTestKit::getCborSinkFunction);
}

/**
* Tests that the Elasticsearch sink works properly with smile.
*/
public void runElasticsearchSinkSmileTest() throws Exception {
runElasticSearchSinkTest(SourceSinkDataTestKit::getSmileSinkFunction);
runElasticSearchSinkTest("elasticsearch-sink-test-smile-index", SourceSinkDataTestKit::getSmileSinkFunction);
}

/**
* Tests that the Elasticsearch sink works properly with yaml.
*/
public void runElasticsearchSinkYamlTest() throws Exception {
runElasticSearchSinkTest(SourceSinkDataTestKit::getYamlSinkFunction);
runElasticSearchSinkTest("elasticsearch-sink-test-yaml-index", SourceSinkDataTestKit::getYamlSinkFunction);
}

private void runElasticSearchSinkTest(Function<String, ElasticsearchSinkFunction<Tuple2<Integer, String>>> functionFactory) throws Exception {
final String index = "elasticsearch-sink-test-index";

private void runElasticSearchSinkTest(String index, Function<String, ElasticsearchSinkFunction<Tuple2<Integer, String>>> functionFactory) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
Expand Down

0 comments on commit 6623ef1

Please sign in to comment.