## Description

### TL;DR
When running a script on a remote Dask cluster (e.g., via Coiled), memory on the local machine (the one that starts the cluster) is unexpectedly consumed. This happens, I believe, after opening NetCDF files from S3 via s3fs with `xarray.open_mfdataset` and applying a `preprocess` function, even though all operations are expected to run remotely on the cluster.
This is part of a script that afterwards writes the data to Zarr from the cluster (not shown here). Even so, the local machine can download/upload gigabytes of data and use tens of GB of memory.
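For reference, this is roughly how I watch the local usage while the MVCE below runs (a sketch; `psutil` is an extra dependency, and the network counters are machine-wide, so they overestimate what the script itself transfers):

```python
import os

import psutil

proc = psutil.Process(os.getpid())
net0 = psutil.net_io_counters()

# ... run the MVCE from this report ...

net1 = psutil.net_io_counters()
print(f"local RSS: {proc.memory_info().rss / 1e9:.1f} GB")
print(f"received:  {(net1.bytes_recv - net0.bytes_recv) / 1e9:.1f} GB")
```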
### Background
I initially got some help from @phofl, who raised fsspec #1747.
I also tried to get some insights here:
- https://discourse.pangeo.io/t/xarray-operations-e-g-preprocess-running-locally-post-open-mfdataset-instead-of-on-dask-distributed-cluster/4637
- Xarray operations (e.g., preprocess) running locally (post open_mfdataset) instead of on Dask distributed cluster dask/distributed#8913 (comment)
Apparently the trick was to use the following options:
```python
s3_fs = s3fs.S3FileSystem(
    anon=True, default_fill_cache=False, default_cache_type="readahead"
)
```
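As a sanity check that these options actually reach the file objects handed to xarray, one can inspect the cache type on an opened file (a sketch; the path is one of the files from the MVCE below, and fsspec cache objects expose their type via a `name` attribute):

```python
import s3fs

fs = s3fs.S3FileSystem(
    anon=True, default_fill_cache=False, default_cache_type="readahead"
)
f = fs.open(
    "s3://imos-data/IMOS/SRS/OC/radiometer/VLHJ_Southern-Surveyor/2013/"
    "IMOS_SRS-OC_F_20130214T000001Z_VLHJ_FV01_DALEC_END-20130214T235958Z.nc"
)
print(f.cache.name)  # expected: "readahead"
```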
However, this never fixed it for me; my local machine always ends up being used. I have more or less given up and now process smaller batches of files at a time, but that defeats the purpose of using a cluster.
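The batching workaround looks roughly like this (a sketch; `iter_batches` is a helper I made up, `batch_size` is arbitrary, and `create_fileset`/`GenericHandler` are from the MVCE below):

```python
# Rough sketch of the workaround: process the URI list in fixed-size
# chunks instead of all at once.
def iter_batches(uris, batch_size=10):
    for i in range(0, len(uris), batch_size):
        yield uris[i : i + batch_size]

handler = GenericHandler()  # from the MVCE below
for batch in iter_batches(s3_file_uri_list, batch_size=10):
    handler.publish_cloud_optimised_fileset_batch(create_fileset(batch))
```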
@martindurant asked me in this comment for an MVCE, but at the time there were far too many annoying bugs in xarray, which seem to have been fixed now.
The MVCE below can run on any machine, since the files live in an anonymous S3 bucket. I'm using Coiled here, but a Fargate cluster shows the same memory and bandwidth behaviour. This example uses a bit more than 6 GB of local memory, I believe. Even with the preprocess function disabled, local memory is still consumed.
Maybe it's a dask/xarray problem rather than an s3fs one; I'm not sure. The only thing I know is that I can replicate this every time, with any dataset I try. Depending on the dataset, my local memory/bandwidth usage can explode. I'm very surprised that I seem to be the only one experiencing this.
### MVCE
```
fsspec             2025.5.1
s3fs               2025.5.1
xarray             2025.7.1
dask               2025.5.1
dask-cloudprovider 2024.9.1
dask-expr          2.0.0
```
```python
#!/usr/bin/env python3
from functools import partial
import s3fs
import xarray as xr
from coiled import Cluster
from dask.distributed import Client
import numpy as np
import numcodecs
import os
def create_fileset(s3_paths, s3_fs=None):
    """
    Create a fileset from S3 objects specified by a list of full S3 paths.

    Args:
        s3_paths (str or list[str]): Either a single full S3 path
            (e.g., 's3://bucket_name/object_key') or a list of full S3 paths.
        s3_fs (s3fs.S3FileSystem, optional): Filesystem to open the paths with.
            Defaults to an anonymous filesystem with readahead caching.

    Returns:
        list[file-like object]: List of file-like objects representing each object in the fileset.
    """
    if s3_fs is None:
        s3_fs = s3fs.S3FileSystem(
            anon=True, default_fill_cache=False, default_cache_type="readahead"
        )

    if isinstance(s3_paths, str):
        s3_paths = [s3_paths]

    if not isinstance(s3_paths, list):
        raise ValueError("Invalid input format. Expecting either str or list[str].")
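    # NOTE (my assumption): these S3File handles are created on the local
    # machine and, when later passed to open_mfdataset(parallel=True),
    # presumably get pickled and shipped to the workers by dask.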
    # Create a fileset by opening each file
    fileset = [s3_fs.open(file) for file in s3_paths]
    return fileset
def preprocess_xarray(ds, dataset_config=None):
    append_dim = "TIME"
    dest_name = "filename"
    filename_placeholder = "UNKNOWN_FILENAME.nc"

    filename = filename_placeholder
    try:
        var = next(var for var in ds)  # get the first variable
        if hasattr(ds[var], "encoding") and "source" in ds[var].encoding:
            filename = os.path.basename(ds[var].encoding["source"])
        elif hasattr(ds, "encoding") and "source" in ds.encoding:
            filename = os.path.basename(ds.encoding["source"])
            print(f"Filename found in ds.encoding: {filename}")
    except Exception as e:
        print(
            f"Original filename not available in the xarray dataset: {e} - "
            f"will default to {filename_placeholder}"
        )
        filename = filename_placeholder

    length = ds.sizes[append_dim]
    # Using da.full here could break things when appending to an existing zarr
    # store; need to investigate why. Default to a plain numpy array instead.
    string_array = np.full(length, filename, dtype=object)

    ds[dest_name] = (append_dim, string_array)
    ds[dest_name].encoding = {
        "_FillValue": "",
        "dtype": object,
        "compressor": None,
        "filters": None,
        "object_codec": numcodecs.VLenUTF8(),  # crucial!
    }
    return ds
class GenericHandler:
    def _open_mfds(
        self, partial_preprocess, drop_vars_list, batch_files, engine="scipy"
    ):
        open_mfdataset_params = {
            "engine": engine,
            "parallel": True,
            "preprocess": partial_preprocess,
            "data_vars": "all",
            "concat_characters": True,
            "mask_and_scale": True,
            "decode_cf": True,
            "decode_times": True,
            "use_cftime": True,
            "decode_coords": True,
            "compat": "override",
            "coords": "minimal",
            "drop_variables": drop_vars_list,
        }
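        # With parallel=True, each per-file open + preprocess is wrapped in
        # dask.delayed, so I would expect it to run on the cluster workers
        # once a distributed client is active, not on the local machine.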
        ds = xr.open_mfdataset(batch_files, **open_mfdataset_params)
        # ds = ds.unify_chunks()
        # ds = ds.sortby("TIME")
        return ds
    def publish_cloud_optimised_fileset_batch(self, s3_file_uri_list):
        self.dataset_config = None
        drop_vars_list = ["wind_speed"]

        partial_preprocess = partial(
            preprocess_xarray, dataset_config=self.dataset_config
        )

        ds = self._open_mfds(
            partial_preprocess,
            drop_vars_list,
            s3_file_uri_list,
        )
        # the next part of the code should do a ds.to_zarr()

prefix = "s3://imos-data/IMOS/SRS/OC/radiometer/VLHJ_Southern-Surveyor/2013/"
s3_file_uri_list = [
prefix + "IMOS_SRS-OC_F_20130214T000001Z_VLHJ_FV01_DALEC_END-20130214T235958Z.nc",
prefix + "IMOS_SRS-OC_F_20130215T000000Z_VLHJ_FV01_DALEC_END-20130215T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130216T000000Z_VLHJ_FV01_DALEC_END-20130216T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130217T000002Z_VLHJ_FV01_DALEC_END-20130217T054432Z.nc",
prefix + "IMOS_SRS-OC_F_20130218T213111Z_VLHJ_FV01_DALEC_END-20130218T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130219T000001Z_VLHJ_FV01_DALEC_END-20130219T054325Z.nc",
prefix + "IMOS_SRS-OC_F_20130221T213337Z_VLHJ_FV01_DALEC_END-20130221T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130222T000004Z_VLHJ_FV01_DALEC_END-20130222T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130223T000002Z_VLHJ_FV01_DALEC_END-20130223T235951Z.nc",
prefix + "IMOS_SRS-OC_F_20130224T000003Z_VLHJ_FV01_DALEC_END-20130224T230710Z.nc",
prefix + "IMOS_SRS-OC_F_20130226T220957Z_VLHJ_FV01_DALEC_END-20130226T235958Z.nc",
prefix + "IMOS_SRS-OC_F_20130227T000001Z_VLHJ_FV01_DALEC_END-20130227T235957Z.nc",
prefix + "IMOS_SRS-OC_F_20130228T000000Z_VLHJ_FV01_DALEC_END-20130228T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130301T000002Z_VLHJ_FV01_DALEC_END-20130301T235958Z.nc",
prefix + "IMOS_SRS-OC_F_20130302T000001Z_VLHJ_FV01_DALEC_END-20130302T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130303T000000Z_VLHJ_FV01_DALEC_END-20130303T001757Z.nc",
prefix + "IMOS_SRS-OC_F_20130410T001856Z_VLHJ_FV01_DALEC_END-20130410T061407Z.nc",
prefix + "IMOS_SRS-OC_F_20130411T002135Z_VLHJ_FV01_DALEC_END-20130411T061533Z.nc",
prefix + "IMOS_SRS-OC_F_20130412T002318Z_VLHJ_FV01_DALEC_END-20130412T061322Z.nc",
prefix + "IMOS_SRS-OC_F_20130413T002039Z_VLHJ_FV01_DALEC_END-20130413T061708Z.nc",
prefix + "IMOS_SRS-OC_F_20130414T001848Z_VLHJ_FV01_DALEC_END-20130414T061153Z.nc",
prefix + "IMOS_SRS-OC_F_20130415T002210Z_VLHJ_FV01_DALEC_END-20130415T060559Z.nc",
prefix + "IMOS_SRS-OC_F_20130416T001959Z_VLHJ_FV01_DALEC_END-20130416T055628Z.nc",
prefix + "IMOS_SRS-OC_F_20130417T001719Z_VLHJ_FV01_DALEC_END-20130417T054750Z.nc",
prefix + "IMOS_SRS-OC_F_20130418T001858Z_VLHJ_FV01_DALEC_END-20130419T040841Z.nc",
prefix + "IMOS_SRS-OC_F_20130419T041404Z_VLHJ_FV01_DALEC_END-20130419T051434Z.nc",
prefix + "IMOS_SRS-OC_F_20130420T002254Z_VLHJ_FV01_DALEC_END-20130420T044248Z.nc",
prefix + "IMOS_SRS-OC_F_20130421T003058Z_VLHJ_FV01_DALEC_END-20130421T020501Z.nc",
prefix + "IMOS_SRS-OC_F_20130429T005759Z_VLHJ_FV01_DALEC_END-20130429T031412Z.nc",
prefix + "IMOS_SRS-OC_F_20130621T022830Z_VLHJ_FV01_DALEC_END-20130621T034358Z.nc",
prefix + "IMOS_SRS-OC_F_20130622T015425Z_VLHJ_FV01_DALEC_END-20130622T040101Z.nc",
prefix + "IMOS_SRS-OC_F_20130623T022616Z_VLHJ_FV01_DALEC_END-20130623T035247Z.nc",
prefix + "IMOS_SRS-OC_F_20130624T023403Z_VLHJ_FV01_DALEC_END-20130624T042341Z.nc",
prefix + "IMOS_SRS-OC_F_20130625T024217Z_VLHJ_FV01_DALEC_END-20130625T045224Z.nc",
prefix + "IMOS_SRS-OC_F_20130626T025409Z_VLHJ_FV01_DALEC_END-20130626T050829Z.nc",
prefix + "IMOS_SRS-OC_F_20130627T032159Z_VLHJ_FV01_DALEC_END-20130627T051355Z.nc",
prefix + "IMOS_SRS-OC_F_20130728T003922Z_VLHJ_FV01_DALEC_END-20130728T071533Z.nc",
prefix + "IMOS_SRS-OC_F_20130729T002107Z_VLHJ_FV01_DALEC_END-20130729T070924Z.nc",
prefix + "IMOS_SRS-OC_F_20130730T000226Z_VLHJ_FV01_DALEC_END-20130730T235958Z.nc",
prefix + "IMOS_SRS-OC_F_20130731T000002Z_VLHJ_FV01_DALEC_END-20130731T235957Z.nc",
prefix + "IMOS_SRS-OC_F_20130801T000001Z_VLHJ_FV01_DALEC_END-20130801T235929Z.nc",
prefix + "IMOS_SRS-OC_F_20130805T212533Z_VLHJ_FV01_DALEC_END-20130805T235958Z.nc",
prefix + "IMOS_SRS-OC_F_20130806T000002Z_VLHJ_FV01_DALEC_END-20130806T005357Z.nc",
prefix + "IMOS_SRS-OC_F_20130807T042802Z_VLHJ_FV01_DALEC_END-20130807T235957Z.nc",
prefix + "IMOS_SRS-OC_F_20130808T000000Z_VLHJ_FV01_DALEC_END-20130808T235957Z.nc",
prefix + "IMOS_SRS-OC_F_20130809T000000Z_VLHJ_FV01_DALEC_END-20130809T063416Z.nc",
prefix + "IMOS_SRS-OC_F_20130820T040551Z_VLHJ_FV01_DALEC_END-20130820T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130821T000002Z_VLHJ_FV01_DALEC_END-20130821T235957Z.nc",
prefix + "IMOS_SRS-OC_F_20130822T000001Z_VLHJ_FV01_DALEC_END-20130822T235955Z.nc",
prefix + "IMOS_SRS-OC_F_20130823T000000Z_VLHJ_FV01_DALEC_END-20130823T235958Z.nc",
prefix + "IMOS_SRS-OC_F_20130824T000001Z_VLHJ_FV01_DALEC_END-20130824T235958Z.nc",
prefix + "IMOS_SRS-OC_F_20130825T000002Z_VLHJ_FV01_DALEC_END-20130825T235958Z.nc",
prefix + "IMOS_SRS-OC_F_20130826T000004Z_VLHJ_FV01_DALEC_END-20130826T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130827T000003Z_VLHJ_FV01_DALEC_END-20130827T211558Z.nc",
prefix + "IMOS_SRS-OC_F_20130828T204304Z_VLHJ_FV01_DALEC_END-20130828T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130829T000000Z_VLHJ_FV01_DALEC_END-20130829T235957Z.nc",
prefix + "IMOS_SRS-OC_F_20130830T000003Z_VLHJ_FV01_DALEC_END-20130830T235956Z.nc",
prefix + "IMOS_SRS-OC_F_20130831T000001Z_VLHJ_FV01_DALEC_END-20130831T235959Z.nc",
prefix + "IMOS_SRS-OC_F_20130901T000000Z_VLHJ_FV01_DALEC_END-20130901T235954Z.nc",
prefix + "IMOS_SRS-OC_F_20130902T000000Z_VLHJ_FV01_DALEC_END-20130902T034340Z.nc",
prefix + "IMOS_SRS-OC_F_20130927T201704Z_VLHJ_FV01_DALEC_END-20130927T235956Z.nc",
prefix + "IMOS_SRS-OC_F_20130928T000001Z_VLHJ_FV01_DALEC_END-20130928T235955Z.nc",
prefix + "IMOS_SRS-OC_F_20130930T203535Z_VLHJ_FV01_DALEC_END-20130930T235958Z.nc",
prefix + "IMOS_SRS-OC_F_20131001T000000Z_VLHJ_FV01_DALEC_END-20131001T235957Z.nc",
prefix + "IMOS_SRS-OC_F_20131002T000003Z_VLHJ_FV01_DALEC_END-20131002T034135Z.nc",
]
cluster_options = {
    "n_workers": [1, 120],
    "scheduler_vm_types": "t3.2xlarge",
    "worker_vm_types": "t3.2xlarge",
    "allow_ingress_from": "me",
    "compute_purchase_option": "spot_with_fallback",
    "worker_options": {"nthreads": 8, "memory_limit": "32GB"},
    "name": "thisIsATest",
}
cluster = Cluster(**cluster_options)
client = Client(cluster)
s3_file_handle_list = create_fileset(s3_file_uri_list)
GenericHandler().publish_cloud_optimised_fileset_batch(s3_file_handle_list)
```
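For what it's worth, running something like this right after the script above is how I compare worker memory with client-process memory (a sketch; `rss_gb` is just a helper I made up, and `client` is the `dask.distributed.Client` from the MVCE):

```python
import os

import psutil

def rss_gb():
    """Resident set size of the calling process, in GB."""
    return psutil.Process(os.getpid()).memory_info().rss / 1e9

print("workers:", client.run(rss_gb))  # executed on every worker
print("local:  ", rss_gb())            # the client process itself
```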