Description
Several parsers use `obstore.ReadableFile` because the underlying libraries (e.g., h5py) expect a file-like object. The performance of this approach is quite poor for any file that contains many small variables. This was particularly apparent when trying to run @jacobbieker's GOES MVCE from #728 (comment) over a bad internet connection. I think we should add documentation for how to fetch the entire file in a single get request and cache it in memory or on disk, so that performance isn't ruined by the latency of many small sequential requests.
The following script takes `9.39s user 7.89s system 27% cpu 1:03.22 total`:

```python
import dask
import xarray as xr
from obstore.store import MemoryStore, from_url

from virtualizarr import open_virtual_dataset
from virtualizarr.parsers import HDFParser
from virtualizarr.registry import ObjectStoreRegistry

bucket = "s3://noaa-goes19/"
store = from_url(bucket, skip_signature=True)
registry = ObjectStoreRegistry({bucket: store})
stream = store.list("/ABI-L2-MCMIPF/2025/002/01/", chunk_size=1)
files = [f"{bucket}{f[0]['path']}" for f in stream]

# TODO: Can we make it simpler to load all coordinate variables?
coords = [
    "y",
    "x",
    "t",
    "y_image",
    "x_image",
    *[f"band_wavelength_C{i:02d}" for i in range(1, 17)],
    *[f"band_id_C{i:02d}" for i in range(1, 17)],
]

parser = HDFParser()


# TODO: Is vds[coords].load() the best approach here? Could we expose
# `chunks=None` in `open_virtual_dataset` to return NumPy arrays rather
# than Dask arrays?
def cache_and_open_virtual_dataset(url):
    store, path_in_store = registry.resolve(url)
    memory_store = MemoryStore()
    buffer = store.get(path_in_store).bytes()
    memory_store.put(path_in_store, buffer)
    cached_registry = ObjectStoreRegistry({bucket: memory_store})
    vds = open_virtual_dataset(
        url=url,
        parser=parser,
        registry=cached_registry,
        loadable_variables=coords,
        decode_times=True,
    )
    # Load coordinate data before the file is cleared from the memory store
    vds[coords].load()
    memory_store.delete(path_in_store)
    return vds


tasks = [dask.delayed(cache_and_open_virtual_dataset)(url) for url in files]
virtual_datasets = dask.compute(tasks)[0]
vds_combined = xr.concat(
    virtual_datasets,
    dim="t",
    data_vars="minimal",
    coords="minimal",
    compat="override",
)
print(vds_combined)
```
In contrast, this code, which a user would presumably arrive at from our docs, takes `21.16s user 18.32s system 6% cpu 9:32.27 total`:

```python
import dask
import xarray as xr
from obstore.store import from_url

from virtualizarr import open_virtual_dataset
from virtualizarr.parsers import HDFParser
from virtualizarr.registry import ObjectStoreRegistry

bucket = "s3://noaa-goes19/"
store = from_url(bucket, skip_signature=True)
registry = ObjectStoreRegistry({bucket: store})
stream = store.list("/ABI-L2-MCMIPF/2025/002/01/", chunk_size=1)
files = [f"{bucket}{f[0]['path']}" for f in stream]

coords = [
    "y",
    "x",
    "t",
    "y_image",
    "x_image",
    *[f"band_wavelength_C{i:02d}" for i in range(1, 17)],
    *[f"band_id_C{i:02d}" for i in range(1, 17)],
]

parser = HDFParser()
tasks = [
    dask.delayed(open_virtual_dataset)(
        url,
        registry=registry,
        parser=parser,
        loadable_variables=coords,
        decode_times=True,
    )
    for url in files
]
virtual_datasets = dask.compute(tasks)[0]
vds_combined = xr.concat(
    virtual_datasets,
    dim="t",
    data_vars="minimal",
    coords="minimal",
    compat="override",
)
print(vds_combined)
```
This method may be useful independently of virtualizarr to make it simpler to use xarray with obstore. It is similar to fsspec's `simplecache`.
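For comparison, fsspec's `simplecache` expresses the same whole-file-fetch idea as a URL prefix. A hypothetical sketch, using fsspec's in-memory filesystem in place of S3 and a made-up file name so it runs locally:

```python
import fsspec

# Stand-in for a remote filesystem: write one "remote" file into
# fsspec's process-global in-memory filesystem.
mem = fsspec.filesystem("memory")
with mem.open("/demo.nc", "wb") as f:
    f.write(b"pretend-netcdf-bytes")

# The simplecache:: prefix fetches the whole target file on first open
# and serves every subsequent read from a local on-disk copy.
with fsspec.open("simplecache::memory://demo.nc", "rb") as f:
    data = f.read()
print(data)
```

With a real bucket the same prefix form (e.g. `simplecache::s3://...`) would apply, which is the behavior the manual `MemoryStore` recipe above reproduces for obstore.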