Skip to content

Cohort

The Cohort computes a cohort of individuals based on specified entry criteria, inclusions, exclusions, and computes baseline characteristics and outcomes from the extracted index dates.

Parameters:

Name Type Description Default
name str

A descriptive name for the cohort.

required
entry_criterion Phenotype

The phenotype used to define index date for the cohort.

required
inclusions Optional[List[Phenotype]]

A list of phenotypes that must evaluate to True for patients to be included in the cohort.

None
exclusions Optional[List[Phenotype]]

A list of phenotypes that must evaluate to False for patients to be included in the cohort.

None
characteristics Optional[List[Phenotype]]

A list of phenotypes representing baseline characteristics of the cohort to be computed for all patients passing the inclusion and exclusion criteria.

None
outcomes Optional[List[Phenotype]]

A list of phenotypes representing outcomes of the cohort.

None

Attributes:

Name Type Description
table PhenotypeTable

The resulting index table after filtering (None until execute is called)

inclusions_table Table

The patient-level result of all inclusion criteria calculations (None until execute is called)

exclusions_table Table

The patient-level result of all exclusion criteria calculations (None until execute is called)

characteristics_table Table

The patient-level result of all baseline characteristics calculations. (None until execute is called)

outcomes_table Table

The patient-level result of all outcomes calculations. (None until execute is called)

subset_tables_entry Dict[str, PhenexTable]

Tables that have been subset by those patients satisfying the entry criterion.

subset_tables_index Dict[str, PhenexTable]

Tables that have been subset by those patients satisfying the entry, inclusion and exclusion criteria.

