Bases: Node
EventsToTimeRange converts individual code events into time ranges with start and end dates.
This derived table takes events matching a codelist (e.g. medication prescriptions) and creates a
time range for each event. The start date is the event date, and the end date is calculated
by adding a specified number of days to the start date. Adjacent or overlapping periods are
combined into single continuous periods.
This is particularly useful for identifying medication discontinuation when only prescription
dates (not durations) are available. For example, discontinuation may be defined as a gap of
≥180 days between prescriptions.
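To make the merging concrete, here is a minimal standalone sketch of the same logic with made-up prescription dates (plain Python for illustration, not the phenex implementation itself):

```python
from datetime import date, timedelta

# Hypothetical prescription dates; each prescription is assumed to
# cover up to 180 days, as with max_days=LessThanOrEqualTo(180)
events = [date(2020, 1, 1), date(2020, 5, 1), date(2021, 6, 1)]
ranges = [(d, d + timedelta(days=180)) for d in events]

# Combine overlapping or adjacent coverage periods into continuous ranges
merged = []
for start, end in sorted(ranges):
    if merged and start <= merged[-1][1]:
        merged[-1] = (merged[-1][0], max(merged[-1][1], end))
    else:
        merged.append((start, end))

print(merged)
# The first two prescriptions overlap and merge into 2020-01-01..2020-10-28;
# the third starts after a gap of more than 180 days, so it forms a separate
# period (2021-06-01..2021-11-28), i.e. a discontinuation.
```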
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `domain` | `str` | The source domain containing event data. | required |
| `codelist` | `Codelist` | The codelist used to filter events. | required |
| `max_days` | `Value` | A Value filter (like LessThan or LessThanOrEqualTo) specifying the number of days to add to each event date to create the end date. | required |
| `name` | | Optional name for the derived table. | required |
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `domain` | | The domain of events to process. |
| `codelist` | | The codelist used for filtering events. |
| `max_days` | | The number of days to add to each event date. |
Examples:

Identifying medication discontinuation:

```python
from phenex.derived_tables import EventsToTimeRange
from phenex.phenotypes import TimeRangePhenotype
from phenex.codelists import Codelist
from phenex.filters.value import LessThanOrEqualTo
from phenex.filters import RelativeTimeRangeFilter

# Create a derived table for medication coverage periods
et_codelist = Codelist(["RX12345", "RX12346"])
derived_table = EventsToTimeRange(
    name='ET_USAGE',
    domain='DRUG_EXPOSURE',
    codelist=et_codelist,
    max_days=LessThanOrEqualTo(180),
)

# Return the persons that discontinue post index.
# EVENT_DATE column will be the date of discontinuation;
# VALUE will be the number of days from index to discontinuation date.
pt_et_discontinuation = TimeRangePhenotype(
    domain='ET_USAGE',
    relative_time_range=RelativeTimeRangeFilter(
        when='after',
    ),
)

# Execute the derived table with your data
et_periods = derived_table.execute(tables)
```
Source code in `phenex/derived_tables/events_to_time_range.py`

````python
class EventsToTimeRange(Node):
    """
    EventsToTimeRange converts individual code events into time ranges with start and end dates.

    This derived table takes a codelist (e.g. medication prescriptions) and creates a
    time range for each event. The start date is the event date, and the end date is calculated
    by adding a specified number of days to the start date. Adjacent or overlapping periods are
    combined into single continuous periods.

    This is particularly useful for identifying medication discontinuation when only prescription
    dates (not durations) are available. For example, discontinuation may be defined as a gap of
    ≥180 days between prescriptions.

    Parameters:
        domain: The source domain containing event data.
        codelist: The codelist used to filter events.
        max_days: A Value filter (like LessThan or LessThanOrEqualTo) specifying the number of days
            to add to each event date to create the end date.
        name: Optional name for the derived table.

    Attributes:
        domain: The domain of events to process.
        codelist: The codelist used for filtering events.
        max_days: The number of days to add to each event date.

    Examples:
        Example: Identifying medication discontinuation
        ```python
        from phenex.derived_tables import EventsToTimeRange
        from phenex.phenotypes import TimeRangePhenotype
        from phenex.codelists import Codelist
        from phenex.filters.value import LessThanOrEqualTo
        from phenex.filters import RelativeTimeRangeFilter

        # Create a derived table for medication coverage periods
        et_codelist = Codelist(["RX12345", "RX12346"])
        derived_table = EventsToTimeRange(
            name='ET_USAGE',
            domain='DRUG_EXPOSURE',
            codelist=et_codelist,
            max_days=LessThanOrEqualTo(180)
        )

        # Return the persons that discontinue post index
        # EVENT_DATE column will be the date of discontinuation
        # VALUE will be the number of days from index to discontinuation date
        pt_et_discontinuation = TimeRangePhenotype(
            domain='ET_USAGE',
            relative_time_range=RelativeTimeRangeFilter(
                when='after',
            )
        )

        # Execute the derived table with your data
        et_periods = derived_table.execute(tables)
        ```
    """

    def __init__(self, domain: str, codelist: "Codelist", max_days: "Value", **kwargs):
        self.domain = domain
        self.codelist_filter = CodelistFilter(codelist)
        self.codelist = codelist
        assert max_days.operator in [
            "<",
            "<=",
        ], f"max_days operator must be < or <=, not {max_days.operator}"
        self.max_days = max_days
        super(EventsToTimeRange, self).__init__(**kwargs)

    def _execute(
        self,
        tables: Dict[str, Table],
    ) -> "Table":
        table = tables[self.domain]
        table = self._perform_codelist_filtering(table)
        table = self._create_start_end_date_table(table)
        table = self._combine_overlapping_periods(table)
        return table

    def _perform_codelist_filtering(self, table):
        """
        Filter source table to codelist events of interest, i.e. drug x events only.

        Returns:
            Source table with all original columns.
        """
        assert is_phenex_code_table(table)
        table = self.codelist_filter.filter(table)
        return table

    def _create_start_end_date_table(self, table):
        """
        Create start and end date columns for the events. Start date is the codelist event,
        end date is the start date plus max_days.

        Returns:
            Table with three columns:
                PERSON_ID
                START_DATE : the codelist EVENT_DATE
                END_DATE : START_DATE + max_days
        """
        table = table.select("PERSON_ID", "EVENT_DATE")
        table = table.mutate(START_DATE=table.EVENT_DATE)
        if self.max_days.operator == "<":
            days_to_add = self.max_days.value - 1
            table = table.mutate(
                END_DATE=table.START_DATE + ibis.interval(days=days_to_add)
            )
        else:
            days_to_add = self.max_days.value
            table = table.mutate(
                END_DATE=table.START_DATE + ibis.interval(days=days_to_add)
            )
        return table

    def _combine_overlapping_periods(self, table):
        """
        Combine all overlapping and consecutive periods.

        Returns:
            Table with three columns, with consecutive and overlapping periods
            combined into single time ranges:
                PERSON_ID
                START_DATE : the codelist EVENT_DATE
                END_DATE : START_DATE + max_days
        """
        cop = CombineOverlappingPeriods(domain="_")
        table = cop.execute(tables={"_": table})
        return table
````
dependencies
property

Recursively collect all dependencies of this node (including dependencies of dependencies).

Returns:

| Type | Description |
| --- | --- |
| `Set[Node]` | A set of Node objects on which this Node depends. |
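For example (a hypothetical sketch; `my_node` stands in for any constructed Node):

```python
# Inspect everything this node transitively depends on
for dep in sorted(my_node.dependencies, key=lambda n: n.name):
    print(dep.name)
```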
dependency_graph
property

Build a dependency graph where each node maps to its direct dependencies (children).

Returns:

| Type | Description |
| --- | --- |
| `Dict[Node, Set[Node]]` | A mapping of Nodes to their child Nodes. |
Retrieve the full execution metadata row for this node from the local DuckDB database.

Returns:

| Type | Description |
| --- | --- |
| `pandas.Series` | A series containing NODE_NAME, LAST_HASH, NODE_PARAMS, and LAST_EXECUTED for this node, or None if the node has never been executed. |
reverse_dependency_graph
property

Build a reverse dependency graph where each node maps to the nodes that depend on it (parents).

Returns:

| Type | Description |
| --- | --- |
| `Dict[Node, Set[Node]]` | A mapping of Nodes to their parent Nodes. |
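A short sketch of how the two graph properties relate (again with a hypothetical `my_node`): `dependency_graph` points from a node to what it needs, while `reverse_dependency_graph` points from a node to what needs it.

```python
# Each node mapped to the nodes it depends on (its children)
for node, children in my_node.dependency_graph.items():
    print(node.name, "depends on", sorted(c.name for c in children))

# Each node mapped to the nodes that depend on it (its parents)
for node, parents in my_node.reverse_dependency_graph.items():
    print(node.name, "is needed by", sorted(p.name for p in parents))
```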
clear_cache(con=None, recursive=False)
Clear the cached state for this node, forcing re-execution on the next call to execute().
This method removes the node's hash from the node states table and optionally drops the materialized table from the database. After calling this method, the node will be treated as if it has never been executed before.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `con` | `Optional[object]` | Database connector. If provided, will also drop the materialized table from the database. | `None` |
| `recursive` | `bool` | If True, also clear the cache for all child nodes recursively. Defaults to False. | `False` |
Example:

```python
# Clear cache for a single node
my_node.clear_cache()

# Clear cache and drop materialized table
my_node.clear_cache(con=my_connector)

# Clear cache for node and all its dependencies
my_node.clear_cache(recursive=True)
```
Source code in `phenex/node.py`

````python
def clear_cache(self, con: Optional[object] = None, recursive: bool = False):
    """
    Clear the cached state for this node, forcing re-execution on the next call to execute().

    This method removes the node's hash from the node states table and optionally drops the
    materialized table from the database. After calling this method, the node will be treated
    as if it has never been executed before.

    Parameters:
        con: Database connector. If provided, will also drop the materialized table from the database.
        recursive: If True, also clear the cache for all child nodes recursively. Defaults to False.

    Example:
        ```python
        # Clear cache for a single node
        my_node.clear_cache()

        # Clear cache and drop materialized table
        my_node.clear_cache(con=my_connector)

        # Clear cache for node and all its dependencies
        my_node.clear_cache(recursive=True)
        ```
    """
    logger.info(f"Node '{self.name}': clearing cached state...")

    # Clear the hash from the node states table
    with Node._hash_update_lock:
        duckdb_con = DuckDBConnector(DUCKDB_DEST_DATABASE=NODE_STATES_DB_NAME)
        if NODE_STATES_TABLE_NAME in duckdb_con.dest_connection.list_tables():
            table = duckdb_con.get_dest_table(NODE_STATES_TABLE_NAME).to_pandas()
            # Remove this node's entry
            table = table[table.NODE_NAME != self.name]
            # Update the table
            if len(table) > 0:
                updated_table = ibis.memtable(table)
                duckdb_con.create_table(
                    updated_table, name_table=NODE_STATES_TABLE_NAME, overwrite=True
                )
            else:
                # Drop the table if it's empty
                duckdb_con.dest_connection.drop_table(NODE_STATES_TABLE_NAME)

    # Drop materialized table if connector is provided
    if con is not None:
        try:
            if self.name in con.dest_connection.list_tables():
                logger.info(f"Node '{self.name}': dropping materialized table...")
                con.dest_connection.drop_table(self.name)
        except Exception as e:
            logger.warning(
                f"Node '{self.name}': failed to drop materialized table: {e}"
            )

    # Reset the table attribute
    self.table = None

    # Recursively clear children if requested
    if recursive:
        for child in self.children:
            child.clear_cache(con=con, recursive=recursive)

    logger.info(f"Node '{self.name}': cache cleared successfully.")
````
execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1)
Executes the Node computation for the current node and its dependencies. Supports lazy execution using hash-based change detection to avoid recomputing Nodes that have already been executed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tables` | `Dict[str, Table]` | A dictionary mapping domains to Table objects. | `None` |
| `con` | `Optional[object]` | Connection to a database for materializing outputs. If provided, outputs from the node and all child nodes will be materialized (written) to the database using the connector. | `None` |
| `overwrite` | `bool` | If True, overwrites any existing tables found in the database while writing. If False, throws an error when an existing table is found. Has no effect if con is not passed. | `False` |
| `lazy_execution` | `bool` | If True, only re-executes if the node's definition has changed. Defaults to False. You should pass overwrite=True with lazy_execution, as lazy_execution is intended precisely for iterative updates to a node definition. You must pass a connector (to cache results) for lazy_execution to work. | `False` |
| `n_threads` | `int` | Maximum number of Nodes to execute simultaneously when this node has multiple children. | `1` |
Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Table` | `Table` | The resulting table for this node. Also accessible through self.table after calling self.execute(). |
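A usage sketch under the rules above (`my_node`, `tables`, and `my_connector` are illustrative names; any connector such as the DuckDBConnector used elsewhere in this module should work):

```python
# Simple execution: compute this node and its dependencies in memory
result = my_node.execute(tables)

# Iterative development: materialize results and skip unchanged nodes.
# lazy_execution requires both overwrite=True and a connector.
result = my_node.execute(
    tables,
    con=my_connector,
    overwrite=True,
    lazy_execution=True,
    n_threads=4,  # execute up to 4 independent nodes in parallel
)
```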
Source code in `phenex/node.py`

```python
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
) -> Table:
    """
    Executes the Node computation for the current node and its dependencies. Supports lazy
    execution using hash-based change detection to avoid recomputing Nodes that have already
    been executed.

    Parameters:
        tables: A dictionary mapping domains to Table objects.
        con: Connection to database for materializing outputs. If provided, outputs from the
            node and all children nodes will be materialized (written) to the database using
            the connector.
        overwrite: If True, will overwrite any existing tables found in the database while
            writing. If False, will throw an error when an existing table is found. Has no
            effect if con is not passed.
        lazy_execution: If True, only re-executes if the node's definition has changed.
            Defaults to False. You should pass overwrite=True with lazy_execution as
            lazy_execution is intended precisely for iterative updates to a node definition.
            You must pass a connector (to cache results) for lazy_execution to work.
        n_threads: Max number of Nodes to execute simultaneously when this node has multiple
            children.

    Returns:
        Table: The resulting table for this node. Also accessible through self.table after
            calling self.execute().
    """
    # Handle None tables
    if tables is None:
        tables = {}

    # Build dependency graph for all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency and reverse graphs
    dependency_graph = self._build_dependency_graph(nodes)
    reverse_graph = self._build_reverse_graph(dependency_graph)

    # Track completion status and results
    completed = set()
    completion_lock = threading.Lock()
    worker_exceptions = []  # Track exceptions from worker threads
    stop_all_workers = (
        threading.Event()
    )  # Signal to stop all workers on first error

    # Track in-degree for scheduling
    in_degree = {}
    for node_name, dependencies in dependency_graph.items():
        in_degree[node_name] = len(dependencies)
    for node_name in nodes:
        if node_name not in in_degree:
            in_degree[node_name] = 0

    # Queue for nodes ready to execute
    ready_queue = queue.Queue()

    # Add nodes with no dependencies to ready queue
    for node_name, degree in in_degree.items():
        if degree == 0:
            ready_queue.put(node_name)

    def worker():
        """Worker function for thread pool"""
        while not stop_all_workers.is_set():
            try:
                node_name = ready_queue.get(timeout=1)
                # timeout forces to wait 1 second to avoid busy waiting
                if node_name is None:  # Sentinel value to stop worker
                    break
            except queue.Empty:
                continue

            try:
                logger.info(
                    f"Thread {threading.current_thread().name}: executing node '{node_name}'"
                )
                node = nodes[node_name]

                # Execute the node (without recursive child execution since we handle dependencies here)
                if lazy_execution:
                    if not overwrite:
                        raise ValueError(
                            "lazy_execution only works with overwrite=True."
                        )
                    if con is None:
                        raise ValueError(
                            "A DatabaseConnector is required for lazy execution."
                        )
                    if node._get_current_hash() != node._get_last_hash():
                        logger.info(f"Node '{node_name}': computing...")
                        table = node._execute(tables)
                        if (
                            table is not None
                        ):  # Only create table if _execute returns something
                            con.create_table(table, node_name, overwrite=overwrite)
                            table = con.get_dest_table(node_name)
                        node._update_current_hash()
                    else:
                        logger.info(
                            f"Node '{node_name}': unchanged, using cached result"
                        )
                        table = con.get_dest_table(node_name)
                else:
                    table = node._execute(tables)
                    if (
                        con and table is not None
                    ):  # Only create table if _execute returns something
                        con.create_table(table, node_name, overwrite=overwrite)
                        table = con.get_dest_table(node_name)

                node.table = table

                with completion_lock:
                    completed.add(node_name)
                    # Update in-degree for dependent nodes and add ready ones to queue
                    for dependent in reverse_graph.get(node_name, set()):
                        in_degree[dependent] -= 1
                        if in_degree[dependent] == 0:
                            # Check if all dependencies are completed
                            deps_completed = all(
                                dep in completed
                                for dep in dependency_graph.get(dependent, set())
                            )
                            if deps_completed:
                                ready_queue.put(dependent)

                logger.info(
                    f"Thread {threading.current_thread().name}: completed node '{node_name}'"
                )

            except Exception as e:
                logger.error(f"Error executing node '{node_name}': {str(e)}")
                with completion_lock:
                    # Store exception for main thread
                    worker_exceptions.append(e)
                # Signal all workers to stop immediately and exit worker loop
                stop_all_workers.set()
                break
            finally:
                ready_queue.task_done()

    # Start worker threads
    threads = []
    for i in range(min(n_threads, len(nodes))):
        thread = threading.Thread(target=worker, name=f"PhenexWorker-{i}")
        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Wait for all nodes to complete or for an error to occur
    while (
        len(completed) < len(nodes)
        and not worker_exceptions
        and not stop_all_workers.is_set()
    ):
        threading.Event().wait(0.1)  # Small delay to prevent busy waiting

    if not stop_all_workers.is_set():
        # Time to stop workers and cleanup
        stop_all_workers.set()

    # Check if any worker thread had an exception
    if worker_exceptions:
        # Signal workers to stop
        for _ in threads:
            ready_queue.put(None)
        # Wait for threads to finish
        for thread in threads:
            thread.join(timeout=1)
        # Re-raise the first exception
        raise worker_exceptions[0]

    # Signal workers to stop and wait for them
    for _ in threads:
        ready_queue.put(None)  # Sentinel value to stop workers
    for thread in threads:
        thread.join(timeout=1)

    logger.info(
        f"Node '{self.name}': completed multithreaded execution of {len(nodes)} nodes"
    )

    return self.table
```
to_dict()
Return a dictionary representation of the Node. The dictionary must contain all dependencies of the Node such that if anything in self.to_dict() changes, the Node must be recomputed.
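This dictionary is what the hash-based change detection in execute() builds on; a minimal sketch (`my_node` is hypothetical, and the exact serialization/hashing scheme shown here is an assumption, not the library's internal one):

```python
import hashlib
import json

# If anything in the node's definition changes, this digest changes,
# and lazy execution would recompute the node
definition = my_node.to_dict()
digest = hashlib.sha256(
    json.dumps(definition, default=str, sort_keys=True).encode()
).hexdigest()
print(digest)
```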
Source code in `phenex/node.py`

```python
def to_dict(self):
    """
    Return a dictionary representation of the Node. The dictionary must contain all
    dependencies of the Node such that if anything in self.to_dict() changes, the Node
    must be recomputed.
    """
    return to_dict(self)
```
visualize_dependencies()
Create a text visualization of the dependency graph for this node and its dependencies.
Returns:

| Name | Type | Description |
| --- | --- | --- |
| `str` | `str` | A text representation of the dependency graph. |
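For example (hypothetical node names; the output format follows the source below):

```python
print(my_node.visualize_dependencies())
# Dependencies for Node 'MY_NODE':
#   CODELIST_EVENTS (no dependencies)
#   MY_NODE depends on: CODELIST_EVENTS
```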
Source code in `phenex/node.py`

```python
def visualize_dependencies(self) -> str:
    """
    Create a text visualization of the dependency graph for this node and its dependencies.

    Returns:
        str: A text representation of the dependency graph
    """
    lines = [f"Dependencies for Node '{self.name}':"]

    # Get all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency graph
    dependency_graph = self._build_dependency_graph(nodes)

    for node_name in sorted(nodes.keys()):
        dependencies = dependency_graph.get(node_name, set())
        if dependencies:
            deps_str = ", ".join(sorted(dependencies))
            lines.append(f"  {node_name} depends on: {deps_str}")
        else:
            lines.append(f"  {node_name} (no dependencies)")

    return "\n".join(lines)
```