""" PRS Factor Identification Protocol for LRISK ============================================= Pukthuanthong, Roll, and Subrahmanyam (2019, Review of Financial Studies) This script implements both conditions of the PRS protocol. CONDITION 1 (Necessary): - Uses Connor-Korajczyk (1988) asymptotic PCA on individual CRSP stocks - Computes canonical correlations between ALL K factor candidates jointly and the L eigenvectors from the covariance matrix - For each significant canonical correlation, constructs canonical variates and regresses them on all candidates to identify which individual factors contribute to systematic variation - A factor passes if its mean |t-stat| across significant variates >= 1.96 CONDITION 2 (Sufficient): - Uses Fama-MacBeth regressions with Shanken (1992) correction - Test assets: CZ decile portfolios (542) and LZ long-short portfolios (185) - Only factors surviving Condition 1 enter as regressors - A factor passes if its Shanken-corrected risk price is significant at 5% NOTE ON WHAT IS NOT IMPLEMENTED: - The JNPRW (2019, JFE) instrumental variables approach for individual stocks is NOT implemented here. That approach requires DAILY return data to split into odd/even day subsamples for beta estimation, eliminating the errors-in-variables bias. We only have monthly CRSP data. - The JNPRW IV estimator (Eq. 4 in their paper) would be: gamma_IV,t = (B_IV * B_EV')^{-1} * B_IV * r_t' where B_IV and B_EV are beta matrices estimated from odd-day and even-day daily returns within each month, used as instruments and explanatory variables respectively. - To implement JNPRW, you need: daily CRSP returns, split each month's trading days into odd and even subsets, estimate betas from each subset separately, then use the IV formula above for cross-sectional regressions on individual stocks each month. INPUT FILES: 1. FactorsCombo18_Model3.csv (LRISK factor returns) 2. Three gzipped CRSP monthly stock return files (individual stocks) 3. F-F_Research_Data_5_Factors_2x3.csv (FF5 + RF) 4. F-F_Momentum_Factor.csv 5. F-F_ST_Reversal_Factor.csv 6. F-F_LT_Reversal_Factor.csv 7. q5_factors_monthly_2024.csv 8. DHS factors monthly 1972-2023.xlsx 9. BAB__QMJ_and_HML_Devil.xlsx 10. cz_op_test_assets_deciles_long_1926_2024_decileSignals_csv.gz 11. lz_test_assets_longshort1m10_long_1967_2024_csv.gz Author: Generated for K. Pukthuanthong, March 2026 """ import pandas as pd import numpy as np from scipy import stats from numpy.linalg import eigh, inv, svd import openpyxl import warnings warnings.filterwarnings('ignore') # ============================================================ # SECTION 0: FILE PATHS — EDIT THESE TO MATCH YOUR SETUP # ============================================================ # CRSP monthly stock return files (gzipped, tab-separated) CRSP_FILES = [ 'epyig7z7duqn4vcq_shrcd10_11_12_exchcd1_2_3_WITH_permno_date_part01_txt.gz', 'epyig7z7duqn4vcq_shrcd10_11_12_exchcd1_2_3_WITH_permno_date_part02_txt.gz', 'epyig7z7duqn4vcq_shrcd10_11_12_exchcd1_2_3_WITH_permno_date_part03_txt.gz', ] # LRISK factor file LRISK_FILE = 'FactorsCombo18_Model3.csv' # Established factor files FF5_FILE = 'F-F_Research_Data_5_Factors_2x3.csv' MOM_FILE = 'F-F_Momentum_Factor.csv' ST_REV_FILE = 'F-F_ST_Reversal_Factor.csv' LT_REV_FILE = 'F-F_LT_Reversal_Factor.csv' Q5_FILE = 'q5_factors_monthly_2024.csv' DHS_FILE = 'DHS factors monthly 1972-2023 (1).xlsx' BAB_FILE = 'BAB__QMJ_and_HML_Devil.xlsx' # Test asset files CZ_DECILE_FILE = 'cz_op_test_assets_deciles_long_1926_2024_decileSignals_csv.gz' LZ_LS_FILE = 'lz_test_assets_longshort1m10_long_1967_2024_csv.gz' # Settings L_PCS = 10 # Number of PCs for Condition 1 (PRS Section 7 used 10) MIN_STOCK_MONTHS = 60 # Minimum months of data per stock for CK-PCA LRISK_METHODS = ['ELM', 'LOGIT', 'LDA', 'CombineML', 'GBRT'] # ============================================================ # SECTION 1: LOAD ALL CANDIDATE FACTORS # ============================================================ def load_ff5(filepath): """Load Fama-French 5 factors + RF. Returns in decimals.""" ff5 = pd.read_csv(filepath, skiprows=3) ff5.columns = ['ds'] + list(ff5.columns[1:]) ff5 = ff5[ff5['ds'].astype(str).str.match(r'^\s*\d{6}$')].copy() ff5['ym'] = ff5['ds'].astype(int) for c in ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'RF']: ff5[c] = pd.to_numeric(ff5[c], errors='coerce') / 100 ff5.rename(columns={'RF': 'RF_rate'}, inplace=True) return ff5 def load_single_ff_factor(filepath, skip_rows, col_name): """Load a single Fama-French factor (Mom, ST_Rev, LT_Rev). Returns in decimals.""" df = pd.read_csv(filepath, skiprows=skip_rows) df.columns = ['ds', col_name] df = df[df['ds'].astype(str).str.match(r'^\s*\d{6}$')].copy() df['ym'] = df['ds'].astype(int) df[col_name] = pd.to_numeric(df[col_name], errors='coerce') / 100 return df[['ym', col_name]] def load_q5(filepath): """Load Q5 factors. Returns in decimals.""" q5 = pd.read_csv(filepath) q5['ym'] = q5['year'] * 100 + q5['month'] for c in ['R_ME', 'R_IA', 'R_ROE', 'R_EG']: q5[c] = q5[c] / 100 return q5[['ym', 'R_ME', 'R_IA', 'R_ROE', 'R_EG']] def load_dhs(filepath): """Load DHS factors (PEAD, FIN). Returns in decimals.""" wb = openpyxl.load_workbook(filepath) ws = wb['DHS factors'] data = [r for r in ws.iter_rows(min_row=2, values_only=True)] dhs = pd.DataFrame(data, columns=['ym', 'PEAD', 'FIN']) dhs['ym'] = dhs['ym'].astype(int) dhs['PEAD'] = pd.to_numeric(dhs['PEAD'], errors='coerce') / 100 dhs['FIN'] = pd.to_numeric(dhs['FIN'], errors='coerce') / 100 return dhs[['ym', 'PEAD', 'FIN']] def load_bab_qmj(filepath): """Load BAB, QMJ, HML_Devil from AQR. Already in decimals.""" raw = pd.read_excel(filepath) bab = pd.DataFrame({ 'date': pd.to_datetime(raw.iloc[:, 0]), 'BAB': pd.to_numeric(raw.iloc[:, 1], errors='coerce') }) bab['ym'] = bab['date'].dt.year * 100 + bab['date'].dt.month # Note: HML_Devil and QMJ caused multicollinearity issues (NaN in # canonical correlation computation) due to near-collinearity with # HML and RMW respectively. They are loaded but dropped from the # candidate set. return bab[['ym', 'BAB']].dropna() def load_lrisk(filepath): """Load LRISK factor returns. Filter to VW/double baseline.""" raw = pd.read_csv(filepath) raw['date'] = pd.to_datetime(raw['date'], format='%d-%b-%Y') vw = raw[(raw['wts'] == 'VW') & (raw['type'] == 'double')].copy() wide = vw.pivot_table(index='date', columns='method', values='stock') wide['ym'] = wide.index.year * 100 + wide.index.month return wide def merge_all_factors(ff5, mom, st_rev, lt_rev, q5, dhs, bab): """Merge all established factors into a single DataFrame.""" estab = ff5[['ym', 'Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'RF_rate']].copy() estab = estab.merge(mom, on='ym', how='left') estab = estab.merge(st_rev, on='ym', how='left') estab = estab.merge(lt_rev, on='ym', how='left') estab = estab.merge(q5, on='ym', how='left') estab = estab.merge(dhs, on='ym', how='left') estab = estab.merge(bab, on='ym', how='left') return estab # ============================================================ # SECTION 2: LOAD AND PREPARE CRSP INDIVIDUAL STOCK DATA # ============================================================ def load_crsp(filepaths): """ Load CRSP monthly stock returns from multiple gzipped files. Applies filters: valid returns, delisting adjustment, nano-stock exclusion. Returns DataFrame with columns: permno, date, ym, ret_adj, me, exchcd """ parts = [] for fn in filepaths: df = pd.read_csv(fn, sep='\t', dtype=str) parts.append(df) crsp = pd.concat(parts, ignore_index=True) crsp['date'] = pd.to_datetime(crsp['date']) crsp['ret'] = pd.to_numeric(crsp['RET'], errors='coerce') crsp['dlret'] = pd.to_numeric(crsp['DLRET'], errors='coerce') crsp['prc'] = pd.to_numeric(crsp['PRC'], errors='coerce') crsp['shrout'] = pd.to_numeric(crsp['SHROUT'], errors='coerce') crsp['permno'] = pd.to_numeric(crsp['PERMNO'], errors='coerce').astype('Int64') crsp['exchcd'] = pd.to_numeric(crsp['EXCHCD'], errors='coerce').astype('Int64') # Market cap in millions crsp['me'] = np.abs(crsp['prc']) * crsp['shrout'] / 1000 # Adjust for delisting returns mask_dl = crsp['dlret'].notna() crsp['ret_adj'] = crsp['ret'].copy() crsp.loc[mask_dl, 'ret_adj'] = ( (1 + crsp.loc[mask_dl, 'ret'].fillna(0)) * (1 + crsp.loc[mask_dl, 'dlret']) - 1 ) # Filter to 1996-2024, valid returns crsp = crsp[(crsp['date'] >= '1996-01-01') & (crsp['date'] <= '2024-12-31')].copy() crsp = crsp[crsp['ret_adj'].notna()].copy() crsp['ym'] = crsp['date'].dt.year * 100 + crsp['date'].dt.month # Drop nano stocks: below 20th percentile of NYSE market cap each month nyse = crsp[crsp['exchcd'] == 1] nyse_20 = nyse.groupby('ym')['me'].quantile(0.20).reset_index() nyse_20.columns = ['ym', 'me_nyse20'] crsp = crsp.merge(nyse_20, on='ym', how='left') crsp = crsp[crsp['me'] >= crsp['me_nyse20']].copy() return crsp # ============================================================ # SECTION 3: CONDITION 1 — CONNOR-KORAJCZYK PCA + CANONICAL CORRELATIONS # ============================================================ def build_gram_matrix(crsp_lrisk, months, month_to_idx, min_months=60): """ Build the T x T Gram matrix Omega = (1/N) * sum_i(r_i * r_i') using the Connor-Korajczyk (1988) approach. Because N >> T, we work with the T x T matrix rather than N x N. We iterate stock by stock to avoid building the full T x N matrix. Args: crsp_lrisk: CRSP data filtered to LRISK period (1998+) months: sorted list of ym values month_to_idx: dict mapping ym -> row index min_months: minimum months of data per stock Returns: Omega: T x T Gram matrix n_stocks: number of stocks used """ T = len(months) Omega = np.zeros((T, T)) n_stocks = 0 for permno, grp in crsp_lrisk.groupby('permno'): ym_vals = grp['ym'].values ret_vals = grp['ret_adj'].values idxs = np.array([month_to_idx.get(ym, -1) for ym in ym_vals]) valid = idxs >= 0 idxs = idxs[valid] rets = ret_vals[valid] if len(rets) < min_months: continue # Demean returns (within-stock) rets = rets - rets.mean() # Create T x 1 vector (zeros where stock has no data) r_vec = np.zeros(T) r_vec[idxs] = rets # Accumulate outer product Omega += np.outer(r_vec, r_vec) n_stocks += 1 Omega /= n_stocks return Omega, n_stocks def extract_pcs(Omega, L): """ Extract L principal components from the Gram matrix. Returns: pc_scores: T x L matrix of PC scores (eigenvectors) eigenvalues: L eigenvalues cum_var: cumulative variance explained """ eigenvalues, eigenvectors = eigh(Omega) # Sort descending idx = np.argsort(eigenvalues)[::-1] eigenvalues = eigenvalues[idx] eigenvectors = eigenvectors[:, idx] total_var = eigenvalues.sum() cum_var = np.cumsum(eigenvalues) / total_var pc_scores = eigenvectors[:, :L] return pc_scores, eigenvalues[:L], cum_var def run_condition1(factor_data, pc_scores, candidate_names, T_used): """ Run PRS Condition 1: canonical correlations between K candidates and L PCs. Following PRS Section 4: 1. Compute canonical correlations between candidate set and PC set 2. Test significance via Wilks' lambda -> Chi-square 3. For each significant canonical correlation, construct canonical variate U from PC weights (kappa) 4. Regress U on all K candidates 5. A candidate passes if mean |t-stat| across significant variates >= 1.96 Args: factor_data: T_used x K matrix of candidate factor returns pc_scores: T_used x L matrix of PC scores candidate_names: list of K candidate names T_used: number of time periods Returns: results: list of dicts with per-candidate results canon_corrs: array of canonical correlations n_significant: number of significant canonical correlations """ K = factor_data.shape[1] L = pc_scores.shape[1] n = T_used # Demean F_dm = factor_data - factor_data.mean(axis=0) P_dm = pc_scores - pc_scores.mean(axis=0) # Covariance matrices Vf = (F_dm.T @ F_dm) / (n - 1) # K x K Ve = (P_dm.T @ P_dm) / (n - 1) # L x L Cfe = (F_dm.T @ P_dm) / (n - 1) # K x L # Regularize for numerical stability Vf_reg = Vf + 1e-8 * np.eye(K) Ve_reg = Ve + 1e-8 * np.eye(L) # Compute Vf^{-1/2} and Ve^{-1/2} eigvals_f, eigvecs_f = eigh(Vf_reg) Vf_inv_half = eigvecs_f @ np.diag(1.0 / np.sqrt(np.maximum(eigvals_f, 1e-12))) @ eigvecs_f.T eigvals_e, eigvecs_e = eigh(Ve_reg) Ve_inv_half = eigvecs_e @ np.diag(1.0 / np.sqrt(np.maximum(eigvals_e, 1e-12))) @ eigvecs_e.T # SVD of Vf^{-1/2} * Cfe * Ve^{-1/2} gives canonical correlations M = Vf_inv_half @ Cfe @ Ve_inv_half # K x L U_svd, S_svd, Vt_svd = svd(M, full_matrices=False) n_canon = min(K, L) canon_corrs = S_svd[:n_canon] # Test significance of each canonical correlation using Wilks' lambda # Test from dimension m onward: Wilks = product of (1 - rho_i^2) for i >= m significant_variates = [] for m in range(n_canon): wilks = np.prod(1 - canon_corrs[m:]**2) p_val = K - m q_val = L - m df_chi = p_val * q_val # Chi-square approximation (Bartlett's) chi2_stat = -(n - 1 - 0.5 * (K + L + 1)) * np.log(max(wilks, 1e-15)) p_value = 1 - stats.chi2.cdf(chi2_stat, df_chi) if p_value < 0.05: significant_variates.append(m) # For each significant canonical correlation, construct canonical variate # and regress on all candidates if len(significant_variates) == 0: # No significant correlations — all candidates fail results = [] for cn in candidate_names: results.append({ 'candidate': cn, 'mean_abs_tstat': 0, 'mean_tstat': 0, 'pass_cond1': False, 'n_sig_variates': 0, 'top_canon_corr': canon_corrs[0] if len(canon_corrs) > 0 else np.nan }) return results, canon_corrs, 0 tstat_matrix = np.zeros((len(significant_variates), K)) for idx_sv, m in enumerate(significant_variates): # Canonical variate from PCs: U_m = P_dm @ Ve^{-1/2} @ v_m # where v_m is the m-th right singular vector kappa = Ve_inv_half @ Vt_svd[m, :] U_m = P_dm @ kappa # T_used x 1 # Regress U_m on all K factor candidates X_reg = np.column_stack([np.ones(n), F_dm]) beta_reg = np.linalg.lstsq(X_reg, U_m, rcond=None)[0] resid = U_m - X_reg @ beta_reg sigma2 = np.sum(resid**2) / (n - K - 1) XtX_inv = inv(X_reg.T @ X_reg) se = np.sqrt(np.diag(sigma2 * XtX_inv)) tstats = beta_reg / se tstat_matrix[idx_sv, :] = tstats[1:] # skip intercept # Mean absolute t-stat across significant canonical variates mean_abs_tstats = np.mean(np.abs(tstat_matrix), axis=0) mean_tstats = np.mean(tstat_matrix, axis=0) results = [] for j, cn in enumerate(candidate_names): passed = mean_abs_tstats[j] >= 1.96 results.append({ 'candidate': cn, 'mean_abs_tstat': mean_abs_tstats[j], 'mean_tstat': mean_tstats[j], 'pass_cond1': passed, 'n_sig_variates': len(significant_variates), 'top_canon_corr': canon_corrs[0] }) return results, canon_corrs, len(significant_variates) # ============================================================ # SECTION 4: CONDITION 2 — FAMA-MACBETH WITH SHANKEN CORRECTION # ============================================================ def run_condition2(test_asset_returns, factor_returns, rf_returns, surviving_factor_names, lrisk_series, is_excess_returns=False): """ Run PRS Condition 2: Fama-MacBeth cross-sectional regressions. Uses ONLY factors that survived Condition 1, plus LRISK. NOTE: This uses standard OLS Fama-MacBeth on portfolio test assets. For individual stocks, the JNPRW (2019) IV approach should be used instead (see note at top of file). Steps: 1. Time-series regressions: each test asset on surviving factors -> betas 2. Each month: cross-sectional regression of returns on betas 3. Average gammas across months 4. Apply Shanken (1992) correction to standard errors Args: test_asset_returns: DataFrame with ym as index, test assets as columns factor_returns: DataFrame with ym and factor columns rf_returns: DataFrame with ym and RF_rate columns surviving_factor_names: list of factor names that passed Condition 1 lrisk_series: DataFrame with ym and LRISK columns is_excess_returns: if True, test assets are already excess returns Returns: detail: list of dicts with per-factor results """ all_factor_names = surviving_factor_names + ['LRISK'] K = len(all_factor_names) # Merge factors with LRISK and RF fac = factor_returns[['ym'] + surviving_factor_names].copy() fac = fac.merge(lrisk_series[['ym', 'LRISK']], on='ym', how='inner') fac = fac.merge(rf_returns[['ym', 'RF_rate']], on='ym', how='inner') # Common months common_ym = sorted(set(test_asset_returns.index) & set(fac['ym'])) fac = fac[fac['ym'].isin(common_ym)].sort_values('ym') ta = test_asset_returns.loc[common_ym].copy() # Drop test assets with any NAs ta_cols = ta.columns[ta.notna().all()] ta = ta[ta_cols] T = len(common_ym) N = len(ta_cols) if N < K + 2 or T < K + 2: return None # Factor matrix T x K F_mat = fac[all_factor_names].values rf_vec = fac['RF_rate'].values # Test asset return matrix T x N ta_mat = ta.values if not is_excess_returns: ta_excess = ta_mat - rf_vec.reshape(-1, 1) else: ta_excess = ta_mat # Step 1: Full-sample time-series regressions for betas betas = np.zeros((N, K)) X_ts = np.column_stack([np.ones(T), F_mat]) for i in range(N): y_i = ta_excess[:, i] valid = ~np.isnan(y_i) if valid.sum() < K + 10: betas[i, :] = np.nan continue coef = np.linalg.lstsq(X_ts[valid], y_i[valid], rcond=None)[0] betas[i, :] = coef[1:] # skip alpha # Drop test assets with NaN betas valid_ta = ~np.any(np.isnan(betas), axis=1) betas = betas[valid_ta] ta_excess = ta_excess[:, valid_ta] N_valid = valid_ta.sum() # Step 2: Monthly cross-sectional regressions (Fama-MacBeth) gammas = np.zeros((T, K + 1)) # intercept + K factors for t in range(T): y_cs = ta_excess[t, :] X_cs = np.column_stack([np.ones(N_valid), betas]) valid_cs = ~np.isnan(y_cs) if valid_cs.sum() < K + 2: gammas[t, :] = np.nan continue gamma_t = np.linalg.lstsq(X_cs[valid_cs], y_cs[valid_cs], rcond=None)[0] gammas[t, :] = gamma_t # Drop NaN months valid_t = ~np.any(np.isnan(gammas), axis=1) gammas_clean = gammas[valid_t] F_clean = F_mat[valid_t] T_clean = len(gammas_clean) # Step 3: Time-series averages and FM t-statistics gamma_mean = gammas_clean.mean(axis=0) gamma_std = gammas_clean.std(axis=0, ddof=1) gamma_tstat_fm = gamma_mean / (gamma_std / np.sqrt(T_clean)) # Step 4: Shanken (1992) correction # Var_Shanken = (1 + lambda) * Var_FM # where lambda = gamma_fac' * Sigma_F^{-1} * gamma_fac Sigma_f = np.cov(F_clean.T) try: gm_fac = gamma_mean[1:] # factor risk prices (exclude intercept) lam = gm_fac @ inv(Sigma_f) @ gm_fac shanken_factor = 1 + lam gamma_se_shanken = (gamma_std / np.sqrt(T_clean)) * np.sqrt(shanken_factor) gamma_tstat_shanken = gamma_mean / gamma_se_shanken except Exception: gamma_tstat_shanken = gamma_tstat_fm * np.nan shanken_factor = np.nan # Build results names = ['Intercept'] + all_factor_names detail = [] for j, name in enumerate(names): pval = 2 * (1 - stats.t.cdf(abs(gamma_tstat_shanken[j]), T_clean - 1)) detail.append({ 'factor': name, 'risk_price_pct': gamma_mean[j] * 100, 'tstat_FM': gamma_tstat_fm[j], 'tstat_Shanken': gamma_tstat_shanken[j], 'pvalue_Shanken': pval, 'significant_5pct': abs(gamma_tstat_shanken[j]) > 1.96, 'N_test_assets': N_valid, 'T_months': T_clean, 'shanken_correction': shanken_factor }) return detail # ============================================================ # SECTION 5: SPANNING REGRESSIONS AND SHARPE RATIO CHECK # ============================================================ def newey_west_se(y, X, lag=None): """OLS regression with Newey-West HAC standard errors.""" T_nw, k = X.shape if lag is None: lag = int(np.floor(4 * (T_nw / 100) ** (2 / 9))) beta = np.linalg.lstsq(X, y, rcond=None)[0] resid = y - X @ beta # HAC covariance S = np.zeros((k, k)) for l in range(lag + 1): w = 1.0 if l == 0 else 1 - l / (lag + 1) for t in range(l, T_nw): if l == 0: S += np.outer(X[t] * resid[t], X[t] * resid[t]) else: S += w * (np.outer(X[t] * resid[t], X[t - l] * resid[t - l]) + np.outer(X[t - l] * resid[t - l], X[t] * resid[t])) S /= T_nw bread = inv(X.T @ X / T_nw) V = bread @ S @ bread / T_nw se = np.sqrt(np.diag(V)) tstat = beta / se return beta, se, tstat def run_spanning_regression(lrisk_returns, factor_returns, factor_names): """ Regress LRISK on established factors (FF6). If alpha is significant, LRISK is not spanned. """ y = lrisk_returns.values X = np.column_stack([np.ones(len(y)), factor_returns[factor_names].values]) beta, se, tstat = newey_west_se(y, X) resid = y - X @ beta R2 = 1 - np.sum(resid**2) / np.sum((y - y.mean())**2) return { 'alpha_pct': beta[0] * 100, 'alpha_tstat_NW': tstat[0], 'alpha_pvalue': 2 * (1 - stats.t.cdf(abs(tstat[0]), len(y) - len(factor_names) - 1)), 'R_squared': R2, 'spanned': abs(tstat[0]) < 1.96 } def sharpe_ratio_check(lrisk_returns): """ Test whether LRISK Sharpe ratio exceeds MacKinlay (1995) bound of 0.6. """ sr_monthly = lrisk_returns.mean() / lrisk_returns.std() sr_annual = sr_monthly * np.sqrt(12) T = len(lrisk_returns) se_sr = np.sqrt((1 + 0.5 * sr_annual**2) / T) z_stat = (sr_annual - 0.6) / se_sr p_exceeds = 1 - stats.norm.cdf(z_stat) return { 'sharpe_annual': sr_annual, 'z_vs_bound': z_stat, 'p_exceeds_bound': p_exceeds, 'exceeds_bound': p_exceeds < 0.05 } # ============================================================ # SECTION 6: MAIN EXECUTION # ============================================================ if __name__ == '__main__': print("=" * 70) print("PRS FACTOR IDENTIFICATION PROTOCOL FOR LRISK") print("=" * 70) # --- Load all factors --- print("\nLoading factors...") ff5 = load_ff5(FF5_FILE) mom = load_single_ff_factor(MOM_FILE, 13, 'Mom') st_rev = load_single_ff_factor(ST_REV_FILE, 13, 'ST_Rev') lt_rev = load_single_ff_factor(LT_REV_FILE, 13, 'LT_Rev') q5 = load_q5(Q5_FILE) dhs = load_dhs(DHS_FILE) bab = load_bab_qmj(BAB_FILE) lrisk_wide = load_lrisk(LRISK_FILE) estab = merge_all_factors(ff5, mom, st_rev, lt_rev, q5, dhs, bab) # Candidate names (excluding HML_Devil and QMJ due to multicollinearity) estab_names = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'Mom', 'ST_Rev', 'LT_Rev', 'R_ME', 'R_IA', 'R_ROE', 'R_EG', 'PEAD', 'FIN', 'BAB'] # --- Load CRSP --- print("Loading CRSP data...") crsp = load_crsp(CRSP_FILES) crsp_lrisk = crsp[crsp['date'] >= '1998-01-01'].copy() months = sorted(crsp_lrisk['ym'].unique()) T = len(months) month_to_idx = {m: i for i, m in enumerate(months)} print(f"CRSP: {len(crsp_lrisk):,} obs, {crsp_lrisk['permno'].nunique():,} stocks, {T} months") # --- Build Gram matrix --- print("Building CK Gram matrix...") Omega, n_stocks = build_gram_matrix(crsp_lrisk, months, month_to_idx, MIN_STOCK_MONTHS) print(f"Gram matrix: {T}x{T}, {n_stocks:,} stocks") # --- Extract PCs --- pc_scores, eigenvalues, cum_var = extract_pcs(Omega, L_PCS) print(f"Using {L_PCS} PCs (explaining {cum_var[L_PCS-1]*100:.1f}% of variance)") # --- Load test assets for Condition 2 --- print("Loading test assets...") cz_dec = pd.read_csv(CZ_DECILE_FILE) cz_dec['date'] = pd.to_datetime(cz_dec['date']) cz_dec['ym'] = cz_dec['date'].dt.year * 100 + cz_dec['date'].dt.month cz_dec['sig_dec'] = cz_dec['signal'] + '_D' + cz_dec['decile'].astype(str) cz_dec['ret_pct'] = pd.to_numeric(cz_dec['ret_pct'], errors='coerce') / 100 cz_wide = cz_dec.pivot_table(index='ym', columns='sig_dec', values='ret_pct') lrisk_yms = set(lrisk_wide['ym'].values) cz_lrisk = cz_wide[cz_wide.index.isin(lrisk_yms)] valid_cz = cz_lrisk.columns[cz_lrisk.notna().sum() >= 0.8 * len(cz_lrisk)] cz_lrisk = cz_lrisk[valid_cz] lz_ls = pd.read_csv(LZ_LS_FILE) lz_ls['date'] = pd.to_datetime(lz_ls['date']) lz_ls['ym'] = lz_ls['date'].dt.year * 100 + lz_ls['date'].dt.month lz_wide = lz_ls.pivot_table(index='ym', columns='signal', values='ret_ls1m10') / 100 lz_lrisk = lz_wide[lz_wide.index.isin(lrisk_yms)] valid_lz = lz_lrisk.columns[lz_lrisk.notna().sum() >= 0.8 * len(lz_lrisk)] lz_lrisk = lz_lrisk[valid_lz] print(f"CZ decile test assets: {len(valid_cz)}") print(f"LZ long-short test assets: {len(valid_lz)}") # ============================================================ # RUN FOR EACH LRISK METHOD # ============================================================ all_cond1 = [] all_cond2 = [] all_spanning = [] all_sharpe = [] for lrisk_method in LRISK_METHODS: print(f"\n{'='*70}") print(f"LRISK METHOD: {lrisk_method}") print(f"{'='*70}") # Prepare LRISK column lrisk_col = lrisk_wide[['ym', lrisk_method]].rename(columns={lrisk_method: 'LRISK'}) candidate_names = estab_names + ['LRISK'] # --- CONDITION 1 --- all_fac = estab.merge(lrisk_col, on='ym', how='inner') all_fac = all_fac[all_fac['ym'].isin(months)].copy() all_fac['t_idx'] = all_fac['ym'].map(month_to_idx) all_fac = all_fac.sort_values('t_idx') # Drop rows with NAs in any candidate valid_rows = all_fac[candidate_names].notna().all(axis=1) all_fac_clean = all_fac[valid_rows] t_indices = all_fac_clean['t_idx'].values.astype(int) T_used = len(all_fac_clean) F_mat = all_fac_clean[candidate_names].values P_mat = pc_scores[t_indices] print(f"\nCondition 1: K={len(candidate_names)} candidates, T={T_used} months") results_c1, canon_corrs, n_sig = run_condition1(F_mat, P_mat, candidate_names, T_used) for r in results_c1: r['lrisk_method'] = lrisk_method all_cond1.extend(results_c1) # Which factors pass? passing = [r['candidate'] for r in results_c1 if r['pass_cond1']] surviving_estab = [f for f in passing if f != 'LRISK'] lrisk_passes = 'LRISK' in passing print(f"LRISK passes: {lrisk_passes}") print(f"All passing: {', '.join(passing)}") # --- CONDITION 2 (only if LRISK passes Condition 1) --- if lrisk_passes: print(f"\nCondition 2: surviving factors = {surviving_estab}") # CZ decile portfolios (raw returns, need to subtract RF) detail_cz = run_condition2( cz_lrisk, estab, ff5[['ym', 'RF_rate']], surviving_estab, lrisk_col, is_excess_returns=False ) if detail_cz: for d in detail_cz: d['lrisk_method'] = lrisk_method d['test_assets'] = 'CZ_deciles' all_cond2.extend(detail_cz) lrisk_cz = [d for d in detail_cz if d['factor'] == 'LRISK'][0] print(f" CZ deciles: LRISK price={lrisk_cz['risk_price_pct']:.3f}%/mo, " f"t_Sh={lrisk_cz['tstat_Shanken']:.3f}, " f"Pass={'YES' if lrisk_cz['significant_5pct'] else 'NO'}") # LZ long-short portfolios (already zero-investment, excess returns) detail_lz = run_condition2( lz_lrisk, estab, ff5[['ym', 'RF_rate']], surviving_estab, lrisk_col, is_excess_returns=True ) if detail_lz: for d in detail_lz: d['lrisk_method'] = lrisk_method d['test_assets'] = 'LZ_longshort' all_cond2.extend(detail_lz) lrisk_lz = [d for d in detail_lz if d['factor'] == 'LRISK'][0] print(f" LZ l/s: LRISK price={lrisk_lz['risk_price_pct']:.3f}%/mo, " f"t_Sh={lrisk_lz['tstat_Shanken']:.3f}, " f"Pass={'YES' if lrisk_lz['significant_5pct'] else 'NO'}") # --- SPANNING REGRESSION --- ff6_names = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'Mom'] merged = estab.merge(lrisk_col, on='ym', how='inner') merged = merged[ff6_names + ['LRISK']].dropna() span_result = run_spanning_regression( merged['LRISK'], merged, ff6_names ) span_result['lrisk_method'] = lrisk_method all_spanning.append(span_result) print(f"\n Spanning: alpha={span_result['alpha_pct']:.3f}%/mo, " f"t={span_result['alpha_tstat_NW']:.3f}, " f"{'Spanned' if span_result['spanned'] else 'NOT spanned'}") # --- SHARPE RATIO --- sr_result = sharpe_ratio_check(merged['LRISK']) sr_result['lrisk_method'] = lrisk_method all_sharpe.append(sr_result) print(f" Sharpe: {sr_result['sharpe_annual']:.3f} annual " f"({'EXCEEDS' if sr_result['exceeds_bound'] else 'within'} MacKinlay bound)") # ============================================================ # SAVE RESULTS # ============================================================ pd.DataFrame(all_cond1).to_csv('PRS_Condition1_results.csv', index=False) pd.DataFrame(all_cond2).to_csv('PRS_Condition2_results.csv', index=False) pd.DataFrame(all_spanning).to_csv('PRS_spanning_results.csv', index=False) pd.DataFrame(all_sharpe).to_csv('PRS_sharpe_results.csv', index=False) print("\nResults saved to CSV files.")