Skip to content

Table1

Bases: Reporter

Table1 is a common term used in epidemiology to describe a table that shows an overview of the baseline characteristics of a cohort. It contains the counts and percentages of the cohort that have each characteristic, for both boolean and value characteristics. In addition, summary statistics are provided for value characteristics (mean, std, median, min, max).

Source code in phenex/reporting/table1.py
class Table1(Reporter):
    """
    Table1 is a common term used in epidemiology to describe a table that shows an overview of the baseline characteristics of a cohort. It contains the counts and percentages of the cohort that have each characteristic, for both boolean and value characteristics. In addition, summary statistics are provided for value characteristics (mean, std, median, min, max).

    """

    def execute(self, cohort: "Cohort") -> pd.DataFrame:
        if len(cohort.characteristics) == 0:
            logger.info("No characteristics. table1 is empty")
            return pd.DataFrame()

        self.cohort = cohort
        self.N = (
            cohort.index_table.filter(cohort.index_table.BOOLEAN == True)
            .select("PERSON_ID")
            .distinct()
            .count()
            .execute()
        )
        logger.debug("Starting with categorical columns for table1")
        self.df_categoricals = self._report_categorical_columns()
        logger.debug("Starting with boolean columns for table1")
        self.df_booleans = self._report_boolean_columns()
        logger.debug("Starting with value columns for table1")
        self.df_values = self._report_value_columns()

        # add percentage column
        dfs = [
            df
            for df in [self.df_booleans, self.df_values, self.df_categoricals]
            if df is not None
        ]
        if len(dfs) > 1:
            self.df = pd.concat(dfs)
        elif len(dfs) == 1:
            self.df = dfs[0]
        else:
            self.df = None
        if self.df is not None:
            self.df["%"] = 100 * self.df["N"] / self.N
            # reorder columns so N and % are first
            first_cols = ["N", "%"]
            column_order = first_cols + [
                x for x in self.df.columns if x not in first_cols
            ]
            self.df = self.df[column_order]
        logger.debug("Finished creating table1")
        return self.df

    def _get_boolean_characteristics(self):
        return [
            x
            for x in self.cohort.characteristics
            if type(x).__name__
            not in [
                "MeasurementPhenotype",
                "AgePhenotype",
                "CategoricalPhenotype",
                "SexPhenotype",
                "TimeRangePhenotype",
            ]
        ]

    def _get_value_characteristics(self):
        return [
            x
            for x in self.cohort.characteristics
            if type(x).__name__
            in ["MeasurementPhenotype", "AgePhenotype", "TimeRangePhenotype"]
        ]

    def _get_categorical_characteristics(self):
        return [
            x
            for x in self.cohort.characteristics
            if type(x).__name__ in ["CategoricalPhenotype", "SexPhenotype"]
        ]

    def _report_boolean_columns(self):
        table = self.cohort.characteristics_table
        # get list of all boolean columns
        boolean_phenotypes = self._get_boolean_characteristics()
        boolean_columns = list(set([f"{x.name}_BOOLEAN" for x in boolean_phenotypes]))
        logger.debug(f"Found {len(boolean_columns)} : {boolean_columns}")
        if len(boolean_columns) == 0:
            return None

        def get_counts_for_column(col):
            return table.select(["PERSON_ID", col]).distinct()[col].sum().execute()

        # get count of 'Trues' in the boolean columns i.e. the phenotype counts
        df_t1 = pd.DataFrame()
        df_t1["N"] = [get_counts_for_column(col) for col in boolean_columns]
        df_t1.index = [x.replace("_BOOLEAN", "") for x in boolean_columns]

        # add the full cohort size as the first row
        df_n = pd.DataFrame({"N": [self.N]}, index=["cohort"])
        # concat population size
        df = pd.concat([df_n, df_t1])
        return df

    def _report_value_columns(self):
        table = self.cohort.characteristics_table
        # get value columns
        value_phenotypes = self._get_value_characteristics()
        value_columns = [f"{x.name}_VALUE" for x in value_phenotypes]
        logger.debug(f"Found {len(value_columns)} : {value_columns}")

        if len(value_columns) == 0:
            return None

        names = []
        dfs = []
        for col in value_columns:
            name = col.split("_VALUE")[0]
            _table = table.select(["PERSON_ID", col]).distinct()
            d = {
                "N": _table[col].count().execute(),
                "mean": _table[col].mean().execute(),
                "std": _table[col].std().execute(),
                "median": _table[col].median().execute(),
                "min": _table[col].min().execute(),
                "max": _table[col].max().execute(),
            }
            dfs.append(pd.DataFrame.from_dict([d]))
            names.append(name)
        if len(dfs) == 1:
            df = dfs[0]
        else:
            df = pd.concat(dfs)
        df.index = names
        return df

    def _report_categorical_columns(self):
        table = self.cohort.characteristics_table
        categorical_phenotypes = self._get_categorical_characteristics()
        categorical_columns = [
            f"{x.name}_VALUE"
            for x in categorical_phenotypes
            if f"{x.name}_VALUE" in table.columns
        ]
        logger.debug(f"Found {len(categorical_columns)} : {categorical_columns}")
        if len(categorical_columns) == 0:
            return None
        dfs = []
        names = []
        for col in categorical_columns:
            name = col.replace("_VALUE", "")
            # Get counts for each category
            cat_counts = (
                table.select(["PERSON_ID", col])
                .distinct()
                .group_by(col)
                .aggregate(N=_.count())
                .execute()
            )
            cat_counts.index = [f"{name}={v}" for v in cat_counts[col]]
            dfs.append(cat_counts["N"])
            names.extend(cat_counts.index)
        if len(dfs) == 1:
            df = dfs[0]
        else:
            df = pd.concat(dfs)
        df.index = names
        return df