"""Functions to simulate a dataset generated by a latent factor model."""
import numpy as np
import pandas as pd
from numpy.random import binomial
from numpy.random import choice
from numpy.random import multivariate_normal
import skillmodels.transition_functions as tf
from skillmodels import elliptical_distributions
from skillmodels.parse_params import parse_params
[docs]def add_missings(data, meas_names, p_b, p_r):
"""Add np.nans to data.
nans are only added to measurements, not to control variables or factors.
Note that p is NOT the marginal probability of a measurement being missing.
The marginal probability is given by: p_m = p/(1-serial_corr), where
serial_corr = (p_r-p_b) in general != 0, since p_r != p_b. This means that in
average the share of missing values (in the entire dataset) will be larger
than p. Thus, p and q should be set accordingly given the desired share
of missing values.
Args:
data (pd.DataFrame): contains the observable part of a simulated dataset
meas_names (list): list of strings of names of each measurement variable
p_b (float): probability of a measurement to become missing
p_r (float): probability of a measurement to remain missing in the next period
Returns:
data_with_missings (pd.DataFrame): Dataset with a share of measurements
replaced by np.nan values
"""
n_meas = len(meas_names)
data_with_missings = data.copy(deep=True)
for i in set(data_with_missings.index):
ind_data = data_with_missings.loc[i][meas_names].to_numpy()
s_0 = binomial(1, p_b, n_meas)
ind_data[0, np.where(s_0 == 1)] = np.nan
for t in range(1, len(ind_data)):
indc_nan = np.isnan(ind_data[t - 1])
prob = p_r * indc_nan + p_b * (1 - indc_nan)
s_m = binomial(1, prob)
ind_data[t, np.where(s_m == 1)] = np.nan
data_with_missings.loc[i, meas_names] = ind_data
return data_with_missings
[docs]def simulate_datasets(
labels,
dimensions,
n_obs,
params,
parsing_info,
update_info,
control_means,
dist_arg_dict,
dist_name="multivariate_normal",
policies=None,
):
"""Simulate datasets generated by a latent factor model.
Args:
labels (dict): Dict of lists with labels for the model quantities like
factors, periods, controls, stagemap and stages. See :ref:`labels`.
dimensions (dict): Dimensional information like n_states, n_periods, n_controls,
n_mixtures. See :ref:`dimensions`.
n_obs (int): number of observations.
params (jax.numpy.array): 1d array with model parameters
parsing_info (dict): Dictionary with information on how the parameters
have to be parsed.
control_mean (array): 1d array with initial means of the control variables
dist_name (string): the elliptical distribution to use in the mixture
dist_arg_dict (list): list of length n_mixtures of dictionaries with the
relevant arguments of the mixture distributions. Arguments with default
values should NOT be included in the dictionaries. Lengths of arrays in the
arguments should be in accordance with n_states + n_controls.
the key names of dist_arg_dict can be looked up in the module
elliptical_distributions. For multivariate_normal it's mean and cov.
policies (list): list of dictionaries. Each dictionary specifies a
a stochastic shock to a latent factor AT THE END of "period" for "factor"
with mean "effect_size" and "standard deviation"
Returns:
observed_data (pd.DataFrame): Dataset with measurements and control variables
in long format
latent_data (pd.DataFrame): Dataset with latent factors in long format
"""
states, _, log_weights, pardict = parse_params(
params, parsing_info, dimensions, labels, n_obs
)
return _simulate_datasets(
states,
log_weights,
pardict,
labels,
dimensions,
n_obs,
update_info,
control_means,
dist_arg_dict,
dist_name,
policies,
)
def _simulate_datasets(
states,
log_weights,
pardict,
labels,
dimensions,
n_obs,
update_info,
control_means,
dist_arg_dict,
dist_name,
policies,
):
"""Simulate datasets generated by a latent factor model.
Args:
See simulate_data
Returns:
See simulate_data
"""
policies = policies if policies is not None else []
n_states = dimensions["n_states"]
n_controls = dimensions["n_controls"]
n_periods = dimensions["n_periods"]
weights = np.exp(log_weights)
loadings_df = pd.DataFrame(
data=pardict["loadings"], index=update_info.index, columns=labels["factors"]
)
control_params_df = pd.DataFrame(
data=pardict["controls"], index=update_info.index, columns=labels["controls"]
)
meas_sds = pd.DataFrame(
data=pardict["meas_sds"].reshape(-1, 1), index=update_info.index
)
transition_names = labels["transition_names"]
transition_params = pardict["transition"]
shock_sds = pardict["shock_sds"]
states_means = states[0]
initial_means = np.column_stack(
[states_means, control_means.reshape(-1, n_controls - 1)]
)
if dist_name in ["_mv_student_t", "multivariate_normal"]:
for i, d in enumerate(dist_arg_dict):
d.update({"mean": initial_means[i]})
stat = np.zeros((n_periods, n_obs, n_states))
stat[0], cont = generate_start_state_and_control_variables_elliptical(
n_obs, dimensions, dist_name, dist_arg_dict, weights
)
cont = pd.DataFrame(data=cont, columns=labels["controls"])
for t in range(n_periods - 1):
# if there is a shock in period t, add it here
policies_t = [p for p in policies if p["period"] == t]
for policy in policies_t:
position = labels["factors"].index(policy["factor"])
stat[t, :, position] += _get_shock(
mean=policy["effect_size"], sd=policy["standard_deviation"], size=n_obs
)
stat[t + 1] = next_period_states(
stat[t], transition_names, transition_params[t], shock_sds[t]
)
observed_data_by_period = []
for t in range(n_periods):
meas = pd.DataFrame(
data=measurements_from_states(
stat[t],
cont.to_numpy(),
loadings_df.loc[t].to_numpy(),
control_params_df.loc[t].to_numpy(),
meas_sds.loc[t].to_numpy().flatten(),
),
columns=loadings_df.loc[t].index,
)
meas["period"] = t
observed_data_by_period.append(pd.concat([meas, cont], axis=1))
observed_data = pd.concat(observed_data_by_period, axis=0, sort=True)
observed_data["id"] = observed_data.index
observed_data.sort_values(["id", "period"], inplace=True)
observed_data.set_index(["id", "period"], inplace=True)
latent_data_by_period = []
for t in range(n_periods):
lat = pd.DataFrame(data=stat[t], columns=labels["factors"])
lat["period"] = t
latent_data_by_period.append(lat)
latent_data = pd.concat(latent_data_by_period, axis=0, sort=True)
latent_data["id"] = latent_data.index
latent_data.sort_values(["id", "period"], inplace=True)
latent_data.set_index(["id", "period"], inplace=True)
return observed_data, latent_data
def _get_shock(mean, sd, size):
"""Add stochastic effect to a factor of length n_obs.
Args:
mean (float): mean of the stochastic effect
sd (float): standard deviation of the effect
size (int): length of resulting array
Returns:
shock (np.array): 1d array of length n_obs with the stochastic shock
"""
if sd == 0:
shock = np.full(size, mean)
elif sd > 0:
shock = np.random.normal(mean, sd, size)
else:
raise ValueError("No negative standard deviation allowed.")
return shock
[docs]def generate_start_state_and_control_variables_elliptical(
n_obs, dimensions, dist_name, dist_arg_dict, weights
):
"""Draw initial states and control variables from a (mixture of) normals.
Args:
n_obs (int): number of observations
dimensions (dict): Dimensional information like n_states, n_periods, n_controls,
n_mixtures. See :ref:`dimensions`.
dist_name (string): the elliptical distribution to use in the mixture
dist_arg_dict (list of dict): list of length nmixtures of dictionaries with the
relevant arguments of the mixture distributions. Arguments with default
values should NOT be included in the dictionaries. Lengths of arrays in the
arguments should be in accordance with n_states + n_controls
weights (np.ndarray): size (nmixtures). The weight of each mixture element.
Default value is equal to 1.
Returns:
start_states (np.ndarray): shape (n_obs, n_states),
controls (np.ndarray): shape (n_obs, n_controls),
"""
n_states = dimensions["n_states"]
n_controls = dimensions["n_controls"]
if np.size(weights) == 1:
out = getattr(elliptical_distributions, dist_name)(
size=n_obs, **dist_arg_dict[0]
)
else:
helper_array = choice(np.arange(len(weights)), p=weights, size=n_obs)
out = np.zeros((n_obs, n_states + n_controls - 1))
for i in range(n_obs):
out[i] = getattr(elliptical_distributions, dist_name)(
**dist_arg_dict[helper_array[i]]
)
initial_states = out[:, 0:n_states]
controls = out[:, n_states:]
controls = np.hstack([np.ones((n_obs, 1)), controls])
return initial_states, controls
[docs]def next_period_states(states, transition_names, transition_params, shock_sds):
"""Apply transition function to factors and add shocks.
Args:
states (np.ndarray): shape (n_obs, n_states)
transition_names (list): list of strings with the names of the transition
function of each factor.
transition_params (list): list of dictionaries of length n_states with
the arguments for the transition function of each factor. A detailed
description of the arguments of transition functions can be found in the
module docstring of skillmodels.model_functions.transition_functions.
shock_sds (np.ndarray): numpy array of length n_states.
Returns:
next_factors (np.ndarray): shape(n_obs,n_states)
"""
n_obs, n_states = states.shape
states_tp1 = np.zeros((n_obs, n_states))
for i in range(n_states):
states_tp1[:, i] = getattr(tf, transition_names[i])(
states, transition_params[i]
)
# Assumption: In general err_{Obs_j,Fac_i}!=err{Obs_k,Fac_i}, where j!=k
errors = multivariate_normal(
[0] * n_states, np.diag(shock_sds ** 2), n_obs
).reshape(n_obs, n_states)
next_states = states_tp1 + errors
return next_states
[docs]def measurements_from_states(states, controls, loadings, control_params, sds):
"""Generate the variables that would be observed in practice.
This generates the data for only one period. Let n_meas be the number
of measurements in that period.
Args:
states (pd.DataFrame or np.ndarray): DataFrame of shape (n_obs, n_states)
controls (pd.DataFrame or np.ndarray): DataFrame of shape
(n_obs, n_controlsrols)
loadings (np.ndarray): numpy array of size (n_meas, n_states)
control_coeffs (np.ndarray): numpy array of size (n_meas, n_states)
sds (np.ndarray): numpy array of size (n_meas) with the standard deviations
of the measurements. Measurement error is assumed to be independent
across measurements.
Returns:
measurements (np.ndarray): array of shape (n_obs, n_meas) with measurements.
"""
n_meas = loadings.shape[0]
n_obs, n_states = states.shape
# Assumption: In general eps_{Obs_j,Meas_i}!=eps_{Obs_k,Meas_i} where j!=k
epsilon = multivariate_normal([0] * n_meas, np.diag(sds ** 2), n_obs)
states_part = np.dot(states, loadings.T)
control_part = np.dot(controls, control_params.T)
meas = states_part + control_part + epsilon
return meas