Skip to content

Commit

Permalink
FLUME-632: Support arguments in output format configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Hsieh committed Jun 2, 2011
1 parent fb2e8e4 commit 4b35ba6
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 0 deletions.
35 changes: 35 additions & 0 deletions flume-core/src/main/java/com/cloudera/flume/conf/FlumeBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class FlumeBuilder {

enum ASTNODE {
DEC, HEX, OCT, STRING, BOOL, FLOAT, // literals
FUNC, // function node
SINK, SOURCE, // sink or source
KWARG, // kwarg support
MULTI, DECO, BACKUP, ROLL, GEN, FAILCHAIN, // compound sinks
Expand Down Expand Up @@ -127,6 +128,10 @@ static CommonTree parseLiteral(String s) throws RecognitionException {
return (CommonTree) getDeployParser(s).literal().getTree();
}

static CommonTree parseArg(String s) throws RecognitionException {
return (CommonTree) getDeployParser(s).arg().getTree();
}

public static CommonTree parseSink(String s) throws RecognitionException {
FlumeDeployParser parser = getDeployParser(s);
CommonTree ast = (CommonTree) parser.sink().getTree();
Expand Down Expand Up @@ -358,6 +363,15 @@ public static Object buildSimpleArg(CommonTree t) throws FlumeSpecException {
Preconditions.checkArgument(str.startsWith("\"") && str.endsWith("\""));
str = str.substring(1, str.length() - 1);
return StringEscapeUtils.unescapeJava(str);
case FUNC:
String name = t.getChild(0).getText();
List<Object> args = new ArrayList<Object>();
int children = t.getChildCount();
for (int j = 1; j < children; j++) {
args.add(buildSimpleArg((CommonTree) t.getChild(j)));
}
FunctionSpec funSpec = new FunctionSpec(name, args.toArray());
return funSpec;
case KWARG:
return null;
default:
Expand All @@ -366,6 +380,27 @@ public static Object buildSimpleArg(CommonTree t) throws FlumeSpecException {
}
}

/**
* Container class for holding function arguments
*/
public static class FunctionSpec {
String name;
Object[] args;

FunctionSpec(String name, Object... args) {
this.name = name;
this.args = args;
}

public String getName() {
return name;
}

public Object[] getArgs() {
return args;
}
}

public static Pair<String, CommonTree> buildKWArg(CommonTree t) {
ASTNODE type = ASTNODE.valueOf(t.getText()); // convert to enum
Preconditions.checkArgument("KWARG".equals(type.toString()));
Expand Down
18 changes: 18 additions & 0 deletions flume-core/src/main/java/com/cloudera/flume/conf/FlumeSpecGen.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,30 @@ static String genArg(CommonTree t) throws FlumeSpecException {
return t.getChild(0).getText();
case KWARG:
return t.getChild(0).getText() + "=" + genArg((CommonTree) t.getChild(1));
case FUNC:
return genFunc(t);
default:
throw new FlumeSpecException("Not a node of literal type: "
+ t.toStringTree());
}
}

static String genFunc(CommonTree t) throws FlumeSpecException {
StringBuilder sb = new StringBuilder();
sb.append(t.getChild(0).getText());
sb.append("(");

for (int i=1; i<t.getChildCount(); i++) {
if (i > 1) {
sb.append(", ");
}
sb.append(genArg((CommonTree)t.getChild(i)));
}
sb.append(")");

return sb.toString();
}

static String genArgs(List<String> args, String pre, String delim, String post) {
StringBuilder b = new StringBuilder();
if (args.size() == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public void testString() throws RecognitionException, FlumeSpecException {
assertEquals(orig, out);
}

@Test
public void testFunc() throws RecognitionException, FlumeSpecException {
String orig = "foo(\"bar\", 42)";
CommonTree tree = FlumeBuilder.parseArg(orig);
String out = FlumeSpecGen.genArg(tree);
assertEquals(orig, out);
}

/**
* Spaces are important for the next test cases.
*/
Expand Down Expand Up @@ -181,7 +189,27 @@ public void testKeywordArgGen() throws RecognitionException,
o2 = FlumeBuilder.parseSink(s2);
out = FlumeSpecGen.genEventSink(o2);
assertEquals(s2, out);
}

@Test
public void testFuncs() throws RecognitionException, FlumeSpecException {
// func arg
String s = "text( baz(42), foo=\"bar\" )";
CommonTree o = FlumeBuilder.parseSink(s);
String out = FlumeSpecGen.genEventSink(o);
assertEquals(s, out);

// func with func arg
String s2 = "text( baz(bog(\"boo\")), foo=\"bar\" )";
CommonTree o2 = FlumeBuilder.parseSink(s2);
out = FlumeSpecGen.genEventSink(o2);
assertEquals(s2, out);

// func as kwarg
String s3 = "text( bogus=baz(42), foo=\"bar\" )";
CommonTree o3 = FlumeBuilder.parseSink(s3);
out = FlumeSpecGen.genEventSink(o3);
assertEquals(s3, out);
}

}

0 comments on commit 4b35ba6

Please sign in to comment.