Skip to content

CodelistPhenotype

Bases: Phenotype

CodelistPhenotype extracts patients from a CodeTable based on a specified codelist and other optional filters such as date range, relative time range and categorical filters.

Parameters:

Name Type Description Default
domain str

The domain of the phenotype.

required
codelist Codelist

The codelist used for filtering.

required
name Optional[str]

The name of the phenotype. Optional. If not passed, name will be derived from the name of the codelist.

None
date_range DateFilter

A date range filter to apply.

None
relative_time_range Union[RelativeTimeRangeFilter, List[RelativeTimeRangeFilter]]

A relative time range filter or a list of filters to apply.

None
return_date

Specifies whether to return the 'first', 'last', or 'nearest' event date. Default is 'first'.

'first'
categorical_filter Optional[CategoricalFilter]

Additional categorical filters to apply.

None

Attributes:

Name Type Description
table PhenotypeTable

The resulting phenotype table after filtering (None until execute is called)

Examples:

Inpatient Atrial Fibrillation (OMOP)
from phenex.phenotypes import CodelistPhenotype
from phenex.codelists import Codelist
from phenex.mappers import OMOPDomains
from phenex.filters import DateFilter, CategoricalFilter, Value
from phenex.ibis_connect import SnowflakeConnector

con = SnowflakeConnector() # requires some configuration
mapped_tables = OMOPDomains.get_mapped_tables(con)

af_codelist = Codelist([313217]) # list of concept ids
date_range = DateFilter(
    min_date=After("2020-01-01"),
    max_date=Before("2020-12-31")
    )

inpatient = CategoricalFilter(
    column_name='VISIT_DETAIL_CONCEPT_ID',
    allowed_values=[9201],
    domain='VISIT_DETAIL'
)

af_phenotype = CodelistPhenotype(
    name="af",
    domain='CONDITION_OCCURRENCE',
    codelist=af_codelist,
    date_range=date_range,
    return_date='first',
    categorical_filter=inpatient
)

af = af_phenotype.execute(mapped_tables)
af.head()
Myocardial Infarction One Year Pre-index (OMOP)
from phenex.filters import RelativeTimeRangeFilter, Value

af_phenotype = (...) # take from above example

oneyear_preindex = RelativeTimeRangeFilter(
    min_days=Value('>', 0), # exclude index date
    max_days=Value('<', 365),
    anchor_phenotype=af_phenotype # use af phenotype above as reference date
    )

