Unexpected Local Data Transfer During xarray Preprocessing on Dask Cluster #1890

@lbesnard

Description

TL;DR

When running a script on a remote Dask cluster (e.g., via Coiled), memory on the local machine (the one that starts the cluster) is unexpectedly used. As far as I can tell, this happens after opening NetCDF files from S3 via s3fs with xarray.open_mfdataset and applying a preprocess function, even though all operations are expected to run remotely on the cluster.
This is part of a script that afterwards writes the data to Zarr from the cluster (not shown here). However, the local machine can end up downloading/uploading gigabytes of data as well as using tens of GB of memory.
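One way to check where `preprocess` actually runs is to have it stamp the hostname and process ID into the returned dataset's attributes. The sketch below assumes the returned object exposes a dict-like `attrs` (as xarray Datasets do); `tag_execution_host` and the `preprocess_host` attribute name are hypothetical, not part of xarray or Dask.

```python
import os
import socket
from functools import wraps


def tag_execution_host(func, attr_name="preprocess_host"):
    """Wrap a preprocess-style function so its result records where it ran.

    Hypothetical helper: assumes the returned object has a dict-like
    ``attrs`` attribute, as xarray Datasets do.
    """

    @wraps(func)  # also works on functools.partial objects (missing attrs are skipped)
    def wrapper(ds, *args, **kwargs):
        out = func(ds, *args, **kwargs)
        # Hostname and PID of the process that actually executed the function
        out.attrs[attr_name] = f"{socket.gethostname()}:{os.getpid()}"
        return out

    return wrapper
```

For example, pass `tag_execution_host(partial(preprocess_xarray, dataset_config=None))` as `preprocess` and print `ds.attrs.get("preprocess_host")` after `open_mfdataset` returns. With `open_mfdataset`'s default `combine_attrs="override"`, only the first file's attributes survive the combine, so this shows where at least one preprocess call ran.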

Background

I initially got some help from @phofl, who opened fsspec #1747.

I also tried to get some insights here:

Apparently, the trick was to create the filesystem with the following options:

s3_fs = s3fs.S3FileSystem(
    anon=True, default_fill_cache=False, default_cache_type="readahead"
)

However, this never fixed it for me: my local machine always ends up being used. I've more or less given up and now process smaller batches of files at a time, but that defeats the purpose of using a cluster.
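For context on the `default_cache_type="readahead"` option above, the toy model below illustrates what read-ahead caching means: a cache miss fetches the requested byte range plus `blocksize` extra bytes, and later reads inside that window are served from memory without a new request. This is a simplified illustration under that assumption, not fsspec's actual cache code; the class name is hypothetical.

```python
class ReadAheadModel:
    """Toy read-ahead cache over a random-access byte source.

    Simplified illustration only. Keeps a single cached window, which is
    replaced on every miss.
    """

    def __init__(self, data: bytes, blocksize: int):
        self.data = data            # stands in for the remote object
        self.blocksize = blocksize  # extra bytes fetched past each miss
        self.start = 0              # offset of the cached window
        self.cache = b""            # cached bytes
        self.fetches = 0            # number of simulated remote requests

    def _fetch(self, start: int, end: int) -> bytes:
        self.fetches += 1
        return self.data[start:end]

    def read(self, start: int, end: int) -> bytes:
        end = min(end, len(self.data))
        if start >= end:
            return b""
        cache_end = self.start + len(self.cache)
        if self.start <= start and end <= cache_end:
            # Cache hit: served from memory, no request
            return self.cache[start - self.start:end - self.start]
        # Cache miss: fetch the requested range plus the read-ahead amount
        fetch_end = min(len(self.data), end + self.blocksize)
        self.cache = self._fetch(start, fetch_end)
        self.start = start
        return self.cache[: end - start]
```

In this model, each open file handle holds at most one window (the requested range plus `blocksize`) in memory, which is why the cache type matters when many handles are open at once.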

@martindurant asked me in this comment for an MVCE, but at the time there were far too many annoying xarray bugs in the way; these seem to have been fixed now.

The MVCE below can run on any machine since the files are in an anonymous (public) S3 bucket. I'm using Coiled in it, but a Fargate cluster shows the same memory and bandwidth consumption. By my rough estimate, this example uses a bit more than 6 GB of local memory. Memory is still used even if the preprocess function is disabled.
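To turn the rough 6 GB estimate into a measured number, the client script can report its own peak resident memory at the end. A minimal, Unix-only sketch using the standard-library `resource` module (the function name is hypothetical):

```python
import resource
import sys


def local_peak_rss_gib() -> float:
    """Peak resident set size of the current process, in GiB.

    Unix-only: the ``resource`` module is not available on Windows.
    ``ru_maxrss`` is reported in kilobytes on Linux but in bytes on macOS.
    """
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == "darwin":
        peak_bytes = peak          # macOS reports bytes
    else:
        peak_bytes = peak * 1024   # Linux reports kilobytes
    return peak_bytes / 2**30
```

Adding `print(f"local peak RSS: {local_peak_rss_gib():.2f} GiB")` as the last line of the MVCE would report the peak for the client process only; memory used by the remote workers is not included.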

Maybe it's a dask/xarray problem rather than an s3fs one, but I'm not sure. The only thing I know is that I can replicate this every time, with any dataset I work with. Depending on the dataset, local memory/bandwidth usage can explode. I'm very surprised I'm the only one experiencing this.

MVCE

fsspec               2025.5.1
s3fs                 2025.5.1
xarray               2025.7.1
dask                 2025.5.1
dask-cloudprovider   2024.9.1
dask-expr            2.0.0
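The version list above can be collected programmatically so it matches the environment that actually ran the script. A small sketch using the standard-library `importlib.metadata` (the helper name is hypothetical); `xarray.show_versions()` also prints a broader environment report. Since the script also imports `dask.distributed` and `coiled`, their distributions (`distributed`, `coiled`) are worth including as well.

```python
from importlib.metadata import PackageNotFoundError, version


def package_versions(names):
    """Return {distribution name: installed version, or None if missing}."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None  # not installed in this environment
    return found
```

For example, `package_versions(["fsspec", "s3fs", "xarray", "dask", "distributed", "coiled"])` returns a dict that can be printed one package per line.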
#!/usr/bin/env python3
from functools import partial

import s3fs
import xarray as xr
from coiled import Cluster
from dask.distributed import Client
import numpy as np
import numcodecs
import os


def create_fileset(s3_paths, s3_fs=None):
    """
    Create a fileset from S3 objects specified by a list of full S3 paths.

    Args:
        s3_paths (str or list[str]): Either a single full S3 path (e.g., 's3://bucket_name/object_key')
                                     or a list of full S3 paths.

    Returns:
        list[file-like object]: List of file-like objects representing each object in the fileset.
    """
    if s3_fs is None:
        s3_fs = s3fs.S3FileSystem(
            anon=True, default_fill_cache=False, default_cache_type="readahead"
        )

    if isinstance(s3_paths, str):
        s3_paths = [s3_paths]

    if not isinstance(s3_paths, list):
        raise ValueError("Invalid input format. Expecting either str or list[str].")

    # Create a fileset by opening each file
    fileset = [s3_fs.open(file) for file in s3_paths]

    return fileset


def preprocess_xarray(ds, dataset_config=None):
    append_dim = "TIME"
    dest_name = "filename"

    filename_placeholder = "UNKNOWN_FILENAME.nc"
    filename = filename_placeholder

    try:
        var = next(var for var in ds)  # get the first variable
        if hasattr(ds[var], "encoding") and "source" in ds[var].encoding:
            filename = os.path.basename(ds[var].encoding["source"])
        elif hasattr(ds, "encoding") and "source" in ds.encoding:
            filename = os.path.basename(ds.encoding["source"])

        print(f"Filename found in ds.encoding: {filename}")
    except Exception as e:
        print(
            f"Original filename not available in the xarray dataset: {e} - will default to use {filename_placeholder}"
        )
        filename = filename_placeholder

    length = ds.sizes[append_dim]
    string_array = np.full(
        length, filename, dtype=object
    )  # having da.full could break things when appending to existing zarr. need to investigate why. default to using numpy array instead

    ds[dest_name] = (append_dim, string_array)
    ds[dest_name].encoding = {
        "_FillValue": "",
        "dtype": object,
        "compressor": None,
        "filters": None,
        "object_codec": numcodecs.VLenUTF8(),  # crucial!
    }

    return ds


class GenericHandler:
    def _open_mfds(
        self, partial_preprocess, drop_vars_list, batch_files, engine="scipy"
    ):
        open_mfdataset_params = {
            "engine": engine,
            "parallel": True,
            "preprocess": partial_preprocess,
            "data_vars": "all",
            "concat_characters": True,
            "mask_and_scale": True,
            "decode_cf": True,
            "decode_times": True,
            "use_cftime": True,
            "decode_coords": True,
            "compat": "override",
            "coords": "minimal",
            "drop_variables": drop_vars_list,
        }

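        # With parallel=True, xarray wraps open_dataset and preprocess in
        # dask.delayed and runs them via dask.compute (on the cluster when a
        # distributed Client is active); the resulting lazy Datasets are then
        # gathered back to this process to be combined.
        ds = xr.open_mfdataset(batch_files, **open_mfdataset_params)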

        # ds = ds.unify_chunks()
        # ds = ds.sortby("TIME")

        return ds

    def publish_cloud_optimised_fileset_batch(self, s3_file_uri_list):
        self.dataset_config = None
        drop_vars_list = ["wind_speed"]

        partial_preprocess = partial(
            preprocess_xarray, dataset_config=self.dataset_config
        )

        ds = self._open_mfds(
            partial_preprocess,
            drop_vars_list,
            s3_file_uri_list,
        )

        # next part of the code should do a ds.to_zarr()
        return ds


prefix = "s3://imos-data/IMOS/SRS/OC/radiometer/VLHJ_Southern-Surveyor/2013/"
s3_file_uri_list = [
    prefix + "IMOS_SRS-OC_F_20130214T000001Z_VLHJ_FV01_DALEC_END-20130214T235958Z.nc",
    prefix + "IMOS_SRS-OC_F_20130215T000000Z_VLHJ_FV01_DALEC_END-20130215T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130216T000000Z_VLHJ_FV01_DALEC_END-20130216T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130217T000002Z_VLHJ_FV01_DALEC_END-20130217T054432Z.nc",
    prefix + "IMOS_SRS-OC_F_20130218T213111Z_VLHJ_FV01_DALEC_END-20130218T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130219T000001Z_VLHJ_FV01_DALEC_END-20130219T054325Z.nc",
    prefix + "IMOS_SRS-OC_F_20130221T213337Z_VLHJ_FV01_DALEC_END-20130221T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130222T000004Z_VLHJ_FV01_DALEC_END-20130222T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130223T000002Z_VLHJ_FV01_DALEC_END-20130223T235951Z.nc",
    prefix + "IMOS_SRS-OC_F_20130224T000003Z_VLHJ_FV01_DALEC_END-20130224T230710Z.nc",
    prefix + "IMOS_SRS-OC_F_20130226T220957Z_VLHJ_FV01_DALEC_END-20130226T235958Z.nc",
    prefix + "IMOS_SRS-OC_F_20130227T000001Z_VLHJ_FV01_DALEC_END-20130227T235957Z.nc",
    prefix + "IMOS_SRS-OC_F_20130228T000000Z_VLHJ_FV01_DALEC_END-20130228T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130301T000002Z_VLHJ_FV01_DALEC_END-20130301T235958Z.nc",
    prefix + "IMOS_SRS-OC_F_20130302T000001Z_VLHJ_FV01_DALEC_END-20130302T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130303T000000Z_VLHJ_FV01_DALEC_END-20130303T001757Z.nc",
    prefix + "IMOS_SRS-OC_F_20130410T001856Z_VLHJ_FV01_DALEC_END-20130410T061407Z.nc",
    prefix + "IMOS_SRS-OC_F_20130411T002135Z_VLHJ_FV01_DALEC_END-20130411T061533Z.nc",
    prefix + "IMOS_SRS-OC_F_20130412T002318Z_VLHJ_FV01_DALEC_END-20130412T061322Z.nc",
    prefix + "IMOS_SRS-OC_F_20130413T002039Z_VLHJ_FV01_DALEC_END-20130413T061708Z.nc",
    prefix + "IMOS_SRS-OC_F_20130414T001848Z_VLHJ_FV01_DALEC_END-20130414T061153Z.nc",
    prefix + "IMOS_SRS-OC_F_20130415T002210Z_VLHJ_FV01_DALEC_END-20130415T060559Z.nc",
    prefix + "IMOS_SRS-OC_F_20130416T001959Z_VLHJ_FV01_DALEC_END-20130416T055628Z.nc",
    prefix + "IMOS_SRS-OC_F_20130417T001719Z_VLHJ_FV01_DALEC_END-20130417T054750Z.nc",
    prefix + "IMOS_SRS-OC_F_20130418T001858Z_VLHJ_FV01_DALEC_END-20130419T040841Z.nc",
    prefix + "IMOS_SRS-OC_F_20130419T041404Z_VLHJ_FV01_DALEC_END-20130419T051434Z.nc",
    prefix + "IMOS_SRS-OC_F_20130420T002254Z_VLHJ_FV01_DALEC_END-20130420T044248Z.nc",
    prefix + "IMOS_SRS-OC_F_20130421T003058Z_VLHJ_FV01_DALEC_END-20130421T020501Z.nc",
    prefix + "IMOS_SRS-OC_F_20130429T005759Z_VLHJ_FV01_DALEC_END-20130429T031412Z.nc",
    prefix + "IMOS_SRS-OC_F_20130621T022830Z_VLHJ_FV01_DALEC_END-20130621T034358Z.nc",
    prefix + "IMOS_SRS-OC_F_20130622T015425Z_VLHJ_FV01_DALEC_END-20130622T040101Z.nc",
    prefix + "IMOS_SRS-OC_F_20130623T022616Z_VLHJ_FV01_DALEC_END-20130623T035247Z.nc",
    prefix + "IMOS_SRS-OC_F_20130624T023403Z_VLHJ_FV01_DALEC_END-20130624T042341Z.nc",
    prefix + "IMOS_SRS-OC_F_20130625T024217Z_VLHJ_FV01_DALEC_END-20130625T045224Z.nc",
    prefix + "IMOS_SRS-OC_F_20130626T025409Z_VLHJ_FV01_DALEC_END-20130626T050829Z.nc",
    prefix + "IMOS_SRS-OC_F_20130627T032159Z_VLHJ_FV01_DALEC_END-20130627T051355Z.nc",
    prefix + "IMOS_SRS-OC_F_20130728T003922Z_VLHJ_FV01_DALEC_END-20130728T071533Z.nc",
    prefix + "IMOS_SRS-OC_F_20130729T002107Z_VLHJ_FV01_DALEC_END-20130729T070924Z.nc",
    prefix + "IMOS_SRS-OC_F_20130730T000226Z_VLHJ_FV01_DALEC_END-20130730T235958Z.nc",
    prefix + "IMOS_SRS-OC_F_20130731T000002Z_VLHJ_FV01_DALEC_END-20130731T235957Z.nc",
    prefix + "IMOS_SRS-OC_F_20130801T000001Z_VLHJ_FV01_DALEC_END-20130801T235929Z.nc",
    prefix + "IMOS_SRS-OC_F_20130805T212533Z_VLHJ_FV01_DALEC_END-20130805T235958Z.nc",
    prefix + "IMOS_SRS-OC_F_20130806T000002Z_VLHJ_FV01_DALEC_END-20130806T005357Z.nc",
    prefix + "IMOS_SRS-OC_F_20130807T042802Z_VLHJ_FV01_DALEC_END-20130807T235957Z.nc",
    prefix + "IMOS_SRS-OC_F_20130808T000000Z_VLHJ_FV01_DALEC_END-20130808T235957Z.nc",
    prefix + "IMOS_SRS-OC_F_20130809T000000Z_VLHJ_FV01_DALEC_END-20130809T063416Z.nc",
    prefix + "IMOS_SRS-OC_F_20130820T040551Z_VLHJ_FV01_DALEC_END-20130820T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130821T000002Z_VLHJ_FV01_DALEC_END-20130821T235957Z.nc",
    prefix + "IMOS_SRS-OC_F_20130822T000001Z_VLHJ_FV01_DALEC_END-20130822T235955Z.nc",
    prefix + "IMOS_SRS-OC_F_20130823T000000Z_VLHJ_FV01_DALEC_END-20130823T235958Z.nc",
    prefix + "IMOS_SRS-OC_F_20130824T000001Z_VLHJ_FV01_DALEC_END-20130824T235958Z.nc",
    prefix + "IMOS_SRS-OC_F_20130825T000002Z_VLHJ_FV01_DALEC_END-20130825T235958Z.nc",
    prefix + "IMOS_SRS-OC_F_20130826T000004Z_VLHJ_FV01_DALEC_END-20130826T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130827T000003Z_VLHJ_FV01_DALEC_END-20130827T211558Z.nc",
    prefix + "IMOS_SRS-OC_F_20130828T204304Z_VLHJ_FV01_DALEC_END-20130828T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130829T000000Z_VLHJ_FV01_DALEC_END-20130829T235957Z.nc",
    prefix + "IMOS_SRS-OC_F_20130830T000003Z_VLHJ_FV01_DALEC_END-20130830T235956Z.nc",
    prefix + "IMOS_SRS-OC_F_20130831T000001Z_VLHJ_FV01_DALEC_END-20130831T235959Z.nc",
    prefix + "IMOS_SRS-OC_F_20130901T000000Z_VLHJ_FV01_DALEC_END-20130901T235954Z.nc",
    prefix + "IMOS_SRS-OC_F_20130902T000000Z_VLHJ_FV01_DALEC_END-20130902T034340Z.nc",
    prefix + "IMOS_SRS-OC_F_20130927T201704Z_VLHJ_FV01_DALEC_END-20130927T235956Z.nc",
    prefix + "IMOS_SRS-OC_F_20130928T000001Z_VLHJ_FV01_DALEC_END-20130928T235955Z.nc",
    prefix + "IMOS_SRS-OC_F_20130930T203535Z_VLHJ_FV01_DALEC_END-20130930T235958Z.nc",
    prefix + "IMOS_SRS-OC_F_20131001T000000Z_VLHJ_FV01_DALEC_END-20131001T235957Z.nc",
    prefix + "IMOS_SRS-OC_F_20131002T000003Z_VLHJ_FV01_DALEC_END-20131002T034135Z.nc",
]


cluster_options = {
    "n_workers": [1, 120],
    "scheduler_vm_types": "t3.2xlarge",
    "worker_vm_types": "t3.2xlarge",
    "allow_ingress_from": "me",
    "compute_purchase_option": "spot_with_fallback",
    "worker_options": {"nthreads": 8, "memory_limit": "32GB"},
    "name": "thisIsATest",
}

cluster = Cluster(**cluster_options)

client = Client(cluster)

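# Note: create_fileset() runs here, on the local (client) machine, so the
# S3 file handles are opened locally and then passed to open_mfdataset.
s3_file_handle_list = create_fileset(s3_file_uri_list)
GenericHandler().publish_cloud_optimised_fileset_batch(s3_file_handle_list)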
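The filename lookup inside `preprocess_xarray` can be pulled out into a small pure function, which makes its lookup order testable without S3 or a cluster. This is a hypothetical refactor, not part of the original script: it takes plain encoding dicts and mirrors the same order (first data variable's `encoding["source"]`, then the dataset's, then the placeholder), leaving the try/except handling to the caller.

```python
import os

PLACEHOLDER = "UNKNOWN_FILENAME.nc"


def source_filename(var_encoding, ds_encoding, placeholder=PLACEHOLDER):
    """Return the basename of the first available ``source`` entry.

    Hypothetical helper mirroring the lookup in preprocess_xarray:
    1. encoding["source"] of the first data variable,
    2. otherwise encoding["source"] of the dataset,
    3. otherwise the placeholder.
    """
    for encoding in (var_encoding, ds_encoding):
        if encoding and "source" in encoding:
            return os.path.basename(encoding["source"])
    return placeholder
```

Inside `preprocess_xarray`, the lookup would then read `filename = source_filename(ds[var].encoding, ds.encoding)`.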
