Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions em_workflows/czi/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import SimpleITK as sitk
from prefect import flow, task
from pytools.HedwigZarrImages import HedwigZarrImage, HedwigZarrImages
from pytools import HedwigZarrImage, HedwigZarrImages

from em_workflows.file_path import FilePath
from em_workflows.utils import utils
Expand Down Expand Up @@ -78,8 +78,11 @@ def copy_zarr_to_assets_dir(file_path: FilePath) -> None:


@task
def generate_imageset(file_path: FilePath) -> List[Dict]:
def generate_imageset(file_path: FilePath,
use_default_dask=False) -> List[Dict]:
"""
:param: use_default_dask: If True, reuses the Prefect Dask Scheduler for the ZARR and Dask array operations.

| ImageSet consists of all the assets for a particular zarr sub-image and label images
| Macro image is ignored
| Label image is added as an thumbnail asset
Expand All @@ -91,16 +94,17 @@ def generate_imageset(file_path: FilePath) -> List[Dict]:
"""
zarr_fp = f"{file_path.assets_dir}/{file_path.base}.zarr"
image_set = list()
zarr_images = HedwigZarrImages(Path(zarr_fp))

if use_default_dask:
compute_args = {}
else:
# This task is used in a sub-flow where it's the only task running.
# use all the cores in a thread pool.
compute_args = {"scheduler": "threads"}
zarr_images = HedwigZarrImages(Path(zarr_fp), compute_args=compute_args)
# for image_name, image in zarr_images.series():
for k_idx, image_name in enumerate(zarr_images.get_series_keys()):
# The relative path of the zarr group from the root zarr
# this assumes a valid zarr group with OME directory inside
ome_index_to_zarr_group = zarr_images.zarr_root["OME"].attrs["series"]
zarr_idx = ome_index_to_zarr_group[k_idx]
image = HedwigZarrImage(
zarr_images.zarr_root[zarr_idx], zarr_images.ome_info, k_idx
)
image = zarr_images[k_idx]
# single image element
image_elt = dict()
image_elt["imageMetadata"] = None
Expand All @@ -119,7 +123,7 @@ def generate_imageset(file_path: FilePath) -> List[Dict]:

else:
ng_asset = file_path.gen_asset(
asset_type="neuroglancerZarr", asset_fp=Path(zarr_fp) / zarr_idx
asset_type="neuroglancerZarr", asset_fp=image.path
)
# note - dims should be image.dims, but GUI does not want XYC
# hardcoding in XY for now.
Expand Down Expand Up @@ -156,7 +160,9 @@ async def generate_czi_imageset(file_path: FilePath) -> List[Dict]:
copy_to_assets = copy_zarr_to_assets_dir.submit(
file_path, wait_for=[rechunk_result]
)
return generate_imageset.submit(file_path, wait_for=[copy_to_assets])
return generate_imageset.submit(file_path,
use_default_dask=True,
wait_for=[copy_to_assets])


@task
Expand Down