Skip to content

Commit

Permalink
NIFI-3540: QueryDatabaseTable Failing to Track MS SQL Max Values
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Burgess <[email protected]>

This closes apache#1547
  • Loading branch information
patricker authored and mattyb149 committed Mar 2, 2017
1 parent 0d66b6d commit bf2f04f
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
try {
fileToProcess = session.write(fileToProcess, out -> {
// Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
try {
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile, convertNamesForAvro));
} catch (SQLException | RuntimeException e) {
Expand Down Expand Up @@ -419,10 +419,12 @@ protected Map<String,String> getDefaultMaxValueProperties(final Map<PropertyDesc
protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter;
Map<String, String> newColMap;
String tableName;

public MaxValueResultSetRowCollector(Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
this.dbAdapter = dbAdapter;
newColMap = stateMap;
this.tableName = tableName;
}

@Override
Expand All @@ -437,7 +439,7 @@ public void processRow(ResultSet resultSet) throws IOException {
if (nrOfColumns > 0) {
for (int i = 1; i <= nrOfColumns; i++) {
String colName = meta.getColumnName(i).toLowerCase();
String fullyQualifiedMaxValueKey = getStateKey(meta.getTableName(i), colName);
String fullyQualifiedMaxValueKey = getStateKey(tableName, colName);
Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
// Skip any columns we're not keeping track of or whose value is null
if (type == null || resultSet.getObject(i) == null) {
Expand Down

0 comments on commit bf2f04f

Please sign in to comment.