Source code in phenex/phenotypes/cohort.py
class Cohort:
    """
    The Cohort computes a cohort of individuals based on specified entry criteria, inclusions, exclusions, and computes baseline characteristics and outcomes from the extracted index dates.

    Parameters:
        name: A descriptive name for the cohort.
        entry_criterion: The phenotype used to define index date for the cohort.
        inclusions: A list of phenotypes that must evaluate to True for patients to be included in the cohort.
        exclusions: A list of phenotypes that must evaluate to False for patients to be included in the cohort.
        characteristics: A list of phenotypes representing baseline characteristics of the cohort to be computed for all patients passing the inclusion and exclusion criteria.
        derived_tables: A list of DerivedTable objects computed before the entry stage; each result is registered in the input tables under the derived table's name.
        outcomes: A list of phenotypes representing outcomes of the cohort.

    Attributes:
        table (PhenotypeTable): The resulting index table after filtering (None until execute is called)
        inclusions_table (Table): The patient-level result of all inclusion criteria calculations (None until execute is called)
        exclusions_table (Table): The patient-level result of all exclusion criteria calculations (None until execute is called)
        characteristics_table (Table): The patient-level result of all baseline characteristics calculations. (None until execute is called)
        outcomes_table (Table): The patient-level result of all outcomes calculations. (None until execute is called)
        subset_tables_entry (Dict[str, PhenexTable]): Tables that have been subset by those patients satisfying the entry criterion.
        subset_tables_index (Dict[str, PhenexTable]): Tables that have been subset by those patients satisfying the entry, inclusion and exclusion criteria.
    """

    def __init__(
        self,
        name: str,
        entry_criterion: Phenotype,
        inclusions: Optional[List[Phenotype]] = None,
        exclusions: Optional[List[Phenotype]] = None,
        characteristics: Optional[List[Phenotype]] = None,
        derived_tables: Optional[List["DerivedTable"]] = None,
        outcomes: Optional[List[Phenotype]] = None,
    ):
        self.name = name
        self.table = None  # Will be set during execution
        self.subset_tables_index = None  # Will be set during execution
        self.entry_criterion = entry_criterion
        self.inclusions = inclusions if inclusions is not None else []
        self.exclusions = exclusions if exclusions is not None else []
        self.characteristics = characteristics if characteristics is not None else []
        self.derived_tables = derived_tables if derived_tables is not None else []
        self.outcomes = outcomes if outcomes is not None else []

        #
        # Entry stage: subset all source tables to patients with an entry event.
        #
        self.subset_tables_entry_nodes = self._get_subset_tables_nodes(
            stage="subset_entry", index_phenotype=entry_criterion
        )
        self.entry_stage = NodeGroup(name="entry", nodes=self.subset_tables_entry_nodes)

        #
        # Derived tables stage (optional, executed before the entry stage)
        #
        self.derived_tables_stage = None
        if derived_tables:
            # FIX: this NodeGroup was previously named "entry", colliding with
            # the entry stage above; give the derived-tables stage its own name.
            self.derived_tables_stage = NodeGroup(
                name="derived_tables", nodes=self.derived_tables
            )

        #
        # Index stage: inclusion/exclusion evaluation and index table creation.
        #
        self.inclusions_table_node = None
        self.exclusions_table_node = None
        index_nodes = []
        if inclusions:
            self.inclusions_table_node = InclusionsTableNode(
                name=f"{self.name}__inclusions".upper(),
                index_phenotype=self.entry_criterion,
                phenotypes=self.inclusions,
            )
            index_nodes.append(self.inclusions_table_node)
        if exclusions:
            self.exclusions_table_node = ExclusionsTableNode(
                name=f"{self.name}__exclusions".upper(),
                index_phenotype=self.entry_criterion,
                phenotypes=self.exclusions,
            )
            index_nodes.append(self.exclusions_table_node)

        # The index table combines the entry phenotype with the (possibly None)
        # inclusion/exclusion results.
        self.index_table_node = IndexTableNode(
            f"{self.name}__index".upper(),
            entry_phenotype=self.entry_criterion,
            inclusion_table_node=self.inclusions_table_node,
            exclusion_table_node=self.exclusions_table_node,
        )
        index_nodes.append(self.index_table_node)
        self.subset_tables_index_nodes = self._get_subset_tables_nodes(
            stage="subset_index", index_phenotype=self.index_table_node
        )
        self.index_stage = NodeGroup(
            name="index",
            nodes=self.subset_tables_index_nodes + index_nodes,
        )

        #
        # Post-index / reporting stage
        #
        # Create HStackNodes for characteristics and outcomes
        self.characteristics_table_node = None
        self.outcomes_table_node = None
        reporting_nodes = []
        if self.characteristics:
            self.characteristics_table_node = HStackNode(
                name=f"{self.name}__characteristics".upper(),
                phenotypes=self.characteristics,
            )
            reporting_nodes.append(self.characteristics_table_node)
        if self.outcomes:
            self.outcomes_table_node = HStackNode(
                name=f"{self.name}__outcomes".upper(), phenotypes=self.outcomes
            )
            reporting_nodes.append(self.outcomes_table_node)

        self.reporting_stage = NodeGroup(name="reporting", nodes=reporting_nodes)

        # Lazily computed Table1 report cache (see the `table1` property).
        self._table1 = None

        # Validate that all nodes are unique across all stages
        self._validate_node_uniqueness()

        logger.info(
            f"Cohort '{self.name}' initialized with entry criterion '{self.entry_criterion.name}'"
        )

    def _get_domains(self):
        """
        Get a list of all domains used by any phenotype in this cohort.
        """
        top_level_nodes = (
            [self.entry_criterion]
            + self.inclusions
            + self.exclusions
            + self.characteristics
            + self.outcomes
        )
        # Include transitive dependencies so that domains used only by nested
        # phenotypes are also subset.
        all_nodes = top_level_nodes + sum([t.dependencies for t in top_level_nodes], [])
        # FIXME Person domain should not be HARD CODED; however, it IS hardcoded in SCORE phenotype. Remove hardcoding!
        domains = list(
            set(
                ["PERSON"]
                + [
                    getattr(pt, "domain", None)
                    for pt in all_nodes
                    if getattr(pt, "domain", None) is not None
                ]
            )
        )
        return domains

    def _get_subset_tables_nodes(self, stage: str, index_phenotype: Phenotype):
        """
        Get the nodes for subsetting tables for all domains in this cohort subsetting by the given index_phenotype.
        """
        domains = self._get_domains()
        return [
            SubsetTable(
                name=f"{self.name}__{stage}_{domain}".upper(),
                domain=domain,
                index_phenotype=index_phenotype,
            )
            for domain in domains
        ]

    @property
    def inclusions_table(self):
        # None until execute() has run (and None when no inclusions are defined).
        if self.inclusions_table_node:
            return self.inclusions_table_node.table

    @property
    def exclusions_table(self):
        # None until execute() has run (and None when no exclusions are defined).
        if self.exclusions_table_node:
            return self.exclusions_table_node.table

    @property
    def index_table(self):
        return self.index_table_node.table

    @property
    def characteristics_table(self):
        if self.characteristics_table_node:
            return self.characteristics_table_node.table

    @property
    def outcomes_table(self):
        if self.outcomes_table_node:
            return self.outcomes_table_node.table

    def get_subset_tables_entry(self, tables):
        """
        Get the PhenexTable from the ibis Table for subsetting tables for all domains in this cohort subsetting by the given entry_phenotype.
        """
        subset_tables_entry = {}
        for node in self.subset_tables_entry_nodes:
            # Re-wrap each subset ibis table in the same PhenexTable subclass
            # as the corresponding input table, preserving domain metadata.
            subset_tables_entry[node.domain] = type(tables[node.domain])(node.table)
        return subset_tables_entry

    def get_subset_tables_index(self, tables):
        """
        Get the PhenexTable from the ibis Table for subsetting tables for all domains in this cohort subsetting by the given index_phenotype.
        """
        subset_tables_index = {}
        for node in self.subset_tables_index_nodes:
            subset_tables_index[node.domain] = type(tables[node.domain])(node.table)
        return subset_tables_index

    def execute(
        self,
        tables,
        con: Optional["SnowflakeConnector"] = None,
        overwrite: Optional[bool] = False,
        n_threads: Optional[int] = 1,
        lazy_execution: Optional[bool] = False,
    ):
        """
        The execute method executes the full cohort in order of computation. The order is entry criterion -> inclusion -> exclusion -> baseline characteristics. Tables are subset at two points, after entry criterion and after full inclusion/exclusion calculation to result in subset_entry data (contains all source data for patients that fulfill the entry criterion, with a possible index date) and subset_index data (contains all source data for patients that fulfill all in/ex criteria, with a set index date). Additionally, default reporters are executed such as table 1 for baseline characteristics.

        Parameters:
            tables: A dictionary mapping domains to Table objects
            con: Database connector for materializing outputs
            overwrite: Whether to overwrite existing tables
            n_threads: Max number of jobs to run simultaneously.
            lazy_execution: Whether to use lazy execution with change detection

        Returns:
            PhenotypeTable: The index table corresponding to the cohort.
        """
        if self.derived_tables_stage:
            logger.info(f"Cohort '{self.name}': executing derived tables stage ...")
            self.derived_tables_stage.execute(
                tables=tables,
                con=con,
                overwrite=overwrite,
                n_threads=n_threads,
                lazy_execution=lazy_execution,
            )
            logger.info(f"Cohort '{self.name}': completed derived tables stage.")
            # Register each derived result as an additional input table, keyed
            # by the derived table's node name.
            for node in self.derived_tables:
                tables[node.name] = PhenexTable(node.table)

        logger.info(f"Cohort '{self.name}': executing entry stage ...")

        self.entry_stage.execute(
            tables=tables,
            con=con,
            overwrite=overwrite,
            n_threads=n_threads,
            lazy_execution=lazy_execution,
        )
        # From here on, work only with data subset to entry-criterion patients.
        tables = self.get_subset_tables_entry(tables)

        logger.info(f"Cohort '{self.name}': completed entry stage.")
        logger.info(f"Cohort '{self.name}': executing index stage ...")

        self.index_stage.execute(
            tables=tables,
            con=con,
            overwrite=overwrite,
            n_threads=n_threads,
            lazy_execution=lazy_execution,
        )
        self.table = self.index_table_node.table

        logger.info(f"Cohort '{self.name}': completed index stage.")
        logger.info(f"Cohort '{self.name}': executing reporting stage ...")

        # Narrow to patients passing all in/ex criteria before reporting.
        self.subset_tables_index = tables = self.get_subset_tables_index(tables)
        self.reporting_stage.execute(
            tables=tables,
            con=con,
            overwrite=overwrite,
            n_threads=n_threads,
            lazy_execution=lazy_execution,
        )

        return self.index_table

    # FIXME this should be implemented as a ComputeNode and added to the graph
    @property
    def table1(self):
        # Cached after the first access; requires execute() to have run first.
        if self._table1 is None:
            logger.debug("Generating Table1 report ...")
            reporter = Table1()
            self._table1 = reporter.execute(self)
            logger.debug("Table1 report generated.")
        return self._table1

    def to_dict(self):
        """
        Return a dictionary representation of the Node. The dictionary must contain all dependencies of the Node such that if anything in self.to_dict() changes, the Node must be recomputed.
        """
        return to_dict(self)

    def _validate_node_uniqueness(self):
        """
        Validate that all nodes and dependencies are unique according to the rule:
        node1.name == node2.name implies hash(node1) == hash(node2)

        This ensures that nodes with the same name have identical parameters (same hash).
        """
        name_to_hash = {}

        # Collect all nodes from all stages
        all_nodes = []

        # Add nodes from entry stage
        if hasattr(self, "entry_stage") and self.entry_stage:
            all_nodes += list(self.entry_stage.dependencies)

        # Add nodes from derived tables stage
        if hasattr(self, "derived_tables_stage") and self.derived_tables_stage:
            all_nodes += list(self.derived_tables_stage.dependencies)

        # Add nodes from index stage
        if hasattr(self, "index_stage") and self.index_stage:
            all_nodes += list(self.index_stage.dependencies)

        # Add nodes from reporting stage
        if hasattr(self, "reporting_stage") and self.reporting_stage:
            all_nodes += list(self.reporting_stage.dependencies)

        for node in all_nodes:
            node_name = node.name
            node_hash = hash(node)

            # Check if we've seen this name before
            if node_name in name_to_hash:
                existing_hash = name_to_hash[node_name]
                if existing_hash != node_hash:
                    # FIX: added the missing space between the two sentences
                    # of this message (previously rendered "...'.Nodes...").
                    raise ValueError(
                        f"Duplicate node name found: '{node_name}'. "
                        "Nodes with the same name must have identical parameters."
                    )
            else:
                name_to_hash[node_name] = node_hash

