|
| 1 | +from pyspark.sql import SparkSession |
| 2 | +from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType |
| 3 | + |
| 4 | +from lib.logger import Log4j |
| 5 | + |
if __name__ == "__main__":
    # Demo: read the same flight dataset from CSV, JSON, and Parquet,
    # showing three ways to supply a schema (StructType, DDL string, embedded).
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    # Programmatic schema: explicit StructType for the CSV reader.
    flightSchemaStruct = StructType([
        StructField("FL_DATE", DateType()),
        StructField("OP_CARRIER", StringType()),
        StructField("OP_CARRIER_FL_NUM", IntegerType()),
        StructField("ORIGIN", StringType()),
        StructField("ORIGIN_CITY_NAME", StringType()),
        StructField("DEST", StringType()),
        StructField("DEST_CITY_NAME", StringType()),
        StructField("CRS_DEP_TIME", IntegerType()),
        StructField("DEP_TIME", IntegerType()),
        StructField("WHEELS_ON", IntegerType()),
        StructField("TAXI_IN", IntegerType()),
        StructField("CRS_ARR_TIME", IntegerType()),
        StructField("ARR_TIME", IntegerType()),
        StructField("CANCELLED", IntegerType()),
        StructField("DISTANCE", IntegerType())
    ])

    # Equivalent schema expressed as a SQL DDL string, used for the JSON reader.
    flightSchemaDDL = """FL_DATE DATE, OP_CARRIER STRING, OP_CARRIER_FL_NUM INT, ORIGIN STRING,
          ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING, CRS_DEP_TIME INT, DEP_TIME INT,
          WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED INT, DISTANCE INT"""

    # CSV: explicit StructType schema; FAILFAST aborts on any malformed record,
    # and dateFormat tells the reader how to parse FL_DATE into a DATE.
    flightTimeCsvDF = spark.read \
        .format("csv") \
        .option("header", "true") \
        .schema(flightSchemaStruct) \
        .option("mode", "FAILFAST") \
        .option("dateFormat", "M/d/y") \
        .load("data/flight*.csv")

    flightTimeCsvDF.show(5)
    logger.info("CSV Schema:" + flightTimeCsvDF.schema.simpleString())

    # JSON: same schema supplied as a DDL string instead of a StructType.
    flightTimeJsonDF = spark.read \
        .format("json") \
        .schema(flightSchemaDDL) \
        .option("dateFormat", "M/d/y") \
        .load("data/flight*.json")

    flightTimeJsonDF.show(5)
    logger.info("JSON Schema:" + flightTimeJsonDF.schema.simpleString())

    # Parquet: self-describing format — the schema is embedded in the files,
    # so no explicit schema is needed.
    flightTimeParquetDF = spark.read \
        .format("parquet") \
        .load("data/flight*.parquet")

    flightTimeParquetDF.show(5)
    logger.info("Parquet Schema:" + flightTimeParquetDF.schema.simpleString())

    # Release the session's resources (original script never stopped it).
    spark.stop()
0 commit comments