Skip to content

Study

Orchestrates the execution of multiple cohorts and aggregates their reports.

A Study manages the execution of one or more cohorts, automatically generating standardized reports (Waterfall, Table1) for each cohort and concatenating them into a single multi-sheet Excel file for easy comparison. Each execution creates a timestamped directory containing individual cohort outputs and a combined study results file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | Base directory where study outputs will be saved. A subdirectory with the study name will be created if it doesn't exist. | required |
| name | str | Name of the study. Used for directory naming and identification. | required |
| cohorts | List[Cohort] | List of Cohort objects to execute. Each cohort must have a unique name and an assigned database. | required |
| custom_reporters | List[Reporter] | Additional reporters to run on each cohort. Waterfall and Table1 reports are always generated by default. | None |

Example:

    # will write to output path ./my_study
    # every time we execute, a new directory with date and time will be added
    # within it, a directory with each cohort's output is created and
    # a combined study_results.xlsx file with all reports concatenated
    study1 = Study(
        name = "my_study",
        path = "./",
        cohorts = [cohort1, cohort2],
    )
    study1.execute()

Source code in phenex/core/study.py
class Study:
    """
    Orchestrates the execution of multiple cohorts and aggregates their reports.

    A Study manages the execution of one or more cohorts, automatically generating standardized reports (Waterfall, Table1) for each cohort and concatenating them into a single multi-sheet Excel file for easy comparison. Each execution creates a timestamped directory containing individual cohort outputs and a combined study results file.

    Parameters:
        path: Base directory where study outputs will be saved. A subdirectory with the study name will be created if it doesn't exist.
        name: Name of the study. Used for directory naming and identification.
        cohorts: List of Cohort objects to execute. Each cohort must have a unique name and an assigned database.
        custom_reporters: Additional reporters to run on each cohort. Waterfall and Table1 reports are always generated by default.

    Raises:
        ValueError: If two cohorts share a name, or if any cohort has no database assigned.

    Example:
    ```python
        # will write to output path ./my_study
        # every time we execute, a new directory with date and time will be added
        # within it, a directory with each cohort's output is created and
        # a combined study_results.xlsx file with all reports concatenated
        study1 = Study(
            name = "my_study",
            path = "./",
            cohorts = [cohort1, cohort2],
        )
        study1.execute()

    ```

    """

    def __init__(
        self,
        path: str,
        name: str,
        cohorts: List["Cohort"],
        custom_reporters: Optional[List["Reporter"]] = None,
    ):
        self.path = path
        self.name = name
        self.cohorts = cohorts
        self.custom_reporters = custom_reporters

        # Validate before touching the filesystem so that an invalid study
        # definition does not leave an empty output directory behind.
        self._check_cohort_names_unique()
        self._check_cohorts_have_databases()
        self._create_study_output_path()

    def _create_study_output_path(self):
        """Point ``self.path`` at a directory named after the study and create it."""
        # normpath drops trailing separators, so "out/my_study/" is recognised
        # as already ending in the study name instead of being nested again.
        last_component = os.path.basename(os.path.normpath(self.path))
        if last_component != self.name:
            self.path = os.path.join(self.path, self.name)
        os.makedirs(self.path, exist_ok=True)

    def _check_cohort_names_unique(self):
        """Raise ``ValueError`` if any two cohorts share the same name."""
        all_names = [cohort.name for cohort in self.cohorts]
        if len(set(all_names)) != len(all_names):
            raise ValueError(
                f"Ensure that cohort names are unique; found cohort names {sorted(all_names)}"
            )

    def _check_cohorts_have_databases(self):
        """Raise ``ValueError`` listing every cohort whose ``database`` is ``None``."""
        missing_database = [
            cohort for cohort in self.cohorts if cohort.database is None
        ]
        if missing_database:
            raise ValueError(
                f"Cohorts must have databases defined in order for use in a Study. Cohorts missing database : {[x.name for x in missing_database]}"
            )

    def execute(
        self,
        overwrite: Optional[bool] = False,
        n_threads: Optional[int] = 1,
        lazy_execution: Optional[bool] = False,
    ):
        """
        Execute every cohort, write its reports, and concatenate all reports.

        A new timestamped execution directory is prepared under the study
        path. For each cohort, a subdirectory is created holding the
        serialized cohort, its ``table1.xlsx`` and one Excel file per
        reporter (a Waterfall reporter followed by any custom reporters).

        Parameters:
            overwrite: Forwarded unchanged to each cohort's ``execute``.
            n_threads: Forwarded unchanged to each cohort's ``execute``.
            lazy_execution: Forwarded unchanged to each cohort's ``execute``.
        """
        path_exec_dir_study = self._prepare_study_execution_directory()
        self._freeze_software_versions(path_exec_dir_study)

        # Build the reporter list locally: writing it back to
        # self.custom_reporters would prepend another Waterfall on every call
        # to execute() and silently change the user-supplied configuration.
        reporters = [Waterfall()] + list(self.custom_reporters or [])

        for _cohort in self.cohorts:
            path_exec_dir_cohort = self._prepare_cohort_execution_directory(
                _cohort, path_exec_dir_study
            )
            self._save_serialized_cohort(_cohort, path_exec_dir_cohort)

            _cohort.execute(
                overwrite=overwrite, lazy_execution=lazy_execution, n_threads=n_threads
            )

            path_table = os.path.join(path_exec_dir_cohort, "table1.xlsx")
            _cohort.table1.to_excel(path_table)

            for reporter in reporters:
                reporter.execute(_cohort)
                # NOTE(review): the file name is the reporter's class name, so
                # two reporters of the same class write to the same file.
                report_filename = reporter.__class__.__name__
                logger.info(
                    f"Executed reporter {report_filename} for cohort {_cohort.name}"
                )
                reporter.to_excel(
                    os.path.join(path_exec_dir_cohort, report_filename + ".xlsx")
                )

        self._concatenate_reports(path_exec_dir_study)

    def _prepare_study_execution_directory(self):
        """
        Create (if needed) and return the timestamped directory for this run.

        The directory name has minute resolution, so two executions started
        within the same minute share a directory; that case is logged as a
        warning and the existing directory is reused.
        """
        now = datetime.datetime.today()
        dirname = now.strftime("D%Y-%m-%d__T%H-%M")
        path = os.path.join(self.path, dirname)
        if os.path.exists(path):
            logger.warning(f"Output directory {path} already exists!")
        else:
            logger.info(f"Creating output directory for study execution : {path}")
            os.makedirs(path, exist_ok=True)
        return path

    def _freeze_software_versions(self, path_exec_dir_study):
        """Store Python and PhenEx versions in info.txt file for reproducibility."""
        info_path = os.path.join(path_exec_dir_study, "info.txt")

        # Get Python version
        python_version = sys.version

        # Get PhenEx version; fall back to a placeholder when the package
        # metadata is unavailable.
        try:
            phenex_version = version("phenex")
        except PackageNotFoundError:
            phenex_version = "unknown (package not installed)"

        # Write to file with an explicit encoding so the output does not
        # depend on the platform's default locale.
        with open(info_path, "w", encoding="utf-8") as f:
            f.write("Software Environment Information\n")
            f.write("=" * 50 + "\n\n")
            f.write(
                f"Study Execution Date: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            )
            f.write(f"Python Version:\n{python_version}\n\n")
            f.write(f"PhenEx Version: {phenex_version}\n")

        logger.info(f"Software version information saved to {info_path}")

    def _prepare_cohort_execution_directory(self, cohort, path_exec_dir_study):
        """Create (if needed) and return the cohort's subdirectory of the run directory."""
        _path = os.path.join(path_exec_dir_study, cohort.name)
        os.makedirs(_path, exist_ok=True)
        return _path

    def _save_serialized_cohort(self, cohort, path_exec_dir_cohort):
        """Write the cohort definition to ``<cohort name>.json`` in its directory."""
        _path = os.path.join(path_exec_dir_cohort, cohort.name + ".json")
        with open(_path, "w", encoding="utf-8") as f:
            dump(cohort, f, indent=4)

    def _concatenate_reports(self, path_exec_dir_study):
        """Concatenate all cohort reports into a single Excel file."""
        concatenator = OutputConcatenator(path_exec_dir_study)
        concatenator.concatenate_all_reports()