Skip to content

Waterfall

Bases: Reporter

A waterfall diagram, also known as an attrition table, shows how inclusion/exclusion criteria contribute to a final population size. Each inclusion/exclusion criterion is a row in the table, and the number of patients remaining after applying that criterion is shown on that row.

Column name Description
Type The type of the phenotype, either entry, inclusion or exclusion
Name The name of the entry, inclusion or exclusion criterion
N The absolute number of patients that fulfill that phenotype. For the entry criterion this is the absolute number in the dataset. For inclusion/exclusion criteria this is the number of patients that fulfill the entry criterion AND the phenotype on that row.
Remaining The number of patients remaining in the cohort after sequentially applying the inclusion/exclusion criteria in the order that they are listed in this table.
% The percentage of patients who fulfill the entry criterion who are remaining in the cohort after application of the phenotype on that row
Delta The change in number of patients that occurs by applying the phenotype on that row.
Source code in phenex/reporting/waterfall.py
class Waterfall(Reporter):
    """
    A waterfall diagram, also known as an attrition table, shows how inclusion/exclusion criteria contribute to a final population size. Each inclusion/exclusion criterion is a row in the table, and the number of patients remaining after applying that criterion is shown on that row.

    | Column name | Description |
    | --- | --- |
    | Type | The type of the phenotype, either entry, inclusion or exclusion |
    | Name | The name of the entry, inclusion or exclusion criterion |
    | N | The absolute number of patients that fulfill that phenotype. For the entry criterion this is the absolute number in the dataset. For inclusion/exclusion criteria this is the number of patients that fulfill the entry criterion AND the phenotype on that row. |
    | Remaining | The number of patients remaining in the cohort after sequentially applying the inclusion/exclusion criteria in the order that they are listed in this table. |
    | % | The percentage of patients who fulfill the entry criterion who are remaining in the cohort after application of the phenotype on that row |
    | Delta | The change in number of patients that occurs by applying the phenotype on that row. |

    """

    def execute(self, cohort: "Cohort") -> pd.DataFrame:
        """
        Build the waterfall (attrition) table for a cohort.

        The entry criterion is counted first, then every inclusion and every
        exclusion is applied sequentially in the order in which the cohort
        lists them, and a closing ``final_cohort`` row reports the number of
        distinct PERSON_IDs in the cohort's index table whose BOOLEAN is true.

        Parameters:
            cohort: The cohort to report on. This method reads its
                ``index_table``, ``entry_criterion``, ``inclusions`` and
                ``exclusions`` attributes.

        Returns:
            A DataFrame with the columns Type, Name, N, Remaining, % and
            Delta. The same frame is also stored on ``self.df`` and the raw
            row dictionaries on ``self.ds``.
        """
        self.cohort = cohort
        logger.debug("Beginning execution of waterfall. Calculating N patents")
        # `== True` builds a column filter expression; it must stay an
        # equality comparison and cannot be rewritten as `is True`.
        N = (
            cohort.index_table.filter(cohort.index_table.BOOLEAN == True)
            .select("PERSON_ID")
            .distinct()
            .count()
            .execute()
        )
        logger.debug(f"Cohort has {N} patients")
        self.ds = []

        table = cohort.entry_criterion.table
        # Count distinct patients, exactly as every inclusion/exclusion row and
        # the final cohort do, so that N, Remaining, % and Delta are all in the
        # same unit. The query is executed once and reused for both columns.
        N_entry = table.select("PERSON_ID").distinct().count().execute()
        self.ds.append(
            {
                "Type": "entry",
                "Name": (
                    cohort.entry_criterion.display_name
                    if self.pretty_display
                    else cohort.entry_criterion.name
                ),
                "N": N_entry,
                "Remaining": N_entry,
            }
        )

        # Each call narrows `table` to the patients still remaining and
        # appends one row to self.ds.
        for inclusion in cohort.inclusions:
            table = self.append_phenotype_to_waterfall(table, inclusion, "inclusion")

        for exclusion in cohort.exclusions:
            table = self.append_phenotype_to_waterfall(table, exclusion, "exclusion")

        self.ds.append(
            {
                "Type": "final_cohort",
                "Name": "",
                "N": None,
                "Remaining": N,
            }
        )
        self.ds = self.append_delta(self.ds)

        # create dataframe with phenotype counts
        self.df = pd.DataFrame(self.ds)

        # calculate percentage of entry criterion
        self.df["%"] = self.df["Remaining"] / N_entry * 100
        self.df = self.df.round(self.decimal_places)

        if self.pretty_display:
            self.create_pretty_display()

        # Do final column selection
        self.df = self.df[["Type", "Name", "N", "Remaining", "%", "Delta"]]

        return self.df

    def append_phenotype_to_waterfall(self, table, phenotype, type):
        """
        Apply one criterion to the running patient table and record its row.

        Parameters:
            table: Table of the patients remaining so far; must expose a
                PERSON_ID column.
            phenotype: The criterion to apply. Its ``table``, ``name`` and
                ``display_name`` attributes are read.
            type: Either "inclusion" (keep patients present in the phenotype
                table) or "exclusion" (drop patients present in it).

        Returns:
            The narrowed table, reduced to its PERSON_ID column.

        Raises:
            ValueError: If ``type`` is neither "inclusion" nor "exclusion".
        """
        if type not in ("inclusion", "exclusion"):
            raise ValueError("type must be either inclusion or exclusion")

        criterion_table = phenotype.table
        if type == "inclusion":
            same_person = table["PERSON_ID"] == criterion_table["PERSON_ID"]
            table = table.inner_join(criterion_table, same_person)
        else:
            already_flagged = table["PERSON_ID"].isin(criterion_table["PERSON_ID"])
            table = table.filter(~already_flagged)

        logger.debug(f"Starting {type} criteria {phenotype.name}")

        label = phenotype.display_name if self.pretty_display else phenotype.name
        # N: patients fulfilling the phenotype on its own.
        n_fulfilling = criterion_table.select("PERSON_ID").distinct().count().execute()
        # Remaining: patients left after applying this criterion to the running table.
        n_remaining = table.select("PERSON_ID").distinct().count().execute()
        row = {
            "Type": type,
            "Name": label,
            "N": n_fulfilling,
            "Remaining": n_remaining,
        }
        self.ds.append(row)

        logger.debug(
            f"Finished {type} criteria {phenotype.name}: N = {row['N']} waterfall = {row['Remaining']}"
        )
        return table.select("PERSON_ID")

    def append_delta(self, ds):
        """
        Add a "Delta" entry to the waterfall rows, in place.

        The first row gets ``Delta = None`` because there is nothing before
        it to compare against. Every following row except the last gets the
        difference between its own "Remaining" and that of the row before it.
        The last row is left untouched, so it carries no "Delta" key; in
        ``execute`` that row is the ``final_cohort`` summary.

        Parameters:
            ds: List of row dictionaries, each with a "Remaining" value.

        Returns:
            The same list object that was passed in. An empty list is
            returned unchanged.
        """
        if not ds:
            # Nothing to annotate; indexing ds[0] would raise IndexError.
            return ds

        ds[0]["Delta"] = None
        # ds[1:-1] skips the first row (handled above) and the last row;
        # zipping against ds pairs each of those rows with its predecessor.
        for previous_row, current_row in zip(ds, ds[1:-1]):
            current_row["Delta"] = current_row["Remaining"] - previous_row["Remaining"]
        return ds

    def create_pretty_display(self):
        # cast counts to integer and to str, so that we can display without 'NaNs'
        self.df["N"] = self.df["N"].astype("Int64").astype(str)
        self.df["Delta"] = self.df["Delta"].astype("Int64").astype(str)

        # Replace NAs and None values with empty strings for display
        self.df = self.df.replace("<NA>", "")

        # create a sparse 'type' column name where inclusion/exclusion only appear once (instead of repeated on each row)
        previous_type = None
        sparse_types = []
        for _type in self.df["Type"].values:
            if _type != previous_type:
                sparse_types.append(_type)
                previous_type = _type
            else:
                sparse_types.append("")
        self.df["Type"] = sparse_types