Optimize exception handling mechanism to simplify exception stack
harbby committed Jan 24, 2019
1 parent c8dc2d1 commit a583f2e
Showing 25 changed files with 95 additions and 99 deletions.
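The change repeated across these files swaps throw new RuntimeException(e) for the rethrow helpers in harbby's gadtry library (Throwables.throwsException, Throwables.noCatch) and replaces hand-rolled if/throw guards with gadtry's Checks.checkState. As a minimal before/after sketch of the call-site shape (illustrative only; doWork() is a hypothetical method, and the static import is the one the diffs below add):

    import java.io.IOException;

    import static com.github.harbby.gadtry.base.Throwables.throwsException;

    public class ExceptionStackSketch
    {
        static void doWork() throws IOException
        {
            throw new IOException("disk gone");
        }

        // Before: callers see RuntimeException -> IOException, two layers of stack.
        static void oldStyle()
        {
            try {
                doWork();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        // After: the IOException itself propagates, one layer of stack.
        static void newStyle()
        {
            try {
                doWork();
            }
            catch (IOException e) {
                throw throwsException(e);
            }
        }
    }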
1 change: 1 addition & 0 deletions sylph-connectors/build.gradle
@@ -13,6 +13,7 @@ subprojects {
 }

 dependencies {
+    compileOnly group: 'com.github.harbby', name: 'gadtry', version: deps.gadtry
     compileOnly project(":sylph-etl-api")
 }
1 change: 0 additions & 1 deletion sylph-connectors/sylph-elasticsearch5/build.gradle
@@ -3,7 +3,6 @@ plugins {
 }

 dependencies {
-    shadow group: 'org.apache.flink', name: 'flink-shaded-guava', version: '18.0-4.0'
     compile 'org.elasticsearch.client:transport:5.6.0'
 }
@@ -35,7 +35,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;

-import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
+import static com.github.harbby.gadtry.base.Checks.checkState;

 @Name("elasticsearch5")
 @Description("this is elasticsearch5 sink plugin")
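Aside: Checks.checkState replaces the Preconditions.checkState that previously came in through flink-shaded-guava, which is why the shadow dependency above could be dropped. A sketch of what such a helper plausibly does, assuming it mirrors Guava's semantics (the real gadtry implementation may differ):

    // Hypothetical stand-in for com.github.harbby.gadtry.base.Checks.checkState,
    // assumed to mirror Guava's Preconditions.checkState.
    public final class Checks
    {
        private Checks() {}

        public static void checkState(boolean expression, String errorMessage)
        {
            if (!expression) {
                throw new IllegalStateException(errorMessage);
            }
        }
    }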
1 change: 0 additions & 1 deletion sylph-connectors/sylph-elasticsearch6/build.gradle
@@ -3,7 +3,6 @@ plugins {
 }

 dependencies {
-    shadow group: 'org.apache.flink', name: 'flink-shaded-guava', version: '18.0-4.0'
     compile 'org.elasticsearch.client:transport:6.4.0'
 }
@@ -35,7 +35,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;

-import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
+import static com.github.harbby.gadtry.base.Checks.checkState;

 @Name("elasticsearch6")
 @Description("this is elasticsearch6 sink plugin")
@@ -35,6 +35,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Supplier;

+import static com.github.harbby.gadtry.base.Throwables.throwsException;
 import static ideal.sylph.plugins.hdfs.factory.HDFSFactorys.getRowKey;
 import static java.util.Objects.requireNonNull;

@@ -269,7 +270,7 @@ protected boolean removeEldestEntry(Map.Entry<String, FileChannel> eldest)
                 return true;
             }
             catch (IOException e) {
-                throw new RuntimeException(e);
+                throw throwsException(e);
             }
         }
         else {
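Aside: throw throwsException(e) compiles even though IOException is checked, because helpers like this typically use the "sneaky throw" idiom: an unchecked generic cast, erased at runtime, convinces the compiler the exception is unchecked. A self-contained sketch of the idiom, on the assumption that gadtry works roughly this way:

    // Sketch of the "sneaky throw" idiom behind helpers such as
    // Throwables.throwsException (assumed; gadtry's internals may differ).
    public final class Sneaky
    {
        private Sneaky() {}

        // E is erased at runtime, so the cast never fails and the compiler
        // treats the throw as unchecked at the call site.
        @SuppressWarnings("unchecked")
        private static <E extends Throwable> E sneakyThrow(Throwable t) throws E
        {
            throw (E) t;
        }

        // Declared to return RuntimeException so callers can write
        // "throw throwsException(e);" and keep definite-return analysis happy.
        public static RuntimeException throwsException(Throwable t)
        {
            throw Sneaky.<RuntimeException>sneakyThrow(t);
        }
    }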
1 change: 0 additions & 1 deletion sylph-connectors/sylph-kafka/build.gradle
@@ -1,7 +1,6 @@
 apply plugin: 'scala'

 dependencies {
-    compile group: 'org.apache.flink', name: 'flink-shaded-guava', version: '18.0-4.0'
     compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) {
         exclude(module: 'flink-shaded-hadoop2')
     }
2 changes: 1 addition & 1 deletion sylph-connectors/sylph-mysql/build.gradle
@@ -1,6 +1,6 @@
 dependencies {

-    compile group: 'org.apache.flink', name: 'flink-shaded-guava', version: '18.0-4.0'
+    compile group: 'org.apache.flink', name: 'flink-shaded-guava', version: '18.0-5.0'

     compileOnly group: 'org.slf4j', name: 'slf4j-api', version: deps.log4j12
@@ -42,10 +42,11 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

+import static com.github.harbby.gadtry.base.Throwables.noCatch;
+import static com.github.harbby.gadtry.base.Throwables.throwsException;
 import static ideal.sylph.etl.join.JoinContext.JoinType.LEFT;
 import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;

@@ -128,51 +129,48 @@ public void check()
     @Override
     public void process(Row input, Collector<Row> collector)
     {
-        try {
-            checkState(connection != null, " connection is null");
-            StringBuilder builder = new StringBuilder();
-            for (int index : joinOnMapping.keySet()) {
-                builder.append(input.<Object>getField(index)).append("\u0001");
-            }
-            List<Map<String, Object>> cacheData = cache.get(builder.toString(), () -> {
-                //-- the real database query happens here
-                List<Integer> indexs = ImmutableList.copyOf(joinOnMapping.keySet());
-                try (PreparedStatement statement = connection.prepareStatement(sql)) {
-                    for (int i = 0; i < indexs.size(); i++) {
-                        statement.setObject(i + 1, input.getField(indexs.get(i)));
-                    }
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Thread is {}, this {}", Thread.currentThread().getId(), this);
-                    }
-                    try (ResultSet rs = statement.executeQuery()) {
-                        List<Map<String, Object>> result = JdbcUtils.resultToList(rs);
-                        if (result.isEmpty() && joinType == LEFT) { // left join and inter join
-                            return ImmutableList.of(ImmutableMap.of());
-                        }
-                        return result;
-                    }
-                }
-                catch (SQLException e) {
-                    throw new RuntimeException(e);
-                }
-            });
-
-            for (Map<String, Object> map : cacheData) {
-                Object[] row = new Object[selectFieldCnt];
-                for (int i = 0; i < selectFieldCnt; i++) {
-                    SelectField field = selectFields.get(i);
-                    if (field.isBatchTableField()) {
-                        row[i] = map.get(field.getFieldName());
-                    }
-                    else {
-                        row[i] = input.getField(field.getFieldIndex());
-                    }
-                }
-                collector.collect(Row.of(row));
-            }
-        }
-        catch (ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+        checkState(connection != null, " connection is null");
+
+        StringBuilder builder = new StringBuilder();
+        for (int index : joinOnMapping.keySet()) {
+            builder.append(input.<Object>getField(index)).append("\u0001");
+        }
+
+        List<Map<String, Object>> cacheData = noCatch(() -> cache.get(builder.toString(), () -> {
+            //-- the real database query happens here
+            List<Integer> indexs = ImmutableList.copyOf(joinOnMapping.keySet());
+            try (PreparedStatement statement = connection.prepareStatement(sql)) {
+                for (int i = 0; i < indexs.size(); i++) {
+                    statement.setObject(i + 1, input.getField(indexs.get(i)));
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Thread is {}, this {}", Thread.currentThread().getId(), this);
+                }
+                try (ResultSet rs = statement.executeQuery()) {
+                    List<Map<String, Object>> result = JdbcUtils.resultToList(rs);
+                    if (result.isEmpty() && joinType == LEFT) { // left join and inter join
+                        return ImmutableList.of(ImmutableMap.of());
+                    }
+                    return result;
+                }
+            }
+            catch (SQLException e) {
+                throw throwsException(e);
+            }
+        }));
+
+        for (Map<String, Object> map : cacheData) {
+            Object[] row = new Object[selectFieldCnt];
+            for (int i = 0; i < selectFieldCnt; i++) {
+                SelectField field = selectFields.get(i);
+                if (field.isBatchTableField()) {
+                    row[i] = map.get(field.getFieldName());
+                }
+                else {
+                    row[i] = input.getField(field.getFieldIndex());
+                }
+            }
+            collector.collect(Row.of(row));
+        }
    }
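Aside: noCatch removes the try/catch (ExecutionException) that Guava's cache.get(key, Callable) otherwise forces on the caller. A plausible shape for it, combining a Callable with the same sneaky rethrow (an assumption; gadtry's actual signature may differ):

    import java.util.concurrent.Callable;

    // Hypothetical sketch of Throwables.noCatch: run a Callable and rethrow
    // any checked exception as-is instead of wrapping it.
    public final class NoCatch
    {
        private NoCatch() {}

        @SuppressWarnings("unchecked")
        private static <E extends Throwable> E sneakyThrow(Throwable t) throws E
        {
            throw (E) t;
        }

        public static <V> V noCatch(Callable<V> callable)
        {
            try {
                return callable.call();
            }
            catch (Exception e) {
                throw NoCatch.<RuntimeException>sneakyThrow(e);
            }
        }
    }

Under this shape, noCatch(() -> cache.get(key, loader)) surfaces whatever cache.get throws without stacking another RuntimeException wrapper on top.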
@@ -34,6 +34,7 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

+import static com.github.harbby.gadtry.base.Throwables.throwsException;
 import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;

 @Name("mysql")

@@ -105,7 +106,7 @@ public void process(Row row)
             }
         }
         catch (SQLException e) {
-            throw new RuntimeException(e);
+            throwsException(e);
         }
     }
@@ -40,6 +40,7 @@
 import java.util.Map;
 import java.util.Optional;

+import static com.github.harbby.gadtry.base.Throwables.throwsThrowable;
 import static ideal.sylph.spi.job.Job.Status.STOP;
 import static java.util.Objects.requireNonNull;

@@ -112,7 +113,7 @@ private Map listJobs()
         }
         catch (Exception e) {
             logger.error("", Throwables.getRootCause(e));
-            throw new RuntimeException(Throwables.getRootCause(e));
+            throw throwsThrowable(Throwables.getRootCause(e));
         }
     }
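Aside: throwsThrowable looks like the Throwable-accepting variant of throwsException; combined with getRootCause, the innermost exception propagates directly instead of being wrapped again. A runnable demonstration of the effect (sneakyThrow stands in for the gadtry helper, getRootCause for the one used above; both are illustrative):

    // Illustrative only: rethrowing the root cause flattens the stack.
    public class RootCauseDemo
    {
        @SuppressWarnings("unchecked")
        static <E extends Throwable> E sneakyThrow(Throwable t) throws E
        {
            throw (E) t;
        }

        static Throwable getRootCause(Throwable t)
        {
            while (t.getCause() != null) {
                t = t.getCause();
            }
            return t;
        }

        public static void main(String[] args)
        {
            Exception wrapped = new RuntimeException(new IllegalStateException("job state corrupted"));
            try {
                throw RootCauseDemo.<RuntimeException>sneakyThrow(getRootCause(wrapped));
            }
            catch (Exception e) {
                // Prints java.lang.IllegalStateException, not the RuntimeException wrapper.
                System.out.println(e.getClass().getName());
            }
        }
    }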
@@ -16,6 +16,7 @@
 package ideal.sylph.controller.action;

 import com.github.harbby.gadtry.base.Throwables;
+import com.github.harbby.gadtry.jvm.JVMException;
 import com.google.common.collect.ImmutableMap;
 import ideal.sylph.spi.SylphContext;
 import ideal.sylph.spi.exception.SylphException;

@@ -94,9 +95,16 @@ public Map saveJob(@Context HttpServletRequest request)
             logger.info("save job {}", jobId);
             return out;
         }
+        catch (JVMException e) {
+            logger.warn("save job {} failed: {}", jobId, e.getMessage());
+            String message = e.getMessage();
+            return ImmutableMap.of("type", "save",
+                    "status", "error",
+                    "msg", message);
+        }
         catch (Exception e) {
             logger.warn("save job {} failed: {}", jobId, e);
-            String message = Throwables.getStackTraceAsString(Throwables.getRootCause(e));
+            String message = Throwables.getStackTraceAsString(e);
             return ImmutableMap.of("type", "save",
                     "status", "error",
                     "msg", message);
@@ -54,6 +54,7 @@
 import java.util.List;
 import java.util.Set;

+import static com.github.harbby.gadtry.base.Checks.checkState;
 import static com.google.common.base.Preconditions.checkArgument;
 import static ideal.sylph.spi.exception.StandardErrorCode.JOB_CONFIG_ERROR;
 import static java.util.Objects.requireNonNull;

@@ -200,11 +201,8 @@ public String getJobUrl(String id)
                 new SylphException(JOB_CONFIG_ERROR, "job " + id + " not Online"))
         );
         Job.Status status = container.getStatus();
-        if (status == Job.Status.RUNNING) {
-            return container.getJobUrl();
-        }
-        else {
-            throw new RuntimeException("job " + id + " Status " + status + ",is not RUNNING");
-        }
+        checkState(status == Job.Status.RUNNING, "job " + id + " Status " + status + ",but not RUNNING");
+
+        return container.getJobUrl();
     }
 }
@@ -54,6 +54,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;

+import static com.github.harbby.gadtry.base.Checks.checkState;
+import static com.github.harbby.gadtry.base.Throwables.throwsException;
 import static ideal.sylph.spi.exception.StandardErrorCode.LOAD_MODULE_ERROR;
 import static java.util.Objects.requireNonNull;

@@ -67,9 +69,8 @@ public void loadPlugins()
             throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, IOException
     {
         File pluginsDir = new File("etl-plugins");
-        if (!pluginsDir.exists() || !pluginsDir.isDirectory()) {
-            throw new RuntimeException(pluginsDir + " not exists or isDirectory");
-        }
+        checkState(pluginsDir.exists() && pluginsDir.isDirectory(), pluginsDir + " not exists or isDirectory");
+
         File[] pluginFiles = requireNonNull(pluginsDir.listFiles(), pluginsDir + " not exists or isDirectory");

         ImmutableSet.Builder<PipelinePluginInfo> builder = ImmutableSet.builder();

@@ -90,7 +91,7 @@ public void loadPlugins()
                     return getPluginInfo(it, javaClass, false, typeArguments);
                 }
                 catch (IncompleteAnnotationException e) {
-                    throw new RuntimeException(it + " Annotation value not set, Please check scala code", e);
+                    throw new IllegalStateException(it + " Annotation value not set, Please check scala code", e);
                 }
             }).collect(Collectors.toSet());
             builder.addAll(tmp);

@@ -161,7 +162,7 @@ private static TypeArgument[] parserDriver(Class<? extends PipelinePlugin> javaC
             //Type[] javaTypes = classRepository.getSuperInterfaces();
         }
         catch (Exception e) {
-            throw new RuntimeException(e);
+            throw throwsException(e);
         }
     }
@@ -53,6 +53,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;

+import static com.github.harbby.gadtry.base.Throwables.throwsException;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;

@@ -125,7 +126,7 @@ private void createRunner(final Runner runner)
             factory = runner.getContainerFactory().newInstance();
         }
         catch (InstantiationException | IllegalAccessException e) {
-            throw new RuntimeException(e);
+            throw throwsException(e);
         }

         runner.create(runnerContext).forEach(jobActuatorHandle -> {
@@ -33,8 +33,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static com.github.harbby.gadtry.base.Throwables.throwsException;
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Throwables.throwIfUnchecked;
 import static ideal.sylph.spi.model.PipelinePluginManager.filterRunnerPlugins;
 import static java.util.Objects.requireNonNull;

@@ -73,8 +73,7 @@ public Set<JobActuatorHandle> create(RunnerContext context)
                     .map(injector::getInstance).collect(Collectors.toSet());
         }
         catch (Exception e) {
-            throwIfUnchecked(e);
-            throw new RuntimeException(e);
+            throw throwsException(e);
         }
     }
@@ -21,9 +21,7 @@
 import com.google.common.collect.ImmutableMap;
 import ideal.sylph.etl.SinkContext;
 import ideal.sylph.etl.SourceContext;
-import ideal.sylph.parser.SqlParserException;
 import ideal.sylph.parser.antlr.AntlrSqlParser;
-import ideal.sylph.parser.antlr.ParsingException;
 import ideal.sylph.parser.antlr.tree.CreateFunction;
 import ideal.sylph.parser.antlr.tree.CreateStreamAsSelect;
 import ideal.sylph.parser.antlr.tree.CreateTable;

@@ -91,14 +89,7 @@ public void buildStreamBySql(String sql)
                 .setTableEnv(tableEnv)
                 .setBatchPluginManager(pluginManager)
                 .build();
-
-        Statement statement;
-        try {
-            statement = sqlParser.createStatement(sql);
-        }
-        catch (ParsingException e) {
-            throw new SqlParserException("Sylph sql parser error", e);
-        }
+        Statement statement = sqlParser.createStatement(sql);

         if (statement instanceof CreateStreamAsSelect) {
             CreateStreamAsSelect createStreamAsSelect = (CreateStreamAsSelect) statement;
@@ -42,9 +42,7 @@ private StreamSqlUtil() {}

     static DataStream<Row> checkStream(DataStream<Row> inputStream, RowTypeInfo tableTypeInfo)
     {
-        if (!(inputStream.getType() instanceof RowTypeInfo)) {
-            throw new RuntimeException("sourceType not is RowTypeInfo");
-        }
+        checkState(inputStream.getType() instanceof RowTypeInfo, "DataStream type not is RowTypeInfo");
         RowTypeInfo sourceType = (RowTypeInfo) inputStream.getType();

         List<Integer> indexs = Arrays.stream(tableTypeInfo.getFieldNames())
@@ -86,7 +86,7 @@ else if (row instanceof DefaultRow) {
         return org.apache.flink.types.Row.of(((DefaultRow) row).getValues());
     }
     else {
-        throw new RuntimeException(" not souch row type: " + row.getClass());
+        throw new UnsupportedOperationException("Not Unsupported row type: " + row.getClass());
     }
 }