Skip to content

Table2

Bases: Reporter

Table2 generates outcome incidence rates and event counts for a cohort at specified time points.

For each outcome, reports:

- N events in the cohort
- N censored patients (patients whose follow-up was cut short)
- Time under risk in patient-years (accounting for censoring)
- Incidence rate per 100 patient-years

Time under risk accounts for censoring from competing events (e.g., death) and administrative censoring at end of study period.

Parameters:

- time_points (List[int]): List of days from index to evaluate outcomes (e.g., [90, 365]). Default: [365]
- right_censor_phenotypes (Optional[List[Phenotype]]): List of phenotypes for right censoring (e.g., death). Default: None
- end_of_study_period (Optional[datetime]): End date of study period for administrative censoring. Default: None
Example
from datetime import datetime

# Simple analysis without censoring
table2 = Table2(
    time_points=[90, 365, 730],  # 3 months, 1 year, 2 years
)

# Analysis with right censoring
table2_censored = Table2(
    time_points=[90, 365, 730],
    right_censor_phenotypes=[death_phenotype],
    end_of_study_period=datetime(2023, 12, 31)
)
results = table2_censored.execute(cohort)  # Uses cohort.outcomes
Source code in phenex/reporting/table2.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
class Table2(Reporter):
    """
    Table2 generates outcome incidence rates and event counts for a cohort at specified time points.

    For each outcome, reports:
    - N events in the cohort
    - N censored patients (patients whose follow-up was cut short)
    - Time under risk in patient-years (accounting for censoring)
    - Incidence rate per 100 patient-years

    Time under risk accounts for censoring from competing events (e.g., death) and administrative censoring at end of study period.

    Parameters:
        time_points: List of days from index to evaluate outcomes (e.g., [90, 365]).
            Defaults to [365] (1 year) when not provided.
        right_censor_phenotypes: List of phenotypes for right censoring (e.g., death)
        end_of_study_period: End date of study period for administrative censoring

    Example:
        ```python
        from datetime import datetime

        # Simple analysis without censoring
        table2 = Table2(
            time_points=[90, 365, 730],  # 3 months, 1 year, 2 years
        )

        # Analysis with right censoring
        table2_censored = Table2(
            time_points=[90, 365, 730],
            right_censor_phenotypes=[death_phenotype],
            end_of_study_period=datetime(2023, 12, 31)
        )
        results = table2_censored.execute(cohort)  # Uses cohort.outcomes
        ```
    """

    def __init__(
        self,
        time_points: Optional[List[int]] = None,  # Defaults to [365] (1 year)
        decimal_places: int = 3,
        pretty_display: bool = True,
        right_censor_phenotypes: Optional[List[Phenotype]] = None,
        end_of_study_period: Optional["datetime"] = None,
    ):
        super().__init__(decimal_places=decimal_places, pretty_display=pretty_display)
        # None sentinel avoids a mutable default argument; behavior is unchanged
        # for callers relying on the previous [365] default.
        self.time_points = sorted(time_points) if time_points is not None else [365]
        self.right_censor_phenotypes = right_censor_phenotypes or []
        self.end_of_study_period = end_of_study_period

    def execute(self, cohort) -> pd.DataFrame:
        """
        Execute Table2 analysis for the provided cohort.

        Expected input columns:
        - cohort.table: PERSON_ID, EVENT_DATE (index date)
        - cohort.outcomes[].table: PERSON_ID, EVENT_DATE (outcome event date)
        - right_censor_phenotypes[].table: PERSON_ID, EVENT_DATE, BOOLEAN (if censoring used)

        Args:
            cohort: The cohort containing outcomes and index table

        Returns:
            DataFrame with columns:
            - Outcome: Name of outcome variable
            - Time_Point: Days from index date
            - N_Events: Number of events in cohort
            - N_Censored: Number of censored patients
            - Time_Under_Risk: Follow-up time in patient-years
            - Incidence_Rate: Incidence rate per 100 patient-years
        """
        self.cohort = cohort

        # Get outcomes from cohort like Table1
        if len(cohort.outcomes) == 0:
            logger.info("No outcomes in cohort. Table2 is empty")
            return pd.DataFrame()

        self.outcomes = cohort.outcomes
        logger.info(
            f"Starting Table2 analysis with {len(self.outcomes)} outcomes at {len(self.time_points)} time points"
        )

        # Execute right censoring phenotypes if they exist
        for phenotype in self.right_censor_phenotypes:
            phenotype.execute(cohort.subset_tables_index)

        # Analyze each outcome at each time point
        results_list = []
        for outcome in self.outcomes:
            for time_point in self.time_points:
                result = self._calculate_aggregate_time_under_risk(outcome, time_point)
                if result is not None:
                    results_list.append(result)

        # Combine results
        if results_list:
            self.df = pd.DataFrame(results_list)
        else:
            self.df = pd.DataFrame()

        if self.pretty_display and not self.df.empty:
            self._create_pretty_display()

        logger.info("Completed Table2 analysis")
        return self.df

    def _calculate_aggregate_time_under_risk(
        self, outcome: Phenotype, time_point: int
    ) -> Optional[dict]:
        """
        Calculate the total time under risk for single outcome at a specific time point.

        Expected input columns:
        - cohort.table: PERSON_ID, EVENT_DATE (index date)
        - outcome.table: PERSON_ID, EVENT_DATE (outcome event date)

        Args:
            outcome: Phenotype outcome to analyze
            time_point: Number of days from index to analyze

        Returns:
            Dictionary with aggregated analysis results, or None if no data available:
            - "Outcome": str - Name of the outcome phenotype
            - "Time_Point": int - Days from index date analyzed
            - "N_Events": int - Number of events observed in the cohort
            - "N_Censored": int - Number of patients censored before time_point
            - "Time_Under_Risk": float - Total follow-up time in patient-years (rounded to decimal_places)
            - "Incidence_Rate": float - Events per 100 patient-years (rounded to decimal_places)
        """
        # Get per-patient followup data
        followup_table = self._calculate_per_patient_time_under_risk(
            outcome, time_point
        )

        # Aggregate to get summary statistics
        summary = followup_table.aggregate(
            [
                _.HAS_EVENT.sum().name("N_Events"),
                _.IS_CENSORED.sum().name("N_Censored"),
                _.FOLLOWUP_TIME.sum().name("Total_Followup_Days"),
            ]
        )

        # Convert to pandas only at the very end for final calculations
        summary_df = summary.execute()

        if len(summary_df) == 0:
            logger.warning(f"No data for {outcome.name} at {time_point} days")
            return None

        row = summary_df.iloc[0]
        n_events = int(row["N_Events"])
        n_censored = int(row["N_Censored"])
        total_followup_days = float(row["Total_Followup_Days"])

        # Convert to patient-years and calculate incidence rate.
        # 365.25 accounts for leap years.
        time_years = total_followup_days / 365.25
        # Use 0.0 (not int 0) so "Incidence_Rate" is always a float.
        incidence_rate = (n_events / time_years * 100) if time_years > 0 else 0.0

        logger.debug(
            f"Outcome {outcome.name} at {time_point} days: {n_events} events, {n_censored} censored. "
        )

        return {
            "Outcome": outcome.name,
            "Time_Point": time_point,
            "N_Events": n_events,
            "N_Censored": n_censored,
            "Time_Under_Risk": round(time_years, self.decimal_places),
            "Incidence_Rate": round(incidence_rate, self.decimal_places),
        }

    def _calculate_per_patient_time_under_risk(
        self, outcome: Phenotype, time_point: int
    ):
        """
        Calculate per-patient time under risk data for a single outcome at a specific time point.

        Expected input columns:
        - cohort.table: PERSON_ID, EVENT_DATE (index date)
        - outcome.table: PERSON_ID, EVENT_DATE (outcome event date)

        Args:
            outcome: Phenotype outcome to analyze
            time_point: Number of days from index to analyze

        Returns:
            Ibis table with per-patient followup data, with columns:
            - PERSON_ID: Patient identifier
            - INDEX_DATE: Index date for this patient
            - OUTCOME_DATE: Date of outcome event (null if no event)
            - DAYS_TO_EVENT: Days from index to outcome event (null if no event)
            - CENSOR_DATE: Date of censoring event (null if no censoring)
            - DAYS_TO_CENSOR: Days from index date to censoring event
            - HAS_EVENT: 1 if valid event within time window, 0 otherwise
            - FOLLOWUP_TIME: Actual follow-up time accounting for censoring
            - IS_CENSORED: 1 if patient was censored before time_point, 0 otherwise
        """
        # Get cohort index table
        index_table = self.cohort.table

        # Rename EVENT_DATE to INDEX_DATE for clarity
        index_table = index_table.mutate(INDEX_DATE=index_table.EVENT_DATE)
        index_table = index_table.select(["PERSON_ID", "INDEX_DATE"])

        # Calculate time to first outcome event
        index_table = self._calculate_time_to_first_post_index_event(
            index_table, [outcome], "OUTCOME_DATE", "DAYS_TO_EVENT"
        )

        # Calculate censoring time from right-censoring phenotypes
        index_table = self._calculate_time_to_first_post_index_event(
            index_table, self.right_censor_phenotypes, "CENSOR_DATE", "DAYS_TO_CENSOR"
        )

        # Apply end_of_study_period censoring if specified
        if self.end_of_study_period is not None:
            # Convert datetime to date if needed
            if hasattr(self.end_of_study_period, "date"):
                end_date = ibis.literal(self.end_of_study_period.date())
            else:
                # Fallback for string dates
                end_date = ibis.literal(pd.to_datetime(self.end_of_study_period).date())

            index_table = index_table.mutate(
                DAYS_TO_END_STUDY=(end_date - index_table.INDEX_DATE.cast("date")).cast(
                    "int"
                )
            )

            # Update DAYS_TO_CENSOR and CENSOR_DATE if end_of_study_period is earlier
            index_table = index_table.mutate(
                DAYS_TO_CENSOR=ibis.case()
                .when(
                    index_table.DAYS_TO_CENSOR.isnull(), index_table.DAYS_TO_END_STUDY
                )
                .when(
                    index_table.DAYS_TO_END_STUDY < index_table.DAYS_TO_CENSOR,
                    index_table.DAYS_TO_END_STUDY,
                )
                .else_(index_table.DAYS_TO_CENSOR)
                .end(),
                CENSOR_DATE=ibis.case()
                .when(index_table.DAYS_TO_CENSOR.isnull(), end_date)
                .when(
                    index_table.DAYS_TO_END_STUDY < index_table.DAYS_TO_CENSOR, end_date
                )
                .else_(index_table.CENSOR_DATE)
                .end(),
            ).drop("DAYS_TO_END_STUDY")

        # Filter to valid events within time window (after censoring)
        # FIXME need to be careful about ties! An event on the same day as the
        # censoring event currently counts as an event (<= comparison below).
        index_table = index_table.mutate(
            HAS_EVENT=ibis.case()
            .when(
                (index_table.DAYS_TO_EVENT.notnull())
                & (index_table.DAYS_TO_EVENT >= 0)
                & (index_table.DAYS_TO_EVENT <= time_point)
                & (
                    (index_table.DAYS_TO_CENSOR.isnull())
                    | (index_table.DAYS_TO_EVENT <= index_table.DAYS_TO_CENSOR)
                ),
                1,
            )
            .else_(0)
            .end()
        )

        # Calculate followup time (min of event time, censor time and time_point)
        index_table = index_table.mutate(
            FOLLOWUP_TIME=ibis.least(
                ibis.case()
                .when(
                    index_table.DAYS_TO_EVENT.notnull(),
                    index_table.DAYS_TO_EVENT,
                )
                .else_(time_point)
                .end(),
                index_table.DAYS_TO_CENSOR.fill_null(time_point),
                time_point,
            ),
            # Mark patients as censored if their follow-up was cut short by censoring
            # (i.e., censor time is less than time_point and they didn't have an event)
            IS_CENSORED=ibis.case()
            .when(
                (index_table.HAS_EVENT == 0)
                & (index_table.DAYS_TO_CENSOR.notnull())
                & (index_table.DAYS_TO_CENSOR < time_point),
                1,
            )
            .else_(0)
            .end(),
        )

        return index_table

    def _calculate_time_to_first_post_index_event(
        self, index_table, phenotypes, date_column_name: str, days_column_name: str
    ):
        """
        Calculate time to first event (outcome or censoring) for each person_id, index_date combination in the cohort.

        Input columns:
        - index_table: patient_id, index_date
        - phenotypes: List of phenotypes with event_date columns

        Output columns added:
        - date_column_name: Date of first event or None
        - days_column_name: Days from index to event or None if no event

        Raises:
            ValueError: If the join/aggregation changes the row count of index_table.
        """
        # Validate row count before processing
        initial_row_count = index_table.count().execute()

        # If no phenotypes specified, everyone has no events
        if not phenotypes:
            result = index_table.mutate(
                **{date_column_name: None, days_column_name: None}
            )
            final_row_count = result.count().execute()
            if initial_row_count != final_row_count:
                raise ValueError(
                    f"Row count changed during processing: {initial_row_count} -> {final_row_count}"
                )
            return result

        # Collect all phenotype dates
        event_dates = []
        for phenotype in phenotypes:
            # Get the PERSON_ID and date columns from this phenotype
            # Handle both real phenotypes (where table is a method) and mock phenotypes (where table is an attribute)
            phenotype_table = phenotype.table.select(["PERSON_ID", "EVENT_DATE"])
            phenotype_table = phenotype_table.rename({"phenotype_date": "EVENT_DATE"})
            event_dates.append(phenotype_table)

        # Union all event dates
        if len(event_dates) == 1:
            all_event_dates = event_dates[0]
        else:
            all_event_dates = event_dates[0]
            for table in event_dates[1:]:
                all_event_dates = all_event_dates.union(table)

        # Join with index table and filter to events >= index_date
        with_events = index_table.join(all_event_dates, "PERSON_ID", how="left")

        # Get all existing columns except the ones we're about to join
        existing_columns = [col for col in index_table.columns]
        group_by_columns = (
            existing_columns  # Group by all existing columns to preserve them
        )

        # Group by patient and get the first (minimum) event date >= index_date
        # Use case expression to only consider valid events in the min aggregation
        first_event = with_events.group_by(group_by_columns).aggregate(
            first_event_date=ibis.case()
            .when(
                with_events.phenotype_date.isnull()
                | (with_events.phenotype_date < with_events.INDEX_DATE),
                None,
            )
            .else_(with_events.phenotype_date)
            .end()
            .min()
        )

        # Calculate days between index and first event
        result = first_event.mutate(
            **{
                date_column_name: first_event.first_event_date,
                days_column_name: ibis.case()
                .when(first_event.first_event_date.isnull(), None)
                .else_(
                    # String dtype for consistency with the cast in
                    # _calculate_per_patient_time_under_risk.
                    (first_event.first_event_date - first_event.INDEX_DATE).cast("int")
                )
                .end(),
            }
        ).drop("first_event_date")

        # Validate row count after processing
        final_row_count = result.count().execute()
        if initial_row_count != final_row_count:
            raise ValueError(
                f"Row count changed during processing: {initial_row_count} -> {final_row_count}"
            )

        return result

    def _create_pretty_display(self):
        """
        Create formatted display version of results.

        Expected input columns (from self.df):
        - Outcome: Name of outcome variable
        - Time_Point: Days from index date
        - N_Events: Number of events in cohort
        - N_Censored: Number of censored patients
        - Time_Under_Risk: Follow-up time in patient-years
        - Incidence_Rate: Incidence rate per 100 patient-years

        Columns modified:
        - Incidence_Rate: Rounded to specified decimal places
        - Time_Under_Risk: Rounded to specified decimal places

        Final column order:
        - Outcome, Time_Point, N_Events, N_Censored, N_Total, Time_Under_Risk, Incidence_Rate
        """
        if self.df.empty:
            return

        # Round numeric columns
        numeric_columns = [
            "Incidence_Rate",
            "Time_Under_Risk",
        ]
        for col in numeric_columns:
            if col in self.df.columns:
                self.df[col] = self.df[col].round(self.decimal_places)

        # Reorder columns for display
        display_columns = [
            "Outcome",
            "Time_Point",
            "N_Events",
            "N_Censored",
            "N_Total",
            "Time_Under_Risk",
            "Incidence_Rate",
        ]

        # Only include columns that exist
        display_columns = [col for col in display_columns if col in self.df.columns]
        self.df = self.df[display_columns]
