Skip to content

Commit

Permalink
[FLINK-19656][metrics] Enforce non-null character filter
Browse files Browse the repository at this point in the history
  • Loading branch information
echauchot authored and zentol committed Jan 19, 2021
1 parent 674edcf commit 559ed01
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* can transform. The returned string is the transformation result.
*/
public interface CharacterFilter {
CharacterFilter NO_OP_FILTER = input -> input;

/**
* Filter the given string and generate a resulting string from it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -268,7 +269,7 @@ public QueryScopeInfo getQueryServiceMetricInfo(CharacterFilter filter) {
*/
@Override
public String getMetricIdentifier(String metricName) {
return getMetricIdentifier(metricName, null);
return getMetricIdentifier(metricName, CharacterFilter.NO_OP_FILTER);
}

/**
Expand Down Expand Up @@ -297,27 +298,16 @@ public String getMetricIdentifier(String metricName, CharacterFilter filter) {
*/
public String getMetricIdentifier(
String metricName, CharacterFilter filter, int reporterIndex, char delimiter) {
Preconditions.checkNotNull(filter);

metricName = filter.filterCharacters(metricName);
if (scopeStrings.length == 0
|| (reporterIndex < 0 || reporterIndex >= scopeStrings.length)) {
String newScopeString;
if (filter != null) {
newScopeString = ScopeFormat.concat(filter, delimiter, scopeComponents);
metricName = filter.filterCharacters(metricName);
} else {
newScopeString = ScopeFormat.concat(delimiter, scopeComponents);
}
return newScopeString + delimiter + metricName;
return ScopeFormat.concat(filter, delimiter, scopeComponents) + delimiter + metricName;
} else {
if (scopeStrings[reporterIndex] == null) {
if (filter != null) {
scopeStrings[reporterIndex] =
ScopeFormat.concat(filter, delimiter, scopeComponents);
} else {
scopeStrings[reporterIndex] = ScopeFormat.concat(delimiter, scopeComponents);
}
}
if (filter != null) {
metricName = filter.filterCharacters(metricName);
scopeStrings[reporterIndex] =
ScopeFormat.concat(filter, delimiter, scopeComponents);
}
return scopeStrings[reporterIndex] + delimiter + metricName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public FrontMetricGroup(ReporterScopedSettings settings, P reference) {
@Override
public String getMetricIdentifier(String metricName) {
return parentMetricGroup.getMetricIdentifier(
metricName, null, this.settings.getReporterIndex(), this.settings.getDelimiter());
metricName,
CharacterFilter.NO_OP_FILTER,
this.settings.getReporterIndex(),
this.settings.getDelimiter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,6 @@
*/
public abstract class ScopeFormat {

private static CharacterFilter defaultFilter =
new CharacterFilter() {
@Override
public String filterCharacters(String input) {
return input;
}
};

// ------------------------------------------------------------------------
// Scope Format Special Characters
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -208,18 +200,6 @@ public static String asVariable(String scope) {
return SCOPE_VARIABLE_PREFIX + scope + SCOPE_VARIABLE_SUFFIX;
}

public static String concat(String... components) {
return concat(defaultFilter, '.', components);
}

public static String concat(CharacterFilter filter, String... components) {
return concat(filter, '.', components);
}

public static String concat(Character delimiter, String... components) {
return concat(defaultFilter, delimiter, components);
}

/**
* Concatenates the given component names separated by the delimiter character. Additionally the
* character filter is applied to all component names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
Expand Down Expand Up @@ -127,7 +128,8 @@ public void close() {

@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
final String metricIdentifier = group.getMetricIdentifier(metricName, name -> name);
final String metricIdentifier =
group.getMetricIdentifier(metricName, CharacterFilter.NO_OP_FILTER);
for (final String expectedPattern : patternFutures.keySet()) {
if (metricIdentifier.matches(expectedPattern)) {
patternFutures.get(expectedPattern).complete(null);
Expand Down

0 comments on commit 559ed01

Please sign in to comment.