"""Functions to simulate a dataset generated by a latent factor model."""
import jax.numpy as jnp
import numpy as np
import pandas as pd
from numpy.random import binomial
from numpy.random import choice
from numpy.random import multivariate_normal
import skillmodels.transition_functions as tf
from skillmodels.params_index import get_params_index
from skillmodels.parse_params import create_parsing_info
from skillmodels.parse_params import parse_params
from skillmodels.process_model import process_model
def add_missings(data, meas_names, p_b, p_r):
    """Add np.nans to data.

    nans are only added to measurements, not to control variables or factors.
    Missingness is serially correlated within an individual: whether a
    measurement is missing in period t depends on whether it was missing in
    period t - 1.

    Note that p_b is NOT the marginal probability of a measurement being
    missing. The marginal probability is given by p_m = p_b / (1 - serial_corr),
    where serial_corr = (p_r - p_b) is in general != 0, since p_r != p_b. This
    means that on average the share of missing values (in the entire dataset)
    will be larger than p_b. Thus p_b and p_r should be set accordingly given
    the desired share of missing values.

    Args:
        data (pd.DataFrame): contains the observable part of a simulated
            dataset; indexed by individual, with one row per period.
        meas_names (list): list of strings of names of each measurement variable.
        p_b (float): probability of a measurement to become missing.
        p_r (float): probability of a measurement to remain missing in the
            next period.

    Returns:
        data_with_missings (pd.DataFrame): Dataset with a share of measurements
            replaced by np.nan values.
    """
    n_meas = len(meas_names)
    data_with_missings = data.copy(deep=True)
    for ind_id in set(data_with_missings.index):
        # (n_periods, n_meas) array for one individual; cast to float so
        # np.nan can be assigned even if measurements have an integer dtype
        ind_data = (
            data_with_missings.loc[ind_id][meas_names].to_numpy().astype(float)
        )
        # initial period: each measurement becomes missing with probability p_b
        ind_data[0, binomial(1, p_b, n_meas) == 1] = np.nan
        for t in range(1, len(ind_data)):
            was_missing = np.isnan(ind_data[t - 1])
            # remain missing with probability p_r, become missing with p_b
            prob = p_r * was_missing + p_b * (1 - was_missing)
            ind_data[t, binomial(1, prob) == 1] = np.nan
        data_with_missings.loc[ind_id, meas_names] = ind_data
    return data_with_missings
def simulate_dataset(
    model_dict, params, n_obs, control_means=None, control_sds=None, policies=None
):
    """Simulate datasets generated by a latent factor model.

    Args:
        model_dict (dict): The model specification. See: :ref:`model_specs`
        params (pandas.DataFrame): DataFrame with model parameters.
        n_obs (int): Number of simulated individuals.
        control_means (pd.Series): Means of the control variables, indexed by
            control variable name. Controls are assumed to be time constant
            for the simulation.
        control_sds (pd.Series): Standard deviations of the control variables,
            indexed by control variable name.
        policies (list): list of dictionaries. Each dictionary specifies a
            stochastic shock to a latent factor AT THE END of "period" for
            "factor" with mean "effect_size" and "standard_deviation".

    Returns:
        observed_data (pd.DataFrame): Dataset with measurements and control
            variables in long format.
        latent_data (pd.DataFrame): Dataset with latent factors in long format.
    """
    model = process_model(model_dict)

    # align the parameter DataFrame with the canonical parameter ordering
    full_index = get_params_index(
        update_info=model["update_info"],
        labels=model["labels"],
        dimensions=model["dimensions"],
    )
    aligned_params = params.reindex(full_index)

    parsing_info = create_parsing_info(
        params_index=aligned_params.index,
        update_info=model["update_info"],
        labels=model["labels"],
        anchoring=model["anchoring"],
    )

    param_vec = jnp.array(aligned_params["value"].to_numpy())
    states, covs, log_weights, pardict = parse_params(
        params=param_vec,
        parsing_info=parsing_info,
        dimensions=model["dimensions"],
        labels=model["labels"],
        n_obs=n_obs,
    )

    return _simulate_dataset(
        states=states,
        covs=covs,
        log_weights=log_weights,
        pardict=pardict,
        labels=model["labels"],
        dimensions=model["dimensions"],
        n_obs=n_obs,
        update_info=model["update_info"],
        control_means=control_means,
        control_sds=control_sds,
        policies=policies,
    )
