Skip to content

AgePhenotype

AgePhenotype

Bases: Phenotype

AgePhenotype is a class that represents an age-based phenotype. It calculates the age of individuals based on their date of birth and an optional anchor phenotype. The age is computed in years and can be filtered within a specified range.

Parameters:

Name Type Description Default
name Optional[str]

Name of the phenotype, default is 'age'.

'AGE'
value_filter Optional[ValueFilter]

Filter the returned patients based on their age (in years)

None
anchor_phenotype Optional[Phenotype]

An optional anchor phenotype to calculate relative age.

None
domain str

Domain of the phenotype, default is 'PERSON'.

'PERSON'

Attributes:

Name Type Description
table PhenotypeTable

The resulting phenotype table after filtering (None until execute is called)

Age at First Atrial Fibrillation Diagnosis
from phenex.phenotypes import CodelistPhenotype
from phenex.codelists import Codelist

af_codelist = Codelist([313217])
af_phenotype = CodelistPhenotype(
    name="af",
    domain='CONDITION_OCCURRENCE',
    codelist=af_codelist,
    return_date='first',
)

age_phenotype = AgePhenotype(
    value_filter=ValueFilter(
        min_value=GreaterThan(18),
        max_value=LessThan(65)
        ),
    anchor_phenotype=af_phenotype
)

result_table = age_phenotype.execute(tables)
display(result_table)
Source code in phenex/phenotypes/age_phenotype.py
class AgePhenotype(Phenotype):
    """
    AgePhenotype is a class that represents an age-based phenotype. It calculates the age of individuals
    based on their date of birth and an optional anchor phenotype. The age is computed in years and can
    be filtered within a specified range.

    Parameters:
        name: Name of the phenotype, default is 'age'.
        value_filter: Filter the returned patients based on their age (in years)
        anchor_phenotype: An optional anchor phenotype to calculate relative age.
        domain: Domain of the phenotype, default is 'PERSON'.

    Attributes:
        table (PhenotypeTable): The resulting phenotype table after filtering (None until execute is called)

    Example: Age at First Atrial Fibrillation Diagnosis
        ```python
        from phenex.phenotypes import CodelistPhenotype
        from phenex.codelists import Codelist

        af_codelist = Codelist([313217])
        af_phenotype = CodelistPhenotype(
            name="af",
            domain='CONDITION_OCCURRENCE',
            codelist=af_codelist,
            return_date='first',
        )

        age_phenotype = AgePhenotype(
            value_filter=ValueFilter(
                min_value=GreaterThan(18),
                max_value=LessThan(65)
                ),
            anchor_phenotype=af_phenotype
        )

        result_table = age_phenotype.execute(tables)
        display(result_table)
        ```
    """

    # FIXME this will become a problem when modern medicine allows people to live more
    # than 365*4 years (so they accumulate enough leap days to get an extra year)
    DAYS_IN_YEAR = 365

    def __init__(
        self,
        name: Optional[str] = "AGE",
        value_filter: Optional[ValueFilter] = None,
        anchor_phenotype: Optional[Phenotype] = None,
        domain: str = "PERSON",
        **kwargs,
    ):
        super(AgePhenotype, self).__init__(name=name)

        self.min_age = self.max_age = None
        if value_filter:
            self.min_age = value_filter.min_value
            self.max_age = value_filter.max_value

        self.domain = domain
        self.anchor_phenotype = anchor_phenotype

        self.time_range_filter = RelativeTimeRangeFilter(
            anchor_phenotype=anchor_phenotype
        )

        # Set children to the dependent PHENOTYPES
        if anchor_phenotype is not None:
            self.add_children(anchor_phenotype)

    def _execute(self, tables: Dict[str, Table]) -> PhenotypeTable:
        person_table = tables[self.domain]
        assert is_phenex_person_table(person_table)

        if "YEAR_OF_BIRTH" in person_table.columns:
            if "DATE_OF_BIRTH" in person_table.columns:
                logger.debug(
                    "Year of birth and date of birth is present, taking date of birth where possible otherwise setting date of birth to june 6th"
                )
                date_of_birth = ibis.coalesce(
                    ibis.date(person_table.DATE_OF_BIRTH),
                    ibis.date(person_table.YEAR_OF_BIRTH, 6, 1),
                )
            else:
                logger.debug(
                    "Only year of birth is present in person table, setting birth date to june 6th"
                )
                date_of_birth = ibis.date(person_table.YEAR_OF_BIRTH, 6, 1)
        else:
            logger.debug("Year of birth not present, taking date of birth")
            date_of_birth = ibis.date(person_table.DATE_OF_BIRTH)
        person_table = person_table.mutate(EVENT_DATE=date_of_birth)

        # Apply the time range filter
        table = person_table
        if self.anchor_phenotype is not None:
            if self.anchor_phenotype.table is None:
                raise ValueError(
                    f"Dependent Phenotype {self.anchor_phenotype.name} must be executed before this node can run!"
                )
            else:
                anchor_table = self.anchor_phenotype.table
                reference_column = anchor_table.EVENT_DATE
                # Note that joins can change column names if the tables have name collisions!
                table = table.join(anchor_table, "PERSON_ID")
        else:
            assert (
                "INDEX_DATE" in table.columns
            ), f"INDEX_DATE column not found in table {table}"
            reference_column = table.INDEX_DATE

        YEARS_FROM_ANCHOR = (
            reference_column.delta(table.EVENT_DATE, "day") / self.DAYS_IN_YEAR
        ).floor()
        table = table.mutate(YEARS_FROM_ANCHOR=YEARS_FROM_ANCHOR)

        conditions = []
        # Fix this, this logic needs to be abstracted to a ValueFilter
        if self.min_age is not None:
            if self.min_age.operator == ">":
                conditions.append(table.YEARS_FROM_ANCHOR > self.min_age.value)
            elif self.min_age.operator == ">=":
                conditions.append(table.YEARS_FROM_ANCHOR >= self.min_age.value)
            else:
                raise ValueError("Operator for min days be > or >=")
        if self.max_age is not None:
            if self.max_age.operator == "<":
                conditions.append(table.YEARS_FROM_ANCHOR < self.max_age.value)
            elif self.max_age.operator == "<=":
                conditions.append(table.YEARS_FROM_ANCHOR <= self.max_age.value)
            else:
                raise ValueError("Operator for max days be < or <=")
        if conditions:
            table = table.filter(conditions)
        person_table = table

        person_table = person_table.mutate(VALUE=person_table.YEARS_FROM_ANCHOR)

        return self._perform_final_processing(person_table)

