Skip to content

Commit

Permalink
Merge pull request gorillalabs#49 from Dwarfartisan/develop
Browse files Browse the repository at this point in the history
add spark sql support
  • Loading branch information
chrisbetz authored Jan 6, 2017
2 parents 1d06a15 + 563a8a4 commit b959fed
Show file tree
Hide file tree
Showing 6 changed files with 847 additions and 3 deletions.
606 changes: 606 additions & 0 deletions data/euclid/elements.txt

Large diffs are not rendered by default.

149 changes: 149 additions & 0 deletions src/clojure/sparkling/sql.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
(ns sparkling.sql
"spark sql api for clojure. As sparkling.core, pass the sql context as last parameter.
Read json or write json like sparkling.core/text-file or save-as-text-file."
(:require [sparkling.function :as func])
(:import [org.apache.spark.sql SQLContext]
[org.apache.spark.sql functions]
[com.google.common.collect ImmutableMap]))

(defn sql-context
  "Build a new SQLContext on top of the given SparkContext."
  [sc]
  (SQLContext. sc))

(defn sql
  "Execute the SQL statement `code` against `context` and return the
  resulting DataFrame. The context comes last so the call threads with ->>."
  [code context]
  (.sql context code))

(defn register-temp-table
  "Register `data-frame` as a temporary table named `table-name`.
  Returns the data frame unchanged so the call can be threaded."
  [table-name data-frame]
  (doto data-frame
    (.registerTempTable table-name)))

(defn select
  "Project `data-frame` onto the given collection of Column objects."
  [cols data-frame]
  (let [col-array (into-array cols)]
    (.select data-frame col-array)))

(defn cols
  "Resolve each name in `names` to a Column of `data-frame`; returns a vector."
  [names data-frame]
  (mapv (fn [col-name] (.col data-frame col-name)) names))

(defn selects
  "Project `data-frame` onto the columns named in `columns`."
  [columns data-frame]
  (->> (cols columns data-frame)
       into-array
       (.select data-frame)))

