Use hpstat for yli.turnbull to enable computation of confidence intervals

RunasSudo 2023-10-20 21:11:56 +11:00
parent 675422246f
commit 14c4054a47
Signed by: RunasSudo
GPG Key ID: 7234E476BF21C61A
2 changed files with 100 additions and 42 deletions

View File

@@ -45,7 +45,7 @@ The mandatory dependencies of this library are:
 Optional dependencies are:
-* [hpstat](https://yingtongli.me/git/hpstat), for *IntervalCensoredCox*
+* [hpstat](https://yingtongli.me/git/hpstat), for *turnbull* and *IntervalCensoredCox*
 * [matplotlib](https://matplotlib.org/) and [seaborn](https://seaborn.pydata.org/), for plotting functions
 * [mpmath](https://mpmath.org/), for *beta_ratio* and *beta_oddsratio*
 * [PyCryptodome](https://www.pycryptodome.org/), for *pickle_write_encrypted* and *pickle_read_encrypted*
@@ -72,7 +72,7 @@ Relevant statistical functions are all directly available from the top-level *yl
 * Survival analysis:
   * *kaplanmeier*: Kaplan–Meier plot
   * *logrank*: Log-rank test
-  * *turnbull*: Turnbull estimator plot for interval-censored data
+  * *turnbull*: Turnbull estimator plot, including pointwise confidence intervals, for interval-censored data
 * Input/output:
   * *pickle_write_compressed*, *pickle_read_compressed*: Pickle a pandas DataFrame and compress using LZMA
   * *pickle_write_encrypted*, *pickle_read_encrypted*: Pickle a pandas DataFrame, compress using LZMA, and encrypt
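The updated *turnbull* entry above corresponds to a call along the following lines. This is only a usage sketch: the DataFrame and its column names are invented, and hpstat must be installed (with *config.hpstat_path* pointing at the binary) for the plot to be produced.

import numpy as np
import pandas as pd
import yli

# Interval-censored observations as (left, right] intervals;
# right == np.inf marks a right-censored observation
df = pd.DataFrame({
    'TimeLeft':  [0.0, 3.0, 5.0, 2.0],
    'TimeRight': [4.0, 7.0, np.inf, 6.0],
    'Arm': ['Control', 'Control', 'Treatment', 'Treatment'],
})

# Turnbull estimator plot with pointwise confidence intervals, stratified by Arm
fig, ax = yli.turnbull(df, 'TimeLeft', 'TimeRight', by='Arm', ci=True)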

View File

@@ -15,9 +15,14 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 import numpy as np
+import pandas as pd
 from scipy import stats
 import statsmodels.api as sm
 
+import io
+import json
+import subprocess
+
 from .config import config
 from .sig_tests import ChiSquaredResult
 from .utils import Estimate, check_nan
@@ -130,25 +135,27 @@ def calc_survfunc_kaplanmeier(time, status, ci, transform_x=None, transform_y=No
 	return xpoints, ypoints, None, None
 
-def turnbull(df, time_left, time_right, by=None, *, step_loc=0.5, transform_x=None, transform_y=None, nan_policy='warn'):
+def turnbull(df, time_left, time_right, by=None, *, ci=True, step_loc=0.5, transform_x=None, transform_y=None, nan_policy='warn', fig=None, ax=None):
 	"""
 	Generate a Turnbull estimator plot, which extends the Kaplan–Meier estimator to interval-censored observations
 	
-	The intervals are assumed to be half-open intervals, (*left*, *right*]. *right* == *np.inf* implies the event was right-censored. Unlike :func:`yli.kaplanmeier`, times must be given as numeric dtypes and not as pandas timedelta.
+	The intervals are assumed to be half-open intervals, (*left*, *right*]. *right* == *np.inf* implies the event was right-censored.
 	
 	By default, the survival function is drawn as a step function at the midpoint of each Turnbull interval.
 	
-	Uses the Python *lifelines* and *matplotlib* libraries.
+	Uses the hpstat *turnbull* command.
 	
 	:param df: Data to generate plot for
 	:type df: DataFrame
-	:param time_left: Column in *df* for the time to event, left interval endpoint (numeric)
+	:param time_left: Column in *df* for the time to event, left interval endpoint (numeric or timedelta)
 	:type time_left: str
-	:param time_right: Column in *df* for the time to event, right interval endpoint (numeric)
+	:param time_right: Column in *df* for the time to event, right interval endpoint (numeric or timedelta)
 	:type time_right: str
 	:param by: Column in *df* to stratify by (categorical)
 	:type by: str
-	:param step_loc: Proportion along the length of each Turnbull interval to step down the survival function, e.g. 0 for left bound, 1 for right bound, 0.5 for interval midpoint (numeric)
+	:param ci: Whether to plot confidence intervals around the survival function
+	:type ci: bool
+	:param step_loc: Proportion along the length of each Turnbull interval to step down the survival function, e.g. 0 for left bound, 1 for right bound, 0.5 for interval midpoint
 	:type step_loc: float
 	:param transform_x: Function to transform x axis by
 	:type transform_x: callable
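The new *ci*, *step_loc*, *fig* and *ax* parameters can be combined, for example, to put two estimates on one figure. A rough sketch with made-up data, assuming the caller-supplied Axes behaves in the usual matplotlib way:

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import yli

df = pd.DataFrame({
    'TimeLeft':  [0.0, 1.0, 2.0, 3.0],
    'TimeRight': [2.0, 3.0, np.inf, 5.0],
})

fig, (ax1, ax2) = plt.subplots(1, 2)

# Left panel: default midpoint steps, with the confidence band
yli.turnbull(df, 'TimeLeft', 'TimeRight', fig=fig, ax=ax1)

# Right panel: step at the right bound of each Turnbull interval, no confidence band
yli.turnbull(df, 'TimeLeft', 'TimeRight', ci=False, step_loc=1, fig=fig, ax=ax2)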
@@ -168,7 +175,11 @@ def turnbull(df, time_left, time_right, by=None, *, step_loc=0.5, transform_x=No
 	else:
 		df = check_nan(df[[time_left, time_right]], nan_policy)
 	
-	fig, ax = plt.subplots()
+	# Convert timedelta to numeric
+	df, time_units = survtime_to_numeric(df, time_left, time_right)
+	
+	if ax is None:
+		fig, ax = plt.subplots()
 	
 	if by is not None:
 		# Group by independent variable
@@ -176,13 +187,16 @@ def turnbull(df, time_left, time_right, by=None, *, step_loc=0.5, transform_x=No
 		for group in groups.groups:
 			subset = groups.get_group(group)
-			handle = plot_survfunc_turnbull(ax, subset[time_left], subset[time_right], step_loc, transform_x, transform_y)
+			handle = plot_survfunc_turnbull(ax, subset[time_left], subset[time_right], ci, step_loc, transform_x, transform_y)
 			handle.set_label('{} = {}'.format(by, group))
 	else:
 		# No grouping
-		plot_survfunc_turnbull(ax, df[time_left], df[time_right], step_loc, transform_x, transform_y)
+		plot_survfunc_turnbull(ax, df[time_left], df[time_right], ci, step_loc, transform_x, transform_y)
 	
-	ax.set_xlabel('Analysis time')
+	if time_units:
+		ax.set_xlabel('{} + {} ({})'.format(time_left, time_right, time_units))
+	else:
+		ax.set_xlabel('{} + {}'.format(time_left, time_right))
 	ax.set_ylabel('Survival probability')
 	ax.set_xlim(left=0)
 	ax.set_ylim(0, 1)
@@ -192,29 +206,39 @@ def turnbull(df, time_left, time_right, by=None, *, step_loc=0.5, transform_x=No
 	
 	return fig, ax
 
-def plot_survfunc_turnbull(ax, time_left, time_right, step_loc=0.5, transform_x=None, transform_y=None):
-	xpoints, ypoints = calc_survfunc_turnbull(time_left, time_right, step_loc, transform_x, transform_y)
+def plot_survfunc_turnbull(ax, time_left, time_right, ci, step_loc=0.5, transform_x=None, transform_y=None):
+	xpoints, ypoints, ypoints0, ypoints1 = calc_survfunc_turnbull(time_left, time_right, ci, step_loc, transform_x, transform_y)
 	
 	handle = ax.plot(xpoints, ypoints)[0]
+	
+	if ci:
+		ax.fill_between(xpoints, ypoints0, ypoints1, alpha=0.3, label='_')
+	
 	return handle
 
-def calc_survfunc_turnbull(time_left, time_right, step_loc=0.5, transform_x=None, transform_y=None):
-	from lifelines.fitters.npmle import npmle
-	
-	EPSILON = 1e-10
-	
-	# TODO: Support left == right => failure was exactly observed
-	followup_left = time_left + EPSILON # Add epsilon to make interval half-open
-	followup_right = time_right
-	
+def calc_survfunc_turnbull(time_left, time_right, ci, step_loc=0.5, transform_x=None, transform_y=None):
 	# Estimate the survival function
-	#sf = lifelines.KaplanMeierFitter().fit_interval_censoring(followup_left, followup_right)
-	# Call lifelines.fitters.npmle.npmle directly so we can compute midpoints, etc.
-	sf_probs, turnbull_intervals = npmle(followup_left, followup_right)
-	
-	xpoints = [i.left*(1-step_loc) + i.right*step_loc for i in turnbull_intervals]
-	ypoints = 1 - np.cumsum(sf_probs)
+	
+	# Prepare arguments
+	# TODO: Pass through other arguments
+	hpstat_args = [config.hpstat_path, 'turnbull', '-', '--output', 'json']
+	
+	# Export data to CSV
+	csv_buf = io.StringIO()
+	pd.DataFrame({'LeftTime': time_left, 'RightTime': time_right}).to_csv(csv_buf, index=False)
+	csv_str = csv_buf.getvalue()
+	
+	# Run hpstat binary
+	proc = subprocess.run(hpstat_args, input=csv_str, stdout=subprocess.PIPE, stderr=None, encoding='utf-8', check=True)
+	raw_result = json.loads(proc.stdout)
+	
+	survival_prob = np.array(raw_result['survival_prob'])
+	
+	from IPython.display import clear_output
+	clear_output(wait=True)
+	
+	xpoints = [i[0]*(1-step_loc) + i[1]*step_loc for i in raw_result['failure_intervals'] if i[1]]
+	ypoints = survival_prob
 	
 	# Draw straight lines
 	# np.concatenate(...) to force starting drawing from time 0, survival 100%
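The subprocess protocol introduced above (CSV on stdin, JSON on stdout) can also be exercised outside yli. A minimal sketch, assuming the hpstat binary is simply on PATH rather than located via *config.hpstat_path*, and reusing the same flags and JSON keys as the code above:

import io
import json
import subprocess

import pandas as pd

df = pd.DataFrame({'LeftTime': [0.0, 1.0, 2.0], 'RightTime': [2.0, 3.0, float('inf')]})

# Send the two columns as CSV on stdin and request JSON output
csv_buf = io.StringIO()
df.to_csv(csv_buf, index=False)
proc = subprocess.run(['hpstat', 'turnbull', '-', '--output', 'json'],
    input=csv_buf.getvalue(), stdout=subprocess.PIPE, encoding='utf-8', check=True)

result = json.loads(proc.stdout)
print(result['survival_prob'])
print(result['survival_prob_se'])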
@@ -226,9 +250,27 @@ def calc_survfunc_turnbull(time_left, time_right, step_loc=0.5, transform_x=None
 	if transform_y:
 		ypoints = transform_y(ypoints)
 	
-	return xpoints, ypoints
+	if ci:
+		zstar = -stats.norm.ppf(config.alpha/2)
+		survival_prob_se = np.array(raw_result['survival_prob_se'])
+		
+		# Get confidence intervals
+		ci0 = survival_prob - zstar * survival_prob_se
+		ci1 = survival_prob + zstar * survival_prob_se
+		
+		# Plot confidence intervals
+		ypoints0 = np.concatenate([[1], ci0]).repeat(2)[:-1]
+		ypoints1 = np.concatenate([[1], ci1]).repeat(2)[:-1]
+		
+		if transform_y:
+			ypoints0 = transform_y(ypoints0)
+			ypoints1 = transform_y(ypoints1)
+		
+		return xpoints, ypoints, ypoints0, ypoints1
+	
+	return xpoints, ypoints, None, None
 
-def survtime_to_numeric(df, time):
+def survtime_to_numeric(df, time, time2=None):
 	"""
 	Convert pandas timedelta dtype to float64, auto-detecting the best time unit to display
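The interval computed above is the usual normal-approximation interval, S(t) ± z·SE(t), with z the (1 − α/2) quantile of the standard normal distribution. A quick numerical check of the multiplier and the arithmetic, assuming the default alpha of 0.05:

import numpy as np
from scipy import stats

alpha = 0.05                      # assumed default; the code above reads config.alpha
zstar = -stats.norm.ppf(alpha/2)  # approximately 1.96

survival_prob = np.array([0.9, 0.6, 0.3])
survival_prob_se = np.array([0.05, 0.08, 0.07])

lower = survival_prob - zstar * survival_prob_se  # approx [0.802, 0.443, 0.163]
upper = survival_prob + zstar * survival_prob_se  # approx [0.998, 0.757, 0.437]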
@@ -236,6 +278,8 @@ def survtime_to_numeric(df, time):
 	:type df: DataFrame
 	:param time: Column to check for pandas timedelta dtype
 	:type df: DataFrame
+	:param time2: Second column, if any, to check for pandas timedelta dtype
+	:type time2: str
 	
 	:return: (*df*, *time_units*)
@@ -243,28 +287,42 @@ def survtime_to_numeric(df, time):
 	* **time_units** (*str*) Human-readable description of the time unit, or *None* if not converted
 	"""
 	
+	max_time = None
+	
 	if df[time].dtype == '<m8[ns]':
 		df[time] = df[time].dt.total_seconds()
+		max_time = df[time].max()
+	
+	if df[time2].dtype == '<m8[ns]':
+		df[time2] = df[time2].dt.total_seconds()
+		max_time = max(max_time or 0, df[time2].max())
+	
+	if max_time is not None:
 		# Auto-detect best time units
-		if df[time].max() > 365.24*24*60*60:
-			df[time] = df[time] / (365.24*24*60*60)
+		if max_time > 365.24*24*60*60:
+			time_divider = 365.24*24*60*60
 			time_units = 'years'
-		elif df[time].max() > 7*24*60*60 / 12:
-			df[time] = df[time] / (7*24*60*60)
+		elif max_time > 7*24*60*60 / 12:
+			time_divider = 7*24*60*60
 			time_units = 'weeks'
-		elif df[time].max() > 24*60*60:
-			df[time] = df[time] / (24*60*60)
+		elif max_time > 24*60*60:
+			time_divider = 24*60*60
 			time_units = 'days'
-		elif df[time].max() > 60*60:
-			df[time] = df[time] / (60*60)
+		elif max_time > 60*60:
+			time_divider = 60*60
 			time_units = 'hours'
-		elif df[time].max() > 60:
-			df[time] = df[time] / 60
+		elif max_time > 60:
+			time_divider = 60
 			time_units = 'minutes'
 		else:
+			time_divider = 1
 			time_units = 'seconds'
 		
+		df[time] /= time_divider
+		if time2:
+			df[time2] /= time_divider
+		
 		return df, time_units
 	else:
 		return df, None
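The revised unit auto-detection can be illustrated in miniature. The helper is internal, so the import path below is an assumption, and the 600-day figures are made up; the point is that both columns are now scaled by a single divider chosen from the overall maximum.

import pandas as pd
from yli.survival import survtime_to_numeric  # internal helper; module path assumed

df = pd.DataFrame({
    'Start': pd.to_timedelta([0, 100, 200], unit='D'),
    'End': pd.to_timedelta([300, 450, 600], unit='D'),
})

# 600 days exceeds one year, so both columns are divided by 365.24*24*60*60
# and time_units == 'years'; e.g. df['Start'] becomes approximately [0.0, 0.27, 0.55]
df, time_units = survtime_to_numeric(df, 'Start', 'End')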