Skip to content

Commit

Permalink
better scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafal Zukowski committed Jul 20, 2015
1 parent cc5580a commit f58ecf1
Showing 1 changed file with 10 additions and 13 deletions.
23 changes: 10 additions & 13 deletions src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.quantifind.kafka.offsetapp

import java.lang.reflect.Constructor
import java.util.concurrent.{TimeUnit, Executors, ScheduledExecutorService}
import java.util.{Timer, TimerTask}

import com.quantifind.kafka.offsetapp.sqlite.SQLiteOffsetInfoReporter
Expand Down Expand Up @@ -48,9 +49,12 @@ class OWArgs extends OffsetGetterArgs with UnfilteredWebApp.Arguments {
*/
object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {

implicit def funToRunnable(fun: () => Unit) = new Runnable() { def run() = fun() }

def htmlRoot: String = "/offsetapp"

val timer = new Timer()
val scheduler : ScheduledExecutorService = Executors.newScheduledThreadPool(2)

var zkClient: ZkClient = null
var reporters: mutable.Set[OffsetInfoReporter] = null

Expand All @@ -77,17 +81,9 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {

def schedule(args: OWArgs) {

timer.scheduleAtFixedRate(new TimerTask() {
override def run() {
reportOffsets(args)
}
}, 0, args.refresh.toMillis)
scheduler.scheduleAtFixedRate( () => { reportOffsets(args) }, 0, args.refresh.toMillis, TimeUnit.MILLISECONDS )
scheduler.scheduleAtFixedRate( () => { reporters.foreach(reporter => retryTask({reporter.cleanupOldData()})) }, args.retain.toMillis, args.retain.toMillis, TimeUnit.MILLISECONDS )

timer.scheduleAtFixedRate(new TimerTask() {
override def run() {
reporters.foreach(reporter => retryTask({reporter.cleanupOldData()}))
}
}, args.retain.toMillis, args.retain.toMillis)
}

def withOG[T](args: OWArgs)(f: OffsetGetter => T): T = {
Expand Down Expand Up @@ -128,8 +124,9 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
}

override def afterStop() {
timer.cancel()
timer.purge()

scheduler.shutdown()

if (zkClient != null)
zkClient.close()
}
Expand Down

0 comments on commit f58ecf1

Please sign in to comment.