Source code for skillmodels.simulate_data

"""Functions to simulate a dataset generated by a latent factor model."""
import numpy as np
import pandas as pd
from numpy.random import binomial
from numpy.random import choice
from numpy.random import multivariate_normal

import skillmodels.transition_functions as tf
from skillmodels import elliptical_distributions
from skillmodels.parse_params import parse_params


[docs]def add_missings(data, meas_names, p_b, p_r): """Add np.nans to data. nans are only added to measurements, not to control variables or factors. Note that p is NOT the marginal probability of a measurement being missing. The marginal probability is given by: p_m = p/(1-serial_corr), where serial_corr = (p_r-p_b) in general != 0, since p_r != p_b. This means that in average the share of missing values (in the entire dataset) will be larger than p. Thus, p and q should be set accordingly given the desired share of missing values. Args: data (pd.DataFrame): contains the observable part of a simulated dataset meas_names (list): list of strings of names of each measurement variable p_b (float): probability of a measurement to become missing p_r (float): probability of a measurement to remain missing in the next period Returns: data_with_missings (pd.DataFrame): Dataset with a share of measurements replaced by np.nan values """ n_meas = len(meas_names) data_with_missings = data.copy(deep=True) for i in set(data_with_missings.index): ind_data = data_with_missings.loc[i][meas_names].to_numpy() s_0 = binomial(1, p_b, n_meas) ind_data[0, np.where(s_0 == 1)] = np.nan for t in range(1, len(ind_data)): indc_nan = np.isnan(ind_data[t - 1]) prob = p_r * indc_nan + p_b * (1 - indc_nan) s_m = binomial(1, prob) ind_data[t, np.where(s_m == 1)] = np.nan data_with_missings.loc[i, meas_names] = ind_data return data_with_missings
[docs]def simulate_datasets( labels, dimensions, n_obs, params, parsing_info, update_info, control_means, dist_arg_dict, dist_name="multivariate_normal", policies=None, ): """Simulate datasets generated by a latent factor model. Args: labels (dict): Dict of lists with labels for the model quantities like factors, periods, controls, stagemap and stages. See :ref:`labels`. dimensions (dict): Dimensional information like n_states, n_periods, n_controls, n_mixtures. See :ref:`dimensions`. n_obs (int): number of observations. params (jax.numpy.array): 1d array with model parameters parsing_info (dict): Dictionary with information on how the parameters have to be parsed. control_mean (array): 1d array with initial means of the control variables dist_name (string): the elliptical distribution to use in the mixture dist_arg_dict (list): list of length n_mixtures of dictionaries with the relevant arguments of the mixture distributions. Arguments with default values should NOT be included in the dictionaries. Lengths of arrays in the arguments should be in accordance with n_states + n_controls. the key names of dist_arg_dict can be looked up in the module elliptical_distributions. For multivariate_normal it's mean and cov. policies (list): list of dictionaries. Each dictionary specifies a a stochastic shock to a latent factor AT THE END of "period" for "factor" with mean "effect_size" and "standard deviation" Returns: observed_data (pd.DataFrame): Dataset with measurements and control variables in long format latent_data (pd.DataFrame): Dataset with latent factors in long format """ states, _, log_weights, pardict = parse_params( params, parsing_info, dimensions, labels, n_obs ) return _simulate_datasets( states, log_weights, pardict, labels, dimensions, n_obs, update_info, control_means, dist_arg_dict, dist_name, policies, )
def _simulate_datasets( states, log_weights, pardict, labels, dimensions, n_obs, update_info, control_means, dist_arg_dict, dist_name, policies, ): """Simulate datasets generated by a latent factor model. Args: See simulate_data Returns: See simulate_data """ policies = policies if policies is not None else [] n_states = dimensions["n_states"] n_controls = dimensions["n_controls"] n_periods = dimensions["n_periods"] weights = np.exp(log_weights) loadings_df = pd.DataFrame( data=pardict["loadings"], index=update_info.index, columns=labels["factors"] ) control_params_df = pd.DataFrame( data=pardict["controls"], index=update_info.index, columns=labels["controls"] ) meas_sds = pd.DataFrame( data=pardict["meas_sds"].reshape(-1, 1), index=update_info.index ) transition_names = labels["transition_names"] transition_params = pardict["transition"] shock_sds = pardict["shock_sds"] states_means = states[0] initial_means = np.column_stack( [states_means, control_means.reshape(-1, n_controls - 1)] ) if dist_name in ["_mv_student_t", "multivariate_normal"]: for i, d in enumerate(dist_arg_dict): d.update({"mean": initial_means[i]}) stat = np.zeros((n_periods, n_obs, n_states)) stat[0], cont = generate_start_state_and_control_variables_elliptical( n_obs, dimensions, dist_name, dist_arg_dict, weights ) cont = pd.DataFrame(data=cont, columns=labels["controls"]) for t in range(n_periods - 1): # if there is a shock in period t, add it here policies_t = [p for p in policies if p["period"] == t] for policy in policies_t: position = labels["factors"].index(policy["factor"]) stat[t, :, position] += _get_shock( mean=policy["effect_size"], sd=policy["standard_deviation"], size=n_obs ) stat[t + 1] = next_period_states( stat[t], transition_names, transition_params[t], shock_sds[t] ) observed_data_by_period = [] for t in range(n_periods): meas = pd.DataFrame( data=measurements_from_states( stat[t], cont.to_numpy(), loadings_df.loc[t].to_numpy(), control_params_df.loc[t].to_numpy(), meas_sds.loc[t].to_numpy().flatten(), ), columns=loadings_df.loc[t].index, ) meas["period"] = t observed_data_by_period.append(pd.concat([meas, cont], axis=1)) observed_data = pd.concat(observed_data_by_period, axis=0, sort=True) observed_data["id"] = observed_data.index observed_data.sort_values(["id", "period"], inplace=True) observed_data.set_index(["id", "period"], inplace=True) latent_data_by_period = [] for t in range(n_periods): lat = pd.DataFrame(data=stat[t], columns=labels["factors"]) lat["period"] = t latent_data_by_period.append(lat) latent_data = pd.concat(latent_data_by_period, axis=0, sort=True) latent_data["id"] = latent_data.index latent_data.sort_values(["id", "period"], inplace=True) latent_data.set_index(["id", "period"], inplace=True) return observed_data, latent_data def _get_shock(mean, sd, size): """Add stochastic effect to a factor of length n_obs. Args: mean (float): mean of the stochastic effect sd (float): standard deviation of the effect size (int): length of resulting array Returns: shock (np.array): 1d array of length n_obs with the stochastic shock """ if sd == 0: shock = np.full(size, mean) elif sd > 0: shock = np.random.normal(mean, sd, size) else: raise ValueError("No negative standard deviation allowed.") return shock
[docs]def generate_start_state_and_control_variables_elliptical( n_obs, dimensions, dist_name, dist_arg_dict, weights ): """Draw initial states and control variables from a (mixture of) normals. Args: n_obs (int): number of observations dimensions (dict): Dimensional information like n_states, n_periods, n_controls, n_mixtures. See :ref:`dimensions`. dist_name (string): the elliptical distribution to use in the mixture dist_arg_dict (list of dict): list of length nmixtures of dictionaries with the relevant arguments of the mixture distributions. Arguments with default values should NOT be included in the dictionaries. Lengths of arrays in the arguments should be in accordance with n_states + n_controls weights (np.ndarray): size (nmixtures). The weight of each mixture element. Default value is equal to 1. Returns: start_states (np.ndarray): shape (n_obs, n_states), controls (np.ndarray): shape (n_obs, n_controls), """ n_states = dimensions["n_states"] n_controls = dimensions["n_controls"] if np.size(weights) == 1: out = getattr(elliptical_distributions, dist_name)( size=n_obs, **dist_arg_dict[0] ) else: helper_array = choice(np.arange(len(weights)), p=weights, size=n_obs) out = np.zeros((n_obs, n_states + n_controls - 1)) for i in range(n_obs): out[i] = getattr(elliptical_distributions, dist_name)( **dist_arg_dict[helper_array[i]] ) initial_states = out[:, 0:n_states] controls = out[:, n_states:] controls = np.hstack([np.ones((n_obs, 1)), controls]) return initial_states, controls
[docs]def next_period_states(states, transition_names, transition_params, shock_sds): """Apply transition function to factors and add shocks. Args: states (np.ndarray): shape (n_obs, n_states) transition_names (list): list of strings with the names of the transition function of each factor. transition_params (list): list of dictionaries of length n_states with the arguments for the transition function of each factor. A detailed description of the arguments of transition functions can be found in the module docstring of skillmodels.model_functions.transition_functions. shock_sds (np.ndarray): numpy array of length n_states. Returns: next_factors (np.ndarray): shape(n_obs,n_states) """ n_obs, n_states = states.shape states_tp1 = np.zeros((n_obs, n_states)) for i in range(n_states): states_tp1[:, i] = getattr(tf, transition_names[i])( states, transition_params[i] ) # Assumption: In general err_{Obs_j,Fac_i}!=err{Obs_k,Fac_i}, where j!=k errors = multivariate_normal( [0] * n_states, np.diag(shock_sds ** 2), n_obs ).reshape(n_obs, n_states) next_states = states_tp1 + errors return next_states
[docs]def measurements_from_states(states, controls, loadings, control_params, sds): """Generate the variables that would be observed in practice. This generates the data for only one period. Let n_meas be the number of measurements in that period. Args: states (pd.DataFrame or np.ndarray): DataFrame of shape (n_obs, n_states) controls (pd.DataFrame or np.ndarray): DataFrame of shape (n_obs, n_controlsrols) loadings (np.ndarray): numpy array of size (n_meas, n_states) control_coeffs (np.ndarray): numpy array of size (n_meas, n_states) sds (np.ndarray): numpy array of size (n_meas) with the standard deviations of the measurements. Measurement error is assumed to be independent across measurements. Returns: measurements (np.ndarray): array of shape (n_obs, n_meas) with measurements. """ n_meas = loadings.shape[0] n_obs, n_states = states.shape # Assumption: In general eps_{Obs_j,Meas_i}!=eps_{Obs_k,Meas_i} where j!=k epsilon = multivariate_normal([0] * n_meas, np.diag(sds ** 2), n_obs) states_part = np.dot(states, loadings.T) control_part = np.dot(controls, control_params.T) meas = states_part + control_part + epsilon return meas