Add scripts for Faiss to Parquet Conversion (#2631)
valamuri2020 authored Nov 22, 2024
1 parent 06eb93f commit 8bd8ca8
Showing 4 changed files with 271 additions and 1 deletion.
122 changes: 122 additions & 0 deletions src/main/python/parquet/faiss_to_parquet.py
@@ -0,0 +1,122 @@
import os
import argparse
import logging
import shutil
import faiss
import pandas as pd


def setup_logging():
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
    )


def read_docid_file(docid_path):
    """
    Reads the docid file and returns a list of document IDs.
    """
    try:
        with open(docid_path, 'r') as f:
            docids = [line.strip() for line in f]
        logging.info(f"Read {len(docids)} docids from {docid_path}")
        return docids
    except Exception as e:
        logging.error(f"Failed to read docid file {docid_path}: {e}")
        raise RuntimeError(f"Failed to read docid file {docid_path}: {e}")


def read_faiss_index(index_path):
    """
    Reads a FAISS index file and returns a numpy array of vectors.
    """
    try:
        index = faiss.read_index(index_path)
        # reconstruct_n(0, ntotal) returns the stored vectors; this requires an
        # index type that supports reconstruction, such as a flat index.
        vectors = index.reconstruct_n(0, index.ntotal)
        logging.info(f"Read {vectors.shape[0]} vectors from {index_path}")
        return vectors
    except Exception as e:
        logging.error(f"Failed to read FAISS index file {index_path}: {e}")
        raise RuntimeError(f"Failed to read FAISS index file {index_path}: {e}")


def write_to_parquet_in_chunks(df, output_dir, rows_per_chunk=10**6):
    """
    Writes the DataFrame to Parquet files in chunks of the specified size.
    """
    # Write the DataFrame to Parquet, one file per chunk of rows_per_chunk rows
    for i in range(0, len(df), rows_per_chunk):
        chunk = df.iloc[i:i + rows_per_chunk]
        chunk_file = os.path.join(output_dir, f'chunk_{i // rows_per_chunk}.parquet')
        try:
            chunk.to_parquet(chunk_file, index=False)
            logging.info(f"Successfully wrote chunk to {chunk_file}")
        except Exception as e:
            logging.error(f"Failed to write chunk to Parquet file {chunk_file}: {e}")
            raise RuntimeError(f"Failed to write chunk to Parquet file {chunk_file}: {e}")


def convert_faiss_to_parquet(input_dir, output_dir, overwrite):
    """
    Converts FAISS index files in the input directory to Parquet files in the output directory.
    """
    # Ensure the input directory contains the necessary files
    docid_path = os.path.join(input_dir, 'docid')
    index_path = os.path.join(input_dir, 'index')

    if not os.path.isfile(docid_path) or not os.path.isfile(index_path):
        raise FileNotFoundError("Both 'docid' and 'index' files must be present in the input directory.")

    # Set up the output directory
    if os.path.exists(output_dir):
        if overwrite:
            shutil.rmtree(output_dir)
            os.makedirs(output_dir)
        else:
            raise FileExistsError(f"Output directory '{output_dir}' already exists. Use --overwrite to replace it.")
    else:
        os.makedirs(output_dir)

    # Read docids and vectors
    docids = read_docid_file(docid_path)
    vectors = read_faiss_index(index_path)

    # Check if the number of docids matches the number of vectors
    if len(docids) != vectors.shape[0]:
        error_message = "The number of docids does not match the number of vectors."
        logging.error(error_message)
        raise ValueError(error_message)

    df = pd.DataFrame({
        'docid': docids,
        'vector': vectors.tolist()  # Convert vectors to a list format
    })

    # Write DataFrame to Parquet in chunks
    write_to_parquet_in_chunks(df, output_dir)

if __name__ == "__main__":
    setup_logging()

    parser = argparse.ArgumentParser(
        description="Convert FAISS index files to Parquet format in chunks."
    )
    parser.add_argument(
        "--input", required=True, help="Input directory containing 'docid' and 'index' files."
    )
    parser.add_argument(
        "--output", required=True, help="Output directory where the Parquet files will be saved."
    )
    parser.add_argument(
        "--overwrite",
        action="store_true",
        default=False,
        help="Overwrite the output directory if it already exists.",
    )
    args = parser.parse_args()

    try:
        # Convert FAISS index data to Parquet in chunks
        convert_faiss_to_parquet(args.input, args.output, args.overwrite)
    except Exception as e:
        logging.error(f"Script failed: {e}")
        raise SystemExit(1)  # exit nonzero so calling scripts can detect the failure

57 changes: 57 additions & 0 deletions src/main/python/parquet/get_faiss_indexes.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/bin/bash

# Create the collections folder if it doesn't exist
mkdir -p collections

# Array of URLs to download
urls=(
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-arguana.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-bioasq.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-climate-fever.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-android.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-english.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-gaming.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-gis.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-mathematica.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-physics.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-programmers.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-stats.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-tex.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-unix.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-webmasters.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-cqadupstack-wordpress.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-dbpedia-entity.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-fever.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-fiqa.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-hotpotqa.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-nfcorpus.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-nq.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-quora.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-robust04.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-scifact.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-scidocs.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-signal1m.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-trec-covid.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-trec-news.bge-base-en-v1.5.20240107.tar.gz"
"https://rgw.cs.uwaterloo.ca/pyserini/indexes/faiss/faiss-flat.beir-v1.0.0-webis-touche2020.bge-base-en-v1.5.20240107.tar.gz"
)

