
Commit 9fc431a
Merge pull request #46 from bendominguez0111/thread-requests
Added thread_requests parameter to import_pbp_data and import_weekly …
cooperdff authored Sep 2, 2023
2 parents 1524703 + e73fa45 commit 9fc431a
Showing 2 changed files with 113 additions and 55 deletions.
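
The change adds an optional thread_requests flag (default False) to import_pbp_data and import_weekly_data; when set, each season's parquet file is fetched on a ThreadPoolExecutor worker instead of in a sequential loop. A minimal usage sketch, mirroring the new tests below (assuming the conventional import alias used by the test suite):

    import nfl_data_py as nfl

    # Fetch two seasons of data concurrently rather than one at a time.
    pbp = nfl.import_pbp_data([2020, 2021], thread_requests=True)
    weekly = nfl.import_weekly_data([2020, 2021], thread_requests=True)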
157 changes: 102 additions & 55 deletions nfl_data_py/__init__.py
@@ -1,10 +1,13 @@
 name = 'nfl_data_py'
 
-import pandas
-import numpy
 import datetime
-import appdirs
 import os
+
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+import appdirs
+import numpy
+import pandas
 from typing import Iterable
 
 # module level doc string
@@ -47,8 +50,15 @@
     clean_nfl_data() - clean df by aligning common name diffs
 """
 
-def import_pbp_data(years, columns=None, include_participation=True, downcast=True, cache=False, alt_path=None):
+
+def import_pbp_data(
+    years,
+    columns=None,
+    include_participation=True,
+    downcast=True,
+    cache=False,
+    alt_path=None,
+    thread_requests=False
+):
     """Imports play-by-play data
     Args:
@@ -69,8 +79,9 @@ def import_pbp_data(years, columns=None, include_participation=True, downcast=Tr
     if min(years) < 1999:
         raise ValueError('Data not available before 1999.')
 
-    if columns is None:
+    if not columns:
         columns = []
+
     columns = [x for x in columns if x not in ['season']]
 
     if all([include_participation, len(columns) != 0]):
@@ -81,56 +92,68 @@ def import_pbp_data(years, columns=None, include_participation=True, downcast=Tr
     url2 = r'.parquet'
     appname = 'nfl_data_py'
     appauthor = 'cooper_dff'
 
     plays = pandas.DataFrame()
     pbp_data = []
 
-    if cache is True:
-
-        if alt_path is None:
+    if cache:
+        if not alt_path:
             dpath = os.path.join(appdirs.user_cache_dir(appname, appauthor), 'pbp')
         else:
             dpath = alt_path
 
-    # read in pbp data
-    for year in years:
-        if cache is True:
-            seasonStr = f'season={year}'
-            if not os.path.isdir(os.path.join(dpath, seasonStr)):
-                raise ValueError(f'{year} cache file does not exist.')
-            for fname in filter(lambda x: seasonStr in x, os.listdir(dpath)):
-                folder = os.path.join(dpath, fname)
-                for file in os.listdir(folder):
-                    if file.endswith(".parquet"):
-                        fpath = os.path.join(folder, file)
-
-            # define path based on cache and alt_path variables
-            path = fpath
-        else:
-            path = url1 + str(year) + url2
-
-        # load data
-        try:
-            if len(columns) != 0:
-                data = pandas.read_parquet(path, columns=columns, engine='auto')
-            else:
-                data = pandas.read_parquet(path, engine='auto')
-
-            raw = pandas.DataFrame(data)
-            raw['season'] = year
-
-            if all([include_participation, year >= 2016, not cache]):
-                path = r'https://github.com/nflverse/nflverse-data/releases/download/pbp_participation/pbp_participation_{}.parquet'.format(year)
-                partic = pandas.read_parquet(path)
-                raw = raw.merge(partic, how='left', on=['play_id','old_game_id'])
-
-            pbp_data.append(raw)
-            print(str(year) + ' done.')
-
-        except:
-            print('Data not available for ' + str(year))
-
-    if len(pbp_data) > 0:
+    if thread_requests and not cache:
+        with ThreadPoolExecutor() as executor:
+            # Create a list of the same size as years, initialized with None
+            pbp_data = [None]*len(years)
+            # Create a mapping of futures to their corresponding index in the pbp_data
+            futures_map = {
+                executor.submit(
+                    pandas.read_parquet,
+                    path=url1 + str(year) + url2,
+                    columns=columns if columns else None,
+                    engine='auto'
+                ): idx
+                for idx, year in enumerate(years)
+            }
+            for future in as_completed(futures_map):
+                pbp_data[futures_map[future]] = future.result()
+    else:
+        # read in pbp data
+        for year in years:
+            if cache:
+                seasonStr = f'season={year}'
+                if not os.path.isdir(os.path.join(dpath, seasonStr)):
+                    raise ValueError(f'{year} cache file does not exist.')
+                for fname in filter(lambda x: seasonStr in x, os.listdir(dpath)):
+                    folder = os.path.join(dpath, fname)
+                    for file in os.listdir(folder):
+                        if file.endswith(".parquet"):
+                            fpath = os.path.join(folder, file)
+
+                # define path based on cache and alt_path variables
+                path = fpath
+            else:
+                path = url1 + str(year) + url2
+
+            # load data
+            try:
+                data = pandas.read_parquet(path, columns=columns if columns else None, engine='auto')
+
+                raw = pandas.DataFrame(data)
+                raw['season'] = year
+
+                if all([include_participation, year >= 2016, not cache]):
+                    path = r'https://github.com/nflverse/nflverse-data/releases/download/pbp_participation/pbp_participation_{}.parquet'.format(year)
+                    partic = pandas.read_parquet(path)
+                    raw = raw.merge(partic, how='left', on=['play_id','old_game_id'])
+
+                pbp_data.append(raw)
+                print(str(year) + ' done.')
+
+            except Exception as e:
+                print(e)
+                print('Data not available for ' + str(year))
+
+    if pbp_data:
        plays = pandas.concat(pbp_data).reset_index(drop=True)
 
     # converts float64 to float32, saves ~30% memory
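
The threaded branch uses a standard concurrent.futures pattern: as_completed yields futures in completion order, not submission order, so each future is mapped back to its list index to keep results aligned with years. A standalone sketch of the same pattern (fetch_all and fetch are illustrative names, not part of this library):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def fetch_all(urls, fetch):
        results = [None] * len(urls)  # pre-sized so results keep input order
        with ThreadPoolExecutor() as executor:
            # map each future to the index its result belongs at
            futures = {executor.submit(fetch, url): i for i, url in enumerate(urls)}
            for future in as_completed(futures):  # yields in completion order
                results[futures[future]] = future.result()
        return results

Note the guard above: the threaded path is taken only when cache is false; cached reads, the added season column, and the participation merge remain in the sequential branch.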
@@ -210,7 +233,12 @@ def cache_pbp(years, downcast=True, alt_path=None):
            next
 
 
-def import_weekly_data(years, columns=None, downcast=True):
+def import_weekly_data(
+    years,
+    columns=None,
+    downcast=True,
+    thread_requests=False
+):
     """Imports weekly player data
     Args:
@@ -228,14 +256,33 @@ def import_weekly_data(years, columns=None, downcast=True):
     if min(years) < 1999:
         raise ValueError('Data not available before 1999.')
 
-    if columns is None:
+    if not columns:
         columns = []
 
-    # read weekly data
     url = r'https://github.com/nflverse/nflverse-data/releases/download/player_stats/player_stats_{0}.parquet'
-    data = pandas.concat([pandas.read_parquet(url.format(x), engine='auto') for x in years])
-
-    if len(columns) > 0:
+
+    if thread_requests:
+        with ThreadPoolExecutor() as executor:
+            # Create a list of the same size as years, initialized with None
+            data = [None]*len(years)
+            # Create a mapping of futures to their corresponding index in the data
+            futures_map = {
+                executor.submit(
+                    pandas.read_parquet,
+                    path=url.format(year),
+                    columns=columns if columns else None,
+                    engine='auto'
+                ): idx
+                for idx, year in enumerate(years)
+            }
+            for future in as_completed(futures_map):
+                data[futures_map[future]] = future.result()
+        data = pandas.concat(data)
+    else:
+        # read weekly data
+        data = pandas.concat([pandas.read_parquet(url.format(x), engine='auto') for x in years])
+
+    if columns:
         data = data[columns]
 
     # converts float64 to float32, saves ~30% memory
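For import_weekly_data the same pattern applies, and column selection is pushed into each read_parquet call so only the requested columns are transferred. An illustrative call (the column names here are assumptions; consult the player_stats parquet schema for the exact names):

    import nfl_data_py as nfl

    # Download two seasons concurrently, keeping only a few columns.
    df = nfl.import_weekly_data(
        [2020, 2021],
        columns=['player_id', 'recent_team', 'week'],
        thread_requests=True,
    )
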
11 changes: 11 additions & 0 deletions nfl_data_py/tests/nfl_test.py
@@ -12,6 +12,12 @@ def test_is_df_with_data(self):
         s = nfl.import_pbp_data([2020])
         self.assertEqual(True, isinstance(s, pd.DataFrame))
         self.assertTrue(len(s) > 0)
+
+    def test_is_df_with_data_thread_requests(self):
+        s = nfl.import_pbp_data([2020, 2021], thread_requests=True)
+        self.assertEqual(True, isinstance(s, pd.DataFrame))
+        self.assertTrue(len(s) > 0)
+
 
     def test_uses_cache_when_cache_is_true(self):
         cache = Path(__file__).parent/"tmpcache"
@@ -32,6 +38,11 @@ def test_is_df_with_data(self):
         s = nfl.import_weekly_data([2020])
         self.assertEqual(True, isinstance(s, pd.DataFrame))
         self.assertTrue(len(s) > 0)
+
+    def test_is_df_with_data_thread_requests(self):
+        s = nfl.import_weekly_data([2020, 2021], thread_requests=True)
+        self.assertEqual(True, isinstance(s, pd.DataFrame))
+        self.assertTrue(len(s) > 0)
 
 class test_seasonal(TestCase):
     def test_is_df_with_data(self):
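The new tests exercise both entry points end-to-end and download real data, so they need network access. Assuming a standard checkout, they can be run selectively with unittest's -k filter:

    python -m unittest nfl_data_py.tests.nfl_test -k thread_requests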
