Skip to content

Latest commit

 

History

History
114 lines (74 loc) · 8.16 KB

README.md

File metadata and controls

114 lines (74 loc) · 8.16 KB

CO2 emissions Data Pipeline

This project builds a modern three-layer lakehouse and provides a apache superset deployment to visualize its data. Through the execution of each notebook you will analyze external data provided by the World Bank and monitor the change in CO2 emissions from cars sold within the EU. This project utilizes two separate datasets listed below.

The lakehouse will consist of 3 different layers-- raw, curated, and serve. Details of each layer and their tables and description are listed below.

The tools utilized are:

  • Apache Spark: To read the input data in CSV and JSON formats, and various most used functions to explore the data.
  • Apache Iceberg: File format to support the lakehouse architecture
  • StarRocks: As our sql engine to query the tables for visualization
  • Apache Superset: To visualize the data

Datasets

Data Layers

Raw: csv and json files loaded as iceberg format. One minor transformation on the csv's where we remove a trailing comma from the end of each line which was causing a new column to be generated by spark in the dataframe)

  • Tables:

    • raw.world_development_indicators.WDIData
    • raw.world_development_indicators.WDICountry-Series
    • raw.world_development_indicators.WDICountry
    • raw.world_development_indicators.WDIFootNote
    • raw.world_development_indicators.WDISeries
    • raw.co2_passenger_cars_emissions.co2_emissions_passenger_cars_2017
    • raw.co2_passenger_cars_emissions.co2_emissions_passenger_cars_2018
    • raw.co2_passenger_cars_emissions.co2_emissions_passenger_cars_2019

Curated: Apply different filters and data quality checks on the data

  • Quality Checks

    • Replace spaces in column names with underscores (“_”) for all DataFrames
    • Drop records that only consist of null values (records with null values on all columns)
    • Drop duplicate records
    • For the WDICountry.csv and WDIData.csv files, drop all records that have a country code (column: Country_Code) with a size other than three
    • For WDISeries.csv, drop all records that contain a space character (" ") in the Series_Code column
    • remove parentheses from column names
    • Drop all records that have a member state code size other than two (column: MS) and that contain any character other than uppercase letters in this column
  • Tables:

    • curated.world_development_indicators.country
    • curated.world_development_indicators.data
    • curated.world_development_indicators.series
    • curated.co2_passenger_cars_emissions (partitioned by year: year=2017, year=2018, year=2019)

Serve

  • wdi_serving.wdi_data_unpivoted derived from curated.world_development_indicators.data

    • Table that changed how the indicators’ values are represented by unpivoting the data. Prior to this tranformation, each year’s data is present in its own column (for example, the column named 1960 contains the data for that year). This is useful for some use cases, but for this first request, users want to have a couple of columns named "indicator_value" (with each row containing the value of the indicator for a specific year) and "year" (containing the year, which is currently stored in column names). Users can then leverage these two columns to build charts tracking specific indicators.
  • wdi_serving.partitioned_average_indicators derived from wdi_serving.wdi_data_unpivoted created above

    • Table that generates the average value for every indicator per country. This means that instead of having a separate value for each year, we just want the average value. This is done by aggregating the unpivoted DataFrame previously created according to this need and add a new column named “Indicator_Average_Value” that contains the average value for each indicator and country combination.
  • eea_serving.highest_emissions derived from curated.co2_passenger_cars_emissions

    • For the co2 emissions dataset, we take the curated co2 emissions data and retrieve the 100 vehicles with the highest CO2 emissions per member state per year. For the emissions measurement, we’ll rely on "New European Driving Cycle" (NEDC) data, so the column that interests us is Enedc_g/km
  • wdi_serving.countries_data drived from curated.world_development_indicators.data && curated.world_development_indicators.data && curated.world_development_indicators.data

    • The different datasets include some indicators that reference country groups or regions instead of countries. We will take those 3 datasets and generate a dataset containing the indicators that reference countries (and not country groups) while also adding some of the columns present in the countries dataset
  • eea_serving.emissions_diff_yoy derived from curated.co2_passenger_cars_emissions.year=2017&year=2018&year=2019

    • Table that contains the difference in emissions for a given year when compared to the previous year (based on the emissions value stored in Enedc_g/km) for every member state

wdi_serving.decade_level_datas derived from wdi_serving.wdi_data_unpivoted

  • Table with an added decade column to the unpivoted dataset, allowing end users to easily generate decade-level insights from the data. The values of the decade column would be structured as follows: the first year of the decade + the letter “s”. So, for the data corresponding to the years 1960 to 1969, the value of the decade column would be "1960s"

Setting this up locally

This project utilizes docker compose. Follow this link to install docker. Clone down the repository and run the following commands. You will first have to unzip the emissions_data.zip && world_bank_data.zip files within the /data directory.

To set up your pipeline environment. From the root of the directory run

docker compose up
  • This will set up the iceberg-spark project which includes a jupyter notebook running in localhost:8888. To view your spark UI navigate to localhost:8080 for the main executor dashboard. Or when you have a spark sessions started navigate to localhost:4040|4041|4042|4043 to view the specific session UI. The output of the port will be in the first cell of each notebook.
  • To view your minio dashboard navigate to localhost:9091 the default username and password is username: admin || password: password

The pipelines are separated by each layer generation: notebooks/raw, notebooks/curated, notebooks/serve run the notebooks in the jupyter localhost dashboard or through vscode with the kernel pointing to the running server at localhost:8888. At each file run run all.

  1. notebooks/raw/data-ingestion.ipynb
  2. notebooks/curated/data-curation.ipynb
  3. notebooks/curated/eea_curation.ipynb
  4. notebooks/serve/wdi-data-serve.ipynb
  5. notebooks/serve/wdi-denormalized.ipynb
  6. notebooks/serve/wdi-data-level-serve.ipynb

Once these are ran you should be able to view the iceberg files and their metadata within the warehouse and their cooresponding layers. Please note the catalog these are created in is iceberg.

To set up the Apache superset dashboard run the following.

From the /superset directory run:
docker compose -f docker-compose-non-dev.yml up

This will spin up the dashboard in localhost:8088. To log in the default credentials are username: admin | password: admin

You will need to connect the starrocks db to superset. In order to do this navigate to settings -> database connections -> + Database -> select starrocsk under "supported databases" -> enter this URI: starrocks://root:@host.docker.internal:9030/iceberg.wdi_serving

To add other databases follow the {catalog}.{database} naming convention after the trailing /. For certain dbs you will have to enter the full db path like iceberg.curated.world_development_indicators to gain access to the WDI curated layer.