def _simulate_dataset(
    states,
    covs,
    log_weights,
    pardict,
    labels,
    dimensions,
    n_obs,
    update_info,
    control_means,
    control_sds,
    policies,
):
    """Simulate datasets generated by a latent factor model.

    Args:
        See simulate_data

    Returns:
        See simulate_data
    """
    # treat "no policies" and "empty policy list" the same way
    policies = policies if policies is not None else []
    n_states = dimensions["n_states"]
    n_controls = dimensions["n_controls"]
    n_periods = dimensions["n_periods"]
    # mixture weights of the initial distribution; only the first row is used
    # (presumably the weights are identical across observations -- verify)
    weights = np.exp(log_weights)[0]
    # parameter matrices indexed like update_info (one row per measurement)
    loadings_df = pd.DataFrame(
        data=pardict["loadings"], index=update_info.index, columns=labels["factors"]
    )
    control_params_df = pd.DataFrame(
        data=pardict["controls"], index=update_info.index, columns=labels["controls"]
    )
    meas_sds = pd.DataFrame(
        data=pardict["meas_sds"].reshape(-1, 1), index=update_info.index
    )
    transition_names = labels["transition_names"]
    transition_params = pardict["transition"]
    shock_sds = pardict["shock_sds"]
    # the first entry of labels["controls"] is skipped: it is the constant
    # column added in generate_start_state_and_control_variable
    control_means_np = control_means[labels["controls"][1:]].to_numpy().astype(float)
    control_sds_np = control_sds[labels["controls"][1:]].to_numpy().astype(float)
    # build the joint initial distribution of states and non-constant
    # controls, one mean/cov pair per mixture component
    dist_args = []
    for mixture in range(dimensions["n_mixtures"]):
        args = {}
        args["mean"] = np.hstack([states[0][mixture], control_means_np])
        # NOTE(review): assumes covs[0][mixture] is a Cholesky-type factor so
        # that F.T @ F recovers the state covariance -- confirm orientation
        states_cov = covs[0][mixture].T @ covs[0][mixture]
        # block-diagonal joint covariance: states independent of controls
        full_cov = np.zeros((n_states + n_controls - 1, n_states + n_controls - 1))
        full_cov[:n_states, :n_states] = states_cov
        full_cov[n_states:, n_states:] = np.diag(control_sds_np ** 2)
        args["cov"] = full_cov
        dist_args.append(args)
    # rebind `states` to the simulated factor paths (the parsed initial
    # states from parse_params are no longer needed past this point)
    states = np.zeros((n_periods, n_obs, n_states))
    states[0], controls = generate_start_state_and_control_variable(
        n_obs, dimensions, dist_args, weights
    )
    controls = pd.DataFrame(data=controls, columns=labels["controls"])
    for t in range(n_periods - 1):
        # if there is a shock in period t, add it here
        policies_t = [p for p in policies if p["period"] == t]
        for policy in policies_t:
            position = labels["factors"].index(policy["factor"])
            states[t, :, position] += _get_shock(
                mean=policy["effect_size"], sd=policy["standard_deviation"], size=n_obs
            )
        states[t + 1] = next_period_states(
            states[t], transition_names, transition_params[t], shock_sds[t]
        )
    # assemble the observed dataset (measurements + controls) in long format
    observed_data_by_period = []
    for t in range(n_periods):
        meas = pd.DataFrame(
            data=measurements_from_states(
                states[t],
                controls.to_numpy(),
                loadings_df.loc[t].to_numpy(),
                control_params_df.loc[t].to_numpy(),
                meas_sds.loc[t].to_numpy().flatten(),
            ),
            columns=loadings_df.loc[t].index,
        )
        meas["period"] = t
        observed_data_by_period.append(pd.concat([meas, controls], axis=1))
    observed_data = pd.concat(observed_data_by_period, axis=0, sort=True)
    # the row position within each period identifies the individual
    observed_data["id"] = observed_data.index
    observed_data.sort_values(["id", "period"], inplace=True)
    observed_data.set_index(["id", "period"], inplace=True)
    # assemble the latent dataset (factors) in long format
    latent_data_by_period = []
    for t in range(n_periods):
        lat = pd.DataFrame(data=states[t], columns=labels["factors"])
        lat["period"] = t
        latent_data_by_period.append(lat)
    latent_data = pd.concat(latent_data_by_period, axis=0, sort=True)
    latent_data["id"] = latent_data.index
    latent_data.sort_values(["id", "period"], inplace=True)
    latent_data.set_index(["id", "period"], inplace=True)
    return observed_data, latent_data
