From f64d3da3583fe83cd382954c0053fafcc8e582bd Mon Sep 17 00:00:00 2001 From: shaharuk-yb Date: Mon, 14 Jul 2025 14:23:18 +0530 Subject: [PATCH 1/7] deep1b integration --- test_deep1b_integration.py | 118 ++++++++++++++++++++++++++ vectordb_bench/__init__.py | 4 + vectordb_bench/backend/cases.py | 16 ++++ vectordb_bench/backend/data_source.py | 53 ++++++++++++ vectordb_bench/backend/dataset.py | 44 +++++++--- vectordb_bench/backend/utils.py | 24 ++++++ 6 files changed, 249 insertions(+), 10 deletions(-) create mode 100644 test_deep1b_integration.py diff --git a/test_deep1b_integration.py b/test_deep1b_integration.py new file mode 100644 index 000000000..27ac5786d --- /dev/null +++ b/test_deep1b_integration.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +""" +Test script to verify Deep1B dataset integration +""" + +import sys +import pathlib + +# Add the project root to the path +sys.path.insert(0, str(pathlib.Path(__file__).parent)) + +from vectordb_bench.backend.dataset import Dataset +from vectordb_bench.backend.cases import CaseType, type2case + + +def test_deep1b_dataset(): + """Test that Deep1B dataset can be created and accessed""" + print("Testing Deep1B dataset integration...") + + # Test dataset creation + try: + deep1b_dataset = Dataset.DEEP1B.get(1_000_000_000) + print(f"✓ Deep1B dataset created successfully") + print(f" - Name: {deep1b_dataset.name}") + print(f" - Size: {deep1b_dataset.size}") + print(f" - Dimensions: {deep1b_dataset.dim}") + print(f" - Metric Type: {deep1b_dataset.metric_type}") + print(f" - Label: {deep1b_dataset.label}") + print(f" - Directory Name: {deep1b_dataset.dir_name}") + print(f" - File Count: {deep1b_dataset.file_count}") + except Exception as e: + print(f"✗ Failed to create Deep1B dataset: {e}") + return False + + # Test dataset manager + try: + deep1b_manager = Dataset.DEEP1B.manager(1_000_000_000) + print(f"✓ Deep1B dataset manager created successfully") + print(f" - Data directory: {deep1b_manager.data_dir}") + except Exception as e: + print(f"✗ Failed to create Deep1B dataset manager: {e}") + return False + + # Test case creation + try: + case = CaseType.Performance96D1B.case_cls() + print(f"✓ Deep1B case created successfully") + print(f" - Case ID: {case.case_id}") + print(f" - Name: {case.name}") + print(f" - Description: {case.description[:100]}...") + print(f" - Load Timeout: {case.load_timeout}") + print(f" - Optimize Timeout: {case.optimize_timeout}") + except Exception as e: + print(f"✗ Failed to create Deep1B case: {e}") + return False + + print("✓ All Deep1B integration tests passed!") + return True + + +def test_dataset_enum(): + """Test that Deep1B is properly added to the Dataset enum""" + print("\nTesting Dataset enum...") + + # Check if DEEP1B is in the enum + if hasattr(Dataset, 'DEEP1B'): + print(f"✓ DEEP1B found in Dataset enum") + else: + print(f"✗ DEEP1B not found in Dataset enum") + return False + + # Check if it's properly mapped + try: + deep1b_class = Dataset.DEEP1B.value + print(f"✓ DEEP1B class: {deep1b_class}") + except Exception as e: + print(f"✗ Failed to access DEEP1B class: {e}") + return False + + return True + + +def test_case_enum(): + """Test that Performance96D1B is properly added to the CaseType enum""" + print("\nTesting CaseType enum...") + + # Check if Performance96D1B is in the enum + if hasattr(CaseType, 'Performance96D1B'): + print(f"✓ Performance96D1B found in CaseType enum") + else: + print(f"✗ Performance96D1B not found in CaseType enum") + return False + + # Check if it's properly mapped in type2case + if CaseType.Performance96D1B in type2case: + print(f"✓ Performance96D1B found in type2case mapping") + else: + print(f"✗ Performance96D1B not found in type2case mapping") + return False + + return True + + +if __name__ == "__main__": + print("Deep1B Dataset Integration Test") + print("=" * 40) + + success = True + success &= test_deep1b_dataset() + success &= test_dataset_enum() + success &= test_case_enum() + + if success: + print("\n🎉 All tests passed! Deep1B integration is working correctly.") + sys.exit(0) + else: + print("\n❌ Some tests failed. Please check the integration.") + sys.exit(1) \ No newline at end of file diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py index c07fc855d..a40f1ef7c 100644 --- a/vectordb_bench/__init__.py +++ b/vectordb_bench/__init__.py @@ -73,6 +73,10 @@ class config: LOAD_TIMEOUT_1536D_500K = 24 * 3600 # 24h LOAD_TIMEOUT_1536D_5M = 240 * 3600 # 10d + # Deep1B dataset timeouts (96 dimensions, 1B vectors) + LOAD_TIMEOUT_96D_1B = 7200 * 3600 # 300d - very large dataset + OPTIMIZE_TIMEOUT_96D_1B = 7200 * 3600 # 300d - very large dataset + OPTIMIZE_TIMEOUT_DEFAULT = 24 * 3600 # 24h OPTIMIZE_TIMEOUT_768D_1M = 24 * 3600 # 24h OPTIMIZE_TIMEOUT_768D_10M = 240 * 3600 # 10d diff --git a/vectordb_bench/backend/cases.py b/vectordb_bench/backend/cases.py index 15fc069cc..c3375b26c 100644 --- a/vectordb_bench/backend/cases.py +++ b/vectordb_bench/backend/cases.py @@ -44,6 +44,9 @@ class CaseType(Enum): Performance1536D50K = 50 + # Deep1B dataset cases + Performance96D1B = 60 + Custom = 100 PerformanceCustomDataset = 101 @@ -302,6 +305,18 @@ class Performance1536D50K(PerformanceCase): optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_DEFAULT +class Performance96D1B(PerformanceCase): + case_id: CaseType = CaseType.Performance96D1B + filter_rate: float | int | None = None + dataset: DatasetManager = Dataset.DEEP1B.manager(1_000_000_000) + name: str = "Search Performance Test (1B Dataset, 96 Dim)" + description: str = """This case tests the search performance of a vector database with a very large 1B dataset + (Deep1B 1B vectors, 96 dimensions), at varying parallel levels. Results will show index building time, + recall, and maximum QPS.""" + load_timeout: float | int = config.LOAD_TIMEOUT_96D_1B + optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_96D_1B + + def metric_type_map(s: str) -> MetricType: if s.lower() == "cosine": return MetricType.COSINE @@ -366,5 +381,6 @@ def __init__( CaseType.Performance1536D500K99P: Performance1536D500K99P, CaseType.Performance1536D5M99P: Performance1536D5M99P, CaseType.Performance1536D50K: Performance1536D50K, + CaseType.Performance96D1B: Performance96D1B, CaseType.PerformanceCustomDataset: PerformanceCustomDataset, } diff --git a/vectordb_bench/backend/data_source.py b/vectordb_bench/backend/data_source.py index 139d2e308..3251ba51f 100644 --- a/vectordb_bench/backend/data_source.py +++ b/vectordb_bench/backend/data_source.py @@ -7,6 +7,9 @@ from tqdm import tqdm from vectordb_bench import config +import h5py +import polars as pl +from vectordb_bench.backend.utils import download_file logging.getLogger("s3fs").setLevel(logging.CRITICAL) @@ -18,6 +21,7 @@ class DatasetSource(Enum): S3 = "S3" AliyunOSS = "AliyunOSS" + Deep1BLocal = "Deep1BLocal" def reader(self) -> DatasetReader: if self == DatasetSource.S3: @@ -26,6 +30,9 @@ def reader(self) -> DatasetReader: if self == DatasetSource.AliyunOSS: return AliyunOSSReader() + if self == DatasetSource.Deep1BLocal: + return Deep1BReader() + return None @@ -155,3 +162,49 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool: return False return True + + +DEEP1B_URL = "http://ann-benchmarks.com/deep-image-96-angular.hdf5" +DEEP1B_HDF5_FILENAME = "deep-image-96-angular.hdf5" + +class Deep1BReader(DatasetReader): + source: DatasetSource = None # Not a remote source + remote_root: str = DEEP1B_URL + + def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool: + return local.exists() and local.stat().st_size > 0 + + def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): + # Download the HDF5 file if not present + hdf5_path = local_ds_root.parent.joinpath(DEEP1B_HDF5_FILENAME) + if not hdf5_path.exists(): + local_ds_root.parent.mkdir(parents=True, exist_ok=True) + download_file(DEEP1B_URL, str(hdf5_path)) + # Extract and convert to Parquet if not already done + if not local_ds_root.exists(): + local_ds_root.mkdir(parents=True) + # Only create train.parquet and test.parquet if not present + train_parquet = local_ds_root.joinpath("train.parquet") + test_parquet = local_ds_root.joinpath("test.parquet") + if not train_parquet.exists() or not test_parquet.exists(): + with h5py.File(hdf5_path, "r") as f: + # Extract train vectors + train_vectors = f["train"][:] + train_ids = list(range(train_vectors.shape[0])) + train_df = pl.DataFrame({ + "id": train_ids, + "emb": [v.astype("float32") for v in train_vectors], + }) + train_df.write_parquet(str(train_parquet)) + # Extract test vectors + test_vectors = f["test"][:] + test_ids = list(range(test_vectors.shape[0])) + test_df = pl.DataFrame({ + "id": test_ids, + "emb": [v.astype("float32") for v in test_vectors], + }) + test_df.write_parquet(str(test_parquet)) + # No ground truth for now (could be added if needed) + +def deep1b_reader(): + return Deep1BReader() diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py index 62700b0fa..4cdfd9b5d 100644 --- a/vectordb_bench/backend/dataset.py +++ b/vectordb_bench/backend/dataset.py @@ -37,19 +37,21 @@ class BaseDataset(BaseModel): metric_type: MetricType use_shuffled: bool with_gt: bool = False - _size_label: dict[int, SizeLabel] = PrivateAttr() is_custom: bool = False @validator("size") def verify_size(cls, v: int): - if v not in cls._size_label: - msg = f"Size {v} not supported for the dataset, expected: {cls._size_label.keys()}" + if not hasattr(cls, '_size_label') or v not in cls._size_label: + msg = f"Size {v} not supported for the dataset, expected: {getattr(cls, '_size_label', {}).keys()}" raise ValueError(msg) return v @property def label(self) -> str: - return self._size_label.get(self.size).label + if not hasattr(type(self), '_size_label'): + return "" + size_label = type(self)._size_label.get(self.size) + return size_label.label if size_label else "" @property def dir_name(self) -> str: @@ -57,7 +59,10 @@ def dir_name(self) -> str: @property def file_count(self) -> int: - return self._size_label.get(self.size).file_count + if not hasattr(type(self), '_size_label'): + return 0 + size_label = type(self)._size_label.get(self.size) + return size_label.file_count if size_label else 0 class CustomDataset(BaseDataset): @@ -109,7 +114,7 @@ class Cohere(BaseDataset): dim: int = 768 metric_type: MetricType = MetricType.COSINE use_shuffled: bool = config.USE_SHUFFLED_DATA - with_gt: bool = (True,) + with_gt: bool = True _size_label: dict = { 100_000: SizeLabel(100_000, "SMALL", 1), 1_000_000: SizeLabel(1_000_000, "MEDIUM", 1), @@ -146,7 +151,7 @@ class OpenAI(BaseDataset): dim: int = 1536 metric_type: MetricType = MetricType.COSINE use_shuffled: bool = config.USE_SHUFFLED_DATA - with_gt: bool = (True,) + with_gt: bool = True _size_label: dict = { 50_000: SizeLabel(50_000, "SMALL", 1), 500_000: SizeLabel(500_000, "MEDIUM", 1), @@ -154,6 +159,17 @@ class OpenAI(BaseDataset): } +class Deep1B(BaseDataset): + name: str = "Deep1B" + dim: int = 96 + metric_type: MetricType = MetricType.L2 + use_shuffled: bool = False + with_gt: bool = True + _size_label: dict = { + 1_000_000_000: SizeLabel(1_000_000_000, "LARGE", 100), + } + + class DatasetManager(BaseModel): """Download dataset if not in the local directory. Provide data for cases. @@ -171,7 +187,7 @@ class DatasetManager(BaseModel): train_files: list[str] = [] reader: DatasetReader | None = None - def __eq__(self, obj: any): + def __eq__(self, obj: object) -> bool: if isinstance(obj, DatasetManager): return self.data.name == obj.data.name and self.data.label == obj.data.label return False @@ -225,7 +241,14 @@ def prepare( gt_file, test_file = utils.compose_gt_file(filters), "test.parquet" all_files.extend([gt_file, test_file]) - if not self.data.is_custom: + # Use Deep1BReader for Deep1B dataset + if self.data.name == "Deep1B": + DatasetSource.Deep1BLocal.reader().read( + dataset=self.data.dir_name.lower(), + files=all_files, + local_ds_root=self.data_dir, + ) + elif not self.data.is_custom: source.reader().read( dataset=self.data.dir_name.lower(), files=all_files, @@ -250,7 +273,7 @@ def _read_file(self, file_name: str) -> pd.DataFrame: log.warning(f"No such file: {p}") return pd.DataFrame() - return pl.read_parquet(p) + return pl.read_parquet(p).to_pandas() class DataSetIterator: @@ -307,6 +330,7 @@ class Dataset(Enum): GLOVE = Glove SIFT = SIFT OPENAI = OpenAI + DEEP1B = Deep1B def get(self, size: int) -> BaseDataset: return self.value(size=size) diff --git a/vectordb_bench/backend/utils.py b/vectordb_bench/backend/utils.py index 86c4faf5e..9ebc50798 100644 --- a/vectordb_bench/backend/utils.py +++ b/vectordb_bench/backend/utils.py @@ -1,5 +1,8 @@ import time from functools import wraps +import requests +from tqdm import tqdm +import os def numerize(n: int) -> str: @@ -80,3 +83,24 @@ def compose_gt_file(filters: float | str | None = None) -> str: msg = f"Filters not supported: {filters}" raise ValueError(msg) + + +def download_file(url: str, dest_path: str, chunk_size: int = 8192, show_progress: bool = True) -> None: + """Download a file from a URL to a local path, with progress bar. Skips if file exists.""" + if os.path.exists(dest_path): + return + response = requests.get(url, stream=True) + response.raise_for_status() + total = int(response.headers.get('content-length', 0)) + with open(dest_path, 'wb') as file, tqdm( + desc=f"Downloading {os.path.basename(dest_path)}", + total=total, + unit='B', + unit_scale=True, + unit_divisor=1024, + disable=not show_progress + ) as bar: + for chunk in response.iter_content(chunk_size=chunk_size): + if chunk: + file.write(chunk) + bar.update(len(chunk)) From 13cfaf70e3a2b4e1640275c738612b2ae324f199 Mon Sep 17 00:00:00 2001 From: shaharuk-yb Date: Mon, 14 Jul 2025 14:49:35 +0530 Subject: [PATCH 2/7] dependencies added --- install/requirements_py3.11.txt | 2 ++ pyproject.toml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/install/requirements_py3.11.txt b/install/requirements_py3.11.txt index 18138382f..2c5108133 100644 --- a/install/requirements_py3.11.txt +++ b/install/requirements_py3.11.txt @@ -22,3 +22,5 @@ environs pydantic Date: Mon, 14 Jul 2025 15:08:33 +0530 Subject: [PATCH 3/7] added log statements --- vectordb_bench/backend/data_source.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vectordb_bench/backend/data_source.py b/vectordb_bench/backend/data_source.py index 3251ba51f..19539a3ec 100644 --- a/vectordb_bench/backend/data_source.py +++ b/vectordb_bench/backend/data_source.py @@ -187,6 +187,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): train_parquet = local_ds_root.joinpath("train.parquet") test_parquet = local_ds_root.joinpath("test.parquet") if not train_parquet.exists() or not test_parquet.exists(): + log.info(f"Extracting and converting Deep1B HDF5 file to Parquet format") with h5py.File(hdf5_path, "r") as f: # Extract train vectors train_vectors = f["train"][:] @@ -195,6 +196,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): "id": train_ids, "emb": [v.astype("float32") for v in train_vectors], }) + log.info(f"Writing train.parquet") train_df.write_parquet(str(train_parquet)) # Extract test vectors test_vectors = f["test"][:] @@ -203,6 +205,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): "id": test_ids, "emb": [v.astype("float32") for v in test_vectors], }) + log.info(f"Writing test.parquet") test_df.write_parquet(str(test_parquet)) # No ground truth for now (could be added if needed) From 780db01dd3ea6534eefaee5f80f412b1d362be24 Mon Sep 17 00:00:00 2001 From: shaharuk-yb Date: Mon, 14 Jul 2025 15:24:29 +0530 Subject: [PATCH 4/7] added log statements --- vectordb_bench/backend/data_source.py | 5 +++-- vectordb_bench/backend/dataset.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/vectordb_bench/backend/data_source.py b/vectordb_bench/backend/data_source.py index 19539a3ec..159f0c1c0 100644 --- a/vectordb_bench/backend/data_source.py +++ b/vectordb_bench/backend/data_source.py @@ -187,7 +187,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): train_parquet = local_ds_root.joinpath("train.parquet") test_parquet = local_ds_root.joinpath("test.parquet") if not train_parquet.exists() or not test_parquet.exists(): - log.info(f"Extracting and converting Deep1B HDF5 file to Parquet format") + log.info(f"Extracting and converting Deep1B HDF5 file to Parquet format to base path: {local_ds_root}") with h5py.File(hdf5_path, "r") as f: # Extract train vectors train_vectors = f["train"][:] @@ -207,7 +207,8 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): }) log.info(f"Writing test.parquet") test_df.write_parquet(str(test_parquet)) - # No ground truth for now (could be added if needed) + + log.info("Deep1B dataset preparation completed. Note: No ground truth file is provided.") def deep1b_reader(): return Deep1BReader() diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py index 4cdfd9b5d..54585e669 100644 --- a/vectordb_bench/backend/dataset.py +++ b/vectordb_bench/backend/dataset.py @@ -164,7 +164,7 @@ class Deep1B(BaseDataset): dim: int = 96 metric_type: MetricType = MetricType.L2 use_shuffled: bool = False - with_gt: bool = True + with_gt: bool = False # Deep1B doesn't have ground truth by default _size_label: dict = { 1_000_000_000: SizeLabel(1_000_000_000, "LARGE", 100), } From 16ddb7b79c8d40bc829afc8b7fde54ad81bff61e Mon Sep 17 00:00:00 2001 From: shaharuk-yb Date: Mon, 14 Jul 2025 15:35:10 +0530 Subject: [PATCH 5/7] increased batch size for DEEP1B dataset --- vectordb_bench/backend/dataset.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py index 54585e669..bf31bad04 100644 --- a/vectordb_bench/backend/dataset.py +++ b/vectordb_bench/backend/dataset.py @@ -286,6 +286,13 @@ def __init__(self, dataset: DatasetManager): def __iter__(self): return self + def _get_batch_size(self) -> int: + """Get batch size for the current dataset""" + # Deep1B dataset uses larger batch size for better performance + if self._ds.data.name == "Deep1B": + return 1_000_000 + return config.NUM_PER_BATCH + def _get_iter(self, file_name: str): p = pathlib.Path(self._ds.data_dir, file_name) log.info(f"Get iterator for {p.name}") @@ -293,7 +300,9 @@ def _get_iter(self, file_name: str): msg = f"No such file: {p}" log.warning(msg) raise IndexError(msg) - return ParquetFile(p, memory_map=True, pre_buffer=True).iter_batches(config.NUM_PER_BATCH) + batch_size = self._get_batch_size() + log.info(f"Using batch size {batch_size} for dataset {self._ds.data.name}") + return ParquetFile(p, memory_map=True, pre_buffer=True).iter_batches(batch_size) def __next__(self) -> pd.DataFrame: """return the data in the next file of the training list""" From 8e25def72c1f506190feab2c48d82b3d8005ccc5 Mon Sep 17 00:00:00 2001 From: shaharuk-yb Date: Mon, 14 Jul 2025 15:35:10 +0530 Subject: [PATCH 6/7] increased batch size for DEEP1B dataset --- vectordb_bench/__init__.py | 4 +- .../backend/clients/pgvector/cli.py | 14 +++++++ .../backend/clients/pgvector/config.py | 3 ++ vectordb_bench/backend/data_source.py | 42 ++++++++++++++----- vectordb_bench/backend/dataset.py | 17 +++++++- 5 files changed, 66 insertions(+), 14 deletions(-) diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py index a40f1ef7c..9b774c581 100644 --- a/vectordb_bench/__init__.py +++ b/vectordb_bench/__init__.py @@ -76,7 +76,9 @@ class config: # Deep1B dataset timeouts (96 dimensions, 1B vectors) LOAD_TIMEOUT_96D_1B = 7200 * 3600 # 300d - very large dataset OPTIMIZE_TIMEOUT_96D_1B = 7200 * 3600 # 300d - very large dataset - + # Deep1B dataset percentage for testing (0.0 to 1.0, default 1.0 = 100%) + DEEP1B_DATASET_PERCENTAGE = env.float("DEEP1B_DATASET_PERCENTAGE", 1.0) + OPTIMIZE_TIMEOUT_DEFAULT = 24 * 3600 # 24h OPTIMIZE_TIMEOUT_768D_1M = 24 * 3600 # 24h OPTIMIZE_TIMEOUT_768D_10M = 240 * 3600 # 10d diff --git a/vectordb_bench/backend/clients/pgvector/cli.py b/vectordb_bench/backend/clients/pgvector/cli.py index fba329d2d..aecb4dfb2 100644 --- a/vectordb_bench/backend/clients/pgvector/cli.py +++ b/vectordb_bench/backend/clients/pgvector/cli.py @@ -6,6 +6,7 @@ from vectordb_bench.backend.clients import DB from vectordb_bench.backend.clients.api import MetricType +from vectordb_bench import config from ....cli.cli import ( CommonTypedDict, @@ -140,6 +141,17 @@ class PgVectorTypedDict(CommonTypedDict): show_default=True, ), ] + deep1b_dataset_percentage: Annotated[ + float | None, + click.option( + "--deep1b_dataset_percentage", + type=float, + help="Percentage of Deep1B dataset to use (0.0 to 1.0, default: 1.0 = 100%)", + default=config.DEEP1B_DATASET_PERCENTAGE, + required=False, + show_default=False, + ), + ] class PgVectorIVFFlatTypedDict(PgVectorTypedDict, IVFFlatTypedDict): ... @@ -173,6 +185,7 @@ def PgVectorIVFFlat( quantized_fetch_limit=parameters["quantized_fetch_limit"], create_index_before_load=parameters["create_index_before_load"], create_index_after_load=parameters["create_index_after_load"], + deep1b_dataset_percentage=parameters["deep1b_dataset_percentage"], ), **parameters, ) @@ -211,6 +224,7 @@ def PgVectorHNSW( quantized_fetch_limit=parameters["quantized_fetch_limit"], create_index_before_load=parameters["create_index_before_load"], create_index_after_load=parameters["create_index_after_load"], + deep1b_dataset_percentage=parameters["deep1b_dataset_percentage"], ), **parameters, ) diff --git a/vectordb_bench/backend/clients/pgvector/config.py b/vectordb_bench/backend/clients/pgvector/config.py index ea7263e8d..616f7169f 100644 --- a/vectordb_bench/backend/clients/pgvector/config.py +++ b/vectordb_bench/backend/clients/pgvector/config.py @@ -59,6 +59,7 @@ class PgVectorIndexConfig(BaseModel, DBCaseConfig): metric_type: MetricType | None = None create_index_before_load: bool = False create_index_after_load: bool = True + deep1b_dataset_percentage: float | None = None def parse_metric(self) -> str: d = { @@ -173,6 +174,7 @@ class PgVectorIVFFlatConfig(PgVectorIndexConfig): reranking_metric: str | None = None create_index_before_load: bool | None = True create_index_after_load: bool | None = False + deep1b_dataset_percentage: float | None = None def index_param(self) -> PgVectorIndexParam: index_parameters = {"lists": self.lists} @@ -219,6 +221,7 @@ class PgVectorHNSWConfig(PgVectorIndexConfig): reranking_metric: str | None = None create_index_before_load: bool | None = True create_index_after_load: bool | None = False + deep1b_dataset_percentage: float | None = None def index_param(self) -> PgVectorIndexParam: index_parameters = {"m": self.m, "ef_construction": self.ef_construction} diff --git a/vectordb_bench/backend/data_source.py b/vectordb_bench/backend/data_source.py index 159f0c1c0..1ce9a96aa 100644 --- a/vectordb_bench/backend/data_source.py +++ b/vectordb_bench/backend/data_source.py @@ -183,32 +183,52 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): # Extract and convert to Parquet if not already done if not local_ds_root.exists(): local_ds_root.mkdir(parents=True) - # Only create train.parquet and test.parquet if not present - train_parquet = local_ds_root.joinpath("train.parquet") - test_parquet = local_ds_root.joinpath("test.parquet") + + # Get the percentage configuration + percentage = config.DEEP1B_DATASET_PERCENTAGE + if percentage <= 0.0 or percentage > 1.0: + raise ValueError(f"DEEP1B_DATASET_PERCENTAGE must be between 0.0 and 1.0, got {percentage}") + + # Create percentage-specific filenames + train_parquet = local_ds_root.joinpath(f"train_{int(percentage * 100)}p.parquet") + test_parquet = local_ds_root.joinpath(f"test_{int(percentage * 100)}p.parquet") + if not train_parquet.exists() or not test_parquet.exists(): - log.info(f"Extracting and converting Deep1B HDF5 file to Parquet format to base path: {local_ds_root}") + log.info(f"Extracting and converting Deep1B HDF5 file to Parquet format with {percentage*100}% of data to base path: {local_ds_root}") with h5py.File(hdf5_path, "r") as f: - # Extract train vectors + # Extract train vectors with percentage sampling train_vectors = f["train"][:] - train_ids = list(range(train_vectors.shape[0])) + total_train = train_vectors.shape[0] + sample_size = int(total_train * percentage) + + # Use first N% of the data for consistency + train_vectors = train_vectors[:sample_size] + train_ids = list(range(sample_size)) train_df = pl.DataFrame({ "id": train_ids, "emb": [v.astype("float32") for v in train_vectors], }) - log.info(f"Writing train.parquet") + log.info(f"Writing train_{int(percentage * 100)}p.parquet with {sample_size:,} vectors") train_df.write_parquet(str(train_parquet)) - # Extract test vectors + + # Extract test vectors with percentage sampling test_vectors = f["test"][:] - test_ids = list(range(test_vectors.shape[0])) + total_test = test_vectors.shape[0] + test_sample_size = int(total_test * percentage) + + # Use first N% of the test data for consistency + test_vectors = test_vectors[:test_sample_size] + test_ids = list(range(test_sample_size)) test_df = pl.DataFrame({ "id": test_ids, "emb": [v.astype("float32") for v in test_vectors], }) - log.info(f"Writing test.parquet") + log.info(f"Writing test_{int(percentage * 100)}p.parquet with {test_sample_size:,} vectors") test_df.write_parquet(str(test_parquet)) + else: + log.info(f"Using existing Deep1B dataset files with {percentage*100}% of data") - log.info("Deep1B dataset preparation completed. Note: No ground truth file is provided.") + log.info(f"Deep1B dataset preparation completed with {percentage*100}% of data. Note: No ground truth file is provided.") def deep1b_reader(): return Deep1BReader() diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py index bf31bad04..b8733c37b 100644 --- a/vectordb_bench/backend/dataset.py +++ b/vectordb_bench/backend/dataset.py @@ -259,8 +259,21 @@ def prepare( self.test_data = self._read_file(test_file) self.gt_data = self._read_file(gt_file) - prefix = "shuffle_train" if use_shuffled else "train" - self.train_files = sorted([f.name for f in self.data_dir.glob(f"{prefix}*.parquet")]) + # Handle Deep1B percentage-specific filenames + if self.data.name == "Deep1B": + percentage = config.DEEP1B_DATASET_PERCENTAGE + train_filename = f"train_{int(percentage * 100)}p.parquet" + test_filename = f"test_{int(percentage * 100)}p.parquet" + + # Update test_data for Deep1B with percentage-specific file + if self.data_dir.joinpath(test_filename).exists(): + self.test_data = self._read_file(test_filename) + + # Set train files for Deep1B + self.train_files = [train_filename] if self.data_dir.joinpath(train_filename).exists() else [] + else: + prefix = "shuffle_train" if use_shuffled else "train" + self.train_files = sorted([f.name for f in self.data_dir.glob(f"{prefix}*.parquet")]) log.debug(f"{self.data.name}: available train files {self.train_files}") return True From 95bbb2aca6b4d998dac5619a6ac0b67898319f55 Mon Sep 17 00:00:00 2001 From: shaharuk-yb Date: Mon, 14 Jul 2025 22:33:36 +0530 Subject: [PATCH 7/7] add support to load subset of the DEEP1B dataset --- .../backend/clients/pgvector/cli.py | 13 ------ vectordb_bench/backend/data_source.py | 14 +++--- vectordb_bench/backend/dataset.py | 11 +++-- vectordb_bench/backend/task_runner.py | 14 ++++-- vectordb_bench/cli/cli.py | 44 +++++++++++++++---- vectordb_bench/models.py | 1 + 6 files changed, 62 insertions(+), 35 deletions(-) diff --git a/vectordb_bench/backend/clients/pgvector/cli.py b/vectordb_bench/backend/clients/pgvector/cli.py index aecb4dfb2..400cceeba 100644 --- a/vectordb_bench/backend/clients/pgvector/cli.py +++ b/vectordb_bench/backend/clients/pgvector/cli.py @@ -141,17 +141,6 @@ class PgVectorTypedDict(CommonTypedDict): show_default=True, ), ] - deep1b_dataset_percentage: Annotated[ - float | None, - click.option( - "--deep1b_dataset_percentage", - type=float, - help="Percentage of Deep1B dataset to use (0.0 to 1.0, default: 1.0 = 100%)", - default=config.DEEP1B_DATASET_PERCENTAGE, - required=False, - show_default=False, - ), - ] class PgVectorIVFFlatTypedDict(PgVectorTypedDict, IVFFlatTypedDict): ... @@ -185,7 +174,6 @@ def PgVectorIVFFlat( quantized_fetch_limit=parameters["quantized_fetch_limit"], create_index_before_load=parameters["create_index_before_load"], create_index_after_load=parameters["create_index_after_load"], - deep1b_dataset_percentage=parameters["deep1b_dataset_percentage"], ), **parameters, ) @@ -224,7 +212,6 @@ def PgVectorHNSW( quantized_fetch_limit=parameters["quantized_fetch_limit"], create_index_before_load=parameters["create_index_before_load"], create_index_after_load=parameters["create_index_after_load"], - deep1b_dataset_percentage=parameters["deep1b_dataset_percentage"], ), **parameters, ) diff --git a/vectordb_bench/backend/data_source.py b/vectordb_bench/backend/data_source.py index 1ce9a96aa..21dc8ce80 100644 --- a/vectordb_bench/backend/data_source.py +++ b/vectordb_bench/backend/data_source.py @@ -41,13 +41,14 @@ class DatasetReader(ABC): remote_root: str @abstractmethod - def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): + def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None): """read dataset files from remote_root to local_ds_root, Args: dataset(str): for instance "sift_small_500k" files(list[str]): all filenames of the dataset local_ds_root(pathlib.Path): whether to write the remote data. + deep1b_dataset_percentage(float | None): percentage of Deep1B dataset to use (only for Deep1B) """ @abstractmethod @@ -75,7 +76,7 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool: return True - def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): + def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None): downloads = [] if not local_ds_root.exists(): log.info(f"local dataset root path not exist, creating it: {local_ds_root}") @@ -125,7 +126,7 @@ def ls_all(self, dataset: str): log.info(n) return names - def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): + def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None): downloads = [] if not local_ds_root.exists(): log.info(f"local dataset root path not exist, creating it: {local_ds_root}") @@ -174,7 +175,7 @@ class Deep1BReader(DatasetReader): def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool: return local.exists() and local.stat().st_size > 0 - def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): + def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None): # Download the HDF5 file if not present hdf5_path = local_ds_root.parent.joinpath(DEEP1B_HDF5_FILENAME) if not hdf5_path.exists(): @@ -184,8 +185,9 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): if not local_ds_root.exists(): local_ds_root.mkdir(parents=True) - # Get the percentage configuration - percentage = config.DEEP1B_DATASET_PERCENTAGE + # Get the percentage configuration - use task config if provided, otherwise use global config + percentage = deep1b_dataset_percentage if deep1b_dataset_percentage is not None else config.DEEP1B_DATASET_PERCENTAGE + log.info(f"DEBUG: Dataset preparation reading DEEP1B_DATASET_PERCENTAGE = {percentage} (task_config={deep1b_dataset_percentage}, global_config={config.DEEP1B_DATASET_PERCENTAGE})") if percentage <= 0.0 or percentage > 1.0: raise ValueError(f"DEEP1B_DATASET_PERCENTAGE must be between 0.0 and 1.0, got {percentage}") diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py index b8733c37b..78662e2d0 100644 --- a/vectordb_bench/backend/dataset.py +++ b/vectordb_bench/backend/dataset.py @@ -218,6 +218,7 @@ def prepare( self, source: DatasetSource = DatasetSource.S3, filters: float | str | None = None, + deep1b_dataset_percentage: float | None = None, ) -> bool: """Download the dataset from DatasetSource url = f"{source}/{self.data.dir_name}" @@ -247,6 +248,7 @@ def prepare( dataset=self.data.dir_name.lower(), files=all_files, local_ds_root=self.data_dir, + deep1b_dataset_percentage=deep1b_dataset_percentage, ) elif not self.data.is_custom: source.reader().read( @@ -261,14 +263,15 @@ def prepare( # Handle Deep1B percentage-specific filenames if self.data.name == "Deep1B": - percentage = config.DEEP1B_DATASET_PERCENTAGE + # Use the passed parameter if available, otherwise use config + percentage = deep1b_dataset_percentage if deep1b_dataset_percentage is not None else config.DEEP1B_DATASET_PERCENTAGE train_filename = f"train_{int(percentage * 100)}p.parquet" test_filename = f"test_{int(percentage * 100)}p.parquet" - - # Update test_data for Deep1B with percentage-specific file + + # Update test_data for Deep1B with percentage-specific file if self.data_dir.joinpath(test_filename).exists(): self.test_data = self._read_file(test_filename) - + # Set train files for Deep1B self.train_files = [train_filename] if self.data_dir.joinpath(train_filename).exists() else [] else: diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py index 2a583b4f5..bf7faf4c2 100644 --- a/vectordb_bench/backend/task_runner.py +++ b/vectordb_bench/backend/task_runner.py @@ -96,7 +96,11 @@ def init_db(self, drop_old: bool = True) -> None: def _pre_run(self, drop_old: bool = True): try: self.init_db(drop_old) - self.ca.dataset.prepare(self.dataset_source, filters=self.ca.filter_rate) + self.ca.dataset.prepare( + self.dataset_source, + filters=self.ca.filter_rate, + deep1b_dataset_percentage=self.config.deep1b_dataset_percentage, + ) except ModuleNotFoundError as e: log.warning(f"pre run case error: please install client for db: {self.config.db}, error={e}") raise e from None @@ -149,13 +153,14 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric: m = Metric() if drop_old: if TaskStage.LOAD in self.config.stages: - _, load_dur = self._load_train_data() + count, load_dur = self._load_train_data() build_dur = self._optimize() m.load_duration = round(load_dur + build_dur, 4) + m.max_load_count = count log.info( f"Finish loading the entire dataset into VectorDB," f" insert_duration={load_dur}, optimize_duration={build_dur}" - f" load_duration(insert + optimize) = {m.load_duration}" + f" load_duration(insert + optimize) = {m.load_duration}, max_load_count={count}" ) else: log.info("Data loading skipped") @@ -196,7 +201,8 @@ def _load_train_data(self): self.normalize, self.ca.load_timeout, ) - runner.run() + count = runner.run() + return count except Exception as e: raise e from None finally: diff --git a/vectordb_bench/cli/cli.py b/vectordb_bench/cli/cli.py index 3bb7763d8..1279340a3 100644 --- a/vectordb_bench/cli/cli.py +++ b/vectordb_bench/cli/cli.py @@ -401,6 +401,15 @@ class CommonTypedDict(TypedDict): show_default=True, ), ] + deep1b_dataset_percentage: Annotated[ + float, + click.option( + "--deep1b-dataset-percentage", + help="Percentage of Deep1B dataset to use (0.0 to 1.0, default: 1.0 = 100%)", + default=config.DEEP1B_DATASET_PERCENTAGE, + show_default=True, + ), + ] class HNSWBaseTypedDict(TypedDict): @@ -466,14 +475,32 @@ def run( db_case_config: DBCaseConfig, **parameters: Unpack[CommonTypedDict], ): - """Builds a single VectorDBBench Task and runs it, awaiting the task until finished. - - Args: - db (DB) - db_config (DBConfig) - db_case_config (DBCaseConfig) - **parameters: expects keys from CommonTypedDict - """ + # DEBUG: Print parameters to see what is being passed + log.info("=" * 80) + log.info(f"DEBUG: CLI parameters: {parameters}") + log.info(f"DEBUG: All parameter keys: {list(parameters.keys())}") + log.info("=" * 80) + + # Accept both underscore and dash versions + for key in ["deep1b_dataset_percentage", "deep1b-dataset-percentage"]: + if key in parameters and parameters[key] is not None: + try: + config.DEEP1B_DATASET_PERCENTAGE = float(parameters[key]) + log.info(f"DEBUG: Set config.DEEP1B_DATASET_PERCENTAGE = {config.DEEP1B_DATASET_PERCENTAGE}") + except Exception as e: + log.info(f"DEBUG: Could not set DEEP1B_DATASET_PERCENTAGE from {key}: {e}") + else: + log.info(f"DEBUG: Key '{key}' not found or is None in parameters") + + # Get deep1b_dataset_percentage from parameters + deep1b_dataset_percentage = None + for key in ["deep1b_dataset_percentage", "deep1b-dataset-percentage"]: + if key in parameters and parameters[key] is not None: + try: + deep1b_dataset_percentage = float(parameters[key]) + break + except Exception as e: + log.info(f"DEBUG: Could not convert DEEP1B_DATASET_PERCENTAGE from {key}: {e}") task = TaskConfig( db=db, @@ -494,6 +521,7 @@ def run( parameters["search_serial"], parameters["search_concurrent"], ), + deep1b_dataset_percentage=deep1b_dataset_percentage, ) log.info(f"Task:\n{pformat(task)}\n") diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py index bf71ebb89..31bb2aeb7 100644 --- a/vectordb_bench/models.py +++ b/vectordb_bench/models.py @@ -153,6 +153,7 @@ class TaskConfig(BaseModel): db_case_config: DBCaseConfig case_config: CaseConfig stages: list[TaskStage] = ALL_TASK_STAGES + deep1b_dataset_percentage: float | None = None @property def db_name(self):