Codebase Reference

This document is a reference for the main modules of the weather_data_retrieval package and the functions they expose.

weather_data_retrieval.runner

run

run(
    config,
    run_mode="interactive",
    verbose=True,
    logger=None,
)

Unified orchestration entry point for both interactive and automatic runs. Handles validation, logging, estimation, and download orchestration.

Returns: 0=success, 1=fatal error, 2=some downloads failed.

Parameters:

config : dict
    Configuration dictionary with all required parameters. Required.
run_mode : str, optional
    Run mode, either 'interactive' or 'automatic', by default "interactive".
verbose : bool, optional
    If True, also echo log messages to the console, by default True.
logger : logging.Logger, optional
    Pre-configured logger instance, by default None.

Returns:

int
    Exit code: 0=success, 1=fatal error, 2=some downloads failed.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/runner.py
def run(
        config: dict,
        run_mode: str = "interactive",
        verbose: bool = True,
        logger=None
        ) -> int:
    """
    Unified orchestration entry point for both interactive and automatic runs.
    Handles validation, logging, estimation, and download orchestration.

    Returns: 0=success, 1=fatal error, 2=some downloads failed.

    Parameters
    ----------
    config : dict
        Configuration dictionary with all required parameters.
    run_mode : str, optional
        Run mode, either 'interactive' or 'automatic', by default "interactive".
    verbose : bool, optional
        If True, also echo log messages to the console, by default True.
    logger : logging.Logger, optional
        Pre-configured logger instance, by default None.

    Returns
    -------
    int
        Exit code: 0=success, 1=fatal error, 2=some downloads failed.
    """

    console_handler_active = (run_mode == "interactive") or verbose

    def echo_only_if_no_console_handler(force: bool = False) -> bool:
        # In automatic non-verbose mode, console handler is absent.
        # Return True to echo only selected messages (summary, warnings/errors/exceptions).
        return force and (not console_handler_active)

    save_base = config.get("save_dir")
    if save_base:
        save_path = resolve_under(data_dir(create=True), save_base)
    else:
        save_path = data_dir(create=True)

    if logger is None:
        logger = setup_logger(str(log_dir(create=True)), run_mode=run_mode, verbose=verbose)

    # Header
    log_msg("=" * 60, logger, echo_console=echo_only_if_no_console_handler(False))
    log_msg(f"Starting {run_mode.upper()} run at {datetime.now().isoformat()}",
            logger, echo_console=echo_only_if_no_console_handler(False))
    log_msg("=" * 60, logger, echo_console=echo_only_if_no_console_handler(False))

    try:
        # 1) Validate config
        validate_config(config, logger=logger, run_mode=run_mode)
        log_msg("Configuration validation successful.", logger, echo_console=echo_only_if_no_console_handler(False))

        # 2) Map config → session
        session = SessionState()
        ok, notes = map_config_to_session(config, session, logger=logger)
        for note in notes:
            log_msg(note, logger, echo_console=echo_only_if_no_console_handler(False))
        if not ok:
            log_msg("Config mapping reported blocking issues. Exiting.",
                    logger, level="error", echo_console=echo_only_if_no_console_handler(True))
            if "filename_base" in locals():
                create_final_log_file(session, filename_base, logger, delete_original=True, reattach_to_final=True)
            else:
                log_msg("Skipping final log file creation because filename_base was not defined (run failed early).", logger)
            return 1

        # 2b) Automatic mode cannot be case-by-case for existing file policy (already coerced by validate_config)
        # Nothing to do here; messages already logged.

        # 3) Short internet speed test (both modes)
        speed_mbps = internet_speedtest(test_urls=None, max_seconds=10, logger=logger, echo_console=echo_only_if_no_console_handler(False))
        log_msg(f"Detected speed: {speed_mbps:.1f} Mbps", logger, echo_console=echo_only_if_no_console_handler(False))

        dataset_short_name = session.get("dataset_short_name")
        if dataset_short_name == "era5-world":
            grid_res = 0.25
        elif dataset_short_name == "era5-land":
            grid_res = 0.1
        else:
            raise ValueError(f"Unknown dataset_short_name: {dataset_short_name}")
        # 4) Estimate size/time
        estimates = estimate_cds_download(
            variables=session.get("variables"),
            area=session.get("region_bounds"),
            start_date=session.get("start_date"),
            end_date=session.get("end_date"),
            observed_speed_mbps=speed_mbps,
            grid_resolution=grid_res,
        )

        # Adjust for parallelisation — scale total_time_sec only
        parallel_conf = session.get("parallel_settings")
        if parallel_conf and parallel_conf.get("enabled"):
            efficiency_factor = 0.60
            max_conc = max(1, int(parallel_conf["max_concurrent"]))
            estimates["total_time_sec"] = estimates["total_time_sec"] / (max_conc * efficiency_factor)
            log_msg(
                f"Adjusted total time for parallel downloads: {format_duration(estimates['total_time_sec'])}",
                logger, echo_console=echo_only_if_no_console_handler(False)
            )

        # 5) Filename + hash
        coord_str = format_coordinates_nwse(boundaries=session.get("region_bounds"))
        hash_str = generate_filename_hash(
            dataset_short_name=session.get("dataset_short_name"),
            variables=session.get("variables"),
            boundaries=session.get("region_bounds"),
        )
        filename_base = f"{session.get('dataset_short_name')}_{coord_str}_{hash_str}"

        # 6) Summary (ALWAYS printed in interactive/verbose; printed in non-verbose via echo)
        summary = build_download_summary(session, estimates, speed_mbps)
        log_msg(msg=summary, logger=logger, echo_console=echo_only_if_no_console_handler(force=True))
        log_msg(msg=f"Output base filename: {filename_base}", logger=logger, echo_console=echo_only_if_no_console_handler(True))

        # 7) Downloads
        successful, failed, skipped = [], [], []

        log_msg("\n" + "-" * 60 + "\n\n\nBeginning download process...", logger, echo_console=echo_only_if_no_console_handler(False))
        orchestrate_cds_downloads(
            session=session,
            filename_base=filename_base,
            successful_downloads=successful,
            failed_downloads=failed,
            skipped_downloads=skipped,
            logger=logger,
            echo_console=echo_only_if_no_console_handler(False),  # internal steps won’t echo in non-verbose
            allow_prompts=(run_mode == "interactive"),
        )

        # Final counts — treat as summary (echo in non-verbose)
        log_msg("-" * 60, logger, echo_console=echo_only_if_no_console_handler(True))
        log_msg("Download process completed.", logger, echo_console=echo_only_if_no_console_handler(True))
        log_msg(f"Successful : {len(successful)}", logger, echo_console=echo_only_if_no_console_handler(True))
        log_msg(f"Skipped    : {len(skipped)}", logger, echo_console=echo_only_if_no_console_handler(True))
        log_msg(f"Failed     : {len(failed)}", logger, echo_console=echo_only_if_no_console_handler(True))
        log_msg("-" * 60, logger, echo_console=echo_only_if_no_console_handler(True))

        if failed:
            log_msg("Some downloads failed. Review logs for details.",
                    logger, level="warning", echo_console=echo_only_if_no_console_handler(True))
            create_final_log_file(session, filename_base, logger, delete_original=True, reattach_to_final=True)
            log_msg(msg="\nProgram ended, goodbye.\n\n", logger=logger, echo_console=echo_only_if_no_console_handler(False))
            return 2
        create_final_log_file(session, filename_base, logger, delete_original=True, reattach_to_final=True)
        log_msg(msg="*"*60 + "\nProgram completed, thank you for using this tool. Goodbye!\n" + "*"*60 + "\n\n", logger=logger, echo_console=echo_only_if_no_console_handler(False))
        return 0


    except Exception as e:
        # Always echo errors in non-verbose mode
        log_msg(f"Run failed with exception: {e}", logger, level="exception", echo_console=echo_only_if_no_console_handler(True))
        if "session" not in locals():
            # Failure occurred before the session existed (e.g. during config
            # validation), so there is nothing to build a filename from.
            log_msg("\nProgram ended, goodbye.\n\n", logger, echo_console=echo_only_if_no_console_handler(True))
            return 1
        coord_str = format_coordinates_nwse(session.get("region_bounds"))
        hash_str = generate_filename_hash(
            dataset_short_name=session.get("dataset_short_name"),
            variables=session.get("variables"),
            boundaries=session.get("region_bounds"),
        )
        filename_base = f"{session.get('dataset_short_name')}_{coord_str}_{hash_str}"
        create_final_log_file(session, filename_base, logger, delete_original=True, reattach_to_final=True)
        log_msg("\nProgram ended, goodbye.\n\n", logger, echo_console=echo_only_if_no_console_handler(True))
        return 1
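
The parallel-time adjustment in step 4 can be sketched in isolation. The 0.60 efficiency factor and the `max(1, …)` clamp are taken from the source above; the function name `adjust_total_time` is hypothetical:

```python
def adjust_total_time(total_time_sec: float, max_concurrent: int,
                      efficiency_factor: float = 0.60) -> float:
    # Divide the serial estimate by the effective number of workers,
    # discounted by the efficiency factor, as in the source above.
    max_conc = max(1, int(max_concurrent))
    return total_time_sec / (max_conc * efficiency_factor)

serial_estimate = 600.0  # seconds, hypothetical
adjusted = adjust_total_time(serial_estimate, 4)
```

With 4 concurrent downloads a 600 s serial estimate shrinks to 250 s rather than 150 s, reflecting the assumed 60% parallel efficiency.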

run_batch_from_config

run_batch_from_config(cfg_path, logger=None)

Run automatic batch from a config file.

Parameters:

cfg_path : str
    Path to the JSON config file. Required.
logger : logging.Logger, optional
    Pre-configured logger instance, by default None.

Returns:

int
    Exit code: 0=success, 1=fatal error, 2=some downloads failed.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/runner.py
def run_batch_from_config(
        cfg_path: str,
        logger=None
        ) -> int:
    """
    Run automatic batch from a config file.

    Parameters
    ----------
    cfg_path : str
        Path to the JSON config file.
    logger : logging.Logger, optional
        Pre-configured logger instance, by default None.

    Returns
    -------
    int
        Exit code: 0=success, 1=fatal error, 2=some downloads failed.
    """
    config = load_config(cfg_path)
    return run(config, run_mode="automatic", verbose=False, logger=logger)
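
The thin-wrapper pattern above (load a JSON config from disk, then delegate with automatic-mode defaults) can be sketched end-to-end with stand-ins; `load_config_sketch` and `run_stub` are hypothetical placeholders, not the package's functions:

```python
import json
import os
import tempfile

def load_config_sketch(path: str) -> dict:
    # Mirrors load_config: read a JSON file into a dict
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def run_stub(config: dict, run_mode: str = "interactive", verbose: bool = True) -> int:
    # Stand-in for runner.run(); returns the documented 0 on "success"
    return 0 if config else 1

def run_batch_sketch(cfg_path: str) -> int:
    # Same shape as run_batch_from_config: load, then delegate with
    # automatic-mode defaults (run_mode="automatic", verbose=False)
    return run_stub(load_config_sketch(cfg_path), run_mode="automatic", verbose=False)

fd, path = tempfile.mkstemp(suffix=".json")
with os.fdopen(fd, "w", encoding="utf-8") as f:
    json.dump({"save_dir": "era5"}, f)
exit_code = run_batch_sketch(path)
os.remove(path)
```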

weather_data_retrieval.io.cli

parse_args

parse_args()

Parse command-line arguments.

Parameters:

None.

Returns:

argparse.Namespace
    Parsed arguments.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/cli.py
def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments.

    Parameters
    ----------
    None

    Returns
    -------
    argparse.Namespace
        Parsed arguments.
    """
    p = argparse.ArgumentParser(
        description="ERA5/Open-Meteo downloader — interactive or batch via config file."
    )
    p.add_argument(
        "--config",
        nargs="?",
        default=None,
        help="Path to JSON config file. If provided, runs in non-interactive (automatic) mode."
    )
    p.add_argument(
        "--log-dir",
        default=None,
        help="Directory where logs will be written (file name is auto-generated)."
    )
    p.add_argument(
        "--verbose",
        action="store_true",
        help="Automatic mode only: also echo log messages to console and show a prompt-style transcript (no input)."
    )
    return p.parse_args()
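
Since the parser above is plain argparse, its behaviour is easy to reproduce standalone. The flags are copied from the listing (the description is shortened), parsing a sample automatic-mode invocation:

```python
import argparse

# Same three flags as parse_args() above
p = argparse.ArgumentParser(description="ERA5/Open-Meteo downloader sketch")
p.add_argument("--config", nargs="?", default=None)
p.add_argument("--log-dir", default=None)
p.add_argument("--verbose", action="store_true")

# e.g. `prog --config batch.json --verbose`
args = p.parse_args(["--config", "batch.json", "--verbose"])
```

A supplied `--config` path is what switches the program into non-interactive (automatic) mode.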

run_prompt_wizard

run_prompt_wizard(session, logger=None)

Drives the interactive prompt flow (no config-source step). Returns True if all fields completed; False if user exits.

Parameters:

session : SessionState
    The session state to populate.
logger : logging.Logger, optional
    Logger for logging messages, by default None.

Returns:

bool
    True if completed; False if exited early.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/cli.py
def run_prompt_wizard(
        session: SessionState,
        logger: logging.Logger | None = None,
    ) -> bool:
    """
    Drives the interactive prompt flow (no config-source step).
    Returns True if all fields completed; False if user exits.

    Parameters
    ----------
    session : SessionState
        The session state to populate.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    bool
        True if completed; False if exited early.
    """

    log_msg("\n\n" + "=" * 60, logger)
    log_msg("Welcome to the Weather Data Retrieval Prompt Wizard!\n" + "="*60, logger)
    log_msg("\nPlease follow the prompts to configure your data retrieval settings.\n", logger)
    log_msg("At any point, you may type:\n   'back' to return to the previous prompt.", logger)
    log_msg("   'exit' to quit the wizard\n   'Ctrl+C' to stop the program.\n", logger)
    log_msg("=" * 60 + "\n", logger)


    while True:
        key = session.first_unfilled_key()
        if key is None:
            return True

        if key == "data_provider":
            res = prompt_data_provider(session, logger=logger)
            if res == "__EXIT__": return False
            if res == "__BACK__": continue

        elif key == "dataset_short_name":
            provider = session.get("data_provider")
            res = prompt_dataset_short_name(session, provider, logger=logger)
            if res == "__EXIT__": return False
            if res == "__BACK__":
                session.unset("data_provider")
                continue

        # CDS-specific prompts
        elif session.get("data_provider") == "cds":
            if key == "api_url":
                res_url = prompt_cds_url(session, "https://cds.climate.copernicus.eu/api", logger=logger)
                if res_url == "__EXIT__": return False
                if res_url == "__BACK__":
                    session.unset("dataset_short_name")
                    continue

            elif key == "api_key":
                res_key = prompt_cds_api_key(session, logger=logger)
                if res_key == "__EXIT__": return False
                if res_key == "__BACK__":
                    session.unset("api_url")
                    continue

                client = validate_cds_api_key(session.get("api_url"), session.get("api_key"), logger=logger)
                if client is None:
                    log_msg("Authentication failed. Please re-enter your API details.\n", logger)
                    session.unset("api_key")
                    session.unset("api_url")
                    continue
                session.set("session_client", client)

        # Open-Meteo-specific prompts
        elif session.get("data_provider") == "open-meteo":
            raise NotImplementedError("Open-Meteo variable validation not yet implemented.")

        if key == "save_dir":
            raw_save_path = prompt_save_directory(session, default_save_dir, logger=logger)
            if raw_save_path in ("__EXIT__", "__BACK__"):
                if raw_save_path == "__BACK__":
                    session.unset("session_client")
                    session.unset("api_key")
                    continue
                return False

            # Normalize and resolve path safely
            resolved_path = resolve_under(data_dir(create=True), raw_save_path)
            session.set("save_dir", str(resolved_path))
            log_msg(f"Save directory resolved to: {resolved_path}", logger)
            continue


        elif key == "start_date":
            # This prompt sets BOTH start_date and end_date
            s, e = prompt_date_range(session, logger=logger)
            if s == "__EXIT__": return False
            if s == "__BACK__":
                session.unset("save_dir")
                continue

        elif key == "end_date":
            # Will already be filled by prompt_date_range; just skip
            continue

        elif key == "region_bounds":
            bounds = prompt_coordinates(session, logger=logger)
            if bounds == "__EXIT__": return False
            if bounds == "__BACK__":
                session.unset("start_date")
                session.unset("end_date")
                continue

        elif key == "variables":
            if session.get("dataset_short_name") == "era5-world":
                invalid_vars = invalid_era5_world_variables
            elif session.get("dataset_short_name") == "era5-land":
                invalid_vars = invalid_era5_land_variables
            else:
                raise ValueError("Unknown dataset for variable validation.")
            variables = prompt_variables(session, invalid_vars, logger=logger)
            if variables in ("__EXIT__", "__BACK__"):
                if variables == "__BACK__":
                    session.unset("region_bounds")
                    continue
                return False

        elif key == "existing_file_action":
            efa = prompt_skip_overwrite_files(session, logger=logger)
            if efa in ("__EXIT__", "__BACK__"):
                if efa == "__BACK__":
                    session.unset("variables")
                    continue
                return False

        elif key == "parallel_settings":
            ps = prompt_parallelisation_settings(session, logger=logger)
            if ps in ("__EXIT__", "__BACK__"):
                if ps == "__BACK__":
                    session.unset("existing_file_action")
                    continue
                return False

        elif key == "retry_settings":
            rs = prompt_retry_settings(session, logger=logger)
            if rs in ("__EXIT__", "__BACK__"):
                if rs == "__BACK__":
                    session.unset("parallel_settings")
                    continue
                return False

        elif key == "inputs_confirmed":
            log_msg("\n" + "*" * 60 + "\n" + "*" * 60 + "\n", logger)
            log_msg("\nPrompting wizard complete. Please review your selections:\n", logger)
            rs = prompt_continue_confirmation(session=session, logger=logger)
            if rs in ("__EXIT__", "__BACK__"):
                if rs == "__BACK__":
                    session.unset("retry_settings")
                    continue
                return False
            session.set("inputs_confirmed", True)
            log_msg("\nSelections confirmed.", logger)

weather_data_retrieval.io.config_loader

load_and_validate_config

load_and_validate_config(
    path, *, logger=None, run_mode="automatic"
)

Load JSON config and validate it using the centralized validator. This lets the validator log coercions/warnings (e.g., case_by_case → skip_all).

Parameters:

path : str
    Path to JSON config file. Required.
logger : logging.Logger, optional
    Logger instance for validation messages, by default None.
run_mode : str, optional
    Run mode, either 'interactive' or 'automatic', by default "automatic".

Returns:

dict
    Validated configuration dictionary.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/config_loader.py
def load_and_validate_config(
        path: str,
        *,
        logger=None,
        run_mode: str = "automatic") -> dict:
    """
    Load JSON config and validate it using the centralized validator.
    This lets the validator log coercions/warnings (e.g., case_by_case → skip_all).

    Parameters
    ----------
    path : str
        Path to JSON config file.
    logger : logging.Logger, optional
        Logger instance for validation messages, by default None.
    run_mode : str, optional
        Run mode, either 'interactive' or 'automatic', by default "automatic".

    Returns
    -------
    dict
        Validated configuration dictionary.
    """
    cfg_path = _resolve_config_path(path)
    with open(cfg_path, "r", encoding="utf-8") as f:
        config = json.load(f)
    # Let validate_config perform normalization/clamping with logging context
    validate_config(config, logger=logger, run_mode=run_mode)
    return config
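
The loader expects a JSON object with the same fields the prompt wizard collects. The keys below are inferred from the session fields used elsewhere in this reference; the values are hypothetical and the real schema may differ (the API key is omitted here deliberately):

```python
import json

# Hypothetical config shape; keys mirror the wizard's session fields
example_config = {
    "data_provider": "cds",
    "dataset_short_name": "era5-land",
    "api_url": "https://cds.climate.copernicus.eu/api",
    "save_dir": "era5_land_runs",
    "start_date": "2020-01-01",
    "end_date": "2020-01-31",
    "region_bounds": [52.0, 4.0, 50.0, 7.0],
    "variables": ["2m_temperature", "total_precipitation"],
    "existing_file_action": "skip_all",
    "parallel_settings": {"enabled": True, "max_concurrent": 4},
    "retry_settings": {"max_retries": 3},
}

# On disk this would be the JSON file handed to load_and_validate_config
as_json = json.dumps(example_config, indent=2)
round_tripped = json.loads(as_json)
```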

load_config

load_config(file_path)

Load configuration from a JSON requirements file (without validation).

Parameters:

file_path : str
    Path to JSON config file. Required.

Returns:

dict
    Configuration dictionary.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/config_loader.py
def load_config(file_path: str) -> dict:
    """
    Load configuration from a JSON requirements file (without validation).

    Parameters
    ----------
    file_path : str
        Path to JSON config file.

    Returns
    -------
    dict
        Configuration dictionary.
    """
    cfg_path = _resolve_config_path(file_path)
    with open(cfg_path, "r", encoding="utf-8") as f:
        return json.load(f)

weather_data_retrieval.io.prompts

read_input

read_input(prompt, *, logger=None)

Centralized input handler with built-in 'exit' and 'back' controls.

Parameters:

prompt : str
    The prompt to display to the user.
logger : logging.Logger, optional
    Logger to log the prompt message.

Returns:

str
    The user input, or the control token "__EXIT__" / "__BACK__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def read_input(
        prompt: str,
        *,
        logger=None,
        ) -> str:
    """
    Centralized input handler with built-in 'exit' and 'back' controls.

    Parameters
    ----------
    prompt : str
        The prompt to display to the user.
    logger : logging.Logger, optional
        Logger to log the prompt message.

    Returns
    -------
    str
        The user input, or the control token "__EXIT__" / "__BACK__".
    """
    # Log to file only (DEBUG won't be shown by the console handler at INFO)
    if logger:
        # Use level="debug" so it doesn't appear on console
        log_msg(prompt, logger, level="debug", echo_console=False)

    # Show the prompt exactly once on the console
    raw = input(prompt).strip()
    lower = raw.lower()

    if lower in ("exit", "quit"):
        return "__EXIT__"
    if lower == "back":
        return "__BACK__"

    # Preserve the user's original casing (e.g. for paths and URLs)
    return raw
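
The exit/back token mapping can be shown in isolation; `classify_input` is a hypothetical stand-in for the input-handling portion of read_input (without the actual `input()` call):

```python
def classify_input(raw: str) -> str:
    # "exit"/"quit" and "back", in any casing, become sentinel tokens;
    # anything else is returned stripped of surrounding whitespace.
    text = raw.strip()
    lower = text.lower()
    if lower in ("exit", "quit"):
        return "__EXIT__"
    if lower == "back":
        return "__BACK__"
    return text

token = classify_input("  QUIT ")
```

Callers such as the prompt functions compare against the two sentinel strings before interpreting the value.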

say

say(text, *, logger=None)

Centralized output handler to log and print messages.

Parameters:

text : str
    The message to display.
logger : logging.Logger, optional
    Logger to log the message.

Returns:

None

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def say(
        text: str,
        *,
        logger=None
        ) -> None:
    """
    Centralized output handler to log and print messages.

    Parameters
    ----------
    text : str
        The message to display.
    logger : logging.Logger, optional
        Logger to log the message.

    Returns
    -------
    None

    """
    log_msg(text, logger)

prompt_data_provider

prompt_data_provider(session, *, logger=None)

Prompt user for which data provider to use (CDS or Open-Meteo).

Parameters:

session : SessionState
    Current session state to store selected data provider.
logger : logging.Logger, optional
    Logger for logging messages, by default None.

Returns:

str
    Normalized provider name ("cds" or "open-meteo"), or control token "__BACK__" / "__EXIT__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_data_provider(
        session: SessionState,
        *,
        logger=None,
        ) -> str:
    """
    Prompt user for which data provider to use (CDS or Open-Meteo).

    Parameters
    ----------
    session : SessionState
        Current session state to store selected data provider.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    str
        Normalized provider name ("cds" or "open-meteo"),
        or special control token "__BACK__" or "__EXIT__".

    """

    say("-"*60 + "\nData Provider Selection:\n" + '-'*60, logger=logger)
    say("Available data providers:\n\t1. Copernicus Climate Data Store (CDS)\n\t2. Open-Meteo", logger=logger)

    while True:
        raw = read_input("\nPlease enter the data provider you would like to use (name or number): ",
                         logger=logger)

        if raw in ("__EXIT__", "__BACK__"):
            return raw

        data_provider = normalize_input(raw, "data_provider")

        if not validate_data_provider(data_provider):
            say("\nERROR: Invalid provider. Please enter '1' for CDS or '2' for Open-Meteo", logger=logger)
            continue
        if data_provider == "open-meteo":
            say("\nERROR: Open-Meteo support is not yet implemented. Please select CDS.", logger=logger)
            continue

        say(f"\nYou selected: [{data_provider.upper()}]\n", logger=logger)

        session.set("data_provider", data_provider)

        return data_provider

prompt_dataset_short_name

prompt_dataset_short_name(
    session, provider, *, logger=None
)

Prompt for dataset choice.

Parameters:

session : SessionState
    Current session state to store selected dataset.
provider : str
    Data provider name.
logger : logging.Logger, optional
    Logger for logging messages, by default None.

Returns:

str
    Normalized dataset name, or control token "__EXIT__" / "__BACK__".
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_dataset_short_name(
        session: SessionState,
        provider: str,
        *,
        logger=None,
        ) -> str:
    """
    Prompt for dataset choice.

    Parameters
    ----------
    session: SessionState
        Current session state to store selected dataset.
    provider : str
        Data provider name.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    str
        Normalized dataset name, or control token "__EXIT__" / "__BACK__".

    """
    if provider != "cds":
        say("\nCurrently only CDS datasets are supported.", logger=logger)
        return "__BACK__"

    say("-"*60 + "\nDataset Selection:\n" + '-'*60, logger=logger)
    say("Available CDS datasets:\n\t1. ERA5-Land\n\t2. ERA5-World", logger=logger)

    while True:
        raw = read_input("\nPlease enter the dataset you would like to use (name or number): ",
                         logger=logger)
        if raw in ("__EXIT__", "__BACK__"):
            return raw

        dataset_short_name = normalize_input(raw, "era5_dataset_short_name")
        if not validate_dataset_short_name(dataset_short_name, provider):
            say("\nERROR: Invalid or unsupported dataset. Try again.", logger=logger)
            continue

        say(f"\nYou selected: [{dataset_short_name.upper()}]\n", logger=logger)
        session.set("dataset_short_name", dataset_short_name)
        return dataset_short_name

prompt_cds_url

prompt_cds_url(
    session,
    api_url_default="https://cds.climate.copernicus.eu/api",
    *,
    logger=None
)

Prompt for CDS API URL.

Parameters:

session : SessionState
    Current session state to store API URL.
api_url_default : str, optional
    Default CDS API URL, by default "https://cds.climate.copernicus.eu/api".
logger : logging.Logger, optional
    Logger for logging messages, by default None.

Returns:

str
    CDS API URL, or control token "__EXIT__" / "__BACK__".
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_cds_url(
        session: SessionState,
        api_url_default: str = "https://cds.climate.copernicus.eu/api",
        *,
        logger=None,
        ) -> str:
    """
    Prompt for CDS API URL.

    Parameters
    ----------
    session : SessionState
        Current session state to store API URL.
    api_url_default : str, optional
        Default CDS API URL, by default "https://cds.climate.copernicus.eu/api".
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    str
        CDS API URL, or control token "__EXIT__" / "__BACK__".

    """
    say("-"*60 + "\nCDS API url:\n" + '-'*60, logger=logger)
    say(f"Default: {api_url_default}", logger=logger)
    while True:
        raw = read_input("\nEnter the CDS API url (or press Enter to keep default): ",
                         logger=logger)
        if raw in ("__EXIT__", "__BACK__"):
            return raw
        url = raw or api_url_default
        session.set("api_url", url)
        say(f"\nYou entered the url : {url}\n", logger=logger)
        return url
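
The keep-default-on-empty behaviour above is Python's `or` fallback: an empty reply keeps the default URL. A minimal sketch (`choose_url` is a hypothetical stand-in):

```python
DEFAULT_URL = "https://cds.climate.copernicus.eu/api"

def choose_url(raw: str, default: str = DEFAULT_URL) -> str:
    # Empty input keeps the default, mirroring `url = raw or api_url_default`
    return raw or default

kept = choose_url("")                              # empty reply -> default
custom = choose_url("https://example.invalid/api")  # explicit reply wins
```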

prompt_cds_api_key

prompt_cds_api_key(session, *, logger=None)

Prompt only for the CDS API key (hidden input).

Parameters:

session : SessionState
    Current session state to store API key.
logger : logging.Logger, optional
    Logger for logging messages, by default None.

Returns:

str
    CDS API key, or control token "__EXIT__" / "__BACK__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_cds_api_key(
        session: SessionState,
        *,
        logger=None,
        ) -> str:
    """
    Prompt only for the CDS API key (hidden input).

    Parameters
    ----------
    session : SessionState
        Current session state to store API key.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    str
        CDS API key, or control token "__EXIT__" / "__BACK__".
    """
    say("-"*60 + "\nCDS API key:\n" + '-'*60, logger=logger)
    while True:
        key = getpass("\nEnter your CDS API key: ")
        # getpass cannot log input value; we just log the action
        low = key.lower()
        if low in ("exit", "quit"):
            return "__EXIT__"
        if low == "back":
            return "__BACK__"
        if not key:
            say("\nERROR: No API key entered. Please try again.", logger=logger)
            continue
        session.set("api_key", key)
        say(f"\nYou entered an API key of length {len(key)} characters.\n", logger=logger)
        return key

prompt_save_directory

prompt_save_directory(session, default_dir, *, logger=None)

Ask for save directory, create if necessary.

Parameters:

Name Type Description Default
session SessionState

Current session state to store save directory.

required
default_dir Path

Default directory to suggest.

required
logger Logger

Logger for logging messages, by default None.

None

Returns:

Type Description
Path | str

Path to save directory, or control token "__BACK__" / "__EXIT__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_save_directory(
        session: SessionState,
        default_dir: Path,
        *,
        logger=None,
    ) -> Path | str:
    """
    Ask for save directory, create if necessary.

    Parameters
    ----------
    session : SessionState
        Current session state to store save directory.
    default_dir : Path
        Default directory to suggest.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    Path | str
        Path to save directory, or control token "__BACK__" / "__EXIT__".

    """
    say("-"*60 + "\nData Save Directory Selection:\n" + '-'*60, logger=logger)
    say(f"Default: {default_dir}", logger=logger)
    while True:
        raw = read_input("\nEnter a path (or press Enter to use default): ", logger=logger)
        if raw in ("__EXIT__", "__BACK__"):
            return raw
        # resolve under repo_root/data if relative
        resolved_path = resolve_under(data_dir(create=True), raw or default_dir)
        if validate_directory(str(resolved_path)):
            say(f"\nYou set the save directory to: {resolved_path}\n", logger=logger)
            session.set("save_dir", resolved_path)
            return resolved_path
        say(f"ERROR: Directory [{resolved_path}] could not be created or accessed. Try another path.", logger=logger)
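
The path handling relies on `resolve_under`, whose exact behaviour lives elsewhere in the package. A plausible minimal reading, assuming relative inputs are anchored under the data directory while absolute paths pass through:

```python
from pathlib import Path

def resolve_under(base: Path, candidate) -> Path:
    # Assumed semantics: relative paths land under `base`; absolute paths are kept as-is.
    p = Path(candidate)
    return p if p.is_absolute() else base / p

base = Path("/repo/data")
print(resolve_under(base, "era5/monthly"))   # anchored under the data directory
print(resolve_under(base, "/scratch/out"))   # absolute path respected
```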

prompt_date_range

prompt_date_range(session, *, logger=None)

Ask user for start and end date, with validation. Accepts formats: YYYY-MM-DD or YYYY-MM.
- Start dates without a day default to the first day of the month (YYYY-MM-01)
- End dates without a day default to the last day of the month (YYYY-MM-[last day])

Parameters:

Name Type Description Default
session SessionState

Current session state to store date range.

required
logger Logger

Logger for logging messages, by default None.

None

Returns:

Type Description
tuple[str, str]

(start_date_str, end_date_str) in ISO format (YYYY-MM-DD), or ("__EXIT__", "__EXIT__") / ("__BACK__", "__BACK__")

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_date_range(
        session: SessionState,
        *,
        logger=None,
        ) -> tuple[str, str]:
    """
    Ask user for start and end date, with validation.
    Accepts formats: YYYY-MM-DD or YYYY-MM
    - Start dates without day default to first day of month (YYYY-MM-01)
    - End dates without day default to last day of month (YYYY-MM-[last day])

    Parameters
    ----------
    session : SessionState
        Current session state to store date range.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    tuple[str, str]
        (start_date_str, end_date_str) in ISO format (YYYY-MM-DD),
        or ("__EXIT__", "__EXIT__") / ("__BACK__", "__BACK__")
    """
    say("-"*60 + "\nDate Range Selection:\n" + '-'*60, logger=logger)
    say("Enter dates as YYYY-MM-DD or YYYY-MM\n(YYYY-MM will default to first day for start, last day for end)", logger=logger)

    while True:
        start_raw = read_input("\nEnter start date: ", logger=logger)
        if start_raw in ("__EXIT__", "__BACK__"): return start_raw, start_raw
        end_raw = read_input("\nEnter end date: ", logger=logger)
        if end_raw in ("__EXIT__", "__BACK__"): return end_raw, end_raw

        if not validate_date(start_raw, allow_month_only=True):
            say("\nERROR: Invalid start date format. Use YYYY-MM-DD or YYYY-MM.", logger=logger)
            continue
        if not validate_date(end_raw, allow_month_only=True):
            say("\nERROR: Invalid end date format. Use YYYY-MM-DD or YYYY-MM.", logger=logger)
            continue

        try:
            start, start_str = parse_date_with_defaults(start_raw, default_to_month_end=False)
            if len(start_raw) == 7:
                say(f"\n\tStart date set to: {start_str} (first day of month)", logger=logger)
            end, end_str = parse_date_with_defaults(end_raw, default_to_month_end=True)
            if len(end_raw) == 7:
                say(f"\tEnd date set to: {end_str} (last day of month)\n", logger=logger)
        except ValueError as e:
            say(f"\nERROR: Could not parse dates: {e}", logger=logger)
            continue

        if end <= start:
            say("\nERROR: End date must be after start date.", logger=logger)
            continue

        if session.get("data_provider") == "cds":
            end = clamp_era5_available_end_date(end)
            end_str = end.date().isoformat()

        say(f"\nYou selected a date range of start [{start.date().isoformat()}] → end [{end.date().isoformat()}]\n",
            logger=logger)
        session.set("start_date", start.date().isoformat())
        session.set("end_date", end.date().isoformat())
        return start.date().isoformat(), end.date().isoformat()
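
The month-only expansion delegated to `parse_date_with_defaults` can be reproduced with the standard library. This is a sketch of the assumed behaviour, not the package's implementation:

```python
import calendar
from datetime import datetime

def parse_date_with_defaults(raw: str, default_to_month_end: bool = False) -> tuple[datetime, str]:
    """Expand YYYY-MM to the first or last day of the month; accept full YYYY-MM-DD as-is."""
    if len(raw) == 7:  # YYYY-MM
        year, month = map(int, raw.split("-"))
        # monthrange returns (weekday_of_first_day, number_of_days_in_month)
        day = calendar.monthrange(year, month)[1] if default_to_month_end else 1
        dt = datetime(year, month, day)
    else:
        dt = datetime.strptime(raw, "%Y-%m-%d")
    return dt, dt.date().isoformat()

print(parse_date_with_defaults("2020-02", default_to_month_end=True)[1])  # 2020-02-29 (leap year)
```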

prompt_coordinates

prompt_coordinates(session, *, logger=None)

Prompt user for geographic boundaries (N, S, W, E) with validation.

Parameters:

Name Type Description Default
session SessionState

Current session state to store geographic boundaries.

required
logger Logger

Logger for logging messages, by default None.

None

Returns:

Type Description
list[float] | str

[north, west, south, east] boundaries, or control token "__BACK__" / "__EXIT__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_coordinates(
        session: SessionState,
        *,
        logger=None,
        ) -> list[float] | str:
    """
    Prompt user for geographic boundaries (N, S, W, E) with validation.

    Parameters
    ----------
    session : SessionState
        Current session state to store geographic boundaries.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    list[float] | str
        [north, west, south, east] boundaries, or control token "__BACK__" / "__EXIT__".
    """
    say('-'*60 + "\nGrid Area Selection (EPSG: 4326):\n" + '-'*60, logger=logger)
    while True:
        entries = {}
        for label, key in [("Northern latitude", "north"),
                           ("Southern latitude", "south"),
                           ("Western longitude", "west"),
                           ("Eastern longitude", "east")]:
            value = read_input(f"\nEnter {label} boundary: ", logger=logger)
            if value in ("__BACK__", "__EXIT__"):
                return value
            entries[key] = value

        try:
            n, s, w, e = (
                float(entries["north"]),
                float(entries["south"]),
                float(entries["west"]),
                float(entries["east"]),
            )
        except ValueError:
            say("\nPlease enter numeric values for all coordinates.", logger=logger)
            continue

        if not validate_coordinates(n, w, s, e):
            say("Invalid bounds. Check that -90 ≤ lat ≤ 90, -180 ≤ lon ≤ 180, and North > South.", logger=logger)
            continue

        bounds = [n, w, s, e]
        say(f"You entered boundaries of: [N{n}, W{w}, S{s}, E{e}]\n", logger=logger)
        session.set("region_bounds", bounds)
        return bounds
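
The bounds check delegated to `validate_coordinates` matches the error message shown above. A minimal equivalent, assuming the only constraints are the latitude/longitude ranges and North > South:

```python
def validate_coordinates(n: float, w: float, s: float, e: float) -> bool:
    """True when latitudes/longitudes are in range and the north bound exceeds the south bound."""
    return -90 <= s < n <= 90 and -180 <= w <= 180 and -180 <= e <= 180

print(validate_coordinates(55.0, -10.0, 45.0, 5.0))  # valid box over western Europe
```

Note the sketch does not require West < East, since a box crossing the antimeridian can legitimately have West > East; whether the real validator allows that is an assumption.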

prompt_variables

prompt_variables(
    session,
    variable_restrictions_list,
    *args,
    restriction_allow=False,
    logger=None
)

Ask for variables to download, validate each against allowed/disallowed list, and only update session if the full set is valid.

Parameters:

Name Type Description Default
session SessionState

Current session state to store selected variables.

required
variable_restrictions_list list[str]

List of variables that are either allowed or disallowed.

required
restriction_allow bool

If True, variable_restrictions_list is an allowlist (every variable must be in it). If False, it is a denylist (no variable may be in it).

False
logger Logger

Logger for logging messages, by default None.

None

Returns:

Type Description
list[str] | str

List of selected variable names, or control token "__BACK__" / "__EXIT__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_variables(
        session: SessionState,
        variable_restrictions_list: list[str],
        *args,
        restriction_allow: bool = False,
        logger=None,
        ) -> list[str] | str:
    """
    Ask for variables to download, validate each against allowed/disallowed list,
    and only update session if the full set is valid.

    Parameters
    ----------
    session : SessionState
        Current session state to store selected variables.
    variable_restrictions_list : list[str]
        List of variables that are either allowed or disallowed.
    restriction_allow : bool
        If True, variable_restrictions_list is an allowlist (every variable must be in it).
        If False, it is a denylist (no variable may be in it).
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    list[str] | str
        List of selected variable names, or control token "__BACK__" / "__EXIT__".
    """
    say("-" * 60 + f"\nVariable Selection [{session.get('dataset_short_name')}]:\n" + "-" * 60, logger=logger)
    say("(Type 'back' to return to previous step or 'exit' to quit.)", logger=logger)

    while True:
        raw = read_input("\nEnter variable names (comma-separated): ", logger=logger)
        if raw in ("__EXIT__", "__BACK__"):
            return raw

        variable_list = [ v.strip().lower().strip('"').strip("'") for v in raw.split(",") if v.strip() ]
        if not variable_list:
            say("\nERROR: Please enter at least one variable name.", logger=logger)
            continue

        all_valid = validate_variables(variable_list, variable_restrictions_list, restriction_allow)

        if not all_valid:
            if restriction_allow:
                valid_vars = [v for v in variable_list if v in variable_restrictions_list]
                invalid_vars = [v for v in variable_list if v not in variable_restrictions_list]
                say("\nERROR: Some variables are not recognized or not available for this dataset:", logger=logger)
                for iv in invalid_vars:
                    say(f"   - {iv}", logger=logger)
            else:
                valid_vars = [v for v in variable_list if v not in variable_restrictions_list]
                invalid_vars = [v for v in variable_list if v in variable_restrictions_list]
                say("\nERROR: The following variables are known to cause issues or are disallowed for this dataset:", logger=logger)
                for iv in invalid_vars:
                    say(f"   - {iv}", logger=logger)
                say("\nPlease edit the invalid variable list for this dataset if you believe this is an error.", logger=logger)

            if valid_vars:
                proceed = read_input(
                    f"\nWould you like to proceed with only the valid variables ({', '.join(valid_vars)})? (y/n): ",
                    logger=logger)
                if proceed in NORMALIZATION_MAP["confirmation"] and NORMALIZATION_MAP["confirmation"][proceed] == "yes":
                    say("\nProceeding with valid subset.\n", logger=logger)
                    session.set("variables", valid_vars)
                    return valid_vars
                else:
                    say("\nLet's try again.\n", logger=logger)
                    continue
            else:
                say("\nERROR: No valid variables remain. Please try again.\n", logger=logger)
                continue

        say(f"\nYou selected [{len(variable_list)}] valid variables:\n{', '.join(variable_list)}", logger=logger)
        confirm = read_input("\nConfirm selection? (y/n): ", logger=logger)
        if confirm in NORMALIZATION_MAP["confirmation"] and NORMALIZATION_MAP["confirmation"][confirm] == "yes":
            session.set("variables", variable_list)
            return variable_list
        else:
            say("\nLet's try again.\n", logger=logger)

prompt_skip_overwrite_files

prompt_skip_overwrite_files(session, *, logger=None)

Prompt user to choose skip/overwrite/case-by-case for existing files.

Parameters:

Name Type Description Default
session SessionState

Session state to store user choice.

required
logger Logger

Logger for logging messages, by default None.

None

Returns:

Type Description
str

One of "overwrite_all", "skip_all", "case_by_case", or control token "__BACK__" / "__EXIT__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_skip_overwrite_files(
        session: SessionState,
        *,
        logger=None,
        ) -> str:
    """
    Prompt user to choose skip/overwrite/case-by-case for existing files.

    Parameters
    ----------
    session : SessionState
        Session state to store user choice.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    str
        One of "overwrite_all", "skip_all", "case_by_case",
        or control token "__BACK__" / "__EXIT__".
    """
    say("\n" + "-" * 60 + "\nExisting File Policy:\n" + "-" * 60, logger=logger)
    say("\t1. Overwrite all existing files\n\t2. Skip all existing files\n\t3. Case-by-case confirmation", logger=logger)

    while True:
        choice = read_input("\nEnter choice (1/2/3): ", logger=logger)
        if choice in ("__EXIT__", "__BACK__"):
            return choice
        if choice in ["1", "2", "3"]:
            break
        say("Invalid input. Please enter 1, 2, or 3.", logger=logger)

    if choice == "1":
        session.set("existing_file_action", "overwrite_all")
    elif choice == "2":
        session.set("existing_file_action", "skip_all")
    else:
        session.set("existing_file_action", "case_by_case")

    say(f"You selected option [{choice}] - [{session.get('existing_file_action')}] \n", logger=logger)
    return session.get("existing_file_action")
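
The if/elif chain above is a fixed three-way mapping; a table-driven equivalent (illustrative refactor, not the shipped code):

```python
POLICY_BY_CHOICE = {"1": "overwrite_all", "2": "skip_all", "3": "case_by_case"}

def policy_for(choice: str) -> str:
    """Translate a validated menu choice into an existing_file_action policy."""
    return POLICY_BY_CHOICE[choice]
```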

prompt_parallelisation_settings

prompt_parallelisation_settings(session, *, logger=None)

Ask user about parallel downloads and concurrency cap.

Parameters:

Name Type Description Default
session SessionState

Current session state to store parallelisation settings.

required
logger Logger

Logger for logging messages, by default None.

None

Returns:

Type Description
dict | str

Dictionary with parallelisation settings, or control token "__BACK__" / "__EXIT__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_parallelisation_settings(
        session: SessionState,
        *,
        logger=None,
        ) -> dict | str:
    """
    Ask user about parallel downloads and concurrency cap.

    Parameters
    ----------
    session : SessionState
        Current session state to store parallelisation settings.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    dict | str
        Dictionary with parallelisation settings, or control token "__BACK__" / "__EXIT__".
    """
    say("-"*60 + "\nParallelisation Settings:\n" + '-'*60, logger=logger)
    say("You can enable multiple parallel downloads to speed up retrieval.\nNote: The CDS API may throttle or reject requests if too many are opened concurrently.",
        logger=logger)

    while True:
        raw = read_input("\nEnable parallel downloads? (y/n) [default y]: ", logger=logger)
        if raw in ("__EXIT__", "__BACK__"):
            return raw

        if raw == "":
            user_choice = "yes"
        elif raw in NORMALIZATION_MAP["confirmation"]:
            user_choice = NORMALIZATION_MAP["confirmation"][raw]
        else:
            say("\nInvalid input. Please enter 'y' or 'n'.", logger=logger)
            continue

        if user_choice == "no":
            settings = {"enabled": False, "max_concurrent": 1}
            session.set("parallel_settings", settings)
            say("\nYou have disabled parallel downloads (single-threaded mode).", logger=logger)
            return settings

        while True:
            mc = read_input("\nEnter the maximum number of concurrent downloads you would like to allow (integer ≥ 2) [default 2]: ",
                            logger=logger)
            if mc in ("__EXIT__", "__BACK__"):
                return mc
            try:
                mc = int(mc) if mc else 2
            except ValueError:
                say("\nERROR: Please enter a valid integer.", logger=logger)
                continue
            if mc < 2:
                say("\nERROR: Parallel mode requires at least 2 concurrent downloads.", logger=logger)
                continue
            if mc > 2:
                say("\nWARNING: Using more than 2 parallel CDS downloads may cause throttling or request failures.", logger=logger)
                while True:
                    confirm = read_input("\nDo you still want to continue? (y/n): ", logger=logger)
                    if confirm in ("__EXIT__", "__BACK__"):
                        return confirm
                    if confirm in NORMALIZATION_MAP["confirmation"]:
                        confirm_choice = NORMALIZATION_MAP["confirmation"][confirm]
                        break
                    say("\nInvalid input. Please enter 'y' or 'n'.", logger=logger)
                if confirm_choice == "no":
                    continue

            settings = {"enabled": True, "max_concurrent": mc}
            session.set("parallel_settings", settings)
            say(f"\nYou have enabled parallel downloads with a maximum of [{mc}] concurrent downloads.\n", logger=logger)
            return settings

prompt_retry_settings

prompt_retry_settings(
    session,
    default_retries=6,
    default_delay=15,
    *,
    logger=None
)

Ask user for retry limits.

Parameters:

Name Type Description Default
session SessionState

Current session state to store retry settings.

required
default_retries int

Default number of retry attempts (default = 6).

6
default_delay int

Default delay (in seconds) between retries (default = 15).

15
logger Logger

Logger for logging messages, by default None.

None

Returns:

Type Description
dict | str

Dictionary with 'max_retries' and 'retry_delay_sec', or control token "__BACK__" / "__EXIT__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_retry_settings(
        session: SessionState,
        default_retries: int = 6,
        default_delay: int = 15,
        *,
        logger=None,
        ) -> dict | str:
    """
    Ask user for retry limits.

    Parameters
    ----------
    session : SessionState
        Current session state to store retry settings.
    default_retries : int
        Default number of retry attempts (default = 6).
    default_delay : int
        Default delay (in seconds) between retries (default = 15).
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    dict | str
        Dictionary with 'max_retries' and 'retry_delay_sec', or control token "__BACK__" / "__EXIT__".
    """
    say("-"*60 + "\nRetry Settings:\n" + '-'*60, logger=logger)
    say("These settings control how the program handles failed download attempts.", logger=logger)
    say(f"Default values → Retries: {default_retries}, Delay: {default_delay}s", logger=logger)

    while True:
        user_input = read_input("\nWould you like to use the default retry settings? (y/n) [default y]: ",
                                logger=logger)
        if user_input in ("__EXIT__", "__BACK__"):
            return user_input

        if user_input == "":
            normalized = "yes"
        elif user_input in NORMALIZATION_MAP["confirmation"]:
            normalized = NORMALIZATION_MAP["confirmation"][user_input]
        else:
            say("\nERROR: Invalid input. Please enter 'y' or 'n'.", logger=logger)
            continue

        if normalized == "yes":
            settings = {"max_retries": default_retries, "retry_delay_sec": default_delay}
            session.set("retry_settings", settings)
            say(f"\nUsing default retry settings of max_retries: [{default_retries}], retry_delay_sec: [{default_delay}]", logger=logger)
            return settings

        while True:
            max_retries_raw = read_input("\nEnter maximum number of retries (integer ≥ 0): ", logger=logger)
            if max_retries_raw in ("__EXIT__", "__BACK__"):
                return max_retries_raw
            delay_raw = read_input("\nEnter delay between retries (seconds, integer ≥ 0): ", logger=logger)
            if delay_raw in ("__EXIT__", "__BACK__"):
                return delay_raw

            try:
                max_retries = int(max_retries_raw) if max_retries_raw else default_retries
                retry_delay_sec = int(delay_raw) if delay_raw else default_delay
                if max_retries < 0 or retry_delay_sec < 0:
                    say("\nERROR: Values must be non-negative integers.", logger=logger)
                    continue
                settings = {"max_retries": max_retries, "retry_delay_sec": retry_delay_sec}
                session.set("retry_settings", settings)
                say(f"\nYou set max_retries=[{max_retries}], retry_delay_sec=[{retry_delay_sec}].", logger=logger)
                return settings
            except ValueError:
                say("Please enter valid integer values for retries and delay.", logger=logger)
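
The `max_retries`/`retry_delay_sec` pair collected here feeds the download retry loop. A generic sketch of how such settings are typically consumed (this wrapper is illustrative, not the package's downloader):

```python
import time

def run_with_retries(task, max_retries: int = 6, retry_delay_sec: int = 15):
    """Call `task` up to max_retries + 1 times, sleeping between failed attempts."""
    for attempt in range(max_retries + 1):
        try:
            return task()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            time.sleep(retry_delay_sec)

# Simulate a download that succeeds on the third attempt.
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(run_with_retries(flaky, max_retries=5, retry_delay_sec=0))  # ok
```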

prompt_continue_confirmation

prompt_continue_confirmation(session, *, logger=None)

Display a formatted download summary and confirm before starting downloads.

Parameters:

Name Type Description Default
session SessionState

Session state to summarise.

required
logger Logger

Logger for logging messages, by default None.

None

Returns:

Type Description
bool | str

True if user confirms, False if user declines, or control token "__BACK__" / "__EXIT__".

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/io/prompts.py
def prompt_continue_confirmation(
        session: SessionState,
        *,
        logger=None,
        ) -> bool | str:
    """
    Display a formatted download summary and confirm before starting downloads.

    Parameters
    ----------
    session : SessionState
        Session state to summarise.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.

    Returns
    -------
    bool | str
        True if user confirms, False if user declines,
        or control token "__BACK__" / "__EXIT__".
    """
    text = session.summary()
    say(text, logger=logger)
    while True:
        user_input = read_input("\nProceed with download? (y/n): ", logger=logger)
        if user_input in ("__EXIT__", "__BACK__"):
            return user_input
        if user_input in NORMALIZATION_MAP["confirmation"]:
            choice = NORMALIZATION_MAP["confirmation"][user_input]
            if choice == "yes":
                say("\nProceeding with download...\n", logger=logger)
                return True
            say("\nDownload cancelled by user.", logger=logger)
            return False
        say("\nERROR: Invalid input. Please enter 'y' or 'n'.", logger=logger)

weather_data_retrieval.sources.cds_era5

prepare_cds_download

prepare_cds_download(
    session,
    filename_base,
    year,
    month,
    *,
    logger,
    echo_console,
    allow_prompts,
    dataset_config_mapping=CDS_DATASETS
)

Check if a monthly ERA5 file already exists and decide whether to download.

Parameters:

Name Type Description Default
session SessionState

Session containing user configuration.

required
filename_base str

Base name for the file.

required
year int

Year of the data to download.

required
month int

Month of the data to download.

required
logger Logger

Logger for logging messages.

required
echo_console bool

Whether to echo prompts to console.

required
allow_prompts bool

Whether to allow interactive prompts.

required
dataset_config_mapping dict

Mapping of dataset short names to their configurations.

CDS_DATASETS

Returns:

Name Type Description
tuple[bool, str]

download: Whether to perform the download. save_path: Full path for the target file.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/sources/cds_era5.py
def prepare_cds_download(
    session: SessionState,
    filename_base: str,
    year: int,
    month: int,
    *,
    logger,
    echo_console: bool,
    allow_prompts: bool,
    dataset_config_mapping: dict = CDS_DATASETS,
) -> Tuple[bool, str]:
    """
    Check if a monthly ERA5 file already exists and decide whether to download.

    Parameters
    ----------
    session : SessionState
        Session containing user configuration.
    filename_base : str
        Base name for the file.
    year : int
        Year of the data to download.
    month : int
        Month of the data to download.
    logger : logging.Logger
        Logger for logging messages.
    echo_console : bool
        Whether to echo prompts to console.
    allow_prompts : bool
        Whether to allow interactive prompts.
    dataset_config_mapping : dict, optional
        Mapping of dataset short names to their configurations.

    Returns
    -------
    tuple[bool, str]
        download: Whether to perform the download.
        save_path: Full path for the target file.
    """
    cfg = get_cds_dataset_config(session, dataset_config_mapping)
    data_file_format = cfg.get("data_download_format", "grib")

    # Resolve base save directory
    raw_save_dir = session.get("save_dir") or data_dir(create=True)
    save_dir = resolve_under(data_dir(create=True), raw_save_dir)

    # Construct canonical save path
    save_path = expected_save_path(save_dir, filename_base, year, month, data_file_format)
    policy = session.get("existing_file_action") or "case_by_case"
    download = True

    if save_path.exists():
        if policy == "skip_all":
            log_msg(f"Skipping existing file for {year}-{month:02d}: {save_path}", logger, echo_console=echo_console)
            download = False

        elif policy == "overwrite_all":
            log_msg(f"Overwriting existing file for {year}-{month:02d}: {save_path}", logger, echo_console=echo_console)

        elif policy == "case_by_case":
            if not allow_prompts:
                raise ValueError(
                    "existing_file_action='case_by_case' requires interactive mode. "
                    "Use 'skip_all' or 'overwrite_all' for automatic runs."
                )
            while True:
                user_input = read_input(
                    f"\nFile already exists for {year}-{month:02d}: {save_path}\n"
                    "Do you want to overwrite it? (y/n): ",
                    logger=logger,
                )
                if user_input in NORMALIZATION_MAP["confirmation"]:
                    yn = NORMALIZATION_MAP["confirmation"][user_input]
                    if yn == "yes":
                        log_msg(
                            f"Overwriting existing file for {year}-{month:02d}: {save_path}",
                            logger, echo_console=echo_console
                        )
                        download = True
                    else:
                        log_msg(
                            f"Skipping existing file for {year}-{month:02d}: {save_path}",
                            logger, echo_console=echo_console
                        )
                        download = False
                    break
                log_msg("Invalid input. Please enter 'y' or 'n'.", logger, echo_console=echo_console)
        else:
            log_msg(
                f"Unknown existing_file_action policy '{policy}'; defaulting to 'skip_all'.",
                logger, level="warning", echo_console=echo_console
            )
            session.set("existing_file_action", "skip_all")
            download = False

    return download, save_path
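
`expected_save_path` builds the canonical monthly file name; its exact scheme is defined elsewhere in the package, but the shape is roughly as follows. The naming convention and extension mapping here are hypothetical, for illustration only:

```python
from pathlib import Path

def expected_save_path(save_dir: Path, filename_base: str, year: int, month: int, fmt: str) -> Path:
    # Hypothetical naming scheme: <base>_<YYYY>-<MM>.<ext>; the real helper is canonical.
    ext = {"grib": "grib", "netcdf": "nc"}.get(fmt, fmt)
    return save_dir / f"{filename_base}_{year}-{month:02d}.{ext}"

print(expected_save_path(Path("/data"), "era5_land", 2021, 3, "grib"))
```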

execute_cds_download

execute_cds_download(
    session,
    save_path,
    year,
    month,
    *,
    logger,
    echo_console,
    dataset_config_mapping=CDS_DATASETS
)

Execute a single ERA5 monthly download with retry logic.

Parameters:

Name Type Description Default
session SessionState

Session state containing the authenticated CDS API client.

required
save_path str

Full path to save the downloaded file.

required
year int

Year of the data to download.

required
month int

Month of the data to download.

required
logger Logger

Logger for logging messages.

required
echo_console bool

Whether to echo prompts to console.

required
dataset_config_mapping dict

Mapping of dataset short names to their configurations.

CDS_DATASETS

Returns:

Type Description
(year, month, status): tuple

status = "success" | "failed"

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/sources/cds_era5.py
def execute_cds_download(
        session: SessionState,
        save_path: str,
        year: int,
        month: int,
        *,
        logger,
        echo_console: bool,
        dataset_config_mapping: dict = CDS_DATASETS,
        ) -> tuple[int, int, str]:
    """
    Execute a single ERA5 monthly download with retry logic.

    Parameters
    ----------
    session : SessionState
        Session state containing the authenticated CDS API client.
    save_path : str
        Full path to save the downloaded file.
    year : int
        Year of the data to download.
    month : int
        Month of the data to download.
    logger : logging.Logger, optional
        Logger for logging messages.
    echo_console : bool
        Whether to echo prompts to console.
    dataset_config_mapping : dict, optional
        Mapping of dataset short names to their configurations.

    Returns
    -------
    (year, month, status): tuple
        status = "success" | "failed"
    """
    # Pull config fresh from CDS_DATASETS
    cfg = get_cds_dataset_config(session, dataset_config_mapping=dataset_config_mapping)
    dataset_product_name = cfg["dataset_product_name"]
    product_type = cfg["product_type"]
    data_download_format = cfg.get("data_download_format", "grib")
    times = cfg.get("default_times", [f"{h:02d}:00" for h in range(24)])

    variables = session.get("variables")
    grid_area = session.get("region_bounds")
    cds_client_session = session.get("session_client")
    if cds_client_session is None:
        raise ValueError("CDS client not initialized in session")

    retry_conf = session.get("retry_settings") or {"max_retries": 6, "retry_delay_sec": 15}
    max_retries = retry_conf["max_retries"]
    retry_delay_sec = retry_conf["retry_delay_sec"]
    days = month_days(year, month)

    month_start = time.time()
    for attempt in range(1, max_retries + 1):
        try:
            log_msg(f"\tAttempt {attempt} of {max_retries} for {year}-{month:02d}...", logger, echo_console=echo_console)
            cds_client_session.retrieve(
                dataset_product_name,
                {
                    "product_type": [product_type],
                    "variable": variables,
                    "year": str(year),
                    "month": [f"{month:02d}"],
                    "day": days,
                    "time": times,
                    "area": grid_area,
                    "format": data_download_format,
                },
                str(save_path),
            )
            elapsed = time.time() - month_start
            log_msg(f"SUCCESS: {year}-{month:02d} in {format_duration(elapsed)}", logger, echo_console=echo_console)
            return (year, month, "success")

        except Exception as e:
            log_msg(f"WARNING: Attempt {attempt} failed for {year}-{month:02d}: {e}", logger, level="warning", echo_console=echo_console)
            if attempt < max_retries:
                log_msg(f"\tWaiting {retry_delay_sec} seconds before retrying...", logger, echo_console=echo_console)
                time.sleep(retry_delay_sec)
                try:
                    api_url, api_key = session.get("api_url"), session.get("api_key")
                    creds_dict = {"url": api_url, "key": api_key}
                    cds_client_session_new = ensure_cds_connection(cds_client_session, creds_dict)
                    if cds_client_session_new is None:
                        raise RuntimeError("Re-authentication returned None client.")
                    session.set("session_client", cds_client_session_new)
                    cds_client_session = cds_client_session_new
                    log_msg("\tRe-authenticated CDS API client.", logger, echo_console=echo_console)
                except Exception as auth_e:
                    log_msg(f"\tRe-authentication failed: {auth_e}", logger, level="warning", echo_console=echo_console)
            else:
                log_msg(f"FAILURE: all {max_retries} attempts failed for {year}-{month:02d}.", logger, level="error", echo_console=echo_console)
                return (year, month, "failed")
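The retry loop above can be reduced to the following runnable sketch. The names `retry_call` and `flaky` are illustrative only (not part of the package), and the CDS-specific re-authentication step is omitted:

```python
import time

def retry_call(fn, max_retries=6, retry_delay_sec=0):
    """Retry fn() up to max_retries times, sleeping between attempts.

    Mirrors the attempt/except/sleep shape of execute_cds_download.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return ("success", fn())
        except Exception as exc:
            if attempt >= max_retries:
                return ("failed", exc)
            time.sleep(retry_delay_sec)

calls = {"n": 0}

def flaky():
    # Fails twice, then succeeds -- stands in for a transient CDS error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "downloaded"

status, value = retry_call(flaky, max_retries=5)  # succeeds on attempt 3
```

As in `execute_cds_download`, the function returns a status string rather than raising, so callers can sort results into success/failure lists.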

download_cds_month

download_cds_month(
    session,
    filename_base,
    year,
    month,
    *,
    logger,
    echo_console,
    allow_prompts,
    successful_downloads,
    failed_downloads,
    skipped_downloads
)

Orchestrate ERA5 monthly download: handle file checks, then execute download.

Parameters:

Combines the parameters of `prepare_cds_download` and `execute_cds_download`; see those functions for details.

Returns:

Type Description
(year, month, status): tuple

status = "success" | "failed" | "skipped"

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/sources/cds_era5.py
def download_cds_month(
    session: SessionState,
    filename_base: str,
    year: int,
    month: int,
    *,
    logger,
    echo_console: bool,
    allow_prompts: bool,
    successful_downloads: list,
    failed_downloads: list,
    skipped_downloads: list,
    ) -> Tuple[int, int, str]:
    """
    Orchestrate ERA5 monthly download: handle file checks, then execute download.

    Parameters
    ----------
    Combines the parameters of `prepare_cds_download` and `execute_cds_download`.

    Returns
    -------
    (year, month, status): tuple
        status = "success" | "failed" | "skipped"

    """

    proceed, save_path = prepare_cds_download(
        session=session,
        filename_base=filename_base,
        year=year,
        month=month,
        logger=logger,
        echo_console=echo_console,
        allow_prompts=allow_prompts,
        dataset_config_mapping=CDS_DATASETS,
    )

    if not proceed:
        skipped_downloads.append((year, month))
        return (year, month, "skipped")

    y, m, status = execute_cds_download(
        session=session,
        save_path=save_path,
        year=year,
        month=month,
        logger=logger,
        echo_console=echo_console,
        dataset_config_mapping=CDS_DATASETS,
    )

    if status == "success":
        successful_downloads.append((y, m))
    else:
        failed_downloads.append((y, m))
    return (y, m, status)

plan_cds_months

plan_cds_months(
    session,
    filename_base,
    *,
    logger,
    echo_console,
    allow_prompts
)

Build the list of months to download and the list of months skipped because their files already exist.

Parameters:

Name Type Description Default
session SessionState

Session containing user configuration.

required
filename_base str

Base filename (without date or extension).

required
logger Logger

Logger for logging messages.

required
echo_console bool

Whether to echo prompts to console.

required
allow_prompts bool

Whether to allow interactive prompts.

required

Returns:

Type Description
(months_to_download, months_skipped)
  • months_to_download: list[(year, month)]
  • months_skipped: list[(year, month, path)]
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/sources/cds_era5.py
def plan_cds_months(
    session: SessionState,
    filename_base: str,
    *,
    logger,
    echo_console: bool,
    allow_prompts: bool,
) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int, Path]]]:
    """
    Build the list of months to download and the list of months skipped because their files already exist.

    Parameters
    ----------
    session : SessionState
        Session containing user configuration.
    filename_base : str
        Base filename (without date or extension).
    logger : logging.Logger, optional
        Logger for logging messages.
    echo_console : bool
        Whether to echo prompts to console.
    allow_prompts : bool
        Whether to allow interactive prompts.

    Returns
    -------
    (months_to_download, months_skipped)
      - months_to_download: list[(year, month)]
      - months_skipped: list[(year, month, path)]
    """
    policy = validate_existing_file_action(session, allow_prompts=allow_prompts, logger=logger, echo_console=echo_console)

    start_date = session.get("start_date")
    end_date = session.get("end_date")
    save_dir = Path(session.get("save_dir"))

    s = datetime.strptime(start_date, "%Y-%m-%d")
    e = datetime.strptime(end_date, "%Y-%m-%d")

    # Build month list
    months: List[Tuple[int, int]] = []
    cur = datetime(s.year, s.month, 1)
    while cur <= e:
        months.append((cur.year, cur.month))
        cur = datetime(cur.year + 1, 1, 1) if cur.month == 12 else datetime(cur.year, cur.month + 1, 1)

    months_to_download: List[Tuple[int, int]] = []
    months_skipped: List[Tuple[int, int, Path]] = []

    for (y, m) in months:
        existing = find_existing_month_file(save_dir, filename_base, y, m)
        if existing is None:
            months_to_download.append((y, m))
            continue

        if policy == "skip_all":
            months_skipped.append((y, m, existing))
        elif policy == "overwrite_all":
            months_skipped.append((y, m, existing))
            months_to_download.append((y, m))
        else:
            # interactive 'case_by_case'
            while True:
                ans = read_input(
                    f"\nFile already exists for {y}-{m:02d}: {existing}\n"
                    f"Overwrite this one? (y/n): ",
                    logger=logger,
                )
                if ans in NORMALIZATION_MAP["confirmation"]:
                    yn = NORMALIZATION_MAP["confirmation"][ans]
                    if yn == "yes":
                        months_to_download.append((y, m))
                    else:
                        months_skipped.append((y, m, existing))
                    break
                log_msg("Please enter 'y' or 'n'.", logger, echo_console=echo_console)

    # Report
    log_msg("\n===> Checking for existing files...\n" + "-" * 60, logger, echo_console=echo_console)
    if months_skipped:
        for y, m, p in months_skipped[:5]:
            log_msg(f"  - {y}-{m:02d}: {p}", logger, echo_console=echo_console)
        if len(months_skipped) > 5:
            log_msg(f"  ... and {len(months_skipped)-5} more.", logger, echo_console=echo_console)
    log_msg(f"\tFound [{len(months_skipped)}] existing month(s) out of [{len(months)}] requested month(s)\n", logger, echo_console=echo_console)
    log_msg(f"Policy for existing files: '{policy}'", logger, echo_console=echo_console)

    return months_to_download, months_skipped
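The month-list construction inside `plan_cds_months` can be isolated as a standalone sketch (the name `month_range` is illustrative, not part of the package); it enumerates every (year, month) pair whose first day falls on or before the end date:

```python
from datetime import datetime

def month_range(start_date: str, end_date: str):
    """Enumerate (year, month) pairs covering [start_date, end_date],
    matching the while-loop in plan_cds_months."""
    s = datetime.strptime(start_date, "%Y-%m-%d")
    e = datetime.strptime(end_date, "%Y-%m-%d")
    months = []
    cur = datetime(s.year, s.month, 1)
    while cur <= e:
        months.append((cur.year, cur.month))
        # Advance to the first day of the next month, handling year rollover.
        cur = datetime(cur.year + 1, 1, 1) if cur.month == 12 else datetime(cur.year, cur.month + 1, 1)
    return months

spanning = month_range("2023-11-15", "2024-02-03")
# [(2023, 11), (2023, 12), (2024, 1), (2024, 2)]
```

Note that partial months at either end are included whole; day-level truncation happens later via the per-request `day` list.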

orchestrate_cds_downloads

orchestrate_cds_downloads(
    session,
    filename_base,
    successful_downloads,
    failed_downloads,
    skipped_downloads,
    *,
    logger,
    echo_console,
    allow_prompts,
    dataset_config_mapping=CDS_DATASETS
)

Handle and orchestrate ERA5 monthly downloads, supporting parallel or sequential execution.

Parameters:

Name Type Description Default
session SessionState

Session containing user configuration and authenticated client.

required
successful_downloads list

Mutable list to collect (year, month) tuples for successful downloads.

required
failed_downloads list

Mutable list to collect (year, month) tuples for failed downloads.

required
skipped_downloads list

Mutable list to collect (year, month) tuples for skipped downloads.

required
logger Logger

Logger for logging messages.

required
echo_console bool

Whether to echo prompts to console.

required
allow_prompts bool

Whether to allow interactive prompts.

required
dataset_config_mapping dict

Mapping of dataset configurations, by default CDS_DATASETS.

CDS_DATASETS

Returns:

Type Description
None
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/sources/cds_era5.py
def orchestrate_cds_downloads(
        session: SessionState,
        filename_base: str,
        successful_downloads: list,
        failed_downloads: list,
        skipped_downloads: list,
        *,
        logger,
        echo_console: bool,
        allow_prompts: bool,
        dataset_config_mapping: dict = CDS_DATASETS,
        ) -> None:
    """
    Handle and orchestrate ERA5 monthly downloads, supporting parallel or sequential execution.

    Parameters
    ----------
    session : SessionState
        Session containing user configuration and authenticated client.
    successful_downloads : list
        Mutable list to collect (year, month) tuples for successful downloads.
    failed_downloads : list
        Mutable list to collect (year, month) tuples for failed downloads.
    skipped_downloads : list
        Mutable list to collect (year, month) tuples for skipped downloads.
    logger : logging.Logger, optional
        Logger for logging messages.
    echo_console : bool
        Whether to echo prompts to console.
    allow_prompts : bool
        Whether to allow interactive prompts.
    dataset_config_mapping : dict, optional
        Mapping of dataset configurations, by default CDS_DATASETS.

    Returns
    -------
    None
    """

    # (Optional) touch cfg to ensure dataset is valid; not stored, just validation side-effect
    _ = get_cds_dataset_config(session, CDS_DATASETS)

    months_to_download, months_skipped = plan_cds_months(
        session=session,
        filename_base=filename_base,
        logger=logger,
        echo_console=echo_console,
        allow_prompts=allow_prompts,
    )

    for (y, m, _) in months_skipped:
        skipped_downloads.append((y, m))

    if not months_to_download:
        log_msg("\nNothing to download (all months skipped).", logger, echo_console=echo_console)
        return

    parallel_conf = session.get("parallel_settings") or {"enabled": False, "max_concurrent": 1}
    t0 = perf_counter()
    if parallel_conf.get("enabled"):
        max_workers = max(2, int(parallel_conf.get("max_concurrent", 2)))
        log_msg(f"\nParallelisation : Enabled -> Beginning download with [{max_workers}] concurrent tasks...\n" + "-" * 60, logger, echo_console=echo_console)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            tasks = [
                executor.submit(
                    download_cds_month,
                    session=session,
                    filename_base=filename_base,
                    year=y,
                    month=m,
                    logger=logger,
                    echo_console=echo_console,
                    allow_prompts=allow_prompts,
                    successful_downloads=successful_downloads,
                    failed_downloads=failed_downloads,
                    skipped_downloads=skipped_downloads,
                )
                for (y, m) in months_to_download
            ]
            for _ in as_completed(tasks):
                pass
    else:
        log_msg("\nParallelisation : Disabled -> Beginning download in sequential mode ...\n" + "-" * 60, logger, echo_console=echo_console)
        for (y, m) in months_to_download:
            download_cds_month(
                session=session,
                filename_base=filename_base,
                year=y,
                month=m,
                logger=logger,
                echo_console=echo_console,
                allow_prompts=allow_prompts,
                successful_downloads=successful_downloads,
                failed_downloads=failed_downloads,
                skipped_downloads=skipped_downloads,
            )
    elapsed = perf_counter() - t0
    log_msg(f"\nTotal download wall time: {format_duration(elapsed)}",
            logger, echo_console=echo_console)
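The parallel/sequential branch in `orchestrate_cds_downloads` follows a standard `ThreadPoolExecutor` dispatch pattern, sketched minimally below (`run_downloads` and `worker` are illustrative names; `worker` stands in for `download_cds_month`):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_downloads(months, worker, *, parallel=False, max_concurrent=2):
    """Dispatch worker(year, month) over months, in parallel or sequentially."""
    results = []
    if parallel:
        # Same floor of 2 workers as the documented function applies.
        with ThreadPoolExecutor(max_workers=max(2, max_concurrent)) as executor:
            tasks = [executor.submit(worker, y, m) for (y, m) in months]
            for task in as_completed(tasks):
                results.append(task.result())
    else:
        for (y, m) in months:
            results.append(worker(y, m))
    return results

out = run_downloads([(2024, 1), (2024, 2)],
                    lambda y, m: (y, m, "success"),
                    parallel=True)
```

Because completion order is nondeterministic under the executor, the real function collects results through the shared mutable lists rather than relying on ordering.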

weather_data_retrieval.sources.open_meteo

weather_data_retrieval.utils.data_validation

normalize_input

normalize_input(value, category)

Normalize user input to canonical internal value as defined in NORMALIZATION_MAP.

Parameters:

Name Type Description Default
value str

The user input value to normalize.

required
category str

The category of normalization (e.g., 'data_provider', 'dataset_short_name')

required

Returns:

Type Description
str

The normalized value.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def normalize_input(
        value: str,
        category: str
    ) -> str:
    """
    Normalize user input to canonical internal value as defined in NORMALIZATION_MAP.

    Parameters
    ----------
    value : str
        The user input value to normalize.
    category : str
        The category of normalization (e.g., 'data_provider', 'dataset_short_name')

    Returns
    -------
    str
        The normalized value.

    """
    if not isinstance(value, str):
        return value
    v = value.strip().lower()
    return NORMALIZATION_MAP.get(category, {}).get(v, value)
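A usage sketch, with a hypothetical subset of `NORMALIZATION_MAP` (the real mapping lives in the package; the function body is reproduced from the source above so the snippet runs standalone):

```python
# Hypothetical subset of NORMALIZATION_MAP, for illustration only.
NORMALIZATION_MAP = {
    "confirmation": {"y": "yes", "yes": "yes", "n": "no", "no": "no"},
}

def normalize_input(value, category):
    # Same body as the documented function.
    if not isinstance(value, str):
        return value
    v = value.strip().lower()
    return NORMALIZATION_MAP.get(category, {}).get(v, value)

a = normalize_input("  Y ", "confirmation")   # whitespace/case folded -> "yes"
b = normalize_input("maybe", "confirmation")  # unknown values pass through
c = normalize_input(3, "confirmation")        # non-strings returned unchanged
```

Note the pass-through behaviour: unrecognized inputs are returned as-is, so callers must still validate them.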

format_duration

format_duration(seconds)

Convert a duration in seconds to a human-readable 'Hh Mm Ss' string (with decimal seconds).

Parameters:

Name Type Description Default
seconds float

Duration in seconds.

required

Returns:

Type Description
str

Formatted duration string.
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def format_duration(seconds: float) -> str:
    """
    Convert a duration in seconds to a human-readable 'Hh Mm Ss' string (with decimal seconds).

    Parameters
    ----------
    seconds : float
        Duration in seconds.

    Returns
    -------
    str
        Formatted duration string.
    """
    seconds = max(0, seconds)

    hours = int(seconds // 3600)
    days = int(hours // 24)
    minutes = int((seconds % 3600) // 60)
    secs = seconds % 60  # keep remainder as float

    if days > 0:
        hours = hours % 24
        return f"{days}d {hours}h {minutes}m {secs:.2f}s"
    if hours > 0:
        return f"{hours}h {minutes}m {secs:.2f}s"
    elif minutes > 0:
        return f"{minutes}m {secs:.2f}s"
    else:
        return f"{secs:.5f}s"
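A usage sketch; the body is reproduced from the source above so the snippet runs standalone:

```python
def format_duration(seconds: float) -> str:
    # Same body as the documented function.
    seconds = max(0, seconds)
    hours = int(seconds // 3600)
    days = int(hours // 24)
    minutes = int((seconds % 3600) // 60)
    secs = seconds % 60  # keep remainder as float
    if days > 0:
        return f"{days}d {hours % 24}h {minutes}m {secs:.2f}s"
    if hours > 0:
        return f"{hours}h {minutes}m {secs:.2f}s"
    if minutes > 0:
        return f"{minutes}m {secs:.2f}s"
    return f"{secs:.5f}s"

x = format_duration(3723.5)   # "1h 2m 3.50s"
y = format_duration(90061)    # "1d 1h 1m 1.00s"
z = format_duration(-5)       # negative input clamped -> "0.00000s"
```

Sub-minute durations keep five decimal places, so very short operations still produce a meaningful reading.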

format_coordinates_nwse

format_coordinates_nwse(boundaries)

Extract and format coordinates as integers in N-W-S-E order, for compact representation in filenames.

Parameters:

Name Type Description Default
boundaries list

List of boundaries in the order [north, west, south, east]

required

Returns:

Type Description
str

Formatted string in the format 'N{north}W{west}S{south}E{east}'.
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def format_coordinates_nwse(boundaries: List[float]) -> str:

    """
    Extract and format coordinates as integers in N-W-S-E order,
    for compact representation in filenames.

    Parameters
    ----------
    boundaries : list
        List of boundaries in the order [north, west, south, east].

    Returns
    -------
    str
        Formatted string in the format 'N{north}W{west}S{south}E{east}'.
    """
    n, w, s, e = boundaries
    # compact string for filenames
    return f"N{int(n)}W{int(w)}S{int(s)}E{int(e)}"
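A usage sketch (body reproduced from the source above). Note that `int()` truncates toward zero, so fractional boundaries lose their fractional part in the filename tag:

```python
from typing import List

def format_coordinates_nwse(boundaries: List[float]) -> str:
    # Same body as the documented function.
    n, w, s, e = boundaries
    return f"N{int(n)}W{int(w)}S{int(s)}E{int(e)}"

tag = format_coordinates_nwse([60.5, -10.25, 50.0, 5.75])
# "N60W-10S50E5" -- negative values keep their sign after the letter
```

Two regions differing only in their fractional bounds therefore map to the same filename tag, which is acceptable here since the tag is a compact label rather than a round-trippable encoding.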

month_days

month_days(year, month)

Get list of days in a month formatted as two-digit strings.

Parameters:

Name Type Description Default
year int

Year of interest.

required
month int

Month of interest.

required

Returns:

Type Description
List[str]

List of days in the month as two-digit strings.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def month_days(
        year: int,
        month: int
        ) -> List[str]:
    """
    Get list of days in a month formatted as two-digit strings.

    Parameters
    ----------
    year : int
        Year of interest.
    month : int
        Month of interest.

    Returns
    -------
    List[str]
        List of days in the month as two-digit strings.
    """
    last = calendar.monthrange(year, month)[1]
    return [f"{d:02d}" for d in range(1, last + 1)]
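A usage sketch (body reproduced from the source above); `calendar.monthrange` handles leap years, so February resolves correctly:

```python
import calendar
from typing import List

def month_days(year: int, month: int) -> List[str]:
    # Same body as the documented function.
    last = calendar.monthrange(year, month)[1]
    return [f"{d:02d}" for d in range(1, last + 1)]

feb_leap = month_days(2024, 2)  # 29 entries: "01" .. "29"
feb      = month_days(2023, 2)  # 28 entries: "01" .. "28"
```

The two-digit strings are exactly the format the CDS request payload expects for its `day` field.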

validate_data_provider

validate_data_provider(provider)

Ensure the data provider is recognized and implemented.

Parameters:

Name Type Description Default
provider str

Name of the data provider.

required

Returns:

Type Description
bool

True if valid, False otherwise.
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def validate_data_provider(provider: str) -> bool:
    """
    Ensure the data provider is recognized and implemented.

    Parameters
    ----------
    provider : str
        Name of the data provider.

    Returns
    -------
    bool
        True if valid, False otherwise.
    """
    if provider not in ("cds", "open-meteo"):
        return False
    return True

validate_dataset_short_name

validate_dataset_short_name(dataset_short_name, provider)

Check dataset compatibility with provider.

Parameters:

Name Type Description Default
dataset_short_name str

Dataset short name.

required
provider str

Name of the data provider.

required

Returns:

Type Description
bool

True if valid, False otherwise.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def validate_dataset_short_name(
        dataset_short_name: str,
        provider: str
        ) -> bool:
    """
    Check dataset compatibility with provider.

    Parameters
    ----------
    dataset_short_name : str
        Dataset short name.
    provider : str
        Name of the data provider.

    Returns
    -------
    bool
        True if valid, False otherwise.
    """
    if provider == "cds":
        return dataset_short_name in CDS_DATASETS
    if provider == "open-meteo":
        raise NotImplementedError("Open-Meteo dataset validation not yet implemented.")
    log_msg(f"Warning: Unknown provider '{provider}'.")
    return False

validate_cds_api_key

validate_cds_api_key(
    url, key, *, logger=None, echo_console=False
)

Validate CDS API credentials by attempting to initialize a cdsapi.Client.

Parameters:

Name Type Description Default
url str

CDS API URL.

required
key str

CDS API key.

required
logger Logger

Logger for logging messages, by default None.

None
echo_console bool

Whether to echo messages to console, by default False.

False

Returns:

Type Description
Client | None

Authenticated client if successful, otherwise None.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def validate_cds_api_key(
        url: str,
        key: str,
        *,
        logger=None,
        echo_console: bool = False
        ) -> cdsapi.Client | None:
    """
    Validate CDS API credentials by attempting to initialize a cdsapi.Client.

    Parameters
    ----------
    url : str
        CDS API URL.
    key : str
        CDS API key.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.
    echo_console : bool, optional
        Whether to echo messages to console, by default False.

    Returns
    -------
    cdsapi.Client | None
        Authenticated client if successful, otherwise None.
    """

    if logger:
        log_msg("Testing CDS API connection with provided credentials...", logger, level="info")

    # 1) Initialize client
    try:
        client = cdsapi.Client(url=url, key=key, quiet=True, timeout=30)
    except Exception as e:
        if logger:
            log_msg(f"\tFailed to initialize CDS client: {e}", logger, level="warning")
        return None

    # 2) Probe using the predefined payload (normalize keys minimally)
    #    Expecting a dict like: test_payload = {"dataset": "...", "request": {...}}
    try:
        dataset = test_payload["dataset"]
        request = dict(test_payload["request"])  # shallow copy so we can tweak keys safely

        # product_type must be a string
        if isinstance(request.get("product_type"), list) and request["product_type"]:
            request["product_type"] = request["product_type"][0]

        # -------------------------------------------------------------------
        tmp_path = Path(tempfile.gettempdir()) / f"cds_probe_{os.getpid()}_{int(datetime.now().timestamp())}.grib"

        if logger:
            log_msg("\tProbing ERA5 permissions with a minimal retrieve...", logger)
            request_items = ""
            for k, v in request.items():
                request_items += f"\n\t   {k}: {v}"
            log_msg(f"\tRequest details: dataset='{dataset}' {request_items}", logger)

        client.retrieve(dataset, request, target=str(tmp_path))

        if logger:
            log_msg("\tPermission probe succeeded.\n", logger)
        return client

    except Exception as e:
        msg = str(e)
        if logger:
            log_msg(f"\tAuthentication/probe failed: {msg}\n", logger, level="warning")
            if "401" in msg or "Unauthorized" in msg or "operation not allowed" in msg:
                log_msg(
                    f"\tCDS returned 401/Unauthorized. This usually means you haven’t accepted the "
                    f"licence for '{test_payload.get('dataset','(unknown)')}'. Please log into the CDS website and accept it.\n",
                    logger,
                    level="warning",
                )
        return None
    finally:
        # Cleanup
        try:
            if 'tmp_path' in locals() and tmp_path.exists():
                tmp_path.unlink()
        except Exception:
            pass

validate_directory

validate_directory(path)

Check if path exists or can be created.

Parameters:

Name Type Description Default
path str

Directory path to validate.

required

Returns:

Type Description
bool

True if path exists or was created successfully, False otherwise.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def validate_directory(path: str) -> bool:
    """
    Check if path exists or can be created.

    Parameters
    ----------
    path : str
        Directory path to validate.

    Returns
    -------
    bool
        True if path exists or was created successfully, False otherwise.
    """
    p = Path(path)
    if p.exists():
        return True
    try:
        p.mkdir(parents=True, exist_ok=True)
        return True
    except Exception:
        return False
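A usage sketch (body reproduced from the source above), exercised against a temporary directory so it leaves no artifacts behind; `root` and `nested` are illustrative names:

```python
from pathlib import Path
import tempfile

def validate_directory(path: str) -> bool:
    # Same body as the documented function.
    p = Path(path)
    if p.exists():
        return True
    try:
        p.mkdir(parents=True, exist_ok=True)
        return True
    except Exception:
        return False

root = tempfile.mkdtemp()             # an existing directory
nested = str(Path(root) / "a" / "b")  # does not exist yet; gets created

ok_existing = validate_directory(root)
ok_created = validate_directory(nested)
```

Note the side effect: validation creates the directory (including parents) when it is missing, so a `True` result means the path is usable, not merely well-formed.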

validate_date

validate_date(value, allow_month_only=False)

Validate date format as YYYY-MM-DD or optionally YYYY-MM.

Parameters:

Name Type Description Default
value str

Date string to validate.

required
allow_month_only bool

If True, also accept YYYY-MM format, by default False.

False

Returns:

Type Description
bool

True if valid, False otherwise.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def validate_date(
        value: str,
        allow_month_only: bool = False
        ) -> bool:
    """
    Validate date format as YYYY-MM-DD or optionally YYYY-MM.

    Parameters
    ----------
    value : str
        Date string to validate.
    allow_month_only : bool, optional
        If True, also accept YYYY-MM format, by default False.

    Returns
    -------
    bool
        True if valid, False otherwise.
    """
    try:
        # Try full date format first
        datetime.strptime(value, "%Y-%m-%d")
        return True
    except ValueError:
        if allow_month_only:
            try:
                # Try year-month format
                datetime.strptime(value, "%Y-%m")
                return True
            except ValueError:
                log_msg(f"Invalid date format '{value}'. Expected YYYY-MM-DD or YYYY-MM.\n")
                return False
        return False
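A minimal standalone sketch of the same validation logic, with the package's `log_msg` call dropped so the snippet runs on its own:

```python
from datetime import datetime

def validate_date(value: str, allow_month_only: bool = False) -> bool:
    # Same control flow as the documented function, minus logging.
    try:
        datetime.strptime(value, "%Y-%m-%d")  # try full date first
        return True
    except ValueError:
        if allow_month_only:
            try:
                datetime.strptime(value, "%Y-%m")  # fall back to year-month
                return True
            except ValueError:
                return False
        return False

full_ok = validate_date("2024-02-29")                        # leap day, valid
bad_day = validate_date("2023-02-29")                        # no Feb 29 in 2023
month_ok = validate_date("2024-02", allow_month_only=True)   # YYYY-MM accepted
month_no = validate_date("2024-02")                          # rejected by default
```

One caveat worth knowing: `strptime` also accepts non-zero-padded dates such as `2024-2-1`, so this check is slightly more permissive than the strict `YYYY-MM-DD` format the message implies.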

parse_date_with_defaults

parse_date_with_defaults(
    date_str, default_to_month_end=False
)

Parse date string and apply defaults for incomplete dates.

Parameters:

Name Type Description Default
date_str str

Date string in format YYYY-MM-DD or YYYY-MM.

required
default_to_month_end bool

If True and date is YYYY-MM format, default to last day of month. If False and date is YYYY-MM format, default to first day of month. By default False.

False

Returns:

Type Description
tuple[datetime, str]

Tuple of (parsed datetime object, ISO format string YYYY-MM-DD)

Raises:

Type Description
ValueError

If date string is invalid.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def parse_date_with_defaults(
        date_str: str,
        default_to_month_end: bool = False
        ) -> tuple[datetime, str]:
    """
    Parse date string and apply defaults for incomplete dates.

    Parameters
    ----------
    date_str : str
        Date string in format YYYY-MM-DD or YYYY-MM.
    default_to_month_end : bool, optional
        If True and date is YYYY-MM format, default to last day of month.
        If False and date is YYYY-MM format, default to first day of month.
        By default False.

    Returns
    -------
    tuple[datetime, str]
        Tuple of (parsed datetime object, ISO format string YYYY-MM-DD)

    Raises
    ------
    ValueError
        If date string is invalid.
    """
    if len(date_str) == 7:  # YYYY-MM format
        year, month = date_str.split('-')
        year, month = int(year), int(month)

        if default_to_month_end:
            # Get last day of month
            last_day = calendar.monthrange(year, month)[1]
            date_str_full = f"{year}-{month:02d}-{last_day:02d}"
        else:
            # Default to first day
            date_str_full = f"{year}-{month:02d}-01"

        dt = datetime.strptime(date_str_full, "%Y-%m-%d")
        return dt, date_str_full

    elif len(date_str) == 10:  # YYYY-MM-DD format
        dt = datetime.strptime(date_str, "%Y-%m-%d")
        return dt, date_str

    else:
        raise ValueError(f"Invalid date format '{date_str}'. Expected YYYY-MM-DD or YYYY-MM.")
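A minimal usage sketch of the month-expansion behavior (the helper is re-declared here, trimmed of its docstring, so the snippet runs standalone):

```python
import calendar
from datetime import datetime

def parse_date_with_defaults(date_str, default_to_month_end=False):
    # mirrors the function above, trimmed of its docstring
    if len(date_str) == 7:  # YYYY-MM
        year, month = map(int, date_str.split("-"))
        day = calendar.monthrange(year, month)[1] if default_to_month_end else 1
        full = f"{year}-{month:02d}-{day:02d}"
        return datetime.strptime(full, "%Y-%m-%d"), full
    if len(date_str) == 10:  # YYYY-MM-DD
        return datetime.strptime(date_str, "%Y-%m-%d"), date_str
    raise ValueError(f"Invalid date format '{date_str}'. Expected YYYY-MM-DD or YYYY-MM.")

# A YYYY-MM string expands to the first or last day of that month
_, start = parse_date_with_defaults("2024-02")
_, end = parse_date_with_defaults("2024-02", default_to_month_end=True)
print(start, end)  # 2024 is a leap year, so the month-end default is Feb 29
```

This is why a start/end pair like `("2024-02", "2024-02")` still yields a full month range when the end date is parsed with `default_to_month_end=True`.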

clamp_era5_available_end_date

clamp_era5_available_end_date(end)

Clamp end date to ERA5 data availability boundary (8 days ago).

Parameters:

Name Type Description Default
end datetime

Desired end date.

required

Returns:

Type Description
datetime

Clamped end date.

Notes

ERA5 data is only available up to 8 days prior to the current date; the 8-day lag ensures data availability.
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def clamp_era5_available_end_date(end: datetime) -> datetime:
    """
    Clamp end date to ERA5 data availability boundary (8 days ago).

    Parameters
    ----------
    end : datetime
        Desired end date.

    Returns
    -------
    datetime
        Clamped end date.

    Notes
    -----
    ERA5 data is only available up to 8 days prior to the current date,
    so an 8-day lag is used to ensure data availability.

    """
    EIGHT_DAY_LAG = 8
    upper = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=EIGHT_DAY_LAG)
    if end > upper:
        log_msg(f"Adjusting end date from {end.date()} to data availability boundary {upper.date()} (−{EIGHT_DAY_LAG} days).")
        return upper
    return end
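The clamping rule can be sketched in isolation. This is a simplified version with logging removed and "today" injected as a parameter so the behavior is testable without depending on the wall clock:

```python
from datetime import datetime, timedelta

EIGHT_DAY_LAG = 8

def clamp_end_date(end, today):
    # Same rule as clamp_era5_available_end_date, with `today` injected
    upper = today.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=EIGHT_DAY_LAG)
    return min(end, upper)

today = datetime(2024, 6, 20, 15, 30)
# Requests past the boundary are pulled back to today - 8 days
clamped = clamp_end_date(datetime(2024, 6, 19), today)
# Requests already inside the boundary pass through unchanged
unchanged = clamp_end_date(datetime(2024, 5, 1), today)
```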

validate_coordinates

validate_coordinates(north, west, south, east)

Ensure coordinates are within realistic bounds.

Parameters:

Name Type Description Default
north int | float

Northern latitude boundary.

required
west int | float

Western longitude boundary.

required
south int | float

Southern latitude boundary.

required
east int | float

Eastern longitude boundary.

required

Returns:

Type Description
bool

True if coordinates are valid, False otherwise.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def validate_coordinates(
        north: int | float,
        west: int | float,
        south: int | float,
        east: int | float,
        ) -> bool:
    """
    Ensure coordinates are within realistic bounds.

    Parameters
    ----------
    north : int | float
        Northern latitude boundary.
    west : int | float
        Western longitude boundary.
    south : int | float
        Southern latitude boundary.
    east : int | float
        Eastern longitude boundary.

    Returns
    -------
    bool
        True if coordinates are valid, False otherwise.
    """
    return all([
        -90 <= south <= 90,
        -90 <= north <= 90,
        -180 <= west <= 180,
        -180 <= east <= 180,
        north > south
    ])
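A few illustrative calls (the function is re-declared so the snippet runs standalone). Note that `east < west` is deliberately not rejected, since bounding boxes may cross the antimeridian:

```python
def validate_coordinates(north, west, south, east):
    # mirrors the checks above
    return all([
        -90 <= south <= 90,
        -90 <= north <= 90,
        -180 <= west <= 180,
        -180 <= east <= 180,
        north > south,
    ])

assert validate_coordinates(60, -10, 50, 10)      # typical box
assert not validate_coordinates(95, -10, 50, 10)  # latitude out of range
assert not validate_coordinates(40, -10, 50, 10)  # north not above south
assert validate_coordinates(10, 170, 0, -170)     # dateline-crossing box is allowed
```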

validate_variables

validate_variables(
    variable_list,
    variable_restrictions,
    restriction_allow=False,
)

Ensure user-specified variables are available for this dataset.

Parameters:

Name Type Description Default
variable_list list[str]

List of variable names to validate.

required
variable_restrictions list[str]

List of variables that are either allowed or disallowed.

required
restriction_allow bool

If True, variable_restrictions is an allowlist (requested variables must be in it). If False, it is a denylist (requested variables must not be in it). By default False.

False

Returns:

Type Description
bool

True if all variables are valid, False otherwise.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def validate_variables(
        variable_list: list[str],
        variable_restrictions: list[str],
        restriction_allow: bool = False
        ) -> bool:
    """
    Ensure user-specified variables are available for this dataset.

    Parameters
    ----------
    variable_list : list[str]
        List of variable names to validate.
    variable_restrictions : list[str]
        List of variables that are either allowed or disallowed.
    restriction_allow : bool, optional
        If True, variable_restrictions is an allowlist (requested variables must be
        in it). If False, it is a denylist (requested variables must not be in it).
        By default False.

    Returns
    -------
    bool
        True if all variables are valid, False otherwise.
    """
    if not variable_list:
        return False

    # Normalize for case consistency
    variables = [v.lower().strip() for v in variable_list]
    restrictions = [r.lower().strip() for r in variable_restrictions]

    if restriction_allow:
        return all(v in restrictions for v in variables)
    else:
        return all(v not in restrictions for v in variables)
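The allowlist/denylist switch in action (function re-declared so the snippet runs standalone; note the case/whitespace normalization):

```python
def validate_variables(variable_list, variable_restrictions, restriction_allow=False):
    # mirrors the function above
    if not variable_list:
        return False
    variables = [v.lower().strip() for v in variable_list]
    restrictions = [r.lower().strip() for r in variable_restrictions]
    if restriction_allow:
        return all(v in restrictions for v in variables)
    return all(v not in restrictions for v in variables)

allowed = ["2m_temperature", "total_precipitation"]
# Allowlist: every requested variable must appear in the restrictions
assert validate_variables(["2m_Temperature "], allowed, restriction_allow=True)
assert not validate_variables(["snow_depth"], allowed, restriction_allow=True)
# Denylist (default): no requested variable may appear in the restrictions
assert validate_variables(["snow_depth"], allowed)
```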

validate_existing_file_action

validate_existing_file_action(
    session, *, allow_prompts, logger, echo_console=False
)

Normalize existing_file_action for the current run mode. If 'case_by_case' is set but prompts are not allowed (automatic mode), it is coerced to 'skip_all' with a warning.

Parameters:

Name Type Description Default
session Any

Current session state.

required
allow_prompts bool

Whether prompts are allowed (i.e., interactive mode).

required
logger Logger

Logger for logging messages.

required
echo_console bool

Whether to echo messages to console.

False

Returns:

Type Description
str

Normalized existing_file_action policy.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def validate_existing_file_action(
        session: Any,
        *,
        allow_prompts: bool,
        logger,
        echo_console: bool = False
        ) -> str:
    """
    Normalize existing_file_action for the current run-mode.
    - If 'case_by_case' is set but prompts are not allowed (automatic mode),
      coerce to 'skip_all' and warn.

    Parameters
    ----------
    session : Any
        Current session state.
    allow_prompts : bool
        Whether prompts are allowed (i.e., interactive mode).
    logger : logging.Logger
        Logger for logging messages.
    echo_console : bool
        Whether to echo messages to console.

    Returns
    -------
    str
        Normalized existing_file_action policy.
    """
    policy = session.get("existing_file_action") or "case_by_case"
    allowed = {"overwrite_all", "skip_all", "case_by_case"}

    if policy not in allowed:
        if allow_prompts:
            # treat as case-by-case if we can prompt
            log_msg(
                f"Unrecognized existing_file_action='{policy}'; treating as 'case_by_case' due to interactive mode.",
                logger, level="warning", echo_console=echo_console
            )
            return "case_by_case"
        else:
            # automatic: force skip_all
            log_msg(
                f"Unrecognized existing_file_action='{policy}'; coercing to 'skip_all' for automatic mode.",
                logger, level="warning", echo_console=echo_console
            )
            session.set("existing_file_action", "skip_all")
            return "skip_all"

    if policy == "case_by_case" and not allow_prompts:
        log_msg(
            "existing_file_action='case_by_case' is not supported without prompts; "
            "coercing to 'skip_all' for automatic mode.",
            logger, level="warning", echo_console=echo_console
        )
        session.set("existing_file_action", "skip_all")
        return "skip_all"

    return policy
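The coercion rules reduce to a small decision table. This is a simplified sketch with the session object and logging stripped out (the real function also persists the coerced value back onto the session):

```python
def normalize_existing_file_action(policy, allow_prompts):
    # Coercion rules from validate_existing_file_action, session/logging omitted
    allowed = {"overwrite_all", "skip_all", "case_by_case"}
    policy = policy or "case_by_case"
    if policy not in allowed:
        # unrecognized: prompt-capable runs fall back to asking, automatic runs skip
        return "case_by_case" if allow_prompts else "skip_all"
    if policy == "case_by_case" and not allow_prompts:
        return "skip_all"
    return policy

assert normalize_existing_file_action(None, allow_prompts=True) == "case_by_case"
assert normalize_existing_file_action("case_by_case", allow_prompts=False) == "skip_all"
assert normalize_existing_file_action("banana", allow_prompts=False) == "skip_all"
assert normalize_existing_file_action("overwrite_all", allow_prompts=False) == "overwrite_all"
```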

validate_config

validate_config(
    config,
    *,
    logger=None,
    run_mode="automatic",
    echo_console=False,
    live_auth_check=False
)

Entry point. Validates the common configuration shape, then dispatches to a provider-specific validator.

Parameters:

Name Type Description Default
config dict

Configuration dictionary.

required
logger Logger

Logger for logging messages, by default None.

None
run_mode str

Run mode, either 'interactive' or 'automatic', by default "automatic".

'automatic'
echo_console bool

Whether to echo messages to console, by default False.

False
live_auth_check bool

Whether to perform live authentication checks (e.g., CDS API), by default False.

False

Returns:

Type Description
None
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/data_validation.py
def validate_config(
        config: dict,
        *,
        logger=None,
        run_mode: str = "automatic",
        echo_console: bool = False,
        live_auth_check: bool = False,
        ) -> None:
    """
    Entry point. Validates the common configuration shape, then dispatches to a
    provider-specific validator.

    Parameters
    ----------
    config : dict
        Configuration dictionary.
    logger : logging.Logger, optional
        Logger for logging messages, by default None.
    run_mode : str, optional
        Run mode, either 'interactive' or 'automatic', by default "automatic".
    echo_console : bool, optional
        Whether to echo messages to console, by default False.
    live_auth_check : bool, optional
        Whether to perform live authentication checks (e.g., CDS API), by default False.

    Returns
    -------
    None
    """
    _validate_common(config, logger=logger, run_mode=run_mode, echo_console=echo_console)

    provider = config["data_provider"]
    if provider == "cds":
        _validate_cds(config, logger=logger, run_mode=run_mode, echo_console=echo_console, live_auth_check=live_auth_check)
    elif provider == "open-meteo":
        _validate_open_meteo(config, logger=logger, run_mode=run_mode, echo_console=echo_console)
    else:
        raise ValueError(f"Unsupported provider: {provider}")

weather_data_retrieval.utils.file_management

generate_filename_hash

generate_filename_hash(
    dataset_short_name, variables, boundaries
)

Generate a unique hash for the download parameters that will be used to create the filename.

Parameters:

Name Type Description Default
dataset_short_name str

The dataset short name (era5-world etc).

required
variables list[str]

List of variable names.

required
boundaries list[float]

List of boundaries [north, west, south, east].

required

Returns:

Type Description
str

A unique hash string representing the download parameters.
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/file_management.py
def generate_filename_hash(
        dataset_short_name: str,
        variables: list[str],
        boundaries: list[float]
        ) -> str:
    """
    Generate a unique hash for the download parameters that will be used to create the filename.

    Parameters
    ----------
    dataset_short_name : str
        The dataset short name (era5-world etc).
    variables : list[str]
        List of variable names.
    boundaries : list[float]
        List of boundaries [north, west, south, east].

    Returns
    -------
    str
        A unique hash string representing the download parameters.
    """
    # Create unique string from all parameters
    param_string = f"{dataset_short_name}|{sorted(variables)}|{boundaries}"

    # Generate hash
    hash_object = hashlib.md5(param_string.encode())
    return hash_object.hexdigest()[:12]  # 12 characters
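Because the variables are sorted before hashing, the hash is insensitive to variable order; the boundaries list, by contrast, is order-sensitive. A standalone demonstration (function re-declared from above):

```python
import hashlib

def generate_filename_hash(dataset_short_name, variables, boundaries):
    # mirrors the function above
    param_string = f"{dataset_short_name}|{sorted(variables)}|{boundaries}"
    return hashlib.md5(param_string.encode()).hexdigest()[:12]

a = generate_filename_hash("era5-world", ["2m_temperature", "total_precipitation"], [60, -10, 50, 10])
b = generate_filename_hash("era5-world", ["total_precipitation", "2m_temperature"], [60, -10, 50, 10])
assert a == b        # variable order does not change the hash
assert len(a) == 12  # truncated to 12 hex characters
```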

find_existing_month_file

find_existing_month_file(
    save_dir, filename_base, year, month
)

Tolerant matcher that finds an existing file for the given month. Accepts both _YYYY-MM.ext and _YYYY_MM.ext patterns and any extension.

Parameters:

Name Type Description Default
save_dir Path

Directory where files are saved.

required
filename_base str

Base filename (without date or extension).

required
year int

Year of the file.

required
month int

Month of the file.

required

Returns:

Type Description
Optional[Path]

Path to the existing file if found, else None.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/file_management.py
def find_existing_month_file(
        save_dir: Path,
        filename_base: str,
        year: int,
        month: int
        ) -> Optional[Path]:
    """
    Tolerant matcher that finds an existing file for the given month.
    Accepts both `_YYYY-MM.ext` and `_YYYY_MM.ext` patterns and any extension.

    Parameters
    ----------
    save_dir : Path
        Directory where files are saved.
    filename_base : str
        Base filename (without date or extension).
    year : int
        Year of the file.
    month : int
        Month of the file.

    Returns
    -------
    Optional[Path]
        Path to the existing file if found, else None.

    """
    save_dir = Path(save_dir)
    if not save_dir.exists():
        return None

    # Accept dash or underscore between year-month; any extension
    pattern = re.compile(
        rf"^{re.escape(filename_base)}_(?:{year:04d}[-_]{month:02d})\.[A-Za-z0-9]+$"
    )
    for p in save_dir.iterdir():
        if p.is_file() and pattern.match(p.name):
            return p
    return None
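A quick check of the tolerant matching against a temporary directory (function re-declared so the snippet runs standalone):

```python
import re
import tempfile
from pathlib import Path

def find_existing_month_file(save_dir, filename_base, year, month):
    # mirrors the matcher above: dash or underscore separator, any extension
    save_dir = Path(save_dir)
    if not save_dir.exists():
        return None
    pattern = re.compile(
        rf"^{re.escape(filename_base)}_(?:{year:04d}[-_]{month:02d})\.[A-Za-z0-9]+$"
    )
    for p in save_dir.iterdir():
        if p.is_file() and pattern.match(p.name):
            return p
    return None

with tempfile.TemporaryDirectory() as d:
    (Path(d) / "era5_abc123_2024_03.grib").touch()  # underscore variant
    hit = find_existing_month_file(d, "era5_abc123", 2024, 3)
    miss = find_existing_month_file(d, "era5_abc123", 2024, 4)
```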

estimate_era5_monthly_file_size

estimate_era5_monthly_file_size(
    variables,
    area,
    grid_resolution=0.25,
    timestep_hours=1.0,
    bytes_per_value=4.0,
)

Estimate ERA5 GRIB file size (MB) for a monthly dataset.

Parameters:

Name Type Description Default
variables list[str]

Variables requested (e.g. ['2m_temperature', 'total_precipitation']).

required
area list[float]

[north, west, south, east] geographic bounds in degrees.

required
grid_resolution float

Grid spacing in degrees (default 0.25° for ERA5).

0.25
timestep_hours float

Temporal resolution in hours (1 = hourly, 3 = 3-hourly, 6 = 6-hourly, etc.).

1.0
bytes_per_value float

Bytes per gridpoint per variable (float32 = 4 bytes). Currently unused by the reference-scaled model.

4.0

Returns:

Type Description
float

Estimated monthly file size in MB.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/file_management.py
def estimate_era5_monthly_file_size(
    variables: list[str],
    area: list[float],
    grid_resolution: float = 0.25,
    timestep_hours: float = 1.0,
    bytes_per_value: float = 4.0,  # typically float32
) -> float:
    """
    Estimate ERA5 GRIB file size (MB) for a monthly dataset.

    Parameters
    ----------
    variables : list[str]
        Variables requested (e.g. ['2m_temperature', 'total_precipitation']).
    area : list[float]
        [north, west, south, east] geographic bounds in degrees.
    grid_resolution : float
        Grid spacing in degrees (default 0.25° for ERA5).
    timestep_hours : float
        Temporal resolution in hours (1 = hourly, 3 = 3-hourly, 6 = 6-hourly, etc.).
    bytes_per_value : float
        Bytes per gridpoint per variable (float32 = 4 bytes). Currently unused by
        the reference-scaled model.

    Returns
    -------
    float
        Estimated monthly file size in MB.
    """
    if not variables or not area:
        return 0.0

    north, west, south, east = area
    n_vars = len(variables)

    # --- Reference (from empirical files)
    ref_size_MB = 0.509
    ref_vars = 2
    ref_area_deg2 = 3 * 2
    ref_res = 0.25
    ref_timestep_hours = 1.0
    days_per_month = 30

    # --- Compute derived quantities
    # Handle wrap-around longitude
    lon_span = (east - west) if east > west else (360 + east - west)
    lat_span = north - south
    req_area_deg2 = lat_span * lon_span

    # --- Compute scaling factors
    area_factor = req_area_deg2 / ref_area_deg2
    var_factor = n_vars / ref_vars
    res_factor = (ref_res / grid_resolution) ** 2
    timestep_factor = (ref_timestep_hours / timestep_hours)
    time_factor = timestep_factor * (days_per_month / 30)  # normalize to 30 days

    # --- Estimated size (MB)
    estimated_MB = ref_size_MB * var_factor * area_factor * res_factor * time_factor

    # --- Safety floor
    return round(max(estimated_MB, 0.05), 3)  # at least 0.05 MB
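The estimator is a pure scaling of the 0.509 MB reference file (2 variables, a 3°×2° box, 0.25° grid, hourly). The arithmetic can be sketched without the full function, assuming the 30-day reference month:

```python
# Reference file: 2 variables over a 3°x2° box at 0.25°, hourly -> 0.509 MB
REF_SIZE_MB, REF_VARS, REF_AREA_DEG2 = 0.509, 2, 3 * 2

def estimate_MB(n_vars, lat_span, lon_span, grid_resolution=0.25, timestep_hours=1.0):
    # same scaling model as estimate_era5_monthly_file_size (30-day month assumed)
    area_factor = (lat_span * lon_span) / REF_AREA_DEG2
    var_factor = n_vars / REF_VARS
    res_factor = (0.25 / grid_resolution) ** 2
    time_factor = 1.0 / timestep_hours
    return round(max(REF_SIZE_MB * var_factor * area_factor * res_factor * time_factor, 0.05), 3)

# Double the variables and ~17x the area scale the size multiplicatively
assert estimate_MB(4, 10, 10) == 16.967
# 3-hourly data is a third of the hourly volume
assert estimate_MB(2, 3, 2, timestep_hours=3.0) == 0.17
```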

estimate_cds_download

estimate_cds_download(
    variables,
    area,
    start_date,
    end_date,
    observed_speed_mbps,
    grid_resolution=0.25,
    timestep_hours=1.0,
    bytes_per_value=4.0,
    overhead_per_request_s=180.0,
    overhead_per_var_s=12.0,
)

Estimate per-file and total download size/time for CDS (ERA5) retrievals, using an empirically grounded file size model.

Parameters:

Name Type Description Default
variables list[str]

Variables selected (e.g. ['2m_temperature', 'total_precipitation']).

required
area list[float]

[north, west, south, east] bounds in degrees.

required
start_date str

Date range (YYYY-MM-DD).

required
end_date str

Date range (YYYY-MM-DD).

required
observed_speed_mbps float

Measured internet speed in megabits per second (Mbps).

required
grid_resolution float

Grid resolution in degrees (default 0.25°).

0.25
timestep_hours float

Temporal resolution in hours (default 1-hourly).

1.0
bytes_per_value float

Bytes per stored value (float32 = 4).

4.0
overhead_per_request_s float

Fixed CDS request overhead time (queue/prep). Currently unused; kept for signature compatibility.

180.0
overhead_per_var_s float

Per-variable overhead for CDS throttling/prep. Currently unused; kept for signature compatibility.

12.0

Returns:

Type Description
dict

{ "months": int, "file_size_MB": float, "total_size_MB": float, "time_per_file_sec": float, "total_time_sec": float }

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/file_management.py
def estimate_cds_download(
        variables: list[str],
        area: list[float],
        start_date: str,
        end_date: str,
        observed_speed_mbps: float,
        grid_resolution: float = 0.25,
        timestep_hours: float = 1.0,
        bytes_per_value: float = 4.0,
        overhead_per_request_s: float = 180.0,      # ignored but kept for signature consistency
        overhead_per_var_s: float = 12.0,       # ignored but kept for signature consistency
        ) -> dict:
    """
    Estimate per-file and total download size/time for CDS (ERA5) retrievals,
    using an empirically grounded file size model.

    Parameters
    ----------
    variables : list[str]
        Variables selected (e.g. ['2m_temperature', 'total_precipitation']).
    area : list[float]
        [north, west, south, east] bounds in degrees.
    start_date, end_date : str
        Date range (YYYY-MM-DD).
    observed_speed_mbps : float
        Measured internet speed in megabits per second (Mbps).
    grid_resolution : float, optional
        Grid resolution in degrees (default 0.25°).
    timestep_hours : float, optional
        Temporal resolution in hours (default 1-hourly).
    bytes_per_value : float, optional
        Bytes per stored value (float32 = 4).
    overhead_per_request_s : float, optional
        Fixed CDS request overhead time (queue/prep). Currently unused; kept for
        signature compatibility.
    overhead_per_var_s : float, optional
        Per-variable overhead for CDS throttling/prep. Currently unused; kept for
        signature compatibility.

    Returns
    -------
    dict
        {
          "months": int,
          "file_size_MB": float,
          "total_size_MB": float,
          "time_per_file_sec": float,
          "total_time_sec": float
        }
    """
    if not variables or not area or not start_date or not end_date:
        return {
            "months": 0,
            "file_size_MB": 0.0,
            "total_size_MB": 0.0,
            "time_per_file_sec": 0.0,
            "total_time_sec": 0.0,
        }

    # ---------- month list (inclusive) ----------
    s = datetime.strptime(start_date, "%Y-%m-%d")
    e = datetime.strptime(end_date, "%Y-%m-%d")
    months = []
    y, m = s.year, s.month
    while (y < e.year) or (y == e.year and m <= e.month):
        # days in this month
        if m == 12:
            next_first = datetime(y + 1, 1, 1)
        else:
            next_first = datetime(y, m + 1, 1)
        this_first = datetime(y, m, 1)
        dim = (next_first - this_first).days
        months.append((y, m, dim))
        if m == 12:
            y, m = y + 1, 1
        else:
            m += 1

    n_files = len(months)
    n_vars = max(1, len(variables))

    # ---------- area → grid cells ----------
    north, west, south, east = map(float, area)
    # handle wrap-around longitude
    lon_span = (east - west) if east > west else (360.0 + east - west)
    lat_span = max(0.0, north - south)
    lat_cells = max(1, int(math.ceil(lat_span / grid_resolution)))
    lon_cells = max(1, int(math.ceil(lon_span / grid_resolution)))
    grid_cells = lat_cells * lon_cells

    # ---------- file sizes (use your existing estimator for MB) ----------
    file_size_MB = estimate_era5_monthly_file_size(
        variables=variables,
        area=area,
        grid_resolution=grid_resolution,
        timestep_hours=timestep_hours,
        bytes_per_value=bytes_per_value,
    )
    total_size_MB = file_size_MB * n_files

    # ---------- timing model constants (tune if you like) ----------
    BASELINE_SEC   = 30.0            # small per-request overhead
    UNITS_PER_SEC  = 8_000_000.0     # processing throughput (units -> seconds); lower = more conservative
    SERVER_CAP_Mbps = 400.0          # upper network cap in megabits/sec (≈ 50 MB/s)
    SAFETY_FACTOR  = 5.0             # inflate to capture throttling, retries, etc.

    # Effective MB/s (convert from Mbps → MB/s)
    line_MBps = max(0.5, float(observed_speed_mbps) / 8.0)
    srv_MBps  = max(0.5, SERVER_CAP_Mbps / 8.0)
    eff_MBps  = min(line_MBps, srv_MBps)

    per_file_secs = []
    for (_, _, days_in_month) in months:
        steps = int((24.0 / max(0.0001, timestep_hours)) * days_in_month)

        # processing “units” = grid_cells × steps × vars
        units = grid_cells * steps * n_vars
        processing_sec = BASELINE_SEC + (units / UNITS_PER_SEC)

        # network time based on file size and effective MB/s
        network_sec = file_size_MB / eff_MBps

        per_file_secs.append((processing_sec + network_sec) * SAFETY_FACTOR)

    time_per_file_sec = max(per_file_secs) if per_file_secs else 0.0
    total_time_sec    = sum(per_file_secs)

    return {
        "months": n_files,
        "file_size_MB": round(file_size_MB, 3),
        "total_size_MB": round(total_size_MB, 3),
        "time_per_file_sec": round(time_per_file_sec, 1),
        "total_time_sec": round(number=total_time_sec, ndigits=1),
    }
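The inclusive month walk at the top of the function is worth isolating: partial months count in full, and the days-in-month computation handles leap years via datetime subtraction. A standalone sketch of that loop:

```python
from datetime import datetime

def month_list(start_date, end_date):
    # same inclusive month walk as estimate_cds_download
    s = datetime.strptime(start_date, "%Y-%m-%d")
    e = datetime.strptime(end_date, "%Y-%m-%d")
    months, (y, m) = [], (s.year, s.month)
    while (y < e.year) or (y == e.year and m <= e.month):
        nxt = datetime(y + 1, 1, 1) if m == 12 else datetime(y, m + 1, 1)
        months.append((y, m, (nxt - datetime(y, m, 1)).days))
        y, m = (y + 1, 1) if m == 12 else (y, m + 1)
    return months

# Partial months count in full; February 2024 gets its leap-year 29 days
assert month_list("2023-11-15", "2024-02-01") == [
    (2023, 11, 30), (2023, 12, 31), (2024, 1, 31), (2024, 2, 29),
]
```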

expected_save_path

expected_save_path(
    save_dir, filename_base, year, month, data_format="grib"
)

Construct canonical save path for monthly data.

Parameters:

Name Type Description Default
save_dir str | Path | None

Base directory for saving. If None, defaults to osme_common.paths.data_dir().

required
filename_base str

Base name without date or extension.

required
year int

Year of the file.

required
month int

Month of the file.

required
data_format str

File extension, e.g., 'grib' or 'nc'.

'grib'

Returns:

Type Description
Path

Resolved path under the proper data directory.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/file_management.py
def expected_save_path(
        save_dir: str | Path | None,
        filename_base: str,
        year: int,
        month: int,
        data_format: str = "grib"
        ) -> Path:
    """
    Construct canonical save path for monthly data.

    Parameters
    ----------
    save_dir : str | Path | None
        Base directory for saving. If None, defaults to osme_common.paths.data_dir().
    filename_base : str
        Base name without date or extension.
    year, month : int
        Year and month of the file.
    data_format : str
        File extension, e.g., 'grib' or 'nc'.

    Returns
    -------
    Path
        Resolved path under the proper data directory.
    """
    base = resolve_under(data_dir(create=True), save_dir)
    return base / f"{filename_base}_{year:04d}-{month:02d}.{data_format}"

weather_data_retrieval.utils.logging

build_download_summary

build_download_summary(session, estimates, speed_mbps)

Construct a formatted summary string of the current download configuration.

Parameters:

Name Type Description Default
session SessionState

Current session state containing all parameters.

required
estimates dict

Dictionary containing download size and time estimates.

required
speed_mbps float

Measured or estimated internet speed in Mbps.

required

Returns:

Type Description
str

Nicely formatted summary string for display or logging.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/logging.py
def build_download_summary(session: Any,
                           estimates: dict,
                           speed_mbps: float) -> str:
    """
    Construct a formatted summary string of the current download configuration.

    Parameters
    ----------
    session : SessionState
        Current session state containing all parameters.
    estimates : dict
        Dictionary containing download size and time estimates.
    speed_mbps : float
        Measured or estimated internet speed in Mbps.

    Returns
    -------
    str
        Nicely formatted summary string for display or logging.
    """
    summary = (
        f"\n" + "="* 60 +
        f"\nDownload Request Summary\n" + "="* 60 + "\n"
        f"Provider: {session.get('data_provider').upper()}\n"
        f"Dataset: {session.get('dataset_short_name')}\n"
        f"Dates: {session.get('start_date')}{session.get('end_date')}\n"
        f"Area: {session.get('region_bounds')}\n"
        f"Variables: {session.get('variables')}\n"
        f"Save Directory: {session.get('save_dir')}\n"
        f"Retries: {session.get('retry_settings')}\n"
        f"Parallelisation: {session.get('parallel_settings')}\n\n"
        f"----------------------------------------\n\n"
        f"Estimated number of monthly files: {estimates['months']}\n"
        f"Estimated size per file: {estimates['file_size_MB']:,.1f} MB\n"
        f"Estimated total size: {estimates['total_size_MB']:,.1f} MB\n\n"
        f"Measured connection speed: {speed_mbps:.4f} Mbps\n\n"
        f"Estimated maximum time per file: {_format_duration(estimates['time_per_file_sec'])}\n"
        f"Estimated maximum total time: {_format_duration(estimates['total_time_sec'])}\n"
    )
    return summary

setup_logger

setup_logger(
    save_dir=None, run_mode="interactive", verbose=False
)

Initialize and return a configured logger.

Logs are written to <repo_root>/logs (or $OSME_LOG_DIR) by default, with optional console output in interactive or verbose modes.

Parameters:

Name Type Description Default
save_dir str or None

Directory to save log files. If None, defaults to osme_common.paths.log_dir().

None
run_mode str

Either 'interactive' or 'automatic', by default 'interactive'.

'interactive'
verbose bool

Whether to echo logs to console in automatic mode, by default False.

False

Returns:

Type Description
Logger

Configured logger instance.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/logging.py
def setup_logger(
        save_dir: str | None = None,
        run_mode: str = "interactive",
        verbose: bool = False
        ) -> logging.Logger:
    """
    Initialize and return a configured logger.

    Logs are written to <repo_root>/logs (or $OSME_LOG_DIR) by default,
    with optional console output in interactive or verbose modes.

    Parameters
    ----------
    save_dir : str or None, optional
        Directory to save log files. If None, defaults to osme_common.paths.log_dir().
    run_mode : str, optional
        Either 'interactive' or 'automatic', by default 'interactive'.
    verbose : bool, optional
        Whether to echo logs to console in automatic mode, by default False.

    Returns
    -------
    logging.Logger
        Configured logger instance.
    """
    # --- Resolve log directory ---
    base_dir = Path(save_dir) if save_dir else log_dir(create=True)
    base_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    log_path = base_dir / f"run_{run_mode}_{timestamp}.log"

    logger = logging.getLogger("weather_retrieval")
    # Make sure logger captures everything; handlers will filter
    logger.setLevel(logging.DEBUG)

    # Clear old handlers safely
    if logger.hasHandlers():
        for h in list(logger.handlers):
            logger.removeHandler(h)

    # File handler — DEBUG (captures prompts + everything)
    fh = logging.FileHandler(log_path)
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
    logger.addHandler(fh)

    # Console handler:
    # - interactive: always show console at INFO+
    # - automatic: show console only if verbose=True
    add_console = (run_mode == "interactive") or (run_mode == "automatic" and verbose)
    if add_console:
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)   # <- no DEBUG on console, so prompts won't duplicate
        ch.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(ch)

    logger.info(f"Logging initialized at {log_path}")
    return logger

log_msg

log_msg(msg, logger, *, level='info', echo_console=False)

Unified logging utility. Always logs to file; echoes to console (via tqdm.write) only when echo_console is True (typically interactive mode).

Parameters:

Name Type Description Default
msg str

Message to log.

required
logger Logger

Logger instance.

required
level str

Logging level: 'info', 'warning', 'error', 'exception', by default "info".

'info'
echo_console bool

Whether to also echo to console, by default False.

False

Returns:

Type Description
None
Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/logging.py
def log_msg(
        msg: str,
        logger,
        *,
        level: str = "info",
        echo_console: bool = False
        ) -> None:
    """
    Unified logging utility.
    - Always logs to file.
    - Echoes to console (via tqdm.write) only when echo_console is True
      (typically interactive mode).

    Parameters
    ----------
    msg : str
        Message to log.
    logger : logging.Logger
        Logger instance.
    level : str, optional
        Logging level: 'info', 'warning', 'error', 'exception', by default "info".
    echo_console : bool, optional
        Whether to also echo to console, by default False.

    Returns
    -------
    None

    """
    if not logger:
        raise ValueError("Logger instance must be provided to log_msg().")

    log_fn = getattr(logger, level, logger.info)
    log_fn(msg)

    if echo_console:
        tqdm.write(msg)
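The dispatch line `getattr(logger, level, logger.info)` tolerates unknown level names by silently falling back to `logger.info`. A standalone sketch of that lookup (the `dispatch` helper is hypothetical, written only to make the fallback visible):

```python
import logging

logger = logging.getLogger("dispatch-demo")
logger.addHandler(logging.NullHandler())   # swallow output for the demo

def dispatch(msg: str, level: str = "info") -> str:
    # Mirror log_msg's lookup: unknown levels fall back to logger.info
    log_fn = getattr(logger, level, logger.info)
    log_fn(msg)
    return log_fn.__name__                 # which logger method actually ran

dispatch("hello")                 # routed to logger.info
dispatch("careful", "warning")    # routed to logger.warning
```

This is forgiving by design: a typo in `level` degrades to an info-level entry rather than raising at the call site.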

create_final_log_file

create_final_log_file(
    session,
    filename_base,
    original_logger,
    *,
    delete_original=True,
    reattach_to_final=True
)

Create a final log file with the same naming pattern as data files. Copies content from the original log file.

Parameters:

Name Type Description Default
session Any (SessionState)

Current session state.

required
filename_base str

Base filename pattern (same as data files).

required
original_logger Logger

The original logger instance.

required
delete_original bool

Whether to delete the original log file after creating the final one, by default True.

True
reattach_to_final bool

Whether to reattach the logger to the final log file, by default True.

True

Returns:

Type Description
str | None

Path to the final log file, or None if no file handler was found or the copy failed.

Source code in packages/weather_data_retrieval/src/weather_data_retrieval/utils/logging.py
def create_final_log_file(
        session,
        filename_base: str,
        original_logger: logging.Logger,
        *,
        delete_original: bool = True,
        reattach_to_final: bool = True,
        ) -> str | None:
    """
    Create a final log file with the same naming pattern as data files.
    Copies content from the original log file.

    Parameters
    ----------
    session : Any (SessionState)
        Current session state.
    filename_base : str
        Base filename pattern (same as data files).
    original_logger : logging.Logger
        The original logger instance.
    delete_original : bool, optional
        Whether to delete the original log file after creating the final one, by default True.
    reattach_to_final : bool, optional
        Whether to reattach the logger to the final log file, by default True.

    Returns
    -------
    str | None
        Path to the final log file, or None if no file handler was found
        or the copy failed.
    """
    # 1) locate the current FileHandler
    fh = None
    for h in original_logger.handlers:
        if isinstance(h, logging.FileHandler):
            fh = h
            break

    if fh is None or not hasattr(fh, "baseFilename"):
        # No file handler found
        return None

    original_path = Path(fh.baseFilename)

    # 2) build final path
    save_dir_raw = session.get("save_dir")
    save_dir = resolve_under(data_dir(create=True), save_dir_raw) if save_dir_raw else data_dir(create=True)

    start = session.get("start_date")
    end = session.get("end_date")
    retrieved = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")

    final_name = f"{filename_base}_{start}-{end}_retrieved-{retrieved}.log"
    final_path = save_dir / final_name
    final_path.parent.mkdir(parents=True, exist_ok=True)

    # 3) flush & close the original handler, detach from logger
    fh.flush()
    fh.close()
    original_logger.removeHandler(fh)

    # 4) copy to final and optionally delete original
    try:
        # If same filesystem, you could also use os.replace to move instead of copy
        shutil.copyfile(original_path, final_path)
        if delete_original:
            try:
                os.remove(original_path)
            except Exception:
                pass
    except Exception as e:
        # Reattach the original handler if something failed so we don't lose logging
        try:
            fh = logging.FileHandler(original_path, encoding="utf-8")
            fh.setLevel(logging.DEBUG)
            fh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
            original_logger.addHandler(fh)
        except Exception:
            pass
        # Log to console if possible
        original_logger.warning(f"Failed to create final log file: {e}")
        return None

    # 5) optionally attach a new FileHandler pointing at the final file
    if reattach_to_final:
        new_fh = logging.FileHandler(final_path, encoding="utf-8")
        new_fh.setLevel(logging.DEBUG)
        new_fh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
        original_logger.addHandler(new_fh)

    # 6) log a confirmation (now goes to final file if reattached, and to console in interactive mode)
    try:
        original_logger.info(f"Final log file created: {final_path}")
    except Exception:
        pass

    return str(final_path)
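The flush/close/remove-then-reattach sequence in steps 3–5 is the standard way to move a live log file without losing records. A self-contained sketch of just that pattern (logger name and paths are placeholders):

```python
import logging, os, shutil, tempfile
from pathlib import Path

tmp = Path(tempfile.mkdtemp())
original, final = tmp / "run.log", tmp / "run_final.log"

logger = logging.getLogger("swap-demo")
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(original, encoding="utf-8")
logger.addHandler(fh)
logger.info("before swap")

# 1) flush & close the live handler and detach it before touching the file
fh.flush()
fh.close()
logger.removeHandler(fh)

# 2) copy the closed file to its final name, then delete the original
shutil.copyfile(original, final)
os.remove(original)

# 3) reattach a fresh handler pointing at the final file
new_fh = logging.FileHandler(final, encoding="utf-8")
logger.addHandler(new_fh)
logger.info("after swap")      # lands in the final file
```

Closing before copying matters on Windows, where an open file cannot be deleted; it also guarantees buffered records reach disk before the copy.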

weather_data_retrieval.utils.session_management

SessionState

first_unfilled_key

first_unfilled_key()

Return the first key in the ordered fields that is not filled. This enables a simple wizard-like progression and supports backtracking by clearing fields with unset(key).
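The wizard-like progression can be pictured with a minimal stand-in. The class, field names, and ordering below are hypothetical, chosen only to illustrate the first-unfilled/unset contract:

```python
class MiniSession:
    """Toy stand-in for SessionState's ordered-field behaviour (not the real class)."""
    FIELDS = ["dataset", "start_date", "end_date"]   # hypothetical order

    def __init__(self):
        self.values = {k: None for k in self.FIELDS}

    def first_unfilled_key(self):
        # Return the first ordered key whose value is still unset, else None
        return next((k for k in self.FIELDS if self.values[k] is None), None)

    def unset(self, key):
        self.values[key] = None   # clearing a field backtracks the wizard

s = MiniSession()
s.values["dataset"] = "era5"
# the wizard would now prompt for start_date; unset("dataset") backs up one step
```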

summary

summary()

Return a printable summary of all fields in tabular format.

to_dict

to_dict(only_filled=False)

Flatten the session into a plain dict suitable for runner.run(...). If only_filled=True, include only keys that have been filled.

Parameters:

Name Type Description Default
only_filled bool

Whether to include only filled keys, by default False.

False

Returns:

Type Description
dict

Flattened session dictionary.
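The only_filled flag amounts to filtering out unset entries when flattening. A sketch of that behaviour over a plain dict (example field names and values are illustrative, not the package's implementation):

```python
def to_dict(fields: dict, only_filled: bool = False) -> dict:
    """Sketch: flatten session fields, optionally dropping unfilled (None) entries."""
    if only_filled:
        return {k: v for k, v in fields.items() if v is not None}
    return dict(fields)

session_fields = {"dataset": "era5", "start_date": "2020-01-01", "end_date": None}
```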

get_cds_dataset_config

get_cds_dataset_config(session, dataset_config_mapping)

Return dataset configuration dictionary based on session short name.

Parameters:

Name Type Description Default
session SessionState

The current session state containing user selections.

required
dataset_config_mapping dict

The mapping of dataset short names to their configurations.

required

Returns:

Type Description
dict

The configuration dictionary for the selected dataset.
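The lookup reduces to indexing the mapping by the dataset's short name. A sketch with hypothetical entries, taking the short name directly rather than the full session object:

```python
dataset_config_mapping = {   # hypothetical entries, not the package's real mapping
    "era5": {"cds_name": "reanalysis-era5-single-levels", "grid": "0.25/0.25"},
    "era5-land": {"cds_name": "reanalysis-era5-land", "grid": "0.1/0.1"},
}

def lookup_dataset_config(short_name: str, mapping: dict) -> dict:
    # Fail loudly on an unknown short name rather than returning None
    try:
        return mapping[short_name]
    except KeyError:
        raise KeyError(f"Unknown dataset short name: {short_name!r}")
```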

map_config_to_session

map_config_to_session(cfg, session, *, logger=None)

Validate and map a loaded JSON config into SessionState.

Parameters:

Name Type Description Default
cfg dict

Loaded configuration dictionary.

required
session SessionState

The session state to populate.

required
logger Logger

Logger instance, by default None.

None

Returns:

Type Description
tuple[bool, list[str]]

(ok, messages); ok is False if any hard error prevents continuing.
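The (ok, messages) contract can be sketched with a hypothetical required-keys check; the key names and the validation rule below are illustrative, not the package's actual logic:

```python
def validate_and_map(cfg: dict, session: dict):
    """Sketch of the (ok, messages) contract: ok=False on any hard error."""
    required = ["dataset", "start_date", "end_date"]   # hypothetical required keys
    messages = []
    ok = True
    for key in required:
        if key not in cfg:
            messages.append(f"Missing required key: {key}")
            ok = False                                  # hard error: cannot continue
    if ok:
        session.update({k: cfg[k] for k in required})   # populate only when valid
    return ok, messages
```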

ensure_cds_connection

ensure_cds_connection(
    client,
    creds,
    max_reauth_attempts=6,
    wait_between_attempts=15,
)

Ensure a valid CDS API client. Re-authenticate automatically if the connection drops.

Parameters:

Name Type Description Default
client Client

Current CDS API client.

required
creds dict

{'url': str, 'key': str} stored from initial login.

required
max_reauth_attempts int

Maximum reconnection attempts before aborting.

6
wait_between_attempts int

Wait time (seconds) between re-auth attempts.

15

Returns:

Type Description
Client | None

Valid client or None if re-authentication ultimately fails.
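The re-authentication loop can be sketched generically. The `connect` callable and the exception type stand in for re-creating the cdsapi client from stored credentials; none of this is the package's actual code:

```python
import time

def ensure_connection(connect, max_attempts: int = 6, wait_seconds: int = 15):
    """Sketch of the retry loop: call connect() until it succeeds or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()        # placeholder for rebuilding the CDS client
        except ConnectionError:
            if attempt == max_attempts:
                return None         # ultimately failed: caller must handle None
            time.sleep(wait_seconds)
```

Returning None rather than raising matches the documented contract, so the orchestrator can abort gracefully instead of crashing mid-download.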

internet_speedtest

internet_speedtest(
    test_urls=None,
    max_seconds=15,
    logger=None,
    echo_console=True,
)

Download a test file (~100 MB) from a fast CDN to estimate download speed (MB/s).

Parameters:

Name Type Description Default
test_urls list[str]

List of URLs of the test files.

None
max_seconds int

Maximum time to wait for a response, by default 15 seconds.

15
logger Logger

Logger instance, by default None.

None
echo_console bool

Whether to echo results to the console, by default True.

True

Returns:

Type Description
float

Estimated download speed in MB/s.
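The speed estimate reduces to bytes transferred over elapsed wall-clock time, capped by a time budget. A network-free sketch of that arithmetic, where `read_chunk` stands in for the actual HTTP download of the test file:

```python
import time

def estimate_speed_mb_s(read_chunk, max_seconds: float = 15.0) -> float:
    """Sketch: pull chunks until the time budget runs out, then report MB/s."""
    total_bytes = 0
    start = time.monotonic()
    while time.monotonic() - start < max_seconds:
        chunk = read_chunk()               # stand-in for e.g. response.read(1024 * 1024)
        if not chunk:                      # empty chunk: stream exhausted
            break
        total_bytes += len(chunk)
    elapsed = max(time.monotonic() - start, 1e-9)   # guard against division by zero
    return (total_bytes / 1e6) / elapsed            # megabytes per second
```

Using `time.monotonic()` rather than `time.time()` keeps the estimate immune to system clock adjustments during the measurement window.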