def _get_shock(mean, sd, size):
"""Add stochastic effect to a factor of length n_obs.
Args:
mean (float): mean of the stochastic effect
sd (float): standard deviation of the effect
size (int): length of resulting array
Returns:
shock (np.array): 1d array of length n_obs with the stochastic shock
"""
if sd == 0:
shock = np.full(size, mean)
elif sd > 0:
shock = np.random.normal(mean, sd, size)
else:
raise ValueError("No negative standard deviation allowed.")
return shock
def generate_start_state_and_control_variable(n_obs, dimensions, dist_args, weights):
    """Draw initial states and control variables from a (mixture of) normals.

    Args:
        n_obs (int): number of observations.
        dimensions (dict): Dimensional information like n_states, n_periods,
            n_controls, n_mixtures. See :ref:`dimensions`.
        dist_args (list): list of length n_mixtures of dictionaries with the
            entries "mean" and "cov" for each mixture distribution over the
            joint vector of states and non-constant controls.
        weights (np.ndarray): mixture weights; if it has size one, the single
            distribution in dist_args is used for all observations.

    Returns:
        initial_states (np.ndarray): shape (n_obs, n_states).
        controls (np.ndarray): shape (n_obs, n_controls); the first column is
            a constant of ones.
    """
    n_states = dimensions["n_states"]
    n_controls = dimensions["n_controls"]
    if np.size(weights) == 1:
        out = multivariate_normal(size=n_obs, **dist_args[0])
    else:
        # assign each observation to a mixture component, then draw all
        # observations of a component in one vectorized call instead of
        # one multivariate_normal call per observation
        components = choice(np.arange(len(weights)), p=weights, size=n_obs)
        out = np.zeros((n_obs, n_states + n_controls - 1))
        for m in range(len(weights)):
            members = components == m
            n_members = members.sum()
            if n_members > 0:
                out[members] = multivariate_normal(size=n_members, **dist_args[m])
    initial_states = out[:, :n_states]
    # prepend the constant column for the intercept control
    controls = np.column_stack([np.ones(n_obs), out[:, n_states:]])
    return initial_states, controls
def next_period_states(states, transition_names, transition_params, shock_sds):
    """Apply transition functions to the factors and add shocks.

    Args:
        states (np.ndarray): shape (n_obs, n_states).
        transition_names (list): names of the transition function of each
            factor; "constant" factors are carried over unchanged.
        transition_params (list): of length n_states; entry i holds the
            arguments for factor i's transition function. See the module
            docstring of skillmodels.transition_functions for details.
        shock_sds (np.ndarray): of length n_states; standard deviations of
            the factor shocks.

    Returns:
        np.ndarray: shape (n_obs, n_states) with next period factor values.
    """
    n_obs, n_states = states.shape
    transformed = np.zeros((n_obs, n_states))
    for pos, name in enumerate(transition_names):
        if name == "constant":
            transformed[:, pos] = states[:, pos]
        else:
            func = getattr(tf, name)
            transformed[:, pos] = func(states, transition_params[pos])
    # Shocks are independent across observations and across factors, i.e.
    # err_{obs_j, fac_i} != err_{obs_k, fac_i} for j != k.
    shocks = multivariate_normal(
        [0] * n_states, np.diag(shock_sds ** 2), n_obs
    ).reshape(n_obs, n_states)
    return transformed + shocks
def measurements_from_states(states, controls, loadings, control_params, sds):
    """Generate the variables that would be observed in practice.

    This generates the data for only one period. Let n_meas be the number
    of measurements in that period.

    Args:
        states (np.ndarray): array of shape (n_obs, n_states) with the latent
            factors of one period.
        controls (np.ndarray): array of shape (n_obs, n_controls).
        loadings (np.ndarray): array of shape (n_meas, n_states) with the
            factor loadings.
        control_params (np.ndarray): array of shape (n_meas, n_controls) with
            the coefficients of the control variables.
        sds (np.ndarray): array of length n_meas with the standard deviations
            of the measurement errors. Measurement error is assumed to be
            independent across measurements, i.e. in general
            eps_{obs_j, meas_i} != eps_{obs_k, meas_i} for j != k.

    Returns:
        measurements (np.ndarray): array of shape (n_obs, n_meas).
    """
    n_meas = loadings.shape[0]
    n_obs = states.shape[0]
    # draw all measurement errors at once; diagonal covariance encodes the
    # independence across measurements
    epsilon = multivariate_normal([0] * n_meas, np.diag(sds ** 2), n_obs)
    states_part = np.dot(states, loadings.T)
    control_part = np.dot(controls, control_params.T)
    return states_part + control_part + epsilon