diff --git a/install/requirements_py3.11.txt b/install/requirements_py3.11.txt index 18138382f..2c5108133 100644 --- a/install/requirements_py3.11.txt +++ b/install/requirements_py3.11.txt @@ -22,3 +22,5 @@ environs pydanticDeep1B 1B vectors, 96 dimensions), at varying parallel levels. Results will show index building time, + recall, and maximum QPS.""" + load_timeout: float | int = config.LOAD_TIMEOUT_96D_1B + optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_96D_1B + + def metric_type_map(s: str) -> MetricType: if s.lower() == "cosine": return MetricType.COSINE @@ -366,5 +381,6 @@ def __init__( CaseType.Performance1536D500K99P: Performance1536D500K99P, CaseType.Performance1536D5M99P: Performance1536D5M99P, CaseType.Performance1536D50K: Performance1536D50K, + CaseType.Performance96D1B: Performance96D1B, CaseType.PerformanceCustomDataset: PerformanceCustomDataset, } diff --git a/vectordb_bench/backend/clients/pgvector/cli.py b/vectordb_bench/backend/clients/pgvector/cli.py index fba329d2d..400cceeba 100644 --- a/vectordb_bench/backend/clients/pgvector/cli.py +++ b/vectordb_bench/backend/clients/pgvector/cli.py @@ -6,6 +6,7 @@ from vectordb_bench.backend.clients import DB from vectordb_bench.backend.clients.api import MetricType +from vectordb_bench import config from ....cli.cli import ( CommonTypedDict, diff --git a/vectordb_bench/backend/clients/pgvector/config.py b/vectordb_bench/backend/clients/pgvector/config.py index ea7263e8d..616f7169f 100644 --- a/vectordb_bench/backend/clients/pgvector/config.py +++ b/vectordb_bench/backend/clients/pgvector/config.py @@ -59,6 +59,7 @@ class PgVectorIndexConfig(BaseModel, DBCaseConfig): metric_type: MetricType | None = None create_index_before_load: bool = False create_index_after_load: bool = True + deep1b_dataset_percentage: float | None = None def parse_metric(self) -> str: d = { @@ -173,6 +174,7 @@ class PgVectorIVFFlatConfig(PgVectorIndexConfig): reranking_metric: str | None = None create_index_before_load: bool | None = True create_index_after_load: bool | None = False + deep1b_dataset_percentage: float | None = None def index_param(self) -> PgVectorIndexParam: index_parameters = {"lists": self.lists} @@ -219,6 +221,7 @@ class PgVectorHNSWConfig(PgVectorIndexConfig): reranking_metric: str | None = None create_index_before_load: bool | None = True create_index_after_load: bool | None = False + deep1b_dataset_percentage: float | None = None def index_param(self) -> PgVectorIndexParam: index_parameters = {"m": self.m, "ef_construction": self.ef_construction} diff --git a/vectordb_bench/backend/data_source.py b/vectordb_bench/backend/data_source.py index 139d2e308..21dc8ce80 100644 --- a/vectordb_bench/backend/data_source.py +++ b/vectordb_bench/backend/data_source.py @@ -7,6 +7,9 @@ from tqdm import tqdm from vectordb_bench import config +import h5py +import polars as pl +from vectordb_bench.backend.utils import download_file logging.getLogger("s3fs").setLevel(logging.CRITICAL) @@ -18,6 +21,7 @@ class DatasetSource(Enum): S3 = "S3" AliyunOSS = "AliyunOSS" + Deep1BLocal = "Deep1BLocal" def reader(self) -> DatasetReader: if self == DatasetSource.S3: @@ -26,6 +30,9 @@ def reader(self) -> DatasetReader: if self == DatasetSource.AliyunOSS: return AliyunOSSReader() + if self == DatasetSource.Deep1BLocal: + return Deep1BReader() + return None @@ -34,13 +41,14 @@ class DatasetReader(ABC): remote_root: str @abstractmethod - def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): + def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None): """read dataset files from remote_root to local_ds_root, Args: dataset(str): for instance "sift_small_500k" files(list[str]): all filenames of the dataset local_ds_root(pathlib.Path): whether to write the remote data. + deep1b_dataset_percentage(float | None): percentage of Deep1B dataset to use (only for Deep1B) """ @abstractmethod @@ -68,7 +76,7 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool: return True - def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): + def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None): downloads = [] if not local_ds_root.exists(): log.info(f"local dataset root path not exist, creating it: {local_ds_root}") @@ -118,7 +126,7 @@ def ls_all(self, dataset: str): log.info(n) return names - def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path): + def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None): downloads = [] if not local_ds_root.exists(): log.info(f"local dataset root path not exist, creating it: {local_ds_root}") @@ -155,3 +163,74 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool: return False return True + + +DEEP1B_URL = "http://ann-benchmarks.com/deep-image-96-angular.hdf5" +DEEP1B_HDF5_FILENAME = "deep-image-96-angular.hdf5" + +class Deep1BReader(DatasetReader): + source: DatasetSource = None # Not a remote source + remote_root: str = DEEP1B_URL + + def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool: + return local.exists() and local.stat().st_size > 0 + + def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None): + # Download the HDF5 file if not present + hdf5_path = local_ds_root.parent.joinpath(DEEP1B_HDF5_FILENAME) + if not hdf5_path.exists(): + local_ds_root.parent.mkdir(parents=True, exist_ok=True) + download_file(DEEP1B_URL, str(hdf5_path)) + # Extract and convert to Parquet if not already done + if not local_ds_root.exists(): + local_ds_root.mkdir(parents=True) + + # Get the percentage configuration - use task config if provided, otherwise use global config + percentage = deep1b_dataset_percentage if deep1b_dataset_percentage is not None else config.DEEP1B_DATASET_PERCENTAGE + log.info(f"DEBUG: Dataset preparation reading DEEP1B_DATASET_PERCENTAGE = {percentage} (task_config={deep1b_dataset_percentage}, global_config={config.DEEP1B_DATASET_PERCENTAGE})") + if percentage <= 0.0 or percentage > 1.0: + raise ValueError(f"DEEP1B_DATASET_PERCENTAGE must be between 0.0 and 1.0, got {percentage}") + + # Create percentage-specific filenames + train_parquet = local_ds_root.joinpath(f"train_{int(percentage * 100)}p.parquet") + test_parquet = local_ds_root.joinpath(f"test_{int(percentage * 100)}p.parquet") + + if not train_parquet.exists() or not test_parquet.exists(): + log.info(f"Extracting and converting Deep1B HDF5 file to Parquet format with {percentage*100}% of data to base path: {local_ds_root}") + with h5py.File(hdf5_path, "r") as f: + # Extract train vectors with percentage sampling + train_vectors = f["train"][:] + total_train = train_vectors.shape[0] + sample_size = int(total_train * percentage) + + # Use first N% of the data for consistency + train_vectors = train_vectors[:sample_size] + train_ids = list(range(sample_size)) + train_df = pl.DataFrame({ + "id": train_ids, + "emb": [v.astype("float32") for v in train_vectors], + }) + log.info(f"Writing train_{int(percentage * 100)}p.parquet with {sample_size:,} vectors") + train_df.write_parquet(str(train_parquet)) + + # Extract test vectors with percentage sampling + test_vectors = f["test"][:] + total_test = test_vectors.shape[0] + test_sample_size = int(total_test * percentage) + + # Use first N% of the test data for consistency + test_vectors = test_vectors[:test_sample_size] + test_ids = list(range(test_sample_size)) + test_df = pl.DataFrame({ + "id": test_ids, + "emb": [v.astype("float32") for v in test_vectors], + }) + log.info(f"Writing test_{int(percentage * 100)}p.parquet with {test_sample_size:,} vectors") + test_df.write_parquet(str(test_parquet)) + else: + log.info(f"Using existing Deep1B dataset files with {percentage*100}% of data") + + log.info(f"Deep1B dataset preparation completed with {percentage*100}% of data. Note: No ground truth file is provided.") + +def deep1b_reader(): + return Deep1BReader() diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py index 62700b0fa..78662e2d0 100644 --- a/vectordb_bench/backend/dataset.py +++ b/vectordb_bench/backend/dataset.py @@ -37,19 +37,21 @@ class BaseDataset(BaseModel): metric_type: MetricType use_shuffled: bool with_gt: bool = False - _size_label: dict[int, SizeLabel] = PrivateAttr() is_custom: bool = False @validator("size") def verify_size(cls, v: int): - if v not in cls._size_label: - msg = f"Size {v} not supported for the dataset, expected: {cls._size_label.keys()}" + if not hasattr(cls, '_size_label') or v not in cls._size_label: + msg = f"Size {v} not supported for the dataset, expected: {getattr(cls, '_size_label', {}).keys()}" raise ValueError(msg) return v @property def label(self) -> str: - return self._size_label.get(self.size).label + if not hasattr(type(self), '_size_label'): + return "" + size_label = type(self)._size_label.get(self.size) + return size_label.label if size_label else "" @property def dir_name(self) -> str: @@ -57,7 +59,10 @@ def dir_name(self) -> str: @property def file_count(self) -> int: - return self._size_label.get(self.size).file_count + if not hasattr(type(self), '_size_label'): + return 0 + size_label = type(self)._size_label.get(self.size) + return size_label.file_count if size_label else 0 class CustomDataset(BaseDataset): @@ -109,7 +114,7 @@ class Cohere(BaseDataset): dim: int = 768 metric_type: MetricType = MetricType.COSINE use_shuffled: bool = config.USE_SHUFFLED_DATA - with_gt: bool = (True,) + with_gt: bool = True _size_label: dict = { 100_000: SizeLabel(100_000, "SMALL", 1), 1_000_000: SizeLabel(1_000_000, "MEDIUM", 1), @@ -146,7 +151,7 @@ class OpenAI(BaseDataset): dim: int = 1536 metric_type: MetricType = MetricType.COSINE use_shuffled: bool = config.USE_SHUFFLED_DATA - with_gt: bool = (True,) + with_gt: bool = True _size_label: dict = { 50_000: SizeLabel(50_000, "SMALL", 1), 500_000: SizeLabel(500_000, "MEDIUM", 1), @@ -154,6 +159,17 @@ class OpenAI(BaseDataset): } +class Deep1B(BaseDataset): + name: str = "Deep1B" + dim: int = 96 + metric_type: MetricType = MetricType.L2 + use_shuffled: bool = False + with_gt: bool = False # Deep1B doesn't have ground truth by default + _size_label: dict = { + 1_000_000_000: SizeLabel(1_000_000_000, "LARGE", 100), + } + + class DatasetManager(BaseModel): """Download dataset if not in the local directory. Provide data for cases. @@ -171,7 +187,7 @@ class DatasetManager(BaseModel): train_files: list[str] = [] reader: DatasetReader | None = None - def __eq__(self, obj: any): + def __eq__(self, obj: object) -> bool: if isinstance(obj, DatasetManager): return self.data.name == obj.data.name and self.data.label == obj.data.label return False @@ -202,6 +218,7 @@ def prepare( self, source: DatasetSource = DatasetSource.S3, filters: float | str | None = None, + deep1b_dataset_percentage: float | None = None, ) -> bool: """Download the dataset from DatasetSource url = f"{source}/{self.data.dir_name}" @@ -225,7 +242,15 @@ def prepare( gt_file, test_file = utils.compose_gt_file(filters), "test.parquet" all_files.extend([gt_file, test_file]) - if not self.data.is_custom: + # Use Deep1BReader for Deep1B dataset + if self.data.name == "Deep1B": + DatasetSource.Deep1BLocal.reader().read( + dataset=self.data.dir_name.lower(), + files=all_files, + local_ds_root=self.data_dir, + deep1b_dataset_percentage=deep1b_dataset_percentage, + ) + elif not self.data.is_custom: source.reader().read( dataset=self.data.dir_name.lower(), files=all_files, @@ -236,8 +261,22 @@ def prepare( self.test_data = self._read_file(test_file) self.gt_data = self._read_file(gt_file) - prefix = "shuffle_train" if use_shuffled else "train" - self.train_files = sorted([f.name for f in self.data_dir.glob(f"{prefix}*.parquet")]) + # Handle Deep1B percentage-specific filenames + if self.data.name == "Deep1B": + # Use the passed parameter if available, otherwise use config + percentage = deep1b_dataset_percentage if deep1b_dataset_percentage is not None else config.DEEP1B_DATASET_PERCENTAGE + train_filename = f"train_{int(percentage * 100)}p.parquet" + test_filename = f"test_{int(percentage * 100)}p.parquet" + + # Update test_data for Deep1B with percentage-specific file + if self.data_dir.joinpath(test_filename).exists(): + self.test_data = self._read_file(test_filename) + + # Set train files for Deep1B + self.train_files = [train_filename] if self.data_dir.joinpath(train_filename).exists() else [] + else: + prefix = "shuffle_train" if use_shuffled else "train" + self.train_files = sorted([f.name for f in self.data_dir.glob(f"{prefix}*.parquet")]) log.debug(f"{self.data.name}: available train files {self.train_files}") return True @@ -250,7 +289,7 @@ def _read_file(self, file_name: str) -> pd.DataFrame: log.warning(f"No such file: {p}") return pd.DataFrame() - return pl.read_parquet(p) + return pl.read_parquet(p).to_pandas() class DataSetIterator: @@ -263,6 +302,13 @@ def __init__(self, dataset: DatasetManager): def __iter__(self): return self + def _get_batch_size(self) -> int: + """Get batch size for the current dataset""" + # Deep1B dataset uses larger batch size for better performance + if self._ds.data.name == "Deep1B": + return 1_000_000 + return config.NUM_PER_BATCH + def _get_iter(self, file_name: str): p = pathlib.Path(self._ds.data_dir, file_name) log.info(f"Get iterator for {p.name}") @@ -270,7 +316,9 @@ def _get_iter(self, file_name: str): msg = f"No such file: {p}" log.warning(msg) raise IndexError(msg) - return ParquetFile(p, memory_map=True, pre_buffer=True).iter_batches(config.NUM_PER_BATCH) + batch_size = self._get_batch_size() + log.info(f"Using batch size {batch_size} for dataset {self._ds.data.name}") + return ParquetFile(p, memory_map=True, pre_buffer=True).iter_batches(batch_size) def __next__(self) -> pd.DataFrame: """return the data in the next file of the training list""" @@ -307,6 +355,7 @@ class Dataset(Enum): GLOVE = Glove SIFT = SIFT OPENAI = OpenAI + DEEP1B = Deep1B def get(self, size: int) -> BaseDataset: return self.value(size=size) diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py index 2a583b4f5..bf7faf4c2 100644 --- a/vectordb_bench/backend/task_runner.py +++ b/vectordb_bench/backend/task_runner.py @@ -96,7 +96,11 @@ def init_db(self, drop_old: bool = True) -> None: def _pre_run(self, drop_old: bool = True): try: self.init_db(drop_old) - self.ca.dataset.prepare(self.dataset_source, filters=self.ca.filter_rate) + self.ca.dataset.prepare( + self.dataset_source, + filters=self.ca.filter_rate, + deep1b_dataset_percentage=self.config.deep1b_dataset_percentage, + ) except ModuleNotFoundError as e: log.warning(f"pre run case error: please install client for db: {self.config.db}, error={e}") raise e from None @@ -149,13 +153,14 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric: m = Metric() if drop_old: if TaskStage.LOAD in self.config.stages: - _, load_dur = self._load_train_data() + count, load_dur = self._load_train_data() build_dur = self._optimize() m.load_duration = round(load_dur + build_dur, 4) + m.max_load_count = count log.info( f"Finish loading the entire dataset into VectorDB," f" insert_duration={load_dur}, optimize_duration={build_dur}" - f" load_duration(insert + optimize) = {m.load_duration}" + f" load_duration(insert + optimize) = {m.load_duration}, max_load_count={count}" ) else: log.info("Data loading skipped") @@ -196,7 +201,8 @@ def _load_train_data(self): self.normalize, self.ca.load_timeout, ) - runner.run() + count = runner.run() + return count except Exception as e: raise e from None finally: diff --git a/vectordb_bench/backend/utils.py b/vectordb_bench/backend/utils.py index 86c4faf5e..9ebc50798 100644 --- a/vectordb_bench/backend/utils.py +++ b/vectordb_bench/backend/utils.py @@ -1,5 +1,8 @@ import time from functools import wraps +import requests +from tqdm import tqdm +import os def numerize(n: int) -> str: @@ -80,3 +83,24 @@ def compose_gt_file(filters: float | str | None = None) -> str: msg = f"Filters not supported: {filters}" raise ValueError(msg) + + +def download_file(url: str, dest_path: str, chunk_size: int = 8192, show_progress: bool = True) -> None: + """Download a file from a URL to a local path, with progress bar. Skips if file exists.""" + if os.path.exists(dest_path): + return + response = requests.get(url, stream=True) + response.raise_for_status() + total = int(response.headers.get('content-length', 0)) + with open(dest_path, 'wb') as file, tqdm( + desc=f"Downloading {os.path.basename(dest_path)}", + total=total, + unit='B', + unit_scale=True, + unit_divisor=1024, + disable=not show_progress + ) as bar: + for chunk in response.iter_content(chunk_size=chunk_size): + if chunk: + file.write(chunk) + bar.update(len(chunk)) diff --git a/vectordb_bench/cli/cli.py b/vectordb_bench/cli/cli.py index 3bb7763d8..1279340a3 100644 --- a/vectordb_bench/cli/cli.py +++ b/vectordb_bench/cli/cli.py @@ -401,6 +401,15 @@ class CommonTypedDict(TypedDict): show_default=True, ), ] + deep1b_dataset_percentage: Annotated[ + float, + click.option( + "--deep1b-dataset-percentage", + help="Percentage of Deep1B dataset to use (0.0 to 1.0, default: 1.0 = 100%)", + default=config.DEEP1B_DATASET_PERCENTAGE, + show_default=True, + ), + ] class HNSWBaseTypedDict(TypedDict): @@ -466,14 +475,32 @@ def run( db_case_config: DBCaseConfig, **parameters: Unpack[CommonTypedDict], ): - """Builds a single VectorDBBench Task and runs it, awaiting the task until finished. - - Args: - db (DB) - db_config (DBConfig) - db_case_config (DBCaseConfig) - **parameters: expects keys from CommonTypedDict - """ + # DEBUG: Print parameters to see what is being passed + log.info("=" * 80) + log.info(f"DEBUG: CLI parameters: {parameters}") + log.info(f"DEBUG: All parameter keys: {list(parameters.keys())}") + log.info("=" * 80) + + # Accept both underscore and dash versions + for key in ["deep1b_dataset_percentage", "deep1b-dataset-percentage"]: + if key in parameters and parameters[key] is not None: + try: + config.DEEP1B_DATASET_PERCENTAGE = float(parameters[key]) + log.info(f"DEBUG: Set config.DEEP1B_DATASET_PERCENTAGE = {config.DEEP1B_DATASET_PERCENTAGE}") + except Exception as e: + log.info(f"DEBUG: Could not set DEEP1B_DATASET_PERCENTAGE from {key}: {e}") + else: + log.info(f"DEBUG: Key '{key}' not found or is None in parameters") + + # Get deep1b_dataset_percentage from parameters + deep1b_dataset_percentage = None + for key in ["deep1b_dataset_percentage", "deep1b-dataset-percentage"]: + if key in parameters and parameters[key] is not None: + try: + deep1b_dataset_percentage = float(parameters[key]) + break + except Exception as e: + log.info(f"DEBUG: Could not convert DEEP1B_DATASET_PERCENTAGE from {key}: {e}") task = TaskConfig( db=db, @@ -494,6 +521,7 @@ def run( parameters["search_serial"], parameters["search_concurrent"], ), + deep1b_dataset_percentage=deep1b_dataset_percentage, ) log.info(f"Task:\n{pformat(task)}\n") diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py index bf71ebb89..31bb2aeb7 100644 --- a/vectordb_bench/models.py +++ b/vectordb_bench/models.py @@ -153,6 +153,7 @@ class TaskConfig(BaseModel): db_case_config: DBCaseConfig case_config: CaseConfig stages: list[TaskStage] = ALL_TASK_STAGES + deep1b_dataset_percentage: float | None = None @property def db_name(self):