Skip to content
Open
1 change: 1 addition & 0 deletions changes/3987.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Two new fields on `ArrayConfig` control how the sharding codec coalesces partial-shard reads: `sharding_coalesce_max_gap_bytes` (default 1 MiB) and `sharding_coalesce_max_bytes` (default 16 MiB). When reading multiple chunks from the same shard, nearby byte ranges are merged into a single request to the store if separated by no more than `sharding_coalesce_max_gap_bytes` and the merged read stays within `sharding_coalesce_max_bytes`. Defaults are seeded from the matching `array.sharding_coalesce_max_gap_bytes` / `array.sharding_coalesce_max_bytes` keys in [`zarr.config`][] at array-creation time, and can be overridden per array by passing `config={...}` to [`zarr.create_array`][].
1 change: 1 addition & 0 deletions docs/user-guide/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Configuration options include the following:
- Async and threading options, e.g. `async.concurrency` and `threading.max_workers`
- Selections of implementations of codecs, codec pipelines and buffers
- Enabling GPU support with `zarr.config.enable_gpu()`. See GPU support for more.
- Control request merging when reading multiple chunks from the same shard with `array.sharding_coalesce_max_gap_bytes` (default 1 MiB) and `array.sharding_coalesce_max_bytes` (default 16 MiB). Reads of nearby chunks are coalesced into a single request to the store when they are separated by at most `sharding_coalesce_max_gap_bytes` and the resulting merged read is no larger than `sharding_coalesce_max_bytes`.

For selecting custom implementations of codecs, pipelines, buffers and ndbuffers,
first register the implementations in the registry and then select them in the config.
Expand Down
14 changes: 13 additions & 1 deletion src/zarr/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,8 @@ async def _decode_partial_single(
chunk_spec.prototype,
chunks_per_shard,
all_chunk_coords,
max_gap_bytes=shard_spec.config.sharding_coalesce_max_gap_bytes,
max_coalesced_bytes=shard_spec.config.sharding_coalesce_max_bytes,
)

if shard_dict_maybe is None:
Expand Down Expand Up @@ -814,10 +816,16 @@ async def _load_partial_shard_maybe(
prototype: BufferPrototype,
chunks_per_shard: tuple[int, ...],
all_chunk_coords: set[tuple[int, ...]],
max_gap_bytes: int,
max_coalesced_bytes: int,
) -> ShardMapping | None:
"""
Read chunks from `byte_getter` for the case where the read is less than a full shard.
Returns a mapping of chunk coordinates to bytes or None.

`max_gap_bytes` and `max_coalesced_bytes` are forwarded to
`Store.get_ranges` to control byte-range coalescing across the requested
chunks.
"""
shard_index = await self._load_shard_index_maybe(byte_getter, chunks_per_shard)
if shard_index is None:
Expand All @@ -841,7 +849,11 @@ async def _load_partial_shard_maybe(
byte_ranges = [byte_range for _, byte_range in chunk_coord_byte_ranges]
try:
async for group in byte_getter.store.get_ranges(
byte_getter.path, byte_ranges, prototype=prototype
byte_getter.path,
byte_ranges,
prototype=prototype,
max_gap_bytes=max_gap_bytes,
max_coalesced_bytes=max_coalesced_bytes,
):
for idx, buf in group:
if buf is not None:
Expand Down
30 changes: 28 additions & 2 deletions src/zarr/core/array_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
MemoryOrder,
parse_bool,
parse_fill_value,
parse_int,
parse_order,
parse_shapelike,
)
Expand All @@ -29,6 +30,8 @@ class ArrayConfigParams(TypedDict):
order: NotRequired[MemoryOrder]
write_empty_chunks: NotRequired[bool]
read_missing_chunks: NotRequired[bool]
sharding_coalesce_max_gap_bytes: NotRequired[int]
sharding_coalesce_max_bytes: NotRequired[int]


