LogicPhenotype

Bases: ComputationGraphPhenotype

LogicPhenotype is a composite phenotype that performs boolean operations using the boolean column of its component phenotypes and populates the boolean column of the resulting phenotype table. It should be used whenever multiple phenotypes are combined logically, for example, to determine whether a patient has diabetes AND hypertension.

--> See the comparison table of CompositePhenotype classes
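For illustration, a minimal sketch of how a LogicPhenotype might be composed. The phenotype names, codelists, import paths, and constructor arguments below are assumptions for this example, as is the use of the `&` operator to build the ComputationGraph; adapt them to your study definition.

```python
from phenex.phenotypes import CodelistPhenotype, LogicPhenotype
from phenex.codelists import Codelist

# Hypothetical component phenotypes; names, codes, and domain are illustrative only
diabetes = CodelistPhenotype(
    name="diabetes",
    codelist=Codelist(["E11"], name="diabetes_codes"),
    domain="CONDITION_OCCURRENCE",
)
hypertension = CodelistPhenotype(
    name="hypertension",
    codelist=Codelist(["I10"], name="hypertension_codes"),
    domain="CONDITION_OCCURRENCE",
)

# Combine them logically; `&` is assumed to build the ComputationGraph expression
diabetes_and_hypertension = LogicPhenotype(
    name="diabetes_and_hypertension",
    expression=diabetes & hypertension,
    return_date="first",
)

# After execute(), the resulting phenotype table is available as .table
# diabetes_and_hypertension.execute(tables)
```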

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `expression` | `ComputationGraph` | The logical expression to be evaluated, composed of phenotypes combined by python arithmetic operations. | required |
| `return_date` | `Union[str, Phenotype]` | The date to be returned for the phenotype. Can be "first", "last", or a Phenotype object. | `'first'` |
| `name` | `str` | The name of the phenotype. | `None` |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `table` | `PhenotypeTable` | The resulting phenotype table after filtering (None until execute is called). |

Source code in phenex/phenotypes/computation_graph_phenotypes.py
class LogicPhenotype(ComputationGraphPhenotype):
    """
    LogicPhenotype is a composite phenotype that performs boolean operations using the **boolean** column of its component phenotypes and populates the **boolean** column of the resulting phenotype table. It should be used whenever multiple phenotypes are combined logically, for example, to determine whether a patient has diabetes AND hypertension.

    --> See the comparison table of CompositePhenotype classes

    Parameters:
        expression: The logical expression to be evaluated composed of phenotypes combined by python arithmetic operations.
        return_date: The date to be returned for the phenotype. Can be "first", "last", or a Phenotype object.
        name: The name of the phenotype.

    Attributes:
        table (PhenotypeTable): The resulting phenotype table after filtering (None until execute is called)
    """

    def __init__(
        self,
        expression: ComputationGraph,
        return_date: Union[str, Phenotype] = "first",
        name: str = None,
        **kwargs,
    ):
        # Remove keys that we set explicitly to avoid duplicate keyword arguments
        kwargs = {
            k: v
            for k, v in kwargs.items()
            if k not in ("operate_on", "populate", "reduce")
        }

        super(LogicPhenotype, self).__init__(
            name=name,
            expression=expression,
            return_date=return_date,
            operate_on="boolean",
            populate="boolean",
            reduce=True,
            **kwargs,
        )

    def _get_value_column_types(self, table):
        """
        Inspect the VALUE columns of child phenotypes and categorize their types.

        Args:
            table: The Ibis table containing child phenotype columns

        Returns:
            set: A set of type categories ('numeric', 'string', 'other')
        """
        schema = table.schema()
        value_col_types = set()
        for child in self.children:
            child_value_col = f"{child.name}_VALUE"
            if child_value_col in schema:
                col_type = schema[child_value_col]
                # Categorize as numeric, string, or other
                if col_type.is_numeric():
                    value_col_types.add("numeric")
                elif col_type.is_string():
                    value_col_types.add("string")
                else:
                    value_col_types.add("other")
        return value_col_types

    def _execute(self, tables: Dict[str, Table]) -> PhenotypeTable:
        """
        Executes the logic phenotype processing logic.
        Unlike the base class, LogicPhenotype populates both BOOLEAN and VALUE columns.
        The VALUE is taken from the phenotype whose date is selected based on return_date.

        Args:
            tables (Dict[str, Table]): A dictionary where the keys are table names and the values are Table objects.

        Returns:
            PhenotypeTable: The resulting phenotype table containing the required columns.
        """
        joined_table = hstack(self.children, tables["PERSON"].select("PERSON_ID"))
        # Convert boolean columns to integers for arithmetic operations if needed
        if self.populate == "value" and self.operate_on == "boolean":
            for child in self.children:
                column_name = f"{child.name}_BOOLEAN"
                mutated_column = ibis.ifelse(
                    joined_table[column_name].isnull(),
                    0,
                    joined_table[column_name].cast("int"),
                ).cast("float")
                joined_table = joined_table.mutate(**{column_name: mutated_column})

        # Populate the BOOLEAN column using the logical expression
        _boolean_expression = self.expression.get_boolean_expression(
            joined_table, operate_on=self.operate_on
        )
        joined_table = joined_table.mutate(BOOLEAN=_boolean_expression)

        # Get date columns for determining which phenotype's value to use
        date_columns = self._coalesce_all_date_columns(joined_table)

        # Handle the "all" case separately since it returns a Union table
        if self.return_date == "all":
            joined_table = self._return_all_dates_with_value(joined_table, date_columns)
        else:
            # Determine the selected date and corresponding value for non-"all" cases
            if self.return_date == "first":
                selected_date = ibis.least(*date_columns)
                joined_table = joined_table.mutate(EVENT_DATE=selected_date)
            elif self.return_date == "last":
                selected_date = ibis.greatest(*date_columns)
                joined_table = joined_table.mutate(EVENT_DATE=selected_date)
            elif isinstance(self.return_date, Phenotype):
                selected_date = getattr(
                    joined_table, f"{self.return_date.name}_EVENT_DATE"
                )
                joined_table = joined_table.mutate(EVENT_DATE=selected_date)
            else:
                selected_date = ibis.null(date)
                joined_table = joined_table.mutate(EVENT_DATE=selected_date)

            # The least/greatest operations select the first/last event across the child
            # phenotypes. However, if a child phenotype returns more than one event
            # (i.e. the child has return_date='all', which is necessary for AND operations),
            # we also need to select the first/last event from among those.
            joined_table = self._perform_date_selection(joined_table)

            # Check data types of VALUE columns to determine how to handle them
            value_col_types = self._get_value_column_types(joined_table)

            # If mixed types (e.g., string and numeric), set VALUE to NULL to avoid type conflicts
            # Otherwise, use the appropriate type
            if len(value_col_types) > 1:
                # Mixed types - use string NULL as a common denominator
                joined_table = joined_table.mutate(VALUE=ibis.null().cast("string"))
            else:
                # Homogeneous types - populate VALUE with the phenotype whose date matches
                value_cases = []
                for child in self.children:
                    child_date_col = f"{child.name}_EVENT_DATE"
                    child_value_col = f"{child.name}_VALUE"

                    # Check if this child's date matches the selected date
                    condition = getattr(joined_table, child_date_col) == selected_date
                    value_cases.append(
                        (condition, getattr(joined_table, child_value_col))
                    )

                # Build the CASE expression: when date matches, use that phenotype's value
                if value_cases:
                    selected_value = ibis.case()
                    for condition, value in value_cases:
                        selected_value = selected_value.when(condition, value)

                    # Use appropriate null type based on the homogeneous type
                    if "numeric" in value_col_types:
                        selected_value = selected_value.else_(
                            ibis.null().cast("int32")
                        ).end()
                    elif "string" in value_col_types:
                        selected_value = selected_value.else_(
                            ibis.null().cast("string")
                        ).end()
                    else:
                        # Fallback to int32 for unknown types
                        selected_value = selected_value.else_(
                            ibis.null().cast("int32")
                        ).end()

                    joined_table = joined_table.mutate(VALUE=selected_value)
                else:
                    # No value cases - use int32 as default
                    joined_table = joined_table.mutate(VALUE=ibis.null().cast("int32"))

        # Reduce the table to only include rows where the boolean column is True
        if self.reduce:
            joined_table = joined_table.filter(joined_table.BOOLEAN == True)

        # Select only the required phenotype columns
        return select_phenotype_columns(joined_table).distinct()

    def _return_all_dates_with_value(self, table, date_columns):
        """
        Custom version of _return_all_dates that properly handles VALUE column for LogicPhenotype.
        For each date column, creates a separate table with the correct VALUE populated, then unions them.

        Args:
            table: The Ibis table object (e.g., joined_table) that contains all leaf phenotypes stacked horizontally
            date_columns: List of base columns as ibis objects

        Returns:
            Ibis expression representing the UNION of all non-null dates with proper VALUE columns.
        """
        # Check if component phenotypes are of mixed VALUE types (i.e. one has a string value, the other a numeric) to determine how to handle them
        value_col_types = self._get_value_column_types(table)

        # get all the non-null dates for each date column and populate VALUE correctly
        non_null_dates_by_date_col = []
        for date_col in date_columns:
            # Filter for non-null dates
            non_null_dates = table.filter(date_col.notnull()).mutate(
                EVENT_DATE=date_col
            )

            # If component phenotypes are of mixed VALUE types (i.e. one has a string value, the other a numeric), set VALUE to NULL; otherwise populate appropriately
            if len(value_col_types) > 1:
                # Mixed types - use string NULL as a common denominator
                non_null_dates = non_null_dates.mutate(VALUE=ibis.null().cast("string"))
            else:
                # For this specific date, find which phenotype's value to use
                value_cases = []
                for child in self.children:
                    child_date_col = f"{child.name}_EVENT_DATE"
                    child_value_col = f"{child.name}_VALUE"

                    # Check if this child's date matches the current date
                    condition = getattr(non_null_dates, child_date_col) == date_col
                    value_cases.append(
                        (condition, getattr(non_null_dates, child_value_col))
                    )

                # Build the CASE expression for this date
                if value_cases:
                    selected_value = ibis.case()
                    for condition, value in value_cases:
                        selected_value = selected_value.when(condition, value)

                    # Use appropriate null type based on the homogeneous type
                    if "numeric" in value_col_types:
                        selected_value = selected_value.else_(
                            ibis.null().cast("int32")
                        ).end()
                    elif "string" in value_col_types:
                        selected_value = selected_value.else_(
                            ibis.null().cast("string")
                        ).end()
                    else:
                        # Fallback to int32 for unknown types
                        selected_value = selected_value.else_(
                            ibis.null().cast("int32")
                        ).end()

                    non_null_dates = non_null_dates.mutate(VALUE=selected_value)
                else:
                    # No value cases - use int32 as default
                    non_null_dates = non_null_dates.mutate(
                        VALUE=ibis.null().cast("int32")
                    )

            non_null_dates_by_date_col.append(non_null_dates)

        # do the union of all the non-null dates
        all_dates = non_null_dates_by_date_col[0]
        for non_null_dates in non_null_dates_by_date_col[1:]:
            all_dates = all_dates.union(non_null_dates)

        # Select only the required phenotype columns
        from phenex.phenotypes.functions import select_phenotype_columns

        return select_phenotype_columns(all_dates)

    def repr_short(self, level=0):
        indent = "  " * level
        s = f"Defined as the logical expression : {str(self.expression)}"
        s += self.repr_short_for_all_children(level + 1)

        return f"{indent}- **{self.display_name}** : {s}"

dependencies property

Recursively collect all dependencies of a node (including dependencies of dependencies).

Returns:

| Type | Description |
| --- | --- |
| `Set[Node]` | A set of Node objects on which this Node depends. |

dependency_graph property

Build a dependency graph where each node maps to its direct dependencies (children).

Returns:

| Type | Description |
| --- | --- |
| `Dict[Node, Set[Node]]` | A mapping of Nodes to their child Nodes. |

execution_metadata property

Retrieve the full execution metadata row for this node from the local DuckDB database.

Returns:

| Type | Description |
| --- | --- |
| `pandas.DataFrame` | A table containing NODE_NAME, NODE_HASH, NODE_PARAMS, EXECUTION_PARAMS, EXECUTION_START_TIME, EXECUTION_END_TIME, and EXECUTION_DURATION for execution of this node, or None if the node has never been executed. |
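
For illustration, a minimal sketch of inspecting this metadata; the variable name `my_node` is hypothetical.

```python
# Retrieve the execution metadata row for this node (None if the node has never been executed)
meta = my_node.execution_metadata
if meta is not None:
    print(meta[["NODE_NAME", "EXECUTION_START_TIME", "EXECUTION_DURATION"]])
```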

namespaced_table property

A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `table` | `Table` | The namespaced table for the current phenotype. |
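
Continuing the earlier sketch, a rough illustration of the prefixing; the exact column names and casing shown here are assumptions, not taken from the source above.

```python
# Assuming the diabetes_and_hypertension phenotype from the sketch above has been executed:
# the plain phenotype table uses generic column names ...
diabetes_and_hypertension.table.columns
# e.g. ['PERSON_ID', 'BOOLEAN', 'EVENT_DATE', 'VALUE']

# ... while namespaced_table prefixes them with the phenotype name, which avoids
# collisions when joining several phenotype tables together
diabetes_and_hypertension.namespaced_table.columns
# e.g. ['PERSON_ID', 'DIABETES_AND_HYPERTENSION_BOOLEAN',
#       'DIABETES_AND_HYPERTENSION_EVENT_DATE', 'DIABETES_AND_HYPERTENSION_VALUE']
```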

reverse_dependency_graph property

Build a reverse dependency graph where each node maps to nodes that depend on it (parents).

Returns:

| Type | Description |
| --- | --- |
| `Dict[Node, Set[Node]]` | A mapping of Nodes to their parent Nodes. |

clear_cache(con=None, recursive=False)

Clear the cached state for this node, forcing re-execution on the next call to execute().

This method removes the node's hash from the node states table and optionally drops the materialized table from the database. After calling this method, the node will be treated as if it has never been executed before.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `con` | `Optional[object]` | Database connector. If provided, clears only runs with matching execution context and drops the materialized table. If None, clears all runs for the node. | `None` |
| `recursive` | `bool` | If True, also clear the cache for all child nodes recursively. Defaults to False. | `False` |
Example
# Clear all cached runs for a single node
my_node.clear_cache()

# Clear runs with specific execution context and drop materialized table
my_node.clear_cache(con=my_connector)

# Clear cache for node and all its dependencies
my_node.clear_cache(recursive=True)
Source code in phenex/node.py
def clear_cache(self, con: Optional[object] = None, recursive: bool = False):
    """
    Clear the cached state for this node, forcing re-execution on the next call to execute().

    This method removes the node's hash from the node states table and optionally drops the materialized table from the database. After calling this method, the node will be treated as if it has never been executed before.

    Parameters:
        con: Database connector. If provided, clears only runs with matching execution context and drops the materialized table. If None, clears all runs for the node.
        recursive: If True, also clear the cache for all child nodes recursively. Defaults to False.

    Example:
        ```python
        # Clear all cached runs for a single node
        my_node.clear_cache()

        # Clear runs with specific execution context and drop materialized table
        my_node.clear_cache(con=my_connector)

        # Clear cache for node and all its dependencies
        my_node.clear_cache(recursive=True)
        ```
    """
    # Delegate all logic to NodeManager
    return Node._node_manager.clear_cache(self, con=con, recursive=recursive)

execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1)

Executes the Node computation for the current node and its dependencies.

Lazy Execution

When lazy_execution=True, nodes are only recomputed if changes are detected. The system tracks:

1. Node definition changes: Detected by hashing the node's parameters (from to_dict()) and class name
2. Execution environment changes: Detected by tracking source/destination database configurations

A node will be rerun if either:

- The node's defining parameters have changed (different hash than last execution)
- The database connector's source or destination databases have changed
- The node has never been executed before

If no changes are detected, the node uses its cached result from the database instead of recomputing.

Requirements for lazy execution:

- A database connector (con) must be provided to store and retrieve cached results
- overwrite=True must be set to allow updating existing cached tables

State tracking is maintained in a local DuckDB database (__PHENEX_META__NODE_STATES table) that stores:

- Node hashes, parameters, and execution metadata
- Database connector configuration used during execution
- Execution timing information
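
For illustration, a minimal sketch of a lazy-execution call; the connector class name and its constructor arguments are assumptions for this example, so substitute whatever DatabaseConnector your project provides.

```python
# Hypothetical connector; class name and arguments are illustrative only
con = SomeDatabaseConnector(source_database="SOURCE_DB", dest_database="RESULTS_DB")

# First call computes the node and materializes its table via the connector
my_node.execute(tables, con=con, overwrite=True, lazy_execution=True)

# A second call with an unchanged node definition and connector configuration
# reuses the cached table instead of recomputing
my_node.execute(tables, con=con, overwrite=True, lazy_execution=True)
```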

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tables` | `Dict[str, Table]` | A dictionary mapping domains to Table objects. | `None` |
| `con` | `Optional[object]` | Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector. Required for lazy_execution. | `None` |
| `overwrite` | `bool` | If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed. Must be True when using lazy_execution. | `False` |
| `lazy_execution` | `bool` | If True, only re-executes nodes when changes are detected in either the node definition or execution environment. Defaults to False. Requires con to be provided. | `False` |
| `n_threads` | `int` | Max number of Nodes to execute simultaneously when this node has multiple children. | `1` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Table` | `Table` | The resulting table for this node. Also accessible through self.table after calling self.execute(). |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If lazy_execution=True but overwrite=False or con=None. |

Source code in phenex/node.py
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
) -> Table:
    """
    Executes the Node computation for the current node and its dependencies.

    Lazy Execution:
        When lazy_execution=True, nodes are only recomputed if changes are detected. The system tracks:
        1. Node definition changes: Detected by hashing the node's parameters (from to_dict()) and class name
        2. Execution environment changes: Detected by tracking source/destination database configurations

        A node will be rerun if either:
        - The node's defining parameters have changed (different hash than last execution)
        - The database connector's source or destination databases have changed
        - The node has never been executed before

        If no changes are detected, the node uses its cached result from the database instead of recomputing.

        Requirements for lazy execution:
        - A database connector (con) must be provided to store and retrieve cached results
        - overwrite=True must be set to allow updating existing cached tables

        State tracking is maintained in a local DuckDB database (__PHENEX_META__NODE_STATES table) that stores:
        - Node hashes, parameters, and execution metadata
        - Database connector configuration used during execution
        - Execution timing information

    Parameters:
        tables: A dictionary mapping domains to Table objects.
        con: Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector. Required for lazy_execution.
        overwrite: If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed. Must be True when using lazy_execution.
        lazy_execution: If True, only re-executes nodes when changes are detected in either the node definition or execution environment. Defaults to False. Requires con to be provided.
        n_threads: Max number of Nodes to execute simultaneously when this node has multiple children.

    Returns:
        Table: The resulting table for this node. Also accessible through self.table after calling self.execute().

    Raises:
        ValueError: If lazy_execution=True but overwrite=False or con=None.
    """
    # Handle None tables
    if tables is None:
        tables = {}

    # Build dependency graph for all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency and reverse graphs
    dependency_graph = self._build_dependency_graph(nodes)
    reverse_graph = self._build_reverse_graph(dependency_graph)

    # Track completion status and results
    completed = set()
    completion_lock = threading.Lock()
    worker_exceptions = []  # Track exceptions from worker threads
    stop_all_workers = (
        threading.Event()
    )  # Signal to stop all workers on first error

    # Track in-degree for scheduling
    in_degree = {}
    for node_name, dependencies in dependency_graph.items():
        in_degree[node_name] = len(dependencies)
    for node_name in nodes:
        if node_name not in in_degree:
            in_degree[node_name] = 0

    # Queue for nodes ready to execute
    ready_queue = queue.Queue()

    # Add nodes with no dependencies to ready queue
    for node_name, degree in in_degree.items():
        if degree == 0:
            ready_queue.put(node_name)

    def worker():
        """Worker function for thread pool"""
        while not stop_all_workers.is_set():
            try:
                node_name = ready_queue.get(timeout=1)
                # timeout forces to wait 1 second to avoid busy waiting
                if node_name is None:  # Sentinel value to stop worker
                    break
            except queue.Empty:
                continue

            try:
                logger.info(
                    f"Thread {threading.current_thread().name}: executing node '{node_name}'"
                )
                node = nodes[node_name]

                # Execute the node (without recursive child execution since we handle dependencies here)
                if lazy_execution:
                    if not overwrite:
                        raise ValueError(
                            "lazy_execution only works with overwrite=True."
                        )
                    if con is None:
                        raise ValueError(
                            "A DatabaseConnector is required for lazy execution."
                        )

                    if Node._node_manager.should_rerun(node, con):
                        # Time the execution
                        node.lastexecution_start_time = datetime.now()
                        table = node._execute(tables)

                        if (
                            table is not None
                        ):  # Only create table if _execute returns something
                            con.create_table(table, node_name, overwrite=overwrite)
                            table = con.get_dest_table(node_name)

                        node.lastexecution_end_time = datetime.now()
                        node.lastexecution_duration = (
                            node.lastexecution_end_time
                            - node.lastexecution_start_time
                        ).total_seconds()

                        Node._node_manager.update_run_params(node, con)
                    else:
                        table = con.get_dest_table(node_name)
                else:
                    # Time the execution
                    node.lastexecution_start_time = datetime.now()
                    table = node._execute(tables)

                    if (
                        con and table is not None
                    ):  # Only create table if _execute returns something
                        con.create_table(table, node_name, overwrite=overwrite)
                        table = con.get_dest_table(node_name)

                    node.lastexecution_end_time = datetime.now()
                    node.lastexecution_duration = (
                        node.lastexecution_end_time - node.lastexecution_start_time
                    ).total_seconds()

                node.table = table

                with completion_lock:
                    completed.add(node_name)

                    # Update in-degree for dependent nodes and add ready ones to queue
                    for dependent in reverse_graph.get(node_name, set()):
                        in_degree[dependent] -= 1
                        if in_degree[dependent] == 0:
                            # Check if all dependencies are completed
                            deps_completed = all(
                                dep in completed
                                for dep in dependency_graph.get(dependent, set())
                            )
                            if deps_completed:
                                ready_queue.put(dependent)

                # Log completion with timing info
                if node.lastexecution_duration is not None:
                    logger.info(
                        f"Thread {threading.current_thread().name}: completed node '{node_name}' "
                        f"in {node.lastexecution_duration:.3f} seconds"
                    )
                else:
                    logger.info(
                        f"Thread {threading.current_thread().name}: completed node '{node_name}' (cached)"
                    )

            except Exception as e:
                logger.error(f"Error executing node '{node_name}': {str(e)}")
                with completion_lock:
                    # Store exception for main thread
                    worker_exceptions.append(e)
                    # Signal all workers to stop immediately and exit worker loop
                    stop_all_workers.set()
                    break
            finally:
                ready_queue.task_done()

    # Start worker threads
    threads = []
    for i in range(min(n_threads, len(nodes))):
        thread = threading.Thread(target=worker, name=f"PhenexWorker-{i}")
        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Wait for all nodes to complete or for an error to occur
    while (
        len(completed) < len(nodes)
        and not worker_exceptions
        and not stop_all_workers.is_set()
    ):
        threading.Event().wait(0.1)  # Small delay to prevent busy waiting

    if not stop_all_workers.is_set():
        # Time to stop workers and cleanup
        stop_all_workers.set()

    # Check if any worker thread had an exception
    if worker_exceptions:
        # Signal workers to stop
        for _ in threads:
            ready_queue.put(None)
        # Wait for threads to finish
        for thread in threads:
            thread.join(timeout=1)
        # Re-raise the first exception
        raise worker_exceptions[0]

    # Signal workers to stop and wait for them
    for _ in threads:
        ready_queue.put(None)  # Sentinel value to stop workers

    for thread in threads:
        thread.join(timeout=1)

    logger.info(
        f"Node '{self.name}': completed multithreaded execution of {len(nodes)} nodes"
    )
    return self.table

visualize_dependencies()

Create a text visualization of the dependency graph for this node and its dependencies.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `str` | `str` | A text representation of the dependency graph. |
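
For illustration, a minimal sketch of a call and the kind of output it produces; the node names in the printed output are hypothetical and reuse the earlier sketch.

```python
# Print the dependency graph for a node; node names shown are illustrative only
print(diabetes_and_hypertension.visualize_dependencies())
# Dependencies for Node 'diabetes_and_hypertension':
#   diabetes (no dependencies)
#   diabetes_and_hypertension depends on: diabetes, hypertension
#   hypertension (no dependencies)
```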

Source code in phenex/node.py
def visualize_dependencies(self) -> str:
    """
    Create a text visualization of the dependency graph for this node and its dependencies.

    Returns:
        str: A text representation of the dependency graph
    """
    lines = [f"Dependencies for Node '{self.name}':"]

    # Get all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency graph
    dependency_graph = self._build_dependency_graph(nodes)

    for node_name in sorted(nodes.keys()):
        dependencies = dependency_graph.get(node_name, set())
        if dependencies:
            deps_str = ", ".join(sorted(dependencies))
            lines.append(f"  {node_name} depends on: {deps_str}")
        else:
            lines.append(f"  {node_name} (no dependencies)")

    return "\n".join(lines)