Skip to content

Commit

Permalink
breach flink_1.9
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Dec 18, 2019
1 parent 475de5b commit 8bcabb7
Show file tree
Hide file tree
Showing 161 changed files with 1,015 additions and 458 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ allprojects {
joda_time : '2.9.3',
slf4j : '1.7.25',
guice : '4.2.1',
gadtry : '1.6.2',
gadtry : '1.6.3',
guava : '25.1-jre',
jackson : '2.9.5',
jersey : '2.28',
Expand Down
12 changes: 6 additions & 6 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ rootProject.name = 'sylph'

include 'sylph-spi'
include 'sylph-main'
include 'sylph-controller'
include 'sylph-web'

include 'sylph-runners'
include 'sylph-runners:flink'
Expand All @@ -20,14 +20,14 @@ include ':sylph-runners:spark'
project(':sylph-runners:spark').name = 'sylph-runner-spark'

//----
include 'sylph-etl-api'
include 'sylph-api'
include 'sylph-connectors'
include 'sylph-connectors:sylph-kafka'
include 'sylph-connectors:flink-kafka'
include 'sylph-connectors:sylph-mysql'
include 'sylph-connectors:sylph-hdfs'
include 'sylph-connectors:sylph-hdfs2'
include 'sylph-connectors:sylph-kafka08'
include 'sylph-connectors:sylph-kafka09'
include 'sylph-connectors:flink-hdfs2'
include 'sylph-connectors:flink-kafka08'
include 'sylph-connectors:flink-kafka09'
//include 'sylph-connectors:sylph-hbase'
include 'sylph-connectors:sylph-elasticsearch6'
include 'sylph-connectors:sylph-elasticsearch5'
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -26,11 +27,13 @@ public final class Field
{
private final String name;
private final Type javaType;
private final String extend;

public Field(String name, Type javaType)
public Field(String name, Type javaType, String extend)
{
this.name = requireNonNull(name, "Field name must not null");
this.javaType = requireNonNull(javaType, "Field type must not null");
this.extend = extend;
}

public String getName()
Expand All @@ -43,6 +46,11 @@ public Type getJavaType()
return javaType;
}

public Optional<String> getExtend()
{
return Optional.of(extend);
}

public Class<?> getJavaTypeClass()
{
return typeToClass(javaType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.io.Serializable;

public interface PipelinePlugin
public interface Operator
extends Serializable
{
public static enum PipelineType
Expand All @@ -32,14 +32,14 @@ public static enum PipelineType
@Deprecated
batch_join(TransForm.class);

private final Class<? extends PipelinePlugin> value;
private final Class<? extends Operator> value;

PipelineType(Class<? extends PipelinePlugin> value)
PipelineType(Class<? extends Operator> value)
{
this.value = value;
}

public Class<? extends PipelinePlugin> getValue()
public Class<? extends Operator> getValue()
{
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@

public interface Plugin
{
public Set<Class<? extends PipelinePlugin>> getConnectors();
public Set<Class<? extends Operator>> getConnectors();
}
94 changes: 94 additions & 0 deletions sylph-api/src/main/java/ideal/sylph/etl/Record.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.etl;

import java.util.Arrays;

public interface Record
{
String mkString(String seq);

default String mkString()
{
return this.mkString(",");
}

<T> T getAs(String key);

<T> T getAs(int i);

default <T> T getField(int i)
{
return getAs(i);
}

int size();

public static Record of(Object[] values)
{
return new DefaultRecord(values);
}

static class DefaultRecord
implements Record
{
Object[] values;

private DefaultRecord(Object[] values)
{
this.values = values;
}

public Object[] getValues()
{
return Arrays.copyOf(values, values.length);
}

@Override
public String mkString(String seq)
{
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < values.length; i++) {
stringBuilder.append(seq).append(values[i]);
}
return stringBuilder.substring(1);
}

@Override
public String mkString()
{
return this.mkString(",");
}

@Override
public <T> T getAs(String key)
{
throw new UnsupportedOperationException("this " + this.getClass().getName() + " method have't T getAs(String)!");
}

@Override
public <T> T getAs(int key)
{
throw new UnsupportedOperationException("this " + this.getClass().getName() + " method have't T getAs(int)!");
}

@Override
public int size()
{
return values.length;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package ideal.sylph.etl;

import com.github.harbby.gadtry.base.JavaTypes;

import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -62,6 +65,16 @@ public List<Field> getFields()
return fields;
}

public int size()
{
return fields.size();
}

public Field getField(int i)
{
return fields.get(i);
}

public static SchemaBuilder newBuilder()
{
return new SchemaBuilder();
Expand All @@ -71,12 +84,21 @@ public static class SchemaBuilder
{
private final List<Field> fields = new ArrayList<>();

public SchemaBuilder add(String name, Type javaType)
public SchemaBuilder add(String name, Type javaType, Optional<String> extend)
{
fields.add(new Field(name, javaType));
Type fieldType = javaType;
if (javaType instanceof Class<?> && ((Class<?>) javaType).isPrimitive()) {
fieldType = JavaTypes.getWrapperClass((Class<?>) javaType);
}
fields.add(new Field(name, fieldType, extend.orElse(null)));
return this;
}

public SchemaBuilder add(String name, Type javaType)
{
return add(name, javaType, Optional.empty());
}

public Schema build()
{
return new Schema(new ArrayList<>(fields));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package ideal.sylph.etl.api;

import ideal.sylph.etl.PipelinePlugin;
import ideal.sylph.etl.Row;
import ideal.sylph.etl.Operator;
import ideal.sylph.etl.Record;

public interface RealTimeSink
extends PipelinePlugin, RealTimePipeline
extends Operator, RealTimePipeline
{
/**
* ByCommitState
Expand All @@ -31,5 +31,5 @@ default void flush()
/**
* line 级别的 需要注意线程安全问题
**/
void process(Row value);
void process(Record value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@
package ideal.sylph.etl.api;

import ideal.sylph.etl.Collector;
import ideal.sylph.etl.PipelinePlugin;
import ideal.sylph.etl.Row;
import ideal.sylph.etl.Operator;
import ideal.sylph.etl.Record;
import ideal.sylph.etl.Schema;

/**
* JoinOperator
* */
public interface RealTimeTransForm
extends PipelinePlugin, RealTimePipeline
extends Operator, RealTimePipeline
{
/**
* line 级别的 需要注意线程安全问题
**/
void process(Row input, Collector<Row> collector);
void process(Record input, Collector<Record> collector);

/**
* driver 上运行
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package ideal.sylph.etl.api;

import ideal.sylph.etl.PipelinePlugin;
import ideal.sylph.etl.Operator;

public interface Sink<T>
extends PipelinePlugin
extends Operator
{
void run(final T stream);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package ideal.sylph.etl.api;

import ideal.sylph.etl.PipelinePlugin;
import ideal.sylph.etl.Operator;

public interface Source<T>
extends PipelinePlugin
extends Operator
{
T getSource();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
*/
package ideal.sylph.etl.api;

import ideal.sylph.etl.PipelinePlugin;
import ideal.sylph.etl.Operator;

/**
* Created by ideal on 17-5-8. 转换
*/
public interface TransForm<T>
extends PipelinePlugin
extends Operator
{
T transform(final T stream);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@
package ideal.sylph.etl.impl;

import ideal.sylph.etl.Collector;
import ideal.sylph.etl.Row;
import ideal.sylph.etl.Record;

import java.util.List;

import static java.util.Objects.requireNonNull;

public class ListCollector
implements Collector<Row>
implements Collector<Record>
{
private final List<Row> list;
private final List<Record> list;

public ListCollector(List<Row> list)
public ListCollector(List<Record> list)
{
this.list = requireNonNull(list, "list is null");
}

@Override
public void collect(Row record)
public void collect(Record record)
{
list.add(record);
}
Expand Down
2 changes: 1 addition & 1 deletion sylph-base-jdbc/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

dependencies {
compile group: 'org.apache.flink', name: 'flink-shaded-guava', version: '18.0-5.0'
compileOnly project(":sylph-etl-api")
compileOnly project(":sylph-api")
compileOnly group: 'org.slf4j', name: 'slf4j-api', version: deps.slf4j
}
Loading

0 comments on commit 8bcabb7

Please sign in to comment.