Skip to content

Do not allow exceptions in ManifestStore #683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ def zarr_store_scalar(tmpdir):
def _generate_chunk_entries(
shape: tuple[int, ...],
chunks: tuple[int, ...],
entry_generator: Callable[[tuple[int, ...]], dict[str, Any]],
itemsize: int,
entry_generator: Callable[[tuple[int, ...], tuple[int, ...], int], dict[str, Any]],
) -> dict[str, dict[str, Any]]:
"""
Generate chunk entries for a manifest based on shape and chunks.
Expand All @@ -112,30 +113,29 @@ def _generate_chunk_entries(
)

if chunk_grid_shape == ():
return {"0": entry_generator((0,))}
return {"0": entry_generator((0,), (0,), itemsize)}

all_possible_combos = itertools.product(
*[range(length) for length in chunk_grid_shape]
)
return {join(ind): entry_generator(ind) for ind in all_possible_combos}


def _offset_from_chunk_key(ind: tuple[int, ...]) -> int:
"""Generate an offset value from chunk indices."""
return sum(ind) * 10
return {
join(ind): entry_generator(ind, chunks, itemsize) for ind in all_possible_combos
}


def _length_from_chunk_key(ind: tuple[int, ...]) -> int:
def _length_from_chunk_key(chunks: tuple[int, ...], itemsize: int) -> int:
"""Generate a length value from chunk indices."""
return sum(ind) + 5
return int(np.prod(chunks) * itemsize)


def _entry_from_chunk_key(ind: tuple[int, ...]) -> dict[str, str | int]:
def _entry_from_chunk_key(
ind: tuple[int, ...], chunks: tuple[int, ...], itemsize: int
) -> dict[str, str | int]:
"""Generate a (somewhat) unique manifest entry from a given chunk key."""
entry = {
"path": f"/foo.{str(join(ind))}.nc",
"offset": _offset_from_chunk_key(ind),
"length": _length_from_chunk_key(ind),
"offset": 0,
"length": _length_from_chunk_key(chunks, itemsize),
}
return entry # type: ignore[return-value]

Expand All @@ -150,7 +150,9 @@ def _generate_chunk_manifest(
"""Generate a chunk manifest with sequential offsets for each chunk."""
current_offset = [offset] # Use list to allow mutation in closure

def sequential_entry_generator(ind: tuple[int, ...]) -> dict[str, Any]:
def sequential_entry_generator(
ind: tuple[int, ...], chunks: tuple[int, ...], itemsize: int
) -> dict[str, Any]:
entry = {
"path": netcdf4_file,
"offset": current_offset[0],
Expand All @@ -159,7 +161,7 @@ def sequential_entry_generator(ind: tuple[int, ...]) -> dict[str, Any]:
current_offset[0] += length
return entry

entries = _generate_chunk_entries(shape, chunks, sequential_entry_generator)
entries = _generate_chunk_entries(shape, chunks, 32, sequential_entry_generator)
return ChunkManifest(entries)


Expand Down Expand Up @@ -369,7 +371,9 @@ def _manifest_array(
codecs=codecs,
dimension_names=dimension_names,
)
entries = _generate_chunk_entries(shape, chunks, _entry_from_chunk_key)
entries = _generate_chunk_entries(
shape, chunks, data_type.itemsize, _entry_from_chunk_key
)
chunkmanifest = ChunkManifest(entries=entries)
return ManifestArray(chunkmanifest=chunkmanifest, metadata=metadata)

Expand Down
33 changes: 12 additions & 21 deletions virtualizarr/manifests/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,6 @@
__all__ = ["ManifestStore"]


_ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = (
FileNotFoundError,
IsADirectoryError,
NotADirectoryError,
)


@dataclass
class StoreRequest:
"""Dataclass for matching a key to the store instance"""
Expand Down Expand Up @@ -259,28 +252,26 @@ async def get(
f"Could not find a store to use for {path} in the store registry"
)

# Truncate path to match Obstore expectations
key = urlparse(path).path
path_in_store = urlparse(path).path
if hasattr(store, "prefix") and store.prefix:
# strip the prefix from key
key = key.removeprefix(str(store.prefix))

prefix = str(store.prefix).lstrip("/")
path_in_store = path_in_store.lstrip("/").removeprefix(prefix).lstrip("/")
elif hasattr(store, "url"):
prefix = urlparse(store.url).path.lstrip("/")
path_in_store = path_in_store.lstrip("/").removeprefix(prefix).lstrip("/")
# Transform the input byte range to account for the chunk location in the file
chunk_end_exclusive = offset + length
byte_range = _transform_byte_range(
byte_range, chunk_start=offset, chunk_end_exclusive=chunk_end_exclusive
)

# Actually get the bytes
try:
bytes = await store.get_range_async(
key,
start=byte_range.start,
end=byte_range.end,
)
return prototype.buffer.from_bytes(bytes) # type: ignore[arg-type]
except _ALLOWED_EXCEPTIONS:
return None
bytes = await store.get_range_async(
path_in_store,
start=byte_range.start,
end=byte_range.end,
)
return prototype.buffer.from_bytes(bytes) # type: ignore[arg-type]

async def get_partial_values(
self,
Expand Down
16 changes: 8 additions & 8 deletions virtualizarr/tests/test_manifests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ def test_broadcast_existing_axis(self, manifest_array):
assert expanded.shape == (3, 2)
assert expanded.chunks == (1, 2)
assert expanded.manifest.dict() == {
"0.0": {"path": "file:///foo.0.0.nc", "offset": 0, "length": 5},
"1.0": {"path": "file:///foo.0.0.nc", "offset": 0, "length": 5},
"2.0": {"path": "file:///foo.0.0.nc", "offset": 0, "length": 5},
"0.0": {"path": "file:///foo.0.0.nc", "offset": 0, "length": 8},
"1.0": {"path": "file:///foo.0.0.nc", "offset": 0, "length": 8},
"2.0": {"path": "file:///foo.0.0.nc", "offset": 0, "length": 8},
Comment on lines +141 to +143
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did the length of these need to change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the chunks are emulating two 4 byte numbers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it doesn't actually matter for these tests right? You're just making it neater.

}

def test_broadcast_new_axis(self, manifest_array):
Expand All @@ -149,9 +149,9 @@ def test_broadcast_new_axis(self, manifest_array):
assert expanded.shape == (1, 3)
assert expanded.chunks == (1, 1)
assert expanded.manifest.dict() == {
"0.0": {"path": "file:///foo.0.nc", "offset": 0, "length": 5},
"0.1": {"path": "file:///foo.1.nc", "offset": 10, "length": 6},
"0.2": {"path": "file:///foo.2.nc", "offset": 20, "length": 7},
"0.0": {"path": "file:///foo.0.nc", "offset": 0, "length": 4},
"0.1": {"path": "file:///foo.1.nc", "offset": 0, "length": 4},
"0.2": {"path": "file:///foo.2.nc", "offset": 0, "length": 4},
}

def test_broadcast_scalar(self, manifest_array):
Expand All @@ -160,14 +160,14 @@ def test_broadcast_scalar(self, manifest_array):
assert marr.shape == ()
assert marr.chunks == ()
assert marr.manifest.dict() == {
"0": {"path": "file:///foo.0.nc", "offset": 0, "length": 5},
"0": {"path": "file:///foo.0.nc", "offset": 0, "length": 0},
}

expanded = np.broadcast_to(marr, shape=(1,))
assert expanded.shape == (1,)
assert expanded.chunks == (1,)
assert expanded.manifest.dict() == {
"0": {"path": "file:///foo.0.nc", "offset": 0, "length": 5},
"0": {"path": "file:///foo.0.nc", "offset": 0, "length": 0},
}

@pytest.mark.parametrize(
Expand Down
35 changes: 22 additions & 13 deletions virtualizarr/tests/test_manifests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import TYPE_CHECKING

import numpy as np
import obstore as obs
import pytest
from obstore.store import MemoryStore
from zarr.abc.store import (
Expand Down Expand Up @@ -341,20 +342,28 @@ def test_single_group_to_dataset(
expected_loadable_variables,
):
marr1 = manifest_array(
shape=(3, 2, 5), chunks=(1, 2, 1), dimension_names=["x", "y", "t"]
shape=(3, 2, 5),
chunks=(1, 2, 1),
dimension_names=["x", "y", "t"],
codecs=None,
)
marr2 = manifest_array(shape=(3, 2), chunks=(1, 2), dimension_names=["x", "y"])
marr3 = manifest_array(shape=(5,), chunks=(5,), dimension_names=["t"])

paths1 = list({v["path"] for v in marr1.manifest.values()})
paths2 = list({v["path"] for v in marr2.manifest.values()})
paths3 = list({v["path"] for v in marr2.manifest.values()})
unique_paths = list(set(paths1 + paths2 + paths3))
stores = {}
for path in unique_paths:
store = MemoryStore()
stores[get_store_prefix(path)] = store
store_registry = ObjectStoreRegistry(stores=stores)
marr2 = manifest_array(
shape=(3, 2), chunks=(1, 2), dimension_names=["x", "y"], codecs=None
)
marr3 = manifest_array(
shape=(5,), chunks=(5,), dimension_names=["t"], codecs=None
)

store = MemoryStore()
for marr in [marr1, marr2, marr3]:
unique_paths = list({v["path"] for v in marr.manifest.values()})
for path in unique_paths:
obs.put(
store,
path.split("/")[-1],
np.ones(marr.chunks, dtype=marr.dtype).tobytes(),
)
store_registry = ObjectStoreRegistry({"file://": store})

manifest_group = ManifestGroup(
arrays={
Expand Down
Loading