Skip to content

Commit

Permalink
Update spark-script.py
Browse files Browse the repository at this point in the history
Added more examples of using Spark with MongoDB: Spark SQL and the aggregation framework
  • Loading branch information
RWaltersMA authored Dec 21, 2020
1 parent 6e98b60 commit ba847fc
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion spark-script.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,32 @@
config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").\
getOrCreate()

# Load a DataFrame from the configured MongoDB collection
df = spark.read.format("mongo").load()

df.printSchema()

# Cast the tx_time column to a proper timestamp type
df = df.withColumn("tx_time", df.tx_time.cast("timestamp"))

# Compute a centered moving average of price per company symbol
from pyspark.sql import functions as F
from pyspark.sql.window import Window

price_window = Window.partitionBy("company_symbol").rowsBetween(-1, 1)
moving_avg_df = df.withColumn("movingAverage", F.avg("price").over(price_window))

moving_avg_df.show()

# Write the enriched DataFrame back to MongoDB, replacing matching documents
moving_avg_df.write.format("mongo").option("replaceDocument", "true").mode("append").save()

# Run an aggregation pipeline server-side in MongoDB and read the result
pipeline = "[{'$group': {_id:'$company_name', 'maxprice': {$max:'$price'}}},{$sort:{'maxprice':-1}}]"
agg_df = spark.read.format("mongo").option("pipeline", pipeline).load()
agg_df.show()

# Query the moving averages with Spark SQL via a temporary view
moving_avg_df.createOrReplaceTempView("avgs")

filtered_df = spark.sql("SELECT * FROM avgs WHERE movingAverage > 43.0")

filtered_df.show()

0 comments on commit ba847fc

Please sign in to comment.