# Change to the collections directory (bail out if it is not accessible)
cd collections || exit 1

# Download each file using wget in parallel
for url in "${urls[@]}"; do
    wget "$url" &
done

# Wait for all downloads to complete
wait

# Extract each tar.gz file in parallel
for file in *.tar.gz; do
    tar -xvzf "$file" &
done

# Wait for all extractions to complete
wait

2 changes: 1 addition & 1 deletion src/main/python/parquet/requirements.txt
@@ -5,4 +5,4 @@ python-dateutil==2.9.0.post0
pytz==2024.1
six==1.16.0
tqdm==4.66.5
-tzdata==2024.1
+tzdata==2024.1
91 changes: 91 additions & 0 deletions src/main/python/parquet/run_conversions.sh
@@ -0,0 +1,91 @@
#!/bin/bash

# Default base directory
DEFAULT_BASE_DIR="collections/faiss/"
BASE_DIR="$DEFAULT_BASE_DIR"

# If the first argument is an existing directory (and not the --all flag),
# treat it as the base directory and shift it off the argument list.
if [ $# -gt 0 ] && [ "$1" != "--all" ] && [ -d "$1" ]; then
    BASE_DIR="$1"
    shift
fi

# Check if the --all flag is passed
if [ "$1" == "--all" ]; then
# Get all subdirectories in the base directory
SUBDIRS=("$BASE_DIR"/*/)
else
# If no --all flag, use the provided arguments as subdirectory names
if [ $# -eq 0 ]; then
echo "No subdirectories specified. Exiting."
exit 1
fi

# Convert the passed arguments to subdirectory paths
SUBDIRS=()
for SUBDIR_NAME in "$@"; do
SUBDIR_PATH="$BASE_DIR/$SUBDIR_NAME"
if [ -d "$SUBDIR_PATH" ]; then
SUBDIRS+=("$SUBDIR_PATH/")
else
echo "Subdirectory $SUBDIR_PATH does not exist. Skipping."
fi
done

# If no valid subdirectories were provided, exit
if [ ${#SUBDIRS[@]} -eq 0 ]; then
echo "No valid subdirectories provided. Exiting."
exit 1
fi
fi

# Loop through each specified subdirectory (or all subdirectories if --all was passed)
for SUBDIR in "${SUBDIRS[@]}"; do
    if [ -d "$SUBDIR" ]; then
        (
            echo "Processing $SUBDIR"

            PARQUET_DIR="${SUBDIR%/}.faiss-parquet"
            SUBDIR_NAME=$(basename "$SUBDIR")
            INDEX_NAME="indexes/faiss-parquet/$SUBDIR_NAME"
            RUNS_FILE="runs/${SUBDIR_NAME}_faiss_parquet.txt"
            EVAL_FILE="runs/${SUBDIR_NAME}_faiss_parquet_evals.txt"

            # Convert to Parquet
            python src/main/python/parquet/faiss_to_parquet.py --input "$SUBDIR" --output "$PARQUET_DIR" --overwrite

            # Index Parquet data
            bin/run.sh io.anserini.index.IndexFlatDenseVectors \
                -threads 16 \
                -collection ParquetDenseVectorCollection \
                -input "$PARQUET_DIR" \
                -generator ParquetDenseVectorDocumentGenerator \
                -index "$INDEX_NAME" \
                >&"logs/debug-log.beir-v1.0.0-${SUBDIR_NAME}.bge-base-en-v1.5"

            # Search on the indexed data
            bin/run.sh io.anserini.search.SearchFlatDenseVectors \
                -index "$INDEX_NAME" \
                -topics "tools/topics-and-qrels/topics.beir-v1.0.0-${SUBDIR_NAME}.test.bge-base-en-v1.5.jsonl.gz" \
                -topicReader JsonStringVector \
                -output "$RUNS_FILE" \
                -hits 1000 -removeQuery -threads 16

            echo "Running evaluations for $SUBDIR_NAME"
            {
                bin/trec_eval -c -m ndcg_cut.10 "tools/topics-and-qrels/qrels.beir-v1.0.0-${SUBDIR_NAME}.test.txt" "$RUNS_FILE"
                bin/trec_eval -c -m recall.100 "tools/topics-and-qrels/qrels.beir-v1.0.0-${SUBDIR_NAME}.test.txt" "$RUNS_FILE"
                bin/trec_eval -c -m recall.1000 "tools/topics-and-qrels/qrels.beir-v1.0.0-${SUBDIR_NAME}.test.txt" "$RUNS_FILE"
            } > "$EVAL_FILE"

            # Check whether the evaluation block succeeded
            # (note: $? reflects only the last command above)
            if [ $? -eq 0 ]; then
                echo "Successfully processed $SUBDIR"
            else
                echo "Failed to process $SUBDIR"
            fi
        ) &
    fi
done

wait

echo "All specified subdirectories processed."
