Commit da60684: Import title counts

thcrock committed Feb 3, 2017
1 parent 58773bb commit da60684
Showing 7 changed files with 334 additions and 2 deletions.
1 change: 1 addition & 0 deletions api/v1/__init__.py
@@ -41,3 +41,4 @@
api.add_resource(AssociatedJobsForSkillEndpoint, '/skills/<string:id>/related_jobs')
api.add_resource(AssociatedJobsForJobEndpoint, '/jobs/<string:id>/related_jobs')
api.add_resource(AssociatedSkillForSkillEndpoint, '/skills/<string:id>/related_skills')
api.add_resource(TitleCountsEndpoint, '/title_counts')
131 changes: 131 additions & 0 deletions api/v1/endpoints.py
@@ -27,6 +27,8 @@
from . models.skills_importance import SkillImportance
from . models.geographies import Geography
from . models.jobs_importance import JobImportance
from . models.geo_title_count import GeoTitleCount
from . models.title_count import TitleCount
from collections import OrderedDict

# Pagination Control Parameters
@@ -826,3 +828,132 @@ def get(self):
                return create_response(all_jobs, 200)
            else:
                return create_error('No jobs were found', 404)


class TitleCountsEndpoint(Resource):
    """Title Counts Endpoint Class"""

    def get(self):
        """GET operation for the endpoint class.
        Returns:
            A collection of job titles and their average counts.
        Notes:
            The endpoint supports pagination.
        """

        args = request.args
        limit, offset = get_limit_and_offset(args)

        all_jobs = []
        links = OrderedDict()
        links['links'] = []

        if args is not None:
            geography = None
            if 'fips' in args.keys():
                fips = args['fips']
                geography = Geography.query.filter_by(
                    geography_type='CBSA',
                    geography_name=fips
                ).first()
                if geography is None:
                    return create_error('Core-Based Statistical Area FIPS code not found', 404)
                base_stmt = '''select
                    tc.job_title,
                    tc.job_uuid,
                    round(avg(gtc.count), 2)
                from
                    title_counts tc
                    join geo_title_counts gtc using (job_uuid, quarter_id)
                where geography_id = %(geography_id)s
                group by 1, 2 order by 3 desc
                '''
                job_results = db.engine.execute(
                    '''{}
                    limit %(limit)s
                    offset %(offset)s
                    '''.format(base_stmt),
                    geography_id=geography.geography_id,
                    limit=limit,
                    offset=offset
                )
                rows = [row[0] for row in db.engine.execute(
                    'select count(*) from ({}) q'.format(base_stmt),
                    geography_id=geography.geography_id,
                )][0]
            else:
                base_stmt = '''select
                    tc.job_title,
                    tc.job_uuid,
                    round(avg(tc.count), 2)
                from title_counts tc
                group by 1, 2 order by 3 desc
                '''
                job_results = db.engine.execute(
                    '''{}
                    limit %(limit)s
                    offset %(offset)s
                    '''.format(base_stmt),
                    limit=limit,
                    offset=offset
                )
                rows = [row[0] for row in db.engine.execute(
                    'select count(*) from ({}) q'.format(base_stmt)
                )][0]

            # compute pages
            url_link = '/title_counts?offset={}&limit={}'
            custom_headers = []
            custom_headers.append('X-Total-Count = ' + str(rows))

            # float division: with Python 2 integer division, ceil would be a no-op
            total_pages = int(math.ceil(float(rows) / limit))
            current_page = compute_page(offset, limit)
            first = OrderedDict()
            prev = OrderedDict()
            next = OrderedDict()
            last = OrderedDict()
            current = OrderedDict()

            current['rel'] = 'self'
            current['href'] = url_link.format(str(offset), str(limit))
            links['links'].append(current)

            first['rel'] = 'first'
            first['href'] = url_link.format(str(compute_offset(1, limit)), str(limit))
            links['links'].append(first)

            if current_page > 1:
                prev['rel'] = 'prev'
                prev['href'] = url_link.format(str(compute_offset(current_page - 1, limit)), str(limit))
                links['links'].append(prev)

            if current_page < total_pages:
                next['rel'] = 'next'
                next['href'] = url_link.format(str(compute_offset(current_page + 1, limit)), str(limit))
                links['links'].append(next)

            last['rel'] = 'last'
            last['href'] = url_link.format(str(compute_offset(total_pages, limit)), str(limit))
            links['links'].append(last)

            if job_results is not None:
                for job in job_results:
                    title, job_uuid, count = job
                    job_response = OrderedDict()
                    job_response['uuid'] = job_uuid
                    job_response['title'] = title
                    job_response['count'] = float(count)
                    job_response['normalized_job_title'] = None
                    job_response['parent_uuid'] = None
                    all_jobs.append(job_response)

                all_jobs.append(links)

                return create_response(all_jobs, 200, custom_headers)
            else:
                return create_error('No jobs were found', 404)
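
For reference, a minimal client-side sketch of paging through the new endpoint. The host and port are assumptions for a local dev server, and it assumes create_response exposes the 'X-Total-Count' entry in custom_headers as a response header; the response body, per the code above, is a JSON array of title records followed by a trailing links object.

import requests  # any HTTP client would do

BASE_URL = 'http://localhost:5000'  # assumed local host/port

resp = requests.get(BASE_URL + '/title_counts', params={'limit': 3, 'offset': 0})
print resp.headers.get('X-Total-Count')  # total row count, if exposed as assumed

for item in resp.json():
    if 'links' in item:  # the trailing pagination-links object
        for link in item['links']:
            print link['rel'], link['href']
    else:
        print item['uuid'], item['title'], item['count']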
18 changes: 18 additions & 0 deletions api/v1/models/geo_title_count.py
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-

from app.app import db


class GeoTitleCount(db.Model):
    __tablename__ = 'geo_title_counts'

    quarter_id = db.Column(db.SmallInteger, db.ForeignKey('quarters.quarter_id'), primary_key=True)
    geography_id = db.Column(db.SmallInteger, db.ForeignKey('geographies.geography_id'), primary_key=True)
    job_uuid = db.Column(db.String, primary_key=True)
    job_title = db.Column(db.String)
    count = db.Column(db.Integer)

    def __repr__(self):
        return '<GeoTitleCount {}/{}/{}: {}>'.format(
            self.geography_id, self.quarter_id, self.job_uuid, self.count
        )
17 changes: 17 additions & 0 deletions api/v1/models/title_count.py
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-

from app.app import db


class TitleCount(db.Model):
    __tablename__ = 'title_counts'

    job_uuid = db.Column(db.String, primary_key=True)
    quarter_id = db.Column(db.SmallInteger, db.ForeignKey('quarters.quarter_id'), primary_key=True)
    job_title = db.Column(db.String)
    count = db.Column(db.Integer)

    def __repr__(self):
        return '<TitleCount {}/{}: {}>'.format(
            self.quarter_id, self.job_uuid, self.count
        )
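
The new models can be sanity-checked from an interactive session. A sketch, assuming the Flask app object lives in app.app as the rest of the codebase suggests; the quarter_id value is purely illustrative:

from app.app import app
from api.v1.models.title_count import TitleCount

with app.app_context():
    # five most common titles for an (illustrative) quarter_id of 1
    top = (TitleCount.query
           .filter_by(quarter_id=1)
           .order_by(TitleCount.count.desc())
           .limit(5)
           .all())
    for tc in top:
        print tc  # rendered via the __repr__ defined above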
124 changes: 122 additions & 2 deletions etl.py
@@ -8,10 +8,10 @@
"""

import csv
import re
import os
import sys
import hashlib
import uuid
import time

from api.v1.models.skills_master import SkillMaster
from api.v1.models.jobs_master import JobMaster
@@ -22,6 +22,8 @@
from api.v1.models.quarters import Quarter
from api.v1.models.geographies import Geography
from api.v1.models.jobs_importance import JobImportance
from api.v1.models.geo_title_count import GeoTitleCount
from api.v1.models.title_count import TitleCount


from app.app import app, db
Expand All @@ -33,6 +35,7 @@
# Add the SQLAlchemy migration utility
manager.add_command('db', MigrateCommand)


@manager.command
def load_skills_master():
""" Loads the skills_master table """
@@ -332,6 +335,121 @@ def load_jobs_importances():
    print 'Load complete'


def load_geo_quarter_title_counts(filename, year, quarter):
    """ Loads the geo_title_counts table """

    before_time = time.time()
    # Find or create the quarter record for this year/quarter
    quarter_record = Quarter.query.filter_by(year=year, quarter=quarter).first()
    if not quarter_record:
        quarter_record = Quarter(year=year, quarter=quarter)
        db.session.add(quarter_record)
        db.session.commit()

    quarter_id = quarter_record.quarter_id

    with open(os.path.join('etl/stage_1', filename), 'r') as f:
        reader = csv.reader(f)
        records = []
        for row in reader:
            if len(row) < 3:
                print 'Skipping', row, 'with not enough data'
                continue
            else:
                cbsa, title, count = row

            if cbsa == '' or title == '':
                print 'Skipping', row, 'with invalid data'
                continue

            kwargs = {
                'geography_name': cbsa,
                'geography_type': 'CBSA'
            }
            geography = Geography.query.filter_by(**kwargs).first()
            if not geography:
                geography = Geography(**kwargs)
                db.session.add(geography)
                db.session.commit()

            job_title_uuid = str(hashlib.md5(title).hexdigest())
            records.append(GeoTitleCount(
                job_uuid=job_title_uuid,
                job_title=title,
                quarter_id=quarter_id,
                geography_id=geography.geography_id,
                count=count
            ))
        print 'Saving objects'
        db.session.bulk_save_objects(records)
        db.session.commit()
    after_time = time.time()
    print 'Load complete', str(after_time - before_time), 'seconds'
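
The loader expects three-column rows of (CBSA code, job title, count). As a rough sketch, a tiny hand-made fixture could be loaded like this from the repository root; it assumes etl/stage_1 exists and the Flask app context is active (as it is when run through the manager), and every value is made up:

import csv
from etl import load_geo_quarter_title_counts  # the loader defined above

# write an illustrative two-row fixture: (CBSA code, job title, count)
with open('etl/stage_1/output_geo_title_count_2014Q1.csv', 'w') as f:
    writer = csv.writer(f)
    writer.writerow(['16980', 'registered nurse', '42'])
    writer.writerow(['16980', 'truck driver', '17'])

load_geo_quarter_title_counts('output_geo_title_count_2014Q1.csv', '2014', '1')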


def load_quarter_title_counts(filename, year, quarter):
    """ Loads the title_counts table """

    # Find or create the quarter record for this year/quarter
    quarter_record = Quarter.query.filter_by(year=year, quarter=quarter).first()
    if not quarter_record:
        quarter_record = Quarter(year=year, quarter=quarter)
        db.session.add(quarter_record)
        db.session.commit()

    quarter_id = quarter_record.quarter_id

    with open(os.path.join('etl/stage_1', filename), 'r') as f:
        reader = csv.reader(f)
        records = []
        for row in reader:
            if len(row) < 2:
                print 'Skipping', row, 'with not enough data'
                continue
            else:
                title, count = row

            if title == '':
                print 'Skipping', row, 'with invalid data'
                continue

            job_title_uuid = str(hashlib.md5(title).hexdigest())
            records.append(TitleCount(
                job_uuid=job_title_uuid,
                job_title=title,
                quarter_id=quarter_id,
                count=count
            ))
        print 'Saving objects'
        db.session.bulk_save_objects(records)
        db.session.commit()
    print 'Load complete'
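
Note that job_uuid is just the md5 hex digest of the raw title string, so identical titles collapse to the same uuid across quarters and across both count tables. A one-line illustration (the title is made up):

import hashlib

title = 'registered nurse'  # illustrative; any str works in this Python 2 codebase
print hashlib.md5(title).hexdigest()  # deterministic, so repeat loads agree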


@manager.command
def load_all_geo_title_counts():
    for filename in os.listdir('etl/stage_1/'):
        print 'checking', filename
        match = re.match(r"output_geo_title_count_(?P<year>\d{4})Q(?P<quarter>\d).csv", filename)
        if match:
            print 'loading', filename
            year = match.group('year')
            quarter = match.group('quarter')
            load_geo_quarter_title_counts(filename, year, quarter)


@manager.command
def load_all_title_counts():
    for filename in os.listdir('etl/stage_1/'):
        print 'checking', filename
        match = re.match(r"output_title_count_(?P<year>\d{4})Q(?P<quarter>\d).csv", filename)
        if match:
            print 'loading', filename
            year = match.group('year')
            quarter = match.group('quarter')
            load_quarter_title_counts(filename, year, quarter)
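
Both discovery commands above match files purely by name, so the pattern is easy to check in isolation; the filenames below are hypothetical:

import re

pattern = r"output_title_count_(?P<year>\d{4})Q(?P<quarter>\d).csv"
for name in ['output_title_count_2014Q1.csv',  # matches: four-digit year
             'output_title_count_14Q3.csv']:   # skipped: two-digit year
    match = re.match(pattern, name)
    print name, '->', match.groupdict() if match else 'no match'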


@manager.command
def load_all_tables():
""" Load all tables in sequence """
@@ -342,6 +460,8 @@ def load_all_tables():
    #load_skills_related()
    load_jobs_skills()
    load_jobs_importances()
    load_all_geo_title_counts()
    load_all_title_counts()

if __name__ == '__main__':
    manager.run()
3 changes: 3 additions & 0 deletions etl/etl.sh
@@ -33,6 +33,9 @@ for path in ${S3_PATHS[@]}; do
echo ""
aws s3 cp s3://$path $STAGE_1/.
done
aws s3 cp s3://skills-public/pipeline/tables/ $STAGE_1/ --recursive --exclude "*" --include "output_geo_title_count_*"
aws s3 cp s3://skills-public/pipeline/tables/ $STAGE_1/ --recursive --exclude "*" --include "output_title_count_*"

echo "Done"


42 changes: 42 additions & 0 deletions migrations/versions/89028ebc40d1_.py
@@ -0,0 +1,42 @@
"""empty message
Revision ID: 89028ebc40d1
Revises: 465785295fcb
Create Date: 2017-01-31 16:01:04.425086
"""

# revision identifiers, used by Alembic.
revision = '89028ebc40d1'
down_revision = '465785295fcb'

from alembic import op
import sqlalchemy as sa


def upgrade():
    op.create_table(
        'title_counts',
        sa.Column('quarter_id', sa.SmallInteger(), nullable=False),
        sa.Column('job_uuid', sa.String(), nullable=False),
        sa.Column('job_title', sa.String(), nullable=True),
        sa.Column('count', sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(['quarter_id'], ['quarters.quarter_id'], ),
        sa.PrimaryKeyConstraint('quarter_id', 'job_uuid')
    )
    op.create_table(
        'geo_title_counts',
        sa.Column('quarter_id', sa.SmallInteger(), nullable=False),
        sa.Column('geography_id', sa.SmallInteger(), nullable=False),
        sa.Column('job_uuid', sa.String(), nullable=False),
        sa.Column('job_title', sa.String(), nullable=True),
        sa.Column('count', sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(['geography_id'], ['geographies.geography_id'], ),
        sa.ForeignKeyConstraint(['quarter_id'], ['quarters.quarter_id'], ),
        sa.PrimaryKeyConstraint('quarter_id', 'geography_id', 'job_uuid')
    )


def downgrade():
    op.drop_table('geo_title_counts')
    op.drop_table('title_counts')
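
Since etl.py registers Flask-Migrate's MigrateCommand under the name 'db', this revision can presumably be applied through that same entry point (an assumption; any alembic-compatible runner would also work):

    python etl.py db upgrade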
