Skip to content

WithinSameEncounterPhenotype

Bases: Phenotype

WithinSameEncounterPhenotype is a phenotype that filters a target phenotype based on the occurrence of an anchor phenotype within the same encounter. This phenotype can only be used with CodelistPhenotypes and MeasurementPhenotypes as anchor/phenotype.

Parameters:

Name Type Description Default
name

The name of the phenotype. It is not an explicit constructor argument; it is forwarded to the base Phenotype class through keyword arguments.

required
anchor_phenotype Union[CodelistPhenotype, MeasurementPhenotype]

The phenotype which should define the encounter of interest

required
phenotype Union[CodelistPhenotype, MeasurementPhenotype]

The phenotype which should occur within the same encounter of interest, defined by the anchor_phenotype

required
column_name str

The name of the column which must be shared between anchor_phenotype and phenotype

required

Attributes:

Name Type Description
table PhenotypeTable

The resulting phenotype table after filtering (None until execute is called)

Examples:

Imaging procedure within the same encounter as Pulmonary Embolism (OMOP)
from phenex.phenotypes import CodelistPhenotype, WithinSameEncounterPhenotype
from phenex.codelists import Codelist
from phenex.mappers import OMOPDomains
from phenex.filters import DateFilter, CategoricalFilter, Value
from phenex.ibis_connect import SnowflakeConnector

# Anchor phenotype: the pulmonary embolism diagnosis that defines the
# encounter of interest.
pe_diagnosis = CodelistPhenotype(
    name = "diagnosis_of_PE",
    domain = "CONDITION_OCCURRENCE",
    codelist = Codelist(codelist={'SNOMED': [608954, 4120091, 45768439, 45768888, 37165995, 436768, 44782732, 45768887, 45771016, 4017134, 4240832, 4129841, 43530934, 4219469, 442055, 442051, 442050, 433545, 437060, 435026, 433832, 440477, 439380, 439379, 4331168, 4108681, 37160752, 4091708, 1244882, 440417, 37109911, 3655209, 3655210, 45757145, 37016922, 43530605, 4119608, 4253796, 45766471, 4121618, 4236271, 36713113, 35615055, 40479606, 4119607, 4119609]}).copy(use_code_type=False),
    return_date = "first",
)

# Target phenotype: imaging procedures. Only rows whose encounter column
# value is identical to that of the anchor are kept (see column_name below).
imaging = CodelistPhenotype(
    name = 'imaging',
    domain = 'PROCEDURE_OCCURRENCE',
    codelist = Codelist(codelist={'CPT4': [2211381, 2211379, 2211380, 2212002, 2212004, 2212003, 2212011, 42742555, 42742554, 2212006, 2212005, 2212008, 2212010, 2212009, 42742557, 42742556]}).copy(use_code_type=False),
)


