Bases: Phenotype
As the name implies, TimeRangePhenotype is designed for working with time ranges. If the input data has a start and an end date, use TimeRangePhenotype to identify other events (or patients) that occur within this time range. The most common use case is working with 'health insurance coverage' data, i.e. the 'OBSERVATION_PERIOD' table. These tables have one or many rows per patient with the start and end of coverage, i.e. domains compatible with TimeRangePhenotype require a START_DATE and an END_DATE column. At its simplest, TimeRangePhenotype identifies patients whose INDEX_DATE (or other anchor date of interest) falls within this time range. Additionally, a minimum or maximum number of days from the anchor date to the beginning/end of the time range can be defined. The returned Phenotype has the following interpretation:
DATE: If relative_time_range.when='before', then DATE is the beginning of the coverage period containing the anchor date. If relative_time_range.when='after', then DATE is the end of the coverage period containing the anchor date.
VALUE: Coverage (in days) relative to the anchor date. By convention, always non-negative.
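To make the DATE/VALUE conventions concrete, here is a small worked example of the underlying date arithmetic (plain Python, independent of the PhenEx API):

```python
from datetime import date

# A single coverage period 2009-01-01 .. 2010-01-01 with anchor date 2009-06-01
start, end = date(2009, 1, 1), date(2010, 1, 1)
anchor = date(2009, 6, 1)

# when='before': DATE is the period start; VALUE is days of coverage before the anchor
value_before = (anchor - start).days  # 151
# when='after': DATE is the period end; VALUE is days of coverage after the anchor
value_after = (end - anchor).days     # 214
```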
There are two primary use cases for TimeRangePhenotype:
- Identify patients with some minimum duration of coverage prior to the anchor_phenotype date, e.g. "identify patients with 1 year of continuous coverage prior to index date"
- Determine the date of loss to followup (right censoring), i.e. the duration of coverage after the anchor_phenotype event
Data for TimeRangePhenotype
This phenotype requires a table with PersonID and a coverage start date and end date. Depending on the datasource used, this information is in a separate ObservationPeriod table or found in the PersonTable. Use a PhenexObservationPeriodTable to map the required coverage start and end date columns. For tables with overlapping time ranges, use the CombineOverlappingPeriods derived table to combine them into single, disjoint time ranges (see the sketch below the table).
| PersonID | startDate  | endDate    |
|----------|------------|------------|
| 1        | 2009-01-01 | 2010-01-01 |
| 2        | 2008-01-01 | 2010-01-02 |
TimeRangePhenotype assumes that there are **NO overlapping coverage periods**.
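The merge that CombineOverlappingPeriods performs can be illustrated with a standalone sketch (plain Python for illustration only; the actual derived table does this on the backend database):

```python
from datetime import date

def combine_overlapping_periods(periods):
    """Merge overlapping (start, end) date ranges into disjoint ranges."""
    merged = []
    for start, end in sorted(periods):
        if merged and start <= merged[-1][1]:
            # Overlaps (or touches) the previous period: extend it
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged

periods = [(date(2009, 1, 1), date(2009, 8, 1)), (date(2009, 6, 1), date(2010, 1, 1))]
print(combine_overlapping_periods(periods))
# [(datetime.date(2009, 1, 1), datetime.date(2010, 1, 1))]
```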
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `Optional[str]` | The name of the phenotype. | `'TIME_RANGE'` |
| `domain` | `Optional[str]` | The domain of the phenotype. | `'OBSERVATION_PERIOD'` |
| `date_range` | `Optional[DateFilter]` | A DateFilter to apply. `min_date` clips START_DATE (periods starting before `min_date` are trimmed to `min_date`); `max_date` clips END_DATE (periods ending after `max_date` are trimmed to `max_date`). Periods entirely outside the range are excluded. VALUE is then computed on the clipped period (see the sketch after this table). | `None` |
| `relative_time_range` | `Optional[RelativeTimeRangeFilter]` | Filter returned persons based on the duration of coverage in days. `relative_time_range.anchor_phenotype` defines the reference date with respect to which coverage is calculated; in typical applications, the anchor phenotype is the entry criterion. `relative_time_range.when` may be `'before'` or `'after'`. If `'before'`, the return date is the start of the coverage period containing the anchor date; if `'after'`, it is the end of that coverage period. | `None` |
| `allow_null_end_date` | `bool` | TimeRangePhenotype checks that the anchor date is within the time range of interest. This requires that the start date is not null and that the end date is either null or after the anchor date. To require that the end date is not null, set `allow_null_end_date` to `False`. | `True` |
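The clipping semantics of date_range can be illustrated with a small arithmetic sketch (the dates are hypothetical and this is not the PhenEx API):

```python
from datetime import date

# Coverage period 2008-06-01 .. 2010-03-01; study window begins 2009-01-01
start, end = date(2008, 6, 1), date(2010, 3, 1)
min_date = date(2009, 1, 1)

clipped_start = max(start, min_date)          # START_DATE trimmed to 2009-01-01
anchor = date(2009, 6, 1)
value_before = (anchor - clipped_start).days  # 151: coverage days counted inside the window only
```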
Example:

```python
# make sure to create an entry phenotype, for example 'atrial fibrillation diagnosis'
entry_phenotype = CodelistPhenotype(...)

# one year continuous coverage prior to index
one_year_coverage = TimeRangePhenotype(
    relative_time_range=RelativeTimeRangeFilter(
        min_days=GreaterThanOrEqualTo(365),
        anchor_phenotype=entry_phenotype,
        when='before',
    ),
)

# determine the date of loss to followup
loss_to_followup = TimeRangePhenotype(
    relative_time_range=RelativeTimeRangeFilter(
        anchor_phenotype=entry_phenotype,
        when='after',
    )
)

# determine the date when a drug was discontinued
drug_discontinuation = TimeRangePhenotype(
    relative_time_range=RelativeTimeRangeFilter(
        anchor_phenotype=entry_phenotype,
        when='after',
    )
)
```
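Once defined, a TimeRangePhenotype is executed like any other phenotype. The following is a usage sketch; `mapped_tables` stands in for whatever domain-to-table mapping your datasource provides:

```python
# `mapped_tables` must contain an 'OBSERVATION_PERIOD' entry with START_DATE/END_DATE columns
result = one_year_coverage.execute(tables=mapped_tables)

# The resulting phenotype table has one row per qualifying person, with the
# generic phenotype columns (person_id, event_date, value): here event_date is
# the start of the covering period (when='before') and value is the number of
# days of coverage before index (>= 365 because of min_days).
```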
Source code in phenex/phenotypes/time_range_phenotype.py
````python
class TimeRangePhenotype(Phenotype):
    """
    As the name implies, TimeRangePhenotype is designed for working with time ranges. If the input data has a start and an end date, use TimeRangePhenotype to identify other events (or patients) that occur within this time range. The most common use case is working with 'health insurance coverage' data, i.e. the 'OBSERVATION_PERIOD' table. These tables have one or many rows per patient with the start and end of coverage, i.e. domains compatible with TimeRangePhenotype require a START_DATE and an END_DATE column. At its simplest, TimeRangePhenotype identifies patients whose INDEX_DATE (or other anchor date of interest) falls within this time range. Additionally, a minimum or maximum number of days from the anchor date to the beginning/end of the time range can be defined. The returned Phenotype has the following interpretation:

    DATE: If relative_time_range.when='before', then DATE is the beginning of the coverage period containing the anchor date. If relative_time_range.when='after', then DATE is the end of the coverage period containing the anchor date.
    VALUE: Coverage (in days) relative to the anchor date. By convention, always non-negative.

    There are two primary use cases for TimeRangePhenotype:

    1. Identify patients with some minimum duration of coverage prior to the anchor_phenotype date, e.g. "identify patients with 1 year of continuous coverage prior to index date"
    2. Determine the date of loss to followup (right censoring), i.e. the duration of coverage after the anchor_phenotype event

    ## Data for TimeRangePhenotype

    This phenotype requires a table with PersonID and a coverage start date and end date. Depending on the datasource used, this information is in a separate ObservationPeriod table or found in the PersonTable. Use a PhenexObservationPeriodTable to map the required coverage start and end date columns. For tables with overlapping time ranges, use the CombineOverlappingPeriods derived table to combine them into single, disjoint time ranges.

    | PersonID | startDate  | endDate    |
    |----------|------------|------------|
    | 1        | 2009-01-01 | 2010-01-01 |
    | 2        | 2008-01-01 | 2010-01-02 |

    TimeRangePhenotype assumes that there are **NO overlapping coverage periods**.

    Parameters:
        name: The name of the phenotype.
        domain: The domain of the phenotype. Default is 'OBSERVATION_PERIOD'.
        date_range: A DateFilter to apply. min_date clips START_DATE (periods starting before min_date are trimmed to min_date); max_date clips END_DATE (periods ending after max_date are trimmed to max_date). Periods entirely outside the range are excluded. VALUE is then computed on the clipped period.
        relative_time_range: Filter returned persons based on the duration of coverage in days. relative_time_range.anchor_phenotype defines the reference date with respect to which coverage is calculated; in typical applications, the anchor phenotype is the entry criterion. relative_time_range.when may be 'before' or 'after'. If 'before', the return date is the start of the coverage period containing the anchor date; if 'after', it is the end of that coverage period.
        allow_null_end_date: TimeRangePhenotype checks that the anchor date is within the time range of interest. This requires that the start date is not null and that the end date is either null or after the anchor date. To require that the end date is not null, set allow_null_end_date to False.

    Example:
        ```python
        # make sure to create an entry phenotype, for example 'atrial fibrillation diagnosis'
        entry_phenotype = CodelistPhenotype(...)

        # one year continuous coverage prior to index
        one_year_coverage = TimeRangePhenotype(
            relative_time_range=RelativeTimeRangeFilter(
                min_days=GreaterThanOrEqualTo(365),
                anchor_phenotype=entry_phenotype,
                when='before',
            ),
        )

        # determine the date of loss to followup
        loss_to_followup = TimeRangePhenotype(
            relative_time_range=RelativeTimeRangeFilter(
                anchor_phenotype=entry_phenotype,
                when='after',
            )
        )

        # determine the date when a drug was discontinued
        drug_discontinuation = TimeRangePhenotype(
            relative_time_range=RelativeTimeRangeFilter(
                anchor_phenotype=entry_phenotype,
                when='after',
            )
        )
        ```
    """

    output_display_type = "value"

    def __init__(
        self,
        name: Optional[str] = "TIME_RANGE",
        domain: Optional[str] = "OBSERVATION_PERIOD",
        date_range: Optional[DateFilter] = None,
        relative_time_range: Optional["RelativeTimeRangeFilter"] = None,
        allow_null_end_date: bool = True,
        **kwargs
    ):
        super(TimeRangePhenotype, self).__init__(name=name, **kwargs)
        self.domain = domain
        self.date_range = date_range
        self.relative_time_range = relative_time_range
        self.allow_null_end_date = allow_null_end_date
        if self.relative_time_range is not None:
            if self.relative_time_range.anchor_phenotype is not None:
                self.add_children(self.relative_time_range.anchor_phenotype)

    def _execute(self, tables: Dict[str, Table]) -> PhenotypeTable:
        table = tables[self.domain]
        table = self._perform_anchor_join(table)
        table = self._perform_date_range_clipping(table)
        table = self._perform_anchor_containment_filtering(table)
        table = self._perform_value_date_assignment(table)
        table = self._perform_coverage_duration_filtering(table)
        return self._perform_final_processing(table)

    def _perform_anchor_join(self, table):
        """Join the domain table with the anchor phenotype to get the reference date column."""
        table, reference_column = attach_anchor_and_get_reference_date(
            table, self.relative_time_range.anchor_phenotype
        )
        self._reference_column = reference_column
        return table

    def _perform_date_range_clipping(self, table):
        """Clip START_DATE to min_date and END_DATE to max_date, then exclude periods that fall entirely outside the range."""
        if self.date_range is None:
            return table
        if self.date_range.min_value is not None:
            min_date = self.date_range.min_value.value
            table = table.mutate(
                START_DATE=ibis.greatest(table.START_DATE, ibis.literal(min_date))
            )
        if self.date_range.max_value is not None:
            max_date = self.date_range.max_value.value
            table = table.mutate(
                END_DATE=ibis.least(table.END_DATE, ibis.literal(max_date))
            )
        return table.filter(table.START_DATE <= table.END_DATE)

    def _perform_anchor_containment_filtering(self, table):
        """Keep only periods that contain the anchor date. Null END_DATE is treated as ongoing if allow_null_end_date is True."""
        ref = self._reference_column
        if self.allow_null_end_date:
            return table.filter(
                (table.START_DATE <= ref)
                & ((ref <= table.END_DATE) | table.END_DATE.isnull())
            )
        return table.filter((table.START_DATE <= ref) & (ref <= table.END_DATE))

    def _perform_value_date_assignment(self, table):
        """Assign VALUE (coverage days) and EVENT_DATE based on the direction of the relative time range."""
        ref = self._reference_column
        if (
            self.relative_time_range is None
            or self.relative_time_range.when == "before"
        ):
            value = ref.cast("date").delta(table.START_DATE.cast("date"), "day")
            event_date = table.START_DATE
        else:
            value = table.END_DATE.cast("date").delta(ref.cast("date"), "day")
            event_date = table.END_DATE
        return table.mutate(VALUE=value, EVENT_DATE=event_date)

    def _perform_coverage_duration_filtering(self, table):
        """Filter by min_days / max_days from the relative time range."""
        if self.relative_time_range is None:
            return table
        value_filter = ValueFilter(
            min_value=self.relative_time_range.min_days,
            max_value=self.relative_time_range.max_days,
            column_name="VALUE",
        )
        return value_filter.filter(table)
````
dependencies
property
Recursively collect all dependencies of a node (including dependencies of dependencies).
Returns:

| Type | Description |
|------|-------------|
| `Set[Node]` | The set of Node objects on which this Node depends. |
dependency_graph
property
Build a dependency graph where each node maps to its direct dependencies (children).
Returns:

| Type | Description |
|------|-------------|
| `Dict[Node, Set[Node]]` | A mapping of each Node to its direct child Nodes. |
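For example (a usage sketch; the node and names are hypothetical):

```python
# All transitive dependencies of a node
for dep in my_node.dependencies:
    print(dep.name)

# Direct children only, as a mapping
for node, children in my_node.dependency_graph.items():
    print(node.name, "->", sorted(child.name for child in children))
```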
Retrieve the full execution metadata row for this node from the local DuckDB database.
Returns:

| Type | Description |
|------|-------------|
| `pandas.DataFrame` | A table containing NODE_NAME, NODE_HASH, NODE_PARAMS, EXECUTION_PARAMS, EXECUTION_START_TIME, EXECUTION_END_TIME, and EXECUTION_DURATION for executions of this node, or None if the node has never been executed. |
namespaced_table
property
A PhenotypeTable has generic column names 'person_id', 'boolean', 'event_date', and 'value'. The namespaced_table prepends the phenotype name to all of these columns. This is useful when joining multiple phenotype tables together.
Returns:

| Name | Type | Description |
|------|------|-------------|
| `table` | `Table` | The namespaced table for the current phenotype. |
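Conceptually, namespacing prepends the phenotype name to the generic columns so that names stay unique when several phenotype tables are joined. An illustrative sketch (the phenotype name, column order, and casing are hypothetical):

```python
my_phenotype.table.columns
# ['person_id', 'boolean', 'event_date', 'value']

my_phenotype.namespaced_table.columns
# ['my_phenotype_person_id', 'my_phenotype_boolean', 'my_phenotype_event_date', 'my_phenotype_value']
```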
reverse_dependency_graph
property
Build a reverse dependency graph where each node maps to nodes that depend on it (parents).
Returns:

| Type | Description |
|------|-------------|
| `Dict[Node, Set[Node]]` | A mapping of each Node to the parent Nodes that depend on it. |
clear_cache(con=None, recursive=False)
Clear the cached state for this node, forcing re-execution on the next call to execute().
This method removes the node's hash from the node states table and optionally drops the materialized table from the database. After calling this method, the node will be treated as if it has never been executed before.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `con` | `Optional[object]` | Database connector. If provided, clears only runs with matching execution context and drops the materialized table. If None, clears all runs for the node. | `None` |
| `recursive` | `bool` | If True, also clear the cache for all child nodes recursively. Defaults to False. | `False` |
Example:

```python
# Clear all cached runs for a single node
my_node.clear_cache()

# Clear runs with specific execution context and drop materialized table
my_node.clear_cache(con=my_connector)

# Clear cache for node and all its dependencies
my_node.clear_cache(recursive=True)
```
Source code in phenex/node.py
````python
def clear_cache(self, con: Optional[object] = None, recursive: bool = False):
    """
    Clear the cached state for this node, forcing re-execution on the next call to execute().

    This method removes the node's hash from the node states table and optionally drops the materialized table from the database. After calling this method, the node will be treated as if it has never been executed before.

    Parameters:
        con: Database connector. If provided, clears only runs with matching execution context and drops the materialized table. If None, clears all runs for the node.
        recursive: If True, also clear the cache for all child nodes recursively. Defaults to False.

    Example:
        ```python
        # Clear all cached runs for a single node
        my_node.clear_cache()

        # Clear runs with specific execution context and drop materialized table
        my_node.clear_cache(con=my_connector)

        # Clear cache for node and all its dependencies
        my_node.clear_cache(recursive=True)
        ```
    """
    # Delegate all logic to NodeManager
    return Node._node_manager.clear_cache(self, con=con, recursive=recursive)
````
execute(tables=None, con=None, overwrite=False, lazy_execution=False, n_threads=1, table_name_prefix=None)
Executes the Node computation for the current node and its dependencies.
Lazy Execution
When lazy_execution=True, nodes are only recomputed if changes are detected. The system tracks:
1. Node definition changes: Detected by hashing the node's parameters (from to_dict()) and class name
2. Execution environment changes: Detected by tracking source/destination database configurations
A node will be rerun if either:
- The node's defining parameters have changed (different hash than last execution)
- The database connector's source or destination databases have changed
- The node has never been executed before
If no changes are detected, the node uses its cached result from the database instead of recomputing.
Requirements for lazy execution:
- A database connector (con) must be provided to store and retrieve cached results
- overwrite=True must be set to allow updating existing cached tables
State tracking is maintained in a local DuckDB database (__PHENEX_META__NODE_STATES table) that stores:
- Node hashes, parameters, and execution metadata
- Database connector configuration used during execution
- Execution timing information
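A minimal usage sketch of lazy execution (the connector class is a stand-in for whatever PhenEx database connector your deployment uses):

```python
con = MyDatabaseConnector(...)  # hypothetical connector instance

# First call: executes the node and its dependencies, materializing results via `con`
table = my_node.execute(con=con, overwrite=True, lazy_execution=True)

# Second call with no changes to node definitions or database configuration:
# cached tables are reused instead of recomputing
table = my_node.execute(con=con, overwrite=True, lazy_execution=True)
```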
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `tables` | `Dict[str, Table]` | A dictionary mapping domains to Table objects. | `None` |
| `con` | `Optional[object]` | Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector. Required for lazy_execution. | `None` |
| `overwrite` | `bool` | If True, overwrites any existing tables found in the database while writing. If False, throws an error when an existing table is found. Has no effect if con is not passed. Must be True when using lazy_execution. | `False` |
| `lazy_execution` | `bool` | If True, only re-executes nodes when changes are detected in either the node definition or execution environment. Requires con to be provided. | `False` |
| `n_threads` | `int` | Max number of Nodes to execute simultaneously when this node has multiple children. | `1` |
| `table_name_prefix` | `Optional[str]` | Optional prefix for materialized table names. Per the source below, the prefix is sanitized to alphanumerics/underscores and upper-cased, then prepended as `<PREFIX>__<node_name>` unless the node name already starts with it. | `None` |
Returns:

| Name | Type | Description |
|------|------|-------------|
| `Table` | `Table` | The resulting table for this node. Also accessible through `self.table` after calling `self.execute()`. |
Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If `lazy_execution=True` but `overwrite=False` or `con=None`. |
Source code in phenex/node.py
````python
def execute(
    self,
    tables: Dict[str, Table] = None,
    con: Optional[object] = None,
    overwrite: bool = False,
    lazy_execution: bool = False,
    n_threads: int = 1,
    table_name_prefix: Optional[str] = None,
) -> Table:
    """
    Executes the Node computation for the current node and its dependencies.

    Lazy Execution:
        When lazy_execution=True, nodes are only recomputed if changes are detected. The system tracks:

        1. Node definition changes: Detected by hashing the node's parameters (from to_dict()) and class name
        2. Execution environment changes: Detected by tracking source/destination database configurations

        A node will be rerun if either:

        - The node's defining parameters have changed (different hash than last execution)
        - The database connector's source or destination databases have changed
        - The node has never been executed before

        If no changes are detected, the node uses its cached result from the database instead of recomputing.

        Requirements for lazy execution:

        - A database connector (con) must be provided to store and retrieve cached results
        - overwrite=True must be set to allow updating existing cached tables

        State tracking is maintained in a local DuckDB database (__PHENEX_META__NODE_STATES table) that stores:

        - Node hashes, parameters, and execution metadata
        - Database connector configuration used during execution
        - Execution timing information

    Parameters:
        tables: A dictionary mapping domains to Table objects.
        con: Connection to database for materializing outputs. If provided, outputs from the node and all children nodes will be materialized (written) to the database using the connector. Required for lazy_execution.
        overwrite: If True, will overwrite any existing tables found in the database while writing. If False, will throw an error when an existing table is found. Has no effect if con is not passed. Must be True when using lazy_execution.
        lazy_execution: If True, only re-executes nodes when changes are detected in either the node definition or execution environment. Defaults to False. Requires con to be provided.
        n_threads: Max number of Nodes to execute simultaneously when this node has multiple children.
        table_name_prefix: Optional prefix for materialized table names; sanitized to alphanumerics/underscores and upper-cased, then prepended as '<PREFIX>__<node_name>' unless the node name already starts with it.

    Returns:
        Table: The resulting table for this node. Also accessible through self.table after calling self.execute().

    Raises:
        ValueError: If lazy_execution=True but overwrite=False or con=None.
    """
    if table_name_prefix:
        table_name_prefix = re.sub(r"[^A-Za-z0-9_]", "_", table_name_prefix).upper()

    # Handle None tables
    if tables is None:
        tables = {}

    # Build dependency graph for all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency and reverse graphs
    dependency_graph = self._build_dependency_graph(nodes)
    reverse_graph = self._build_reverse_graph(dependency_graph)

    # Track completion status and results
    completed = set()
    completion_lock = threading.Lock()
    worker_exceptions = []  # Track exceptions from worker threads
    stop_all_workers = (
        threading.Event()
    )  # Signal to stop all workers on first error

    # Track in-degree for scheduling
    in_degree = {}
    for node_name, dependencies in dependency_graph.items():
        in_degree[node_name] = len(dependencies)
    for node_name in nodes:
        if node_name not in in_degree:
            in_degree[node_name] = 0

    # Queue for nodes ready to execute
    ready_queue = queue.Queue()

    # Add nodes with no dependencies to ready queue
    for node_name, degree in in_degree.items():
        if degree == 0:
            ready_queue.put(node_name)

    def _run_and_materialise(node, node_name):
        """Execute *node*, materialise the result, record timing, and update the run hash."""
        db_name = (
            f"{table_name_prefix}__{node_name}"
            if table_name_prefix and not node_name.startswith(table_name_prefix)
            else node_name
        )
        node.lastexecution_start_time = datetime.now()
        table = node._execute(tables)
        if table is not None:
            con.create_table(table, db_name, overwrite=overwrite)
            table = con.get_dest_table(db_name)
        node.lastexecution_end_time = datetime.now()
        node.lastexecution_duration = (
            node.lastexecution_end_time - node.lastexecution_start_time
        ).total_seconds()
        Node._node_manager.update_run_params(node, con)
        return table

    def worker():
        """Worker function for thread pool"""
        while not stop_all_workers.is_set():
            try:
                node_name = ready_queue.get(timeout=1)
                # timeout forces to wait 1 second to avoid busy waiting
                if node_name is None:  # Sentinel value to stop worker
                    break
            except queue.Empty:
                continue
            try:
                logger.info(
                    f"Thread {threading.current_thread().name}: executing node '{node_name}'"
                )
                node = nodes[node_name]
                # Execute the node (without recursive child execution since we handle dependencies here)
                if lazy_execution:
                    if not overwrite:
                        raise ValueError(
                            "lazy_execution only works with overwrite=True."
                        )
                    if con is None:
                        raise ValueError(
                            "A DatabaseConnector is required for lazy execution."
                        )
                    if Node._node_manager.should_rerun(node, con):
                        table = _run_and_materialise(node, node_name)
                    else:
                        db_name = (
                            f"{table_name_prefix}__{node_name}"
                            if table_name_prefix
                            and not node_name.startswith(table_name_prefix)
                            else node_name
                        )
                        try:
                            table = con.get_dest_table(db_name)
                        except Exception:
                            # Cached table was dropped or is inaccessible; recompute.
                            logger.warning(
                                f"Cached table for '{node_name}' not found at {db_name}; recomputing."
                            )
                            table = _run_and_materialise(node, node_name)
                else:
                    # Time the execution
                    node.lastexecution_start_time = datetime.now()
                    table = node._execute(tables)
                    if (
                        con and table is not None
                    ):  # Only create table if _execute returns something
                        db_name = (
                            f"{table_name_prefix}__{node_name}"
                            if table_name_prefix
                            and not node_name.startswith(table_name_prefix)
                            else node_name
                        )
                        logger.info(
                            f"Thread {threading.current_thread().name}: materializing '{node_name}' to database ..."
                        )
                        _t_mat = datetime.now()
                        con.create_table(table, db_name, overwrite=overwrite)
                        logger.info(
                            f"Thread {threading.current_thread().name}: materialized '{node_name}' "
                            f"in {(datetime.now() - _t_mat).total_seconds():.3f}s"
                        )
                        table = con.get_dest_table(db_name)
                    node.lastexecution_end_time = datetime.now()
                    node.lastexecution_duration = (
                        node.lastexecution_end_time - node.lastexecution_start_time
                    ).total_seconds()

                node.table = table
                with completion_lock:
                    completed.add(node_name)
                    # Update in-degree for dependent nodes and add ready ones to queue
                    for dependent in reverse_graph.get(node_name, set()):
                        in_degree[dependent] -= 1
                        if in_degree[dependent] == 0:
                            # Check if all dependencies are completed
                            deps_completed = all(
                                dep in completed
                                for dep in dependency_graph.get(dependent, set())
                            )
                            if deps_completed:
                                ready_queue.put(dependent)

                # Log completion with timing info
                if node.lastexecution_duration is not None:
                    logger.info(
                        f"Thread {threading.current_thread().name}: completed node '{node_name}' "
                        f"in {node.lastexecution_duration:.3f} seconds"
                    )
                else:
                    logger.info(
                        f"Thread {threading.current_thread().name}: completed node '{node_name}' (cached)"
                    )
            except Exception as e:
                logger.error(f"Error executing node '{node_name}': {str(e)}")
                with completion_lock:
                    # Store exception for main thread
                    worker_exceptions.append(e)
                # Signal all workers to stop immediately and exit worker loop
                stop_all_workers.set()
                break
            finally:
                ready_queue.task_done()

    # Start worker threads
    threads = []
    for i in range(min(n_threads, len(nodes))):
        thread = threading.Thread(target=worker, name=f"PhenexWorker-{i}")
        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Wait for all nodes to complete or for an error to occur
    _last_heartbeat = datetime.now()
    while (
        len(completed) < len(nodes)
        and not worker_exceptions
        and not stop_all_workers.is_set()
    ):
        threading.Event().wait(0.1)  # Small delay to prevent busy waiting
        _now = datetime.now()
        if (_now - _last_heartbeat).total_seconds() >= 30:
            _pending = sorted(set(nodes.keys()) - completed)
            logger.info(
                f"Node '{self.name}': still waiting for {len(_pending)} nodes: {_pending}"
            )
            _last_heartbeat = _now

    if not stop_all_workers.is_set():
        # Time to stop workers and cleanup
        stop_all_workers.set()

    # Check if any worker thread had an exception
    if worker_exceptions:
        # Signal workers to stop
        for _ in threads:
            ready_queue.put(None)
        # Wait for threads to finish
        for thread in threads:
            thread.join(timeout=1)
        # Re-raise the first exception
        raise worker_exceptions[0]

    # Signal workers to stop and wait for them
    for _ in threads:
        ready_queue.put(None)  # Sentinel value to stop workers
    for thread in threads:
        thread.join(timeout=1)

    logger.info(
        f"Node '{self.name}': completed multithreaded execution of {len(nodes)} nodes"
    )
    return self.table
````
visualize_dependencies()
Create a text visualization of the dependency graph for this node and its dependencies.
Returns:

| Name | Type | Description |
|------|------|-------------|
| `str` | `str` | A text representation of the dependency graph. |
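For example (node names are hypothetical; the output format follows the source below):

```python
print(my_node.visualize_dependencies())
# Dependencies for Node 'TIME_RANGE':
#   ENTRY (no dependencies)
#   TIME_RANGE depends on: ENTRY
```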
Source code in phenex/node.py
````python
def visualize_dependencies(self) -> str:
    """
    Create a text visualization of the dependency graph for this node and its dependencies.

    Returns:
        str: A text representation of the dependency graph
    """
    lines = [f"Dependencies for Node '{self.name}':"]

    # Get all dependencies
    all_deps = self.dependencies
    nodes = {node.name: node for node in all_deps}
    nodes[self.name] = self  # Add self to the nodes

    # Build dependency graph
    dependency_graph = self._build_dependency_graph(nodes)

    for node_name in sorted(nodes.keys()):
        dependencies = dependency_graph.get(node_name, set())
        if dependencies:
            deps_str = ", ".join(sorted(dependencies))
            lines.append(f"  {node_name} depends on: {deps_str}")
        else:
            lines.append(f"  {node_name} (no dependencies)")

    return "\n".join(lines)
````