# Copyright (c) PyWhy contributors. All rights reserved.
# Licensed under the MIT License.
"""
Orthogonal Machine Learning is a general approach to estimating causal models
by formulating them as minimizers of some loss function that depends on
auxiliary regression models that also need to be estimated from data. The
class in this module implements the general logic in a versatile way
so that various child classes can simply instantiate the appropriate models
and avoid repeating a lot of code.
References
----------
Dylan Foster, Vasilis Syrgkanis (2019). Orthogonal Statistical Learning.
ACM Conference on Learning Theory. https://arxiv.org/abs/1901.09036
Xinkun Nie, Stefan Wager (2017). Quasi-Oracle Estimation of Heterogeneous Treatment Effects.
https://arxiv.org/abs/1712.04912
Chernozhukov et al. (2017). Double/debiased machine learning for treatment and structural parameters.
The Econometrics Journal. https://arxiv.org/abs/1608.00060
"""
import copy
from collections import namedtuple
from warnings import warn
from abc import abstractmethod
from typing import List, Union
import inspect
from collections import defaultdict
import re
import numpy as np
from sklearn.base import clone
from sklearn.model_selection import KFold, StratifiedKFold, GroupKFold, StratifiedGroupKFold, check_cv
from sklearn.preprocessing import (FunctionTransformer, LabelEncoder,
OneHotEncoder)
from sklearn.utils import check_random_state
from ._cate_estimator import (BaseCateEstimator, LinearCateEstimator,
TreatmentExpansionMixin)
from .inference import BootstrapInference
from .utilities import (_deprecate_positional, check_input_arrays,
cross_product, filter_none_kwargs, one_hot_encoder, strata_from_discrete_arrays,
inverse_onehot, jacify_featurizer, ndim, reshape, shape, transpose)
from .sklearn_extensions.model_selection import ModelSelector
try:
import ray
except ImportError as exn:
from .utilities import MissingModule
ray = MissingModule(
"Ray is not a dependency of the base econml package; install econml[ray] or econml[all] to require it, "
"or install ray separately, to use functionality that depends on ray", exn)
def _fit_fold(model, train_idxs, test_idxs, calculate_scores, args, kwargs):
"""
Fit a single model on the training fold and calculate the nuisance values on the test fold.
Parameters
----------
model: object
An object that supports train and predict. Train must accept an ``is_selecting``
flag and a set of folds first, followed by all the args and the keyword arguments
kwargs. Similarly, predict must accept all the args as arguments and kwargs as
keyword arguments. The train method estimates a model of the nuisance function,
based on the input data to fit. Predict evaluates the fitted nuisance function on
the input data to predict.
train_idxs (array-like): Indices for the training data.
test_idxs (array-like): Indices for the test data.
calculate_scores (bool): Whether to calculate scores after fitting.
args : a sequence of (numpy matrices or None)
Each matrix is a data variable whose first index corresponds to a sample
kwargs : a mapping of keyword args, with values being (numpy matrices or None)
Each keyword argument is of the form Var=x, with x a numpy array. Each
of these arrays is a data variable. The model train and predict will be
called as ``model.train(False, None, *args, **kwargs)`` and
``model.predict(*args, **kwargs)``. Keyword arguments whose value is
None are omitted from the two calls, so all the args and the non-None
kwargs variables must be part of the model's signature.
Returns
-------
nuisance_temp : tuple
Predictions or values of interest from the model.
fitted_model : object
The fitted model after training.
score_temp : tuple or None
Scores calculated after fitting if `calculate_scores` is True, otherwise None.
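Examples
--------
A minimal sketch (the ``Wrapper`` class below is purely illustrative, mirroring the
example in the `_crossfit` docstring; it is not part of the package API):
.. testcode::
    import numpy as np
    from sklearn.linear_model import Lasso
    from econml._ortho_learner import _fit_fold
    class Wrapper:
        def __init__(self, model):
            self._model = model
        def train(self, is_selecting, folds, X, y, W=None):
            self._model.fit(X, y)
            return self
        def predict(self, X, y, W=None):
            return self._model.predict(X)
    np.random.seed(123)
    X = np.random.normal(size=(100, 3))
    y = X[:, 0] + np.random.normal(size=(100,))
    # train on the first 50 rows, evaluate nuisances on the last 50;
    # calculate_scores=False because Wrapper has no score method
    nuisances, fitted_model, scores = _fit_fold(Wrapper(Lasso(alpha=0.01)),
                                                np.arange(50), np.arange(50, 100),
                                                False, (X, y), {})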
"""
model = clone(model, safe=False)
args_train = tuple(var[train_idxs] if var is not None else None for var in args)
args_test = tuple(var[test_idxs] if var is not None else None for var in args)
kwargs_train = {key: var[train_idxs] for key, var in kwargs.items()}
kwargs_test = {key: var[test_idxs] for key, var in kwargs.items()}
model.train(False, None, *args_train, **kwargs_train)
nuisance_temp = model.predict(*args_test, **kwargs_test)
if not isinstance(nuisance_temp, tuple):
nuisance_temp = (nuisance_temp,)
if calculate_scores:
score_temp = model.score(*args_test, **kwargs_test)
if not isinstance(score_temp, tuple):
score_temp = (score_temp,)
return nuisance_temp, model, (score_temp if calculate_scores else None)
def _crossfit(models: Union[ModelSelector, List[ModelSelector]], folds, use_ray, ray_remote_fun_option,
*args, **kwargs):
"""
General crossfit based calculation of nuisance parameters.
Parameters
----------
models : ModelSelector or List[ModelSelector]
One or more objects that have train and predict methods.
The train method must take an 'is_selecting' argument first, a set of folds second
(which will be None when not selecting) and then accept positional arguments `args`
and keyword arguments `kwargs`; the predict method just takes those `args` and `kwargs`.
The train method selects or estimates a model of the nuisance function, based on the input
data to fit. Predict evaluates the fitted nuisance function on the input
data to predict.
folds : list of tuple or None
The crossfitting fold structure. Every entry in the list is a tuple whose
first element contains the training indices of the args and kwargs data and
whose second element contains the test indices. If the union of the test indices
is not the full set of all indices, then the remaining nuisance parameters
for the missing indices have value NaN. If folds is None, then cross fitting
is not performed; all indices are used for both model fitting and prediction
use_ray: bool, default False
Flag to indicate whether to use ray to parallelize the cross-fitting step.
ray_remote_fun_option: dict, default None
Options to pass to the ray.remote decorator.
args : a sequence of (numpy matrices or None)
Each matrix is a data variable whose first index corresponds to a sample
kwargs : a mapping of keyword args, with values being (numpy matrices or None)
Each keyword argument is of the form Var=x, with x a numpy array. Each
of these arrays is a data variable. The model train and predict will be
called with signatures ``model.train(is_selecting, folds, *args, **kwargs)`` and
``model.predict(*args, **kwargs)``. Keyword arguments that have value
None are omitted from the two calls, so all the args and the non-None
kwargs variables must be part of the models' signatures.
Returns
-------
nuisances : tuple of array_like
Each entry in the tuple is a nuisance parameter matrix. The i-th row of each
matrix corresponds to the value of the nuisance parameter for the i-th input
sample.
model_list : list of object of same type as input model
The cloned and fitted models for each fold. Can be used for inspection of the
variability of the fitted models across folds.
fitted_inds : 1d np.ndarray
The indices of the arrays for which the nuisance value was calculated. This
corresponds to the union of the indices of the test part of each fold in
the input fold list.
scores : tuple of list of float or None
The out-of-sample model scores for each nuisance model
Examples
--------
.. testcode::
import numpy as np
from sklearn.model_selection import KFold
from sklearn.linear_model import Lasso
from econml._ortho_learner import _crossfit
class Wrapper:
def __init__(self, model):
self._model = model
def train(self, is_selecting, folds, X, y, W=None):
self._model.fit(X, y)
return self
def predict(self, X, y, W=None):
return self._model.predict(X)
np.random.seed(123)
X = np.random.normal(size=(5000, 3))
y = X[:, 0] + np.random.normal(size=(5000,))
folds = list(KFold(2).split(X, y))
model = Lasso(alpha=0.01)
use_ray = False
ray_remote_fun_option = {}
nuisance, model_list, fitted_inds, scores = _crossfit(Wrapper(model), folds, use_ray, ray_remote_fun_option,
X, y, W=y, Z=None)
>>> nuisance
(array([-1.105728... , -1.537566..., -2.451827... , ..., 1.106287...,
-1.829662..., -1.782273...]),)
>>> model_list
[<Wrapper object at 0x...>, <Wrapper object at 0x...>]
>>> fitted_inds
array([ 0, 1, 2, ..., 4997, 4998, 4999])
"""
model_list = []
kwargs = filter_none_kwargs(**kwargs)
n = (args[0] if args else next(iter(kwargs.values()))).shape[0]
# fully materialize folds so that they can be reused across models
# and precompute fitted indices so that we fail fast if there's an issue with them
if folds is not None:
folds = list(folds)
fitted_inds = []
for idx, (train_idxs, test_idxs) in enumerate(folds):
if len(np.intersect1d(train_idxs, test_idxs)) > 0:
raise AttributeError(f"Invalid crossfitting fold structure. Train and test indices of fold {idx+1} "
f"are not disjoint: {train_idxs}, {test_idxs}")
common_idxs = np.intersect1d(fitted_inds, test_idxs)
if len(common_idxs) > 0:
raise AttributeError(f"Invalid crossfitting fold structure. The indexes {common_idxs} in fold {idx+1} "
f"have appeared in previous folds")
fitted_inds = np.concatenate((fitted_inds, test_idxs))
fitted_inds = np.sort(fitted_inds.astype(int))
else:
fold_vals = [(np.arange(n), np.arange(n))]
fitted_inds = np.arange(n)
accumulated_nuisances = ()
# NOTE: if any model is missing scores we will just return None even if another model
# has scores. this is because we don't know how many scores are missing
# for the models that are missing them, so we don't know how to pad the array
calculate_scores = True
accumulated_scores = ()
# for convenience we allow a single model to be passed in lieu of a singleton list
# in that case, we will also unwrap the model output
unwrap_model_output = False
if not isinstance(models, list):
unwrap_model_output = True
models = [models]
for model in models:
# when there is more than one model, nuisances from previous models
# come first as positional arguments
accumulated_args = accumulated_nuisances + args
model.train(True, fold_vals if folds is None else folds, *accumulated_args, **kwargs)
calculate_scores &= hasattr(model, 'score')
model_list.append([]) # add a new empty list of clones for this model
if folds is None: # skip crossfitting
model_list[-1].append(clone(model, safe=False))
model_list[-1][0].train(False, None, *accumulated_args, **kwargs) # fit the selected model
nuisances = model_list[-1][0].predict(*accumulated_args, **kwargs)
if not isinstance(nuisances, tuple):
nuisances = (nuisances,)
if calculate_scores:
scores = model_list[-1][0].score(*accumulated_args, **kwargs)
if not isinstance(scores, tuple):
scores = (scores,)
# scores entries should be lists of scores, so make each entry a singleton list
scores = tuple([s] for s in scores)
else:
fold_refs = []
if use_ray:
# Adding the kwargs to ray object store to be used by remote functions
# for each fold to avoid IO overhead
ray_args = ray.put(kwargs)
for idx, (train_idxs, test_idxs) in enumerate(folds):
fold_refs.append(
ray.remote(_fit_fold).options(**ray_remote_fun_option).remote(model, train_idxs, test_idxs,
calculate_scores,
accumulated_args, ray_args))
for idx, (train_idxs, test_idxs) in enumerate(folds):
if use_ray:
nuisance_temp, model_out, score_temp = ray.get(fold_refs[idx])
else:
nuisance_temp, model_out, score_temp = _fit_fold(model, train_idxs, test_idxs,
calculate_scores, accumulated_args, kwargs)
if idx == 0:
nuisances = tuple([np.full((n,) + nuis.shape[1:], np.nan)
for nuis in nuisance_temp])
for it, nuis in enumerate(nuisance_temp):
nuisances[it][test_idxs] = nuis
if calculate_scores:
if idx == 0:
scores = tuple([] for _ in score_temp)
for it, score in enumerate(score_temp):
scores[it].append(score)
model_list[-1].append(model_out)
accumulated_nuisances += nuisances
if calculate_scores:
accumulated_scores += scores
else:
accumulated_scores = None
if unwrap_model_output:
model_list = model_list[0]
return accumulated_nuisances, model_list, fitted_inds, accumulated_scores
CachedValues = namedtuple('CachedValues', ['nuisances',
'Y', 'T', 'X', 'W', 'Z', 'sample_weight', 'freq_weight',
'sample_var', 'groups'])
class _OrthoLearner(TreatmentExpansionMixin, LinearCateEstimator):
"""
Base class for all orthogonal learners. This class is a parent class to any method that has
the following architecture:
1. The CATE :math:`\\theta(X)` is the minimizer of some expected loss function
.. math ::
\\mathbb{E}[\\ell(V; \\theta(X), h(V))]
where :math:`V` are all the random variables and h is a vector of nuisance functions. Alternatively,
the class would also work if :math:`\\theta(X)` is the solution to a set of moment equations that
also depend on nuisance functions :math:`h`.
2. To estimate :math:`\\theta(X)` we first fit the h functions and calculate :math:`h(V_i)` for each sample
:math:`i` in a crossfit manner:
- Let (F1_train, F1_test), ..., (Fk_train, Fk_test) be any KFold partition
of the data, where Ft_train, Ft_test are subsets of indices of the input samples and such that
Ft_train is disjoint from Ft_test. The sets F1_test, ..., Fk_test form an incomplete partition
of all the input indices, i.e. they are disjoint and their union could potentially be a subset of
all input indices. For instance, in a time series split F1_train could be a prefix of the data and
F1_test the suffix. Typically, these folds will be created
by a KFold split, i.e. if S1, ..., Sk is any partition of the data, then Ft_train is the set of
all indices except St and Ft_test = St. If the union of the Ft_test is not all the data, then only the
subset of the data in the union of the Ft_test sets will be used in the final stage.
- Then for each t in [1, ..., k]
- Estimate a model :math:`\\hat{h}_t` for :math:`h` using Ft_train
- Evaluate the learned :math:`\\hat{h}_t` model on the data in Ft_test and use that value
as the nuisance value/vector :math:`\\hat{U}_i=\\hat{h}(V_i)` for the indices i in Ft_test
3. Estimate the model for :math:`\\theta(X)` by minimizing the empirical (regularized) plugin loss on
the subset of indices for which we have a nuisance value, i.e. the union of {F1_test, ..., Fk_test}:
.. math ::
\\mathbb{E}_n[\\ell(V; \\theta(X), \\hat{h}(V))]\
= \\frac{1}{n} \\sum_{i=1}^n \\ell(V_i; \\theta(X_i), \\hat{U}_i)
The method is a bit more general in that the final step does not need to be a loss minimization step.
The class takes as input a model for fitting an estimate of the nuisance h given a set of samples
and predicting the value of the learned nuisance model on any other set of samples. It also
takes as input a model for the final estimation, that takes as input the data and their associated
estimated nuisance values from the first stage and fits a model for the CATE :math:`\\theta(X)`. Then
at predict time, the final model given any set of samples of the X variable, returns the estimated
:math:`\\theta(X)`.
The method essentially implements all the crossfit and plugin logic, so that child classes need
only implement the appropriate `model_nuisance` and `model_final` and essentially nothing more.
It also implements the basic preprocessing logic behind the expansion of discrete treatments into
one-hot encodings.
Parameters
----------
discrete_outcome: bool
Whether the outcome should be treated as binary
discrete_treatment: bool
Whether the treatment values should be treated as categorical, rather than continuous, quantities
treatment_featurizer : :term:`transformer` or None
Must support fit_transform and transform. Used to create composite treatment in the final CATE regression.
The final CATE will be trained on the outcome of featurizer.fit_transform(T).
If featurizer=None, then CATE is trained on T.
discrete_instrument: bool
Whether the instrument values should be treated as categorical, rather than continuous, quantities
categories: 'auto' or list
The categories to use when encoding discrete treatments (or 'auto' to use the unique sorted values).
The first category will be treated as the control treatment.
cv: int, cross-validation generator or an iterable
Determines the cross-validation splitting strategy.
Possible inputs for cv are:
- None, to use the default 3-fold cross-validation,
- integer, to specify the number of folds.
- :term:`CV splitter`
- An iterable yielding (train, test) splits as arrays of indices.
For integer/None inputs, if the treatment is discrete
:class:`~sklearn.model_selection.StratifiedKFold` is used, else,
:class:`~sklearn.model_selection.KFold` is used
(with a random shuffle in either case).
Unless an iterable is used, we call `split(concat[Z, W, X], T)` to generate the splits. If all
Z, W, X are None, then we call `split(ones((T.shape[0], 1)), T)`.
random_state: int, :class:`~numpy.random.mtrand.RandomState` instance or None
If int, random_state is the seed used by the random number generator;
If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator;
If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used
by :mod:`np.random<numpy.random>`.
mc_iters: int, optional
The number of times to rerun the first stage models to reduce the variance of the nuisances.
mc_agg: {'mean', 'median'}, default 'mean'
How to aggregate the nuisance value for each sample across the `mc_iters` monte carlo iterations of
cross-fitting.
allow_missing: bool
Whether to allow missing values in X, W. If True, will need to supply nuisance models that can handle
missing values.
use_ray: bool, default False
Whether to use ray to parallelize the cross-fitting step.
ray_remote_func_options: dict, default None
Options to pass to the ray.remote decorator.
Examples
--------
The example code below implements a very simple version of the double machine learning
method on top of the :class:`._OrthoLearner` class, for expository purposes.
For a more elaborate implementation of a Double Machine Learning child class of the class
:class:`._OrthoLearner` check out :class:`.DML`
and its child classes:
.. testcode::
import numpy as np
from sklearn.linear_model import LinearRegression
from econml._ortho_learner import _OrthoLearner
class ModelNuisance:
def __init__(self, model_t, model_y):
self._model_t = model_t
self._model_y = model_y
def train(self, is_selecting, folds, Y, T, W=None):
self._model_t.fit(W, T)
self._model_y.fit(W, Y)
return self
def predict(self, Y, T, W=None):
return Y - self._model_y.predict(W), T - self._model_t.predict(W)
class ModelFinal:
def __init__(self):
return
def fit(self, Y, T, W=None, nuisances=None):
Y_res, T_res = nuisances
self.model = LinearRegression(fit_intercept=False).fit(T_res.reshape(-1, 1), Y_res)
return self
def predict(self, X=None):
return self.model.coef_[0]
def score(self, Y, T, W=None, nuisances=None):
Y_res, T_res = nuisances
return np.mean((Y_res - self.model.predict(T_res.reshape(-1, 1)))**2)
class OrthoLearner(_OrthoLearner):
def _gen_ortho_learner_model_nuisance(self):
return ModelNuisance(LinearRegression(), LinearRegression())
def _gen_ortho_learner_model_final(self):
return ModelFinal()
np.random.seed(123)
X = np.random.normal(size=(100, 3))
y = X[:, 0] + X[:, 1] + np.random.normal(0, 0.1, size=(100,))
est = OrthoLearner(cv=2, discrete_outcome=False, discrete_treatment=False, treatment_featurizer=None,
discrete_instrument=False, categories='auto', random_state=None)
est.fit(y, X[:, 0], W=X[:, 1:])
>>> est.score_
0.00756830...
>>> est.const_marginal_effect()
1.02364992...
>>> est.effect()
array([1.023649...])
>>> est.effect(T0=0, T1=10)
array([10.236499...])
>>> est.score(y, X[:, 0], W=X[:, 1:])
0.00727995...
>>> est.ortho_learner_model_final_.model
LinearRegression(fit_intercept=False)
>>> est.ortho_learner_model_final_.model.coef_
array([1.023649...])
The following example shows how to do double machine learning with discrete treatments, using
the _OrthoLearner:
.. testcode::
class ModelNuisance:
def __init__(self, model_t, model_y):
self._model_t = model_t
self._model_y = model_y
def train(self, is_selecting, folds, Y, T, W=None):
self._model_t.fit(W, np.matmul(T, np.arange(1, T.shape[1]+1)))
self._model_y.fit(W, Y)
return self
def predict(self, Y, T, W=None):
return Y - self._model_y.predict(W), T - self._model_t.predict_proba(W)[:, 1:]
class ModelFinal:
def __init__(self):
return
def fit(self, Y, T, W=None, nuisances=None):
Y_res, T_res = nuisances
self.model = LinearRegression(fit_intercept=False).fit(T_res.reshape(-1, 1), Y_res)
return self
def predict(self):
# theta needs to be of dimension (1, d_t) if T is (n, d_t)
return np.array([[self.model.coef_[0]]])
def score(self, Y, T, W=None, nuisances=None):
Y_res, T_res = nuisances
return np.mean((Y_res - self.model.predict(T_res.reshape(-1, 1)))**2)
from sklearn.linear_model import LogisticRegression
class OrthoLearner(_OrthoLearner):
def _gen_ortho_learner_model_nuisance(self):
return ModelNuisance(LogisticRegression(solver='lbfgs'), LinearRegression())
def _gen_ortho_learner_model_final(self):
return ModelFinal()
np.random.seed(123)
W = np.random.normal(size=(100, 3))
import scipy.special
T = np.random.binomial(1, scipy.special.expit(W[:, 0]))
y = T + W[:, 0] + np.random.normal(0, 0.01, size=(100,))
est = OrthoLearner(cv=2, discrete_outcome=False, discrete_treatment=True, discrete_instrument=False,
treatment_featurizer=None, categories='auto', random_state=None)
est.fit(y, T, W=W)
>>> est.score_
0.00673015...
>>> est.const_marginal_effect()
array([[1.008401...]])
>>> est.effect()
array([1.008401...])
>>> est.score(y, T, W=W)
0.00310431...
>>> est.ortho_learner_model_final_.model.coef_[0]
1.00840170...
Attributes
----------
models_nuisance_: nested list of objects of type(model_nuisance)
A nested list of instances of the model_nuisance object. The number of sublists equals the
number of monte carlo iterations. Each element in a sublist corresponds to a crossfitting
fold and is the model instance that was fitted for that training fold.
ortho_learner_model_final_: object of type(model_final)
An instance of the model_final object that was fitted after calling fit.
score_ : float or array of floats
If the model_final has a score method, then `score_` contains the outcome of the final model
score when evaluated on the fitted nuisances from the first stage. Represents goodness of fit,
of the final CATE model.
nuisance_scores_ : tuple of list of list of float or None
The out-of-sample scores from training each nuisance model
"""
def __init__(self, *,
discrete_outcome,
discrete_treatment,
treatment_featurizer,
discrete_instrument,
categories,
cv,
random_state,
mc_iters=None,
mc_agg='mean',
allow_missing=False,
use_ray=False,
ray_remote_func_options=None):
self.cv = cv
self.discrete_outcome = discrete_outcome
self.discrete_treatment = discrete_treatment
self.treatment_featurizer = treatment_featurizer
self.discrete_instrument = discrete_instrument
self.random_state = random_state
self.categories = categories
self.mc_iters = mc_iters
self.mc_agg = mc_agg
self.allow_missing = allow_missing
self.use_ray = use_ray
self.ray_remote_func_options = ray_remote_func_options
super().__init__()
def _gen_allowed_missing_vars(self):
return ['X', 'W'] if self.allow_missing else []
@abstractmethod
def _gen_ortho_learner_model_nuisance(self):
"""Must return a fresh instance of a nuisance model selector
Returns
-------
model_nuisance: list of selector
The selector(s) for fitting the nuisance function. The returned estimators must implement
`train` and `predict` methods that both have signatures::
model_nuisance.train(is_selecting, folds, Y, T, X=X, W=W, Z=Z,
sample_weight=sample_weight)
model_nuisance.predict(Y, T, X=X, W=W, Z=Z,
sample_weight=sample_weight)
In fact we allow for the model method signatures to skip any of the keyword arguments
as long as the class is always called with the omitted keyword argument set to ``None``.
This can be enforced in child classes by re-implementing the fit and the various effect
methods. If ``discrete_treatment=True``, then the input ``T`` to both above calls will be the
one-hot encoding of the original input ``T``, excluding the first column of the one-hot.
If the estimator also provides a score method with the same arguments as predict, it will be used to
calculate scores during training.
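For illustration, a minimal sketch of a conforming selector (mirroring the ``ModelNuisance``
example in the class docstring; the sklearn-style regressors passed in are assumptions)::
    class ModelNuisance:
        def __init__(self, model_t, model_y):
            self._model_t = model_t
            self._model_y = model_y
        def train(self, is_selecting, folds, Y, T, X=None, W=None):
            # fit the treatment and outcome models on the controls
            self._model_t.fit(W, T)
            self._model_y.fit(W, Y)
            return self
        def predict(self, Y, T, X=None, W=None):
            # return the outcome and treatment residuals as the nuisance values
            return Y - self._model_y.predict(W), T - self._model_t.predict(W)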
"""
raise NotImplementedError("Abstract method")
@abstractmethod
def _gen_ortho_learner_model_final(self):
""" Must return a fresh instance of a final model
Returns
-------
model_final: estimator for fitting the response residuals to the features and treatment residuals
Must implement `fit` and `predict` methods that must have signatures::
model_final.fit(Y, T, X=X, W=W, Z=Z, nuisances=nuisances,
sample_weight=sample_weight, freq_weight=freq_weight, sample_var=sample_var)
model_final.predict(X=X)
Predict should just take the features X and return the constant marginal effect. In fact we allow
for the model method signatures to skip any of the keyword arguments as long as the class is always
called with the omitted keyword argument set to ``None``. Moreover, the predict function of the final
model can take no argument if the class is always called with ``X=None``. This can be enforced in child
classes by re-implementing the fit and the various effect methods. If ``discrete_treatment=True``,
then the input ``T`` to both above calls will be the one-hot encoding of the original input ``T``,
excluding the first column of the one-hot.
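For illustration, a minimal sketch of a conforming final model (mirroring the ``ModelFinal``
example in the class docstring)::
    from sklearn.linear_model import LinearRegression
    class ModelFinal:
        def fit(self, Y, T, X=None, W=None, Z=None, nuisances=None):
            # regress the outcome residuals on the treatment residuals
            Y_res, T_res = nuisances
            self.model = LinearRegression(fit_intercept=False).fit(T_res.reshape(-1, 1), Y_res)
            return self
        def predict(self, X=None):
            # constant marginal effect: a single coefficient, ignoring X
            return self.model.coef_[0]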
"""
raise NotImplementedError("Abstract method")
def _check_input_dims(self, Y, T, X=None, W=None, Z=None, *other_arrays):
assert shape(Y)[0] == shape(T)[0], "Dimension mis-match!"
for arr in [X, W, Z, *other_arrays]:
assert (arr is None) or (arr.shape[0] == Y.shape[0]), "Dimension mismatch"
self._d_x = X.shape[1:] if X is not None else None
self._d_w = W.shape[1:] if W is not None else None
self._d_z = Z.shape[1:] if Z is not None else None
def _check_fitted_dims(self, X):
if X is None:
assert self._d_x is None, "X was not None when fitting, so can't be none for score or effect"
else:
assert self._d_x == X.shape[1:], "Dimension mis-match of X with fitted X"
def _check_fitted_dims_w_z(self, W, Z):
if W is None:
assert self._d_w is None, "W was not None when fitting, so can't be none for score"
else:
assert self._d_w == W.shape[1:], "Dimension mis-match of W with fitted W"
if Z is None:
assert self._d_z is None, "Z was not None when fitting, so can't be none for score"
else:
assert self._d_z == Z.shape[1:], "Dimension mis-match of Z with fitted Z"
def _subinds_check_none(self, var, inds):
return var[inds] if var is not None else None
def _strata(self, Y, T, X=None, W=None, Z=None,
sample_weight=None, freq_weight=None, sample_var=None, groups=None,
cache_values=False, only_final=False, check_input=True):
arrs = []
if self.discrete_outcome:
arrs.append(Y)
if self.discrete_treatment:
arrs.append(T)
if self.discrete_instrument:
arrs.append(Z)
return strata_from_discrete_arrays(arrs)
def _prefit(self, Y, T, *args, only_final=False, **kwargs):
# generate an instance of the final model
self._ortho_learner_model_final = self._gen_ortho_learner_model_final()
if not only_final:
# generate an instance of the nuisance model
self._ortho_learner_model_nuisance = self._gen_ortho_learner_model_nuisance()
super()._prefit(Y, T, *args, **kwargs)
@BaseCateEstimator._wrap_fit
def fit(self, Y, T, *, X=None, W=None, Z=None, sample_weight=None, freq_weight=None, sample_var=None, groups=None,
cache_values=False, inference=None, only_final=False, check_input=True):
"""
Estimate the counterfactual model from data, i.e. estimates the function :math:`\\theta(\\cdot)`.
Parameters
----------
Y: (n, d_y) matrix or vector of length n
Outcomes for each sample
T: (n, d_t) matrix or vector of length n
Treatments for each sample
X: (n, d_x) matrix, optional
Features for each sample
W: (n, d_w) matrix, optional
Controls for each sample
Z: (n, d_z) matrix, optional
Instruments for each sample
sample_weight : (n,) array_like, optional
Individual weights for each sample. If None, it assumes equal weight.
freq_weight: (n, ) array_like of int, optional
Weight for the observation. Observation i is treated as the mean
outcome of freq_weight[i] independent observations.
When ``sample_var`` is not None, this should be provided.
sample_var : {(n,), (n, d_y)} nd array_like, optional
Variance of the outcome(s) of the original freq_weight[i] observations that were used to
compute the mean outcome represented by observation i.
groups: (n,) vector, optional
All rows corresponding to the same group will be kept together during splitting.
If groups is not None, the cv argument passed to this class's initializer
must support a 'groups' argument to its split method.
cache_values: bool, default False
Whether to cache the inputs and computed nuisances, which will allow refitting a different final model
inference: str, :class:`.Inference` instance, or None
Method for performing inference. This estimator supports 'bootstrap'
(or an instance of :class:`.BootstrapInference`).
only_final: bool, default False
Whether to fit the nuisance models or use the existing cached values
Note. This parameter is only used internally by the `refit` method and should not be exposed
publicly by overrides of the `fit` method in public classes.
check_input: bool, default True
Whether to check if the input is valid
Note. This parameter is only used internally by the `refit` method and should not be exposed
publicly by overrides of the `fit` method in public classes.
Returns
-------
self : object
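A usage sketch (``est`` is an instance of a concrete child estimator; the array names are
illustrative)::
    est.fit(y, T, X=X, W=W, groups=group_ids, cache_values=True)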
"""
self._random_state = check_random_state(self.random_state)
assert (freq_weight is None) == (
sample_var is None), "Sample variances and frequency weights must be provided together!"
assert not (self.discrete_treatment and self.treatment_featurizer), "Treatment featurization " \
"is not supported when treatment is discrete"
if check_input:
Y, T, Z, sample_weight, freq_weight, sample_var, groups = check_input_arrays(
Y, T, Z, sample_weight, freq_weight, sample_var, groups)
X, = check_input_arrays(
X, force_all_finite='allow-nan' if 'X' in self._gen_allowed_missing_vars() else True)
W, = check_input_arrays(
W, force_all_finite='allow-nan' if 'W' in self._gen_allowed_missing_vars() else True)
self._check_input_dims(Y, T, X, W, Z, sample_weight, freq_weight, sample_var, groups)
if not only_final:
if self.discrete_outcome:
self.outcome_transformer = LabelEncoder()
self.outcome_transformer.fit(Y)
if Y.shape[1:] and Y.shape[1] > 1:
raise ValueError(
f"Only one outcome variable is supported when discrete_outcome=True. Got Y of shape {Y.shape}")
if len(self.outcome_transformer.classes_) > 2:
raise AttributeError(
f"({len(self.outcome_transformer.classes_)} outcome classes detected. "
"Currently, only 2 outcome classes are allowed when discrete_outcome=True. "
f"Classes provided include {self.outcome_transformer.classes_[:5]}")
else:
self.outcome_transformer = None
if self.discrete_treatment:
categories = self.categories
if categories != 'auto':
categories = [categories] # OneHotEncoder expects a 2D array with features per column
self.transformer = one_hot_encoder(categories=categories, drop='first')
self.transformer.fit(reshape(T, (-1, 1)))
self._d_t = (len(self.transformer.categories_[0]) - 1,)
elif self.treatment_featurizer:
self._original_treatment_featurizer = clone(self.treatment_featurizer, safe=False)
self.transformer = jacify_featurizer(self.treatment_featurizer)
output_T = self.transformer.fit_transform(T)
self._d_t = np.shape(output_T)[1:]
else:
self.transformer = None
if self.discrete_instrument:
self.z_transformer = one_hot_encoder(categories='auto', drop='first')
self.z_transformer.fit(reshape(Z, (-1, 1)))
else:
self.z_transformer = None
all_nuisances = []
fitted_inds = None
if sample_weight is None:
if freq_weight is not None:
sample_weight_nuisances = freq_weight
else:
sample_weight_nuisances = None
else:
if freq_weight is not None:
sample_weight_nuisances = freq_weight * sample_weight
else:
sample_weight_nuisances = sample_weight
self._models_nuisance = []
if self.use_ray:
if not ray.is_initialized():
ray.init()
self.ray_remote_func_options = self.ray_remote_func_options or {}
# Define Ray remote function (Ray remote wrapper of the _fit_nuisances function)
def _fit_nuisances(Y, T, X, W, Z, sample_weight, groups):
return self._fit_nuisances(Y, T, X, W, Z, sample_weight=sample_weight, groups=groups)
# Create Ray remote jobs for parallel processing
self.nuisances_ref = [ray.remote(_fit_nuisances).options(**self.ray_remote_func_options).remote(
Y, T, X, W, Z, sample_weight_nuisances, groups) for _ in range(self.mc_iters or 1)]
for idx in range(self.mc_iters or 1):
if self.use_ray:
nuisances, fitted_models, new_inds, scores = ray.get(self.nuisances_ref[idx])
else:
nuisances, fitted_models, new_inds, scores = self._fit_nuisances(
Y, T, X, W, Z, sample_weight=sample_weight_nuisances, groups=groups)
all_nuisances.append(nuisances)
self._models_nuisance.append(fitted_models)
if scores is None:
self.nuisance_scores_ = None
else:
if idx == 0:
self.nuisance_scores_ = tuple([] for _ in scores)
for ind, score in enumerate(scores):
self.nuisance_scores_[ind].append(score)
if fitted_inds is None:
fitted_inds = new_inds
elif not np.array_equal(fitted_inds, new_inds):
raise AttributeError("Different indices were fit across different monte carlo iterations, "
"so they cannot be aggregated")
if self.mc_iters is not None:
if self.mc_agg == 'mean':
nuisances = tuple(np.mean(nuisance_mc_variants, axis=0)
for nuisance_mc_variants in zip(*all_nuisances))
elif self.mc_agg == 'median':
nuisances = tuple(np.median(nuisance_mc_variants, axis=0)
for nuisance_mc_variants in zip(*all_nuisances))
else:
raise ValueError(
f"Parameter `mc_agg` must be one of {{'mean', 'median'}}. Got {self.mc_agg}")
Y, T, X, W, Z, sample_weight, freq_weight, sample_var = (self._subinds_check_none(arr, fitted_inds)
for arr in (Y, T, X, W, Z, sample_weight,
freq_weight, sample_var))
nuisances = tuple([self._subinds_check_none(nuis, fitted_inds) for nuis in nuisances])
self._cached_values = CachedValues(nuisances=nuisances,
Y=Y, T=T, X=X, W=W, Z=Z,
sample_weight=sample_weight,
freq_weight=freq_weight,
sample_var=sample_var,
groups=groups) if cache_values else None
else:
nuisances = self._cached_values.nuisances
# _fit_nuisances alters _d_t relative to what _prefit sets, so we need to perform the same
# alteration even when we only want to fit the final model.
if self.transformer is not None:
if self.discrete_treatment:
self._d_t = (len(self.transformer.categories_[0]) - 1,)
else:
output_T = self.transformer.fit_transform(T)
self._d_t = np.shape(output_T)[1:]
final_T = T
if self.transformer:
if (self.discrete_treatment):
final_T = self.transformer.transform(final_T.reshape(-1, 1))
else: # treatment featurizer case
final_T = output_T
self._fit_final(Y=Y,
T=final_T,
X=X, W=W, Z=Z,
nuisances=nuisances,
sample_weight=sample_weight,
freq_weight=freq_weight,
sample_var=sample_var,
groups=groups)
return self
@property
def _illegal_refit_inference_methods(self):
return (BootstrapInference,)
def refit_final(self, inference=None):
"""
Estimate the counterfactual model using a new final model specification but with cached first stage results.
In order for this to succeed, ``fit`` must have been called with ``cache_values=True``. This call
will only refit the final model, using the current setting of any parameters that change the
final stage estimation. Changes to parameters that affect how the first stage nuisance estimates
are computed will have no effect; you need to call fit again to change the
first stage estimation results.
Parameters
----------
inference : inference method, optional
The string or object that represents the inference method
Returns
-------
self : object
This instance
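A typical usage sketch (assuming ``est`` is a child estimator whose original ``fit`` call
passed ``cache_values=True``; the parameter being changed is illustrative)::
    est.fit(y, T, W=W, cache_values=True)
    # ... change a final-stage parameter on est ...
    est.refit_final()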
"""
assert self._cached_values, "Refit can only be called if values were cached during the original fit"
if isinstance(self._get_inference(inference), self._illegal_refit_inference_methods):
raise ValueError("The chosen inference method does not support refitting only the final model.")
cached = self._cached_values
kwargs = filter_none_kwargs(
Y=cached.Y, T=cached.T, X=cached.X, W=cached.W, Z=cached.Z,
sample_weight=cached.sample_weight, freq_weight=cached.freq_weight, sample_var=cached.sample_var,
groups=cached.groups,
)
_OrthoLearner.fit(self, **kwargs,
cache_values=True, inference=inference, only_final=True, check_input=False)
return self
def _fit_nuisances(self, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None):
# use the discrete outcome/treatment/instrument values to get a stratified split when any of them is discrete
stratify = self.discrete_treatment or self.discrete_instrument or self.discrete_outcome
strata = self._strata(Y, T, X=X, W=W, Z=Z, sample_weight=sample_weight, groups=groups)
if strata is None:
strata = T # always safe to pass T as second arg to split even if we're not actually stratifying
if self.transformer:
if self.discrete_treatment:
T = reshape(T, (-1, 1))
T = self.transformer.transform(T)
if self.discrete_instrument:
Z = self.z_transformer.transform(reshape(Z, (-1, 1)))
if self.discrete_outcome:
Y = self.outcome_transformer.transform(Y).reshape(-1, 1)
if self.cv == 1: # special case, no cross validation
folds = None
else:
splitter = check_cv(self.cv, [0], classifier=stratify)
# if check_cv produced a new KFold or StratifiedKFold object, we need to set shuffle and random_state
if splitter != self.cv and isinstance(splitter, (KFold, StratifiedKFold)):
# upgrade to a GroupKFold or StratifiedGroupKFold if groups is not None
if groups is not None:
if isinstance(splitter, KFold):
splitter = GroupKFold(n_splits=splitter.n_splits)
elif isinstance(splitter, StratifiedKFold):
splitter = StratifiedGroupKFold(n_splits=splitter.n_splits)
splitter.shuffle = True
splitter.random_state = self._random_state
all_vars = [var if np.ndim(var) == 2 else var.reshape(-1, 1) for var in [Z, W, X] if var is not None]
to_split = np.hstack(all_vars) if all_vars else np.ones((T.shape[0], 1))
if groups is not None:
# we won't have generated a KFold or StratifiedKFold ourselves when groups are passed,
# but the user might have supplied one, which won't work
if isinstance(splitter, (KFold, StratifiedKFold)):
raise TypeError("Groups were passed to fit while using a KFold or StratifiedKFold splitter. "
"Instead you must initialize this object with a splitter that can handle groups.")
folds = splitter.split(to_split, strata, groups=groups)
else:
folds = splitter.split(to_split, strata)
nuisances, fitted_models, fitted_inds, scores = _crossfit(self._ortho_learner_model_nuisance, folds,
self.use_ray, self.ray_remote_func_options, Y, T,
X=X, W=W, Z=Z, sample_weight=sample_weight,
groups=groups)
return nuisances, fitted_models, fitted_inds, scores
def _fit_final(self, Y, T, X=None, W=None, Z=None, nuisances=None, sample_weight=None,
freq_weight=None, sample_var=None, groups=None):
self._ortho_learner_model_final.fit(Y, T, **filter_none_kwargs(X=X, W=W, Z=Z,
nuisances=nuisances,
sample_weight=sample_weight,
freq_weight=freq_weight,
sample_var=sample_var,
groups=groups))
self.score_ = None
if hasattr(self._ortho_learner_model_final, 'score'):
self.score_ = self._ortho_learner_model_final.score(Y, T, **filter_none_kwargs(X=X, W=W, Z=Z,
nuisances=nuisances,
sample_weight=sample_weight,
groups=groups))
def const_marginal_effect(self, X=None):
X, = check_input_arrays(X)
self._check_fitted_dims(X)
if X is None:
return self._ortho_learner_model_final.predict()
else:
return self._ortho_learner_model_final.predict(X)
const_marginal_effect.__doc__ = LinearCateEstimator.const_marginal_effect.__doc__
def const_marginal_effect_interval(self, X=None, *, alpha=0.05):
X, = check_input_arrays(X)
self._check_fitted_dims(X)
return super().const_marginal_effect_interval(X, alpha=alpha)
const_marginal_effect_interval.__doc__ = LinearCateEstimator.const_marginal_effect_interval.__doc__
def const_marginal_effect_inference(self, X=None):
X, = check_input_arrays(X)
self._check_fitted_dims(X)
return super().const_marginal_effect_inference(X)
const_marginal_effect_inference.__doc__ = LinearCateEstimator.const_marginal_effect_inference.__doc__
def effect_interval(self, X=None, *, T0=0, T1=1, alpha=0.05):
X, T0, T1 = check_input_arrays(X, T0, T1)
self._check_fitted_dims(X)
return super().effect_interval(X, T0=T0, T1=T1, alpha=alpha)
effect_interval.__doc__ = LinearCateEstimator.effect_interval.__doc__
def effect_inference(self, X=None, *, T0=0, T1=1):
X, T0, T1 = check_input_arrays(X, T0, T1)
self._check_fitted_dims(X)
return super().effect_inference(X, T0=T0, T1=T1)
effect_inference.__doc__ = LinearCateEstimator.effect_inference.__doc__
def score(self, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None):
"""
Score the fitted CATE model on a new data set. Generates nuisance parameters
for the new data set based on the fitted nuisance models created at fit time,
using the mean prediction of the models fitted across the different crossfit folds
and monte carlo iterations. Then calls the score function of the model_final and
returns the calculated score. The model_final must have a score method;
otherwise an :exc:`.AttributeError` is raised.
Parameters
----------
Y: (n, d_y) matrix or vector of length n
Outcomes for each sample
T: (n, d_t) matrix or vector of length n
Treatments for each sample
X: (n, d_x) matrix, optional
Features for each sample
W: (n, d_w) matrix, optional
Controls for each sample
Z: (n, d_z) matrix, optional
Instruments for each sample
sample_weight: (n,) vector, optional
Weights for each sample
groups: (n,) vector, optional
All rows corresponding to the same group will be kept together during splitting.
Returns
-------
score : float or (array of float)
The score of the final CATE model on the new data. Same type as the return
type of the model_final.score method.
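A usage sketch (assuming ``est`` has already been fit and ``y_val``, ``T_val``, ``W_val`` are
held-out arrays of compatible shapes)::
    val_score = est.score(y_val, T_val, W=W_val)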
"""
if not hasattr(self._ortho_learner_model_final, 'score'):
raise AttributeError("Final model does not have a score method!")
Y, T, Z = check_input_arrays(Y, T, Z)
X, = check_input_arrays(X, force_all_finite='allow-nan' if 'X' in self._gen_allowed_missing_vars() else True)
W, = check_input_arrays(W, force_all_finite='allow-nan' if 'W' in self._gen_allowed_missing_vars() else True)
self._check_fitted_dims(X)
self._check_fitted_dims_w_z(W, Z)
X, T = self._expand_treatments(X, T)
if self.z_transformer is not None:
Z = self.z_transformer.transform(reshape(Z, (-1, 1)))
if self.discrete_outcome:
Y = self.outcome_transformer.transform(Y).reshape(-1, 1)
n_iters = len(self._models_nuisance)
# self._models_nuisance will be a list of lists or a list of list of lists
# so we use self._ortho_learner_model_nuisance to determine the nesting level
expand_list = not isinstance(self._ortho_learner_model_nuisance, list)
if expand_list:
n_selectors = 1
n_splits = len(self._models_nuisance[0])
else:
n_selectors = len(self._models_nuisance[0])
n_splits = len(self._models_nuisance[0][0])
kwargs = filter_none_kwargs(X=X, W=W, Z=Z, groups=groups)
accumulated_nuisances = []
for sel in range(n_selectors):
accumulated_args = tuple(accumulated_nuisances) + (Y, T)
# for each mc iteration
for i, models_nuisances in enumerate(self._models_nuisance):
if expand_list:
models_nuisances = [models_nuisances]
# for each model under cross fit setting
for j, mdl in enumerate(models_nuisances[sel]):
nuisance_temp = mdl.predict(*accumulated_args, **kwargs)
if not isinstance(nuisance_temp, tuple):
nuisance_temp = (nuisance_temp,)
if i == 0 and j == 0:
nuisances = [np.zeros((n_iters * n_splits,) + nuis.shape) for nuis in nuisance_temp]
for it, nuis in enumerate(nuisance_temp):
nuisances[it][j * n_iters + i] = nuis
for it in range(len(nuisances)):
nuisances[it] = np.mean(nuisances[it], axis=0)
accumulated_nuisances += nuisances
return self._ortho_learner_model_final.score(Y, T, nuisances=accumulated_nuisances,
**filter_none_kwargs(X=X, W=W, Z=Z,
sample_weight=sample_weight, groups=groups))
@property
def ortho_learner_model_final_(self):
if not hasattr(self, '_ortho_learner_model_final'):
raise AttributeError("Model is not fitted!")
return self._ortho_learner_model_final
@property
def models_nuisance_(self):
if not hasattr(self, '_models_nuisance'):
raise AttributeError("Model is not fitted!")
return self._models_nuisance