LogicPhenotype

Bases: ComputationGraphPhenotype

LogicPhenotype is a composite phenotype that performs boolean operations using the boolean column of its component phenotypes and populates the boolean column of the resulting phenotype table. It should be used whenever multiple phenotypes are logically combined, for example, to ask whether a patient has diabetes AND hypertension.

See the comparison table of CompositePhenotype classes.

Parameters:

    expression (ComputationGraph, required):
        The logical expression to be evaluated, composed of phenotypes combined by Python arithmetic operations.
    return_date (Union[str, Phenotype], default 'first'):
        The date to be returned for the phenotype. Can be "first", "last", or a Phenotype object.
    name (str, default None):
        The name of the phenotype.

Attributes:

    table (PhenotypeTable):
        The resulting phenotype table after filtering (None until execute is called).
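
For instance, a minimal usage sketch (assuming CodelistPhenotype and Codelist are importable as shown and that phenotypes compose with & and | into a ComputationGraph; the codes and domain name below are illustrative, not a guaranteed schema):

from phenex.codelists import Codelist
from phenex.phenotypes import CodelistPhenotype, LogicPhenotype

# Illustrative component phenotypes; codes and domain are placeholders.
diabetes = CodelistPhenotype(
    name="diabetes",
    codelist=Codelist(["E11"], name="diabetes_codes"),
    domain="CONDITION_OCCURRENCE",
)
hypertension = CodelistPhenotype(
    name="hypertension",
    codelist=Codelist(["I10"], name="hypertension_codes"),
    domain="CONDITION_OCCURRENCE",
)

# Patients with BOTH conditions; return_date="first" selects the earlier
# of the two component event dates.
dm_and_htn = LogicPhenotype(
    name="dm_and_htn",
    expression=diabetes & hypertension,
    return_date="first",
)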

Source code in phenex/phenotypes/computation_graph_phenotypes.py
class LogicPhenotype(ComputationGraphPhenotype):
    """
    LogicPhenotype is a composite phenotype that performs boolean operations using the **boolean** column of its component phenotypes and populates the **boolean** column of the resulting phenotype table. It should be used whenever multiple phenotypes are logically combined, for example, to ask whether a patient has diabetes AND hypertension.

    --> See the comparison table of CompositePhenotype classes

    Parameters:
        expression: The logical expression to be evaluated, composed of phenotypes combined by Python arithmetic operations.
        return_date: The date to be returned for the phenotype. Can be "first", "last", or a Phenotype object.
        name: The name of the phenotype.

    Attributes:
        table (PhenotypeTable): The resulting phenotype table after filtering (None until execute is called)
    """

    def __init__(
        self,
        expression: ComputationGraph,
        return_date: Union[str, Phenotype] = "first",
        name: str = None,
        **kwargs,
    ):
        super(LogicPhenotype, self).__init__(
            name=name,
            expression=expression,
            return_date=return_date,
            operate_on="boolean",
            populate="boolean",
            reduce=True,
        )

    def _execute(self, tables: Dict[str, Table]) -> PhenotypeTable:
        """
        Executes the logic phenotype processing logic.
        Unlike the base class, LogicPhenotype populates both BOOLEAN and VALUE columns.
        The VALUE is taken from the phenotype whose date is selected based on return_date.

        Args:
            tables (Dict[str, Table]): A dictionary where the keys are table names and the values are Table objects.

        Returns:
            PhenotypeTable: The resulting phenotype table containing the required columns.
        """
        joined_table = hstack(self.children, tables["PERSON"].select("PERSON_ID"))

        # Convert boolean columns to integers for arithmetic operations if needed
        if self.populate == "value" and self.operate_on == "boolean":
            for child in self.children:
                column_name = f"{child.name}_BOOLEAN"
                mutated_column = ibis.ifelse(
                    joined_table[column_name].isnull(),
                    0,
                    joined_table[column_name].cast("int"),
                ).cast("float")
                joined_table = joined_table.mutate(**{column_name: mutated_column})

        # Populate the BOOLEAN column using the logical expression
        _boolean_expression = self.expression.get_boolean_expression(
            joined_table, operate_on=self.operate_on
        )
        joined_table = joined_table.mutate(BOOLEAN=_boolean_expression)

        # Get date columns for determining which phenotype's value to use
        date_columns = self._coalesce_all_date_columns(joined_table)

        # Handle the "all" case separately since it returns a Union table
        if self.return_date == "all":
            joined_table = self._return_all_dates_with_value(joined_table, date_columns)
        else:
            # Determine the selected date and corresponding value for non-"all" cases
            if self.return_date == "first":
                selected_date = ibis.least(*date_columns)
                joined_table = joined_table.mutate(EVENT_DATE=selected_date)
            elif self.return_date == "last":
                selected_date = ibis.greatest(*date_columns)
                joined_table = joined_table.mutate(EVENT_DATE=selected_date)
            elif isinstance(self.return_date, Phenotype):
                selected_date = getattr(
                    joined_table, f"{self.return_date.name}_EVENT_DATE"
                )
                joined_table = joined_table.mutate(EVENT_DATE=selected_date)
            else:
                selected_date = ibis.null(date)
                joined_table = joined_table.mutate(EVENT_DATE=selected_date)

            # Populate the VALUE column with the value from the phenotype whose date matches the selected date
            value_cases = []
            for child in self.children:
                child_date_col = f"{child.name}_EVENT_DATE"
                child_value_col = f"{child.name}_VALUE"

                # Check if this child's date matches the selected date
                condition = getattr(joined_table, child_date_col) == selected_date
                value_cases.append((condition, getattr(joined_table, child_value_col)))

            # Build the CASE expression: when date matches, use that phenotype's value
            if value_cases:
                selected_value = ibis.case()
                for condition, value in value_cases:
                    selected_value = selected_value.when(condition, value)
                selected_value = selected_value.else_(ibis.null()).end()
                joined_table = joined_table.mutate(VALUE=selected_value)
            else:
                joined_table = joined_table.mutate(VALUE=ibis.null().cast("int32"))

        # Reduce the table to only include rows where the boolean column is True
        if self.reduce:
            joined_table = joined_table.filter(joined_table.BOOLEAN == True)

        # Select only the required phenotype columns
        return select_phenotype_columns(joined_table)

    def _return_all_dates_with_value(self, table, date_columns):
        """
        Custom version of _return_all_dates that properly handles VALUE column for LogicPhenotype.
        For each date column, creates a separate table with the correct VALUE populated, then unions them.

        Args:
            table: The Ibis table object (e.g., joined_table) that contains all leaf phenotypes stacked horizontally
            date_columns: List of base columns as ibis objects

        Returns:
            Ibis expression representing the UNION of all non-null dates with proper VALUE columns.
        """
        # get all the non-null dates for each date column and populate VALUE correctly
        non_null_dates_by_date_col = []
        for date_col in date_columns:
            # Filter for non-null dates
            non_null_dates = table.filter(date_col.notnull()).mutate(
                EVENT_DATE=date_col
            )

            # For this specific date, find which phenotype's value to use
            value_cases = []
            for child in self.children:
                child_date_col = f"{child.name}_EVENT_DATE"
                child_value_col = f"{child.name}_VALUE"

                # Check if this child's date matches the current date
                condition = getattr(non_null_dates, child_date_col) == date_col
                value_cases.append(
                    (condition, getattr(non_null_dates, child_value_col))
                )

            # Build the CASE expression for this date
            if value_cases:
                selected_value = ibis.case()
                for condition, value in value_cases:
                    selected_value = selected_value.when(condition, value)
                selected_value = selected_value.else_(ibis.null()).end()
                non_null_dates = non_null_dates.mutate(VALUE=selected_value)
            else:
                non_null_dates = non_null_dates.mutate(VALUE=ibis.null().cast("int32"))

            non_null_dates_by_date_col.append(non_null_dates)

        # do the union of all the non-null dates
        all_dates = non_null_dates_by_date_col[0]
        for non_null_dates in non_null_dates_by_date_col[1:]:
            all_dates = all_dates.union(non_null_dates)

        # Select only the required phenotype columns
        from phenex.phenotypes.functions import select_phenotype_columns

        return select_phenotype_columns(all_dates)
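
The least-date-then-CASE pattern used above to populate VALUE can be seen in isolation with a self-contained ibis sketch (toy column names and data; ISO date strings compare correctly as plain strings):

import ibis

t = ibis.memtable(
    {
        "A_EVENT_DATE": ["2020-01-01", "2020-03-01"],
        "A_VALUE": [1.0, 2.0],
        "B_EVENT_DATE": ["2020-02-01", "2020-02-15"],
        "B_VALUE": [10.0, 20.0],
    }
)

# return_date="first": take the earliest child date per row.
selected_date = ibis.least(t.A_EVENT_DATE, t.B_EVENT_DATE)

# Pick VALUE from whichever child's date equals the selected date.
selected_value = (
    ibis.case()
    .when(t.A_EVENT_DATE == selected_date, t.A_VALUE)
    .when(t.B_EVENT_DATE == selected_date, t.B_VALUE)
    .else_(ibis.null().cast("float64"))
    .end()
)

print(t.mutate(EVENT_DATE=selected_date, VALUE=selected_value).execute())
# Row 1 picks A (2020-01-01 -> 1.0); row 2 picks B (2020-02-15 -> 20.0).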

dependencies property

Recursively collect all dependencies of a node (including dependencies of dependencies).

Returns:

    Set[Node]: The set of Node objects on which this Node depends.
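
Continuing the earlier dm_and_htn sketch (a hypothetical check, assuming each dependency is a Node with a name attribute):

assert {node.name for node in dm_and_htn.dependencies} == {"diabetes", "hypertension"}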

dependency_graph property

Build a dependency graph where each node maps to its direct dependencies (children).

Returns:

    Dict[Node, Set[Node]]: A mapping of each Node to its child Nodes.

namespaced_table property

A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together, as the sketch below illustrates.

Returns:

    table (Table): The namespaced table for the current phenotype.
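
An illustrative sketch only, not the property's implementation: the prefix convention matches what _execute reads above (e.g. {name}_BOOLEAN), and PERSON_ID is kept unprefixed here so tables can still be joined on it.

def namespace_columns(pt, name):
    # `pt` is an ibis table with the generic phenotype columns
    # PERSON_ID, BOOLEAN, EVENT_DATE, VALUE.
    return pt.select(
        "PERSON_ID",
        **{
            f"{name}_BOOLEAN": pt.BOOLEAN,
            f"{name}_EVENT_DATE": pt.EVENT_DATE,
            f"{name}_VALUE": pt.VALUE,
        },
    )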

reverse_dependency_graph property

Build a reverse dependency graph where each node maps to the nodes that depend on it (parents).

Returns:

    Dict[Node, Set[Node]]: A mapping of each Node to its parent Nodes.

execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1)

Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Nodes that have already executed.

Parameters:

    tables (Dict[str, Table], default None):
        A dictionary mapping domains to Table objects.
    con (Optional[object], default None):
        Connection to a database for materializing outputs. If provided, outputs from this node and all child nodes will be materialized (written) to the database using the connector.
    overwrite (bool, default False):
        If True, overwrites any existing tables found in the database while writing. If False, throws an error when an existing table is found. Has no effect if con is not passed.
    lazy_execution (bool, default False):
        If True, only re-executes if the node's definition has changed. Pass overwrite=True together with lazy_execution, since lazy_execution is intended precisely for iterative updates to a node definition. A connector (to cache results) is required for lazy_execution to work.
    n_threads (int, default 1):
        Maximum number of Nodes to execute simultaneously when this node has multiple children.

Returns:

    Table: The resulting table for this node. Also accessible through self.table after calling self.execute().

Source code in phenex/node.py
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
) -> Table:
    """
    Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Nodes that have already executed.

    Parameters:
        tables: A dictionary mapping domains to Table objects.
        con: Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.
        overwrite: If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.
        lazy_execution: If True, only re-executes if the node's definition has changed. Pass overwrite=True together with lazy_execution, since lazy_execution is intended precisely for iterative updates to a node definition. A connector (to cache results) is required for lazy_execution to work.
        n_threads: Max number of Nodes to execute simultaneously when this node has multiple children.

    Returns:
        Table: The resulting table for this node. Also accessible through self.table after calling self.execute().
    """
    # Handle None tables
    if tables is None:
        tables = {}

    # Use multithreaded execution if we have multiple children and n_threads > 1
    if len(self.children) > 1 and n_threads > 1:
        return self._execute_multithreaded(
            tables, con, overwrite, lazy_execution, n_threads
        )
    else:
        return self._execute_sequential(tables, con, overwrite, lazy_execution)
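
A hedged usage sketch, continuing the dm_and_htn example: it assumes ibis tables person_tbl and condition_tbl already exist, and the domain keys are illustrative rather than a guaranteed phenex schema.

# Map domain names to ibis Tables; PERSON is read by LogicPhenotype._execute.
tables = {
    "PERSON": person_tbl,
    "CONDITION_OCCURRENCE": condition_tbl,
}

result = dm_and_htn.execute(tables, n_threads=2)  # children may run in parallel
df = result.execute()  # ibis Table -> pandas DataFrame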

visualize_dependencies()

Create a text visualization of the dependency graph for this node and its dependencies.

Returns:

    str: A text representation of the dependency graph.

Source code in phenex/node.py
def visualize_dependencies(self) -> str:
    """
    Create a text visualization of the dependency graph for this node and its dependencies.

    Returns:
        str: A text representation of the dependency graph
    """
    lines = [f"Dependencies for Node '{self.name}':"]

    # Get all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency graph
    dependency_graph = self._build_dependency_graph(nodes)

    for node_name in sorted(nodes.keys()):
        dependencies = dependency_graph.get(node_name, set())
        if dependencies:
            deps_str = ", ".join(sorted(dependencies))
            lines.append(f"  {node_name} depends on: {deps_str}")
        else:
            lines.append(f"  {node_name} (no dependencies)")

    return "\n".join(lines)