Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions install/requirements_py3.11.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ environs
pydantic<v2
scikit-learn
pymilvus
h5py
requests
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ dependencies = [
"pydantic<v2",
"scikit-learn",
"pymilvus", # with pandas, numpy, ujson
"h5py",
"requests",
]
dynamic = ["version"]

Expand Down
118 changes: 118 additions & 0 deletions test_deep1b_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""
Test script to verify Deep1B dataset integration
"""

import sys
import pathlib

# Add the project root to the path
sys.path.insert(0, str(pathlib.Path(__file__).parent))

from vectordb_bench.backend.dataset import Dataset
from vectordb_bench.backend.cases import CaseType, type2case


def test_deep1b_dataset():
"""Test that Deep1B dataset can be created and accessed"""
print("Testing Deep1B dataset integration...")

# Test dataset creation
try:
deep1b_dataset = Dataset.DEEP1B.get(1_000_000_000)
print(f"✓ Deep1B dataset created successfully")
print(f" - Name: {deep1b_dataset.name}")
print(f" - Size: {deep1b_dataset.size}")
print(f" - Dimensions: {deep1b_dataset.dim}")
print(f" - Metric Type: {deep1b_dataset.metric_type}")
print(f" - Label: {deep1b_dataset.label}")
print(f" - Directory Name: {deep1b_dataset.dir_name}")
print(f" - File Count: {deep1b_dataset.file_count}")
except Exception as e:
print(f"✗ Failed to create Deep1B dataset: {e}")
return False

# Test dataset manager
try:
deep1b_manager = Dataset.DEEP1B.manager(1_000_000_000)
print(f"✓ Deep1B dataset manager created successfully")
print(f" - Data directory: {deep1b_manager.data_dir}")
except Exception as e:
print(f"✗ Failed to create Deep1B dataset manager: {e}")
return False

# Test case creation
try:
case = CaseType.Performance96D1B.case_cls()
print(f"✓ Deep1B case created successfully")
print(f" - Case ID: {case.case_id}")
print(f" - Name: {case.name}")
print(f" - Description: {case.description[:100]}...")
print(f" - Load Timeout: {case.load_timeout}")
print(f" - Optimize Timeout: {case.optimize_timeout}")
except Exception as e:
print(f"✗ Failed to create Deep1B case: {e}")
return False

print("✓ All Deep1B integration tests passed!")
return True


def test_dataset_enum():
"""Test that Deep1B is properly added to the Dataset enum"""
print("\nTesting Dataset enum...")

# Check if DEEP1B is in the enum
if hasattr(Dataset, 'DEEP1B'):
print(f"✓ DEEP1B found in Dataset enum")
else:
print(f"✗ DEEP1B not found in Dataset enum")
return False

# Check if it's properly mapped
try:
deep1b_class = Dataset.DEEP1B.value
print(f"✓ DEEP1B class: {deep1b_class}")
except Exception as e:
print(f"✗ Failed to access DEEP1B class: {e}")
return False

return True


def test_case_enum():
"""Test that Performance96D1B is properly added to the CaseType enum"""
print("\nTesting CaseType enum...")

# Check if Performance96D1B is in the enum
if hasattr(CaseType, 'Performance96D1B'):
print(f"✓ Performance96D1B found in CaseType enum")
else:
print(f"✗ Performance96D1B not found in CaseType enum")
return False

# Check if it's properly mapped in type2case
if CaseType.Performance96D1B in type2case:
print(f"✓ Performance96D1B found in type2case mapping")
else:
print(f"✗ Performance96D1B not found in type2case mapping")
return False

return True


if __name__ == "__main__":
print("Deep1B Dataset Integration Test")
print("=" * 40)

success = True
success &= test_deep1b_dataset()
success &= test_dataset_enum()
success &= test_case_enum()

if success:
print("\n🎉 All tests passed! Deep1B integration is working correctly.")
sys.exit(0)
else:
print("\n❌ Some tests failed. Please check the integration.")
sys.exit(1)
6 changes: 6 additions & 0 deletions vectordb_bench/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ class config:
LOAD_TIMEOUT_1536D_500K = 24 * 3600 # 24h
LOAD_TIMEOUT_1536D_5M = 240 * 3600 # 10d

# Deep1B dataset timeouts (96 dimensions, 1B vectors)
LOAD_TIMEOUT_96D_1B = 7200 * 3600 # 300d - very large dataset
OPTIMIZE_TIMEOUT_96D_1B = 7200 * 3600 # 300d - very large dataset
# Deep1B dataset percentage for testing (0.0 to 1.0, default 1.0 = 100%)
DEEP1B_DATASET_PERCENTAGE = env.float("DEEP1B_DATASET_PERCENTAGE", 1.0)

OPTIMIZE_TIMEOUT_DEFAULT = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_1M = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_10M = 240 * 3600 # 10d
Expand Down
16 changes: 16 additions & 0 deletions vectordb_bench/backend/cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class CaseType(Enum):

Performance1536D50K = 50

# Deep1B dataset cases
Performance96D1B = 60

Custom = 100
PerformanceCustomDataset = 101

Expand Down Expand Up @@ -302,6 +305,18 @@ class Performance1536D50K(PerformanceCase):
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_DEFAULT


class Performance96D1B(PerformanceCase):
case_id: CaseType = CaseType.Performance96D1B
filter_rate: float | int | None = None
dataset: DatasetManager = Dataset.DEEP1B.manager(1_000_000_000)
name: str = "Search Performance Test (1B Dataset, 96 Dim)"
description: str = """This case tests the search performance of a vector database with a very large 1B dataset
(<b>Deep1B 1B vectors</b>, 96 dimensions), at varying parallel levels. Results will show index building time,
recall, and maximum QPS."""
load_timeout: float | int = config.LOAD_TIMEOUT_96D_1B
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_96D_1B


def metric_type_map(s: str) -> MetricType:
if s.lower() == "cosine":
return MetricType.COSINE
Expand Down Expand Up @@ -366,5 +381,6 @@ def __init__(
CaseType.Performance1536D500K99P: Performance1536D500K99P,
CaseType.Performance1536D5M99P: Performance1536D5M99P,
CaseType.Performance1536D50K: Performance1536D50K,
CaseType.Performance96D1B: Performance96D1B,
CaseType.PerformanceCustomDataset: PerformanceCustomDataset,
}
1 change: 1 addition & 0 deletions vectordb_bench/backend/clients/pgvector/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from vectordb_bench.backend.clients import DB
from vectordb_bench.backend.clients.api import MetricType
from vectordb_bench import config

from ....cli.cli import (
CommonTypedDict,
Expand Down
3 changes: 3 additions & 0 deletions vectordb_bench/backend/clients/pgvector/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class PgVectorIndexConfig(BaseModel, DBCaseConfig):
metric_type: MetricType | None = None
create_index_before_load: bool = False
create_index_after_load: bool = True
deep1b_dataset_percentage: float | None = None

def parse_metric(self) -> str:
d = {
Expand Down Expand Up @@ -173,6 +174,7 @@ class PgVectorIVFFlatConfig(PgVectorIndexConfig):
reranking_metric: str | None = None
create_index_before_load: bool | None = True
create_index_after_load: bool | None = False
deep1b_dataset_percentage: float | None = None

def index_param(self) -> PgVectorIndexParam:
index_parameters = {"lists": self.lists}
Expand Down Expand Up @@ -219,6 +221,7 @@ class PgVectorHNSWConfig(PgVectorIndexConfig):
reranking_metric: str | None = None
create_index_before_load: bool | None = True
create_index_after_load: bool | None = False
deep1b_dataset_percentage: float | None = None

def index_param(self) -> PgVectorIndexParam:
index_parameters = {"m": self.m, "ef_construction": self.ef_construction}
Expand Down
85 changes: 82 additions & 3 deletions vectordb_bench/backend/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from tqdm import tqdm

from vectordb_bench import config
import h5py
import polars as pl
from vectordb_bench.backend.utils import download_file

logging.getLogger("s3fs").setLevel(logging.CRITICAL)

Expand All @@ -18,6 +21,7 @@
class DatasetSource(Enum):
S3 = "S3"
AliyunOSS = "AliyunOSS"
Deep1BLocal = "Deep1BLocal"

def reader(self) -> DatasetReader:
if self == DatasetSource.S3:
Expand All @@ -26,6 +30,9 @@ def reader(self) -> DatasetReader:
if self == DatasetSource.AliyunOSS:
return AliyunOSSReader()

if self == DatasetSource.Deep1BLocal:
return Deep1BReader()

return None


Expand All @@ -34,13 +41,14 @@ class DatasetReader(ABC):
remote_root: str

@abstractmethod
def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path):
def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None):
"""read dataset files from remote_root to local_ds_root,

