BinPhenotype

Bases: Phenotype

BinPhenotype converts values into categorical bin labels. Supports both continuous numeric binning and discrete value mapping.

For continuous values: Takes a phenotype that returns numeric values (like age, measurements, etc.) and converts the VALUE column into bin labels like "[10-20)", "[20-30)", etc.

For discrete values: Takes a phenotype that returns discrete values (like codes from CodelistPhenotype) and maps them to categorical labels using a bin mapping dictionary.

DATE: The event date selected from the input phenotype
VALUE: A categorical variable representing the bin label

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| name | str | The name of the phenotype. | required |
| phenotype | Phenotype | The phenotype that returns the values of interest (AgePhenotype, MeasurementPhenotype, CodelistPhenotype, etc.). | required |
| bins | Optional[List[Union[int, float]]] | List of bin edges for continuous binning. Defaults to [0, 10, 20, ..., 100] for age ranges. | None |
| value_mapping | Optional[Dict[str, List[str]]] | Dictionary mapping bin names to lists of values or Codelist objects, e.g. {"Heart Disease": ["I21", "I22", "I23"]} or {"Heart Disease": heart_disease_codelist}. | None |

Examples:

Example: Binning on continuous value

    # Continuous binning example
    from phenex.phenotypes import AgePhenotype

    age = AgePhenotype()
    binned_age = BinPhenotype(
        name="age_groups",
        phenotype=age,
        bins=[0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
    )

Example: Binning on discrete value

    # Discrete mapping example
    from phenex.phenotypes import CodelistPhenotype
    from phenex.codelists import Codelist

    diagnosis_codelist = Codelist(
        name="diagnosis_codes",
        codelist=["I21", "I22", "I23", "I24", "I25"]
    )
    diagnosis_codes = CodelistPhenotype(
        name="diagnosis",
        domain="CONDITION_OCCURRENCE",
        codelist=diagnosis_codelist,
        return_value="all"
    )
    diagnosis_categories = BinPhenotype(
        name="diagnosis_categories",
        phenotype=diagnosis_codes,
        value_mapping={
            "Acute MI": ["I21", "I22"],
            "MI Complications": ["I23", "I24"],
            "Chronic Heart Disease": ["I25"]
        }
    )

Example: Binning on discrete value with Codelist objects

    # Alternative: Using Codelist objects in value_mapping
    acute_mi_codelist = Codelist(name="acute_mi", codelist=["I21", "I22"])
    mi_complications_codelist = Codelist(name="mi_complications", codelist=["I23", "I24"])
    chronic_hd_codelist = Codelist(name="chronic_hd", codelist=["I25"])

    diagnosis_categories_v2 = BinPhenotype(
        name="diagnosis_categories_v2",
        phenotype=diagnosis_codes,
        value_mapping={
            "Acute MI": acute_mi_codelist,
            "MI Complications": mi_complications_codelist,
            "Chronic Heart Disease": chronic_hd_codelist
        }
    )

    tables = {"PERSON": example_person_table}
    result_table = binned_age.execute(tables)
    # Result will have VALUE column with labels like "[20-30)", "[30-40)", etc.

    result_table = diagnosis_categories.execute(tables)
    # Result will have VALUE column with labels like "Acute MI", "MI Complications", etc.
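
Example: Invalid configurations

BinPhenotype validates its configuration at construction time. The sketch below is grounded in the __init__ checks shown in the source code that follows, reusing the age phenotype from the continuous example above.

    # Specifying both binning methods is rejected
    try:
        BinPhenotype(
            name="bad_config",
            phenotype=age,
            bins=[0, 50, 100],
            value_mapping={"Acute MI": ["I21", "I22"]},
        )
    except ValueError as e:
        print(e)  # Cannot specify both 'bins' and 'value_mapping' - choose one

    # Bin edges must contain at least 2 values and be sorted ascending
    try:
        BinPhenotype(name="bad_bins", phenotype=age, bins=[50, 0, 100])
    except ValueError as e:
        print(e)  # bins must be sorted in ascending order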

Source code in phenex/phenotypes/bin_phenotype.py
class BinPhenotype(Phenotype):
    """
    BinPhenotype converts values into categorical bin labels. Supports both continuous numeric binning and discrete value mapping.

    For continuous values: Takes a phenotype that returns numeric values (like age, measurements, etc.) and converts the VALUE column into bin labels like "[10-20)", "[20-30)", etc.

    For discrete values: Takes a phenotype that returns discrete values (like codes from CodelistPhenotype) and maps them to categorical labels using a bin mapping dictionary.

    DATE: The event date selected from the input phenotype
    VALUE: A categorical variable representing the bin label

    Parameters:
        name: The name of the phenotype.
        phenotype: The phenotype that returns values of interest (AgePhenotype, MeasurementPhenotype, CodelistPhenotype, etc.)
        bins: List of bin edges for continuous binning. Default is [0, 10, 20, ..., 100] for age ranges.
        value_mapping: Dictionary mapping bin names to lists of values or Codelist objects (e.g., {"Heart Disease": ["I21", "I22", "I23"]} or {"Heart Disease": heart_disease_codelist})

    Examples:

    Example: Binning on continuous value
    ```python
        # Continuous binning example
        from phenex.phenotypes import AgePhenotype

        age = AgePhenotype()
        binned_age = BinPhenotype(
            name="age_groups",
            phenotype=age,
            bins=[0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
        )
    ```

    Example: Binning on discrete value
    ```python
        # Discrete mapping example
        from phenex.phenotypes import CodelistPhenotype
        from phenex.codelists import Codelist

        diagnosis_codelist = Codelist(
            name="diagnosis_codes",
            codelist=["I21", "I22", "I23", "I24", "I25"]
        )
        diagnosis_codes = CodelistPhenotype(
            name="diagnosis",
            domain="CONDITION_OCCURRENCE",
            codelist=diagnosis_codelist,
            return_value="all"
        )
        diagnosis_categories = BinPhenotype(
            name="diagnosis_categories",
            phenotype=diagnosis_codes,
            value_mapping={
                "Acute MI": ["I21", "I22"],
                "MI Complications": ["I23", "I24"],
                "Chronic Heart Disease": ["I25"]
            }
        )
    ```

    Example: Binning on discrete value with Codelist objects
    ```python
        # Alternative: Using Codelist objects in value_mapping
        acute_mi_codelist = Codelist(name="acute_mi", codelist=["I21", "I22"])
        mi_complications_codelist = Codelist(name="mi_complications", codelist=["I23", "I24"])
        chronic_hd_codelist = Codelist(name="chronic_hd", codelist=["I25"])

        diagnosis_categories_v2 = BinPhenotype(
            name="diagnosis_categories_v2",
            phenotype=diagnosis_codes,
            value_mapping={
                "Acute MI": acute_mi_codelist,
                "MI Complications": mi_complications_codelist,
                "Chronic Heart Disease": chronic_hd_codelist
            }
        )

        tables = {"PERSON": example_person_table}
        result_table = binned_age.execute(tables)
        # Result will have VALUE column with labels like "[20-30)", "[30-40)", etc.

        result_table = diagnosis_categories.execute(tables)
        # Result will have VALUE column with labels like "Acute MI", "MI Complications", etc.
    ```
    """

    def __init__(
        self,
        phenotype: Phenotype,
        bins: Optional[List[Union[int, float]]] = None,
        value_mapping: Optional[Dict[str, List[str]]] = None,
        **kwargs,
    ):
        super(BinPhenotype, self).__init__(**kwargs)

        # Set default bins for backward compatibility if neither parameter is provided
        if bins is None and value_mapping is None:
            bins = list(range(0, 101, 10))

        # Validate that only one binning method is specified
        if bins is not None and value_mapping is not None:
            raise ValueError(
                "Cannot specify both 'bins' and 'value_mapping' - choose one"
            )

        self.bins = bins
        self.value_mapping = value_mapping
        self.phenotype = phenotype

        # Validate continuous binning
        if self.bins is not None:
            if len(self.bins) < 2:
                raise ValueError("bins must contain at least 2 values")
            if self.bins != sorted(self.bins):
                raise ValueError("bins must be sorted in ascending order")

        # Validate discrete mapping
        if self.value_mapping is not None:
            if not isinstance(self.value_mapping, dict):
                raise ValueError("value_mapping must be a dictionary")
            if len(self.value_mapping) == 0:
                raise ValueError("value_mapping cannot be empty")
            # Validate that all values are either lists of strings or Codelist objects
            for bin_name, values in self.value_mapping.items():
                if isinstance(values, Codelist):
                    # Validate that the Codelist can be converted to a list
                    values_list = values.to_list()
                    if not all(isinstance(v, str) for v in values_list):
                        raise ValueError(
                            f"All values in Codelist for bin '{bin_name}' must be strings"
                        )
                elif isinstance(values, list):
                    if not all(isinstance(v, str) for v in values):
                        raise ValueError(
                            f"All values for bin '{bin_name}' must be strings"
                        )
                else:
                    raise ValueError(
                        f"Values for bin '{bin_name}' must be either a list or a Codelist object"
                    )

        # Validate phenotype types for continuous binning
        if self.bins is not None and self.phenotype.__class__.__name__ not in [
            "AgePhenotype",
            "MeasurementPhenotype",
            "ArithmeticPhenotype",
            "ScorePhenotype",
        ]:
            raise ValueError(
                f"Invalid phenotype type for continuous binning: {self.phenotype.__class__.__name__}"
            )

        # For discrete mapping, allow CodelistPhenotype and other string-based phenotypes
        if (
            self.value_mapping is not None
            and self.phenotype.__class__.__name__
            not in ["CodelistPhenotype", "CategoricalPhenotype"]
        ):
            raise ValueError(
                f"Invalid phenotype type for discrete mapping: {self.phenotype.__class__.__name__}"
            )

        self.add_children(phenotype)

    def _execute(self, tables) -> PhenotypeTable:
        # Child phenotypes have already been executed; retrieve the resulting table
        table = self.phenotype.table

        if self.bins is not None:
            # Continuous binning logic
            table = self._execute_continuous_binning(table)
        else:
            # Discrete mapping logic
            table = self._execute_discrete_mapping(table)

        return table

    def _execute_continuous_binning(self, table) -> PhenotypeTable:
        """Handle continuous value binning with numeric ranges."""
        # Create bin labels
        bin_labels = []

        # Add a bin for values < first bin edge
        bin_labels.append(f"<{self.bins[0]}")

        # Add bins for each range
        for i in range(len(self.bins) - 1):
            bin_labels.append(f"[{self.bins[i]}-{self.bins[i+1]})")

        # Add a final bin for values >= last bin edge
        bin_labels.append(f">={self.bins[-1]}")

        # Create binning logic using Ibis case statements
        value_col = table.VALUE

        # Start with the case expression
        case_expr = None

        # Handle values < first bin edge
        first_condition = value_col < self.bins[0]
        case_expr = ibis.case().when(first_condition, bin_labels[0])

        # Create conditions for each bin range
        for i in range(len(self.bins) - 1):
            condition = (value_col >= self.bins[i]) & (value_col < self.bins[i + 1])
            case_expr = case_expr.when(condition, bin_labels[i + 1])

        # Handle values >= last bin edge
        final_condition = value_col >= self.bins[-1]
        case_expr = case_expr.when(final_condition, bin_labels[-1])

        # Handle null values
        case_expr = case_expr.else_(None)

        # Replace the VALUE column with bin labels
        table = table.mutate(VALUE=case_expr.end())

        return table

    def _execute_discrete_mapping(self, table) -> PhenotypeTable:
        """Handle discrete value mapping with bin-to-values dictionary."""
        value_col = table.VALUE

        # Start with the case expression
        case_expr = ibis.case()

        # Add conditions for each bin and its associated values
        for bin_name, values in self.value_mapping.items():
            # Check if values is a Codelist and convert to list, otherwise use as list
            if isinstance(values, Codelist):
                values_list = values.to_list()
            else:
                values_list = values

            # Create condition that checks if value is in the list of values for this bin
            condition = value_col.isin(values_list)
            case_expr = case_expr.when(condition, bin_name)

        # Handle unmapped values as null
        case_expr = case_expr.else_(None)

        # Replace the VALUE column with mapped labels
        table = table.mutate(VALUE=case_expr.end())

        return table

dependencies property

Recursively collect all dependencies of a node (including dependencies of dependencies).

Returns:

| Type | Description |
|------|-------------|
| Set[Node] | A set of Node objects on which this Node depends. |
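
This is useful for inspecting everything a composed phenotype will execute. A minimal sketch, assuming the binned_age phenotype from the BinPhenotype examples above:

    # dependencies walks the graph recursively, so the AgePhenotype child
    # of binned_age is included
    for node in binned_age.dependencies:
        print(node.name)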

dependency_graph property

Build a dependency graph where each node maps to its direct dependencies (children).

Returns:

| Type | Description |
|------|-------------|
| Dict[Node, Set[Node]] | A mapping of Nodes to their child Nodes. |

execution_metadata property

Retrieve the full execution metadata row for this node from the local DuckDB database.

Returns:

| Type | Description |
|------|-------------|
| pandas.Series | A series containing NODE_NAME, LAST_HASH, NODE_PARAMS, and LAST_EXECUTED for this node, or None if the node has never been executed. |
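
A minimal sketch of reading the metadata, assuming my_node has been executed at least once:

    meta = my_node.execution_metadata
    if meta is not None:
        print(meta["NODE_NAME"], meta["LAST_EXECUTED"])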

namespaced_table property

A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.

Returns:

| Name | Type | Description |
|------|------|-------------|
| table | Table | The namespaced table for the current phenotype. |
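
A minimal sketch, assuming the binned_age phenotype from the BinPhenotype examples above; the exact prefixed column names shown are illustrative:

    # Each generic column is prefixed with the phenotype name, e.g. the
    # 'value' column becomes something like 'age_groups_value', so columns
    # from different phenotypes no longer collide when joined
    print(binned_age.namespaced_table.columns)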

reverse_dependency_graph property

Build a reverse dependency graph where each node maps to nodes that depend on it (parents).

Returns:

| Type | Description |
|------|-------------|
| Dict[Node, Set[Node]] | A mapping of Nodes to their parent Nodes. |
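
A minimal sketch contrasting the two graphs, assuming my_node has at least one child:

    # dependency_graph maps each node to the nodes it needs (its children);
    # reverse_dependency_graph maps each node to the nodes that need it (its parents)
    for node, children in my_node.dependency_graph.items():
        print(node.name, "->", sorted(c.name for c in children))
    for node, parents in my_node.reverse_dependency_graph.items():
        print(node.name, "<-", sorted(p.name for p in parents))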

clear_cache(con=None, recursive=False)

Clear the cached state for this node, forcing re-execution on the next call to execute().

This method removes the node's hash from the node states table and optionally drops the materialized table from the database. After calling this method, the node will be treated as if it has never been executed before.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| con | Optional[object] | Database connector. If provided, will also drop the materialized table from the database. | None |
| recursive | bool | If True, also clear the cache for all child nodes recursively. Defaults to False. | False |

Example:

    # Clear cache for a single node
    my_node.clear_cache()

    # Clear cache and drop materialized table
    my_node.clear_cache(con=my_connector)

    # Clear cache for node and all its dependencies
    my_node.clear_cache(recursive=True)
Source code in phenex/node.py
def clear_cache(self, con: Optional[object] = None, recursive: bool = False):
    """
    Clear the cached state for this node, forcing re-execution on the next call to execute().

    This method removes the node's hash from the node states table and optionally drops the materialized table from the database. After calling this method, the node will be treated as if it has never been executed before.

    Parameters:
        con: Database connector. If provided, will also drop the materialized table from the database.
        recursive: If True, also clear the cache for all child nodes recursively. Defaults to False.

    Example:
        ```python
        # Clear cache for a single node
        my_node.clear_cache()

        # Clear cache and drop materialized table
        my_node.clear_cache(con=my_connector)

        # Clear cache for node and all its dependencies
        my_node.clear_cache(recursive=True)
        ```
    """
    logger.info(f"Node '{self.name}': clearing cached state...")

    # Clear the hash from the node states table
    with Node._hash_update_lock:
        duckdb_con = DuckDBConnector(DUCKDB_DEST_DATABASE=NODE_STATES_DB_NAME)
        if NODE_STATES_TABLE_NAME in duckdb_con.dest_connection.list_tables():
            table = duckdb_con.get_dest_table(NODE_STATES_TABLE_NAME).to_pandas()
            # Remove this node's entry
            table = table[table.NODE_NAME != self.name]

            # Update the table
            if len(table) > 0:
                updated_table = ibis.memtable(table)
                duckdb_con.create_table(
                    updated_table, name_table=NODE_STATES_TABLE_NAME, overwrite=True
                )
            else:
                # Drop the table if it's empty
                duckdb_con.dest_connection.drop_table(NODE_STATES_TABLE_NAME)

    # Drop materialized table if connector is provided
    if con is not None:
        try:
            if self.name in con.dest_connection.list_tables():
                logger.info(f"Node '{self.name}': dropping materialized table...")
                con.dest_connection.drop_table(self.name)
        except Exception as e:
            logger.warning(
                f"Node '{self.name}': failed to drop materialized table: {e}"
            )

    # Reset the table attribute
    self.table = None

    # Recursively clear children if requested
    if recursive:
        for child in self.children:
            child.clear_cache(con=con, recursive=recursive)

    logger.info(f"Node '{self.name}': cache cleared successfully.")

execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1)

Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Nodes that have already been executed.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| tables | Dict[str, Table] | A dictionary mapping domains to Table objects. | None |
| con | Optional[object] | Connection to a database for materializing outputs. If provided, outputs from the node and all child nodes will be materialized (written) to the database using the connector. | None |
| overwrite | bool | If True, overwrites any existing tables found in the database while writing. If False, throws an error when an existing table is found. Has no effect if con is not passed. | False |
| lazy_execution | bool | If True, only re-executes if the node's definition has changed. Defaults to False. Pass overwrite=True together with lazy_execution, as lazy_execution is intended precisely for iterative updates to a node definition. A connector is required (to cache results) for lazy_execution to work. | False |
| n_threads | int | Max number of Nodes to execute simultaneously when this node has multiple children. | 1 |

Returns:

| Name | Type | Description |
|------|------|-------------|
| Table | Table | The resulting table for this node. Also accessible through self.table after calling self.execute(). |
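
A minimal usage sketch, assuming a populated tables mapping and a database connector (for example the DuckDBConnector used elsewhere in phenex):

    # Simple execution without materialization
    result = my_node.execute(tables=tables)

    # Iterative development: materialize results and only recompute nodes
    # whose definitions changed. lazy_execution requires both a connector
    # and overwrite=True.
    result = my_node.execute(
        tables=tables,
        con=con,
        overwrite=True,
        lazy_execution=True,
        n_threads=4,
    )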

Source code in phenex/node.py
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
) -> Table:
    """
    Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Nodes that have already been executed.

    Parameters:
        tables: A dictionary mapping domains to Table objects.
        con: Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.
        overwrite: If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.
        lazy_execution: If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution, as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.
        n_threads: Max number of Nodes to execute simultaneously when this node has multiple children.

    Returns:
        Table: The resulting table for this node. Also accessible through self.table after calling self.execute().
    """
    # Handle None tables
    if tables is None:
        tables = {}

    # Build dependency graph for all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency and reverse graphs
    dependency_graph = self._build_dependency_graph(nodes)
    reverse_graph = self._build_reverse_graph(dependency_graph)

    # Track completion status and results
    completed = set()
    completion_lock = threading.Lock()
    worker_exceptions = []  # Track exceptions from worker threads
    stop_all_workers = (
        threading.Event()
    )  # Signal to stop all workers on first error

    # Track in-degree for scheduling
    in_degree = {}
    for node_name, dependencies in dependency_graph.items():
        in_degree[node_name] = len(dependencies)
    for node_name in nodes:
        if node_name not in in_degree:
            in_degree[node_name] = 0

    # Queue for nodes ready to execute
    ready_queue = queue.Queue()

    # Add nodes with no dependencies to ready queue
    for node_name, degree in in_degree.items():
        if degree == 0:
            ready_queue.put(node_name)

    def worker():
        """Worker function for thread pool"""
        while not stop_all_workers.is_set():
            try:
                node_name = ready_queue.get(timeout=1)
                # timeout forces to wait 1 second to avoid busy waiting
                if node_name is None:  # Sentinel value to stop worker
                    break
            except queue.Empty:
                continue

            try:
                logger.info(
                    f"Thread {threading.current_thread().name}: executing node '{node_name}'"
                )
                node = nodes[node_name]

                # Execute the node (without recursive child execution since we handle dependencies here)
                if lazy_execution:
                    if not overwrite:
                        raise ValueError(
                            "lazy_execution only works with overwrite=True."
                        )
                    if con is None:
                        raise ValueError(
                            "A DatabaseConnector is required for lazy execution."
                        )

                    if node._get_current_hash() != node._get_last_hash():
                        logger.info(f"Node '{node_name}': computing...")
                        table = node._execute(tables)
                        if (
                            table is not None
                        ):  # Only create table if _execute returns something
                            con.create_table(table, node_name, overwrite=overwrite)
                            table = con.get_dest_table(node_name)
                        node._update_current_hash()
                    else:
                        logger.info(
                            f"Node '{node_name}': unchanged, using cached result"
                        )
                        table = con.get_dest_table(node_name)
                else:
                    table = node._execute(tables)
                    if (
                        con and table is not None
                    ):  # Only create table if _execute returns something
                        con.create_table(table, node_name, overwrite=overwrite)
                        table = con.get_dest_table(node_name)

                node.table = table

                with completion_lock:
                    completed.add(node_name)

                    # Update in-degree for dependent nodes and add ready ones to queue
                    for dependent in reverse_graph.get(node_name, set()):
                        in_degree[dependent] -= 1
                        if in_degree[dependent] == 0:
                            # Check if all dependencies are completed
                            deps_completed = all(
                                dep in completed
                                for dep in dependency_graph.get(dependent, set())
                            )
                            if deps_completed:
                                ready_queue.put(dependent)

                logger.info(
                    f"Thread {threading.current_thread().name}: completed node '{node_name}'"
                )

            except Exception as e:
                logger.error(f"Error executing node '{node_name}': {str(e)}")
                with completion_lock:
                    # Store exception for main thread
                    worker_exceptions.append(e)
                    # Signal all workers to stop immediately and exit worker loop
                    stop_all_workers.set()
                    break
            finally:
                ready_queue.task_done()

    # Start worker threads
    threads = []
    for i in range(min(n_threads, len(nodes))):
        thread = threading.Thread(target=worker, name=f"PhenexWorker-{i}")
        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Wait for all nodes to complete or for an error to occur
    while (
        len(completed) < len(nodes)
        and not worker_exceptions
        and not stop_all_workers.is_set()
    ):
        threading.Event().wait(0.1)  # Small delay to prevent busy waiting

    if not stop_all_workers.is_set():
        # Time to stop workers and cleanup
        stop_all_workers.set()

    # Check if any worker thread had an exception
    if worker_exceptions:
        # Signal workers to stop
        for _ in threads:
            ready_queue.put(None)
        # Wait for threads to finish
        for thread in threads:
            thread.join(timeout=1)
        # Re-raise the first exception
        raise worker_exceptions[0]

    # Signal workers to stop and wait for them
    for _ in threads:
        ready_queue.put(None)  # Sentinel value to stop workers

    for thread in threads:
        thread.join(timeout=1)

    logger.info(
        f"Node '{self.name}': completed multithreaded execution of {len(nodes)} nodes"
    )
    return self.table

visualize_dependencies()

Create a text visualization of the dependency graph for this node and its dependencies.

Returns:

| Type | Description |
|------|-------------|
| str | A text representation of the dependency graph. |
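
A minimal sketch; the node names are illustrative, and the output format is taken from the implementation below:

    print(my_node.visualize_dependencies())
    # Dependencies for Node 'my_node':
    #   child_a (no dependencies)
    #   my_node depends on: child_a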

Source code in phenex/node.py
def visualize_dependencies(self) -> str:
    """
    Create a text visualization of the dependency graph for this node and its dependencies.

    Returns:
        str: A text representation of the dependency graph
    """
    lines = [f"Dependencies for Node '{self.name}':"]

    # Get all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency graph
    dependency_graph = self._build_dependency_graph(nodes)

    for node_name in sorted(nodes.keys()):
        dependencies = dependency_graph.get(node_name, set())
        if dependencies:
            deps_str = ", ".join(sorted(dependencies))
            lines.append(f"  {node_name} depends on: {deps_str}")
        else:
            lines.append(f"  {node_name} (no dependencies)")

    return "\n".join(lines)