
Commit

Merge branch 'titan09' of https://github.com/thinkaurelius/titan into titan09
mbroecheler committed Sep 19, 2015
2 parents ee354a0 + 17a518a commit a68d4e6
Showing 7 changed files with 123 additions and 68 deletions.
116 changes: 63 additions & 53 deletions docs/hadoop.txt
@@ -33,79 +33,89 @@ plugin activated: tinkerpop.hadoop
plugin activated: aurelius.titan
gremlin> :load data/grateful-dead-titan-schema.groovy
==>true
==>standardtitangraph[cassandrathrift:[127.0.0.1]]
==>com.thinkaurelius.titan.graphdb.database.management.ManagementSystem@6088451e
==>true
==>song
==>artist
==>true
==>songType
==>performances
==>name
==>weight
==>true
==>sungBy
==>writtenBy
==>followedBy
==>true
==>verticesByName
==>followsByWeight
==>true
gremlin> graph = TitanFactory.open('conf/titan-cassandra.properties')
==>standardtitangraph[cassandrathrift:[127.0.0.1]]
gremlin> defineGratefulDeadSchema(graph)
==>null
gremlin> graph.close()
==>null
gremlin> graph = GraphFactory.open('conf/hadoop-load.properties')
==>hadoopgraph[kryoinputformat->kryooutputformat]
gremlin> r = graph.compute().program(BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)).submit().get()
gremlin> hdfs.copyFromLocal('data/grateful-dead.kryo','data/grateful-dead.kryo')
==>null
gremlin> graph = GraphFactory.open('conf/hadoop-graph/hadoop-load.properties')
==>hadoopgraph[gryoinputformat->nulloutputformat]
gremlin> blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
==>BulkLoaderVertexProgram[bulkLoader=IncrementalBulkLoader,vertexIdProperty=bulkLoader.vertex.id,userSuppliedIds=false,keepOriginalIds=true,batchSize=0]
gremlin> graph.compute(SparkGraphComputer).program(blvp).submit().get()
...
==>result[hadoopgraph[kryoinputformat->kryooutputformat], memory[size:0]]
==>result[hadoopgraph[gryoinputformat->nulloutputformat],memory[size:0]]
gremlin>
----
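
Once the SparkGraphComputer job above finishes, the load can be sanity-checked from the Gremlin console. The following sketch is not part of this commit; it assumes only the conf/titan-cassandra.properties file used throughout this chapter and standard TinkerPop traversal steps (the 'Garcia' lookup is an illustrative example from the Grateful Dead dataset).

[source, gremlin]
----
// Sketch: verify the bulk-loaded graph.
gremlin> graph = TitanFactory.open('conf/titan-cassandra.properties')
gremlin> g = graph.traversal()
gremlin> g.V().count()                            // total number of loaded vertices
gremlin> g.V().has('name', 'Garcia').valueMap()   // point lookup via the indexed 'name' key
gremlin> graph.close()
----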

[source, properties]
----
# hadoop-load.properties

# Hadoop-Gremlin settings
gremlin.graph=com.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoInputFormat
gremlin.hadoop.graphOutputFormat=com.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoOutputFormat
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.inputLocation=data/grateful-dead-vertices.gio
gremlin.hadoop.inputLocation=./data/grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true

# Giraph settings
giraph.SplitMasterWorker=false
giraph.minWorkers=1
giraph.maxWorkers=1
#
# GiraphGraphComputer Configuration
#
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000

#
# SparkGraphComputer Configuration
#
spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
----

[source, gremlin]
----
// grateful-dead-titan-schema.groovy

// Open Titan and its ManagementSystem
titanGraph = TitanFactory.open('conf/titan-cassandra.properties')
schema = titanGraph.openManagement()
// Vertex Labels
schema.makeVertexLabel("song").make()
schema.makeVertexLabel("artist").make()
// Property Keys
schema.makePropertyKey("songType").dataType(String.class).make()
schema.makePropertyKey("performances").dataType(Integer.class).make()
nameKey = schema.makePropertyKey("name").dataType(String.class).make()
weightKey = schema.makePropertyKey("weight").dataType(Integer.class).make()
// Edge Labels
schema.makeEdgeLabel("sungBy").make()
schema.makeEdgeLabel("writtenBy").make()
followedLabel = schema.makeEdgeLabel("followedBy").make()
// Indices
schema.buildIndex("verticesByName", Vertex.class).addKey(nameKey).unique().buildCompositeIndex()
schema.buildEdgeIndex(followedLabel, "followsByWeight", Direction.BOTH, Order.decr, weightKey)
// Commit schemata and release resources
schema.commit()
titanGraph.close()
// titan-schema-grateful-dead.groovy

def defineGratefulDeadSchema(titanGraph) {
    m = titanGraph.openManagement()
    // vertex labels
    artist = m.makeVertexLabel("artist").make()
    song = m.makeVertexLabel("song").make()
    // edge labels
    sungBy = m.makeEdgeLabel("sungBy").make()
    writtenBy = m.makeEdgeLabel("writtenBy").make()
    followedBy = m.makeEdgeLabel("followedBy").make()
    // vertex and edge properties
    blid = m.makePropertyKey("bulkLoader.vertex.id").dataType(Long.class).make()
    name = m.makePropertyKey("name").dataType(String.class).make()
    songType = m.makePropertyKey("songType").dataType(String.class).make()
    performances = m.makePropertyKey("performances").dataType(Integer.class).make()
    weight = m.makePropertyKey("weight").dataType(Integer.class).make()
    // global indices
    m.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
    m.buildIndex("artistsByName", Vertex.class).addKey(name).indexOnly(artist).buildCompositeIndex()
    m.buildIndex("songsByName", Vertex.class).addKey(name).indexOnly(song).buildCompositeIndex()
    // vertex centric indices
    m.buildEdgeIndex(followedBy, "followedByWeight", Direction.BOTH, Order.decr, weight)
    m.commit()
}
----

Running PageRank
2 changes: 1 addition & 1 deletion titan-cassandra/pom.xml
@@ -11,7 +11,7 @@
<url>http://thinkaurelius.github.com/titan/</url>
<properties>
<astyanax.version>3.8.0</astyanax.version>
<jamm.group>com.github.stephenc</jamm.group>
<jamm.group>com.github.jbellis</jamm.group>
<dependency.plugin.version>2.8</dependency.plugin.version>
<test.extra.jvm.opts>-javaagent:${basedir}/target/jamm-${jamm.version}.jar</test.extra.jvm.opts>
<default.test.jvm.opts>-Xms256m -Xmx1280m -ea -XX:+HeapDumpOnOutOfMemoryError ${test.extra.jvm.opts}</default.test.jvm.opts>
@@ -4,6 +4,8 @@
# Thrift. Cassandra must already be started before starting Titan
# with this file.

gremlin.graph=com.thinkaurelius.titan.core.TitanFactory

#TITANCFG{storage.backend=cassandrathrift}

#TITANCFG{storage.hostname=127.0.0.1}
@@ -1,18 +1,18 @@
# hadoop-load.properties

# Hadoop-Gremlin settings
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.inputLocation=./data/grateful-dead.gio
gremlin.hadoop.inputLocation=./data/grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true

#####################################
# GiraphGraphComputer Configuration #
#####################################
#
# GiraphGraphComputer Configuration
#
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
@@ -22,9 +22,10 @@ mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=local[4]

#
# SparkGraphComputer Configuration
#
spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
@@ -20,7 +20,7 @@ titanmr.ioformat.conf.storage.port=9160
titanmr.ioformat.conf.storage.keyspace=titan

#
# Apache Cassandra InptuFormat configuration
# Apache Cassandra InputFormat configuration
#
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

@@ -28,14 +28,16 @@ def defineGratefulDeadSchema(titanGraph) {
writtenBy = m.makeEdgeLabel("writtenBy").make()
followedBy = m.makeEdgeLabel("followedBy").make()
// vertex and edge properties
blid = m.makePropertyKey("bulkLoader.vertex.id").dataType(Long.class).make()
name = m.makePropertyKey("name").dataType(String.class).make()
songType = m.makePropertyKey("songType").dataType(String.class).make()
performances = m.makePropertyKey("performances").dataType(Integer.class).make()
weight = m.makePropertyKey("weight").dataType(Integer.class).make()
// global indices
m.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
m.buildIndex("artistsByName", Vertex.class).addKey(name).indexOnly(artist).buildCompositeIndex()
m.buildIndex("songsByName", Vertex.class).addKey(name).indexOnly(song).buildCompositeIndex()
// vertex centric indices
m.buildEdgeIndex(followedBy, "followedByTime", Direction.BOTH, Order.decr, weight)
m.buildEdgeIndex(followedBy, "followedByWeight", Direction.BOTH, Order.decr, weight)
m.commit()
}
@@ -0,0 +1,40 @@
/* legacy-graphson-script-input.groovy
 *
 * Can be used as a script file for ScriptInputFormat to read/load
 * GraphSON exports from older Titan versions (<= 0.5.x).
 */
def parse(line, factory) {
    def slurper = new JsonSlurper()
    def properties = slurper.parseText(line)
    def outE = properties.remove("_outE")
    def inE = properties.remove("_inE")
    def vid = properties.remove("_id")
    def vlabel = properties.remove("_label") ?: Vertex.DEFAULT_LABEL
    def vertex = factory.vertex(vid, vlabel)
    properties.each { def key, def value ->
        vertex.property(key, value)
    }
    if (outE != null) {
        outE.each { def e ->
            def eid = e.remove("_id")
            def elabel = e.remove("_label") ?: Edge.DEFAULT_LABEL
            def inV = factory.vertex(e.remove("_inV"))
            edge = factory.edge(vertex, inV, elabel)
            e.each { def key, def value ->
                edge.property(key, value)
            }
        }
    }
    if (inE != null) {
        inE.each { def e ->
            def eid = e.remove("_id")
            def elabel = e.remove("_label")
            def outV = factory.vertex(e.remove("_outV"))
            edge = factory.edge(outV, vertex, elabel)
            e.each { def key, def value ->
                edge.property(key, value)
            }
        }
    }
    return vertex
}
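
A possible way to wire this parser into a HadoopGraph, sketched from the Gremlin console (not part of this commit): it points ScriptInputFormat at the script above and keeps NullOutputFormat as the output format, as in the bulk-loading examples earlier in this commit. The TinkerPop 3.0 class and property names (notably gremlin.hadoop.scriptInputFormat.script) are believed correct but should be checked against the TinkerPop version in use, and the input path legacy-graphson-export.json is a placeholder.

[source, groovy]
----
// Sketch only: configure ScriptInputFormat to use legacy-graphson-script-input.groovy.
import org.apache.commons.configuration.BaseConfiguration
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory

conf = new BaseConfiguration()
conf.setProperty('gremlin.graph', 'org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph')
conf.setProperty('gremlin.hadoop.graphInputFormat',
                 'org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat')
conf.setProperty('gremlin.hadoop.scriptInputFormat.script', 'legacy-graphson-script-input.groovy')
conf.setProperty('gremlin.hadoop.inputLocation', 'legacy-graphson-export.json')  // placeholder path
conf.setProperty('gremlin.hadoop.graphOutputFormat', 'org.apache.hadoop.mapreduce.lib.output.NullOutputFormat')
conf.setProperty('gremlin.hadoop.jarsInDistributedCache', true)
graph = GraphFactory.open(conf)
// The resulting HadoopGraph can then be fed to BulkLoaderVertexProgram as shown in docs/hadoop.txt.
----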
