Skip to content

Phenotype

Bases: Node

A phenotype is a description of the state of a person at a specific time.

In Phenex, phenotypes are implemented using the Phenotype class. The Phenotype class is designed so that there is clear separation between the "what" from the "how". The "what" is expressed in the Phenotype init function: what codelists to use, what time range to include, constraints relative to other Phenotype's, visit detail information to include, etc. The "what" is meant to mirror how we normally talk about real-world data studies.

The translation of this description in actual executable code (the "how") is handled via the Phenotype.execute() method. The execute method returns a PhenotypeTable - the realization of the defined Phenotype in a particular database. See execute() for details.

All Phenotype's in Phenex derive from the Phenotype class. To subclass, see documentation for Node.

Parameters:

Name Type Description Default
description str

A plain text description of the phenotype.

None
kwargs

For additional parameters, see Node.

{}
Source code in phenex/phenotypes/phenotype.py
class Phenotype(Node):
    """
    A phenotype is a description of the state of a person at a specific time.

    In Phenex, phenotypes are implemented using the Phenotype class. The Phenotype class is designed so that there is clear separation between the "what" from the "how". The "what" is expressed in the Phenotype init function: what codelists to use, what time range to include, constraints relative to other Phenotype's, visit detail information to include, etc. The "what" is meant to mirror how we normally talk about real-world data studies.

    The translation of this description in actual executable code (the "how") is handled via the `Phenotype.execute()` method. The execute method returns a PhenotypeTable - the realization of the defined Phenotype in a particular database. See `execute()` for details.

    All Phenotype's in Phenex derive from the Phenotype class. To subclass, see documentation for Node.

    Parameters:
        description: A plain text description of the phenotype.
        kwargs: For additional parameters, see Node.
    """

    def __init__(self, description: str = None, **kwargs):
        self.description = description
        super(Phenotype, self).__init__(**kwargs)

    def _perform_final_processing(self, table: Table) -> Table:
        """
        Post process a Table before writing to disk to enforce that the table is actually a PhenotypeTable.
        """
        # post-processing specific to Phenotype Nodes
        table = table.mutate(BOOLEAN=True)

        if not set(PHENOTYPE_TABLE_COLUMNS) <= set(table.columns):
            raise ValueError(
                f"Phenotype {self.name} must return columns {PHENOTYPE_TABLE_COLUMNS}. Found {table.columns}."
            )

        self.table = table.select(PHENOTYPE_TABLE_COLUMNS)
        # for some reason, having NULL datatype screws up writing the table to disk; here we make explicit cast
        if type(self.table.schema()["VALUE"]) == ibis.expr.datatypes.core.Null:
            self.table = self.table.cast({"VALUE": "float64"})

        assert is_phenex_phenotype_table(self.table)
        return self.table

    @property
    def namespaced_table(self) -> Table:
        """
        A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.

        Returns:
            table (Table): The namespaced table for the current phenotype.
        """
        if self.table is None:
            raise ValueError("Phenotype has not been executed yet.")
        # since phenotypes may be executed multiple times (in an interactive setting for example), we must always get the namespaced table freshly from self.table
        new_column_names = {
            "PERSON_ID": "PERSON_ID",
            f"{self.name}_BOOLEAN": "BOOLEAN",
            f"{self.name}_EVENT_DATE": "EVENT_DATE",
            f"{self.name}_VALUE": "VALUE",
        }
        return self.table.rename(new_column_names)

    def _execute(self, tables: Dict[str, Table]):
        """
        Executes the phenotype processing logic.

        Args:
            tables (Dict[str, Table]): A dictionary where the keys are table names and the values are Table objects.

        Raises:
            NotImplementedError: This method should be implemented by subclasses.
        """
        raise NotImplementedError()

    def __add__(
        self, other: Union["Phenotype", "ComputationGraph"]
    ) -> "ComputationGraph":
        return ComputationGraph(self, other, "+")

    def __radd__(
        self, other: Union["Phenotype", "ComputationGraph"]
    ) -> "ComputationGraph":
        return ComputationGraph(self, other, "+")

    def __sub__(
        self, other: Union["Phenotype", "ComputationGraph"]
    ) -> "ComputationGraph":
        return ComputationGraph(self, other, "-")

    def __mul__(
        self, other: Union["Phenotype", "ComputationGraph"]
    ) -> "ComputationGraph":
        return ComputationGraph(self, other, "*")

    def __rmul__(
        self, other: Union["Phenotype", "ComputationGraph"]
    ) -> "ComputationGraph":
        return ComputationGraph(self, other, "*")

    def __truediv__(
        self, other: Union["Phenotype", "ComputationGraph"]
    ) -> "ComputationGraph":
        return ComputationGraph(self, other, "/")

    def __and__(
        self, other: Union["Phenotype", "ComputationGraph"]
    ) -> "ComputationGraph":
        return ComputationGraph(self, other, "&")

    def __or__(
        self, other: Union["Phenotype", "ComputationGraph"]
    ) -> "ComputationGraph":
        return ComputationGraph(self, other, "|")

    def __invert__(self) -> "ComputationGraph":
        return ComputationGraph(self, None, "~")

    def get_codelists(self, to_pandas=False):
        codelists = []
        for child in self.children:
            codelists.extend(child.get_codelists())

        if to_pandas:
            import pandas as pd

            return pd.concat([x.to_pandas() for x in codelists]).drop_duplicates()
        return codelists

    def to_dict(self):
        return to_dict(self)

    @property
    def display_name(self):
        return self.name.replace("_", " ").lower().capitalize()

