"""
Heavily based on: https://github.com/facebookresearch/faiss/blob/master/benchs/bench_gpu_1bn.py
"""

import sys
import time

import faiss
import numpy as np

from colbert.utils.utils import print_message


class FaissIndexGPU:
    def __init__(self):
        self.ngpu = faiss.get_num_gpus()

        if self.ngpu == 0:
            return

        # Scratch space per GPU (8 GiB) and the maximum number of vectors
        # buffered on the GPUs before they are flushed to the CPU index.
        self.tempmem = 1 << 33
        self.max_add_per_gpu = 1 << 25
        self.max_add = self.max_add_per_gpu * self.ngpu
        self.add_batch_size = 65536

        self.gpu_resources = self._prepare_gpu_resources()

    def _prepare_gpu_resources(self):
        print_message(f"Preparing resources for {self.ngpu} GPUs.")

        gpu_resources = []

        for _ in range(self.ngpu):
            res = faiss.StandardGpuResources()
            if self.tempmem >= 0:
                res.setTempMemory(self.tempmem)
            gpu_resources.append(res)

        return gpu_resources

    def _make_vres_vdev(self):
        """
        Return vectors of device ids and resources, as needed by the
        *_gpu_multiple functions.
        """
        assert self.ngpu > 0

        vres = faiss.GpuResourcesVector()
        vdev = faiss.IntVector()

        for i in range(self.ngpu):
            vdev.push_back(i)
            vres.push_back(self.gpu_resources[i])

        return vres, vdev

    def training_initialize(self, index, quantizer):
        """
        The index and quantizer should be owned by caller.
        """
        assert self.ngpu > 0

        s = time.time()

        # Replace the IVF clustering index with a copy of the coarse quantizer
        # spread across all GPUs, so that k-means training runs on the GPUs.
        self.index_ivf = faiss.extract_index_ivf(index)
        self.clustering_index = faiss.index_cpu_to_all_gpus(quantizer)
        self.index_ivf.clustering_index = self.clustering_index

        print_message(f"#> training_initialize took {time.time() - s:.3f} s")

    def training_finalize(self):
        assert self.ngpu > 0

        s = time.time()

        # Move the trained clustering index back to the CPU.
        self.index_ivf.clustering_index = faiss.index_gpu_to_cpu(self.index_ivf.clustering_index)

        print_message(f"#> training_finalize took {time.time() - s:.3f} s")

    def adding_initialize(self, index):
        """
        The index should be owned by caller.
        """
        assert self.ngpu > 0

        self.co = faiss.GpuMultipleClonerOptions()
        self.co.useFloat16 = True
        self.co.useFloat16CoarseQuantizer = False
        self.co.usePrecomputed = False
        self.co.indicesOptions = faiss.INDICES_CPU
        self.co.verbose = True
        self.co.reserveVecs = self.max_add  # reserve GPU memory up front for max_add vectors
        self.co.shard = True  # shard (rather than replicate) the index across GPUs
        assert self.co.shard_type in (0, 1, 2)

        self.vres, self.vdev = self._make_vres_vdev()
        self.gpu_index = faiss.index_cpu_to_gpu_multiple(self.vres, self.vdev, index, self.co)

    def add(self, index, data, offset):
        assert self.ngpu > 0

        t0 = time.time()
        nb = data.shape[0]

        for i0 in range(0, nb, self.add_batch_size):
            i1 = min(i0 + self.add_batch_size, nb)
            xs = data[i0:i1]

            self.gpu_index.add_with_ids(xs, np.arange(offset + i0, offset + i1))

            # Once the GPU shards fill up, move their contents to the CPU index.
            if self.max_add > 0 and self.gpu_index.ntotal > self.max_add:
                self._flush_to_cpu(index, nb, offset)

            print('\r%d/%d (%.3f s)  ' % (i0, nb, time.time() - t0), end=' ')
            sys.stdout.flush()

        if self.gpu_index.ntotal > 0:
            self._flush_to_cpu(index, nb, offset)

        assert index.ntotal == offset + nb, (index.ntotal, offset + nb, offset, nb)
        print(f"add(.) time: {time.time() - t0:.3f} s \t\t--\t\t index.ntotal = {index.ntotal}")

    def _flush_to_cpu(self, index, nb, offset):
        print("Flush indexes to CPU")

        for i in range(self.ngpu):
            index_src_gpu = faiss.downcast_index(self.gpu_index if self.ngpu == 1 else self.gpu_index.at(i))
            index_src = faiss.index_gpu_to_cpu(index_src_gpu)

            # Copy the vectors accumulated on this GPU shard into the CPU
            # index, then clear the shard and re-reserve its memory.
            index_src.copy_subset_to(index, 0, offset, offset + nb)

            index_src_gpu.reset()
            index_src_gpu.reserveMemory(self.max_add)

        if self.ngpu > 1:
            try:
                self.gpu_index.sync_with_shard_indexes()
            except AttributeError:
                # Renamed in newer versions of faiss.
                self.gpu_index.syncWithSubIndexes()
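

if __name__ == '__main__':
    # Minimal usage sketch (not part of the original file): builds a small
    # IVFPQ index over random 128-dim vectors and runs it through the
    # train-on-GPU / add-on-GPU flow above. The dimensions, partition count,
    # and sample sizes are illustrative assumptions, not ColBERT's defaults;
    # with no visible GPUs, training and adding fall back to the CPU.
    dim, partitions = 128, 256
    quantizer = faiss.IndexFlatL2(dim)
    index = faiss.IndexIVFPQ(quantizer, dim, partitions, 16, 8)

    gpu = FaissIndexGPU()

    training_sample = np.random.rand(20_000, dim).astype(np.float32)
    if gpu.ngpu > 0:
        gpu.training_initialize(index, quantizer)
    index.train(training_sample)
    if gpu.ngpu > 0:
        gpu.training_finalize()

    data = np.random.rand(100_000, dim).astype(np.float32)
    if gpu.ngpu > 0:
        gpu.adding_initialize(index)
        gpu.add(index, data, offset=0)
    else:
        index.add(data)

    print(f"index.ntotal = {index.ntotal}")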