Commit 14eadaf

Merge branch 'dClimate:master' into master
2 parents 0aaf1b0 + fbe2148

File tree

6 files changed: +175 -29 lines changed

dweather_client/aliases_and_units.py

Lines changed: 9 additions & 2 deletions

@@ -26,6 +26,8 @@
     "fraction": u.dimensionless_unscaled,
     "percentage": u.pct,
     "mbar": u.def_unit('mbar', u.bar / 1000),
+    "m**3/s": u.m**3 / u.s,
+    "m^3/s": u.m**3 / u.s,
 }

 METRIC_TO_IMPERIAL = {
@@ -34,14 +36,17 @@
     u.deg_C: lambda q: q.to(imperial.deg_F, equivalencies=u.temperature()),
     u.K: lambda q: q.to(imperial.deg_F, equivalencies=u.temperature()),
     u.kg / u.m**2: lambda q: q.to(imperial.pound / imperial.ft ** 2),
-    u.m / u.s: lambda q: q.to(imperial.mile / u.hour)
+    u.m / u.s: lambda q: q.to(imperial.mile / u.hour),
+    u.m**3 / u.s: lambda q: q.to(imperial.yard**3 / u.s)
+
 }

 IMPERIAL_TO_METRIC = {
     imperial.inch: lambda q: q.to(u.mm),
     imperial.deg_F: lambda q: q.to(u.deg_C, equivalencies=u.temperature()),
     imperial.pound / imperial.ft ** 2: lambda q: q.to(u.kg / u.m**2),
-    imperial.mile / u.hour: lambda q: q.to(u.m / u.s)
+    imperial.mile / u.hour: lambda q: q.to(u.m / u.s),
+    imperial.yard**3 / u.s: lambda q: q.to(u.m**3 / u.s)
 }

 STATION_ALIASES_TO_COLUMNS = {
@@ -214,6 +219,8 @@ def lookup_station_alias(alias):
              'imperialize': lambda m: m.to(imperial.deg_F, equivalencies=u.temperature())},
     'WSF5': {'vectorize': lambda q: (q/10.0) * u.m/u.s,
              'imperialize': lambda q: q.to(imperial.mile/u.hour)},
+    'FLOWRATE': {'vectorize': lambda q: u.m**3/u.s,
+                 'imperialize': lambda q: q.to(imperial.yard**3/u.s)},
 }

 parent_dir = os.path.dirname(os.path.abspath(__file__))
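The two new unit strings let both spellings of cubic meters per second resolve to the same astropy unit, and the converter tables gain matching metric/imperial lambdas. A standalone sketch of the conversion these entries perform, assuming only that astropy is installed:

# Standalone sketch of the new flow-rate conversion path; mirrors the
# lambdas added to METRIC_TO_IMPERIAL and IMPERIAL_TO_METRIC above.
from astropy import units as u
from astropy.units import imperial

flow = 250.0 * u.m**3 / u.s                      # metric flow, as parsed from "m^3/s"
imperial_flow = flow.to(imperial.yard**3 / u.s)  # what the new lambda computes
print(imperial_flow)                             # ~326.99 yd3 / s
print(imperial_flow.to(u.m**3 / u.s))            # round-trips to 250.0 m3 / s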

dweather_client/client.py

Lines changed: 71 additions & 15 deletions

@@ -9,15 +9,16 @@
 from dweather_client.struct_utils import tupleify, convert_nans_to_none
 import datetime
 import pytz
-import csv, json
+import csv
+import json
 import inspect
 import numpy as np
 import pandas as pd
 from astropy import units as u
 from timezonefinder import TimezoneFinder
 from dweather_client import gridded_datasets
 from dweather_client.storms_datasets import IbtracsDataset, AtcfDataset, SimulatedStormsDataset
-from dweather_client.ipfs_queries import AustraliaBomStations, CedaBiomass, CmeStationsDataset, DutchStationsDataset, DwdStationsDataset, DwdHourlyStationsDataset, GlobalHourlyStationsDataset, JapanStations, StationDataset,\
+from dweather_client.ipfs_queries import AustraliaBomStations, CedaBiomass, CmeStationsDataset, DutchStationsDataset, DwdStationsDataset, DwdHourlyStationsDataset, GlobalHourlyStationsDataset, JapanStations, StationDataset, EauFranceDataset,\
     YieldDatasets, FsaIrrigationDataset, AemoPowerDataset, AemoGasDataset, AesoPowerDataset, ForecastDataset, AfrDataset, DroughtMonitor, CwvStations, SpeedwellStations, TeleconnectionsDataset, CsvStationDataset, StationForecastDataset
 from dweather_client.slice_utils import DateRangeRetriever, has_changed
 from dweather_client.ipfs_errors import *
@@ -246,6 +247,7 @@ def get_tropical_storms(
         min_lon=None,
         max_lat=None,
         max_lon=None,
+        as_of=None,
         ipfs_timeout=None):
     """
     return:
@@ -288,11 +290,11 @@ def get_tropical_storms(

     with cm as storm_getter:
         if radius:
-            return storm_getter.get_data(basin, radius=radius, lat=lat, lon=lon)
+            return storm_getter.get_data(basin, radius=radius, lat=lat, lon=lon, as_of=as_of)
         elif min_lat:
-            return storm_getter.get_data(basin, min_lat=min_lat, min_lon=min_lon, max_lat=max_lat, max_lon=max_lon)
+            return storm_getter.get_data(basin, min_lat=min_lat, min_lon=min_lon, max_lat=max_lat, max_lon=max_lon, as_of=as_of)
         else:
-            return storm_getter.get_data(basin, as_of=as_of)


 def get_station_history(
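The new as_of keyword now rides along every get_data call, so storm queries can be pinned to a historical IPFS release rather than the current head. A hedged call sketch; the leading positional arguments are assumptions, since the full signature sits outside this hunk:

# Hypothetical call: the data source and basin positionals are assumed,
# not shown in this diff; the keyword arguments match the hunk above.
import datetime
from dweather_client.client import get_tropical_storms

storms = get_tropical_storms(
    "ibtracs", "NA",                   # assumed source/basin arguments
    radius=500, lat=26.0, lon=-80.0,   # takes the radius branch above
    as_of=datetime.date(2021, 6, 1),   # new: pin to the release covering this date
)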
@@ -485,6 +487,7 @@ def get_hourly_station_history(dataset, station_id, weather_variable, use_imperi
             v) for k, v in final_resp_series.to_dict().items()}
     return result

+
 def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_units=True, desired_units=None, ipfs_timeout=None):
     """
     This is almost an exact copy of get_hourly_station_history
@@ -543,14 +546,26 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_
                 "Invalid weather variable for this station")

     try:
-        if dataset == "inmet_brazil-hourly":
+        # RawSet style where we only want the most recent file
+        if dataset in ["inmet_brazil-hourly"]:
             with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj:
-                csv_text = dataset_obj.get_data(station_id, weather_variable)
+                csv_text_list = [dataset_obj.get_data(
+                    station_id, weather_variable)]
+        # ClimateSet style where we need the entire linked list history
+        elif dataset in ["ne_iso-hourly"]:
+            with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj:
+                csv_text_list = dataset_obj.get_data_recursive(
+                    station_id, weather_variable)
         else:
             raise DatasetError("No such dataset in dClimate")
     except ipfshttpclient.exceptions.ErrorResponse:
         raise StationNotFoundError("Invalid station ID for dataset")
-    df = pd.read_csv(StringIO(csv_text))
+
+    # concat together all retrieved station csv texts
+    dfs = []
+    for csv_text in csv_text_list:
+        dfs.append(pd.read_csv(StringIO(csv_text)))
+    df = pd.concat(dfs, ignore_index=True)
     str_resp_series = df[column_name].astype(str)
     df = df.set_index("dt")
     if desired_units:
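get_csv_station_history now distinguishes RawSet-style datasets (latest file only) from ClimateSet-style ones (full linked-list history) and concatenates however many CSV texts come back. The concat step in isolation, with inline sample data standing in for the fetched texts:

# Isolated sketch of the new multi-CSV concat; the sample strings stand
# in for what get_data / get_data_recursive return.
from io import StringIO
import pandas as pd

csv_text_list = [
    "dt,TEMP\n2021-01-01,3.0\n2021-01-02,4.5\n",   # older linked-list hash
    "dt,TEMP\n2021-01-03,2.2\n",                   # newest hash
]
df = pd.concat(
    [pd.read_csv(StringIO(t)) for t in csv_text_list],
    ignore_index=True,
)
print(df.set_index("dt"))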
@@ -579,18 +594,21 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_
             v) for k, v in final_resp_series.to_dict().items()}
     return result

+
 def get_station_forecast_history(dataset, station_id, forecast_date, desired_units=None, ipfs_timeout=None):
-    try:
+    try:
         with StationForecastDataset(dataset, ipfs_timeout=ipfs_timeout) as dataset_obj:
             csv_text = dataset_obj.get_data(station_id, forecast_date)
             history = {}
             reader = csv.reader(csv_text.split('\n'))
             headers = next(reader)
             date_col = headers.index('DATE')
             try:  # Make sure weather variable is correct.
-                data_col = headers.index("SETT") #at the moment the only variable is "SETT"
+                # at the moment the only variable is "SETT"
+                data_col = headers.index("SETT")
             except ValueError:
-                raise WeatherVariableNotFoundError("Invalid weather variable for this station")
+                raise WeatherVariableNotFoundError(
+                    "Invalid weather variable for this station")
             for row in reader:
                 try:
                     if not row:
@@ -599,11 +617,12 @@ def get_station_forecast_history(dataset, station_id, forecast_date, desired_uni
                         row[date_col], "%Y-%m-%d").date()] = float(row[data_col])
                 except ValueError:
                     history[datetime.datetime.strptime(
-                       row[date_col], "%Y-%m-%d").date()] = row[data_col]
+                        row[date_col], "%Y-%m-%d").date()] = row[data_col]
             return history
     except ipfshttpclient.exceptions.ErrorResponse:
         raise StationNotFoundError("Invalid station ID for dataset")

+
 def get_station_forecast_stations(dataset, forecast_date, desired_units=None, ipfs_timeout=None):
     with StationForecastDataset(dataset, ipfs_timeout=ipfs_timeout) as dataset_obj:
         csv_text = dataset_obj.get_stations(forecast_date)
@@ -850,13 +869,13 @@ def has_dataset_updated(dataset, slices, as_of, ipfs_timeout=None):

 def get_teleconnections_history(weather_variable, ipfs_timeout=None):
     with TeleconnectionsDataset(ipfs_timeout=ipfs_timeout) as dataset_obj:
-        csv_text = dataset_obj.get_data()
+        csv_text = dataset_obj.get_data(weather_variable)
         history = {}
         reader = csv.reader(csv_text.split('\n'))
         headers = next(reader)
         date_col = headers.index('DATE')
-        try:  # Make sure weather variable is correct.
-            data_col = headers.index(weather_variable)
+        try:
+            data_col=headers.index("value")
         except ValueError:
             raise WeatherVariableNotFoundError(
                 "Invalid weather variable for this station")
@@ -874,3 +893,40 @@
             history[datetime.datetime.strptime(
                 row[date_col], "%Y-%m-%d").date()] = row[data_col]
     return history
+
+
+def get_eaufrance_history(station, weather_variable, use_imperial_units=False, desired_units=None, ipfs_timeout=None):
+    try:
+        with EauFranceDataset(ipfs_timeout=ipfs_timeout) as dataset_obj:
+            csv_text = dataset_obj.get_data(station)
+        df = pd.read_csv(StringIO(csv_text))
+        df = df.set_index("DATE")
+        original_units = "m^3/s"
+        if desired_units:
+            converter, dweather_unit = get_unit_converter_no_aliases(
+                original_units, desired_units)
+        else:
+            converter, dweather_unit = get_unit_converter(
+                original_units, use_imperial_units)
+        if converter:
+            try:
+                converted_resp_series = pd.Series(
+                    converter(df[weather_variable].values*dweather_unit), index=df.index)
+            except ValueError:
+                raise UnitError(
+                    f"Specified unit is incompatible with original, original units are {original_units} and requested units are {desired_units}")
+            if desired_units is not None:
+                rounded_resp_array = np.vectorize(rounding_formula_temperature)(
+                    str_resp_series, converted_resp_series)
+                final_resp_series = pd.Series(
+                    rounded_resp_array * converted_resp_series.values.unit, index=df.index)
+            else:
+                final_resp_series = converted_resp_series
+        else:
+            final_resp_series = pd.Series(
+                df[weather_variable].values*dweather_unit, index=df.index)
+        result = {datetime.date.fromisoformat(k): convert_nans_to_none(
+            v) for k, v in final_resp_series.to_dict().items()}
+        return result
+    except ipfshttpclient.exceptions.ErrorResponse:
+        raise StationNotFoundError("Invalid station ID for dataset")
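A hedged sketch of calling the new endpoint; the station code and column name below are illustrative, not taken from this diff:

# Hypothetical usage of the new river-flow endpoint. Values come back as
# a {datetime.date: quantity-or-None} dict, in m^3/s unless converted.
from dweather_client.client import get_eaufrance_history

flows = get_eaufrance_history(
    "K027401001",            # illustrative EauFrance station code
    "FLOWRATE",              # assumed name of the CSV's data column
    desired_units="m^3/s",   # resolved via the new "m^3/s" unit string above
)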

dweather_client/ipfs_queries.py

Lines changed: 51 additions & 9 deletions

@@ -660,13 +660,34 @@ def __init__(self, dataset, ipfs_timeout=None):
         super().__init__(ipfs_timeout=ipfs_timeout)
         self._dataset = dataset

+    def get_hashes(self):
+        """
+        return: list of all hashes in dataset
+        """
+        hashes = self.traverse_ll(self.head, self.as_of)
+        return list(hashes)
+
     def get_data(self, station, weather_variable=None):
         # only some stations need weather variable
         # so this is an optional arg
         super().get_data()
         file_name = f"{self.head}/{station}.csv"
         return self.get_file_object(file_name).read().decode("utf-8")

+    def get_data_recursive(self, station, weather_variable=None):
+        # only some stations need weather variable
+        # so this is an optional arg
+        super().get_data()
+        # get all hashes and then effectively just use get_data
+        # recursively to get a full list of csvs
+        hashes = self.get_hashes()
+        csv_text_list = []
+        for hash_ in hashes:
+            file_name = f"{hash_}/{station}.csv"
+            csv_text_list.append(self.get_file_object(
+                file_name).read().decode("utf-8"))
+        return csv_text_list
+

 class YieldDatasets(IpfsDataset):
     """
@@ -1114,7 +1135,8 @@ def get_full_date_range_from_metadata(self, h):
         return: list of [start_time, end_time]
         """
         metadata = self.get_metadata(h)
-        str_dates = (metadata["api documentation"]["full date range"][0], metadata["api documentation"]["full date range"][1])
+        str_dates = (metadata["api documentation"]["full date range"]
+                     [0], metadata["api documentation"]["full date range"][1])
         return [datetime.datetime.fromisoformat(dt).date() for dt in str_dates]

     def get_relevant_hash(self, forecast_date):
@@ -1144,12 +1166,14 @@ def get_relevant_hash(self, forecast_date):
             prev_date_range = [datetime.date.fromisoformat(
                 d) for d in prev_metadata["date range"]]
             if prev_date_range[0] <= forecast_date <= prev_date_range[1]:
-                print(f"User requested {forecast_date}, returning data for date range {prev_date_range} from hash {prev_hash}")  # NOTE for testing, TODO remove afterwards
                 return prev_hash
-            prev_hash = prev_metadata['previous hash']  # iterate backwards in the link list one step
+            # iterate backwards in the link list one step
+            prev_hash = prev_metadata['previous hash']

         # If this script runs to the end without returning anything or an error, the forecast date must fall in a hole in the data
-        raise DateOutOfRangeError("forecast date unavailable due to holes in data")  # NOTE only returns if there are holes in the data
+        # NOTE only returns if there are holes in the data
+        raise DateOutOfRangeError(
+            "forecast date unavailable due to holes in data")

     def get_weather_dict(self, forecast_date, ipfs_hash, lat, lon):
         """
@@ -1194,6 +1218,7 @@ def get_data(self, lat, lon, forecast_date):

         return (float(ret_lat), float(ret_lon)), pd.Series(weather_dict)

+
 class StationForecastDataset(ForecastDataset):
     """
     Instantiable class for pulling in station data that is also forecast data.
@@ -1202,11 +1227,11 @@ class StationForecastDataset(ForecastDataset):
     @property
     def dataset(self):
         return self._dataset
-
+
     def __init__(self, dataset, **kwargs):
         super().__init__(dataset, 1)
         self.head = get_heads()[self.dataset]
-
+
     def get_data(self, station, forecast_date):
         relevant_hash = self.get_relevant_hash(forecast_date)
         return self.get_file_object(f"{relevant_hash}/{station}.csv").read().decode("utf-8")
@@ -1215,6 +1240,7 @@ def get_stations(self, forecast_date):
         relevant_hash = self.get_relevant_hash(forecast_date)
         return self.get_file_object(f"{relevant_hash}/stations.json").read().decode("utf-8")

+
 class TeleconnectionsDataset(IpfsDataset):
     """
     Instantiable class used for pulling in el nino teleconnections data
@@ -1224,9 +1250,25 @@ class TeleconnectionsDataset(IpfsDataset):
     def __init__(self, ipfs_timeout=None):
         super().__init__(ipfs_timeout=ipfs_timeout)

-    def get_data(self):
+    def get_data(self, station):
+        super().get_data()
+        metadata = self.get_metadata(self.head)
+
+        file_name = f"{self.head}/{station}.csv"
+        return self.get_file_object(file_name).read().decode("utf-8")
+
+
+class EauFranceDataset(IpfsDataset):
+    """
+    Instantiable class used for pulling in el nino teleconnections data
+    """
+    dataset = "EauFrance-daily"
+
+    def __init__(self, ipfs_timeout=None):
+        super().__init__(ipfs_timeout=ipfs_timeout)
+
+    def get_data(self, station):
         super().get_data()
         metadata = self.get_metadata(self.head)
-        year_month = metadata["time generated"][:7]
-        file_name = f"{self.head}/teleconnections_{year_month}.csv"
+        file_name = f"{self.head}/{station}.csv"
         return self.get_file_object(file_name).read().decode("utf-8")
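A hedged sketch of using the new class directly; get_eaufrance_history in client.py wraps this same call, and the station code is illustrative:

# Hypothetical direct use of the new dataset class, following the
# context-manager pattern visible in client.py.
from dweather_client.ipfs_queries import EauFranceDataset

with EauFranceDataset(ipfs_timeout=30) as ds:
    csv_text = ds.get_data("K027401001")  # fetches {head}/K027401001.csv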

dweather_client/storms_datasets.py

Lines changed: 33 additions & 1 deletion

@@ -1,11 +1,14 @@
 import gzip
 import json
+import datetime
 from abc import abstractmethod

 import pandas as pd

 from dweather_client.df_utils import boxed_storms, nearby_storms
 from dweather_client.ipfs_queries import IpfsDataset
+from dweather_client.ipfs_errors import *
+

 def process_df(input_df, **kwargs):
     if {'radius', 'lat', 'lon'}.issubset(kwargs.keys()):
@@ -23,7 +26,8 @@ def get_data(self, basin, **kwargs):
         if basin not in {'NI', 'SI', 'NA', 'EP', 'WP', 'SP', 'SA'}:
             raise ValueError("Invalid basin ID")
         super().get_data()
-        file_obj = self.get_file_object(f"{self.head}/ibtracs-{basin}.csv.gz")
+        ipfs_hash = self.get_relevant_hash(kwargs["as_of"])
+        file_obj = self.get_file_object(f"{ipfs_hash}/ibtracs-{basin}.csv.gz")
         df = pd.read_csv(
             file_obj, na_values=["", " "], keep_default_na=False, low_memory=False, compression="gzip"
         )
@@ -40,6 +44,34 @@ def get_data(self, basin, **kwargs):

         return processed_df

+    def get_relevant_hash(self, as_of_date):
+        """
+        return the ipfs hash required to pull in data for a forecast date
+        """
+        cur_hash = self.head
+        if as_of_date == None:
+            return cur_hash
+        cur_metadata = self.get_metadata(cur_hash)
+        # This routine is agnostic to the order of data contained in the hashes (at a cost of inefficiency) -- if the data contains the forecast date, it WILL be found, eventually
+        most_recent_date = datetime.datetime.fromisoformat(cur_metadata["time generated"]).date()
+        if as_of_date >= most_recent_date:
+            return cur_hash
+        prev_hash = cur_metadata['previous hash']
+        while prev_hash is not None:
+            prev_metadata = self.get_metadata(prev_hash)
+            prev_date = datetime.datetime.fromisoformat(prev_metadata["time generated"]).date()
+            if prev_date <= as_of_date <= most_recent_date:
+                return prev_hash
+            # iterate backwards in the link list one step
+            try:
+                prev_hash = prev_metadata['previous hash']
+                most_recent_date = prev_date
+            except KeyError:
+                # Because we added the as_of after a while to this ETL prev_hash won't be 'None' we'll just run into an exception
+                raise DateOutOfRangeError(
+                    "as_of data is earlier than earliest available hash")
+
+
 class AtcfDataset(IpfsDataset):
     dataset = "atcf_btk-seasonal"

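get_relevant_hash walks the 'previous hash' links until it finds a release whose generation date brackets as_of, and errors out when the chain predates the as_of bookkeeping. A self-contained replay of that selection rule over a stubbed chain; hashes and dates are invented for illustration, and ValueError stands in for DateOutOfRangeError:

# Replay of the committed selection rule against stubbed IPFS metadata.
import datetime

chain = {
    "QmC": {"time generated": "2021-09-01T00:00:00", "previous hash": "QmB"},
    "QmB": {"time generated": "2021-06-01T00:00:00", "previous hash": "QmA"},
    "QmA": {"time generated": "2021-03-01T00:00:00"},   # oldest: no link key
}

def relevant_hash(head, as_of, get_metadata):
    cur = get_metadata(head)
    recent = datetime.datetime.fromisoformat(cur["time generated"]).date()
    if as_of is None or as_of >= recent:
        return head
    prev = cur["previous hash"]
    while prev is not None:
        meta = get_metadata(prev)
        prev_date = datetime.datetime.fromisoformat(meta["time generated"]).date()
        if prev_date <= as_of <= recent:
            return prev
        try:  # older releases lack the link key entirely, hence the KeyError guard
            prev, recent = meta["previous hash"], prev_date
        except KeyError:
            raise ValueError("as_of is earlier than the earliest available hash")

print(relevant_hash("QmC", datetime.date(2021, 7, 15), chain.__getitem__))  # QmB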