ENH: Add VAR-HAC long-run covariance estimation #363

Open · wants to merge 8 commits into main

Changes from 1 commit
More work and tests
bashtage committed Apr 2, 2020
commit 6b1cc78fbc8c34f0d7555aaf3168323f631cb4b3
1 change: 1 addition & 0 deletions arch/conftest.py
@@ -2,6 +2,7 @@

pytest_plugins = [
"arch.tests.unitroot.cointegration_data",
"arch.tests.covariance.covariance_data",
]


29 changes: 29 additions & 0 deletions arch/covariance/kernel.py
@@ -25,6 +25,7 @@
"Gallant",
"NeweyWest",
"normalize_kernel_name",
"ZeroLag",
]

KERNELS = [
@@ -40,6 +41,7 @@
"Andrews",
"Gallant",
"NeweyWest",
"ZeroLag",
]


@@ -189,6 +191,7 @@ class CovarianceEstimator(ABC):
def __init__(
self,
x: ArrayLike,
*,
bandwidth: Optional[float] = None,
df_adjust: int = 0,
center: bool = True,
@@ -716,3 +719,29 @@ class NeweyWest(Bartlett):
--------
Bartlett
"""


zero_lag_name = "Zero-lag (No autocorrelation)"
zero_lag_formula = """\
w=\\begin{cases} 1 & z=0\\\\ \
\\ 0 & z>0 \
\\end{cases} \
"""


@Substitution(kernel_name=zero_lag_name, formula=zero_lag_formula)
class ZeroLag(CovarianceEstimator, metaclass=AbstractDocStringInheritor):
@property
def kernel_const(self) -> float:
return 1.0

@property
def bandwidth_scale(self) -> float:
return 0.0

@property
def rate(self) -> float:
return 0.0

def _weights(self) -> NDArray:
return np.ones(1)
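
A minimal usage sketch for the ZeroLag estimator added above (import path as in this diff; illustrative, not part of the commit). With a single unit weight at lag zero, no autocorrelation correction is applied, so the long-run covariance collapses to the short-run (sample) covariance:

import numpy as np
from arch.covariance.kernel import ZeroLag

rng = np.random.default_rng(0)
x = rng.standard_normal((500, 2))
cov = ZeroLag(x).cov
# Only the lag-0 weight is nonzero, so long-run equals short-run covariance
assert np.allclose(np.asarray(cov.long_run), np.asarray(cov.short_run))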
103 changes: 85 additions & 18 deletions arch/covariance/var.py
@@ -1,9 +1,9 @@
from typing import Dict, NamedTuple, Optional, Tuple

import numpy as np
from numpy import zeros
from numpy.linalg import lstsq
import pandas as pd
from pandas.util._decorators import Appender
from statsmodels.tools import add_constant
from statsmodels.tsa.tsatools import lagmat

@@ -14,6 +14,9 @@
normalize_kernel_name,
)
from arch.typing import ArrayLike, NDArray
from arch.vendor import cached_property

__all__ = ["PreWhitenedRecolored"]


class VARModel(NamedTuple):
@@ -23,48 +26,98 @@ class VARModel(NamedTuple):
intercept: bool


class PreWhitenRecoloredCovariance(CovarianceEstimator):
class PreWhitenedRecolored(CovarianceEstimator):
"""
VAR-HAC and pre-whitened, recolored long-run covariance estimation.

Implements Andrews and Monahan's PWRC [1]_ and den Haan and Levin's
VAR-HAC [2]_ covariance estimators.

Parameters
----------
x : array_like
The data to use in covariance estimation.
lags : int, default None
The number of lags to include in the VAR. If None, a specification
search is used to select the order.
method : {"aic", "hqc", "bic"}, default "aic"
The information criterion to use in the model specification search.
diagonal : bool, default True
Flag indicating whether to also consider diagonal parameter
coefficient matrices on lags in the specification search. A diagonal
coefficient matrix restricts all off-diagonal coefficients to be zero.
max_lag : int, default None
The maximum lag to use in the model specification search. If None,
then nobs**(1/3) is used.
sample_autocov : bool, default False
Whether to use the sample autocovariance or the theoretical
autocovariance implied by the estimated VAR when computing
the long-run covariance.
kernel : {str, None}, default "bartlett"
The name of the kernel to use. Can be any available kernel. Input
is normalised using lower casing and any underscores or hyphens
are removed, so that "QuadraticSpectral", "quadratic-spectral" and
"quadratic_spectral" are all the same. Use None to prevent recoloring.
bandwidth : float, default None
The kernel's bandwidth. If None, optimal bandwidth is estimated.
df_adjust : int, default 0
Degrees of freedom to remove when adjusting the covariance. Uses the
number of observations in x minus df_adjust when dividing
inner-products.
center : bool, default True
A flag indicating whether x should be demeaned before estimating the
covariance.
weights : array_like, default None
An array of weights used to combine the individual series when
estimating the optimal bandwidth. If not provided, a vector of 1s
is used. Must have nvar elements.
force_int : bool, default False
Force bandwidth to be an integer.

See Also
--------
arch.covariance.kernel
Kernel-based long-run covariance estimators

Notes
-----
TODO: Add detailed notes

Examples
--------

References
----------
.. [1] Andrews, D. W., & Monahan, J. C. (1992). An improved
heteroskedasticity and autocorrelation consistent covariance matrix
estimator. Econometrica: Journal of the Econometric Society, 953-966.
.. [2] Haan, W. J. D., & Levin, A. T. (2000). Robust covariance matrix
estimation with data-dependent VAR prewhitening order (No. 255).
National Bureau of Economic Research.
"""

def __init__(
self,
x: ArrayLike,
*,
lags: Optional[int] = None,
method: str = "aic",
diagonal: bool = True,
max_lag: Optional[int] = None,
sample_autocov: bool = False,
kernel: str = "bartlett",
kernel: Optional[str] = "bartlett",
bandwidth: Optional[float] = None,
df_adjust: int = 0,
center: bool = True,
weights: Optional[ArrayLike] = None,
force_int: bool = False,
) -> None:
super().__init__(
x, bandwidth=bandwidth, df_adjust=df_adjust, center=center, weights=weights
x,
bandwidth=bandwidth,
df_adjust=df_adjust,
center=center,
weights=weights,
force_int=force_int,
)
self._kernel_name = kernel
self._lags = 0
@@ -75,7 +128,13 @@ def __init__(
self._auto_lag_selection = True
self._format_lags(lags)
self._sample_autocov = sample_autocov
kernel = normalize_kernel_name(kernel)
if kernel is not None:
kernel = normalize_kernel_name(kernel)
else:
if self._bandwidth not in (0, None):
raise ValueError("bandwidth must be None when kernel is None")
self._bandwidth = None
kernel = "zerolag"
if kernel not in KERNEL_ESTIMATORS:
raise ValueError(KERNEL_ERR)

@@ -155,7 +214,7 @@ def _ic_from_vars(
ics: Dict[Tuple[int, int], float] = {
(full_order, full_order): self._ic(sigma, nparam, nobs)
}
if not self._diagonal:
if not self._diagonal or self._x.shape[1] == 1:
return ics

purged_indiv_lags = np.empty((nvar, nobs, max_lag - full_order))
Expand Down Expand Up @@ -206,15 +265,15 @@ def _select_lags(self) -> Tuple[int, int]:
return models[ic.argmin()]

def _estimate_var(self, full_order: int, diag_order: int) -> VARModel:
nobs, nvar = self._x.shape
nvar = self._x.shape[1]
center = int(self._center)
max_lag = max(full_order, diag_order)
lhs, rhs, extra_lags = self._setup_model_data(max_lag)
c = int(self._center)
rhs = rhs[:, : c + full_order * nvar]
extra_lags = extra_lags[:, :, full_order:diag_order]

params = zeros((nvar, nvar * max_lag + center))
params = np.zeros((nvar, nvar * max_lag + center))
resids = np.empty_like(lhs)
ncommon = rhs.shape[1]
for i in range(nvar):
@@ -246,8 +305,8 @@ def _estimate_sample_cov(self, nvar: int, nlag: int) -> NDArray:
if self._center:
x = x - x.mean(0)
nobs = x.shape[0]
var_cov = zeros((nvar * nlag, nvar * nlag))
gamma = zeros((nlag, nvar, nvar))
var_cov = np.zeros((nvar * nlag, nvar * nlag))
gamma = np.zeros((nlag, nvar, nvar))
for i in range(nlag):
gamma[i] = (x[i:].T @ x[: (nobs - i)]) / nobs
for r in range(nlag):
@@ -258,10 +317,11 @@ def _estimate_sample_cov(self, nvar: int, nlag: int) -> NDArray:
var_cov[r * nvar : (r + 1) * nvar, c * nvar : (c + 1) * nvar] = g
return var_cov
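
The middle of this loop is elided in the diff; below is a self-contained sketch of the block-Toeplitz layout it fills, reconstructed under standard assumptions rather than copied from the commit:

import numpy as np

rng = np.random.default_rng(2)
x = rng.standard_normal((200, 2))
nobs, nvar = x.shape
nlag = 2
gamma = np.zeros((nlag, nvar, nvar))
for i in range(nlag):
    gamma[i] = (x[i:].T @ x[: nobs - i]) / nobs  # sample autocovariance Gamma(i)
var_cov = np.zeros((nvar * nlag, nvar * nlag))
for r in range(nlag):
    for c in range(nlag):
        # Lower blocks hold Gamma(r - c); upper blocks hold the transpose
        g = gamma[r - c] if r >= c else gamma[c - r].T
        var_cov[r * nvar : (r + 1) * nvar, c * nvar : (c + 1) * nvar] = g
assert np.allclose(var_cov, var_cov.T)  # block-Toeplitz and symmetric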

@staticmethod
def _estimate_model_cov(
self, nvar: int, nlag: int, coeffs: NDArray, short_run: NDArray
nvar: int, nlag: int, coeffs: NDArray, short_run: NDArray
) -> NDArray:
sigma = zeros((nvar * nlag, nvar * nlag))
sigma = np.zeros((nvar * nlag, nvar * nlag))
sigma[:nvar, :nvar] = short_run
multiplier = np.linalg.inv(np.eye(coeffs.size) - np.kron(coeffs, coeffs))
vec_sigma = sigma.ravel()[:, None]
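
The Kronecker inverse above applies the vec identity to the discrete Lyapunov equation V = A V A' + S. A quick numeric check of the same computation (row-major ravel, matching the code above):

import numpy as np

rng = np.random.default_rng(1)
A = rng.standard_normal((4, 4))
A /= 2 * np.max(np.abs(np.linalg.eigvals(A)))  # force spectral radius below 1
S = rng.standard_normal((4, 4))
S = S @ S.T  # symmetric PSD stand-in for the short-run covariance
# vec(V) = (I - A kron A)^{-1} vec(S) solves V = A V A' + S
V = np.linalg.solve(np.eye(16) - np.kron(A, A), S.ravel()).reshape(4, 4)
assert np.allclose(V, A @ V @ A.T + S)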
@@ -274,7 +334,7 @@ def _companion_form(
) -> Tuple[NDArray, NDArray]:
nvar = var_model.resids.shape[1]
nlag = var_model.var_order
coeffs = zeros((nvar * nlag, nvar * nlag))
coeffs = np.zeros((nvar * nlag, nvar * nlag))
coeffs[:nvar] = var_model.params[:, var_model.intercept :]
for i in range(nlag - 1):
coeffs[(i + 1) * nvar : (i + 2) * nvar, i * nvar : (i + 1) * nvar] = np.eye(
@@ -286,15 +346,21 @@ def _companion_form(
var_cov = self._estimate_model_cov(nvar, nlag, coeffs, short_run)
return coeffs, var_cov

@property
@cached_property
@Appender(CovarianceEstimator.cov.__doc__)
def cov(self) -> CovarianceEstimate:
common, individual = self._select_lags()
self._order = (common, individual)
var_mod = self._estimate_var(common, individual)
resids = var_mod.resids
nobs, nvar = resids.shape
self._kernel_instance = self._kernel(
resids, self._bandwidth, 0, False, self._x_weights, self._force_int
resids,
bandwidth=self._bandwidth,
df_adjust=0,
center=False,
weights=self._x_weights,
force_int=self._force_int,
)
kern_cov = self._kernel_instance.cov
short_run = kern_cov.short_run
@@ -316,7 +382,7 @@ def cov(self) -> CovarianceEstimate:
have diagonal coefficient matrices. The maximum eigenvalue of the companion-form \
VAR(1) coefficient matrix is {max_eig}."""
)
coeff_sum = zeros((nvar, nvar))
coeff_sum = np.zeros((nvar, nvar))
params = var_mod.params[:, var_mod.intercept :]
for i in range(var_mod.var_order):
coeff_sum += params[:, i * nvar : (i + 1) * nvar]
@@ -343,7 +409,8 @@

def _ensure_kernel_instantized(self) -> None:
if self._kernel_instance is None:
self.cov
# Workaround to avoid linting noise
getattr(self, "cov")

@property
def bandwidth_scale(self) -> float:
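
An end-to-end sketch of the estimator this file adds (names as defined in the diff; illustrative only). Passing kernel=None selects the zero-lag residual covariance, giving the pure VAR-HAC estimator, and the fitted VAR then recolors the residual covariance S as (I - Phi(1))^{-1} S (I - Phi(1))^{-T}, where Phi(1) is the sum of the estimated coefficient matrices (the coeff_sum computed in cov above):

import numpy as np
from arch.covariance.var import PreWhitenedRecolored

rng = np.random.default_rng(12345)
e = rng.standard_normal((750, 2))
x = np.zeros_like(e)
# Simulate a stable VAR(1): x[t] = 0.5 * x[t - 1] + e[t]
for t in range(1, x.shape[0]):
    x[t] = 0.5 * x[t - 1] + e[t]

est = PreWhitenedRecolored(x, kernel=None)  # pure VAR-HAC
print(est.cov.long_run)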
45 changes: 45 additions & 0 deletions arch/tests/covariance/covariance_data.py
@@ -0,0 +1,45 @@
from itertools import product

import numpy as np
import pandas as pd
import pytest

DATA_PARAMS = list(product([1, 3], [True, False], [0, 1, 3]))
DATA_IDS = [f"dim: {d}, pandas: {p}, order: {o}" for d, p, o in DATA_PARAMS]


@pytest.fixture(scope="module", params=DATA_PARAMS, ids=DATA_IDS)
def covariance_data(request):
dim, pandas, order = request.param
rs = np.random.RandomState([839084, 3823810, 982103, 829108])
burn = 100
shape = (burn + 500,)
if dim > 1:
shape += (dim,)
rvs = rs.standard_normal(shape)
phi = np.zeros((order, dim, dim))
if order > 0:
phi[0] = np.eye(dim) * 0.4 + 0.1
for i in range(1, order):
phi[i] = 0.3 / (i + 1) * np.eye(dim)
for i in range(order, burn + 500):
for j in range(order):
if dim == 1:
rvs[i] += np.squeeze(phi[j] * rvs[i - j - 1])
else:
rvs[i] += phi[j] @ rvs[i - j - 1]
if order > 1:
p = np.eye(dim * order, dim * order, -dim)
for j in range(order):
p[:dim, j * dim : (j + 1) * dim] = phi[j]
v, _ = np.linalg.eig(p)
assert np.max(np.abs(v)) < 1
rvs = rvs[burn:]
if pandas and dim == 1:
return pd.Series(rvs, name="x")
elif pandas:
df = pd.DataFrame(rvs, columns=[f"x{i}" for i in range(dim)])
return df

return rvs
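
A sketch of how a test module might consume this fixture (hypothetical test, not part of the commit):

import numpy as np
from arch.covariance.var import PreWhitenedRecolored

def test_long_run_is_psd(covariance_data):
    est = PreWhitenedRecolored(covariance_data, kernel=None)
    lr = np.atleast_2d(np.asarray(est.cov.long_run))
    # Any long-run covariance estimate should be symmetric PSD
    assert np.allclose(lr, lr.T)
    assert np.all(np.linalg.eigvalsh(lr) >= -1e-10)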