mi_codelist = Codelist([49601007]) # list of concept ids
mi_phenotype = CodelistPhenotype(
    name='mi',
    domain='CONDITION_OCCURRENCE',
    codelist=mi_codelist,
    return_date='first',
    relative_time_range=oneyear_preindex
)
mi = mi_phenotype.execute(mapped_tables)
mi.head()
Source code in phenex/phenotypes/codelist_phenotype.py
class CodelistPhenotype(Phenotype):
    """
    CodelistPhenotype extracts patients from a CodeTable based on a specified codelist and other optional filters such as date range, relative time range and categorical filters.

    Parameters:
        domain: The domain of the phenotype.
        codelist: The codelist used for filtering.
        name: The name of the phenotype. Optional. If not passed, name will be derived from the name of the codelist.
        date_range: A date range filter to apply.
        relative_time_range: A relative time range filter or a list of filters to apply.
        return_date: Specifies whether to return the 'first', 'last', or 'nearest' event date. Default is 'first'.
        categorical_filter: Additional categorical filters to apply.

    Attributes:
        table (PhenotypeTable): The resulting phenotype table after filtering (None until execute is called)

    Examples:

    Example: Inpatient Atrial Fibrillation (OMOP)
        ```python
        from phenex.phenotypes import CodelistPhenotype
        from phenex.codelists import Codelist
        from phenex.mappers import OMOPDomains
        from phenex.filters import DateFilter, CategoricalFilter, Value
        from phenex.ibis_connect import SnowflakeConnector

        con = SnowflakeConnector() # requires some configuration
        mapped_tables = OMOPDomains.get_mapped_tables(con)

        af_codelist = Codelist([313217]) # list of concept ids
        date_range = DateFilter(
            min_date=After("2020-01-01"),
            max_date=Before("2020-12-31")
            )

        inpatient = CategoricalFilter(
            column_name='VISIT_DETAIL_CONCEPT_ID',
            allowed_values=[9201],
            domain='VISIT_DETAIL'
        )

        af_phenotype = CodelistPhenotype(
            name="af",
            domain='CONDITION_OCCURRENCE',
            codelist=af_codelist,
            date_range=date_range,
            return_date='first',
            categorical_filter=inpatient
        )

        af = af_phenotype.execute(mapped_tables)
        af.head()
        ```

    Example: Myocardial Infarction One Year Pre-index (OMOP)
        ```python
        from phenex.filters import RelativeTimeRangeFilter, Value

        af_phenotype = (...) # take from above example

        oneyear_preindex = RelativeTimeRangeFilter(
            min_days=Value('>', 0), # exclude index date
            max_days=Value('<', 365),
            anchor_phenotype=af_phenotype # use af phenotype above as reference date
            )

        mi_codelist = Codelist([49601007]) # list of concept ids
        mi_phenotype = CodelistPhenotype(
            name='mi',
            domain='CONDITION_OCCURRENCE',
            codelist=mi_codelist,
            return_date='first',
            relative_time_range=oneyear_preindex
        )
        mi = mi_phenotype.execute(mapped_tables)
        mi.head()
        ```
    """

    def __init__(
        self,
        domain: str,
        codelist: Codelist,
        name: Optional[str] = None,
        date_range: DateFilter = None,
        relative_time_range: Union[
            RelativeTimeRangeFilter, List[RelativeTimeRangeFilter]
        ] = None,
        return_date="first",
        categorical_filter: Optional["CategoricalFilter"] = None,
        **kwargs,
    ):
        super(CodelistPhenotype, self).__init__(name=name or codelist.name)

        self.codelist_filter = CodelistFilter(codelist)
        self.codelist = codelist
        self.categorical_filter = categorical_filter
        self.date_range = date_range
        self.return_date = return_date
        assert self.return_date in [
            "first",
            "last",
            "nearest",
            "all",
        ], f"Unknown return_date: {return_date}"
        self.domain = domain
        if isinstance(relative_time_range, RelativeTimeRangeFilter):
            relative_time_range = [relative_time_range]

        self.relative_time_range = relative_time_range
        if self.relative_time_range is not None:
            for rtr in self.relative_time_range:
                if rtr.anchor_phenotype is not None:
                    self.add_children(rtr.anchor_phenotype)

    def _execute(self, tables) -> PhenotypeTable:
        code_table = tables[self.domain]
        code_table = self._perform_codelist_filtering(code_table)
        code_table = self._perform_categorical_filtering(code_table, tables)
        code_table = self._perform_time_filtering(code_table)
        code_table = self._perform_date_selection(code_table)
        code_table = select_phenotype_columns(code_table)
        code_table = self._perform_final_processing(code_table)
        return code_table

    def _perform_codelist_filtering(self, code_table):
        assert is_phenex_code_table(code_table)
        code_table = self.codelist_filter.filter(code_table)
        return code_table

    def _perform_categorical_filtering(self, code_table, tables):
        if self.categorical_filter is not None:
            assert is_phenex_code_table(code_table)
            code_table = self.categorical_filter.autojoin_filter(code_table, tables)
        return code_table

    def _perform_time_filtering(self, code_table):
        if self.date_range is not None:
            code_table = self.date_range.filter(code_table)
        if self.relative_time_range is not None:
            for rtr in self.relative_time_range:
                code_table = rtr.filter(code_table)
        return code_table

    def _perform_date_selection(self, code_table, reduce=True):
        if self.return_date is None or self.return_date == "all":
            return code_table
        if self.return_date == "first":
            aggregator = First(reduce=reduce)
        elif self.return_date == "last":
            aggregator = Last(reduce=reduce)
        else:
            raise ValueError(f"Unknown return_date: {self.return_date}")
        return aggregator.aggregate(code_table)

    def get_codelists(self) -> List[Codelist]:
        """
        Get all codelists used in the phenotype definition, including all children / dependent phenotypes.

        Returns:
            codeslist: A list of codelists used in the cohort definition.
        """
        codelists = [self.codelist]
        for p in self.children:
            codelists.extend(p.get_codelists())
        return codelists

