Skip to content

Commit efdcc5b

Browse files
Initial Commit
1 parent 9a660cf commit efdcc5b

10 files changed

+941070
-0
lines changed

20-BucketJoinDemo/BucketJoinDemo.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from pyspark.sql import SparkSession
2+
3+
from lib.logger import Log4j
4+
5+
if __name__ == "__main__":
6+
spark = SparkSession \
7+
.builder \
8+
.appName("Bucket Join Demo") \
9+
.master("local[3]") \
10+
.enableHiveSupport() \
11+
.getOrCreate()
12+
13+
logger = Log4j(spark)
14+
df1 = spark.read.json("data/d1/")
15+
df2 = spark.read.json("data/d2/")
16+
# df1.show()
17+
# df2.show()
18+
'''
19+
spark.sql("CREATE DATABASE IF NOT EXISTS MY_DB")
20+
spark.sql("USE MY_DB")
21+
22+
df1.coalesce(1).write \
23+
.bucketBy(3, "id") \
24+
.mode("overwrite") \
25+
.saveAsTable("MY_DB.flight_data1")
26+
27+
df2.coalesce(1).write \
28+
.bucketBy(3, "id") \
29+
.mode("overwrite") \
30+
.saveAsTable("MY_DB.flight_data2")
31+
'''
32+
33+
df3 = spark.read.table("MY_DB.flight_data1")
34+
df4 = spark.read.table("MY_DB.flight_data2")
35+
36+
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
37+
join_expr = df3.id == df4.id
38+
join_df = df3.join(df4, join_expr, "inner")
39+
40+
join_df.collect()
41+
input("press a key to stop...")
42+
43+
44+
45+
46+
47+
48+
49+
50+
51+
52+
53+
54+
55+
56+
57+

20-BucketJoinDemo/data/d1/part-00000-00af64b6-7ef5-4909-8f82-b8897114efaf-c000.json

Lines changed: 172660 additions & 0 deletions
Large diffs are not rendered by default.

20-BucketJoinDemo/data/d1/part-00001-00af64b6-7ef5-4909-8f82-b8897114efaf-c000.json

Lines changed: 171572 additions & 0 deletions
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)