TimeToEvent

Bases: Reporter

Perform a time to event analysis using Kaplan-Meier estimation.

This reporter generates:

1. A private patient-level time-to-event table (_tte_table) for intermediate processing
2. Aggregated survival/risk data in self.df combining results from all outcomes
3. Kaplan-Meier survival curves

The patient-level table (_tte_table) contains one row per patient with:

- Index date for each patient
- Event dates for all outcomes (NULL if the event did not occur)
- Event dates for all right-censoring events (NULL if the event did not occur)
- End of study period date (if provided)
- Days from index to each event
- Indicator variables for whether the first event was the outcome of interest

The aggregated output (self.df) contains survival function estimates and event counts for each outcome, suitable for reporting and visualization.
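For orientation, here is a toy pandas sketch of the patient-level columns described above. The outcome STROKE and censoring event DEATH are hypothetical names, and the real table is built with ibis expressions rather than pandas; this only illustrates the column layout and derivations.

```python
import pandas as pd

# Hypothetical patient-level rows mirroring the _tte_table layout
# (outcome "STROKE", censoring event "DEATH"; all names illustrative).
tte = pd.DataFrame(
    {
        "PERSON_ID": [1, 2, 3],
        "INDEX_DATE": pd.to_datetime(["2020-01-01", "2020-01-01", "2020-01-01"]),
        "STROKE": pd.to_datetime(["2020-03-01", None, None]),
        "DEATH": pd.to_datetime([None, "2020-02-01", None]),
        "END_OF_STUDY_PERIOD": pd.to_datetime(["2020-12-31"] * 3),
    }
)

# Days from index to each event date (NaN where the event never occurred)
for col in ["STROKE", "DEATH", "END_OF_STUDY_PERIOD"]:
    tte[f"DAYS_TO_{col}"] = (tte[col] - tte["INDEX_DATE"]).dt.days

# First event per patient, and an indicator for whether it was the outcome
first = tte[["STROKE", "DEATH", "END_OF_STUDY_PERIOD"]].min(axis=1)
tte["DAYS_FIRST_EVENT_STROKE"] = (first - tte["INDEX_DATE"]).dt.days
tte["INDICATOR_STROKE"] = (tte["STROKE"] == first).astype(int)
```

Patient 1 has the outcome first, patient 2 is censored by death, and patient 3 is administratively censored at the end of the study period.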

Parameters:

- right_censor_phenotypes (Optional[List[Phenotype]], default: None): A list of phenotypes that should be used as right-censoring events. Suggested censoring events are death and end of follow-up.
- end_of_study_period (Optional[datetime], default: None): A datetime defining the end of the study period.
- decimal_places (int, default: 4): Number of decimal places for rounding survival probabilities.
- phenotype_names (Optional[List[str]], default: None): Optional list of outcome phenotype names to restrict the analysis to. If None, all outcomes of the cohort are analyzed.
Source code in phenex/reporting/time_to_event.py
class TimeToEvent(Reporter):
    """
    Perform a time to event analysis using Kaplan-Meier estimation.

    This reporter generates:
    1. A private patient-level time-to-event table (_tte_table) for intermediate processing
    2. Aggregated survival/risk data in self.df combining results from all outcomes
    3. Kaplan-Meier survival curves

    The patient-level table (_tte_table) contains one row per patient with:
    - Index date for each patient
    - Event dates for all outcomes (NULL if did not occur)
    - Event dates for all right censoring events (NULL if did not occur)
    - End of study period date (if provided)
    - Days from index to each event
    - Indicator variables for whether the first event was the outcome of interest

    The aggregated output (self.df) contains survival function estimates and event counts
    for each outcome, suitable for reporting and visualization.

    Parameters:
        right_censor_phenotypes: A list of phenotypes that should be used as right censoring events.
            Suggested censoring events are death and end of follow-up.
        end_of_study_period: A datetime defining the end of the study period.
        decimal_places: Number of decimal places for rounding survival probabilities. Default: 4
        phenotype_names: Optional list of outcome phenotype names to restrict the analysis to.
            If None, all outcomes of the cohort are analyzed.
    """

    def __init__(
        self,
        right_censor_phenotypes: Optional[List["Phenotype"]] = None,
        end_of_study_period: Optional["datetime"] = None,
        decimal_places: int = 4,
        phenotype_names: Optional[List[str]] = None,
    ):
        super().__init__(decimal_places=decimal_places)
        self.right_censor_phenotypes = right_censor_phenotypes
        self.end_of_study_period = end_of_study_period
        self.phenotype_names = phenotype_names
        self._date_column_names = None
        self._tte_table = None  # Private: patient-level time-to-event data

    def execute(self, cohort: "Cohort") -> pd.DataFrame:
        """
        Execute the time to event analysis for a provided cohort.

        This generates:
        1. Patient-level time-to-event table (stored in self._tte_table)
        2. Aggregated survival/risk data from Kaplan-Meier fits (stored in self.df)
        3. Kaplan-Meier plots

        Parameters:
            cohort: The cohort for which the time to event analysis should be performed.

        Returns:
            DataFrame with aggregated survival function estimates and event counts for all outcomes.

            Schema:
                - Outcome (str): Name of the outcome phenotype
                - Timeline (float): Time point in days from index
                - Survival_Probability (float): Kaplan-Meier survival estimate at this time point
                - At_Risk (int): Number of patients at risk at this time point
                - Events (int): Number of outcome events observed at this time point
                - Censored (int): Number of patients censored at this time point
        """
        self.cohort = cohort
        if self.phenotype_names is not None:
            self._outcomes = [
                p for p in cohort.outcomes if p.name in self.phenotype_names
            ]
            missing = set(self.phenotype_names) - {p.name for p in self._outcomes}
            if missing:
                logger.warning(
                    f"No matching outcome phenotypes found for: {sorted(missing)}"
                )
        else:
            self._outcomes = cohort.outcomes
        self._execute_right_censoring_phenotypes(self.cohort)

        # Build patient-level time-to-event table
        table = cohort.index_table.mutate(
            INDEX_DATE=self.cohort.index_table.EVENT_DATE.cast("date")
        ).select(["PERSON_ID", "INDEX_DATE"])
        table = self._append_date_events(table)
        table = self._append_days_to_event(table)
        table = self._append_date_and_days_to_first_event(table)
        self._tte_table = table.execute()  # Convert to pandas DataFrame

        if self._tte_table.empty:
            logger.warning("No patients in cohort; skipping time-to-event analysis.")
            self.df = pd.DataFrame()
            return self.df

        # Build aggregated survival/risk data from KM fits
        self.df = self._build_aggregated_risk_table()

        logger.info("time to event finished execution")
        self.plot_multiple_kaplan_meier()
        return self.df

    def _execute_right_censoring_phenotypes(self, cohort):
        for phenotype in self.right_censor_phenotypes:
            if phenotype.table is None:
                phenotype.execute(cohort.subset_tables_index)

    def _append_date_events(self, table):
        """
        Append columns for all necessary event dates. This includes:
        1. the date of each outcome phenotype; the column name is the phenotype name
        2. the date of each right-censor phenotype; the column name is the phenotype name
        3. the date of the end of study period; the column name is END_OF_STUDY_PERIOD
        Additionally, this method populates _date_column_names with the names of all date columns appended here.
        """
        table = self._append_dates_for_phenotypes(table, self._outcomes)
        table = self._append_dates_for_phenotypes(table, self.right_censor_phenotypes)
        self._date_column_names = [
            x.name.upper() for x in self._outcomes + self.right_censor_phenotypes
        ]
        if self.end_of_study_period is not None:
            table = table.mutate(
                END_OF_STUDY_PERIOD=ibis.literal(self.end_of_study_period).cast("date")
            )
            self._date_column_names.append("END_OF_STUDY_PERIOD")
        return table

    def _append_dates_for_phenotypes(self, table, phenotypes):
        """
        Generic method that adds the EVENT_DATE for a list of phenotypes

        For example, if three phenotypes are provided, named pt1, pt2, pt3, three new columns pt1, pt2, pt3 are added each populated with the EVENT_DATE of the respective phenotype.
        """
        for _phenotype in phenotypes:
            logger.info(f"appending dates for {_phenotype.name}")
            join_table = _phenotype.table.select(["PERSON_ID", "EVENT_DATE"]).distinct()
            # rename event_date to the right_censor_phenotype's name, cast to date
            join_table = join_table.mutate(
                **{_phenotype.name.upper(): join_table.EVENT_DATE.cast("date")}
            )
            # select just person_id and event_date for current phenotype
            join_table = join_table.select(["PERSON_ID", _phenotype.name.upper()])
            # perform the join
            table = table.join(
                join_table, table.PERSON_ID == join_table.PERSON_ID, how="left"
            ).drop("PERSON_ID_right")
        return table

    def _append_days_to_event(self, table):
        """
        Calculates the days to each EVENT_DATE column found in _date_column_names. New column names are "DAYS_TO_{date column name}".
        """
        for column_name in self._date_column_names:
            logger.info(f"appending time to event for {column_name}")
            DAYS_TO_EVENT = table[column_name].delta(table.INDEX_DATE, "day")
            table = table.mutate(**{f"DAYS_TO_{column_name}": DAYS_TO_EVENT})
        return table

    def _append_date_and_days_to_first_event(self, table):
        """
        For each outcome phenotype, determines which event occurred first: the outcome, a right-censoring event, or the end of the study period. Adds an indicator column for whether the first event is the outcome.
        """
        for phenotype in self._outcomes:
            # Subset the columns from which the minimum date should be determined; this is the outcome of interest, all right censoring events, and end of study period.
            cols = [phenotype.name.upper()] + [
                x.name.upper() for x in self.right_censor_phenotypes
            ]

            # Compute the minimum event date while handling NULLs correctly.
            # Start with the end of study period date as the initial minimum.
            min_date_expr = ibis.literal(self.end_of_study_period).cast("date")

            # For each column, update the minimum if the column has a valid (non-null) date that's smaller
            for col in cols:
                min_date_expr = (
                    ibis.case()
                    .when(
                        table[col].notnull() & (table[col] < min_date_expr), table[col]
                    )
                    .else_(min_date_expr)
                    .end()
                )

            min_date_column = min_date_expr

            # Add an intermediate min_date column to the table
            table = table.mutate(min_date=min_date_column)

            # Add the date of the first event for this outcome
            column_name_date_first_event = f"DATE_FIRST_EVENT_{phenotype.name.upper()}"
            table = table.mutate(**{column_name_date_first_event: min_date_column})
            DAYS_FIRST_EVENT = table[column_name_date_first_event].delta(
                table.INDEX_DATE, "day"
            )
            table = table.mutate(
                **{f"DAYS_FIRST_EVENT_{phenotype.name.upper()}": DAYS_FIRST_EVENT}
            )
            # Adding an indicator for whether the first event was the outcome or a censoring event
            table = table.mutate(
                **{
                    f"INDICATOR_{phenotype.name.upper()}": ibis.ifelse(
                        table[phenotype.name.upper()]
                        == table[f"DATE_FIRST_EVENT_{phenotype.name.upper()}"],
                        1,
                        0,
                    )
                }
            )
        return table

    def plot_multiple_kaplan_meier(
        self,
        xlim: Optional[List[int]] = None,
        ylim: Optional[List[int]] = None,
        n_cols: int = 3,
        outcome_indices: Optional[List[int]] = None,
        path_dir: Optional[str] = None,
    ):
        """
        For each outcome, plot a Kaplan-Meier curve.
        """
        # nothing to plot without patient-level data
        if self._tte_table is None or self._tte_table.empty:
            return
        phenotypes = self._outcomes
        if outcome_indices is not None:
            phenotypes = [
                x for i, x in enumerate(self._outcomes) if i in outcome_indices
            ]
        if not phenotypes:
            return
        n_rows = math.ceil(len(phenotypes) / n_cols)
        fig, axes = plt.subplots(n_rows, n_cols, sharey=True, sharex=True)

        for i, phenotype in enumerate(phenotypes):
            kmf = self.fit_kaplan_meier_for_phenotype(phenotype)
            if kmf is None:
                continue
            if n_rows > 1 and n_cols > 1:
                ax = axes[int(i / n_cols), i % n_cols]
            else:
                ax = axes[i]
            ax.set_title(phenotype.name)
            if xlim is not None:
                ax.set_xlim(xlim)
            if ylim is not None:
                ax.set_ylim(ylim)
            kmf.plot(ax=ax)
            ax.grid(color="gray", linestyle="-", linewidth=0.1)

        if path_dir is not None:
            cohort_name = getattr(self.cohort, "name", "cohort")
            path = os.path.join(path_dir, f"KaplanMeierPanelFor_{cohort_name}.svg")
            plt.savefig(path, dpi=150)
        plt.show()

    def plot_single_kaplan_meier(
        self,
        outcome_index: int = 0,
        xlim: Optional[List[int]] = None,
        ylim: Optional[List[int]] = None,
        path_dir: Optional[str] = None,
    ):
        """
        Plot a Kaplan-Meier curve for a single outcome.
        """
        # select the outcome phenotype by index
        phenotype = self._outcomes[outcome_index]
        kmf = self.fit_kaplan_meier_for_phenotype(phenotype)
        if kmf is None:
            return
        fig, ax = plt.subplots(1, 1, figsize=(12, 4))

        ax.set_title(f"Kaplan Meier for outcome : {phenotype.name}")
        kmf.plot(ax=ax)
        add_at_risk_counts(kmf, ax=ax)
        plt.tight_layout()
        ax.grid(color="gray", linestyle="-", linewidth=0.1)
        if xlim is not None:
            ax.set_xlim(xlim)
        if ylim is not None:
            ax.set_ylim(ylim)

        if path_dir is not None:
            path = os.path.join(
                path_dir,
                f"KaplanMeier_{getattr(self.cohort, 'name', 'cohort')}_{phenotype.name}.svg",
            )
            plt.savefig(path, dpi=150)
        plt.show()

    def fit_kaplan_meier_for_phenotype(self, phenotype):
        """
        Fit a Kaplan-Meier model for a specific phenotype outcome.

        Parameters:
            phenotype: The outcome phenotype to analyze

        Returns:
            KaplanMeierFitter or None: Fitted KM model, or None if data is empty.
        """
        indicator = f"INDICATOR_{phenotype.name.upper()}"
        durations = f"DAYS_FIRST_EVENT_{phenotype.name.upper()}"
        _df = self._tte_table[[indicator, durations]].dropna()
        if _df.empty:
            logger.warning(f"No data for outcome {phenotype.name}; skipping KM fit.")
            return None
        kmf = KaplanMeierFitter(label=phenotype.name)
        kmf.fit(durations=_df[durations], event_observed=_df[indicator])
        return kmf

    def _build_aggregated_risk_table(self) -> pd.DataFrame:
        """
        Build aggregated survival/risk data from Kaplan-Meier fits for all outcomes.

        Combines survival function estimates and event tables from all outcomes into a single
        DataFrame suitable for reporting.

        Returns:
            DataFrame with columns: Outcome, Timeline, Survival_Probability,
            CI_Lower, CI_Upper, At_Risk, Events, Censored
        """
        all_outcomes_data = []

        for phenotype in self._outcomes:
            kmf = self.fit_kaplan_meier_for_phenotype(phenotype)
            if kmf is None:
                continue

            # Get survival function
            survival_df = kmf.survival_function_.reset_index()
            survival_df.columns = ["Timeline", "Survival_Probability"]

            # Get confidence intervals
            ci_df = kmf.confidence_interval_.reset_index()
            ci_df.columns = ["Timeline", "CI_Lower", "CI_Upper"]
            survival_df = pd.merge(survival_df, ci_df, on="Timeline", how="left")

            # Get event table
            event_df = kmf.event_table.reset_index()
            event_df = event_df.rename(columns={"event_at": "Timeline"})

            # Merge survival and event data
            outcome_df = pd.merge(survival_df, event_df, on="Timeline", how="left")

            # Add outcome name
            outcome_df.insert(0, "Outcome", phenotype.name)

            # Select and rename key columns
            outcome_df = outcome_df[
                [
                    "Outcome",
                    "Timeline",
                    "Survival_Probability",
                    "CI_Lower",
                    "CI_Upper",
                    "at_risk",
                    "observed",
                    "censored",
                ]
            ].rename(
                columns={
                    "at_risk": "At_Risk",
                    "observed": "Events",
                    "censored": "Censored",
                }
            )

            all_outcomes_data.append(outcome_df)

        # Combine all outcomes
        if all_outcomes_data:
            result = pd.concat(all_outcomes_data, ignore_index=True)
        else:
            result = pd.DataFrame()

        return result

    def to_html(self, filename: str, version: str = "unknown") -> str:
        """Export KM curves for all outcomes as a self-contained HTML file with embedded PNGs."""
        filepath = Path(filename)
        if filepath.suffix != ".html":
            filepath = filepath.with_suffix(".html")
        filepath.parent.mkdir(parents=True, exist_ok=True)

        cohort_name = getattr(self.cohort, "name", "cohort")
        images_html = self._render_km_images_html()

        # Embed bird icon as base64 data URI
        icon_path = (
            Path(__file__).resolve().parent.parent / "docs" / "assets" / "bird_icon.png"
        )
        if icon_path.exists():
            icon_b64 = base64.b64encode(icon_path.read_bytes()).decode("ascii")
            icon_data_uri = f"data:image/png;base64,{icon_b64}"
        else:
            icon_data_uri = ""

        from html import escape

        version_escaped = escape(version)

        if icon_data_uri:
            footer = (
                f'<div class="phenex-footer">'
                f'<img src="{icon_data_uri}" alt="PhenEx">'
                f"<span>Generated with PhenEx v{version_escaped}</span></div>"
            )
        else:
            footer = (
                f'<div class="phenex-footer">'
                f"<span>Generated with PhenEx v{version_escaped}</span></div>"
            )

        html = (
            "<!DOCTYPE html><html><head>"
            f"<title>Time to Event - {cohort_name}</title>"
            "<style>"
            "body{font-family:sans-serif;margin:20px;padding-bottom:50px;}"
            ".phenex-footer{position:fixed;bottom:0;left:0;padding:10px 16px;"
            "display:flex;align-items:center;gap:8px;background:rgba(255,255,255,0.9);z-index:9999;}"
            ".phenex-footer img{height:24px;width:auto;}"
            ".phenex-footer span{font-size:11px;color:#999;}"
            "</style>"
            "</head><body>"
            f"<h1>Kaplan-Meier Curves &mdash; {cohort_name}</h1>"
            + "\n".join(images_html)
            + footer
            + "</body></html>"
        )
        filepath.write_text(html, encoding="utf-8")
        return str(filepath.absolute())

    def _render_km_images_html(self) -> list:
        """Render KM curves for all outcomes as base64-embedded HTML image divs."""
        images_html = []
        if self._tte_table is None or self._tte_table.empty:
            return images_html
        for i, phenotype in enumerate(self._outcomes):
            kmf = self.fit_kaplan_meier_for_phenotype(phenotype)
            if kmf is None:
                continue
            fig, ax = plt.subplots(1, 1, figsize=(12, 4))
            ax.set_title(f"Kaplan Meier for outcome : {phenotype.name}")
            kmf.plot(ax=ax)
            add_at_risk_counts(kmf, ax=ax)
            plt.tight_layout()
            ax.grid(color="gray", linestyle="-", linewidth=0.1)

            buf = io.BytesIO()
            fig.savefig(buf, format="png", dpi=150)
            plt.close(fig)
            buf.seek(0)
            b64 = base64.b64encode(buf.read()).decode("utf-8")
            images_html.append(
                f'<div style="margin-bottom:20px;">'
                f'<img src="data:image/png;base64,{b64}" style="max-width:100%;"/>'
                f"</div>"
            )
        return images_html

name property

Name of the reporter, used for identification and output file naming.

execute(cohort)

Execute the time to event analysis for a provided cohort.

This generates:

1. Patient-level time-to-event table (stored in self._tte_table)
2. Aggregated survival/risk data from Kaplan-Meier fits (stored in self.df)
3. Kaplan-Meier plots

Parameters:

- cohort (Cohort, required): The cohort for which the time to event analysis should be performed.

Returns:

- DataFrame: Aggregated survival function estimates and event counts for all outcomes, with schema:
  - Outcome (str): Name of the outcome phenotype
  - Timeline (float): Time point in days from index
  - Survival_Probability (float): Kaplan-Meier survival estimate at this time point
  - At_Risk (int): Number of patients at risk at this time point
  - Events (int): Number of outcome events observed at this time point
  - Censored (int): Number of patients censored at this time point
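As a hedged illustration of consuming the returned frame (toy values and hypothetical outcome names; the columns follow the schema documented above), the survival estimate at the last observed time point per outcome can be extracted with a sort and groupby:

```python
import pandas as pd

# Toy aggregated output following the documented schema (values illustrative)
df = pd.DataFrame(
    {
        "Outcome": ["stroke", "stroke", "death", "death"],
        "Timeline": [0.0, 30.0, 0.0, 45.0],
        "Survival_Probability": [1.0, 0.9, 1.0, 0.95],
        "At_Risk": [100, 95, 100, 97],
        "Events": [0, 5, 0, 3],
        "Censored": [0, 0, 0, 0],
    }
)

# Survival estimate at the last observed time point, per outcome
last = df.sort_values("Timeline").groupby("Outcome").tail(1)
final_surv = dict(zip(last["Outcome"], last["Survival_Probability"]))
```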
Source code in phenex/reporting/time_to_event.py
def execute(self, cohort: "Cohort") -> pd.DataFrame:
    """
    Execute the time to event analysis for a provided cohort.

    This generates:
    1. Patient-level time-to-event table (stored in self._tte_table)
    2. Aggregated survival/risk data from Kaplan-Meier fits (stored in self.df)
    3. Kaplan-Meier plots

    Parameters:
        cohort: The cohort for which the time to event analysis should be performed.

    Returns:
        DataFrame with aggregated survival function estimates and event counts for all outcomes.

        Schema:
            - Outcome (str): Name of the outcome phenotype
            - Timeline (float): Time point in days from index
            - Survival_Probability (float): Kaplan-Meier survival estimate at this time point
            - At_Risk (int): Number of patients at risk at this time point
            - Events (int): Number of outcome events observed at this time point
            - Censored (int): Number of patients censored at this time point
    """
    self.cohort = cohort
    if self.phenotype_names is not None:
        self._outcomes = [
            p for p in cohort.outcomes if p.name in self.phenotype_names
        ]
        missing = set(self.phenotype_names) - {p.name for p in self._outcomes}
        if missing:
            logger.warning(
                f"No matching outcome phenotypes found for: {sorted(missing)}"
            )
    else:
        self._outcomes = cohort.outcomes
    self._execute_right_censoring_phenotypes(self.cohort)

    # Build patient-level time-to-event table
    table = cohort.index_table.mutate(
        INDEX_DATE=self.cohort.index_table.EVENT_DATE.cast("date")
    ).select(["PERSON_ID", "INDEX_DATE"])
    table = self._append_date_events(table)
    table = self._append_days_to_event(table)
    table = self._append_date_and_days_to_first_event(table)
    self._tte_table = table.execute()  # Convert to pandas DataFrame

    if self._tte_table.empty:
        logger.warning("No patients in cohort; skipping time-to-event analysis.")
        self.df = pd.DataFrame()
        return self.df

    # Build aggregated survival/risk data from KM fits
    self.df = self._build_aggregated_risk_table()

    logger.info("time to event finished execution")
    self.plot_multiple_kaplan_meier()
    return self.df

fit_kaplan_meier_for_phenotype(phenotype)

Fit a Kaplan-Meier model for a specific phenotype outcome.

Parameters:

- phenotype (required): The outcome phenotype to analyze

Returns:

- KaplanMeierFitter or None: Fitted KM model, or None if data is empty.
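For intuition, the fitted survival function is the standard Kaplan-Meier product-limit estimate. Below is a minimal hand-rolled sketch in plain Python (not the lifelines implementation actually used) with toy durations and event indicators:

```python
# Product-limit estimate: S(t) = prod over event times t_i <= t of (1 - d_i / n_i),
# where d_i = events observed at t_i and n_i = patients at risk just before t_i.
durations = [5, 5, 8, 12, 12, 15]   # DAYS_FIRST_EVENT_* values (toy data)
observed = [1, 0, 1, 1, 0, 0]       # INDICATOR_* values: 1 = outcome, 0 = censored

survival = {}
s = 1.0
n = len(durations)  # number at risk
for t in sorted(set(durations)):
    # events (not censorings) occurring exactly at time t
    d = sum(1 for dur, e in zip(durations, observed) if dur == t and e == 1)
    if d:
        s *= 1 - d / n
        survival[t] = s
    # everyone whose follow-up ends at t (event or censored) leaves the risk set
    n -= sum(1 for dur in durations if dur == t)
```

Censored patients reduce the risk set for later time points but do not drop the survival curve themselves, which is exactly why the indicator columns matter.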

Source code in phenex/reporting/time_to_event.py
def fit_kaplan_meier_for_phenotype(self, phenotype):
    """
    Fit a Kaplan-Meier model for a specific phenotype outcome.

    Parameters:
        phenotype: The outcome phenotype to analyze

    Returns:
        KaplanMeierFitter or None: Fitted KM model, or None if data is empty.
    """
    indicator = f"INDICATOR_{phenotype.name.upper()}"
    durations = f"DAYS_FIRST_EVENT_{phenotype.name.upper()}"
    _df = self._tte_table[[indicator, durations]].dropna()
    if _df.empty:
        logger.warning(f"No data for outcome {phenotype.name}; skipping KM fit.")
        return None
    kmf = KaplanMeierFitter(label=phenotype.name)
    kmf.fit(durations=_df[durations], event_observed=_df[indicator])
    return kmf

get_pretty_display()

Return a formatted version of the reporter's results for display.

Default implementation returns a copy of self.df with:

- Numeric values rounded to decimal_places
- NaN values replaced with empty strings for cleaner display

Subclasses can override this method for custom formatting (e.g., phenotype display names).

Returns:

- pd.DataFrame: Formatted copy of the results

Raises:

- AttributeError: If self.df is not defined
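The default formatting is essentially a round-then-fill over numeric columns; a rough pandas equivalent with toy data (column names illustrative):

```python
import pandas as pd

df = pd.DataFrame(
    {
        "Outcome": ["stroke"],
        "Survival_Probability": [0.912345],
        "CI_Upper": [float("nan")],
    }
)
decimal_places = 4  # mirrors the reporter's decimal_places setting

pretty = df.copy()
# round only numeric columns, then blank out NaNs for display
num_cols = pretty.select_dtypes(include=["number"]).columns
pretty[num_cols] = pretty[num_cols].round(decimal_places)
pretty = pretty.fillna("")
```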

Source code in phenex/reporting/reporter.py
def get_pretty_display(self) -> pd.DataFrame:
    """
    Return a formatted version of the reporter's results for display.

    Default implementation returns a copy of self.df with:
    - Numeric values rounded to decimal_places
    - NaN values replaced with empty strings for cleaner display

    Subclasses can override this method for custom formatting (e.g., phenotype display names).

    Returns:
        pd.DataFrame: Formatted copy of the results

    Raises:
        AttributeError: If self.df is not defined
    """
    if not hasattr(self, "df"):
        raise AttributeError(
            f"{self.__class__.__name__} does not have a 'df' attribute. "
            "Call execute() first or implement a custom get_pretty_display() method."
        )

    # Create a copy to avoid modifying the original
    pretty_df = self.df.copy()

    # Round numeric columns to decimal_places
    numeric_columns = pretty_df.select_dtypes(include=["number"]).columns
    pretty_df[numeric_columns] = pretty_df[numeric_columns].round(
        self.decimal_places
    )

    # Replace NaN with empty strings for cleaner display
    pretty_df = pretty_df.fillna("")

    return pretty_df

plot_multiple_kaplan_meier(xlim=None, ylim=None, n_cols=3, outcome_indices=None, path_dir=None)

For each outcome, plot a Kaplan-Meier curve.
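The panel layout is row-major: with n_cols columns, the i-th outcome lands at row i // n_cols, column i % n_cols, and the grid needs ceil(n / n_cols) rows. A small sketch of that arithmetic (helper names are illustrative, not part of the reporter's API):

```python
import math

def grid_position(i: int, n_cols: int = 3) -> tuple:
    """Row-major (row, col) position of the i-th subplot."""
    return divmod(i, n_cols)

def n_rows(n_plots: int, n_cols: int = 3) -> int:
    """Number of rows needed to fit n_plots in a grid with n_cols columns."""
    return math.ceil(n_plots / n_cols)
```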

Source code in phenex/reporting/time_to_event.py
def plot_multiple_kaplan_meier(
    self,
    xlim: Optional[List[int]] = None,
    ylim: Optional[List[int]] = None,
    n_cols: int = 3,
    outcome_indices: Optional[List[int]] = None,
    path_dir: Optional[str] = None,
):
    """
    For each outcome, plot a Kaplan-Meier curve.
    """
    # nothing to plot without patient-level data
    if self._tte_table is None or self._tte_table.empty:
        return
    phenotypes = self._outcomes
    if outcome_indices is not None:
        phenotypes = [
            x for i, x in enumerate(self._outcomes) if i in outcome_indices
        ]
    if not phenotypes:
        return
    n_rows = math.ceil(len(phenotypes) / n_cols)
    fig, axes = plt.subplots(n_rows, n_cols, sharey=True, sharex=True)

    for i, phenotype in enumerate(phenotypes):
        kmf = self.fit_kaplan_meier_for_phenotype(phenotype)
        if kmf is None:
            continue
        if n_rows > 1 and n_cols > 1:
            ax = axes[int(i / n_cols), i % n_cols]
        else:
            ax = axes[i]
        ax.set_title(phenotype.name)
        if xlim is not None:
            ax.set_xlim(xlim)
        if ylim is not None:
            ax.set_ylim(ylim)
        kmf.plot(ax=ax)
        ax.grid(color="gray", linestyle="-", linewidth=0.1)

    if path_dir is not None:
        cohort_name = getattr(self.cohort, "name", "cohort")
        path = os.path.join(path_dir, f"KaplanMeierPanelFor_{cohort_name}.svg")
        plt.savefig(path, dpi=150)
    plt.show()

plot_single_kaplan_meier(outcome_index=0, xlim=None, ylim=None, path_dir=None)

Plot a Kaplan-Meier curve for a single outcome.

Source code in phenex/reporting/time_to_event.py
def plot_single_kaplan_meier(
    self,
    outcome_index: int = 0,
    xlim: Optional[List[int]] = None,
    ylim: Optional[List[int]] = None,
    path_dir: Optional[str] = None,
):
    """
    Plot a Kaplan-Meier curve for a single outcome.
    """
    # select the outcome phenotype by index
    phenotype = self._outcomes[outcome_index]
    kmf = self.fit_kaplan_meier_for_phenotype(phenotype)
    if kmf is None:
        return
    fig, ax = plt.subplots(1, 1, figsize=(12, 4))

    ax.set_title(f"Kaplan Meier for outcome : {phenotype.name}")
    kmf.plot(ax=ax)
    add_at_risk_counts(kmf, ax=ax)
    plt.tight_layout()
    ax.grid(color="gray", linestyle="-", linewidth=0.1)
    if xlim is not None:
        ax.set_xlim(xlim)
    if ylim is not None:
        ax.set_ylim(ylim)

    if path_dir is not None:
        path = os.path.join(
            path_dir,
            f"KaplanMeier_{getattr(self.cohort, 'name', 'cohort')}_{phenotype.name}.svg",
        )
        plt.savefig(path, dpi=150)
    plt.show()

to_csv(filename)

Export reporter results to CSV format.

Default implementation exports self.df if it exists. Subclasses can override for custom behavior. If pretty_display=True, formats the DataFrame before export.

Parameters:

- filename (str, required): Path to the output file (relative or absolute, with or without .csv extension)

Returns:

- str: Full path to the created file

Raises:

- AttributeError: If self.df is not defined (call execute() first)
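The filename handling normalizes the extension and creates parent directories before writing; a sketch of the same pattern, exercised against a temporary directory with a toy DataFrame:

```python
import tempfile
from pathlib import Path

import pandas as pd

df = pd.DataFrame({"Outcome": ["stroke"], "Events": [5]})

with tempfile.TemporaryDirectory() as tmp:
    # extension is normalized to .csv and parent directories are created as needed
    filepath = Path(tmp) / "reports" / "tte_results"
    if filepath.suffix != ".csv":
        filepath = filepath.with_suffix(".csv")
    filepath.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(filepath, index=False)
    written = filepath.exists()
```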

Source code in phenex/reporting/reporter.py
def to_csv(self, filename: str) -> str:
    """
    Export reporter results to CSV format.

    Default implementation exports self.df if it exists. Subclasses can override for custom behavior. If pretty_display=True, formats the DataFrame before export.

    Args:
        filename: Path to the output file (relative or absolute, with or without .csv extension)

    Returns:
        str: Full path to the created file

    Raises:
        AttributeError: If self.df is not defined (call execute() first)
    """
    if not hasattr(self, "df"):
        raise AttributeError(
            f"{self.__class__.__name__} does not have a 'df' attribute. "
            "Call execute() first or implement a custom to_csv() method."
        )

    # Convert to Path object and ensure .csv extension
    filepath = Path(filename)
    if filepath.suffix != ".csv":
        filepath = filepath.with_suffix(".csv")

    # Create parent directories if needed
    filepath.parent.mkdir(parents=True, exist_ok=True)

    # Apply pretty display formatting
    df_to_export = self.get_pretty_display()

    # Export to CSV
    df_to_export.to_csv(filepath, index=False)

    return str(filepath.absolute())
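The extension normalization and directory creation in to_csv() (and the other exporters) form a reusable pathlib pattern. Isolated as a standalone helper — normalize_output_path is a hypothetical name for illustration, not part of the Reporter API:

```python
from pathlib import Path


def normalize_output_path(filename: str, suffix: str) -> Path:
    """Force the expected file extension and ensure parent dirs exist."""
    filepath = Path(filename)
    if filepath.suffix != suffix:
        # with_suffix replaces any existing extension, so "report.txt"
        # becomes "report.csv" rather than "report.txt.csv"
        filepath = filepath.with_suffix(suffix)
    filepath.parent.mkdir(parents=True, exist_ok=True)
    return filepath
```

Note that because Path.with_suffix replaces an existing suffix, passing "results.txt" yields "results.csv"; pass an extensionless name to avoid surprises.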

to_excel(filename)

Export reporter results to Excel format.

Default implementation exports self.df if it exists. Subclasses can override for custom behavior. If pretty_display=True, formats the DataFrame before export using get_pretty_display().

Parameters:

- filename (str): Path to the output file (relative or absolute, with or without .xlsx extension). Required.

Returns:

- str: Full path to the created file.

Raises:

- AttributeError: If self.df is not defined (call execute() first).
- ImportError: If openpyxl is not installed.

Source code in phenex/reporting/reporter.py
def to_excel(self, filename: str) -> str:
    """
    Export reporter results to Excel format.

    Default implementation exports self.df if it exists. Subclasses can override for custom behavior.
    If pretty_display=True, formats the DataFrame before export using get_pretty_display().

    Args:
        filename: Path to the output file (relative or absolute, with or without .xlsx extension)

    Returns:
        str: Full path to the created file

    Raises:
        AttributeError: If self.df is not defined (call execute() first)
        ImportError: If openpyxl is not installed
    """
    if not hasattr(self, "df"):
        raise AttributeError(
            f"{self.__class__.__name__} does not have a 'df' attribute. "
            "Call execute() first or implement a custom to_excel() method."
        )

    # Convert to Path object and ensure .xlsx extension
    filepath = Path(filename)
    if filepath.suffix != ".xlsx":
        filepath = filepath.with_suffix(".xlsx")

    # Create parent directories if needed
    filepath.parent.mkdir(parents=True, exist_ok=True)

    # Apply pretty display formatting
    df_to_export = self.get_pretty_display()

    # Export to Excel
    df_to_export.to_excel(filepath, index=False)

    return str(filepath.absolute())

to_html(filename, version='unknown')

Export KM curves for all outcomes as a self-contained HTML file with embedded PNGs.

Source code in phenex/reporting/time_to_event.py
def to_html(self, filename: str, version: str = "unknown") -> str:
    """Export KM curves for all outcomes as a self-contained HTML file with embedded PNGs."""
    filepath = Path(filename)
    if filepath.suffix != ".html":
        filepath = filepath.with_suffix(".html")
    filepath.parent.mkdir(parents=True, exist_ok=True)

    cohort_name = getattr(self.cohort, "name", "cohort")
    images_html = self._render_km_images_html()

    # Embed bird icon as base64 data URI
    icon_path = (
        Path(__file__).resolve().parent.parent / "docs" / "assets" / "bird_icon.png"
    )
    if icon_path.exists():
        icon_b64 = base64.b64encode(icon_path.read_bytes()).decode("ascii")
        icon_data_uri = f"data:image/png;base64,{icon_b64}"
    else:
        icon_data_uri = ""

    from html import escape

    version_escaped = escape(version)

    if icon_data_uri:
        footer = (
            f'<div class="phenex-footer">'
            f'<img src="{icon_data_uri}" alt="PhenEx">'
            f"<span>Generated with PhenEx v{version_escaped}</span></div>"
        )
    else:
        footer = (
            f'<div class="phenex-footer">'
            f"<span>Generated with PhenEx v{version_escaped}</span></div>"
        )

    html = (
        "<!DOCTYPE html><html><head>"
        f"<title>Time to Event - {cohort_name}</title>"
        "<style>"
        "body{font-family:sans-serif;margin:20px;padding-bottom:50px;}"
        ".phenex-footer{position:fixed;bottom:0;left:0;padding:10px 16px;"
        "display:flex;align-items:center;gap:8px;background:rgba(255,255,255,0.9);z-index:9999;}"
        ".phenex-footer img{height:24px;width:auto;}"
        ".phenex-footer span{font-size:11px;color:#999;}"
        "</style>"
        "</head><body>"
        f"<h1>Kaplan-Meier Curves &mdash; {cohort_name}</h1>"
        + "\n".join(images_html)
        + footer
        + "</body></html>"
    )
    filepath.write_text(html, encoding="utf-8")
    return str(filepath.absolute())
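Embedding the icon as a base64 data URI is what makes the HTML file self-contained (no external image references). The encoding step in isolation, using illustrative bytes rather than the real icon file:

```python
import base64


def to_data_uri(raw: bytes, mime: str = "image/png") -> str:
    """Encode raw bytes as a data: URI usable directly in <img src=...>."""
    b64 = base64.b64encode(raw).decode("ascii")
    return f"data:{mime};base64,{b64}"
```

The browser decodes the base64 payload itself, so the resulting HTML renders correctly when emailed or moved between machines.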

to_json(filename)

Export reporter results to JSON (machine-readable intermediate format).

Stores raw (unformatted) data so downstream tools can apply their own formatting. Subclasses may override to include additional metadata (e.g. Table1 adds section information).

Parameters:

- filename (str): Path to the output file (with or without .json extension). Required.

Returns:

- str: Full path to the created file.

Source code in phenex/reporting/reporter.py
def to_json(self, filename: str) -> str:
    """
    Export reporter results to JSON (machine-readable intermediate format).

    Stores raw (unformatted) data so downstream tools can apply their own
    formatting.  Subclasses may override to include additional metadata
    (e.g. Table1 adds section information).

    Args:
        filename: Path to the output file (with or without .json extension)

    Returns:
        str: Full path to the created file
    """
    import json

    if not hasattr(self, "df"):
        raise AttributeError(
            f"{self.__class__.__name__} does not have a 'df' attribute. "
            "Call execute() first or implement a custom to_json() method."
        )

    filepath = Path(filename)
    if filepath.suffix != ".json":
        filepath = filepath.with_suffix(".json")
    filepath.parent.mkdir(parents=True, exist_ok=True)

    payload = {
        "reporter_type": self.__class__.__name__,
        "rows": self.df.to_dict(orient="records"),
    }

    with filepath.open("w") as f:
        json.dump(payload, f, indent=2, default=str)

    return str(filepath.absolute())
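The default=str argument in json.dump is what lets non-JSON-native values such as event dates serialize cleanly instead of raising TypeError. A minimal demonstration with a payload shaped like the one above (the row contents are illustrative, not real reporter output):

```python
import json
from datetime import date

payload = {
    "reporter_type": "TimeToEvent",
    "rows": [{"outcome": "stroke", "index_date": date(2020, 1, 15)}],
}

# Without default=str this call raises:
#   TypeError: Object of type date is not JSON serializable
# With it, the date falls back to str(), i.e. its ISO form.
text = json.dumps(payload, indent=2, default=str)
```

Downstream tools then receive ISO-formatted date strings they can parse and reformat as they see fit, which is the point of keeping this export unformatted.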

to_markdown(filename)

Export reporter results to Markdown format.

Default implementation exports self.df if it exists. Subclasses can override for custom behavior. If pretty_display=True, formats the DataFrame before export.

Parameters:

- filename (str): Path to the output file (relative or absolute, with or without .md extension). Required.

Returns:

- str: Full path to the created file.

Raises:

- AttributeError: If self.df is not defined (call execute() first).
- ImportError: If tabulate is not installed (required for df.to_markdown()).

Source code in phenex/reporting/reporter.py
def to_markdown(self, filename: str) -> str:
    """
    Export reporter results to Markdown format.

    Default implementation exports self.df if it exists. Subclasses can override for custom behavior. If pretty_display=True, formats the DataFrame before export.

    Args:
        filename: Path to the output file (relative or absolute, with or without .md extension)

    Returns:
        str: Full path to the created file

    Raises:
        AttributeError: If self.df is not defined (call execute() first)
        ImportError: If tabulate is not installed (required for df.to_markdown())
    """
    if not hasattr(self, "df"):
        raise AttributeError(
            f"{self.__class__.__name__} does not have a 'df' attribute. "
            "Call execute() first or implement a custom to_markdown() method."
        )

    # Convert to Path object and ensure .md extension
    filepath = Path(filename)
    if filepath.suffix != ".md":
        filepath = filepath.with_suffix(".md")

    # Create parent directories if needed
    filepath.parent.mkdir(parents=True, exist_ok=True)

    # Apply pretty display formatting
    df_to_export = self.get_pretty_display()

    # Export to Markdown (requires tabulate package)
    try:
        markdown_content = df_to_export.to_markdown(index=False)
        filepath.write_text(markdown_content)
    except ImportError:
        raise ImportError(
            "tabulate is required for Markdown export. Install with: pip install tabulate"
        )

    return str(filepath.absolute())
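df.to_markdown() delegates the actual rendering to tabulate. The pipe-table layout it emits can be approximated in plain Python — a simplified sketch for intuition only; real tabulate output additionally pads columns to equal width:

```python
def to_pipe_table(columns, rows):
    """Render a minimal GitHub-style markdown pipe table."""
    header = "| " + " | ".join(columns) + " |"
    # Divider row of --- cells separates headers from data
    divider = "|" + "|".join("---" for _ in columns) + "|"
    body = ["| " + " | ".join(str(v) for v in row) + " |" for row in rows]
    return "\n".join([header, divider, *body])
```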

to_word(filename)

Export reporter results to Microsoft Word format.

Default implementation exports self.df as a simple table if it exists. Subclasses can override for custom formatting (headers, styling, etc). If pretty_display=True, formats the DataFrame before export using get_pretty_display().

Parameters:

- filename (str): Path to the output file (relative or absolute, with or without .docx extension). Required.

Returns:

- str: Full path to the created file.

Raises:

- AttributeError: If self.df is not defined (call execute() first).
- ImportError: If python-docx is not installed.

Source code in phenex/reporting/reporter.py
def to_word(self, filename: str) -> str:
    """
    Export reporter results to Microsoft Word format.

    Default implementation exports self.df as a simple table if it exists.
    Subclasses can override for custom formatting (headers, styling, etc).
    If pretty_display=True, formats the DataFrame before export using get_pretty_display().

    Args:
        filename: Path to the output file (relative or absolute, with or without .docx extension)

    Returns:
        str: Full path to the created file

    Raises:
        AttributeError: If self.df is not defined (call execute() first)
        ImportError: If python-docx is not installed
    """
    if not hasattr(self, "df"):
        raise AttributeError(
            f"{self.__class__.__name__} does not have a 'df' attribute. "
            "Call execute() first or implement a custom to_word() method."
        )

    try:
        from docx import Document
    except ImportError:
        raise ImportError(
            "python-docx is required for Word export. Install with: pip install python-docx"
        )

    # Convert to Path object and ensure .docx extension
    filepath = Path(filename)
    if filepath.suffix != ".docx":
        filepath = filepath.with_suffix(".docx")

    # Create parent directories if needed
    filepath.parent.mkdir(parents=True, exist_ok=True)

    # Apply pretty display formatting
    df_to_export = self.get_pretty_display()

    # Create Word document with table
    doc = Document()

    # Add table (rows + 1 for header)
    table = doc.add_table(
        rows=len(df_to_export) + 1, cols=len(df_to_export.columns)
    )
    table.style = "Light Grid Accent 1"

    # Add header row
    for col_idx, column_name in enumerate(df_to_export.columns):
        table.rows[0].cells[col_idx].text = str(column_name)

    # Add data rows
    for row_idx, (_, row_data) in enumerate(df_to_export.iterrows(), start=1):
        for col_idx, value in enumerate(row_data):
            table.rows[row_idx].cells[col_idx].text = str(value)

    # Save document
    doc.save(str(filepath))

    return str(filepath.absolute())
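The rows + 1 table sizing and the enumerate(start=1) offset above reserve row 0 for headers while keeping data rows aligned. The same indexing can be shown with plain lists, free of the python-docx dependency (fill_grid is a hypothetical helper for illustration):

```python
def fill_grid(columns, rows):
    """Build a (len(rows) + 1) x len(columns) grid of strings with a
    header row, mirroring how to_word() fills the docx table."""
    grid = [[""] * len(columns) for _ in range(len(rows) + 1)]
    # Row 0 holds the column names
    for col_idx, name in enumerate(columns):
        grid[0][col_idx] = str(name)
    # Data rows start at index 1, hence enumerate(..., start=1)
    for row_idx, row in enumerate(rows, start=1):
        for col_idx, value in enumerate(row):
            grid[row_idx][col_idx] = str(value)
    return grid
```

Everything is stringified on the way in because docx table cells hold text, not typed values.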