This repo supplements the YouTube video on PySpark for Glue. It includes the CSV data files and a CloudFormation template that creates the S3 bucket, Glue tables, and IAM roles.
- Spin up resources using the CloudFormation template
- Add the CSV files to the S3 bucket in their relative folder locations
- Create Glue notebook
- Execute PySpark code
Below are the schemas for the tables created in the Glue Data Catalog by the CloudFormation template. Each includes a small sample of data to aid the explanation of the coding syntax.

| Customerid | Firstname | Lastname | Fullname |
|---|---|---|---|
| 293 | Catherine | Abel | Catherine Abel |
| 295 | Kim | Abercrombie | Kim Abercrombie |
| 297 | Humberto | Acevedo | Humberto Acevedo |

| SalesOrderID | SalesOrderDetailID | OrderDate | DueDate | ShipDate | EmployeeID | CustomerID | SubTotal | TaxAmt | Freight | TotalDue | ProductID | OrderQty | UnitPrice | UnitPriceDiscount | LineTotal |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 71782 | 110667 | 5/1/2014 | 5/13/2014 | 5/8/2014 | 276 | 293 | 33319.986 | 3182.8264 | 994.6333 | 37497.4457 | 714 | 3 | 29.994 | 0 | 89.982 |
| 44110 | 1732 | 8/1/2011 | 8/13/2011 | 8/8/2011 | 277 | 295 | 16667.3077 | 1600.6864 | 500.2145 | 18768.2086 | 765 | 2 | 419.4589 | 0 | 838.9178 |
| 44131 | 2005 | 8/1/2011 | 8/13/2011 | 8/8/2011 | 275 | 297 | 20514.2859 | 1966.5222 | 614.5382 | 23095.3463 | 709 | 6 | 5.7 | 0 | 34.2 |

| EmployeeID | ManagerID | FirstName | LastName | FullName | JobTitle | OrganizationLevel | MaritalStatus | Gender | Territory | Country | Group |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 276 | 274 | Linda | Mitchell | Linda Mitchell | Sales Representative | 3 | M | F | Southwest | US | North America |
| 277 | 274 | Jillian | Carson | Jillian Carson | Sales Representative | 3 | S | F | Central | US | North America |
| 275 | 274 | Michael | Blythe | Michael Blythe | Sales Representative | 3 | S | M | Northeast | US | North America |

Create resources using the supplied CloudFormation template (see video)
Upload the CSV folder/files to the S3 bucket (see video)
Create a Glue notebook (see video)
Read Data from the Customers Table in a Notebook Using a Dynamic Frame

    # Read from the customers table in the Glue Data Catalog using a dynamic frame
    dynamicFrameCustomers = glueContext.create_dynamic_frame.from_catalog(
        database = "pyspark_tutorial_db",
        table_name = "customers"
    )

    # Show the top 10 rows from the dynamic frame
    dynamicFrameCustomers.show(10)
Check data types in Dynamic Frame

    # Check types in dynamic frame
    dynamicFrameCustomers.printSchema()
Count The Number of Rows in a Dynamic Frame

    # Count the number of rows in a Dynamic Frame
    dynamicFrameCustomers.count()
Select Fields From A Dynamic frame

    # Select certain fields from a Dynamic Frame
    dyfCustomerSelectFields = dynamicFrameCustomers.select_fields(["customerid", "fullname"])

    # Show top 10
    dyfCustomerSelectFields.show(10)
Drop Columns in a Dynamic Frame

    # Drop fields of a Dynamic Frame
    dyfCustomerDropFields = dynamicFrameCustomers.drop_fields(["firstname", "lastname"])

    # Show top 10 rows of the dyfCustomerDropFields Dynamic Frame
    dyfCustomerDropFields.show(10)
Rename Columns in a Dynamic Frame

    # Mapping array for column rename fullname -> name
    mapping = [
        ("customerid", "long", "customerid", "long"),
        ("fullname", "string", "name", "string")
    ]

    # Apply the mapping to rename fullname -> name
    dfyMapping = ApplyMapping.apply(
        frame = dyfCustomerDropFields,
        mappings = mapping,
        transformation_ctx = "applymapping1"
    )

    # Show the new dynamic frame with the name column
    dfyMapping.show(10)
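
Each mapping tuple reads as (source field, source type, target field, target type), and fields without a tuple are left out of the result. The sketch below imitates that behaviour on plain Python dicts so the effect of a mapping can be checked without a Glue session. `apply_mapping_sketch` is a hypothetical helper, not part of the Glue API, and it ignores type casting.

```python
# Hypothetical stand-in for ApplyMapping on plain dicts (no Glue required).
# Each tuple is (source_field, source_type, target_field, target_type).
mapping = [
    ("customerid", "long", "customerid", "long"),
    ("fullname", "string", "name", "string"),
]

def apply_mapping_sketch(records, mappings):
    """Rename mapped fields and drop unmapped ones; type casting is ignored."""
    return [
        {target: record[source] for source, _, target, _ in mappings}
        for record in records
    ]

# Two sample rows from the customers table
customers = [
    {"customerid": 293, "fullname": "Catherine Abel"},
    {"customerid": 295, "fullname": "Kim Abercrombie"},
]

renamed = apply_mapping_sketch(customers, mapping)
print(renamed)
```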
Filter data in a Dynamic Frame

    # Filter dynamicFrameCustomers for customers who have the last name Adams
    dyfFilter = Filter.apply(frame = dynamicFrameCustomers, f = lambda x: x["lastname"] == "Adams")

    # Show the top 10 customers from the filtered Dynamic Frame
    dyfFilter.show(10)
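
The function passed to `Filter.apply` is ordinary Python evaluated once per record, so a predicate can be tried on plain dicts first. This is a minimal sketch, assuming records behave like dicts for key lookup. The names `is_adams` and `is_substring_of_adams` and the rows themselves are illustrative only. It shows why an equality test is the safer way to match a single surname: `in` against a string is a substring test.

```python
# Illustrative records; "Adam" is a made-up row to show the difference
records = [
    {"customerid": 1, "lastname": "Adams"},
    {"customerid": 2, "lastname": "Adam"},
    {"customerid": 3, "lastname": "Abel"},
]

# Exact match on the surname
is_adams = lambda x: x["lastname"] == "Adams"

# Substring test: true for any lastname contained in the string "Adams"
is_substring_of_adams = lambda x: x["lastname"] in "Adams"

exact = [r["customerid"] for r in records if is_adams(r)]
substring = [r["customerid"] for r in records if is_substring_of_adams(r)]

print(exact)      # [1]
print(substring)  # [1, 2]
```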
Join Two Dynamic Frames on an Equality Join
- read the orders dynamic frame

      # Read from the orders table in the Glue Data Catalog using a dynamic frame
      dynamicFrameOrders = glueContext.create_dynamic_frame.from_catalog(
          database = "pyspark_tutorial_db",
          table_name = "orders"
      )

      # Show top 10 rows of the orders table
      dynamicFrameOrders.show(10)

- join the customers and orders dynamic frames

      # Join two dynamic frames on an equality join
      dfyjoin = dynamicFrameCustomers.join(["customerid"], ["customerid"], dynamicFrameOrders)

      # Show top 10 rows of the joined dynamic frame
      dfyjoin.show(10)
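
An equality join keeps only the pairs of rows whose key values match. The sketch below reproduces that on plain Python lists, using a few columns of the sample rows shown earlier. `equality_join` is a hypothetical helper for illustration; it says nothing about how Glue executes the join or how it names the resulting columns.

```python
def equality_join(left, right, left_key, right_key):
    """Return merged rows for every left/right pair with equal key values."""
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    return [
        {**l_row, **r_row}
        for l_row in left
        for r_row in index.get(l_row[left_key], [])
    ]

customers = [
    {"customerid": 293, "fullname": "Catherine Abel"},
    {"customerid": 295, "fullname": "Kim Abercrombie"},
    {"customerid": 297, "fullname": "Humberto Acevedo"},
]
# Only two of the three sample orders, so customer 297 has no match
orders = [
    {"salesorderid": 71782, "customerid": 293, "totaldue": 37497.4457},
    {"salesorderid": 44110, "customerid": 295, "totaldue": 18768.2086},
]

joined = equality_join(customers, orders, "customerid", "customerid")
for row in joined:
    print(row)
```

Customer 297 is absent from the result because no order row in this reduced sample carries that key.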
Write Down Data from a Dynamic Frame To S3
- Create a folder in the S3 bucket created by CloudFormation to use as the location to write the data to
- Write the data to S3 using the Dynamic Frame writer class for an S3 path.

      # Write the data in a Dynamic Frame to an S3 location
      glueContext.write_dynamic_frame.from_options(
          frame = dynamicFrameCustomers,
          connection_type = "s3",
          connection_options = {"path": "s3://<YOUR_S3_BUCKET_NAME>/write_down_dyf_to_s3"},
          format = "csv",
          format_options = {"separator": ","},
          transformation_ctx = "datasink2"
      )
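
The only value that has to change in the write call is the bucket placeholder in `path`. As a small convenience, the sketch below builds the `connection_options` dictionary from a bucket name and refuses to continue while the placeholder is still in place. `s3_connection_options` and the bucket name `my-example-bucket` are hypothetical.

```python
def s3_connection_options(bucket, prefix="write_down_dyf_to_s3"):
    """Build the connection_options dict for an S3 write (hypothetical helper)."""
    # Reject an empty name or an unreplaced <PLACEHOLDER>
    if not bucket or "<" in bucket or ">" in bucket:
        raise ValueError("Replace the bucket placeholder with a real bucket name")
    return {"path": f"s3://{bucket}/{prefix.strip('/')}"}

options = s3_connection_options("my-example-bucket")
print(options)
```

The returned dictionary can then be passed as `connection_options` to `write_dynamic_frame.from_options`.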
Write Down Data from a Dynamic Frame using Glue Data Catalog

    # Write data from dynamicFrameCustomers to the customers_write_dyf table
    # using the metadata stored in the Glue Data Catalog
    glueContext.write_dynamic_frame.from_catalog(
        frame = dynamicFrameCustomers,
        database = "pyspark_tutorial_db",
        table_name = "customers_write_dyf"
    )
Convert from Dynamic Frame To Spark DataFrame

    # Dynamic Frame to Spark DataFrame
    sparkDf = dynamicFrameCustomers.toDF()

    # Show the Spark DataFrame
    sparkDf.show()
Selecting Columns In a Spark DataFrame

    # Select columns from the Spark DataFrame
    dfSelect = sparkDf.select("customerid", "fullname")

    # Show the selected columns
    dfSelect.show()
Add Columns In A Spark Dataframe
- creating a new column with a literal string

      # Import lit from sql functions
      from pyspark.sql.functions import lit

      # Add a new column to the Spark DataFrame
      dfNewColumn = sparkDf.withColumn("date", lit("2022-07-24"))

      # Show the DataFrame with the new column
      dfNewColumn.show()

- Using concat to concatenate two columns together

      # Import concat and lit from sql functions
      from pyspark.sql.functions import concat, lit

      # Create another full name column
      dfNewFullName = sparkDf.withColumn("new_full_name", concat("firstname", concat(lit(' '), "lastname")))

      # Show the full name column
      dfNewFullName.show()
- Dropping Columns

      # Drop columns from the Spark DataFrame
      dfDropCol = sparkDf.drop("firstname", "lastname")

      # Show the DataFrame with the columns dropped
      dfDropCol.show()
- Renaming columns

      # Rename a column in the Spark DataFrame
      dfRenameCol = sparkDf.withColumnRenamed("fullname", "full_name")

      # Show the DataFrame with the renamed column
      dfRenameCol.show()
- GroupBy and Aggregate Operations

      # Group by lastname, then count the rows per lastname and show
      sparkDf.groupBy("lastname").count().show()
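
`groupBy("lastname").count()` returns one row per distinct last name together with the number of customers sharing it. The same idea on plain Python data, using `collections.Counter` as a stand-in for Spark; the second Abel is a made-up row so that one group has more than one member.

```python
from collections import Counter

# Last names from the sample customers, plus one invented duplicate
lastnames = ["Abel", "Abercrombie", "Acevedo", "Abel"]

# One entry per distinct last name, holding its row count
counts = Counter(lastnames)
for lastname, count in sorted(counts.items()):
    print(lastname, count)
```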
- Filtering Columns and Where clauses
- Filter the spark dataframe

      # Filter the Spark DataFrame for customers who have the last name Adams
      sparkDf.filter(sparkDf["lastname"] == "Adams").show()
- Where clause

      # Where clause on the Spark DataFrame for customers who have the last name Adams
      sparkDf.where("lastname =='Adams'").show()
- Joins
- read the orders dataset and convert it to a Spark DataFrame

      # Read from the orders table in the Glue Data Catalog using a dynamic frame
      # and convert it to a Spark DataFrame
      dfOrders = glueContext.create_dynamic_frame.from_catalog(
          database = "pyspark_tutorial_db",
          table_name = "orders"
      ).toDF()
- Inner join for Spark Dataframe All Data

      # Inner join the customers Spark DF to the orders Spark DF
      sparkDf.join(dfOrders, sparkDf.customerid == dfOrders.customerid, "inner").show(truncate=False)
- Inner Join Adams only

      # Get only the customers that have the surname Adams
      dfAdams = sparkDf.where("lastname =='Adams'")

      # Inner join on the Adams DF and orders
      dfAdams.join(dfOrders, dfAdams.customerid == dfOrders.customerid, "inner").show()
- Left Join

      # Left join on orders and the Adams DF
      dfOrders.join(dfAdams, dfAdams.customerid == dfOrders.customerid, "left").show(100)
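
A left join keeps every row of the left-hand frame and fills the right-hand columns with nulls where no match exists, whereas an inner join drops those rows. A minimal plain-Python sketch of that difference follows. `left_join` is a hypothetical helper, and because the sample rows contain no Adams, the filtered side uses Abel instead.

```python
def left_join(left, right, key, right_columns):
    """Keep every left row; unmatched rows get None for the right columns."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    result = []
    for l_row in left:
        matches = index.get(l_row[key])
        if matches:
            result.extend({**l_row, **m} for m in matches)
        else:
            result.append({**l_row, **{c: None for c in right_columns}})
    return result

orders = [
    {"salesorderid": 71782, "customerid": 293},
    {"salesorderid": 44110, "customerid": 295},
    {"salesorderid": 44131, "customerid": 297},
]
# Stand-in for the filtered customers frame (Abel only)
abel = [{"customerid": 293, "fullname": "Catherine Abel"}]

joined = left_join(orders, abel, "customerid", ["fullname"])
for row in joined:
    print(row)
```

All three orders survive the join; only the order for customer 293 carries a `fullname`, and the other two hold `None` in that column.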
- Writing data down using the Glue Data Catalog
- delete data from S3 in
- convert from spark Dataframe to Glue Dynamic DataFrame

      # Import the DynamicFrame class
      from awsglue.dynamicframe import DynamicFrame

      # Convert from a Spark DataFrame to a Glue Dynamic Frame
      dyfCustomersConvert = DynamicFrame.fromDF(sparkDf, glueContext, "convert")

      # Show the converted Glue Dynamic Frame
      dyfCustomersConvert.show()
- Write the Dynamic Frame down to an S3 location

      # Write the data in the converted Dynamic Frame to an S3 location
      glueContext.write_dynamic_frame.from_options(
          frame = dyfCustomersConvert,
          connection_type = "s3",
          connection_options = {"path": "s3://<YOUR_S3_BUCKET_NAME>/write_down_dyf_to_s3"},
          format = "csv",
          format_options = {"separator": ","},
          transformation_ctx = "datasink2"
      )
- Write the Dynamic Frame using the Glue Data Catalog

      # Write data from the converted Dynamic Frame to the customers_write_dyf table
      # using the metadata stored in the Glue Data Catalog
      glueContext.write_dynamic_frame.from_catalog(
          frame = dyfCustomersConvert,
          database = "pyspark_tutorial_db",
          table_name = "customers_write_dyf"
      )
