# Copyright 2026 IPSL / CNRS / Sorbonne University
# Authors: Kishanthan Kingston
#
# This work is licensed under the Creative Commons
# Attribution-NonCommercial-ShareAlike 4.0 International License.
# To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc-sa/4.0/
import argparse
import gc
import shutil
from pathlib import Path
from AID_BC.dataset import ClimateDataset
from AID_BC.logger import Logger
[docs]
def parse_args(argv=None):
    """
    Parse command-line arguments.

    Parameters
    ----------
    argv : list[str] or None, optional
        Argument strings to parse. When ``None`` (the default), argparse
        falls back to ``sys.argv[1:]``, which is the behaviour of a plain
        command-line invocation.

    Returns
    -------
    argparse.Namespace
        Parsed arguments.

    Raises
    ------
    SystemExit
        Raised by argparse when a required option is missing, a value
        cannot be converted to its declared type, or ``--start_year`` is
        greater than ``--end_year``.
    """
    parser = argparse.ArgumentParser(
        description="Preprocess CMIP6 data onto ERA5 grid and save as Zarr"
    )

    # Period to process (both bounds are inclusive in the main loop)
    parser.add_argument("--start_year", type=int, required=True, help="Start year")
    parser.add_argument("--end_year", type=int, required=True, help="End year")
    parser.add_argument("--variable", type=str, default="VAR_2T", help="Variable name")

    # Input and output locations
    parser.add_argument(
        "--era5_root", type=str, required=True, help="ERA5 root directory"
    )
    parser.add_argument(
        "--cmip6_root", type=str, required=True, help="CMIP6 root directory"
    )
    parser.add_argument(
        "--output_zarr", type=str, required=True, help="Output Zarr path"
    )

    # Define the Zarr chunk sizes used for storage and later access
    parser.add_argument("--time_chunk", type=int, default=1460, help="Time chunk size")
    parser.add_argument(
        "--lat_chunk", type=int, default=144, help="Latitude chunk size"
    )
    parser.add_argument(
        "--lon_chunk", type=int, default=360, help="Longitude chunk size"
    )

    # Allow the output Zarr store to be replaced if it already exists
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite output Zarr if it already exists",
    )

    args = parser.parse_args(argv)

    # An inverted range would make the yearly loop a no-op, so reject it here
    # with a regular argparse usage error instead of silently doing nothing.
    if args.start_year > args.end_year:
        parser.error(
            f"--start_year ({args.start_year}) must not be greater than "
            f"--end_year ({args.end_year})"
        )

    return args
[docs]
def build_paths(year, era5_root, cmip6_root):
    """
    Build the ERA5 and CMIP6 file paths for one year.

    Both datasets use the same yearly file name, ``samples_<year>.nc``,
    located directly under their respective root directory.

    Parameters
    ----------
    year : int
        Year to process.
    era5_root : str
        ERA5 root directory.
    cmip6_root : str
        CMIP6 root directory.

    Returns
    -------
    tuple[str, str]
        ERA5 file path followed by CMIP6 file path.
    """
    # The file name is identical for both sources; only the root differs
    yearly_file = f"samples_{year}.nc"
    return tuple(
        str(Path(root) / yearly_file) for root in (era5_root, cmip6_root)
    )
