Cohort

Bases: Phenotype

The Cohort class represents a cohort of individuals defined by a specified entry criterion, inclusions, exclusions, baseline characteristics, and outcomes. It extends the Phenotype class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | A descriptive name for the cohort. | required |
| `entry_criterion` | `Phenotype` | The phenotype used to define the index date for the cohort. | required |
| `inclusions` | `Optional[List[Phenotype]]` | A list of phenotypes that must evaluate to True for patients to be included in the cohort. | `None` |
| `exclusions` | `Optional[List[Phenotype]]` | A list of phenotypes that must evaluate to False for patients to be included in the cohort. | `None` |
| `characteristics` | `Optional[List[Phenotype]]` | A list of phenotypes representing baseline characteristics of the cohort, computed for all patients passing the inclusion and exclusion criteria. | `None` |
| `outcomes` | `Optional[List[Phenotype]]` | A list of phenotypes representing outcomes of the cohort. | `None` |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `table` | `PhenotypeTable` | The resulting index table after filtering (`None` until `execute` is called). |
| `inclusions_table` | `Table` | The patient-level result of all inclusion criteria calculations (`None` until `execute` is called). |
| `exclusions_table` | `Table` | The patient-level result of all exclusion criteria calculations (`None` until `execute` is called). |
| `characteristics_table` | `Table` | The patient-level result of all baseline characteristic calculations (`None` until `execute` is called). |
| `outcomes_table` | `Table` | The patient-level result of all outcome calculations (`None` until `execute` is called). |
| `subset_tables_entry` | `Dict[str, PhenexTable]` | Tables subset to the patients satisfying the entry criterion. |
| `subset_tables_index` | `Dict[str, PhenexTable]` | Tables subset to the patients satisfying the entry, inclusion, and exclusion criteria. |
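
For orientation, here is a minimal construction sketch. The phenotype objects are placeholders: `CodelistPhenotype` and its arguments are hypothetical stand-ins for whatever `Phenotype` subclasses your study uses; only the `Cohort` signature is taken from the parameters above.

```python
# Hedged sketch: constructing and executing a Cohort.
# `CodelistPhenotype` and its arguments are hypothetical placeholders.
from phenex.phenotypes.cohort import Cohort  # import path per the source file below

entry = CodelistPhenotype(name="af_diagnosis", ...)             # hypothetical
age_over_18 = CodelistPhenotype(name="age_over_18", ...)        # hypothetical
prior_ablation = CodelistPhenotype(name="prior_ablation", ...)  # hypothetical

cohort = Cohort(
    name="af_study",
    entry_criterion=entry,        # defines the index date
    inclusions=[age_over_18],     # must be True to enter the cohort
    exclusions=[prior_ablation],  # must be False to enter the cohort
)

# `tables` maps table names to source Table objects for your data model.
index_table = cohort.execute(tables)
```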

Source code in phenex/phenotypes/cohort.py
class Cohort(Phenotype):
    """
    The Cohort class represents a cohort of individuals defined by a specified entry criterion,
    inclusions, exclusions, baseline characteristics, and outcomes. It extends the Phenotype class.

    Parameters:
        name: A descriptive name for the cohort.
        entry_criterion: The phenotype used to define the index date for the cohort.
        inclusions: A list of phenotypes that must evaluate to True for patients to be included in the cohort.
        exclusions: A list of phenotypes that must evaluate to False for patients to be included in the cohort.
        characteristics: A list of phenotypes representing baseline characteristics of the cohort to be computed for all patients passing the inclusion and exclusion criteria.
        outcomes: A list of phenotypes representing outcomes of the cohort.

    Attributes:
        table (PhenotypeTable): The resulting index table after filtering (None until execute is called)
        inclusions_table (Table): The patient-level result of all inclusion criteria calculations (None until execute is called)
        exclusions_table (Table): The patient-level result of all exclusion criteria calculations (None until execute is called)
        characteristics_table (Table): The patient-level result of all baseline characteristic calculations (None until execute is called)
        outcomes_table (Table): The patient-level result of all outcome calculations (None until execute is called)
        subset_tables_entry (Dict[str, PhenexTable]): Tables that have been subset by those patients satisfying the entry criterion.
        subset_tables_index (Dict[str, PhenexTable]): Tables that have been subset by those patients satisfying the entry, inclusion and exclusion criteria.
    """

    table = None

    def __init__(
        self,
        name: str,
        entry_criterion: Phenotype,
        inclusions: Optional[List[Phenotype]] = None,
        exclusions: Optional[List[Phenotype]] = None,
        characteristics: Optional[List[Phenotype]] = None,
        outcomes: Optional[List[Phenotype]] = None,
    ):
        super(Cohort, self).__init__()
        self.name = name
        self.entry_criterion = entry_criterion
        self.inclusions = inclusions if inclusions is not None else []
        self.exclusions = exclusions if exclusions is not None else []
        self.characteristics = characteristics if characteristics is not None else []
        self.outcomes = outcomes if outcomes is not None else []
        self.index_table = None
        self.exclusions_table = None
        self.inclusions_table = None
        self.characteristics_table = None
        self.outcomes_table = None
        self.children = (
            [entry_criterion]
            + self.inclusions
            + self.exclusions
            + self.characteristics
            + self.outcomes
        )
        self._table1 = None
        logger.info(
            f"Cohort '{self.name}' initialized with entry criterion '{self.entry_criterion.name}'"
        )

    def execute(
        self,
        tables: Dict[str, Table],
        con: "SnowflakeConnector" = None,
        write_subset_tables=False,
        overwrite: bool = False,
        n_threads: int = 1,
    ) -> PhenotypeTable:
        """
        The execute method runs the full cohort in order of computation: entry criterion, then
        inclusions, then exclusions, then baseline characteristics and outcomes. Tables are subset
        at two points: after the entry criterion, producing subset_entry data (all source data for
        patients fulfilling the entry criterion, with a candidate index date), and after the full
        inclusion/exclusion calculation, producing subset_index data (all source data for patients
        fulfilling all inclusion and exclusion criteria, with a fixed index date). Default
        reporters, such as a Table1 of baseline characteristics, are also executed.

        Args:
            tables (Dict[str, Table]): A dictionary of table names to Table objects.
            con (SnowflakeConnector, optional): A connection to Snowflake. Defaults to None. If passed, will write entry, inclusions, exclusions, index, characteristics and outcomes tables.
            write_subset_tables (bool, optional): Whether to write the subset tables (subset_entry and subset_index) in addition to the standard intermediate tables.
            overwrite (bool, optional): Whether to overwrite existing tables when writing.
            n_threads (int, optional): Number of threads to use for parallel execution. Defaults to 1.

        Returns:
            PhenotypeTable: The index table corresponding to the cohort.
        """
        logger.info(f"Executing cohort '{self.name}' with {n_threads} threads...")
        # Compute entry criterion
        logger.debug("Computing entry criterion ...")
        self.entry_criterion.execute(tables)
        if con:
            logger.debug("Writing entry table ...")
            self.entry_criterion.table = con.create_table(
                self.entry_criterion.table, f"{self.name}__entry", overwrite=overwrite
            )

        logger.debug("Entry criterion computed.")
        self.subset_tables_entry = subset_and_add_index_date(
            tables, self.entry_criterion.table
        )
        if write_subset_tables:
            logger.debug("Writing subset entry tables ...")
            with ThreadPoolExecutor(max_workers=n_threads) as executor:
                futures = {}
                for key, table in self.subset_tables_entry.items():
                    futures[key] = executor.submit(
                        con.create_table,
                        table.table,
                        f"{self.name}__subset_entry_{key}",
                        overwrite,
                    )
                for key, future in futures.items():
                    self.subset_tables_entry[key] = type(self.subset_tables_entry[key])(
                        future.result()
                    )

        index_table = self.entry_criterion.table

        # Apply inclusions if any
        if self.inclusions:
            logger.debug("Applying inclusions ...")
            self._compute_inclusions_table(n_threads)
            if con:
                logger.debug("Writing inclusions table ...")
                self.inclusions_table = con.create_table(
                    self.inclusions_table,
                    f"{self.name}__inclusions",
                    overwrite=overwrite,
                )
            include = self.inclusions_table.filter(
                self.inclusions_table["BOOLEAN"] == True
            ).select(["PERSON_ID"])
            index_table = index_table.inner_join(include, ["PERSON_ID"])
            logger.debug("Inclusions applied.")

        # Apply exclusions if any
        if self.exclusions:
            logger.debug("Applying exclusions ...")
            self._compute_exclusions_table(n_threads)
            if con:
                logger.debug("Writing exclusions table ...")
                self.exclusions_table = con.create_table(
                    self.exclusions_table,
                    f"{self.name}__exclusions",
                    overwrite=overwrite,
                )
            exclude = self.exclusions_table.filter(
                self.exclusions_table["BOOLEAN"] == False
            ).select(["PERSON_ID"])
            index_table = index_table.inner_join(exclude, ["PERSON_ID"])
            logger.debug("Exclusions applied.")

        self.index_table = index_table
        if con:
            logger.debug("Writing index table ...")
            self.index_table = con.create_table(
                index_table, f"{self.name}__index", overwrite=overwrite
            )

        self.subset_tables_index = subset_and_add_index_date(tables, self.index_table)
        if write_subset_tables:
            logger.debug("Writing subset index tables ...")
            with ThreadPoolExecutor(max_workers=n_threads) as executor:
                futures = {}
                for key, table in self.subset_tables_index.items():
                    futures[key] = executor.submit(
                        con.create_table,
                        table.table,
                        f"{self.name}__subset_index_{key}",
                        overwrite,
                    )
                for key, future in futures.items():
                    self.subset_tables_index[key] = type(self.subset_tables_index[key])(
                        future.result()
                    )

        if self.characteristics:
            logger.debug("Computing characteristics ...")
            self._compute_characteristics_table(n_threads)
            if con:
                logger.debug("Writing characteristics table ...")
                self.characteristics_table = con.create_table(
                    self.characteristics_table,
                    f"{self.name}__characteristics",
                    overwrite=overwrite,
                )
            logger.debug("Characteristics computed.")
            _ = self.table1

        if self.outcomes:
            logger.debug("Computing outcomes ...")
            self._compute_outcomes_table(n_threads)
            logger.debug("Outcomes computed.")

        logger.info(f"Cohort '{self.name}' execution completed.")
        return index_table

    def _compute_inclusions_table(self, n_threads: int) -> Table:
        """
        Compute the inclusions table from the individual inclusion phenotypes.
        Meant only to be called internally from execute() so that all dependent phenotypes
        have already been computed.

        Returns:
            Table: The join of all inclusion phenotypes together with a single "BOOLEAN"
            column that is the logical AND of all individual inclusion phenotypes.
        """
        logger.debug("Computing inclusions table")
        # create an in/ex table:
        # rows are persons that fulfill the entry criterion;
        # columns are one boolean per inclusion criterion indicating whether
        # that patient fulfills the criterion
        inclusions_table = self._compute_inex_table(self.inclusions, n_threads)

        # create the final boolean inclusion column
        # this is true only if all inclusion criteria are true
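        # ibis.least is the row-wise minimum across its arguments; since
        # False < True, the least of boolean columns is their logical AND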
        inclusions_table = inclusions_table.mutate(
            BOOLEAN=ibis.least(
                *[inclusions_table[f"{x.name}_BOOLEAN"] for x in self.inclusions]
            )
        )
        self.inclusions_table = inclusions_table
        logger.debug("Inclusions table computed")
        return self.inclusions_table

    def _compute_exclusions_table(self, n_threads: int) -> Table:
        """
        Compute the exclusions table from the individual exclusion phenotypes.

        Returns:
            Table: The join of all exclusion phenotypes together with a single "BOOLEAN"
            column that is the logical OR of all individual exclusion phenotypes.
        """
        logger.debug("Computing exclusions table")
        # create an in/ex table:
        # rows are persons that fulfill the entry criterion;
        # columns are one boolean per exclusion criterion indicating whether
        # that patient fulfills the criterion
        exclusions_table = self._compute_inex_table(self.exclusions, n_threads)

        # create the final boolean exclusion column
        # this is true if any exclusion criterion is true
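        # ibis.greatest is the row-wise maximum across its arguments; since
        # True > False, the greatest of boolean columns is their logical OR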
        exclusions_table = exclusions_table.mutate(
            BOOLEAN=ibis.greatest(
                *[exclusions_table[f"{x.name}_BOOLEAN"] for x in self.exclusions]
            )
        )
        self.exclusions_table = exclusions_table
        logger.debug("Exclusions table computed")
        return self.exclusions_table

    def _compute_inex_table(
        self, phenotypes: List["Phenotype"], n_threads: int
    ) -> Table:
        """
        Compute a patient-level boolean table for a set of inclusion or exclusion phenotypes.

        Returns:
            Table: One row per person fulfilling the entry criterion, with one
            "<name>_BOOLEAN" column per phenotype indicating whether that person
            fulfills the phenotype.
        """
        logger.debug("Computing inex table")
        inex_table = self.entry_criterion.table.select(["PERSON_ID"])
        # execute all phenotypes and join the boolean column only
        with ThreadPoolExecutor(max_workers=n_threads) as executor:
            futures = [
                executor.submit(pt.execute, self.subset_tables_entry)
                for pt in phenotypes
            ]
            for future in futures:
                future.result()
        for pt in phenotypes:
            pt_table = pt.table.select(["PERSON_ID", "BOOLEAN"]).rename(
                **{
                    f"{pt.name}_BOOLEAN": "BOOLEAN",
                }
            )
            inex_table = inex_table.left_join(pt_table, ["PERSON_ID"])
            columns = inex_table.columns
            columns.remove("PERSON_ID_right")
            inex_table = inex_table.select(columns)

        # fill all NULLs with False: a patient with no row for a phenotype does not fulfill it
        boolean_columns = [col for col in inex_table.columns if "BOOLEAN" in col]
        for col in boolean_columns:
            inex_table = inex_table.mutate({col: inex_table[col].fill_null(False)})
        logger.debug("Inex table computed")
        return inex_table

    def _compute_characteristics_table(self, n_threads: int) -> Table:
        """
        Retrieves and joins all characteristic tables.
        Meant only to be called internally from execute() so that all dependent phenotypes
        have already been computed.

        Returns:
            Table: The join of all characteristic tables.
        """
        logger.debug("Computing characteristics table")
        with ThreadPoolExecutor(max_workers=n_threads) as executor:
            futures = [
                executor.submit(c.execute, self.subset_tables_index)
                for c in self.characteristics
            ]
            for future in futures:
                future.result()
        self.characteristics_table = hstack(
            self.characteristics,
            join_table=self.index_table.select(["PERSON_ID", "EVENT_DATE"]),
        )
        logger.debug("Characteristics table computed")
        return self.characteristics_table

    def _compute_outcomes_table(self, n_threads: int) -> Table:
        """
        Retrieves and joins all outcome tables. Meant only to be called internally from
        execute() so that all dependent phenotypes have already been computed.

        Returns:
            Table: The join of all outcome tables.
        """
        logger.debug("Computing outcomes table")
        with ThreadPoolExecutor(max_workers=n_threads) as executor:
            futures = [
                executor.submit(o.execute, self.subset_tables_index)
                for o in self.outcomes
            ]
            for future in futures:
                future.result()
        self.outcomes_table = hstack(
            self.outcomes,
            join_table=self.index_table.select(["PERSON_ID", "EVENT_DATE"]),
        )
        logger.debug("Outcomes table computed")
        return self.outcomes_table

    @property
    def table1(self):
        if self._table1 is None:
            logger.debug("Generating Table1 report ...")
            reporter = Table1()
            self._table1 = reporter.execute(self)
            logger.debug("Table1 report generated.")
        return self._table1
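
If baseline characteristics were provided, `execute` also builds the cached `Table1` report via the `table1` property shown above, which can be retrieved afterwards:

```python
# Computed once via Table1().execute(cohort) on first access, then cached.
table1 = cohort.table1
```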

namespaced_table property

A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table appends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `table` | `Table` | The namespaced table for the current phenotype. |
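
To illustrate why namespacing matters, the sketch below joins two phenotype-style tables in ibis. The data and phenotype names are invented for illustration; the rename mirrors the `f"{pt.name}_BOOLEAN"` pattern used in `_compute_inex_table` in the source above.

```python
import ibis

# Two phenotype-style tables that both carry the generic BOOLEAN column.
age = ibis.memtable({"PERSON_ID": [1, 2, 3], "BOOLEAN": [True, True, False]})
diabetes = ibis.memtable({"PERSON_ID": [1, 3], "BOOLEAN": [True, False]})

# Namespacing the generic column avoids a collision when joining.
age = age.rename(AGE_BOOLEAN="BOOLEAN")
diabetes = diabetes.rename(DIABETES_BOOLEAN="BOOLEAN")

joined = age.left_join(diabetes, ["PERSON_ID"]).select(
    ["PERSON_ID", "AGE_BOOLEAN", "DIABETES_BOOLEAN"]
)
print(joined.execute())
```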

_execute(tables)

Executes the phenotype processing logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tables` | `Dict[str, Table]` | A dictionary where the keys are table names and the values are Table objects. | required |

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | This method should be implemented by subclasses. |

Source code in phenex/phenotypes/phenotype.py
def _execute(self, tables: Dict[str, Table]):
    """
    Executes the phenotype processing logic.

    Args:
        tables (Dict[str, Table]): A dictionary where the keys are table names and the values are Table objects.

    Raises:
        NotImplementedError: This method should be implemented by subclasses.
    """
    raise NotImplementedError()
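
A minimal sketch of a subclass is shown below. It assumes the input contains a "PERSON" table keyed by PERSON_ID and that `_execute` returns the phenotype's result table; both are assumptions for illustration, not part of the documented contract.

```python
from typing import Dict

# Hedged sketch: a trivial phenotype marking every person as fulfilled.
# `Phenotype` and `Table` are as imported in phenex/phenotypes/phenotype.py.
class EveryonePhenotype(Phenotype):  # hypothetical example class
    def _execute(self, tables: Dict[str, Table]):
        # "PERSON" as a table key and its PERSON_ID column are assumptions
        # about the input data model; adapt to your source tables.
        person = tables["PERSON"]
        return person.select(["PERSON_ID"]).mutate(BOOLEAN=True)
```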

execute(tables, con=None, write_subset_tables=False, overwrite=False, n_threads=1)

The execute method runs the full cohort in order of computation: entry criterion, then inclusions, then exclusions, then baseline characteristics and outcomes. Tables are subset at two points: after the entry criterion, producing subset_entry data (all source data for patients fulfilling the entry criterion, with a candidate index date), and after the full inclusion/exclusion calculation, producing subset_index data (all source data for patients fulfilling all inclusion and exclusion criteria, with a fixed index date). Default reporters, such as a Table1 of baseline characteristics, are also executed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tables` | `Dict[str, Table]` | A dictionary of table names to Table objects. | required |
| `con` | `SnowflakeConnector` | A connection to Snowflake. If passed, the entry, inclusions, exclusions, index, characteristics, and outcomes tables are written to the database. | `None` |
| `write_subset_tables` | `bool` | Whether to write the subset tables (subset_entry and subset_index) in addition to the standard intermediate tables. | `False` |
| `overwrite` | `bool` | Whether to overwrite existing tables when writing. | `False` |
| `n_threads` | `int` | Number of threads to use for parallel execution. | `1` |

Returns:

| Type | Description |
| --- | --- |
| `PhenotypeTable` | The index table corresponding to the cohort. |
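
As a usage sketch, the call below persists the intermediate and subset tables while executing with four threads. The `SnowflakeConnector` import path and configuration are assumptions for illustration; `cohort` and `tables` are as in the construction example above.

```python
# Hedged sketch: executing a cohort and persisting results to Snowflake.
from phenex.connectors import SnowflakeConnector  # hypothetical import path

con = SnowflakeConnector()  # credentials/configuration omitted

index_table = cohort.execute(
    tables,                    # Dict[str, Table] of source data
    con=con,                   # write entry/inclusions/exclusions/index tables
    write_subset_tables=True,  # also write subset_entry_* / subset_index_* tables
    overwrite=True,            # replace existing output tables
    n_threads=4,               # parallelize phenotype execution and writes
)
```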