dependencies property

Recursively collect all dependencies of a node (including dependencies of dependencies).

Returns:

Type Description
Set[Node]

List[Node]: A list of Node objects on which this Node depends.

dependency_graph property

Build a dependency graph where each node maps to its direct dependencies (children).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, Set[Node]: A mapping of Node's to their children Node's.

namespaced_table property

A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.

Returns:

Name Type Description
table Table

The namespaced table for the current phenotype.

reverse_dependency_graph property

Build a reverse dependency graph where each node maps to nodes that depend on it (parents).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, List[Node]: A mapping of Node's to their parent Node's.

execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1)

Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Node's that have already executed.

Parameters:

Name Type Description Default
tables Dict[str, Table]

A dictionary mapping domains to Table objects.

None
con Optional[object]

Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.

None
overwrite bool

If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.

False
lazy_execution bool

If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.

False
n_threads int

Max number of Node's to execute simultaneously when this node has multiple children.

1

Returns:

Name Type Description
Table Table

The resulting table for this node. Also accessible through self.table after calling self.execute().

Source code in phenex/node.py
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
) -> Table:
    """
    Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Node's that have already executed.

    Parameters:
        tables: A dictionary mapping domains to Table objects.
        con: Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.
        overwrite: If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.
        lazy_execution: If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.
        n_threads: Max number of Node's to execute simultaneously when this node has multiple children.

    Returns:
        Table: The resulting table for this node. Also accessible through self.table after calling self.execute().
    """
    # Handle None tables
    if tables is None:
        tables = {}

    # Use multithreaded execution if we have multiple children and n_threads > 1
    if len(self.children) > 1 and n_threads > 1:
        return self._execute_multithreaded(
            tables, con, overwrite, lazy_execution, n_threads
        )
    else:
        return self._execute_sequential(tables, con, overwrite, lazy_execution)

visualize_dependencies()

Create a text visualization of the dependency graph for this node and its dependencies.

Returns:

Name Type Description
str str

A text representation of the dependency graph

Source code in phenex/node.py
def visualize_dependencies(self) -> str:
    """
    Create a text visualization of the dependency graph for this node and its dependencies.

    Returns:
        str: A text representation of the dependency graph
    """
    lines = [f"Dependencies for Node '{self.name}':"]

    # Get all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency graph
    dependency_graph = self._build_dependency_graph(nodes)

    for node_name in sorted(nodes.keys()):
        dependencies = dependency_graph.get(node_name, set())
        if dependencies:
            deps_str = ", ".join(sorted(dependencies))
            lines.append(f"  {node_name} depends on: {deps_str}")
        else:
            lines.append(f"  {node_name} (no dependencies)")

    return "\n".join(lines)