Skip to content

EventCountPhenotype

Bases: Phenotype

EventCountPhenotype counts the number of events that occur on distinct days. It is additionally able to filter patients based on: 1. the number of distinct days an event occurred, by setting value_filter 2. the number of days between pairs of events

EventCountPhenotype is a composite phenotype, meaning that it does not directly operate on source data and takes a phenotype as an argument. The phenotype passed to EventCountPhenotype must have return_date set to 'all' (if return_date on the provided phenotype is set to first or last, there will only be one event per patient...)

DATE: The event date selected based on component_date_select and return_date parameters. return_date returns multiple rows per patient for all events that fulfill criteria. return_date first is the first fulfilling event date, last the last. If component_date_select = 'first' the returned date is a pair of events, if component_date_select = 'second' we return the second of a pair of events. VALUE: The number of days that the phenotype of interest has occurred i.e. if 4, that means the phenotype has occurred on 4 distinct days.

Parameters:

Name Type Description Default
name

The name of the phenotype.

required
phenotype Phenotype

The phenotype that returns events of interest. Note that return_date must be set to all or an error will be thrown.

required
value_filter ValueFilter

Set the minimum and/or maximum number of distinct days on which an event may occur.

None
relative_time_range RelativeTimeRangeFilter

Set the minimum and/or maximum number of days that are allowed to occur between any pair of events.

None
return_date

Specifies whether to return the 'first', 'last', or 'all' dates on which the criteria are fulfilled. Default is 'first'.

'first'
component_date_select

Specifies whether to return the 'first' or 'second' event date within each pair of events. Default is 'second'. It is highly recommended to never use 'first', as there is a high risk of introducing immortal time bias.

'second'
Example
codelist = Codelist(name="example_codelist", codes=[...])

phenotype = CodelistPhenotype(
    name="example_phenotype",
    domain="CONDITION_OCCURRENCE",
    codelist=codelist,
    return_date='first'
)

tables = {"CONDITION_OCCURRENCE": example_code_table}
multiple_occurrences = EventCountPhenotype(
    phenotype=phenotype,
    value_filter=ValueFilter(min_value=GreaterThanOrEqualTo(2)),
    relative_time_range=RelativeTimeRangeFilter(
        min_days=GreaterThanOrEqualTo(90),
        max_days=LessThanOrEqualTo(180)
    ),
    return_date='first',
    component_date_select='second'
)

result_table = multiple_occurrences.execute(tables)
display(result_table)
Source code in phenex/phenotypes/event_count_phenotype.py
class EventCountPhenotype(Phenotype):
    """
    EventCountPhenotype counts the number of events that occur on distinct days. It is additionally able to filter patients based on:
    1. the number of distinct days an event occurred, by setting value_filter
    2. the number of days between pairs of events

    EventCountPhenotype is a composite phenotype, meaning that it does not directly operate on source data and takes a phenotype as an argument. The phenotype passed to EventCountPhenotype must have return_date set to 'all' (if return_date on the provided phenotype is set to `first` or `last`, there will only be one event per patient...)


    DATE: The event date selected based on `component_date_select` and `return_date` parameters. `return_date` returns multiple rows per patient for all events that fulfill criteria. `return_date` first is the first fulfilling event date, last the last. If component_date_select = 'first' the returned date is a pair of events, if component_date_select = 'second' we return the second of a pair of events.
    VALUE: The number of days that the phenotype of interest has occurred i.e. if 4, that means the phenotype has occurred on 4 distinct days.

    Parameters:
        name: The name of the phenotype.
        phenotype: The phenotype that returns events of interest. Note that return_date must be set to `all` or an error will be thrown.
        value_filter: Set the minimum and/or maximum number of distinct days on which an event may occur.
        relative_time_range: Set the minimum and/or maximum number of days that are allowed to occur between any pair of events.
        return_date: Specifies whether to return the 'first', 'last', or 'all' dates on which the criteria are fulfilled. Default is 'first'.
        component_date_select: Specifies whether to return the 'first' or 'second' event date within each pair of events. Default is 'second'. It is highly recommended to never use 'first', as there is a high risk of introducing immortal time bias.

    Example:
        ```python
        codelist = Codelist(name="example_codelist", codes=[...])

        phenotype = CodelistPhenotype(
            name="example_phenotype",
            domain="CONDITION_OCCURRENCE",
            codelist=codelist,
            return_date='first'
        )

        tables = {"CONDITION_OCCURRENCE": example_code_table}
        multiple_occurrences = EventCountPhenotype(
            phenotype=phenotype,
            value_filter=ValueFilter(min_value=GreaterThanOrEqualTo(2)),
            relative_time_range=RelativeTimeRangeFilter(
                min_days=GreaterThanOrEqualTo(90),
                max_days=LessThanOrEqualTo(180)
            ),
            return_date='first',
            component_date_select='second'
        )

        result_table = multiple_occurrences.execute(tables)
        display(result_table)
        ```
    """

    def __init__(
        self,
        phenotype: Phenotype,
        value_filter: ValueFilter = None,
        relative_time_range: RelativeTimeRangeFilter = None,
        return_date="first",
        component_date_select="second",
        **kwargs,
    ):
        super(EventCountPhenotype, self).__init__(**kwargs)
        self.relative_time_range = relative_time_range
        self.return_date = return_date
        self.component_date_select = component_date_select
        if self.component_date_select not in ["first", "second"]:
            raise ValueError(
                f"Invalid component_date_select: {self.component_date_select}"
            )
        self.value_filter = value_filter
        self.phenotype = phenotype
        self.add_children(phenotype)

    def _execute(self, tables) -> PhenotypeTable:
        # Execute the child phenotype to get the initial table to filter
        if self.phenotype.return_date != "all":
            raise ValueError(
                "EventCountPhenotype requires that return_date is set to all on its component phenotype"
            )
        table = self.phenotype.table

        # Select only distinct dates:
        table = table.select(["PERSON_ID", "EVENT_DATE"]).distinct()

        # Count occurrences per PERSON_ID
        occurrence_counts_table = table.group_by("PERSON_ID").aggregate(VALUE=_.count())
        table, occurrence_counts_table = self._perform_value_filtering(
            table, occurrence_counts_table
        )
        table = self._perform_relative_time_range_filtering(table)
        table = self._perform_date_selection(table)
        table = table.left_join(
            occurrence_counts_table.select("PERSON_ID", "VALUE"),
            table.PERSON_ID == occurrence_counts_table.PERSON_ID,
        ).select("PERSON_ID", "EVENT_DATE", "VALUE")

        table = table.mutate(BOOLEAN=True).distinct()
        return table

    def _perform_value_filtering(self, table, occurrence_counts_table):
        if self.value_filter is not None:
            occurrence_counts_table = self.value_filter.filter(occurrence_counts_table)
            table = table.right_join(
                occurrence_counts_table,
                table.PERSON_ID == occurrence_counts_table.PERSON_ID,
            ).select(["PERSON_ID", "EVENT_DATE", "VALUE"])
        return table, occurrence_counts_table

    def _perform_relative_time_range_filtering(self, table):
        if self.relative_time_range is not None:
            # make sure that the 'when' keyword parameter is correctly set to after
            self.relative_time_range.when = "after"
            # Self join and rename event_date columns;
            # the first dates will be called INDEX_DATE
            # the second dates will be called EVENT_DATE
            first_table = table.select(
                "PERSON_ID",
                table.EVENT_DATE.name("INDEX_DATE"),
            )
            second_table = table.select(
                "PERSON_ID",
                table.EVENT_DATE.name("EVENT_DATE"),
            )
            table = first_table.join(
                second_table, first_table.PERSON_ID == second_table.PERSON_ID
            )

            table = table.filter(table.INDEX_DATE <= table.EVENT_DATE)
            # perform relative time range filtering; the first date is the anchor ('index_date')
            table = self.relative_time_range.filter(table)

            if self.component_date_select == "first":
                table = table.select("PERSON_ID", "INDEX_DATE").rename(
                    {"EVENT_DATE": "INDEX_DATE"}
                )
            elif self.component_date_select == "second":
                table = table.select("PERSON_ID", "EVENT_DATE")
        return table

    def _perform_date_selection(self, table, reduce=True):
        if self.return_date is None or self.return_date == "all":
            return table
        if self.return_date == "first":
            aggregator = First(reduce=reduce)
        elif self.return_date == "last":
            aggregator = Last(reduce=reduce)
        else:
            raise ValueError(f"Unknown return_date: {self.return_date}")
        table = aggregator.aggregate(table)
        return table.select("PERSON_ID", "EVENT_DATE")

