diff --git a/changes/3987.feature.md b/changes/3987.feature.md new file mode 100644 index 0000000000..2492b4d7dd --- /dev/null +++ b/changes/3987.feature.md @@ -0,0 +1 @@ +Two new fields on `ArrayConfig` control how the sharding codec coalesces partial-shard reads: `sharding_coalesce_max_gap_bytes` (default 1 MiB) and `sharding_coalesce_max_bytes` (default 16 MiB). When reading multiple chunks from the same shard, nearby byte ranges are merged into a single request to the store if separated by no more than `sharding_coalesce_max_gap_bytes` and the merged read stays within `sharding_coalesce_max_bytes`. Defaults are seeded from the matching `array.sharding_coalesce_max_gap_bytes` / `array.sharding_coalesce_max_bytes` keys in [`zarr.config`][] at array-creation time, and can be overridden per array by passing `config={...}` to [`zarr.create_array`][]. diff --git a/docs/user-guide/config.md b/docs/user-guide/config.md index 8a8fa94c3d..71c021b070 100644 --- a/docs/user-guide/config.md +++ b/docs/user-guide/config.md @@ -35,6 +35,7 @@ Configuration options include the following: - Async and threading options, e.g. `async.concurrency` and `threading.max_workers` - Selections of implementations of codecs, codec pipelines and buffers - Enabling GPU support with `zarr.config.enable_gpu()`. See GPU support for more. +- Control request merging when reading multiple chunks from the same shard with `array.sharding_coalesce_max_gap_bytes` and `array.sharding_coalesce_max_bytes`. Reads of nearby chunks are coalesced into a single request to the store when separated by at most `sharding_coalesce_max_gap_bytes` and the resulting merged read is no larger than `sharding_coalesce_max_bytes`. For selecting custom implementations of codecs, pipelines, buffers and ndbuffers, first register the implementations in the registry and then select them in the config. diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index 33c8602ecb..bed11013e5 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -500,6 +500,8 @@ async def _decode_partial_single( chunk_spec.prototype, chunks_per_shard, all_chunk_coords, + max_gap_bytes=shard_spec.config.sharding_coalesce_max_gap_bytes, + max_coalesced_bytes=shard_spec.config.sharding_coalesce_max_bytes, ) if shard_dict_maybe is None: @@ -814,10 +816,16 @@ async def _load_partial_shard_maybe( prototype: BufferPrototype, chunks_per_shard: tuple[int, ...], all_chunk_coords: set[tuple[int, ...]], + max_gap_bytes: int, + max_coalesced_bytes: int, ) -> ShardMapping | None: """ Read chunks from `byte_getter` for the case where the read is less than a full shard. Returns a mapping of chunk coordinates to bytes or None. + + `max_gap_bytes` and `max_coalesced_bytes` are forwarded to + `Store.get_ranges` to control byte-range coalescing across the requested + chunks. """ shard_index = await self._load_shard_index_maybe(byte_getter, chunks_per_shard) if shard_index is None: @@ -841,7 +849,11 @@ async def _load_partial_shard_maybe( byte_ranges = [byte_range for _, byte_range in chunk_coord_byte_ranges] try: async for group in byte_getter.store.get_ranges( - byte_getter.path, byte_ranges, prototype=prototype + byte_getter.path, + byte_ranges, + prototype=prototype, + max_gap_bytes=max_gap_bytes, + max_coalesced_bytes=max_coalesced_bytes, ): for idx, buf in group: if buf is not None: diff --git a/src/zarr/core/array_spec.py b/src/zarr/core/array_spec.py index 2b5eb0191c..89163f7d83 100644 --- a/src/zarr/core/array_spec.py +++ b/src/zarr/core/array_spec.py @@ -7,6 +7,7 @@ MemoryOrder, parse_bool, parse_fill_value, + parse_int, parse_order, parse_shapelike, ) @@ -29,6 +30,8 @@ class ArrayConfigParams(TypedDict): order: NotRequired[MemoryOrder] write_empty_chunks: NotRequired[bool] read_missing_chunks: NotRequired[bool] + sharding_coalesce_max_gap_bytes: NotRequired[int] + sharding_coalesce_max_bytes: NotRequired[int] @dataclass(frozen=True) @@ -45,22 +48,42 @@ class ArrayConfig: read_missing_chunks : bool If True, missing chunks will be filled with the array's fill value on read. If False, reading missing chunks will raise a ``ChunkNotFoundError``. + sharding_coalesce_max_gap_bytes : int + When reading multiple chunks from the same shard, nearby byte ranges + separated by no more than this many bytes are coalesced into a single + request to the store. + sharding_coalesce_max_bytes : int + Requests will not be coalesced if doing so would exceed this byte size. """ order: MemoryOrder write_empty_chunks: bool read_missing_chunks: bool + sharding_coalesce_max_gap_bytes: int + sharding_coalesce_max_bytes: int def __init__( - self, order: MemoryOrder, write_empty_chunks: bool, *, read_missing_chunks: bool = True + self, + order: MemoryOrder, + write_empty_chunks: bool, + *, + read_missing_chunks: bool = True, + sharding_coalesce_max_gap_bytes: int = 1 << 20, # 1 MiB + sharding_coalesce_max_bytes: int = 16 << 20, # 16 MiB ) -> None: order_parsed = parse_order(order) write_empty_chunks_parsed = parse_bool(write_empty_chunks) read_missing_chunks_parsed = parse_bool(read_missing_chunks) + sharding_coalesce_max_gap_bytes_parsed = parse_int(sharding_coalesce_max_gap_bytes) + sharding_coalesce_max_bytes_parsed = parse_int(sharding_coalesce_max_bytes) object.__setattr__(self, "order", order_parsed) object.__setattr__(self, "write_empty_chunks", write_empty_chunks_parsed) object.__setattr__(self, "read_missing_chunks", read_missing_chunks_parsed) + object.__setattr__( + self, "sharding_coalesce_max_gap_bytes", sharding_coalesce_max_gap_bytes_parsed + ) + object.__setattr__(self, "sharding_coalesce_max_bytes", sharding_coalesce_max_bytes_parsed) @classmethod def from_dict(cls, data: ArrayConfigParams) -> Self: @@ -72,7 +95,8 @@ def from_dict(cls, data: ArrayConfigParams) -> Self: kwargs_out: ArrayConfigParams = {} for f in fields(ArrayConfig): field_name = cast( - "Literal['order', 'write_empty_chunks', 'read_missing_chunks']", f.name + "Literal['order', 'write_empty_chunks', 'read_missing_chunks', 'sharding_coalesce_max_gap_bytes', 'sharding_coalesce_max_bytes']", + f.name, ) if field_name not in data: kwargs_out[field_name] = zarr_config.get(f"array.{field_name}") @@ -88,6 +112,8 @@ def to_dict(self) -> ArrayConfigParams: "order": self.order, "write_empty_chunks": self.write_empty_chunks, "read_missing_chunks": self.read_missing_chunks, + "sharding_coalesce_max_gap_bytes": self.sharding_coalesce_max_gap_bytes, + "sharding_coalesce_max_bytes": self.sharding_coalesce_max_bytes, } diff --git a/src/zarr/core/common.py b/src/zarr/core/common.py index eafffa1818..570e3dfdc3 100644 --- a/src/zarr/core/common.py +++ b/src/zarr/core/common.py @@ -217,6 +217,12 @@ def parse_bool(data: Any) -> bool: raise ValueError(f"Expected bool, got {data} instead.") +def parse_int(data: Any) -> int: + if isinstance(data, int) and not isinstance(data, bool): + return data + raise ValueError(f"Expected int, got {data} instead.") + + def _warn_write_empty_chunks_kwarg() -> None: # TODO: link to docs page on array configuration in this message msg = ( diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index 7dcbc78e31..08d2a50ace 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -99,6 +99,8 @@ def enable_gpu(self) -> ConfigSet: "read_missing_chunks": True, "target_shard_size_bytes": None, "rectilinear_chunks": False, + "sharding_coalesce_max_gap_bytes": 1 << 20, # 1 MiB + "sharding_coalesce_max_bytes": 16 << 20, # 16 MiB }, "async": {"concurrency": 10, "timeout": None}, "threading": {"max_workers": None}, diff --git a/tests/test_codecs/test_sharding_unit.py b/tests/test_codecs/test_sharding_unit.py index 6e022ed9fa..829e0ce79d 100644 --- a/tests/test_codecs/test_sharding_unit.py +++ b/tests/test_codecs/test_sharding_unit.py @@ -1,3 +1,8 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, cast +from unittest.mock import AsyncMock + import numpy as np import pytest @@ -10,9 +15,14 @@ ) from zarr.core.buffer import default_buffer_prototype from zarr.core.buffer.cpu import Buffer +from zarr.core.config import config from zarr.storage._common import StorePath from zarr.storage._memory import MemoryStore +if TYPE_CHECKING: + from zarr.core.array import ShardsConfigParam + from zarr.core.array_spec import ArrayConfigParams + # ============================================================================ # _ShardIndex tests # ============================================================================ @@ -155,6 +165,8 @@ async def test_load_partial_shard_maybe_index_load_fails() -> None: prototype=default_buffer_prototype(), chunks_per_shard=(2,), all_chunk_coords={(0,)}, + max_gap_bytes=1 << 20, + max_coalesced_bytes=16 << 20, ) assert result is None @@ -187,6 +199,8 @@ async def mock_load_index( prototype=default_buffer_prototype(), chunks_per_shard=chunks_per_shard, all_chunk_coords={(0,), (1,), (2,)}, + max_gap_bytes=1 << 20, + max_coalesced_bytes=16 << 20, ) assert result is not None @@ -220,6 +234,8 @@ async def mock_load_index( prototype=default_buffer_prototype(), chunks_per_shard=chunks_per_shard, all_chunk_coords={(0,), (1,), (2,)}, + max_gap_bytes=1 << 20, + max_coalesced_bytes=16 << 20, ) assert result == {} @@ -251,6 +267,8 @@ async def mock_load_index( prototype=default_buffer_prototype(), chunks_per_shard=chunks_per_shard, all_chunk_coords={(0,), (1,)}, + max_gap_bytes=1 << 20, + max_coalesced_bytes=16 << 20, ) assert result is not None @@ -292,6 +310,8 @@ async def mock_load_index( prototype=default_buffer_prototype(), chunks_per_shard=chunks_per_shard, all_chunk_coords={(0,)}, + max_gap_bytes=1 << 20, + max_coalesced_bytes=16 << 20, ) assert result is None @@ -336,6 +356,8 @@ async def boom(*args: object, **kwargs: object) -> Buffer | None: prototype=default_buffer_prototype(), chunks_per_shard=chunks_per_shard, all_chunk_coords={(0,)}, + max_gap_bytes=1 << 20, + max_coalesced_bytes=16 << 20, ) @@ -368,6 +390,8 @@ async def mock_load_index( prototype=default_buffer_prototype(), chunks_per_shard=chunks_per_shard, all_chunk_coords={(0,), (1,)}, + max_gap_bytes=1 << 20, + max_coalesced_bytes=16 << 20, ) assert result is not None @@ -405,6 +429,8 @@ async def mock_load_index( prototype=default_buffer_prototype(), chunks_per_shard=chunks_per_shard, all_chunk_coords={(0,)}, + max_gap_bytes=1 << 20, + max_coalesced_bytes=16 << 20, ) assert result == {} @@ -486,3 +512,141 @@ def test_is_total_shard_1d() -> None: # Partial partial_coords: set[tuple[int, ...]] = {(0,), (2,)} assert codec._is_total_shard(partial_coords, chunks_per_shard) is False + + +# ============================================================================ +# Coalescing config option tests +# +# Assert that the `array.sharding_coalesce_max_gap_bytes` and +# `array.sharding_coalesce_max_bytes` global config keys flow through +# `ArrayConfig` to `Store.get_ranges` as `max_gap_bytes` / +# `max_coalesced_bytes` kwargs, and that per-array `config={...}` overrides +# the global default. +# ============================================================================ + + +def _trigger_partial_shard_read(array_config: ArrayConfigParams | None = None) -> AsyncMock: + """Build a sharded array on a mocked `MemoryStore`, trigger a partial-shard + read via the public read path, and return the `get_ranges` mock. + """ + import zarr + from zarr.codecs.sharding import ShardingCodecIndexLocation + + chunk_shape = (2,) + shard_shape = (8,) + data = np.arange(8, dtype="int32") + + store = MemoryStore() + store_mock = AsyncMock(wraps=store, spec=store.__class__) + + shards: ShardsConfigParam = { + "shape": shard_shape, + "index_location": ShardingCodecIndexLocation.end, + } + a = zarr.create_array( + StorePath(store_mock), + shape=(8,), + chunks=chunk_shape, + shards=shards, + dtype=data.dtype, + fill_value=-1, + config=array_config, + ) + a[:] = data + + store_mock.reset_mock() + + # Read a strict subset of chunks to take the partial-shard read path. + _ = a[0:4] + + return cast(AsyncMock, store_mock.get_ranges) + + +def test_load_partial_shard_forwards_global_config_to_get_ranges() -> None: + """Global `array.sharding_coalesce_*` values flow into ArrayConfig at + array-creation time and are forwarded to `Store.get_ranges`.""" + with config.set( + { + "array.sharding_coalesce_max_gap_bytes": 4242, + "array.sharding_coalesce_max_bytes": 424242, + } + ): + get_ranges_mock = _trigger_partial_shard_read() + + assert get_ranges_mock.call_count >= 1 + for call in get_ranges_mock.call_args_list: + kwargs = call.kwargs + assert kwargs["max_gap_bytes"] == 4242 + assert kwargs["max_coalesced_bytes"] == 424242 + + +def test_load_partial_shard_per_array_config_overrides_global() -> None: + """Per-array `config={...}` passed to `create_array` takes precedence over + the global config and is forwarded to `Store.get_ranges`.""" + with config.set( + { + "array.sharding_coalesce_max_gap_bytes": 4242, + "array.sharding_coalesce_max_bytes": 424242, + } + ): + get_ranges_mock = _trigger_partial_shard_read( + array_config={ + "sharding_coalesce_max_gap_bytes": 99, + "sharding_coalesce_max_bytes": 9999, + }, + ) + + assert get_ranges_mock.call_count >= 1 + for call in get_ranges_mock.call_args_list: + kwargs = call.kwargs + assert kwargs["max_gap_bytes"] == 99 + assert kwargs["max_coalesced_bytes"] == 9999 + + +def test_load_partial_shard_uses_config_defaults() -> None: + """Without explicit config, defaults from `zarr.config` are forwarded.""" + get_ranges_mock = _trigger_partial_shard_read() + + assert get_ranges_mock.call_count >= 1 + for call in get_ranges_mock.call_args_list: + kwargs = call.kwargs + assert kwargs["max_gap_bytes"] == config.get("array.sharding_coalesce_max_gap_bytes") + assert kwargs["max_coalesced_bytes"] == config.get("array.sharding_coalesce_max_bytes") + + +async def test_load_partial_shard_explicit_kwargs_passthrough( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """`_load_partial_shard_maybe` forwards its explicit kwargs to `get_ranges`.""" + codec = ShardingCodec(chunk_shape=(2,)) + chunks_per_shard = (4,) + + index = _ShardIndex.create_empty(chunks_per_shard) + index.set_chunk_slice((0,), slice(0, 100)) + index.set_chunk_slice((2,), slice(200, 300)) + + store = MemoryStore() + await store.set("shard", Buffer.from_bytes(b"x" * 300)) + store_mock = AsyncMock(wraps=store, spec=store.__class__) + byte_getter = StorePath(store_mock, "shard") + + async def mock_load_index( + self: ShardingCodec, byte_getter: StorePath, cps: tuple[int, ...] + ) -> _ShardIndex: + return index + + monkeypatch.setattr(ShardingCodec, "_load_shard_index_maybe", mock_load_index) + + await codec._load_partial_shard_maybe( + byte_getter=byte_getter, + prototype=default_buffer_prototype(), + chunks_per_shard=chunks_per_shard, + all_chunk_coords={(0,), (2,)}, + max_gap_bytes=12345, + max_coalesced_bytes=67890, + ) + + store_mock.get_ranges.assert_called_once() + kwargs = store_mock.get_ranges.call_args.kwargs + assert kwargs["max_gap_bytes"] == 12345 + assert kwargs["max_coalesced_bytes"] == 67890 diff --git a/tests/test_common.py b/tests/test_common.py index 0dedde1d6b..2fe0743e14 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -9,6 +9,7 @@ from zarr.core.common import ( ANY_ACCESS_MODE, AccessModeLiteral, + parse_int, parse_name, parse_shapelike, product, @@ -72,6 +73,18 @@ def test_parse_indexing_order_invalid(data: Any) -> None: parse_indexing_order(data) +@pytest.mark.parametrize("data", ["1", 1.0, True, False, None, [1], (1,)]) +def test_parse_int_invalid(data: Any) -> None: + """Non-int values (including bools, which are int subclasses) are rejected.""" + with pytest.raises(ValueError, match="Expected int"): + parse_int(data) + + +@pytest.mark.parametrize("data", [0, 1, -1, 2**63]) +def test_parse_int_valid(data: int) -> None: + assert parse_int(data) == data + + @pytest.mark.parametrize("data", ["C", "F"]) def parse_indexing_order_valid(data: Literal["C", "F"]) -> None: assert parse_indexing_order(data) == data diff --git a/tests/test_config.py b/tests/test_config.py index 4e293e968f..a758378dc7 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,3 +1,4 @@ +import inspect import os from collections.abc import Iterable from typing import Any @@ -17,7 +18,7 @@ Crc32cCodec, ShardingCodec, ) -from zarr.core.array_spec import ArraySpec +from zarr.core.array_spec import ArrayConfig, ArraySpec from zarr.core.buffer import NDBuffer from zarr.core.buffer.core import Buffer from zarr.core.codec_pipeline import BatchedCodecPipeline @@ -56,6 +57,8 @@ def test_config_defaults_set() -> None: "read_missing_chunks": True, "target_shard_size_bytes": None, "rectilinear_chunks": False, + "sharding_coalesce_max_gap_bytes": 1 << 20, + "sharding_coalesce_max_bytes": 16 << 20, }, "async": {"concurrency": 10, "timeout": None}, "threading": {"max_workers": None}, @@ -109,6 +112,25 @@ def test_config_defaults_set() -> None: assert config.get("json_indent") == 2 +def test_array_config_init_defaults_match_global_config() -> None: + """Each `ArrayConfig.__init__` parameter that has a default must match the + value of `array.` in the global config. Catches drift between + the two sources of truth.""" + params = inspect.signature(ArrayConfig.__init__).parameters + has_defaults = { + name: p.default + for name, p in params.items() + if name != "self" and p.default is not inspect.Parameter.empty + } + assert has_defaults, "expected at least one default to check" + for name, default in has_defaults.items(): + assert default == config.get(f"array.{name}"), ( + f"ArrayConfig.__init__ default for {name!r} ({default!r}) does not " + f"match global config value for 'array.{name}' " + f"({config.get(f'array.{name}')!r})" + ) + + @pytest.mark.parametrize( ("key", "old_val", "new_val"), [("array.order", "C", "F"), ("async.concurrency", 10, 128), ("json_indent", 2, 0)],