
Commit b871c3b

Initial Commit
1 parent 32169e0 commit b871c3b

File tree

6 files changed, +542010 -0 lines changed


.gitignore

Lines changed: 3 additions & 0 deletions

@@ -6,3 +6,6 @@
 */app-logs*
 *__pycache__*
 
+*.crc
+*.parquet
+*/*/_SUCCESS

GroupingDemo/GroupingDemo.py

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+from pyspark.sql import SparkSession
+from pyspark.sql import functions as f
+
+from lib.logger import Log4j
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("Agg Demo") \
+        .master("local[2]") \
+        .getOrCreate()
+
+    logger = Log4j(spark)
+
+    invoice_df = spark.read \
+        .format("csv") \
+        .option("header", "true") \
+        .option("inferSchema", "true") \
+        .load("data/invoices.csv")
+
+    NumInvoices = f.countDistinct("InvoiceNo").alias("NumInvoices")
+    TotalQuantity = f.sum("Quantity").alias("TotalQuantity")
+    InvoiceValue = f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValue")
+
+    exSummary_df = invoice_df \
+        .withColumn("InvoiceDate", f.to_date(f.col("InvoiceDate"), "dd-MM-yyyy H.mm")) \
+        .where("year(InvoiceDate) == 2010") \
+        .withColumn("WeekNumber", f.weekofyear(f.col("InvoiceDate"))) \
+        .groupBy("Country", "WeekNumber") \
+        .agg(NumInvoices, TotalQuantity, InvoiceValue)
+
+    exSummary_df.coalesce(1) \
+        .write \
+        .format("parquet") \
+        .mode("overwrite") \
+        .save("output")
+
+    exSummary_df.sort("Country", "WeekNumber").show()
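A brief note on the aggregation above (not part of the commit): InvoiceValue is built from a SQL expression string, while the other two aggregates use the Column API. Below is a minimal sketch of an expression-only equivalent for InvoiceValue, plus a read-back of the Parquet summary written to the output directory, assuming the same SparkSession and column names as in the diff:

    # Hypothetical Column-API equivalent of round(sum(Quantity * UnitPrice), 2) as InvoiceValue
    InvoiceValue_col = f.round(f.sum(f.col("Quantity") * f.col("UnitPrice")), 2).alias("InvoiceValue")

    # Read the written summary back and display it in the same order as the final show()
    spark.read.parquet("output").orderBy("Country", "WeekNumber").show()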
