Source code for skillmodels.simulate_data

"""Functions to simulate a dataset generated by a latent factor model."""
import jax.numpy as jnp
import numpy as np
import pandas as pd
from numpy.random import binomial
from numpy.random import choice
from numpy.random import multivariate_normal

import skillmodels.transition_functions as tf
from skillmodels.params_index import get_params_index
from skillmodels.parse_params import create_parsing_info
from skillmodels.parse_params import parse_params
from skillmodels.process_model import process_model


def add_missings(data, meas_names, p_b, p_r):
    """Add np.nans to data.

    nans are only added to measurements, not to control variables or factors.

    Note that p_b is NOT the marginal probability of a measurement being
    missing. The marginal probability is given by p_m = p_b / (1 - serial_corr),
    where serial_corr = (p_r - p_b) is in general != 0 because p_r != p_b. On
    average the share of missing values in the entire dataset will therefore be
    larger than p_b, so p_b and p_r should be chosen with the desired share of
    missing values in mind.

    Args:
        data (pd.DataFrame): contains the observable part of a simulated dataset.
        meas_names (list): list of strings with the name of each measurement
            variable.
        p_b (float): probability of a measurement to become missing.
        p_r (float): probability of a measurement to remain missing in the next
            period.

    Returns:
        data_with_missings (pd.DataFrame): Dataset with a share of measurements
            replaced by np.nan values.

    """
    n_meas = len(meas_names)
    data_with_missings = data.copy(deep=True)
    for i in set(data_with_missings.index):
        ind_data = data_with_missings.loc[i][meas_names].to_numpy()
        s_0 = binomial(1, p_b, n_meas)
        ind_data[0, np.where(s_0 == 1)] = np.nan
        for t in range(1, len(ind_data)):
            indc_nan = np.isnan(ind_data[t - 1])
            prob = p_r * indc_nan + p_b * (1 - indc_nan)
            s_m = binomial(1, prob)
            ind_data[t, np.where(s_m == 1)] = np.nan
        data_with_missings.loc[i, meas_names] = ind_data
    return data_with_missings
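

# A minimal sketch (hypothetical helper, not part of the original module) that
# inverts the relationship stated in the docstring above,
# p_m = p_b / (1 - (p_r - p_b)), to back out the p_b that produces a desired
# marginal share of missing measurements for a given p_r.
def _implied_p_b(target_marginal_share, p_r):
    """Return the p_b implied by a target marginal missing share and p_r."""
    return target_marginal_share * (1 - p_r) / (1 - target_marginal_share)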


def simulate_dataset(
    model_dict, params, n_obs, control_means=None, control_sds=None, policies=None
):
    """Simulate datasets generated by a latent factor model.

    Args:
        model_dict (dict): The model specification. See: :ref:`model_specs`.
        params (pandas.DataFrame): DataFrame with model parameters.
        n_obs (int): Number of simulated individuals.
        control_means (pd.Series): Series with means of the control variables.
            The index are the names of the control variables. Control variables
            are assumed to be time constant for the simulation.
        control_sds (pd.Series): Series with standard deviations of the control
            variables. The index are the names of the control variables.
        policies (list): list of dictionaries. Each dictionary specifies a
            stochastic shock that is applied to "factor" AT THE END of "period",
            with mean "effect_size" and standard deviation "standard_deviation".

    Returns:
        observed_data (pd.DataFrame): Dataset with measurements and control
            variables in long format.
        latent_data (pd.DataFrame): Dataset with latent factors in long format.

    """
    model = process_model(model_dict)
    params_index = get_params_index(
        update_info=model["update_info"],
        labels=model["labels"],
        dimensions=model["dimensions"],
    )
    params = params.reindex(params_index)
    parsing_info = create_parsing_info(
        params_index=params.index,
        update_info=model["update_info"],
        labels=model["labels"],
        anchoring=model["anchoring"],
    )
    states, covs, log_weights, pardict = parse_params(
        params=jnp.array(params["value"].to_numpy()),
        parsing_info=parsing_info,
        dimensions=model["dimensions"],
        labels=model["labels"],
        n_obs=n_obs,
    )
    out = _simulate_dataset(
        states=states,
        covs=covs,
        log_weights=log_weights,
        pardict=pardict,
        labels=model["labels"],
        dimensions=model["dimensions"],
        n_obs=n_obs,
        update_info=model["update_info"],
        control_means=control_means,
        control_sds=control_sds,
        policies=policies,
    )
    return out
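

# Illustrative usage sketch (not part of the original module). It only shows
# the call pattern; ``model_dict`` and ``params`` are assumed to already follow
# the skillmodels conventions (see :ref:`model_specs`), and the factor,
# control, and measurement names below are hypothetical.
#
#     observed, latent = simulate_dataset(
#         model_dict=model_dict,
#         params=params,
#         n_obs=1_000,
#         control_means=pd.Series({"x1": 0.0}),
#         control_sds=pd.Series({"x1": 1.0}),
#         policies=[
#             {
#                 "period": 0,
#                 "factor": "fac1",
#                 "effect_size": 0.5,
#                 "standard_deviation": 0.1,
#             }
#         ],
#     )
#     observed = add_missings(observed, meas_names=["y1", "y2"], p_b=0.1, p_r=0.3)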


def _simulate_dataset(
    states,
    covs,
    log_weights,
    pardict,
    labels,
    dimensions,
    n_obs,
    update_info,
    control_means,
    control_sds,
    policies,
):
    """Simulate datasets generated by a latent factor model.

    Args:
        See simulate_dataset.

    Returns:
        See simulate_dataset.

    """
    policies = policies if policies is not None else []
    n_states = dimensions["n_states"]
    n_controls = dimensions["n_controls"]
    n_periods = dimensions["n_periods"]
    weights = np.exp(log_weights)[0]

    loadings_df = pd.DataFrame(
        data=pardict["loadings"], index=update_info.index, columns=labels["factors"]
    )
    control_params_df = pd.DataFrame(
        data=pardict["controls"], index=update_info.index, columns=labels["controls"]
    )
    meas_sds = pd.DataFrame(
        data=pardict["meas_sds"].reshape(-1, 1), index=update_info.index
    )
    transition_names = labels["transition_names"]
    transition_params = pardict["transition"]
    shock_sds = pardict["shock_sds"]

    control_means_np = control_means[labels["controls"][1:]].to_numpy().astype(float)
    control_sds_np = control_sds[labels["controls"][1:]].to_numpy().astype(float)

    dist_args = []
    for mixture in range(dimensions["n_mixtures"]):
        args = {}
        args["mean"] = np.hstack([states[0][mixture], control_means_np])
        states_cov = covs[0][mixture].T @ covs[0][mixture]
        full_cov = np.zeros((n_states + n_controls - 1, n_states + n_controls - 1))
        full_cov[:n_states, :n_states] = states_cov
        full_cov[n_states:, n_states:] = np.diag(control_sds_np ** 2)
        args["cov"] = full_cov
        dist_args.append(args)

    states = np.zeros((n_periods, n_obs, n_states))
    states[0], controls = generate_start_state_and_control_variable(
        n_obs, dimensions, dist_args, weights
    )
    controls = pd.DataFrame(data=controls, columns=labels["controls"])

    for t in range(n_periods - 1):
        # if there is a shock in period t, add it here
        policies_t = [p for p in policies if p["period"] == t]
        for policy in policies_t:
            position = labels["factors"].index(policy["factor"])
            states[t, :, position] += _get_shock(
                mean=policy["effect_size"], sd=policy["standard_deviation"], size=n_obs
            )
        states[t + 1] = next_period_states(
            states[t], transition_names, transition_params[t], shock_sds[t]
        )

    observed_data_by_period = []
    for t in range(n_periods):
        meas = pd.DataFrame(
            data=measurements_from_states(
                states[t],
                controls.to_numpy(),
                loadings_df.loc[t].to_numpy(),
                control_params_df.loc[t].to_numpy(),
                meas_sds.loc[t].to_numpy().flatten(),
            ),
            columns=loadings_df.loc[t].index,
        )
        meas["period"] = t
        observed_data_by_period.append(pd.concat([meas, controls], axis=1))
    observed_data = pd.concat(observed_data_by_period, axis=0, sort=True)
    observed_data["id"] = observed_data.index
    observed_data.sort_values(["id", "period"], inplace=True)
    observed_data.set_index(["id", "period"], inplace=True)

    latent_data_by_period = []
    for t in range(n_periods):
        lat = pd.DataFrame(data=states[t], columns=labels["factors"])
        lat["period"] = t
        latent_data_by_period.append(lat)
    latent_data = pd.concat(latent_data_by_period, axis=0, sort=True)
    latent_data["id"] = latent_data.index
    latent_data.sort_values(["id", "period"], inplace=True)
    latent_data.set_index(["id", "period"], inplace=True)

    return observed_data, latent_data


def _get_shock(mean, sd, size):
    """Create a stochastic shock of length size.

    Args:
        mean (float): mean of the stochastic effect.
        sd (float): standard deviation of the effect.
        size (int): length of the resulting array.

    Returns:
        shock (np.array): 1d array of length size with the stochastic shock.

    """
    if sd == 0:
        shock = np.full(size, mean)
    elif sd > 0:
        shock = np.random.normal(mean, sd, size)
    else:
        raise ValueError("No negative standard deviation allowed.")
    return shock


def generate_start_state_and_control_variable(n_obs, dimensions, dist_args, weights):
    """Draw initial states and control variables from a (mixture of) normals.

    Args:
        n_obs (int): number of observations.
        dimensions (dict): Dimensional information like n_states, n_periods,
            n_controls, n_mixtures. See :ref:`dimensions`.
        dist_args (list): list of length n_mixtures of dictionaries with the
            entries "mean" and "cov" for each mixture distribution.
        weights (np.ndarray): mixture weights.

    Returns:
        start_states (np.ndarray): shape (n_obs, n_states).
        controls (np.ndarray): shape (n_obs, n_controls).

    """
    n_states = dimensions["n_states"]
    n_controls = dimensions["n_controls"]
    if np.size(weights) == 1:
        out = multivariate_normal(size=n_obs, **dist_args[0])
    else:
        helper_array = choice(np.arange(len(weights)), p=weights, size=n_obs)
        out = np.zeros((n_obs, n_states + n_controls - 1))
        for i in range(n_obs):
            out[i] = multivariate_normal(**dist_args[helper_array[i]])
    initial_states = out[:, 0:n_states]
    controls = np.column_stack([np.ones(n_obs), out[:, n_states:]])
    return initial_states, controls
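

# A minimal sketch (hypothetical inputs, not part of the original module):
# draw initial states and controls from a two-component normal mixture with
# two latent factors and two control variables (a constant plus one named
# control), so each mixture component is 3-dimensional.
def _example_start_states_sketch():
    dimensions = {"n_states": 2, "n_controls": 2}
    dist_args = [
        {"mean": np.zeros(3), "cov": np.eye(3)},
        {"mean": np.ones(3), "cov": 0.5 * np.eye(3)},
    ]
    weights = np.array([0.7, 0.3])
    start_states, controls = generate_start_state_and_control_variable(
        n_obs=500, dimensions=dimensions, dist_args=dist_args, weights=weights
    )
    return start_states.shape, controls.shape  # (500, 2) and (500, 2)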


def next_period_states(states, transition_names, transition_params, shock_sds):
    """Apply transition functions to the factors and add shocks.

    Args:
        states (np.ndarray): shape (n_obs, n_states).
        transition_names (list): list of strings with the names of the
            transition function of each factor.
        transition_params (list): list of length n_states with the arguments for
            the transition function of each factor. A detailed description of
            the arguments of transition functions can be found in the module
            docstring of skillmodels.transition_functions.
        shock_sds (np.ndarray): numpy array of length n_states.

    Returns:
        next_states (np.ndarray): shape (n_obs, n_states).

    """
    n_obs, n_states = states.shape
    states_tp1 = np.zeros((n_obs, n_states))
    for i in range(n_states):
        if transition_names[i] != "constant":
            states_tp1[:, i] = getattr(tf, transition_names[i])(
                states, transition_params[i]
            )
        else:
            states_tp1[:, i] = states[..., i]

    # Assumption: in general err_{Obs_j, Fac_i} != err_{Obs_k, Fac_i} for j != k
    errors = multivariate_normal(
        [0] * n_states, np.diag(shock_sds ** 2), n_obs
    ).reshape(n_obs, n_states)
    next_states = states_tp1 + errors
    return next_states
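

# A minimal sketch (hypothetical inputs, not part of the original module):
# with "constant" transition names the next period's states are just the
# current states plus normally distributed shocks; non-constant names would
# have to match functions in skillmodels.transition_functions.
def _example_next_period_states_sketch():
    states = np.zeros((100, 2))
    next_states = next_period_states(
        states=states,
        transition_names=["constant", "constant"],
        transition_params=[None, None],
        shock_sds=np.array([0.1, 0.2]),
    )
    return next_states.shape  # (100, 2)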


def measurements_from_states(states, controls, loadings, control_params, sds):
    """Generate the variables that would be observed in practice.

    This generates the data for only one period. Let n_meas be the number of
    measurements in that period.

    Args:
        states (pd.DataFrame or np.ndarray): array of shape (n_obs, n_states).
        controls (pd.DataFrame or np.ndarray): array of shape (n_obs, n_controls).
        loadings (np.ndarray): numpy array of shape (n_meas, n_states).
        control_params (np.ndarray): numpy array of shape (n_meas, n_controls).
        sds (np.ndarray): numpy array of length n_meas with the standard
            deviations of the measurements. Measurement error is assumed to be
            independent across measurements.

    Returns:
        measurements (np.ndarray): array of shape (n_obs, n_meas) with
            measurements.

    """
    n_meas = loadings.shape[0]
    n_obs, n_states = states.shape
    # Assumption: in general eps_{Obs_j, Meas_i} != eps_{Obs_k, Meas_i} for j != k
    epsilon = multivariate_normal([0] * n_meas, np.diag(sds ** 2), n_obs)
    states_part = np.dot(states, loadings.T)
    control_part = np.dot(controls, control_params.T)
    meas = states_part + control_part + epsilon
    return meas
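

# A minimal sketch (hypothetical inputs, not part of the original module):
# three measurements loading on two factors, with a constant and one control,
# illustrating the linear measurement equation
# meas = states @ loadings.T + controls @ control_params.T + epsilon.
def _example_measurements_sketch():
    states = np.random.normal(size=(50, 2))
    controls = np.column_stack([np.ones(50), np.random.normal(size=50)])
    loadings = np.array([[1.0, 0.0], [0.5, 0.5], [0.0, 1.0]])
    control_params = np.array([[0.0, 0.2], [1.0, 0.0], [-0.5, 0.1]])
    sds = np.array([0.3, 0.3, 0.3])
    meas = measurements_from_states(states, controls, loadings, control_params, sds)
    return meas.shape  # (50, 3)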