dependencies property

Recursively collect all dependencies of a node (including dependencies of dependencies).

Returns:

Type Description
Set[Node]

List[Node]: A list of Node objects on which this Node depends.

dependency_graph property

Build a dependency graph where each node maps to its direct dependencies (children).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, Set[Node]: A mapping of Node's to their children Node's.

namespaced_table property

A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.

Returns:

Name Type Description
table Table

The namespaced table for the current phenotype.

reverse_dependency_graph property

Build a reverse dependency graph where each node maps to nodes that depend on it (parents).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, List[Node]: A mapping of Node's to their parent Node's.

execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1)

Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Node's that have already executed.

Parameters:

Name Type Description Default
tables Dict[str, Table]

A dictionary mapping domains to Table objects.

None
con Optional[object]

Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.

None
overwrite bool

If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.

False
lazy_execution bool

If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.

False
n_threads int

Max number of Node's to execute simultaneously when this node has multiple children.

1

Returns:

Name Type Description
Table Table

The resulting table for this node. Also accessible through self.table after calling self.execute().

Source code in phenex/node.py
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
) -> Table:
    """
    Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Node's that have already executed.

    Parameters:
        tables: A dictionary mapping domains to Table objects.
        con: Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.
        overwrite: If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.
        lazy_execution: If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.
        n_threads: Max number of Node's to execute simultaneously when this node has multiple children.

    Returns:
        Table: The resulting table for this node. Also accessible through self.table after calling self.execute().
    """
    # Handle None tables
    if tables is None:
        tables = {}

    # Use multithreaded execution if we have multiple children and n_threads > 1
    if len(self.children) > 1 and n_threads > 1:
        return self._execute_multithreaded(
            tables, con, overwrite, lazy_execution, n_threads
        )
    else:
        return self._execute_sequential(tables, con, overwrite, lazy_execution)

visualize_dependencies()

Create a text visualization of the dependency graph for this node and its dependencies.

Returns:

Name Type Description
str str

A text representation of the dependency graph

Source code in phenex/node.py
def visualize_dependencies(self) -> str:
    """
    Create a text visualization of the dependency graph for this node and its dependencies.

    Returns:
        str: A text representation of the dependency graph
    """
    lines = [f"Dependencies for Node '{self.name}':"]

    # Get all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency graph
    dependency_graph = self._build_dependency_graph(nodes)

    for node_name in sorted(nodes.keys()):
        dependencies = dependency_graph.get(node_name, set())
        if dependencies:
            deps_str = ", ".join(sorted(dependencies))
            lines.append(f"  {node_name} depends on: {deps_str}")
        else:
            lines.append(f"  {node_name} (no dependencies)")

    return "\n".join(lines)