Skip to content

Commit

Permalink
SAMZA-1076; getKafkaChangelogEnabledStores() should use StorageConfig…
Browse files Browse the repository at this point in the history
….getChangelo…

getKafkaChangelogEnabledStores() should use StorageConfig.getChangelogStream to get changelog system.stream

Author: Boris Shkolnik <[email protected]>

Reviewers: xiliu <[email protected]>

Closes apache#39 from sborya/KafkConfigForChangelogStream
  • Loading branch information
Boris Shkolnik authored and jagadish-northguard committed Jan 6, 2017
1 parent a47e881 commit ec1934c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,19 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
def getKafkaChangelogEnabledStores() = {
val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
var storeToChangelog = Map[String, String]()
for((changelogConfig, changelogName) <- changelogConfigs){
val storageConfig = new StorageConfig(config)
val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)

for((changelogConfig, cn) <- changelogConfigs){
// Lookup the factory for this particular stream and verify if it's a kafka system

val matcher = pattern.matcher(changelogConfig)
val storeName = if(matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn)

val changelogName = storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig));
val systemStream = Util.getSystemStreamFromNames(changelogName)
val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){
val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
val matcher = pattern.matcher(changelogConfig)
val storeName = if(matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + systemStream)
storeToChangelog += storeName -> systemStream.getStream
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,19 @@ class TestKafkaConfig {
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
props.setProperty("stores.test2.changelog", "kafka.mychangelog2")
props.setProperty("job.changelog.system", "kafka")
props.setProperty("stores.test3.changelog", "otherstream")
props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")

val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete")
assertEquals(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact")
assertEquals(kafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact")
val storeToChangelog = kafkaConfig.getKafkaChangelogEnabledStores()
assertEquals(storeToChangelog.get("test1").getOrElse(""), "mychangelog1")
assertEquals(storeToChangelog.get("test2").getOrElse(""), "mychangelog2")
assertEquals("mychangelog1", storeToChangelog.get("test1").getOrElse(""))
assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse(""))
assertEquals("otherstream", storeToChangelog.get("test3").getOrElse(""))
}

@Test
Expand Down

0 comments on commit ec1934c

Please sign in to comment.