#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pathlib import Path
from typing import Dict, Union
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
from matplotlib import colors as mcolors
from .logger import BaseLogger
logger = BaseLogger.get_logger(__name__)
class MetricsData:
"""
Class to store metrics as instance attributes.
Which attributes are set depends on the task.
For ROC
self.fpr: np.ndarray
self.tpr: np.ndarray
self.auc: float
For Regression
self.y_obs: np.ndarray
self.y_pred: np.ndarray
self.r2: float
For DeepSurv
self.c_index: float
"""
def __init__(self) -> None:
pass
class LabelMetrics:
"""
Class to store metrics of each split for each label.
"""
def __init__(self) -> None:
"""
Metrics of each split, i.e. 'val' and 'test'.
"""
self.val = MetricsData()
self.test = MetricsData()
def set_label_metrics(self, split: str, attr: str, value: Union[np.ndarray, float]) -> None:
"""
Set value as appropriate metrics of split.
Args:
split (str): split
attr (str): attribute name as follows:
classification: 'fpr', 'tpr', or 'auc',
regression: 'y_obs' (ground truth), 'y_pred' (prediction), or 'r2', or
deepsurv: 'c_index'
value (Union[np.ndarray, float]): value of attr
"""
setattr(getattr(self, split), attr, value)
def get_label_metrics(self, split: str, attr: str) -> Union[np.ndarray, float]:
"""
Return value of metrics of split.
Args:
split (str): split
attr (str): metrics name
Returns:
Union[np.ndarray, float]: value of attr
"""
return getattr(getattr(self, split), attr)
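# Illustrative usage of LabelMetrics (not part of the pipeline): metric values
# are stored as attributes on the per-split MetricsData objects via setattr.
#
# lm = LabelMetrics()
# lm.set_label_metrics('val', 'auc', 0.87)
# lm.get_label_metrics('val', 'auc')  # -> 0.87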
class ROCMixin:
"""
Class for calculating ROC and AUC.
"""
def _set_roc(self, label_metrics: LabelMetrics, split: str, fpr: np.ndarray, tpr: np.ndarray) -> None:
"""
Set fpr, tpr, and auc.
Args:
label_metrics (LabelMetrics): metrics of 'val' and 'test'
split (str): 'val' or 'test'
fpr (np.ndarray): FPR
tpr (np.ndarray): TPR
self.metrics_kind = 'auc' is defined in class ClsEval below.
"""
label_metrics.set_label_metrics(split, 'fpr', fpr)
label_metrics.set_label_metrics(split, 'tpr', tpr)
label_metrics.set_label_metrics(split, self.metrics_kind, metrics.auc(fpr, tpr))
def _cal_label_roc_binary(self, label_name: str, df_group: pd.DataFrame) -> LabelMetrics:
"""
Calculate ROC for binary class.
Args:
label_name (str): label name
df_group (pd.DataFrame): likelihood for group
Returns:
LabelMetrics: metrics of 'val' and 'test'
"""
required_columns = [column_name for column_name in df_group.columns if label_name in column_name] + ['split']
df_label = df_group[required_columns]
POSITIVE = 1
positive_pred_name = 'pred_' + label_name + '_' + str(POSITIVE)
# ! When only the 'test' split is present, i.e. an external dataset, this fails because the 'val' split is empty.
label_metrics = LabelMetrics()
for split in ['val', 'test']:
df_split = df_label.query('split == @split')
y_true = df_split[label_name]
y_score = df_split[positive_pred_name]
_fpr, _tpr, _ = metrics.roc_curve(y_true, y_score)
self._set_roc(label_metrics, split, _fpr, _tpr)
return label_metrics
def _cal_label_roc_multi(self, label_name: str, df_group: pd.DataFrame) -> LabelMetrics:
"""
Calculate ROC for multi-class by macro average.
Args:
label_name (str): label name
df_group (pd.DataFrame): likelihood for group
Returns:
LabelMetrics: metrics of 'val' and 'test'
"""
required_columns = [column_name for column_name in df_group.columns if label_name in column_name] + ['split']
df_label = df_group[required_columns]
pred_name_list = list(df_label.columns[df_label.columns.str.startswith('pred')])
class_list = [int(pred_name.rsplit('_', 1)[-1]) for pred_name in pred_name_list] # e.g. ['pred_<label>_0', 'pred_<label>_1', 'pred_<label>_2'] -> [0, 1, 2]
num_classes = len(class_list)
label_metrics = LabelMetrics()
for split in ['val', 'test']:
df_split = df_label.query('split == @split')
y_true = df_split[label_name]
y_true_bin = label_binarize(y_true, classes=class_list) # classes must be List[int] because y_true holds integer class labels
# Compute ROC for each class by OneVsRest
_fpr = dict()
_tpr = dict()
for i, class_name in enumerate(class_list):
pred_name = 'pred_' + label_name + '_' + str(class_name)
_fpr[class_name], _tpr[class_name], _ = metrics.roc_curve(y_true_bin[:, i], df_split[pred_name])
# First aggregate all false positive rates
all_fpr = np.unique(np.concatenate([_fpr[class_name] for class_name in class_list]))
# Then interpolate all ROC curves at these points
mean_tpr = np.zeros_like(all_fpr)
for class_name in class_list:
mean_tpr += np.interp(all_fpr, _fpr[class_name], _tpr[class_name])
# Finally average it and compute AUC
mean_tpr /= num_classes
_fpr['macro'] = all_fpr
_tpr['macro'] = mean_tpr
self._set_roc(label_metrics, split, _fpr['macro'], _tpr['macro'])
return label_metrics
def cal_label_metrics(self, label_name: str, df_group: pd.DataFrame) -> LabelMetrics:
"""
Calculate ROC and AUC for label depending on binary or multi-class.
Args:
label_name (str): label name
df_group (pd.DataFrame): likelihood for group
Returns:
LabelMetrics: metrics of 'val' and 'test'
"""
pred_name_list = df_group.columns[df_group.columns.str.startswith('pred_' + label_name)]
is_multiclass = (len(pred_name_list) > 2)
if is_multiclass:
label_metrics = self._cal_label_roc_multi(label_name, df_group)
else:
label_metrics = self._cal_label_roc_binary(label_name, df_group)
return label_metrics
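# A minimal sketch of the likelihood layout ROCMixin expects for a binary label.
# Column names follow the conventions used above ('label_*', 'pred_<label>_<class>',
# 'split'); the label name and values below are hypothetical.
#
# df_group = pd.DataFrame({
#     'split': ['val', 'val', 'val', 'test', 'test', 'test'],
#     'label_disease': [0, 1, 1, 0, 0, 1],
#     'pred_label_disease_0': [0.8, 0.3, 0.4, 0.7, 0.6, 0.2],
#     'pred_label_disease_1': [0.2, 0.7, 0.6, 0.3, 0.4, 0.8],
# })
# ClsEval().cal_label_metrics('label_disease', df_group)  # -> fpr/tpr/auc per split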
class YYMixin:
"""
Class for calculating YY and R2.
"""
def _set_yy(self, label_metrics: LabelMetrics, split: str, y_obs: pd.Series, y_pred: pd.Series) -> None:
"""
Set ground truth, prediction, and R2.
Args:
label_metrics (LabelMetrics): metrics of 'val' and 'test'
split (str): 'val' or 'test'
y_obs (pd.Series): ground truth
y_pred (pd.Series): prediction
self.metrics_kind = 'r2' is defined in class RegEval below.
"""
label_metrics.set_label_metrics(split, 'y_obs', y_obs.values)
label_metrics.set_label_metrics(split, 'y_pred', y_pred.values)
label_metrics.set_label_metrics(split, self.metrics_kind, metrics.r2_score(y_obs, y_pred))
def cal_label_metrics(self, label_name: str, df_group: pd.DataFrame) -> LabelMetrics:
"""
Calculate YY and R2 for label.
Args:
label_name (str): label name
df_group (pd.DataFrame): likelihood for group
Returns:
LabelMetrics: metrics of 'val' and 'test'
"""
required_columns = [column_name for column_name in df_group.columns if label_name in column_name] + ['split']
df_label = df_group[required_columns]
label_metrics = LabelMetrics()
for split in ['val', 'test']:
df_split = df_label.query('split == @split')
y_obs = df_split[label_name]
y_pred = df_split['pred_' + label_name]
self._set_yy(label_metrics, split, y_obs, y_pred)
return label_metrics
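# A minimal sketch of the likelihood layout YYMixin expects for a regression label
# ('label_*' for ground truth, 'pred_<label>' for prediction, 'split'); the column
# name and values below are hypothetical.
#
# df_group = pd.DataFrame({
#     'split': ['val', 'val', 'test', 'test'],
#     'label_age': [34.0, 61.0, 45.0, 72.0],
#     'pred_label_age': [36.2, 58.9, 47.5, 70.1],
# })
# RegEval().cal_label_metrics('label_age', df_group)  # -> y_obs/y_pred/r2 per split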
class C_IndexMixin:
"""
Class for calculating C-Index.
"""
def _set_c_index(
self,
label_metrics: LabelMetrics,
split: str,
periods: pd.Series,
preds: pd.Series,
labels: pd.Series
) -> None:
"""
Set C-Index.
Args:
label_metrics (LabelMetrics): metrics of 'val' and 'test'
split (str): 'val' or 'test'
periods (pd.Series): follow-up periods
preds (pd.Series): predicted risk
labels (pd.Series): event labels
self.metrics_kind = 'c_index' is defined in class DeepSurvEval below.
"""
from lifelines.utils import concordance_index
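# lifelines' concordance_index expects scores that increase with survival time,
# so the predicted risk is negated before computing the C-index.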
value_c_index = concordance_index(periods, (-1)*preds, labels)
label_metrics.set_label_metrics(split, self.metrics_kind, value_c_index)
def cal_label_metrics(self, label_name: str, df_group: pd.DataFrame) -> LabelMetrics:
"""
Calculate C-Index for label.
Args:
label_name (str): label name
df_group (pd.DataFrame): likelihood for group
Returns:
LabelMetrics: metrics of 'val' and 'test'
"""
required_columns = [column_name for column_name in df_group.columns if label_name in column_name] + ['periods', 'split']
df_label = df_group[required_columns]
label_metrics = LabelMetrics()
for split in ['val', 'test']:
df_split = df_label.query('split == @split')
periods = df_split['periods']
preds = df_split['pred_' + label_name]
labels = df_split[label_name]
self._set_c_index(label_metrics, split, periods, preds, labels)
return label_metrics
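# A minimal sketch of the likelihood layout C_IndexMixin expects ('periods' for
# follow-up time, 'label_*' for the event indicator, 'pred_<label>' for the
# predicted risk, 'split'); names and values below are hypothetical.
#
# df_group = pd.DataFrame({
#     'split': ['val', 'val', 'test', 'test'],
#     'periods': [12, 30, 8, 24],
#     'label_event': [1, 0, 1, 1],
#     'pred_label_event': [0.9, 0.2, 0.8, 0.5],
# })
# DeepSurvEval().cal_label_metrics('label_event', df_group)  # -> c_index per split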
class MetricsMixin:
"""
Class to calculate metrics and make summary.
"""
def _cal_group_metrics(self, df_group: pd.DataFrame) -> Dict[str, LabelMetrics]:
"""
Calculate metrics for each group.
Args:
df_group (pd.DataFrame): likelihood for group
Returns:
Dict[str, LabelMetrics]: dictionary of label and its LabelMetrics
e.g. {label_1: LabelMetrics(), label_2: LabelMetrics(), ...}
"""
label_list = list(df_group.columns[df_group.columns.str.startswith('label')])
group_metrics = dict()
for label_name in label_list:
label_metrics = self.cal_label_metrics(label_name, df_group)
group_metrics[label_name] = label_metrics
return group_metrics
def cal_whole_metrics(self, df_likelihood: pd.DataFrame) -> Dict[str, Dict[str, LabelMetrics]]:
"""
Calculate metrics for all groups.
Args:
df_likelihood (pd.DataFrame) : DataFrame of likelihood
Returns:
Dict[str, Dict[str, LabelMetrics]]: dictionary mapping each group to a dictionary of label and its LabelMetrics
e.g. {
groupA: {label_1: LabelMetrics(), label_2: LabelMetrics(), ...},
groupB: {label_1: LabelMetrics(), label_2: LabelMetrics(), ...},
...}
"""
whole_metrics = dict()
for group in df_likelihood['group'].unique():
df_group = df_likelihood.query('group == @group')
whole_metrics[group] = self._cal_group_metrics(df_group)
return whole_metrics
def make_summary(
self,
whole_metrics: Dict[str, Dict[str, LabelMetrics]],
likelihood_path: Path,
metrics_kind: str
) -> pd.DataFrame:
"""
Make summary.
Args:
whole_metrics (Dict[str, Dict[str, LabelMetrics]]): metrics for all groups
likelihood_path (Path): path to likelihood
metrics_kind (str): kind of metrics, i.e. 'auc', 'r2', or 'c_index'
Returns:
pd.DataFrame: summary
"""
_datetime = likelihood_path.parents[1].name
_weight = likelihood_path.stem.replace('likelihood_', '') + '.pt'
df_summary = pd.DataFrame()
for group, group_metrics in whole_metrics.items():
_new = dict()
_new['datetime'] = [_datetime]
_new['weight'] = [_weight]
_new['group'] = [group]
for label_name, label_metrics in group_metrics.items():
_val_metrics = label_metrics.get_label_metrics('val', metrics_kind)
_test_metrics = label_metrics.get_label_metrics('test', metrics_kind)
_new[label_name + '_val_' + metrics_kind] = [f"{_val_metrics:.2f}"]
_new[label_name + '_test_' + metrics_kind] = [f"{_test_metrics:.2f}"]
df_summary = pd.concat([df_summary, pd.DataFrame(_new)], ignore_index=True)
df_summary = df_summary.sort_values('group')
return df_summary
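# The resulting summary holds one row per group, e.g. (values hypothetical):
#
# datetime          weight          group   label_age_val_r2  label_age_test_r2
# 2023-01-01-12-00  weight_best.pt  groupA  0.82              0.79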
def print_metrics(self, df_summary: pd.DataFrame, metrics_kind: str) -> None:
"""
Print metrics.
Args:
df_summary (pd.DataFrame): summary
metrics_kind (str): kind of metrics, i.e. 'auc', 'r2', or 'c_index'
"""
label_list = list(df_summary.columns[df_summary.columns.str.startswith('label')]) # e.g. [label_1_val_<metrics_kind>, label_1_test_<metrics_kind>, label_2_val_<metrics_kind>, ...]
num_splits = len(['val', 'test'])
_column_val_test_list = [label_list[i:i+num_splits] for i in range(0, len(label_list), num_splits)] # e.g. [[label_1_val_<metrics_kind>, label_1_test_<metrics_kind>], [label_2_val_<metrics_kind>, label_2_test_<metrics_kind>], ...]
for _, row in df_summary.iterrows():
logger.info(row['group'])
for _column_val_test in _column_val_test_list:
_label_name = _column_val_test[0].replace('_val', '')
_label_name_val = _column_val_test[0]
_label_name_test = _column_val_test[1]
logger.info(f"{_label_name:<25} val_{metrics_kind}: {row[_label_name_val]:>7}, test_{metrics_kind}: {row[_label_name_test]:>7}")
def update_summary(self, df_summary: pd.DataFrame, likelihood_path: Path) -> None:
"""
Update summary.
Args:
df_summary (pd.DataFrame): summary to be added to the previous summary
likelihood_path (Path): path to likelihood
"""
_project_dir = likelihood_path.parents[3]
summary_dir = Path(_project_dir, 'summary')
summary_path = Path(summary_dir, 'summary.csv')
if summary_path.exists():
df_prev = pd.read_csv(summary_path)
df_updated = pd.concat([df_prev, df_summary], axis=0)
else:
summary_dir.mkdir(parents=True, exist_ok=True)
df_updated = df_summary
df_updated.to_csv(summary_path, index=False)
def make_metrics(self, likelihood_path: Path) -> None:
"""
Make metrics.
Args:
likelihood_path (Path): path to likelihood
"""
df_likelihood = pd.read_csv(likelihood_path)
whole_metrics = self.cal_whole_metrics(df_likelihood)
self.make_save_fig(whole_metrics, likelihood_path, self.fig_kind)
df_summary = self.make_summary(whole_metrics, likelihood_path, self.metrics_kind)
self.print_metrics(df_summary, self.metrics_kind)
self.update_summary(df_summary, likelihood_path)
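# Directory layout assumed by make_summary()/update_summary(), inferred from the
# parents[...] indices above; the intermediate directory names are hypothetical:
#
# <project_dir>/                    # likelihood_path.parents[3]
#     results/                      # likelihood_path.parents[2]
#         <datetime_dir>/           # likelihood_path.parents[1]
#             likelihoods/          # likelihood_path.parents[0]
#                 likelihood_<weight>.csv
#     summary/
#         summary.csv               # appended to by update_summary()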
class FigROCMixin:
"""
Class to plot ROC.
"""
def _plot_fig_group_metrics(self, group: str, group_metrics: Dict[str, LabelMetrics]) -> plt.Figure:
"""
Plot ROC.
Args:
group (str): group
group_metrics (Dict[str, LabelMetrics]): dictionary of label and its LabelMetrics
Returns:
plt.Figure: figure with the ROC curves
"""
label_list = group_metrics.keys()
num_rows = 1
num_cols = len(label_list)
base_size = 7
height = num_rows * base_size
width = num_cols * height
fig = plt.figure(figsize=(width, height))
for i, label_name in enumerate(label_list):
label_metrics = group_metrics[label_name]
offset = i + 1
ax_i = fig.add_subplot(
num_rows,
num_cols,
offset,
title=group + ': ' + label_name,
xlabel='1 - Specificity',
ylabel='Sensitivity',
xmargin=0,
ymargin=0
)
ax_i.plot(label_metrics.val.fpr, label_metrics.val.tpr, label=f"AUC_val = {label_metrics.val.auc:.2f}", marker='x')
ax_i.plot(label_metrics.test.fpr, label_metrics.test.tpr, label=f"AUC_test = {label_metrics.test.auc:.2f}", marker='o')
ax_i.grid()
ax_i.legend()
fig.tight_layout()
return fig
class FigYYMixin:
"""
Class to plot YY-graph.
"""
def _plot_fig_group_metrics(self, group: str, group_metrics: Dict[str, LabelMetrics]) -> plt.Figure:
"""
Plot yy.
Args:
group (str): group
group_metrics (Dict[str, LabelMetrics]): dictionary of label and its LabelMetrics
Returns:
plt.Figure: figure with the observed-predicted (YY) plots
"""
label_list = group_metrics.keys()
num_splits = len(['val', 'test'])
num_rows = 1
num_cols = len(label_list) * num_splits
base_size = 7
height = num_rows * base_size
width = num_cols * height
fig = plt.figure(figsize=(width, height))
for i, label_name in enumerate(label_list):
label_metrics = group_metrics[label_name]
val_offset = (i * num_splits) + 1
test_offset = val_offset + 1
val_ax = fig.add_subplot(
num_rows,
num_cols,
val_offset,
title=group + ': ' + label_name + '\n' + 'val: Observed-Predicted Plot',
xlabel='Observed',
ylabel='Predicted',
xmargin=0,
ymargin=0
)
test_ax = fig.add_subplot(
num_rows,
num_cols,
test_offset,
title=group + ': ' + label_name + '\n' + 'test: Observed-Predicted Plot',
xlabel='Observed',
ylabel='Predicted',
xmargin=0,
ymargin=0
)
y_obs_val = label_metrics.val.y_obs
y_pred_val = label_metrics.val.y_pred
y_obs_test = label_metrics.test.y_obs
y_pred_test = label_metrics.test.y_pred
# Plot
color = mcolors.TABLEAU_COLORS
val_ax.scatter(y_obs_val, y_pred_val, color=color['tab:blue'], label='val')
test_ax.scatter(y_obs_test, y_pred_test, color=color['tab:orange'], label='test')
# Draw diagonal line
y_values_val = np.concatenate([y_obs_val.flatten(), y_pred_val.flatten()])
y_values_test = np.concatenate([y_obs_test.flatten(), y_pred_test.flatten()])
y_values_val_min, y_values_val_max, y_values_val_range = np.amin(y_values_val), np.amax(y_values_val), np.ptp(y_values_val)
y_values_test_min, y_values_test_max, y_values_test_range = np.amin(y_values_test), np.amax(y_values_test), np.ptp(y_values_test)
val_ax.plot([y_values_val_min - (y_values_val_range * 0.01), y_values_val_max + (y_values_val_range * 0.01)],
[y_values_val_min - (y_values_val_range * 0.01), y_values_val_max + (y_values_val_range * 0.01)], color='red')
test_ax.plot([y_values_test_min - (y_values_test_range * 0.01), y_values_test_max + (y_values_test_range * 0.01)],
[y_values_test_min - (y_values_test_range * 0.01), y_values_test_max + (y_values_test_range * 0.01)], color='red')
fig.tight_layout()
return fig
class FigMixin:
"""
Class to make and save figures.
This class is used for ROC and YY-graph.
"""
def make_save_fig(self, whole_metrics: Dict[str, Dict[str, LabelMetrics]], likelihood_path: Path, fig_kind: str) -> None:
"""
Make and save figure.
Args:
whole_metrics (Dict[str, Dict[str, LabelMetrics]]): metrics for all groups
likelihood_path (Path): path to likelihood
fig_kind (str): kind of figure, i.e. 'roc' or 'yy'
"""
_datetime_dir = likelihood_path.parents[1]
save_dir = Path(_datetime_dir, fig_kind)
save_dir.mkdir(parents=True, exist_ok=True)
_fig_name = fig_kind + '_' + likelihood_path.stem.replace('likelihood_', '')
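# Each group is saved as <group>_<fig_kind>_<weight>.png under <datetime_dir>/<fig_kind>/.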
for group, group_metrics in whole_metrics.items():
fig = self._plot_fig_group_metrics(group, group_metrics)
save_path = Path(save_dir, group + '_' + _fig_name + '.png')
fig.savefig(save_path)
plt.close(fig)
class ClsEval(MetricsMixin, ROCMixin, FigMixin, FigROCMixin):
"""
Class for calculating metrics for classification.
"""
def __init__(self) -> None:
self.fig_kind = 'roc'
self.metrics_kind = 'auc'
class RegEval(MetricsMixin, YYMixin, FigMixin, FigYYMixin):
"""
Class for calculating metrics for regression.
"""
def __init__(self) -> None:
self.fig_kind = 'yy'
self.metrics_kind = 'r2'
class DeepSurvEval(MetricsMixin, C_IndexMixin):
"""
Class for calculating metrics for DeepSurv.
"""
def __init__(self) -> None:
self.fig_kind = None
self.metrics_kind = 'c_index'
def make_metrics(self, likelihood_path: Path) -> None:
"""
Make metrics; this method handles the whole evaluation.
Args:
likelihood_path (Path): path to likelihood
Overrides make_metrics() in MetricsMixin, omitting self.make_save_fig()
because no figure needs to be plotted or saved.
"""
df_likelihood = pd.read_csv(likelihood_path)
whole_metrics = self.cal_whole_metrics(df_likelihood)
df_summary = self.make_summary(whole_metrics, likelihood_path, self.metrics_kind)
self.print_metrics(df_summary, self.metrics_kind)
self.update_summary(df_summary, likelihood_path)
def set_eval(task: str) -> Union[ClsEval, RegEval, DeepSurvEval]:
"""
Set the evaluation class depending on the task.
Args:
task (str): task
Returns:
Union[ClsEval, RegEval, DeepSurvEval]: evaluator instance for the task
"""
if task == 'classification':
return ClsEval()
elif task == 'regression':
return RegEval()
elif task == 'deepsurv':
return DeepSurvEval()
else:
raise ValueError(f"Invalid task: {task}.")
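# Minimal usage sketch (illustrative; the path below is hypothetical but must
# follow the directory layout expected by make_summary()/update_summary()):
#
# if __name__ == '__main__':
#     evaluator = set_eval('classification')
#     evaluator.make_metrics(Path('project/results/2023-01-01/likelihoods/likelihood_weight_best.csv'))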