# Keep imaging events that share VISIT_OCCURRENCE_ID with the PE diagnosis.
imaging_in_entry_hospitalization = WithinSameEncounterPhenotype(
    name='imaging_in_entry_hospitalization',
    anchor_phenotype = pe_diagnosis,
    phenotype = imaging,
    column_name = 'VISIT_OCCURRENCE_ID'
)
Source code in phenex/phenotypes/within_same_encounter_phenotype.py
class WithinSameEncounterPhenotype(Phenotype):
    """
    WithinSameEncounterPhenotype is a phenotype that filters a target phenotype based on the occurrence of an anchor phenotype within the same encounter. This phenotype can only be used with CodelistPhenotypes and MeasurementPhenotypes as anchor/phenotype.

    The anchor phenotype is executed as a child of this phenotype. The target phenotype is not registered as a child: it is executed by this phenotype on the subset of its raw domain table that shares an encounter with the anchor.

    Parameters:
        name: The name of the phenotype. It is not an explicit constructor argument; it is forwarded to the base Phenotype class through keyword arguments.
        anchor_phenotype: The phenotype which should define the encounter of interest
        phenotype: The phenotype which should occur within the same encounter of interest, defined by the anchor_phenotype
        column_name: The name of the column which must be shared between anchor_phenotype and phenotype. The column must exist in the raw domain tables of both phenotypes.

    Raises:
        ValueError: If anchor_phenotype or phenotype is not a CodelistPhenotype or MeasurementPhenotype.

    Attributes:
        table (PhenotypeTable): The resulting phenotype table after filtering (None until execute is called)

    Examples:

    Example: Imaging procedure within the same encounter as Pulmonary Embolism (OMOP)
        ```python
        from phenex.phenotypes import CodelistPhenotype, WithinSameEncounterPhenotype
        from phenex.codelists import Codelist

        # Anchor: the PE diagnosis defines the encounter of interest
        pe_diagnosis = CodelistPhenotype(
            name = "diagnosis_of_PE",
            domain = "CONDITION_OCCURRENCE",
            codelist = Codelist(codelist={'SNOMED': [608954, 4120091, 45768439, 45768888, 37165995, 436768, 44782732, 45768887, 45771016, 4017134, 4240832, 4129841, 43530934, 4219469, 442055, 442051, 442050, 433545, 437060, 435026, 433832, 440477, 439380, 439379, 4331168, 4108681, 37160752, 4091708, 1244882, 440417, 37109911, 3655209, 3655210, 45757145, 37016922, 43530605, 4119608, 4253796, 45766471, 4121618, 4236271, 36713113, 35615055, 40479606, 4119607, 4119609]}).copy(use_code_type=False),
            return_date = "first",
        )

        # Target: imaging procedures, kept only where the encounter column
        # value is identical to that of the anchor
        imaging = CodelistPhenotype(
            name = 'imaging',
            domain = 'PROCEDURE_OCCURRENCE',
            codelist = Codelist(codelist={'CPT4': [2211381, 2211379, 2211380, 2212002, 2212004, 2212003, 2212011, 42742555, 42742554, 2212006, 2212005, 2212008, 2212010, 2212009, 42742557, 42742556]}).copy(use_code_type=False),
        )


        imaging_in_entry_hospitalization = WithinSameEncounterPhenotype(
            name='imaging_in_entry_hospitalization',
            anchor_phenotype = pe_diagnosis,
            phenotype = imaging,
            column_name = 'VISIT_OCCURRENCE_ID'
        )

        ```
    """

    def __init__(
        self,
        anchor_phenotype: Union["CodelistPhenotype", "MeasurementPhenotype"],
        phenotype: Union["CodelistPhenotype", "MeasurementPhenotype"],
        column_name: str,
        **kwargs,
    ):
        super().__init__(**kwargs)

        # Validate BEFORE registering the anchor as a child, so that an
        # unsupported argument always surfaces as the ValueError below rather
        # than being added to the dependency graph (or failing inside
        # add_children) first. Types are compared by class name, matching the
        # string forward references used in the annotations.
        supported = ("CodelistPhenotype", "MeasurementPhenotype")
        if (
            anchor_phenotype.__class__.__name__ not in supported
            or phenotype.__class__.__name__ not in supported
        ):
            raise ValueError(
                "Both anchor_phenotype and phenotype must be of type CodelistPhenotype or MeasurementPhenotype"
            )

        # Only the anchor is a child: the target phenotype is run manually in
        # _execute on a pre-filtered table.
        self.add_children(anchor_phenotype)

        self.anchor_phenotype = anchor_phenotype
        self.phenotype = phenotype
        self.column_name = column_name

    def _execute(self, tables) -> "PhenotypeTable":
        """
        Run the target phenotype on the rows of its domain table that share
        `column_name` (and PERSON_ID) with the anchor event.

        Parameters:
            tables: A mapping of domain name to table; must contain the domains of both the anchor and the target phenotype.

        Returns:
            PhenotypeTable: Whatever the target phenotype's own _execute returns for the filtered domain table.
        """
        # Subset the raw anchor data that occurs on the same day as the anchor date in order to get the column of interest
        # NOTE(review): this relies on the anchor phenotype's table carrying an
        # EVENT_DATE to join on — confirm anchors are configured with a return date.
        _anchor_table = tables[self.anchor_phenotype.domain]
        _anchor_table = (
            _anchor_table.join(
                self.anchor_phenotype.table,
                (self.anchor_phenotype.table.PERSON_ID == _anchor_table.PERSON_ID)
                & (self.anchor_phenotype.table.EVENT_DATE == _anchor_table.EVENT_DATE),
                how="inner",
            )
            .select(["PERSON_ID", self.column_name])
            # Several raw anchor rows can share person, day and encounter;
            # without de-duplication each of them would multiply the target
            # rows in the join below.
            .distinct()
        )

        # Subset the target phenotype raw data for patient id and column name of interest
        _table = tables[self.phenotype.domain]
        _table = _table.join(
            _anchor_table,
            (_anchor_table.PERSON_ID == _table.PERSON_ID)
            & (_anchor_table[self.column_name] == _table[self.column_name]),
            how="inner",
        )

        # run the target phenotype on the subsetted data
        return self.phenotype._execute({self.phenotype.domain: _table})