execute(cohort)

Execute Table2 analysis for the provided cohort.

Expected input columns:

- cohort.table: PERSON_ID, EVENT_DATE (index date)
- cohort.outcomes[].table: PERSON_ID, EVENT_DATE (outcome event date)
- right_censor_phenotypes[].table: PERSON_ID, EVENT_DATE, BOOLEAN (if censoring used)

Parameters:

- cohort (required): The cohort containing outcomes and index table

Returns:

Type: DataFrame

DataFrame with columns:

- Outcome: Name of outcome variable
- Time_Point: Days from index date
- N_Events: Number of events in cohort
- N_Censored: Number of censored patients
- Time_Under_Risk: Follow-up time in patient-years
- Incidence_Rate: Incidence rate per 100 patient-years
Source code in phenex/reporting/table2.py
def execute(self, cohort) -> pd.DataFrame:
    """
    Run the Table2 analysis for the given cohort.

    Expected input columns:
    - cohort.table: PERSON_ID, EVENT_DATE (index date)
    - cohort.outcomes[].table: PERSON_ID, EVENT_DATE (outcome event date)
    - right_censor_phenotypes[].table: PERSON_ID, EVENT_DATE, BOOLEAN (if censoring used)

    Args:
        cohort: The cohort containing outcomes and index table

    Returns:
        DataFrame with columns Outcome, Time_Point, N_Events, N_Censored,
        Time_Under_Risk (follow-up in patient-years) and Incidence_Rate
        (events per 100 patient-years). Empty if the cohort has no outcomes.
    """
    self.cohort = cohort

    # Nothing to analyze when the cohort defines no outcomes.
    if len(cohort.outcomes) == 0:
        logger.info("No outcomes in cohort. Table2 is empty")
        return pd.DataFrame()

    self.outcomes = cohort.outcomes
    logger.info(
        f"Starting Table2 analysis with {len(self.outcomes)} outcomes at {len(self.time_points)} time points"
    )

    # Materialize right-censoring phenotypes against the cohort's index tables.
    for censor_phenotype in self.right_censor_phenotypes:
        censor_phenotype.execute(cohort.subset_tables_index)

    # Build one summary row per (outcome, time point) pair, skipping
    # combinations for which no data is available.
    summary_rows = []
    for outcome in self.outcomes:
        for time_point in self.time_points:
            row = self._calculate_aggregate_time_under_risk(outcome, time_point)
            if row is not None:
                summary_rows.append(row)

    self.df = pd.DataFrame(summary_rows) if summary_rows else pd.DataFrame()

    if self.pretty_display and not self.df.empty:
        self._create_pretty_display()

    logger.info("Completed Table2 analysis")
    return self.df