dependencies property

Recursively collect all dependencies of a node (including dependencies of dependencies).

Returns:

Type Description
Set[Node]

List[Node]: A list of Node objects on which this Node depends.

dependency_graph property

Build a dependency graph where each node maps to its direct dependencies (children).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, Set[Node]: A mapping of Node's to their children Node's.

namespaced_table property

A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.

Returns:

Name Type Description
table Table

The namespaced table for the current phenotype.

reverse_dependency_graph property

Build a reverse dependency graph where each node maps to nodes that depend on it (parents).

Returns:

Type Description
Dict[Node, Set[Node]]

Dict[Node, List[Node]: A mapping of Node's to their parent Node's.

execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1)

Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Node's that have already executed.

Parameters:

Name Type Description Default
tables Dict[str, Table]

A dictionary mapping domains to Table objects.

None
con Optional[object]

Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.

None
overwrite bool

If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.

False
lazy_execution bool

If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.

False
n_threads int

Max number of Node's to execute simultaneously when this node has multiple children.

1

Returns:

Name Type Description
Table Table

The resulting table for this node. Also accessible through self.table after calling self.execute().

Source code in phenex/node.py
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
) -> Table:
    """
    Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Node's that have already executed.

    Parameters:
        tables: A dictionary mapping domains to Table objects.
        con: Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector.
        overwrite: If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed.
        lazy_execution: If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work.
        n_threads: Max number of Node's to execute simultaneously when this node has multiple children.

    Returns:
        Table: The resulting table for this node. Also accessible through self.table after calling self.execute().
    """
    # Handle None tables
    if tables is None:
        tables = {}

    # Use multithreaded execution if we have multiple children and n_threads > 1
    if len(self.children) > 1 and n_threads > 1:
        return self._execute_multithreaded(
            tables, con, overwrite, lazy_execution, n_threads
        )
    else:
        return self._execute_sequential(tables, con, overwrite, lazy_execution)

get_codelists()

Get all codelists used in the phenotype definition, including all children / dependent phenotypes.

Returns:

Name Type Description
codeslist List[Codelist]

A list of codelists used in the cohort definition.

Source code in phenex/phenotypes/codelist_phenotype.py
def get_codelists(self) -> List[Codelist]:
    """
    Get all codelists used in the phenotype definition, including all children / dependent phenotypes.

    Returns:
        codeslist: A list of codelists used in the cohort definition.
    """
    codelists = [self.codelist]
    for p in self.children:
        codelists.extend(p.get_codelists())
    return codelists

visualize_dependencies()

Create a text visualization of the dependency graph for this node and its dependencies.

Returns:

Name Type Description
str str

A text representation of the dependency graph

Source code in phenex/node.py
def visualize_dependencies(self) -> str:
    """
    Create a text visualization of the dependency graph for this node and its dependencies.

    Returns:
        str: A text representation of the dependency graph
    """
    lines = [f"Dependencies for Node '{self.name}':"]

    # Get all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency graph
    dependency_graph = self._build_dependency_graph(nodes)

    for node_name in sorted(nodes.keys()):
        dependencies = dependency_graph.get(node_name, set())
        if dependencies:
            deps_str = ", ".join(sorted(dependencies))
            lines.append(f"  {node_name} depends on: {deps_str}")
        else:
            lines.append(f"  {node_name} (no dependencies)")

    return "\n".join(lines)