dependencies property

Recursively collect all dependencies of a node (including dependencies of dependencies).

Returns:

Type Description
Set[Node]

Set[Node]: The set of Node objects on which this Node depends, directly or transitively.

dependency_graph property

Build a dependency graph where each node maps to its direct dependencies (children).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, Set[Node]]: A mapping of each Node to its child Nodes.

namespaced_table property

A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.

Returns:

Name Type Description
table Table

The namespaced table for the current phenotype.

reverse_dependency_graph property

Build a reverse dependency graph where each node maps to nodes that depend on it (parents).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, Set[Node]]: A mapping of each Node to its parent Nodes.

execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1)

Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Nodes that have already executed.

Parameters:

Name Type Description Default
tables Dict[str, Table]

A dictionary mapping domains to Table objects.

None
con Optional[object]

Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.

None
overwrite bool

If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.

False
lazy_execution bool

If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.

False
n_threads int

Maximum number of Nodes to execute simultaneously when this node has multiple children.

1

Returns:

Name Type Description
Table Table

The resulting table for this node. Also accessible through self.table after calling self.execute().

Source code in phenex/node.py
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
) -> Table:
    """
    Run this node and everything it depends on, then return this node's table.
    Lazy execution relies on hash-based change detection so that nodes which have already been executed are not recomputed.

    Parameters:
        tables: Mapping from domain name to Table object. When omitted, an empty mapping is used.
        con: Database connection used to materialize outputs. When given, the outputs of this node and of all child nodes are written to the database through the connector.
        overwrite: When True, existing database tables are overwritten on write; when False, an existing table causes an error. Ignored if con is not passed.
        lazy_execution: When True, the node is re-executed only if its definition changed. Defaults to False. Intended for iterative edits of a node definition, so combine it with overwrite=True; a connector is required so that results can be cached.
        n_threads: Upper bound on the number of nodes executed at the same time when this node has multiple children.

    Returns:
        Table: The table produced for this node. It is also available as self.table once self.execute() has been called.
    """
    # A missing tables argument is treated as an empty mapping
    domain_tables = {} if tables is None else tables

    # Threads are only used when there is more than one child to spread
    # across them and the caller asked for more than one thread
    run_in_parallel = len(self.children) > 1 and n_threads > 1
    if not run_in_parallel:
        return self._execute_sequential(domain_tables, con, overwrite, lazy_execution)

    return self._execute_multithreaded(
        domain_tables, con, overwrite, lazy_execution, n_threads
    )

visualize_dependencies()

Create a text visualization of the dependency graph for this node and its dependencies.

Returns:

Name Type Description
str str

A text representation of the dependency graph

Source code in phenex/node.py
def visualize_dependencies(self) -> str:
    """
    Render the dependency graph of this node and its dependencies as plain text.

    The output starts with a header line naming this node, followed by one
    line per node (sorted by name) listing the names it depends on, or a
    "(no dependencies)" marker.

    Returns:
        str: A text representation of the dependency graph
    """
    header = f"Dependencies for Node '{self.name}':"

    # Index every dependency, plus this node itself, by name
    by_name = {dep.name: dep for dep in self.dependencies}
    by_name[self.name] = self

    graph = self._build_dependency_graph(by_name)

    def _describe(label):
        # One output line for a single node
        children = graph.get(label, set())
        if not children:
            return f"  {label} (no dependencies)"
        return f"  {label} depends on: {', '.join(sorted(children))}"

    return "\n".join([header, *(_describe(label) for label in sorted(by_name))])