Skip to content

Commit

Permalink
[FLINK-21702][sql-client] Support option sql-client.verbose to prin…
Browse files Browse the repository at this point in the history
…t the exception stack

This closes apache#15383
  • Loading branch information
zhuxiaoshang authored and wuchong committed Mar 28, 2021
1 parent 23afdbc commit e19d383
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,11 @@
<td><p>Enum</p>Possible values: [TABLE, CHANGELOG, TABLEAU]</td>
<td>Determine the mode when display the query result. The available values are ['table', 'tableau', 'changelog']. The 'table' mode materializes results in memory and visualizes them in a regular, paginated table representation. The 'changelog' mode does not materialize results and visualizes the result stream that is produced by a continuous query. The 'tableau' mode is more like a traditional way which will display the results in the screen directly with a tableau format. </td>
</tr>
<tr>
<td><h5>sql-client.verbose</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Determine whether to output the verbose output to the console. If set the option true, it will print the exception stack. Otherwise, it only output the cause.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.client.SqlClient;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.SqlClientOptions;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
Expand Down Expand Up @@ -354,7 +355,7 @@ private void callSet(SetOperation setOperation) {
}
// show all properties
else {
final Map<String, String> properties = executor.getSessionProperties(sessionId);
final Map<String, String> properties = executor.getSessionConfigMap(sessionId);
if (properties.isEmpty()) {
terminal.writer()
.println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
Expand Down Expand Up @@ -462,7 +463,8 @@ private void executeOperation(Operation operation) {
private void printExecutionException(Throwable t) {
final String errorMessage = CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
LOG.warn(errorMessage, t);
terminal.writer().println(CliStrings.messageError(errorMessage, t).toAnsi());
boolean isVerbose = executor.getSessionConfig(sessionId).get(SqlClientOptions.VERBOSE);
terminal.writer().println(CliStrings.messageError(errorMessage, t, isVerbose).toAnsi());
terminal.flush();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.table.client.cli;

import org.apache.flink.util.ExceptionUtils;

import org.jline.utils.AttributedString;
import org.jline.utils.AttributedStringBuilder;
import org.jline.utils.AttributedStyle;
Expand Down Expand Up @@ -297,14 +299,17 @@ public static AttributedString messageWarning(String message) {
.toAttributedString();
}

public static AttributedString messageError(String message, Throwable t) {
public static AttributedString messageError(String message, Throwable t, boolean isVerbose) {
while (t.getCause() != null
&& t.getCause().getMessage() != null
&& !t.getCause().getMessage().isEmpty()) {
t = t.getCause();
}
return messageError(message, t.getClass().getName() + ": " + t.getMessage());
// return messageError(message, ExceptionUtils.stringifyException(t));
if (isVerbose) {
return messageError(message, ExceptionUtils.stringifyException(t));
} else {
return messageError(message, t.getClass().getName() + ": " + t.getMessage());
}
}

public static AttributedString messageError(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,12 @@ private SqlClientOptions() {}
+ "The 'table' mode materializes results in memory and visualizes them in a regular, paginated table representation. "
+ "The 'changelog' mode does not materialize results and visualizes the result stream that is produced by a continuous query. "
+ "The 'tableau' mode is more like a traditional way which will display the results in the screen directly with a tableau format. ");

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> VERBOSE =
ConfigOptions.key("sql-client.verbose")
.booleanType()
.defaultValue(false)
.withDescription(
"Determine whether to output the verbose output to the console. If set the option true, it will print the exception stack. Otherwise, it only output the cause.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.client.gateway;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
Expand Down Expand Up @@ -51,8 +52,23 @@ public interface Executor {
*/
void closeSession(String sessionId) throws SqlExecutionException;

/** Lists all session properties that are defined by the executor and the session. */
Map<String, String> getSessionProperties(String sessionId) throws SqlExecutionException;
/**
* Returns a copy of {@link Map} of all session configurations that are defined by the executor
* and the session.
*
* <p>Both this method and {@link #getSessionConfig(String)} return the same configuration set,
* but different return type.
*/
Map<String, String> getSessionConfigMap(String sessionId) throws SqlExecutionException;

/**
* Returns a {@link ReadableConfig} of all session configurations that are defined by the
* executor and the session.
*
* <p>Both this method and {@link #getSessionConfigMap(String)} return the same configuration
* set, but different return type.
*/
ReadableConfig getSessionConfig(String sessionId) throws SqlExecutionException;

/**
* Reset all the properties for the given session identifier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,15 @@ protected ExecutionContext getExecutionContext(String sessionId) throws SqlExecu
}

@Override
public Map<String, String> getSessionProperties(String sessionId) throws SqlExecutionException {
public Map<String, String> getSessionConfigMap(String sessionId) throws SqlExecutionException {
return getSessionContext(sessionId).getConfigMap();
}

@Override
public ReadableConfig getSessionConfig(String sessionId) throws SqlExecutionException {
return getSessionContext(sessionId).getReadableConfig();
}

@Override
public void resetSessionProperties(String sessionId) throws SqlExecutionException {
SessionContext context = getSessionContext(sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -91,7 +92,7 @@ public void before() throws IOException {
}

@After
public void after() throws InterruptedException {
public void after() {
System.setOut(originalPrintStream);
System.setIn(originalInputStream);
CommonTestUtils.setEnv(originalEnv);
Expand All @@ -101,43 +102,43 @@ private String getStdoutString() {
return testOutputStream.toString();
}

@Test(timeout = 20000)
public void testEmbeddedWithOptions() throws InterruptedException {
@Test
public void testEmbeddedWithOptions() {
String[] args = new String[] {"embedded", "-hist", historyPath};
SqlClient.main(args);
assertThat(getStdoutString(), containsString("Command history file path: " + historyPath));
}

@Test(timeout = 20000)
public void testEmbeddedWithLongOptions() throws InterruptedException {
@Test
public void testEmbeddedWithLongOptions() {
String[] args = new String[] {"embedded", "--history", historyPath};
SqlClient.main(args);
assertThat(getStdoutString(), containsString("Command history file path: " + historyPath));
}

@Test(timeout = 20000)
public void testEmbeddedWithoutOptions() throws InterruptedException {
@Test
public void testEmbeddedWithoutOptions() {
String[] args = new String[] {"embedded"};
SqlClient.main(args);
assertThat(getStdoutString(), containsString("Command history file path"));
}

@Test(timeout = 20000)
public void testEmptyOptions() throws InterruptedException {
@Test
public void testEmptyOptions() {
String[] args = new String[] {};
SqlClient.main(args);
assertThat(getStdoutString(), containsString("Command history file path"));
}

@Test(timeout = 20000)
public void testUnsupportedGatewayMode() throws Exception {
@Test
public void testUnsupportedGatewayMode() {
String[] args = new String[] {"gateway"};
thrown.expect(SqlClientException.class);
thrown.expectMessage("Gateway mode is not supported yet.");
SqlClient.main(args);
}

@Test(timeout = 20000)
@Test
public void testPrintHelpForUnknownMode() throws IOException {
String[] args = new String[] {"unknown"};
SqlClient.main(args);
Expand All @@ -146,4 +147,54 @@ public void testPrintHelpForUnknownMode() throws IOException {
final String help = FileUtils.readFileUtf8(new File(url.getFile()));
assertEquals(help, getStdoutString());
}

@Test
public void testErrorMessage() {
// prepare statements which will throw exception
String stmts =
"CREATE TABLE T (a int) WITH ('connector' = 'invalid');\n"
+ "SELECT * FROM T;\n"
+ "QUIT;\n";
System.setIn(new ByteArrayInputStream(stmts.getBytes(StandardCharsets.UTF_8)));
String[] args = new String[] {};
SqlClient.main(args);
String output = getStdoutString();
assertThat(
output,
containsString(
"org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'invalid'"));

// shouldn't contain error stack
String[] errorStack =
new String[] {
"at org.apache.flink.table.factories.FactoryUtil.discoverFactory",
"at org.apache.flink.table.factories.FactoryUtil.createTableSource"
};
for (String stack : errorStack) {
assertThat(output, not(containsString(stack)));
}
}

@Test
public void testVerboseErrorMessage() {
// prepare statements which will throw exception
String stmts =
"CREATE TABLE T (a int) WITH ('connector' = 'invalid');\n"
+ "SET sql-client.verbose=true;\n"
+ "SELECT * FROM T;\n"
+ "QUIT;\n";
System.setIn(new ByteArrayInputStream(stmts.getBytes(StandardCharsets.UTF_8)));
String[] args = new String[] {};
SqlClient.main(args);
String output = getStdoutString();
String[] errors =
new String[] {
"org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'invalid'",
"at org.apache.flink.table.factories.FactoryUtil.discoverFactory",
"at org.apache.flink.table.factories.FactoryUtil.createTableSource"
};
for (String error : errors) {
assertThat(output, containsString(error));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.environment.TestingJobClient;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
Expand Down Expand Up @@ -214,11 +215,17 @@ public String openSession(@Nullable String sessionId) throws SqlExecutionExcepti
public void closeSession(String sessionId) throws SqlExecutionException {}

@Override
public Map<String, String> getSessionProperties(String sessionId)
public Map<String, String> getSessionConfigMap(String sessionId)
throws SqlExecutionException {
return null;
}

@Override
public ReadableConfig getSessionConfig(String sessionId) throws SqlExecutionException {
SessionContext context = this.sessionMap.get(sessionId);
return context.getReadableConfig();
}

@Override
public void resetSessionProperties(String sessionId) throws SqlExecutionException {}

Expand All @@ -228,7 +235,10 @@ public void resetSessionProperty(String sessionId, String key)

@Override
public void setSessionProperty(String sessionId, String key, String value)
throws SqlExecutionException {}
throws SqlExecutionException {
SessionContext context = this.sessionMap.get(sessionId);
context.set(key, value);
}

@Override
public TableResult executeOperation(String sessionId, Operation operation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.table.client.cli;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.Column;
Expand Down Expand Up @@ -140,11 +141,16 @@ public void closeSession(String sessionId) throws SqlExecutionException {
}

@Override
public Map<String, String> getSessionProperties(String sessionId)
public Map<String, String> getSessionConfigMap(String sessionId)
throws SqlExecutionException {
return null;
}

@Override
public ReadableConfig getSessionConfig(String sessionId) throws SqlExecutionException {
return null;
}

@Override
public void resetSessionProperties(String sessionId) throws SqlExecutionException {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.table.client.cli;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.client.cli.utils.SqlParserHelper;
import org.apache.flink.table.client.gateway.Executor;
Expand Down Expand Up @@ -95,10 +96,15 @@ public String openSession(@Nullable String sessionId) throws SqlExecutionExcepti
public void closeSession(String sessionId) throws SqlExecutionException {}

@Override
public Map<String, String> getSessionProperties(String sessionId) throws SqlExecutionException {
public Map<String, String> getSessionConfigMap(String sessionId) throws SqlExecutionException {
throw new UnsupportedOperationException("Not implemented.");
}

@Override
public ReadableConfig getSessionConfig(String sessionId) throws SqlExecutionException {
return null;
}

@Override
public void resetSessionProperties(String sessionId) throws SqlExecutionException {
throw new UnsupportedOperationException("Not implemented.");
Expand Down

0 comments on commit e19d383

Please sign in to comment.