execute(tables, con=None, overwrite=False, n_threads=1, lazy_execution=False)

The execute method executes the full cohort in order of computation. The order is entry criterion -> inclusion -> exclusion -> baseline characteristics. Tables are subset at two points, after entry criterion and after full inclusion/exclusion calculation to result in subset_entry data (contains all source data for patients that fulfill the entry criterion, with a possible index date) and subset_index data (contains all source data for patients that fulfill all in/ex criteria, with a set index date). Additionally, default reporters are executed such as table 1 for baseline characteristics.

Parameters:

Name Type Description Default
tables

A dictionary mapping domains to Table objects

required
con Optional[SnowflakeConnector]

Database connector for materializing outputs

None
overwrite Optional[bool]

Whether to overwrite existing tables

False
lazy_execution Optional[bool]

Whether to use lazy execution with change detection

False
n_threads Optional[int]

Max number of jobs to run simultaneously.

1

Returns:

Name Type Description
PhenotypeTable

The index table corresponding to the cohort.

Source code in phenex/phenotypes/cohort.py
def execute(
    self,
    tables,
    con: Optional["SnowflakeConnector"] = None,
    overwrite: Optional[bool] = False,
    n_threads: Optional[int] = 1,
    lazy_execution: Optional[bool] = False,
):
    """
    The execute method executes the full cohort in order of computation. The order is entry criterion -> inclusion -> exclusion -> baseline characteristics. Tables are subset at two points, after entry criterion and after full inclusion/exclusion calculation to result in subset_entry data (contains all source data for patients that fulfill the entry criterion, with a possible index date) and subset_index data (contains all source data for patients that fulfill all in/ex criteria, with a set index date). Additionally, default reporters are executed such as table 1 for baseline characteristics.

    Parameters:
        tables: A dictionary mapping domains to Table objects
        con: Database connector for materializing outputs
        overwrite: Whether to overwrite existing tables
        lazy_execution: Whether to use lazy execution with change detection
        n_threads: Max number of jobs to run simultaneously.

    Returns:
        PhenotypeTable: The index table corresponding to the cohort.
    """
    # Optional pre-stage: compute derived tables first so later stages can
    # consume them as ordinary input tables.
    if self.derived_tables_stage:
        logger.info(f"Cohort '{self.name}': executing derived tables stage ...")
        self.derived_tables_stage.execute(
            tables=tables,
            con=con,
            overwrite=overwrite,
            n_threads=n_threads,
            lazy_execution=lazy_execution,
        )
        logger.info(f"Cohort '{self.name}': completed derived tables stage.")
        # Each derived result is registered under its node name; note this
        # mutates the caller-supplied `tables` dict.
        for node in self.derived_tables:
            tables[node.name] = PhenexTable(node.table)

    logger.info(f"Cohort '{self.name}': executing entry stage ...")

    self.entry_stage.execute(
        tables=tables,
        con=con,
        overwrite=overwrite,
        n_threads=n_threads,
        lazy_execution=lazy_execution,
    )
    # Rebind `tables` to the entry-subset data; all subsequent stages operate
    # only on patients satisfying the entry criterion.
    tables = self.get_subset_tables_entry(tables)

    logger.info(f"Cohort '{self.name}': completed entry stage.")
    logger.info(f"Cohort '{self.name}': executing index stage ...")

    self.index_stage.execute(
        tables=tables,
        con=con,
        overwrite=overwrite,
        n_threads=n_threads,
        lazy_execution=lazy_execution,
    )
    # Expose the computed index table on the cohort (was None before execute).
    self.table = self.index_table_node.table

    logger.info(f"Cohort '{self.name}': completed index stage.")
    logger.info(f"Cohort '{self.name}': executing reporting stage ...")

    # Narrow once more to patients passing all in/ex criteria, storing the
    # result on the cohort for later reporting use.
    self.subset_tables_index = tables = self.get_subset_tables_index(tables)
    self.reporting_stage.execute(
        tables=tables,
        con=con,
        overwrite=overwrite,
        n_threads=n_threads,
        lazy_execution=lazy_execution,
    )

    return self.index_table

