diff --git a/src/bopforge/categorical_pipeline/__main__.py b/src/bopforge/categorical_pipeline/__main__.py index d24e6ee..4cac102 100644 --- a/src/bopforge/categorical_pipeline/__main__.py +++ b/src/bopforge/categorical_pipeline/__main__.py @@ -76,7 +76,7 @@ def fit_signal_model(result_folder: Path) -> None: all_settings = dataif.load_result("settings.yaml") summary = dataif.load_result("summary.yaml") - signal_model = functions.get_signal_model(all_settings, df, summary) + signal_model = functions.get_signal_model(df, all_settings, summary) signal_model.fit_model(outer_step_size=200, outer_max_iter=100) df = functions.add_cols(df, signal_model) @@ -110,7 +110,7 @@ def select_bias_covs(result_folder: Path) -> None: cov_finder_linear_model = dataif.load_result("signal_model.pkl") cov_finder = functions.get_cov_finder( - all_settings, settings, cov_finder_linear_model, df + df, all_settings, settings, cov_finder_linear_model ) cov_finder.select_covs(verbose=True) @@ -158,7 +158,7 @@ def fit_linear_model(result_folder: Path) -> None: ) summary = functions.get_linear_model_summary( - all_settings, settings, summary, df, cat_coefs, pair_coefs + df, all_settings, settings, summary, cat_coefs, pair_coefs ) df_cleaned = df.loc[:, ~df.columns.str.startswith("interacted_")] @@ -168,12 +168,11 @@ def fit_linear_model(result_folder: Path) -> None: ) fig = functions.plot_linear_model( + df, name, summary, - df, cat_coefs, pair_coefs, - show_ref=all_settings["figure"]["show_ref"], ) dataif.dump_result(linear_model, "linear_model.pkl") diff --git a/src/bopforge/categorical_pipeline/functions.py b/src/bopforge/categorical_pipeline/functions.py index 68f8dbc..0ec0156 100644 --- a/src/bopforge/categorical_pipeline/functions.py +++ b/src/bopforge/categorical_pipeline/functions.py @@ -9,7 +9,7 @@ from pandas import DataFrame from scipy.stats import norm -from bopforge.utils import get_beta_info +from bopforge.utils import get_beta_info, score_to_star_rating def covariate_preprocessing( @@ -125,16 +125,16 @@ def covariate_design_mat( return df -def get_signal_model(settings: dict, df: DataFrame, summary: dict) -> MRBRT: +def get_signal_model(df: DataFrame, settings: dict, summary: dict) -> MRBRT: """Create signal model for outliers identification and covariate selection step. Parameters ---------- - settings - Dictionary containing settings for covariates and fitting the signal model df Data frame contains the training data. + settings + Dictionary containing settings for covariates and fitting the signal model summary Dictionary containing initial summary outputs @@ -148,7 +148,7 @@ def get_signal_model(settings: dict, df: DataFrame, summary: dict) -> MRBRT: cov_settings = settings["cov_type"] # Load in model covariates and candidate bias covariates - # interacted needs the design matrix columns + # interacted needs the design matrix columns from input dataframe interacted_covs = [ col for col in df.columns if col.startswith("interacted_") ] @@ -231,23 +231,23 @@ def add_cols(df: DataFrame, signal_model: MRBRT) -> DataFrame: def get_cov_finder( + df: DataFrame, all_settings: dict, settings: dict, cov_finder_linear_model: MRBRT, - df: DataFrame, ) -> CovFinder: """Create the instance of CovFinder class. Parameters ---------- + df + Dataframe containing training data with column indicating outliers all_settings All model settings settings Settings for pre-selected bias covariates. cov_finder_linear_model Fitted cov finder linear model (signal model for categorical risks) - df - Dataframe containing training data with column indicating outliers Returns ------- @@ -406,12 +406,11 @@ def get_cat_coefs( "beta": beta, } ) - # Subset to betas for categories only + # Subset to betas for exposure categories only (no covariates) beta_cats = beta_info[beta_info["cov_name"].str.startswith("cat_")].copy() beta_cats["cat"] = beta_cats["cov_name"].str.removeprefix("cat_") # Extract gamma, calculate gamma_sd, and merge with beta dataframe - # Since gamma and gamma sd are shared across categories, merge in directly gamma = model.gamma_soln[0] gamma_fisher = lt.get_gamma_fisher(gamma) gamma_sd = 1.0 / np.sqrt(gamma_fisher[0, 0]) @@ -503,28 +502,33 @@ def get_linear_model( ) model = MRBRT(data, cov_models) + return model -def hess_subset(matrix, i): +def hess_subset(matrix: np.typing.NDArray, i: int) -> np.ndarray: """Remove the ith row and ith column from the Hessian to create the sub-matrix of the Hessian for use in calculating pairwise beta variance Parameters ---------- - matrix (numpy.ndarray): Hessian - i (int): The index of the row and column to remove. + matrix + Hessian matrix for subsetting + i + index of the row and column to remove Returns ------- - numpy.ndarray: The (n-1) x (n-1) sub-Hessian. + np.ndarray + The (n-1) x (n-1) sub-Hessian. """ + return np.delete(np.delete(matrix, i, axis=0), i, axis=1) def get_pairwise_beta_var( hessian: np.ndarray, cat_names: list[str] -) -> pd.DataFrame: +) -> DataFrame: """Calculate beta_sd for pairwise comparisons using submatrices of the Hessian. Removing the i'th row and column of the Hessian allows us to directly invert the resulting sub-Hessian (no longer singular) to get the approximated @@ -570,6 +574,34 @@ def get_pairwise_beta_var( return var_df[["pair_standardized", "inner_beta_sd"]] +def get_scores(pair_info: DataFrame) -> DataFrame: + """Calculate UIs, logBPRF, and scores for pairwise comparisons + + Parameters + ---------- + pair_info + Dataframe containing pairwise betas and inner/outer beta SDs + + Returns + ------- + DataFrame + Updated pair_info dataframe with UIs, logBPRF, and scores appended + """ + sign = np.sign(pair_info["beta"]) + pair_info = pair_info.assign( + inner_ui_lower=pair_info["beta"] - 1.96 * pair_info["inner_beta_sd"], + inner_ui_upper=pair_info["beta"] + 1.96 * pair_info["inner_beta_sd"], + outer_ui_lower=pair_info["beta"] - 1.96 * pair_info["outer_beta_sd"], + outer_ui_upper=pair_info["beta"] + 1.96 * pair_info["outer_beta_sd"], + log_bprf=pair_info["beta"] - sign * 1.645 * pair_info["outer_beta_sd"], + ) + signed_bprf = sign * pair_info["log_bprf"] + product = np.prod(pair_info[["inner_ui_lower", "inner_ui_upper"]], axis=1) + pair_info["score"] = np.where(product < 0, float("nan"), 0.5 * signed_bprf) + + return pair_info + + def get_pair_info( settings: dict, summary: dict, @@ -591,7 +623,7 @@ def get_pair_info( Returns ------- - dict + DataFrame Dataframe containing pairwise beta, gamma, their standard deviations, and summary outputs. """ @@ -660,73 +692,36 @@ def get_pair_info( + 2 * cat_pairs["gamma_sd"] ) # Subset to just the relevant columns - cat_pairs_subset = cat_pairs[ - [ - "ref_risk_cat", - "alt_risk_cat", - "pair", - "beta_adjusted", - "inner_beta_sd", - "outer_beta_sd", - "gamma", - "gamma_sd", - ] - ].rename(columns={"beta_adjusted": "beta"}) - - sign = np.sign(cat_pairs_subset["beta"]) - cat_pairs_subset = cat_pairs_subset.assign( - inner_ui_lower=cat_pairs_subset["beta"] - - 1.96 * cat_pairs_subset["inner_beta_sd"], - inner_ui_upper=cat_pairs_subset["beta"] - + 1.96 * cat_pairs_subset["inner_beta_sd"], - outer_ui_lower=cat_pairs_subset["beta"] - - 1.96 * cat_pairs_subset["outer_beta_sd"], - outer_ui_upper=cat_pairs_subset["beta"] - + 1.96 * cat_pairs_subset["outer_beta_sd"], - log_bprf=cat_pairs_subset["beta"] - - sign * 1.645 * cat_pairs_subset["outer_beta_sd"], - ) - signed_bprf = sign * cat_pairs_subset["log_bprf"] - product = np.prod( - cat_pairs_subset[["inner_ui_lower", "inner_ui_upper"]], axis=1 - ) - cat_pairs_subset["score"] = np.where( - product < 0, float("nan"), 0.5 * signed_bprf + cat_pairs_subset = cat_pairs.drop(columns="pair_standardized").rename( + columns={"beta_adjusted": "beta"} ) - score_bounds = [ - np.isnan(cat_pairs_subset["score"]), - cat_pairs_subset["score"] > np.log(1 + 0.85), - cat_pairs_subset["score"] > np.log(1 + 0.5), - cat_pairs_subset["score"] > np.log(1 + 0.15), - cat_pairs_subset["score"] > 0, - ] - ratings = [0, 5, 4, 3, 2] - cat_pairs_subset["star_rating"] = np.select( - score_bounds, ratings, default=1 + + # Calculate UIs, logBPRF, and scores + cat_pairs_subset = get_scores(cat_pairs_subset) + # Add star ratings + cat_pairs_subset["star_rating"] = cat_pairs_subset["score"].apply( + score_to_star_rating ) # Subset dataset to only original ref-alt comparisons if ordinal categories + # Check first that categories in cat_order match those in data cat_order = settings["cat_order"] + _validate_cat_order(cat_order, cats) if not cat_order: cat_pairs_subset = cat_pairs_subset else: - if sorted(cat_order) != sorted(cats.tolist()): - raise ValueError( - f"Error: cat_order does not match the expected categories. Expected: {cats}, but got: {cat_order}" - ) - else: - cat_pairs_subset = cat_pairs_subset[ - cat_pairs_subset["pair"].str.contains(ref_cat) - ] + cat_pairs_subset = cat_pairs_subset[ + cat_pairs_subset["pair"].str.contains(ref_cat) + ] cat_pairs_subset.sort_values(by="beta", ascending=False, inplace=True) return cat_pairs_subset def get_linear_model_summary( + df: DataFrame, all_settings: dict, settings: dict, summary: dict, - df: DataFrame, cat_coefs: DataFrame, pair_coefs: DataFrame, ) -> dict: @@ -734,14 +729,14 @@ def get_linear_model_summary( Parameters ---------- + df + Data frame contains the all dataset. all_settings Complete list of settings - settings + settings Settings for the complete summary section. summary Summary from the signal model. - df - Data frame contains the all dataset. cat_coefs Data frame with beta and gamma for each category for fitted linear model pair_coefs @@ -769,39 +764,31 @@ def get_linear_model_summary( summary["star_rating"] = dict( zip(pair_coefs["pair"], pair_coefs["star_rating"]) ) - # Output combined score and star rating if ordinal categories + cat_order = all_settings["cat_order"] + # Output combined score and star rating: max score if non-ordinal, + # 'averaged' score if ordinal categories if cat_order: - if sorted(cat_order) != sorted(cats.tolist()): - raise ValueError( - f"Error: cat_order does not match the expected categories. Expected: {cats}, but got: {cat_order}" - ) + if np.any(np.isnan(pair_coefs["score"])): + summary["combined_score"] = float("nan") else: - if np.any(np.isnan(pair_coefs["score"])): - summary["combined_score"] = float("nan") - summary["combined_star_rating"] = 0 - else: - sign = np.sign(pair_coefs["beta"]) - signed_bprf = sign * pair_coefs["log_bprf"] - max_idx = signed_bprf.idxmax() - score = float( - (1 / len(alt_cats)) - * (np.sum(signed_bprf) - 0.5 * signed_bprf[max_idx]) - ) - summary["combined_score"] = score - # Assign star rating based on ROS - if np.isnan(score): - summary["combined_star_rating"] = 0 - elif score > np.log(1 + 0.85): - summary["combined_star_rating"] = 5 - elif score > np.log(1 + 0.50): - summary["combined_star_rating"] = 4 - elif score > np.log(1 + 0.15): - summary["combined_star_rating"] = 3 - elif score > 0: - summary["combined_star_rating"] = 2 - else: - summary["combined_star_rating"] = 1 + sign = np.sign(pair_coefs["beta"]) + signed_bprf = sign * pair_coefs["log_bprf"] + max_idx = signed_bprf.abs().idxmax() + score = float( + (1 / len(alt_cats)) + * (np.sum(signed_bprf) - 0.5 * signed_bprf[max_idx]) + ) + summary["combined_score"] = score + summary["combined_star_rating"] = score_to_star_rating(score) + summary["category_type"] = "ordinal" + else: + max_idx = pair_coefs["score"].idxmax() + summary["combined_score"] = float(pair_coefs.loc[max_idx, "score"]) + summary["combined_star_rating"] = int( + pair_coefs.loc[max_idx, "star_rating"] + ) + summary["category_type"] = "non-ordinal" # compute the publication bias index = df.is_outlier == 0 @@ -962,29 +949,26 @@ def get_quantiles( def plot_linear_model( + df: DataFrame, name: str, summary: dict, - df: DataFrame, cat_coefs: DataFrame, pair_coefs: DataFrame, - show_ref: bool = True, ) -> Figure: """Plot the linear model Parameters ---------- + df + Data frame contains the training data. name Name of the pair summary Completed summary file. - df - Data frame contains the training data. cat_coefs Data frame containing the fitted beta and gamma coefficients pair_coefs - Data frame containing fitted pairwise model coefficients - show_ref - Whether to show the reference line. Default is `True`. + Data frame containing fitted pairwise model parameters Returns ------- @@ -1002,12 +986,10 @@ def plot_linear_model( # plot data _plot_data( - name, - summary, df, + name, pair_coefs, ax[0], - show_ref=show_ref, ) # plot beta coefficients and uncertainty pred = pair_coefs.beta @@ -1044,7 +1026,7 @@ def plot_linear_model( # Add star ratings as text labels for _, row in pair_coefs.iterrows(): stars = row["star_rating"] - label = "★" * int(stars) if stars > 0 else "0" + label = "★" * int(stars) if stars > 0 else "☆" ax[0].text( x_text_pos, row["y_mid"], @@ -1054,10 +1036,12 @@ def plot_linear_model( fontsize=10, color="black", ) - ax[0].set_xlim(xlim[0], x_text_pos + 0.12 * (xlim[1] - xlim[0])) + ax[0].set_xlim( + xlim[0], x_text_pos + max_label_length * char_spacing + 2 * char_spacing + ) # plot funnel - _plot_funnel(summary, cat_coefs, df, ax[1]) + _plot_funnel(df, summary, cat_coefs, ax[1]) return fig @@ -1232,29 +1216,23 @@ def plot_linear_panel_model( def _plot_data( - name: str, - summary: dict, df: DataFrame, + name: str, pair_coefs: DataFrame, ax: Axes, - show_ref: bool = True, ) -> Axes: """Plot data points Parameters ---------- - name - Name of the pair. - summary - The summary of the signal model. df Data frame contains training data. + name + Name of the pair. pair_coefs Data frame containing the fitted pairwise model coefficients ax Axes of the figure. Usually corresponding to one panel of a figure. - show_ref - Whether to show the reference line. Default is `True`. Returns ------- @@ -1269,7 +1247,6 @@ def _plot_data( df["pair_standardized"] = df["pair"].apply( lambda x: "-".join(sorted(x.split("-"))) ) - # pair_coefs = pair_coefs.copy() pair_coefs["pair_standardized"] = pair_coefs["pair"].apply( lambda x: "-".join(sorted(x.split("-"))) ) @@ -1323,17 +1300,17 @@ def _plot_data( def _plot_funnel( - summary: dict, cat_coefs: DataFrame, df: DataFrame, ax: Axes + df: DataFrame, summary: dict, cat_coefs: DataFrame, ax: Axes ) -> Axes: """Plot the funnel plot Parameters ---------- + df + Data frame that contains training data. summary Complete summary file. cat_coefs Data frame with category-specific fitted model coefficients - df - Data frame that contains training data. ax Axes of the figure. Usually corresponding to one panel of a figure. @@ -1349,18 +1326,6 @@ def _plot_funnel( df.alt_risk_cat.map(beta).values - df.ref_risk_cat.map(beta).values ) residual_sd = np.sqrt(df["ln_rr_se"].values ** 2 + (gamma)) - # if summary["cat_specific_gamma"]: - # residual_sd = np.sqrt( - # df["ln_rr_se"].values ** 2 - # + ( - # df["alt_risk_cat"].map(gamma).values - # + df["ref_risk_cat"].map(gamma).values - # ) - # ) - # else: - # residual_sd = np.sqrt( - # df.ln_rr_se.values**2 + df["ref_risk_cat"].map(gamma).values - # ) index = df.is_outlier == 1 sd_max = residual_sd.max() * 1.1 @@ -1390,6 +1355,29 @@ def _plot_funnel( return ax +def _validate_cat_order(cat_order: list, cats: pd.Series) -> None: + """Validate that the category list matches the list of fitted categories. + Should be provided for ordinal categories only. + + Parameters + ---------- + cat_order + list of all risk exposure categories provided in cat_order setting + cats + risk exposure categories extracted from dataset + + Returns + ------- + ValueError if user-provided list of categories does not match the categories in the data + """ + if cat_order: + if sorted(cat_order) != sorted(cats.tolist()): + raise ValueError( + f"Error: cat_order does not match the expected categories. " + f"Expected: {sorted(cats.tolist())}, but got: {sorted(cat_order)}" + ) + + def _validate_distinct_cov_sets( bias: set, interacted: set, non_interacted: set ) -> None: @@ -1399,11 +1387,11 @@ def _validate_distinct_cov_sets( Parameters ---------- bias - List of bias covariates + Set of bias covariates interacted - List of interacted model covariates + Set of interacted model covariates non_interacted - List of non-interacted model covariates + Set of non-interacted model covariates Returns ------- @@ -1500,358 +1488,9 @@ def _find_covs_to_remove(df: DataFrame, all_covs: set) -> set: set Set of covariates with low variability to remove from data and settings files """ - # Identify covariates to be removed: all or all-but-one of the same value covs_to_remove = set() for col in all_covs: counts = df[col].value_counts() if counts.iloc[0] >= len(df[col]) - 1: covs_to_remove.add(col) return covs_to_remove - - -# def plot_signal_model( -# name: str, -# summary: dict, -# df: DataFrame, -# cat_coefs: DataFrame, -# show_ref: bool = True, -# ) -> Figure: -# """Plot the signal model - -# Parameters -# ---------- -# name -# Name of the pair. -# summary -# Summary from the signal model. -# df -# Data frame contains training data. -# df_coef -# Data frame containing the fitted beta coefficients for the signal model -# show_ref -# Whether to show the reference line. Default is `True`. - -# Returns -# ------- -# Figure -# The figure object for signal model. - -# """ -# offset = 0.05 -# # create fig obj -# fig, ax = plt.subplots(figsize=(8, 5)) - -# # plot data -# _plot_data( -# name, -# summary, -# df, -# cat_coefs, -# ax, -# show_ref=show_ref, -# ) - -# # plot beta coefficients -# if summary["normalize_to_tmrel"]: -# coef_min = cat_coefs.beta.min() -# for i, row in cat_coefs.iterrows(): -# ax.plot( -# [row["x_start"] + offset, row["x_end"] - offset], -# [row["beta"] - coef_min] * 2, -# color="#008080", -# ) -# else: -# for i, row in cat_coefs.iterrows(): -# ax.plot( -# [row["x_start"] + offset, row["x_end"] - offset], -# [row["beta"]] * 2, -# color="#008080", -# ) - -# return fig - - -# def get_cat_coefs_old( -# settings: dict, -# model: MRBRT, -# type: str, -# ref_cat_input: str, -# ) -> tuple[DataFrame]: -# """Get beta and gamma coefficients for the categories. - -# Parameters -# ---------- -# df -# Data frame contains the training data. -# settings -# The settings for category order -# model -# Fitted model for the categories, linear or signal -# type -# String specifying whether model is signal or linear model -# ref_cat_input -# Reference category. Optional, will be inferred from settings or data -# if not provided. - -# Returns -# ------- -# tuple[DataFrame] -# Dataframe of beta, beta_sd, gamma, gamma_sd for each category and ranges to plot each category - -# """ - -# lt = model.lt - -# # Extract betas for categories and covariates, ensuring correct matching -# cov_names = [] -# for cov_model in model.cov_models: -# if isinstance(cov_model, LinearCovModel): -# cov_name = cov_model.alt_cov[0] -# if cov_model.num_x_vars == 1: -# cov_names.append(cov_name) -# else: -# cov_names.extend( -# [f"{cov_name}_{i}" for i in range(cov_model.num_x_vars)] -# ) -# elif isinstance(cov_model, LinearCatCovModel): -# cov_names.extend("cat_" + cov_model.cats.astype(str)) -# else: -# raise TypeError("Unknown cov_model type") - -# beta = model.beta_soln -# hessian = lt.hessian(lt.soln)[: lt.k_beta, : lt.k_beta] -# beta_sd = 1.0 / np.sqrt(np.diag(hessian)) - -# beta_info = pd.DataFrame( -# { -# "cov_name": cov_names, -# "beta": beta, -# # "beta_sd": beta_sd, -# } -# ) -# # Subset to betas for categories only -# beta_cats = beta_info[beta_info["cov_name"].str.startswith("cat_")].copy() -# beta_cats["cat"] = beta_cats["cov_name"].str.removeprefix("cat_") - -# # Extract gamma, calculate gamma_sd, and merge with beta dataframe for linear model only -# # If category-specific gamma is not included: -# # gamma/gamma_sd values will be identical for each category -# if type == "linear": -# lt = model.lt -# if settings["complete_summary"]["cat_gamma"]["cat_specific_gamma"]: -# gamma = model.gamma_soln -# gamma_fisher = lt.get_gamma_fisher(gamma) -# gamma_cov = np.linalg.inv(gamma_fisher) -# gamma_sd = np.sqrt(np.diag(gamma_cov)) -# gamma_cats = pd.DataFrame( -# { -# "cat": model.cov_models[ -# model.cov_model_names == "alt_risk_cat" -# ].cats, -# "gamma": gamma, -# "gamma_sd": gamma_sd, -# } -# ) -# else: -# gamma = model.gamma_soln[0] -# gamma_fisher = lt.get_gamma_fisher(gamma) -# gamma_sd = 1.0 / np.sqrt(gamma_fisher[0, 0]) -# gamma_cats = pd.DataFrame( -# { -# "cat": model.cov_models[ -# model.cov_model_names == "alt_risk_cat" -# ].cats, -# "gamma": np.repeat( -# gamma, -# len( -# model.cov_models[ -# model.cov_model_names == "alt_risk_cat" -# ].cats -# ), -# ), -# "gamma_sd": np.repeat( -# gamma_sd, -# len( -# model.cov_models[ -# model.cov_model_names == "alt_risk_cat" -# ].cats -# ), -# ), -# } -# ) -# cat_coefs = beta_cats.merge(gamma_cats, on="cat", how="left") -# else: -# cat_coefs = beta_cats - -# # Order the categories -# cat_order = settings["figure"]["cat_order"] -# if cat_order: -# cat_coefs["cat"] = pd.Categorical( -# cat_coefs["cat"], categories=cat_order, ordered=True -# ) -# cat_coefs = cat_coefs.sort_values("cat").reset_index(drop=True) -# cat_coefs["cat"] = cat_coefs["cat"].astype(str) -# else: -# # ref_cat first, then order by proximity to ref_cat's beta coefficient -# ref_beta = cat_coefs.loc[cat_coefs["cat"] == ref_cat_input, "beta"].iloc[0] -# cat_coefs["abs_diff"] = (cat_coefs["beta"] - ref_beta).abs() -# cat_coefs = cat_coefs.sort_values(by="abs_diff") -# cat_coefs = pd.concat( -# [ -# cat_coefs[cat_coefs["cat"] == ref_cat_input], -# cat_coefs[cat_coefs["cat"] != ref_cat_input], -# ] -# ).reset_index(drop=True) -# cat_coefs = cat_coefs.sort_values(by=["cat" == ref_cat_input, "abs_diff"], ascending=[False, True]) -# cat_coefs = cat_coefs.drop(columns=["abs_diff"]) -# # Add x ranges for plotting -# num_cats = cat_coefs["cat"].nunique() -# cat_coefs["x_start"] = range(num_cats) -# cat_coefs["x_end"] = range(1, num_cats + 1) -# cat_coefs["x_mid"] = cat_coefs.eval("0.5 * (x_start + x_end)") - -# return cat_coefs - - -# def get_linear_model_summary( -# settings: dict, -# summary: dict, -# df: DataFrame, -# cat_coefs: DataFrame, -# linear_model: MRBRT, -# ) -> dict: -# """Complete the summary from the signal model. - -# Parameters -# ---------- -# settings -# Settings for the complete summary section. -# summary -# Summary from the signal model. -# df -# Data frame contains the all dataset. -# cat_coefs -# Data frame with beta and gamma for each category for fitted linear model -# linear_model -# Fitted linear model for risk curve. - -# Returns -# ------- -# dict -# Summary file contains all necessary information. - -# """ -# # load summary -# summary["normalize_to_tmrel"] = settings["score"]["normalize_to_tmrel"] -# ref_cat = summary["ref_cat"] -# cats = cat_coefs["cat"] -# # cats = linear_model.cov_models[0].cats -# alt_cats = [cat for cat in cats if cat != ref_cat] - -# # solution of the final model -# summary["beta"] = dict(zip(cat_coefs["cat"], cat_coefs["beta"])) -# summary["beta_sd"] = dict(zip(cat_coefs["cat"], cat_coefs["beta_sd"])) -# summary["gamma"] = dict(zip(cat_coefs["cat"], cat_coefs["gamma"])) -# summary["gamma_sd"] = dict(zip(cat_coefs["cat"], cat_coefs["gamma_sd"])) -# # beta_info = get_beta_cats(linear_model) -# # gamma_info = get_gamma_info(linear_model) -# # summary["beta"] = dict( -# # zip(beta_info["cov_name_standard"], beta_info["beta"]) -# # ) -# # summary["beta_sd"] = dict( -# # zip(beta_info["cov_name_standard"], beta_info["beta_sd"]) -# # ) -# # summary["gamma"] = [float(gamma_info[0]), float(gamma_info[1])] - -# # compute the score and add star rating -# # Subset to only alternative categories -# alt_cat_coefs = cat_coefs[cat_coefs["cat"] != ref_cat] -# beta_sd = np.sqrt( -# alt_cat_coefs["beta_sd"] ** 2 -# + alt_cat_coefs["gamma"] -# + 2 * alt_cat_coefs["gamma_sd"] -# ) -# pred = np.array(alt_cat_coefs["beta"]) -# inner_ui = np.vstack( -# [ -# alt_cat_coefs["beta"] - 1.96 * alt_cat_coefs["beta_sd"], -# alt_cat_coefs["beta"] + 1.96 * alt_cat_coefs["beta_sd"], -# ] -# ) -# sign = np.sign(pred) -# burden_of_proof = alt_cat_coefs["beta"] - sign * 1.645 * beta_sd - -# if settings["score"]["normalize_to_tmrel"]: -# index = np.argmin(pred) -# pred -= pred[index] -# burden_of_proof -= burden_of_proof[None, index] -# inner_ui -= inner_ui[:, None, index] - -# signed_bprf = sign * burden_of_proof -# # Number of alternative categories -# n = len(alt_cats) -# # Assign dichotomous score for each alternative category -# score_by_category = np.zeros(n) -# product = np.prod(inner_ui, axis=0) -# score_by_category[product < 0] = float("nan") -# score_by_category[product >= 0] = 0.5 * signed_bprf[product >= 0] -# summary["score_by_category"] = dict( -# zip(alt_cats, score_by_category.tolist()) -# ) -# # Index with largest signed coefficient -# max_idx = signed_bprf.idxmax() -# if np.any(product < 0): -# summary["score"] = float("nan") -# summary["star_rating"] = 0 -# else: -# score = float( -# (1 / n) * (np.sum(signed_bprf) - 0.5 * signed_bprf[max_idx]) -# ) -# summary["score"] = score -# # Assign star rating based on ROS -# if np.isnan(score): -# summary["star_rating"] = 0 -# elif score > np.log(1 + 0.85): -# summary["star_rating"] = 5 -# elif score > np.log(1 + 0.50): -# summary["star_rating"] = 4 -# elif score > np.log(1 + 0.15): -# summary["star_rating"] = 3 -# elif score > 0: -# summary["star_rating"] = 2 -# else: -# summary["star_rating"] = 1 - -# # compute the publication bias -# index = df.is_outlier == 0 -# beta_dict = dict(zip(cat_coefs["cat"], cat_coefs["beta"])) -# gamma_dict = dict(zip(cat_coefs["cat"], cat_coefs["gamma"])) -# # beta_dict = dict(zip(beta_info["cov_name_standard"], beta_info["beta"])) -# residual = df["ln_rr"].values[index] - ( -# df["alt_risk_cat"].map(beta_dict).values[index] -# - df["ref_risk_cat"].map(beta_dict).values[index] -# ) -# if summary["cat_specific_gamma"]: -# residual_sd = np.sqrt( -# df["ln_rr_se"].values[index] ** 2 -# + ( -# df["alt_risk_cat"].map(gamma_dict).values[index] -# + df["ref_risk_cat"].map(gamma_dict).values[index] -# ) -# ) -# else: -# # Avoid doubling gamma contribution in case of shared gamma across categories -# # Could equally use "alt_risk_cat" here as gamma_dict will return the -# # same value for both, since gamma is the same across all categories -# residual_sd = np.sqrt( -# df.ln_rr_se.values[index] ** 2 -# + df["ref_risk_cat"].map(gamma_dict).values[index] -# ) -# weighted_residual = residual / residual_sd -# r_mean = weighted_residual.mean() -# r_sd = 1 / np.sqrt(weighted_residual.size) -# pval = 1 - norm.cdf(np.abs(r_mean / r_sd)) -# summary["pub_bias"] = int(pval < 0.05) -# summary["pub_bias_pval"] = float(pval) - -# return summary diff --git a/src/bopforge/utils.py b/src/bopforge/utils.py index 8eb44c1..b9a4f8f 100644 --- a/src/bopforge/utils.py +++ b/src/bopforge/utils.py @@ -105,6 +105,33 @@ def get_signal(signal_model: MRBeRT, risk: np.ndarray) -> np.ndarray: ) +def score_to_star_rating(score: float) -> int: + """Takes in risk outcome score(s) and returns associated star rating(s) + + Parameters + ---------- + score + risk outcome scores + + Returns + ------- + int + Associated star rating + """ + if np.isnan(score): + return 0 + elif score > np.log(1 + 0.85): + return 5 + elif score > np.log(1 + 0.5): + return 4 + elif score > np.log(1 + 0.15): + return 3 + elif score > 0: + return 2 + else: + return 1 + + class ParseKwargs(argparse.Action): def __call__( self,