
ksree/world-air-quality


License Java CI with Maven

world-air-quality

A project to read and aggregate world air quality data.

Build and package code

cd $HOME/world-air-quality/
mvn package -DskipTests

Create a BigQuery dataset and a temporary GCS bucket for intermediate storage

export PROJECT_ID="kapilsreed12-1dataflow"
export GCS_TEMPORARY_BUCKET="${PROJECT_ID}-openairaq-temp-bucket"
export STORAGE_CLASS=standard
export GCP_REGION=us-east1

gsutil mb -c $STORAGE_CLASS -l $GCP_REGION gs://$GCS_TEMPORARY_BUCKET

bq --location=$GCP_REGION mk \
--dataset \
--description 'Open Air Quality' \
 "${PROJECT_ID}:OpenAirAQ"

application.conf

AWS_ACCESS_KEY="Add your AWS access key here"
AWS_SECRET_KEY="Add your AWS secret key here"
AWS_BUCKET_NAME="openaq-fetches"
AWS_BUCKET_PREFIX="realtime-gzipped"
GCS_TEMPORARY_BUCKET="your-gcs-temp-bucket"
BIGQUERY_TABLE_NAME="yourprojectname:OpenAirAQ.pm25_global"
startDate="2019-01-01"    # Start date of the OpenAQ data to load
endDate="2019-12-31"      # End date of the OpenAQ data to load
applyAggregations="true"  # true = apply the PM2.5 aggregation in Spark; false = load the OpenAQ data as-is into BigQuery
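The startDate/endDate pair selects which days of the OpenAQ archive are read. As a hedged illustration only, assuming (not confirmed by this README) that objects under AWS_BUCKET_PREFIX are stored in one folder per day named `YYYY-MM-DD/`, and that both dates are inclusive, the range could be expanded into S3 key prefixes as below. `DailyPrefixes` and `dailyPrefixes` are hypothetical names, not part of this project.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of this project. Assumes the bucket stores
// one folder per day, named <AWS_BUCKET_PREFIX>/YYYY-MM-DD/.
class DailyPrefixes {

    // Expands an inclusive [startDate, endDate] range into one S3 key prefix per day.
    static List<String> dailyPrefixes(String bucketPrefix, String startDate, String endDate) {
        LocalDate start = LocalDate.parse(startDate); // expects ISO-8601, e.g. "2019-01-01"
        LocalDate end = LocalDate.parse(endDate);
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("endDate must not be before startDate");
        }
        List<String> prefixes = new ArrayList<>();
        for (LocalDate day = start; !day.isAfter(end); day = day.plusDays(1)) {
            // LocalDate.toString() prints the date as YYYY-MM-DD.
            prefixes.add(bucketPrefix + "/" + day + "/");
        }
        return prefixes;
    }

    public static void main(String[] args) {
        List<String> prefixes = dailyPrefixes("realtime-gzipped", "2019-01-01", "2019-12-31");
        System.out.println(prefixes.size());                   // 365
        System.out.println(prefixes.get(0));                   // realtime-gzipped/2019-01-01/
        System.out.println(prefixes.get(prefixes.size() - 1)); // realtime-gzipped/2019-12-31/
    }
}
```

Walking the range with `LocalDate` rather than string arithmetic keeps month ends and leap days correct without special cases.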

Execute Dataproc job

export GCP_REGION=us-east1

gcloud dataproc jobs submit spark \
--cluster=cluster-3e8d  \
--region=$GCP_REGION \
--class=com.ksr.air.Run \
--files=/home/kapilsreed12/application.conf \
--jars=/home/kapilsreed12/world-air-quality/target/world-air-quality-1.0-SNAPSHOT.jar,gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
-- application.conf

To perform the aggregations in BigQuery rather than in Spark, set applyAggregations="false" in application.conf.

SQL to perform the monthly and yearly aggregations in BigQuery:


 -- Calculate Monthly Average, with pivoting
 WITH
  MonthlyAVG AS(
SELECT  city, 
  year,
  MAX(IF(month = 1, monthly_avg, NULL)) AS `JAN`,
  MAX(IF(month = 2, monthly_avg, NULL)) AS `FEB`,
  MAX(IF(month = 3, monthly_avg, NULL)) AS `MARCH`,
  MAX(IF(month = 4, monthly_avg, NULL)) AS `APRIL`,
  MAX(IF(month = 5, monthly_avg, NULL)) AS `MAY`, 
  MAX(IF(month = 6, monthly_avg, NULL)) AS `JUNE`,
  MAX(IF(month = 7, monthly_avg, NULL)) AS `JULY`,
  MAX(IF(month = 8, monthly_avg, NULL)) AS `AUGUST`,
  MAX(IF(month = 9, monthly_avg, NULL)) AS `SEPT`,
  MAX(IF(month = 10, monthly_avg, NULL)) AS `OCT`,
  MAX(IF(month = 11, monthly_avg, NULL)) AS `NOV`,
  MAX(IF(month = 12, monthly_avg, NULL)) AS `DEC`
  FROM (
    SELECT  city, month, year,
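    -- Evaluated after GROUP BY city, month, year, so this counts the months with readings per city and year
    COUNT(*) OVER(PARTITION BY city, year) as no_readings_per_yr,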
    TRUNC(AVG(value),2)  as monthly_avg  FROM `kapilsreed12-1dataflow.OpenAirAQ.pm25_global_og`
    GROUP BY city, month, year
    ORDER BY city, month, year
  ) WHERE no_readings_per_yr > 4
  GROUP BY city, year
  )

  --Calculate yearly avg
  SELECT
    YearlyAVG.city,
    YearlyAVG.parameter,	
    YearlyAVG.latitude,	
    YearlyAVG.longitude,	
    YearlyAVG.country,	
    YearlyAVG.unit,	
    YearlyAVG.year,	
    YearlyAVG.yearly_avg,
    MonthlyAVG.JAN,
    MonthlyAVG.FEB,
    MonthlyAVG.MARCH,
    MonthlyAVG.APRIL,
    MonthlyAVG.MAY,
    MonthlyAVG.JUNE,
    MonthlyAVG.JULY,
    MonthlyAVG.AUGUST,
    MonthlyAVG.SEPT,
    MonthlyAVG.OCT,
    MonthlyAVG.NOV,
    MonthlyAVG.DEC
  FROM ( SELECT * FROM (
    SELECT
      city,
      parameter,
      coordinates.latitude AS latitude,
      coordinates.longitude AS longitude,
      country,
      sourceName,
      sourceType,
      unit,
      month,
      year,
      TRUNC(AVG(value) OVER(PARTITION BY year, city) , 2) AS yearly_avg,
      ROW_NUMBER() OVER(PARTITION BY year, city) AS row_no
    FROM
      `kapilsreed12-1dataflow.OpenAirAQ.pm25_global_og` ) WHERE row_no = 1) AS YearlyAVG
  RIGHT JOIN MonthlyAVG 
  ON YearlyAVG.city = MonthlyAVG.city 
  AND YearlyAVG.year = MonthlyAVG.year
  ORDER BY YearlyAVG.year, YearlyAVG.yearly_avg DESC;
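To make the query's logic easier to check, here is a small in-memory Java sketch of the same steps. It is not the project's Spark implementation, and `AqAggregation`, `Reading`, and `Row` are hypothetical names. It averages readings per city, year, and month, truncates to two decimals as `TRUNC(..., 2)` does, keeps only city-years with readings in more than 4 distinct months, pivots the months into 12 slots (JAN to DEC), and computes the yearly average over all raw readings. The coordinates, parameter, country, and unit columns are left out for brevity.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of the BigQuery query's logic; not the project's Spark code.
class AqAggregation {

    // One PM2.5 reading; month is assumed to be in 1..12.
    static final class Reading {
        final String city; final int year; final int month; final double value;
        Reading(String city, int year, int month, double value) {
            this.city = city; this.year = year; this.month = month; this.value = value;
        }
    }

    // One output row: monthly[0] is JAN ... monthly[11] is DEC; null means no readings.
    static final class Row {
        final String city; final int year; final double yearlyAvg; final Double[] monthly;
        Row(String city, int year, double yearlyAvg, Double[] monthly) {
            this.city = city; this.year = year; this.yearlyAvg = yearlyAvg; this.monthly = monthly;
        }
    }

    // Mimics BigQuery TRUNC(x, 2): keeps two decimals, discarding the rest toward zero.
    static double trunc2(double x) {
        return BigDecimal.valueOf(x).setScale(2, RoundingMode.DOWN).doubleValue();
    }

    static List<Row> aggregate(List<Reading> readings) {
        // city -> year -> per-month {sum, count}; index 0 is unused so months map 1:1.
        Map<String, Map<Integer, double[][]>> acc = new TreeMap<>();
        for (Reading r : readings) {
            double[][] months = acc.computeIfAbsent(r.city, c -> new TreeMap<>())
                    .computeIfAbsent(r.year, y -> new double[13][2]);
            months[r.month][0] += r.value;
            months[r.month][1] += 1;
        }

        List<Row> rows = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, double[][]>> byCity : acc.entrySet()) {
            for (Map.Entry<Integer, double[][]> byYear : byCity.getValue().entrySet()) {
                double[][] months = byYear.getValue();
                Double[] monthly = new Double[12];
                int monthsWithData = 0;
                double sum = 0;
                double count = 0;
                for (int m = 1; m <= 12; m++) {
                    if (months[m][1] > 0) {
                        monthsWithData++;
                        monthly[m - 1] = trunc2(months[m][0] / months[m][1]);
                        sum += months[m][0];
                        count += months[m][1];
                    }
                }
                // Equivalent of WHERE no_readings_per_yr > 4.
                if (monthsWithData > 4) {
                    // Yearly average over all raw readings, like AVG(value) OVER(PARTITION BY year, city).
                    rows.add(new Row(byCity.getKey(), byYear.getKey(), trunc2(sum / count), monthly));
                }
            }
        }
        // ORDER BY year, yearly_avg DESC
        rows.sort(Comparator.comparingInt((Row r) -> r.year)
                .thenComparing(Comparator.comparingDouble((Row r) -> r.yearlyAvg).reversed()));
        return rows;
    }

    public static void main(String[] args) {
        List<Reading> readings = new ArrayList<>();
        for (int m = 1; m <= 5; m++) {
            readings.add(new Reading("CityA", 2019, m, 100.0 + m));
        }
        for (Row row : aggregate(readings)) {
            System.out.println(row.city + " " + row.year + " " + row.yearlyAvg); // CityA 2019 103.0
        }
    }
}
```

Because the yearly average is taken over raw readings rather than over the twelve monthly averages, months with more readings carry more weight, which matches the `AVG(value)` window in the query.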