get_subset_tables_entry(tables)

Get the PhenexTable from the ibis Table for subsetting tables for all domains in this cohort subsetting by the given entry_phenotype.

Source code in phenex/phenotypes/cohort.py
def get_subset_tables_entry(self, tables):
    """
    Get the PhenexTable from the ibis Table for subsetting tables for all domains in this cohort subsetting by the given entry_phenotype.
    """
    # Re-wrap each subset node's ibis table in the same wrapper class as the
    # corresponding input table, keyed by domain.
    return {
        subset_node.domain: type(tables[subset_node.domain])(subset_node.table)
        for subset_node in self.subset_tables_entry_nodes
    }

get_subset_tables_index(tables)

Get the PhenexTable from the ibis Table for subsetting tables for all domains in this cohort subsetting by the given index_phenotype.

Source code in phenex/phenotypes/cohort.py
def get_subset_tables_index(self, tables):
    """
    Get the PhenexTable from the ibis Table for subsetting tables for all domains in this cohort subsetting by the given index_phenotype.
    """
    # Re-wrap each subset node's ibis table in the same wrapper class as the
    # corresponding input table, keyed by domain.
    return {
        subset_node.domain: type(tables[subset_node.domain])(subset_node.table)
        for subset_node in self.subset_tables_index_nodes
    }

to_dict()

Return a dictionary representation of the Node. The dictionary must contain all dependencies of the Node such that if anything in self.to_dict() changes, the Node must be recomputed.

Source code in phenex/phenotypes/cohort.py
def to_dict(self):
    """
    Return a dictionary representation of the Node. The dictionary must contain all dependencies of the Node such that if anything in self.to_dict() changes, the Node must be recomputed.
    """
    # Delegates to the module-level `to_dict` serializer (presumably imported
    # at the top of this module — confirm); this is not a recursive call to
    # this method, since the bare name resolves at module scope.
    return to_dict(self)