@dataclass(frozen=True)
Expand All @@ -45,22 +48,42 @@ class ArrayConfig:
read_missing_chunks : bool
If True, missing chunks will be filled with the array's fill value on read.
If False, reading missing chunks will raise a ``ChunkNotFoundError``.
sharding_coalesce_max_gap_bytes : int
When reading multiple chunks from the same shard, nearby byte ranges
separated by no more than this many bytes are coalesced into a single
request to the store.
sharding_coalesce_max_bytes : int
Requests will not be coalesced if doing so would exceed this byte size.
"""

order: MemoryOrder
write_empty_chunks: bool
read_missing_chunks: bool
sharding_coalesce_max_gap_bytes: int
sharding_coalesce_max_bytes: int

def __init__(
self, order: MemoryOrder, write_empty_chunks: bool, *, read_missing_chunks: bool = True
self,
order: MemoryOrder,
write_empty_chunks: bool,
*,
read_missing_chunks: bool = True,
sharding_coalesce_max_gap_bytes: int = 1 << 20, # 1 MiB
sharding_coalesce_max_bytes: int = 16 << 20, # 16 MiB
) -> None:
order_parsed = parse_order(order)
write_empty_chunks_parsed = parse_bool(write_empty_chunks)
read_missing_chunks_parsed = parse_bool(read_missing_chunks)
sharding_coalesce_max_gap_bytes_parsed = parse_int(sharding_coalesce_max_gap_bytes)
sharding_coalesce_max_bytes_parsed = parse_int(sharding_coalesce_max_bytes)

object.__setattr__(self, "order", order_parsed)
object.__setattr__(self, "write_empty_chunks", write_empty_chunks_parsed)
object.__setattr__(self, "read_missing_chunks", read_missing_chunks_parsed)
object.__setattr__(
self, "sharding_coalesce_max_gap_bytes", sharding_coalesce_max_gap_bytes_parsed
)
object.__setattr__(self, "sharding_coalesce_max_bytes", sharding_coalesce_max_bytes_parsed)

@classmethod
def from_dict(cls, data: ArrayConfigParams) -> Self:
Expand All @@ -72,7 +95,8 @@ def from_dict(cls, data: ArrayConfigParams) -> Self:
kwargs_out: ArrayConfigParams = {}
for f in fields(ArrayConfig):
field_name = cast(
"Literal['order', 'write_empty_chunks', 'read_missing_chunks']", f.name
"Literal['order', 'write_empty_chunks', 'read_missing_chunks', 'sharding_coalesce_max_gap_bytes', 'sharding_coalesce_max_bytes']",
f.name,
)
if field_name not in data:
kwargs_out[field_name] = zarr_config.get(f"array.{field_name}")
Expand All @@ -88,6 +112,8 @@ def to_dict(self) -> ArrayConfigParams:
"order": self.order,
"write_empty_chunks": self.write_empty_chunks,
"read_missing_chunks": self.read_missing_chunks,
"sharding_coalesce_max_gap_bytes": self.sharding_coalesce_max_gap_bytes,
"sharding_coalesce_max_bytes": self.sharding_coalesce_max_bytes,
}


Expand Down
6 changes: 6 additions & 0 deletions src/zarr/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ def parse_bool(data: Any) -> bool:
raise ValueError(f"Expected bool, got {data} instead.")


def parse_int(data: Any) -> int:
    """
    Validate that ``data`` is a plain ``int`` and return it unchanged.

    Parameters
    ----------
    data : Any
        The value to validate.

    Returns
    -------
    int
        ``data`` itself, when it is an ``int`` that is not a ``bool``.

    Raises
    ------
    ValueError
        If ``data`` is a ``bool`` or is not an ``int`` instance.
    """
    # ``bool`` is a subclass of ``int``, so it has to be rejected explicitly;
    # otherwise ``True`` / ``False`` would silently pass as 1 / 0.
    if isinstance(data, bool) or not isinstance(data, int):
        raise ValueError(f"Expected int, got {data} instead.")
    return data


def _warn_write_empty_chunks_kwarg() -> None:
# TODO: link to docs page on array configuration in this message
msg = (
Expand Down
2 changes: 2 additions & 0 deletions src/zarr/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def enable_gpu(self) -> ConfigSet:
"read_missing_chunks": True,
"target_shard_size_bytes": None,
"rectilinear_chunks": False,
"sharding_coalesce_max_gap_bytes": 1 << 20, # 1 MiB
"sharding_coalesce_max_bytes": 16 << 20, # 16 MiB
},
"async": {"concurrency": 10, "timeout": None},
"threading": {"max_workers": None},
Expand Down
164 changes: 164 additions & 0 deletions tests/test_codecs/test_sharding_unit.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from __future__ import annotations

from typing import TYPE_CHECKING, cast
from unittest.mock import AsyncMock

import numpy as np
import pytest

Expand All @@ -10,9 +15,14 @@
)
from zarr.core.buffer import default_buffer_prototype
from zarr.core.buffer.cpu import Buffer
from zarr.core.config import config
from zarr.storage._common import StorePath
from zarr.storage._memory import MemoryStore

if TYPE_CHECKING:
from zarr.core.array import ShardsConfigParam
from zarr.core.array_spec import ArrayConfigParams

# ============================================================================
# _ShardIndex tests
# ============================================================================
Expand Down Expand Up @@ -155,6 +165,8 @@ async def test_load_partial_shard_maybe_index_load_fails() -> None:
prototype=default_buffer_prototype(),
chunks_per_shard=(2,),
all_chunk_coords={(0,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is None
Expand Down Expand Up @@ -187,6 +199,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,), (1,), (2,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is not None
Expand Down Expand Up @@ -220,6 +234,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,), (1,), (2,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result == {}
Expand Down Expand Up @@ -251,6 +267,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,), (1,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is not None
Expand Down Expand Up @@ -292,6 +310,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is None
Expand Down Expand Up @@ -336,6 +356,8 @@ async def boom(*args: object, **kwargs: object) -> Buffer | None:
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)


Expand Down Expand Up @@ -368,6 +390,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,), (1,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is not None
Expand Down Expand Up @@ -405,6 +429,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result == {}
Expand Down Expand Up @@ -486,3 +512,141 @@ def test_is_total_shard_1d() -> None:
# Partial
partial_coords: set[tuple[int, ...]] = {(0,), (2,)}
assert codec._is_total_shard(partial_coords, chunks_per_shard) is False


# ============================================================================
# Coalescing config option tests
#
# Assert that the `array.sharding_coalesce_max_gap_bytes` and
# `array.sharding_coalesce_max_bytes` global config keys flow through
# `ArrayConfig` to `Store.get_ranges` as `max_gap_bytes` /
# `max_coalesced_bytes` kwargs, and that per-array `config={...}` overrides
# the global default.
# ============================================================================


def _trigger_partial_shard_read(array_config: ArrayConfigParams | None = None) -> AsyncMock:
    """Create a sharded array backed by a mock-wrapped `MemoryStore`, perform a
    read that covers only part of the shard, and hand back the store's
    `get_ranges` mock so callers can inspect how it was invoked.

    `array_config` is passed straight through as the `config` argument of
    `zarr.create_array`.
    """
    import zarr
    from zarr.codecs.sharding import ShardingCodecIndexLocation

    values = np.arange(8, dtype="int32")

    backing_store = MemoryStore()
    # Wrap the real store so calls still do real work but are recorded.
    store_mock = AsyncMock(wraps=backing_store, spec=type(backing_store))

    shards: ShardsConfigParam = {
        "shape": (8,),
        "index_location": ShardingCodecIndexLocation.end,
    }
    arr = zarr.create_array(
        StorePath(store_mock),
        shape=(8,),
        chunks=(2,),
        shards=shards,
        dtype=values.dtype,
        fill_value=-1,
        config=array_config,
    )
    arr[:] = values

    # Discard the calls recorded during creation and the write above, so only
    # the calls made by the read below remain on the mock.
    store_mock.reset_mock()

    # Four of the eight elements: a strict subset of the shard's chunks, which
    # takes the partial-shard read path.
    _ = arr[0:4]

    return cast(AsyncMock, store_mock.get_ranges)


def test_load_partial_shard_forwards_global_config_to_get_ranges() -> None:
    """Values set under the global `array.sharding_coalesce_*` keys while the
    array is created and read are forwarded to `Store.get_ranges` as the
    `max_gap_bytes` / `max_coalesced_bytes` keyword arguments."""
    global_overrides = {
        "array.sharding_coalesce_max_gap_bytes": 4242,
        "array.sharding_coalesce_max_bytes": 424242,
    }
    with config.set(global_overrides):
        get_ranges_mock = _trigger_partial_shard_read()

    # The read must have reached `get_ranges` at least once, and every
    # recorded call must carry the globally configured values.
    assert get_ranges_mock.call_count >= 1
    for recorded in get_ranges_mock.call_args_list:
        passed = recorded.kwargs
        assert passed["max_gap_bytes"] == 4242
        assert passed["max_coalesced_bytes"] == 424242


def test_load_partial_shard_per_array_config_overrides_global() -> None:
    """When both the global config and a per-array `config={...}` set the
    coalescing options, the per-array values are the ones forwarded to
    `Store.get_ranges`."""
    global_overrides = {
        "array.sharding_coalesce_max_gap_bytes": 4242,
        "array.sharding_coalesce_max_bytes": 424242,
    }
    with config.set(global_overrides):
        get_ranges_mock = _trigger_partial_shard_read(
            array_config={
                "sharding_coalesce_max_gap_bytes": 99,
                "sharding_coalesce_max_bytes": 9999,
            },
        )

    # Every recorded call must carry the per-array values, not the global ones.
    assert get_ranges_mock.call_count >= 1
    for recorded in get_ranges_mock.call_args_list:
        passed = recorded.kwargs
        assert passed["max_gap_bytes"] == 99
        assert passed["max_coalesced_bytes"] == 9999


def test_load_partial_shard_uses_config_defaults() -> None:
    """With no overrides anywhere, the values currently held by `zarr.config`
    are the ones forwarded to `Store.get_ranges`."""
    get_ranges_mock = _trigger_partial_shard_read()

    assert get_ranges_mock.call_count >= 1
    for recorded in get_ranges_mock.call_args_list:
        passed = recorded.kwargs
        # Compare against the live config rather than hard-coded numbers.
        expected_gap = config.get("array.sharding_coalesce_max_gap_bytes")
        assert passed["max_gap_bytes"] == expected_gap
        expected_total = config.get("array.sharding_coalesce_max_bytes")
        assert passed["max_coalesced_bytes"] == expected_total


async def test_load_partial_shard_explicit_kwargs_passthrough(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The `max_gap_bytes` / `max_coalesced_bytes` arguments given to
    `_load_partial_shard_maybe` reach `get_ranges` unchanged."""
    chunks_per_shard = (4,)

    # Shard index with two populated chunks.
    shard_index = _ShardIndex.create_empty(chunks_per_shard)
    shard_index.set_chunk_slice((0,), slice(0, 100))
    shard_index.set_chunk_slice((2,), slice(200, 300))

    async def mock_load_index(
        self: ShardingCodec, byte_getter: StorePath, cps: tuple[int, ...]
    ) -> _ShardIndex:
        return shard_index

    # Bypass real index loading so the test controls the byte ranges.
    monkeypatch.setattr(ShardingCodec, "_load_shard_index_maybe", mock_load_index)

    # Real store holding 300 bytes under "shard", wrapped so calls are recorded.
    backing_store = MemoryStore()
    await backing_store.set("shard", Buffer.from_bytes(b"x" * 300))
    store_mock = AsyncMock(wraps=backing_store, spec=type(backing_store))

    sharding_codec = ShardingCodec(chunk_shape=(2,))
    await sharding_codec._load_partial_shard_maybe(
        byte_getter=StorePath(store_mock, "shard"),
        prototype=default_buffer_prototype(),
        chunks_per_shard=chunks_per_shard,
        all_chunk_coords={(0,), (2,)},
        max_gap_bytes=12345,
        max_coalesced_bytes=67890,
    )

    get_ranges_mock = store_mock.get_ranges
    get_ranges_mock.assert_called_once()
    passed = get_ranges_mock.call_args.kwargs
    assert passed["max_gap_bytes"] == 12345
    assert passed["max_coalesced_bytes"] == 67890
Loading
Loading