Skip to content

Commit

Permalink
SequenceSpec updated.
Browse files Browse the repository at this point in the history
  • Loading branch information
skrusche63 committed Feb 9, 2015
1 parent dd28788 commit ccf4124
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 21 deletions.
8 changes: 4 additions & 4 deletions src/main/resources/fieldspec.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<fieldspec>
<field name="timestamp" type="Long">timestamp</field>
<field name="user" type="String">user</field>
<field name="group" type="String">order</field>
<field name="item" type="Integer">item</field>
<field name="timestamp" type="long">timestamp</field>
<field name="user" type="string">user</field>
<field name="group" type="string">order</field>
<field name="item" type="integer">item</field>
</fieldspec>
2 changes: 1 addition & 1 deletion src/main/scala/de/kp/spark/fsm/actor/SPADEActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class SPADEActor(@transient val sc:SparkContext) extends BaseActor {

try {

val source = new SequenceSource(sc,config,SequenceSpec)
val source = new SequenceSource(sc,config,new SequenceSpec(req))
val dataset = SPMFHandler.sequence2SPMF(source.connect(req))

val support = params
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/de/kp/spark/fsm/actor/TSRActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TSRActor(@transient val sc:SparkContext) extends BaseActor {

try {

val source = new SequenceSource(sc,config,SequenceSpec)
val source = new SequenceSource(sc,config,new SequenceSpec(req))
val dataset = SPMFHandler.sequence2SPMF(source.connect(req))

val (k,minconf) = params
Expand Down
36 changes: 21 additions & 15 deletions src/main/scala/de/kp/spark/fsm/spec/SequenceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,34 @@ import de.kp.spark.core.spec.Fields
import de.kp.spark.fsm.Configuration

import scala.xml._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Buffer

object SequenceSpec extends Fields {
class SequenceSpec(req:ServiceRequest) extends Fields {

val path = "fieldspec.xml"

val (host,port) = Configuration.redis
val cache = new RedisCache(host,port.toInt)

def get(req:ServiceRequest):Map[String,String] = {
val fields = HashMap.empty[String,String]
private val fields = load

def mapping:Map[String,String] = fields.map(x => (x.name,x.value)).toMap

def names:List[String] = fields.map(_.name)

def types:List[String] = fields.map(_.datatype)

private val load:List[Field] = {

val data = Buffer.empty[Field]

try {

if (cache.fieldsExist(req)) {

val fieldspec = cache.fields(req)
for (field <- fieldspec) {

val _name = field.name
val _mapping = field.value

fields += _name -> _mapping

data += Field(field.name,field.datatype,field.value)
}

} else {
Expand All @@ -59,9 +62,11 @@ object SequenceSpec extends Fields {
for (field <- root \ "field") {

val _name = (field \ "@name").toString
val _type = (field \ "@type").toString

val _mapping = field.text

fields += _name -> _mapping
data += Field(_name,_type,_mapping)

}

Expand All @@ -71,7 +76,8 @@ object SequenceSpec extends Fields {
case e:Exception => {}
}

fields.toMap
}
data.toList

}

}

0 comments on commit ccf4124

Please sign in to comment.