Args:
dataset(str): for instance "sift_small_500k"
files(list[str]): all filenames of the dataset
local_ds_root(pathlib.Path): whether to write the remote data.
deep1b_dataset_percentage(float | None): percentage of Deep1B dataset to use (only for Deep1B)
"""

@abstractmethod
Expand Down Expand Up @@ -68,7 +76,7 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool:

return True

def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path):
def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None):
downloads = []
if not local_ds_root.exists():
log.info(f"local dataset root path not exist, creating it: {local_ds_root}")
Expand Down Expand Up @@ -118,7 +126,7 @@ def ls_all(self, dataset: str):
log.info(n)
return names

def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path):
def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None):
downloads = []
if not local_ds_root.exists():
log.info(f"local dataset root path not exist, creating it: {local_ds_root}")
Expand Down Expand Up @@ -155,3 +163,74 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool:
return False

return True


DEEP1B_URL = "http://ann-benchmarks.com/deep-image-96-angular.hdf5"
DEEP1B_HDF5_FILENAME = "deep-image-96-angular.hdf5"

class Deep1BReader(DatasetReader):
source: DatasetSource = None # Not a remote source
remote_root: str = DEEP1B_URL

def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool:
return local.exists() and local.stat().st_size > 0

def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path, deep1b_dataset_percentage: float | None = None):
# Download the HDF5 file if not present
hdf5_path = local_ds_root.parent.joinpath(DEEP1B_HDF5_FILENAME)
if not hdf5_path.exists():
local_ds_root.parent.mkdir(parents=True, exist_ok=True)
download_file(DEEP1B_URL, str(hdf5_path))
# Extract and convert to Parquet if not already done
if not local_ds_root.exists():
local_ds_root.mkdir(parents=True)

# Get the percentage configuration - use task config if provided, otherwise use global config
percentage = deep1b_dataset_percentage if deep1b_dataset_percentage is not None else config.DEEP1B_DATASET_PERCENTAGE
log.info(f"DEBUG: Dataset preparation reading DEEP1B_DATASET_PERCENTAGE = {percentage} (task_config={deep1b_dataset_percentage}, global_config={config.DEEP1B_DATASET_PERCENTAGE})")
if percentage <= 0.0 or percentage > 1.0:
raise ValueError(f"DEEP1B_DATASET_PERCENTAGE must be between 0.0 and 1.0, got {percentage}")

# Create percentage-specific filenames
train_parquet = local_ds_root.joinpath(f"train_{int(percentage * 100)}p.parquet")
test_parquet = local_ds_root.joinpath(f"test_{int(percentage * 100)}p.parquet")

if not train_parquet.exists() or not test_parquet.exists():
log.info(f"Extracting and converting Deep1B HDF5 file to Parquet format with {percentage*100}% of data to base path: {local_ds_root}")
with h5py.File(hdf5_path, "r") as f:
# Extract train vectors with percentage sampling
train_vectors = f["train"][:]
total_train = train_vectors.shape[0]
sample_size = int(total_train * percentage)

# Use first N% of the data for consistency
train_vectors = train_vectors[:sample_size]
train_ids = list(range(sample_size))
train_df = pl.DataFrame({
"id": train_ids,
"emb": [v.astype("float32") for v in train_vectors],
})
log.info(f"Writing train_{int(percentage * 100)}p.parquet with {sample_size:,} vectors")
train_df.write_parquet(str(train_parquet))

# Extract test vectors with percentage sampling
test_vectors = f["test"][:]
total_test = test_vectors.shape[0]
test_sample_size = int(total_test * percentage)

# Use first N% of the test data for consistency
test_vectors = test_vectors[:test_sample_size]
test_ids = list(range(test_sample_size))
test_df = pl.DataFrame({
"id": test_ids,
"emb": [v.astype("float32") for v in test_vectors],
})
log.info(f"Writing test_{int(percentage * 100)}p.parquet with {test_sample_size:,} vectors")
test_df.write_parquet(str(test_parquet))
else:
log.info(f"Using existing Deep1B dataset files with {percentage*100}% of data")

log.info(f"Deep1B dataset preparation completed with {percentage*100}% of data. Note: No ground truth file is provided.")

def deep1b_reader():
return Deep1BReader()
Loading