(defn select-expr
  "Apply selectExpr to `data-frame`, where `expr` is a collection of SQL
  expression strings (e.g. [\"substr(name, 0, 4) as category\"])."
  [expr data-frame]
  (let [expr-array (into-array expr)]
    (.selectExpr data-frame expr-array)))

(defn where
  ;; Docstring completed: the original was cut off at "call where by ".
  "Filter `data-frame` by `expression` (a SQL condition string or a Column)."
  [expression data-frame]
  (.where data-frame expression))

(defn join
  "Join `data-frame` with `other` using the given join `expression`.
  Bug fix: the original never used `other` — it passed `expression` as the
  right-hand side of .join, so the intended join target was silently dropped."
  [expression other data-frame]
  (.join data-frame other expression))

(defn join-on
  "Inner-join `data-frame` with `other` on equality of the column named
  `column-name` in both frames."
  [column-name other data-frame]
  (let [left-col  (.col data-frame column-name)
        right-col (.col other column-name)]
    (.join data-frame other (.equalTo left-col right-col))))

(defn group-by
  "Group `data-frame` by the given Column objects.
  NOTE: shadows clojure.core/group-by inside this namespace."
  [cols data-frame]
  (let [col-array (into-array cols)]
    (.groupBy data-frame col-array)))

(defn group-by-cols
  "Group `data-frame` by the columns named in `columns`."
  [columns data-frame]
  (->> (cols columns data-frame)
       into-array
       (.groupBy data-frame)))

(defn count
  "Count the rows in each group of `grouped-data`.
  Bug fix: the original mistakenly called .max (copy-paste error).
  GroupedData.count takes no column arguments, so `cols` is kept only for
  interface compatibility and is ignored.
  NOTE: shadows clojure.core/count inside this namespace."
  [cols grouped-data]
  (.count grouped-data))

(defn max
  "Compute the per-group maximum of the numeric columns named in `cols`.
  NOTE: shadows clojure.core/max inside this namespace."
  [cols grouped-data]
  (let [col-names (into-array cols)]
    (.max grouped-data col-names)))

(defn min
  "Compute the per-group minimum of the numeric columns named in `cols`.
  NOTE: shadows clojure.core/min inside this namespace."
  [cols grouped-data]
  (let [col-names (into-array cols)]
    (.min grouped-data col-names)))

(defn sum
  "Compute the per-group sum of the numeric columns named in `cols`.
  Bug fix: the original mistakenly called .min instead of .sum
  (copy-paste error from the `min` function above)."
  [cols grouped-data]
  (.sum grouped-data (into-array cols)))

(defn agg
  "Aggregate `dataset` using a flat sequence of column/function pairs,
  e.g. [\"reference\" \"count\"], converted into the column->aggregate map
  expected by DataFrame.agg."
  [cols dataset]
  (let [agg-spec (apply hash-map cols)]
    (.agg dataset (ImmutableMap/copyOf agg-spec))))

(defn with-column-renamed
  "Return `data-frame` with the column `exist-name` renamed to `new-name`."
  [exist-name new-name data-frame]
  (.withColumnRenamed data-frame exist-name new-name))

(defn order-by
  "Sort `data-frame` by the given Column objects (wraps DataFrame.orderBy).
  (Docstring translated from Chinese.)"
  [cols data-frame]
  (let [sort-cols (into-array cols)]
    (.orderBy data-frame sort-cols)))

(defn order-by-cols
  "Sort `data-frame` by the columns named in `columns`."
  [columns data-frame]
  (->> (cols columns data-frame)
       into-array
       (.orderBy data-frame)))

(defn register-udf1
  "Register `func` as a one-argument SQL UDF named `name` returning
  `data-type`. Returns the SQLContext so calls can be threaded."
  [name func data-type sqlc]
  (let [registry (.udf sqlc)]
    (.register registry name (func/function func) data-type))
  sqlc)

(defn register-udf2
  "Register `func` as a two-argument SQL UDF named `name` returning
  `data-type`. Returns the SQLContext so calls can be threaded."
  [name func data-type sqlc]
  (let [registry (.udf sqlc)]
    (.register registry name (func/function2 func) data-type))
  sqlc)

(defn json-rdd
  "Serialize `data-frame` to a JavaRDD of JSON strings, one row per element."
  [data-frame]
  (.toJavaRDD (.toJSON data-frame)))

(defn read-json
  "Read JSON from `data-source` (a file path or a JavaRDD of JSON strings)
  into a DataFrame. NOTE: unlike most functions in this namespace, the
  SQLContext comes first."
  [sqlc data-source]
  (.json (.read sqlc) data-source))

(defn write-json
  "Write `data-frame` as JSON to the location `data-source`."
  [data-source data-frame]
  (.json (.write data-frame) data-source))
2 changes: 1 addition & 1 deletion src/java/sparkling/function/Function.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import clojure.lang.IFn;

public class Function extends sparkling.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function {
public class Function extends sparkling.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function, org.apache.spark.sql.api.java.UDF1 {
public Function(IFn func) {
super(func);
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/sparkling/function/Function2.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import clojure.lang.IFn;

public class Function2 extends sparkling.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function2 {
public class Function2 extends sparkling.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function2, org.apache.spark.sql.api.java.UDF2 {
public Function2(IFn func) {
super(func);
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/sparkling/function/Function3.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import clojure.lang.IFn;

public class Function3 extends sparkling.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function3 {
public class Function3 extends sparkling.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function3, org.apache.spark.sql.api.java.UDF3 {
public Function3(IFn func) {
super(func);
}
Expand Down
89 changes: 89 additions & 0 deletions test/sparkling/sql/basic_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
(ns sparkling.sql.basic-test
(:require [clojure.java.io :as io]
[sparkling.conf :as conf]
[sparkling.core :as spark]
[sparkling.sql :as sql]
[clojure.test :refer :all])
(:import [org.apache.spark.sql functions]))

(deftest selects-0
  (let [conf (-> (conf/spark-conf)
                 (conf/set "spark.kryo.registrator"
                           "sparkling.testutils.records.registrator.Registrator")
                 (conf/master "local[*]")
                 (conf/app-name "spark sql select columns by name test"))]
    (spark/with-context sc conf
      ;; Fix: `testing` requires a description string; the original passed
      ;; the `is` form in the string position, so the assertion's result was
      ;; used as the context label instead of a description.
      (testing "select columns by name, then filter with where"
        (is (= "{\"name\":\"C.N.1\",\"description\":\"Things which are equal to the same thing are also equal to one another.\"}"
               (->> sc
                    sql/sql-context
                    (#(sql/read-json % (.getPath (io/resource "euclid/elements.txt"))))
                    (sql/selects ["name" "description"])
                    (sql/where "name='C.N.1'")
                    sql/json-rdd
                    (spark/take 1)
                    first)))))))

(deftest select-expr-test
  (let [conf (-> (conf/spark-conf)
                 (conf/set "spark.kryo.registrator"
                           "sparkling.testutils.records.registrator.Registrator")
                 (conf/master "local[*]")
                 (conf/app-name "spark sql select expr test"))]
    (spark/with-context sc conf
      ;; Fix: supply the description string that `testing` expects; the
      ;; original put the `is` form in the string position.
      (testing "selectExpr with substr, then filter on the derived column"
        (is (= 5
               (->> sc
                    sql/sql-context
                    (#(sql/read-json % (.getPath (io/resource "euclid/elements.txt"))))
                    (sql/select-expr ["substr(name, 0, 4) as category"])
                    (sql/where "category='C.N.'")
                    .count)))))))

(deftest select-group-agg
  ;; Fix: `deftest` takes no docstring — the original's leading string was a
  ;; no-op body form. Its text now serves as the `testing` description, which
  ;; the original also lacked (it passed the `is` form in the string position).
  (let [conf (-> (conf/spark-conf)
                 (conf/set "spark.kryo.registrator"
                           "sparkling.testutils.records.registrator.Registrator")
                 (conf/master "local[*]")
                 (conf/app-name "spark sql select expr test"))]
    (spark/with-context sc conf
      (testing "select-expr, group by, agg, with column renamed, order by and json rdd"
        (is (= "{\"reference\":\"Prop.6.1\",\"referenced\":87}"
               (->> sc
                    sql/sql-context
                    (#(sql/read-json % (.getPath (io/resource "euclid/elements.txt"))))
                    (sql/select-expr ["explode(references) as reference"])
                    (sql/group-by-cols ["reference"])
                    (sql/agg ["reference" "count"])
                    (sql/with-column-renamed "count(reference)" "referenced")
                    (sql/order-by [(functions/desc "referenced")])
                    sql/json-rdd
                    spark/first)))))))

(deftest select-sql
  ;; Fix: `deftest` takes no docstring — the original's leading string was a
  ;; no-op body form. Its text now serves as the `testing` description, which
  ;; the original also lacked (it passed the `is` form in the string position).
  (let [conf (-> (conf/spark-conf)
                 (conf/set "spark.kryo.registrator"
                           "sparkling.testutils.records.registrator.Registrator")
                 (conf/master "local[*]")
                 (conf/app-name "spark sql select expr test"))]
    (spark/with-context sc conf
      (testing "sql with select, group by, count, join, and register temp table"
        (is (= "{\"name\":\"Prop.10.93\",\"references\":22}"
               (let [sqlc (sql/sql-context sc)]
                 (->> sqlc
                      (#(sql/read-json % (.getPath (io/resource "euclid/elements.txt"))))
                      (sql/register-temp-table "elements"))
                 (->> sqlc
                      (sql/sql "select name, explode(references) as reference from elements")
                      (sql/group-by-cols ["name"])
                      (sql/agg ["reference" "count"])
                      (sql/with-column-renamed "count(reference)" "references")
                      (sql/register-temp-table "references"))
                 (->> sqlc
                      (sql/sql "select max(references) as count from references")
                      (sql/register-temp-table "element"))
                 (->> sqlc
                      (sql/sql "select references.name, element.count as references
                        from references join element on references.references = element.count")
                      sql/json-rdd spark/first))))))))

0 comments on commit b959fed

Please sign in to comment.