# forecast_dataloader.py (forked from xaviergonzalez/StemGNN)
import torch.utils.data as torch_data
import numpy as np
import torch
import pandas as pd


def normalized(data, normalize_method, norm_statistic=None):
    # Scale `data` column-wise with either min-max or z-score normalization.
    # If `norm_statistic` is not supplied, the statistics are computed from `data`
    # itself and returned so they can be reused (e.g. on validation/test splits).
    if normalize_method == 'min_max':
        if not norm_statistic:
            norm_statistic = dict(max=np.max(data, axis=0), min=np.min(data, axis=0))
        scale = norm_statistic['max'] - norm_statistic['min'] + 1e-5  # epsilon guards against a zero range
        data = (data - norm_statistic['min']) / scale
        data = np.clip(data, 0.0, 1.0)
    elif normalize_method == 'z_score':
        if not norm_statistic:
            norm_statistic = dict(mean=np.mean(data, axis=0), std=np.std(data, axis=0))
        mean = norm_statistic['mean']
        std = norm_statistic['std']
        std = [1 if i == 0 else i for i in std]  # avoid division by zero for constant series
        data = (data - mean) / std
        norm_statistic['std'] = std
    return data, norm_statistic


def de_normalized(data, normalize_method, norm_statistic):
    # Invert the transform applied by `normalized`, using the same statistics.
    if normalize_method == 'min_max':
        if not norm_statistic:
            norm_statistic = dict(max=np.max(data, axis=0), min=np.min(data, axis=0))
        scale = norm_statistic['max'] - norm_statistic['min'] + 1e-5  # must match the epsilon used in `normalized`
        data = data * scale + norm_statistic['min']
    elif normalize_method == 'z_score':
        if not norm_statistic:
            norm_statistic = dict(mean=np.mean(data, axis=0), std=np.std(data, axis=0))
        mean = norm_statistic['mean']
        std = norm_statistic['std']
        std = [1 if i == 0 else i for i in std]  # mirror the zero-std guard from `normalized`
        data = data * std + mean
    return data
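

# Illustrative round-trip check (not from the original repository): applying
# `normalized` and then `de_normalized` with the same statistics should recover the
# input. The array shape and the z-score choice below are assumptions for the sketch.
#
#     raw = np.random.rand(100, 7)                        # 100 time steps, 7 series
#     scaled, stats = normalized(raw, 'z_score')          # stats: per-series mean/std
#     restored = de_normalized(scaled, 'z_score', stats)
#     assert np.allclose(restored, raw)                   # inverse transform recovers the input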


class ForecastDataset(torch_data.Dataset):
    def __init__(self, df, window_size, horizon, normalize_method=None, norm_statistic=None, interval=1):
        self.window_size = window_size
        self.interval = interval
        self.horizon = horizon
        self.normalize_method = normalize_method
        self.norm_statistic = norm_statistic
        df = pd.DataFrame(df)
        # Fill missing values forward, then backward, before converting to a NumPy array.
        df = df.ffill().bfill().values
        self.data = df
        self.df_length = len(df)
        self.x_end_idx = self.get_x_end_idx()
        if normalize_method:
            self.data, _ = normalized(self.data, normalize_method, norm_statistic)

    def __getitem__(self, index):
        # `hi` is the exclusive end of the input window; the target starts right after it.
        hi = self.x_end_idx[index]
        lo = hi - self.window_size
        train_data = self.data[lo:hi]                      # input window: [lo, hi)
        target_data = self.data[hi:hi + self.horizon]      # forecast target: [hi, hi + horizon)
        x = torch.from_numpy(train_data).type(torch.float)
        y = torch.from_numpy(target_data).type(torch.float)
        return x, y

    def __len__(self):
        return len(self.x_end_idx)

    def get_x_end_idx(self):
        # Each element `hi` is the exclusive upper bound of one input window,
        # i.e. the window covers [hi - window_size, hi); windows are spaced `interval` steps apart.
        x_index_set = range(self.window_size, self.df_length - self.horizon + 1)
        x_end_idx = [x_index_set[j * self.interval] for j in range(len(x_index_set) // self.interval)]
        return x_end_idx
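

# Minimal usage sketch (an addition, not part of the upstream StemGNN code): build a
# ForecastDataset from random data and wrap it in a standard torch DataLoader. The
# window_size, horizon and batch_size values here are illustrative assumptions.
if __name__ == '__main__':
    dummy = np.random.rand(500, 10)  # 500 time steps, 10 series
    dataset = ForecastDataset(dummy, window_size=12, horizon=3, normalize_method='z_score')
    loader = torch_data.DataLoader(dataset, batch_size=32, shuffle=True, drop_last=False)
    x, y = next(iter(loader))
    print(x.shape, y.shape)  # e.g. torch.Size([32, 12, 10]) torch.Size([32, 3, 10])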