dependencies property

Recursively collect all dependencies of a node (including dependencies of dependencies).

Returns:

Type Description
Set[Node]

List[Node]: A list of Node objects on which this Node depends.

dependency_graph property

Build a dependency graph where each node maps to its direct dependencies (children).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, Set[Node]: A mapping of Node's to their children Node's.

namespaced_table property

A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.

Returns:

Name Type Description
table Table

The namespaced table for the current phenotype.

reverse_dependency_graph property

Build a reverse dependency graph where each node maps to nodes that depend on it (parents).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, List[Node]: A mapping of Node's to their parent Node's.

execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1)

Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Node's that have already executed.

Parameters:

Name Type Description Default
tables Dict[str, Table]

A dictionary mapping domains to Table objects.

None
con Optional[object]

Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.

None
overwrite bool

If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.

False
lazy_execution bool

If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.

False
n_threads int

Max number of Node's to execute simultaneously when this node has multiple children.

1

Returns:

Name Type Description
Table Table

The resulting table for this node. Also accessible through self.table after calling self.execute().

Source code in phenex/node.py
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
) -> Table:
    """
    Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Node's that have already executed.

    Parameters:
        tables: A dictionary mapping domains to Table objects.
        con: Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.
        overwrite: If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.
        lazy_execution: If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.
        n_threads: Max number of Node's to execute simultaneously when this node has multiple children.

    Returns:
        Table: The resulting table for this node. Also accessible through self.table after calling self.execute().
    """
    # Handle None tables
    if tables is None:
        tables = {}

    # Use multithreaded execution if we have multiple children and n_threads > 1
    if len(self.children) > 1 and n_threads > 1:
        return self._execute_multithreaded(
            tables, con, overwrite, lazy_execution, n_threads
        )
    else:
        return self._execute_sequential(tables, con, overwrite, lazy_execution)

visualize_dependencies()

Create a text visualization of the dependency graph for this node and its dependencies.

Returns:

Name Type Description
str str

A text representation of the dependency graph

Source code in phenex/node.py
def visualize_dependencies(self) -> str:
    """
    Create a text visualization of the dependency graph for this node and its dependencies.

    Returns:
        str: A text representation of the dependency graph
    """
    lines = [f"Dependencies for Node '{self.name}':"]

    # Get all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency graph
    dependency_graph = self._build_dependency_graph(nodes)

    for node_name in sorted(nodes.keys()):
        dependencies = dependency_graph.get(node_name, set())
        if dependencies:
            deps_str = ", ".join(sorted(dependencies))
            lines.append(f"  {node_name} depends on: {deps_str}")
        else:
            lines.append(f"  {node_name} (no dependencies)")

    return "\n".join(lines)