[docs]
def preprocess_year(year, era5_root, cmip6_root, variable_name, logger):
    """
    Preprocess one CMIP6 year onto the ERA5 grid.

    Parameters
    ----------
    year : int
        Year to process.
    era5_root : str
        ERA5 root directory.
    cmip6_root : str
        CMIP6 root directory.
    variable_name : str
        Variable name.
    logger : Logger
        Logger instance.

    Returns
    -------
    da : xarray.DataArray
        CMIP6 data interpolated onto ERA5 grid, cast to ``float32`` and
        named after ``variable_name``.
    """
    # Build the yearly input paths for ERA5 and CMIP6
    era5_path, cmip6_path = build_paths(
        year=year, era5_root=era5_root, cmip6_root=cmip6_root
    )

    # ClimateDataset handles loading, checking, and preparing ERA5/CMIP6 data
    ds = ClimateDataset(
        era5_path=era5_path,
        cmip6_path=cmip6_path,
        variable_name=variable_name,
        logger=logger,
    )

    try:
        # Prepare the data, including interpolation of CMIP6 onto the ERA5 grid
        ds.prepare()

        # Keep only the processed CMIP6 field and reduce precision to save memory
        da = ds.cmip6_data.astype("float32")
        da.name = variable_name
    finally:
        # Close the source NetCDF files even when preparation fails, so an
        # error in one year does not leave file handles open. getattr() keeps
        # the cleanup from masking the original error if an attribute was
        # never set.
        # NOTE(review): the files are closed before the returned array is
        # written out; this presumes ds.cmip6_data no longer needs the open
        # files at that point -- confirm against ClimateDataset.prepare().
        for source in (getattr(ds, "era5", None), getattr(ds, "cmip6", None)):
            if source is not None:
                source.close()

    return da
[docs]
def main():
    """
    Main preprocessing workflow.

    Parses the command-line arguments, prepares the output location, then
    processes each year from ``start_year`` to ``end_year`` (inclusive) and
    writes it to a single Zarr store, appending along the ``time`` dimension.

    Raises
    ------
    ValueError
        If ``start_year`` is greater than ``end_year``.
    FileExistsError
        If the output path already exists and ``--overwrite`` was not given.
    """
    args = parse_args()
    logger = Logger()

    # Validate the period before touching the disk: with an inverted range the
    # loop below would not run, and with --overwrite an existing store would
    # be deleted without anything being written in its place.
    if args.start_year > args.end_year:
        raise ValueError(
            f"start_year ({args.start_year}) must not be greater than "
            f"end_year ({args.end_year})"
        )

    output_zarr = Path(args.output_zarr)

    # Remove the existing Zarr store only if overwrite mode is enabled
    if output_zarr.exists():
        if not args.overwrite:
            raise FileExistsError(
                f"Output Zarr already exists: {output_zarr}. "
                f"Use --overwrite to replace it."
            )
        logger.info(f"Removing existing Zarr:\n{output_zarr}")
        if output_zarr.is_dir() and not output_zarr.is_symlink():
            shutil.rmtree(output_zarr)
        else:
            # shutil.rmtree refuses regular files and symlinks; unlink them
            output_zarr.unlink()

    output_zarr.parent.mkdir(parents=True, exist_ok=True)

    # Chunk sizes are the same for every year, so build the mapping once
    chunk_sizes = {
        "time": args.time_chunk,
        "latitude": args.lat_chunk,
        "longitude": args.lon_chunk,
    }

    first_year = True

    # Process each year independently to avoid loading the full period at once
    for year in range(args.start_year, args.end_year + 1):
        logger.info(f"Preprocessing CMIP6 year {year}")
        da = preprocess_year(
            year=year,
            era5_root=args.era5_root,
            cmip6_root=args.cmip6_root,
            variable_name=args.variable,
            logger=logger,
        )

        # Rechunk the data before writing to Zarr for storage and later processing
        da = da.chunk(chunk_sizes)

        # Convert the DataArray to a Dataset before saving it as Zarr
        ds_out = da.to_dataset(name=args.variable)

        logger.info(f"Writing year {year} to Zarr:\n{output_zarr}")
        if first_year:
            # Create the Zarr store for the first processed year
            ds_out.to_zarr(output_zarr, mode="w")
            first_year = False
        else:
            # Append following years along the time dimension
            # NOTE(review): if a year's length is not a multiple of
            # --time_chunk, later appends may not line up with the chunk
            # boundaries already in the store -- confirm this is acceptable
            # for the xarray/zarr versions in use.
            ds_out.to_zarr(output_zarr, mode="a", append_dim="time")

        # Explicitly free memory after each year
        del da
        del ds_out

        # Force garbage collection
        gc.collect()

    logger.success("CMIP6 Zarr preprocessing completed")
# Run the preprocessing workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()