From c82c8878fce0248b158f992f0700481f865c47b7 Mon Sep 17 00:00:00 2001 From: Roque Lopez Date: Fri, 26 Jul 2024 12:58:49 -0400 Subject: [PATCH] feat: Restructure packages to streamline the addition of new algorithms --- CONTRIBUTING.md | 26 ++ bdikit/api.py | 142 +------- .../column_mapping/algorithms.py | 319 ------------------ .../value_mapping/__init__.py | 0 .../value_mapping/algorithms.py | 260 -------------- .../value_mappers.py => mapping_functions.py} | 0 .../__init__.py | 0 bdikit/schema_matching/best/__init__.py | 5 + .../best/algorithms/__init__.py | 1 + .../best/algorithms/contrastivelearning.py | 21 ++ bdikit/schema_matching/best/algorithms/gpt.py | 52 +++ .../best/algorithms/maxvalsim.py | 84 +++++ .../best/algorithms/twophase.py | 52 +++ .../best/algorithms/valentine.py | 106 ++++++ bdikit/schema_matching/best/base.py | 15 + .../schema_matching/best/matcher_factory.py | 45 +++ bdikit/schema_matching/topk/__init__.py | 1 + .../topk/algorithms/contrastivelearning.py} | 30 +- bdikit/schema_matching/topk/base.py | 21 ++ .../schema_matching/topk/matcher_factory.py | 26 ++ bdikit/value_matching/__init__.py | 2 + .../algorithms}/__init__.py | 0 bdikit/value_matching/algorithms/gpt.py | 54 +++ bdikit/value_matching/algorithms/polyfuzz.py | 141 ++++++++ bdikit/value_matching/base.py | 37 ++ bdikit/value_matching/matcher_factory.py | 36 ++ docs/source/examples.rst | 3 +- tests/test_api.py | 2 +- ...e_mapping.py => test_mapping_functions.py} | 2 +- tests/test_schema_matching.py | 4 +- ...g_algorithms.py => test_value_matching.py} | 2 +- 31 files changed, 753 insertions(+), 736 deletions(-) create mode 100644 CONTRIBUTING.md delete mode 100644 bdikit/mapping_algorithms/column_mapping/algorithms.py delete mode 100644 bdikit/mapping_algorithms/value_mapping/__init__.py delete mode 100644 bdikit/mapping_algorithms/value_mapping/algorithms.py rename bdikit/{mapping_algorithms/value_mapping/value_mappers.py => mapping_functions.py} (100%) rename bdikit/{mapping_algorithms => schema_matching}/__init__.py (100%) create mode 100644 bdikit/schema_matching/best/__init__.py create mode 100644 bdikit/schema_matching/best/algorithms/__init__.py create mode 100644 bdikit/schema_matching/best/algorithms/contrastivelearning.py create mode 100644 bdikit/schema_matching/best/algorithms/gpt.py create mode 100644 bdikit/schema_matching/best/algorithms/maxvalsim.py create mode 100644 bdikit/schema_matching/best/algorithms/twophase.py create mode 100644 bdikit/schema_matching/best/algorithms/valentine.py create mode 100644 bdikit/schema_matching/best/base.py create mode 100644 bdikit/schema_matching/best/matcher_factory.py create mode 100644 bdikit/schema_matching/topk/__init__.py rename bdikit/{mapping_algorithms/column_mapping/topk_matchers.py => schema_matching/topk/algorithms/contrastivelearning.py} (77%) create mode 100644 bdikit/schema_matching/topk/base.py create mode 100644 bdikit/schema_matching/topk/matcher_factory.py create mode 100644 bdikit/value_matching/__init__.py rename bdikit/{mapping_algorithms/column_mapping => value_matching/algorithms}/__init__.py (100%) create mode 100644 bdikit/value_matching/algorithms/gpt.py create mode 100644 bdikit/value_matching/algorithms/polyfuzz.py create mode 100644 bdikit/value_matching/base.py create mode 100644 bdikit/value_matching/matcher_factory.py rename tests/{test_value_mapping.py => test_mapping_functions.py} (96%) rename tests/{test_value_matching_algorithms.py => test_value_matching.py} (96%) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file 
mode 100644 index 00000000..be504245 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,26 @@ +Contributing to bdi-kit +======================= + +There are many ways to contribute to bdi-kit, such as improving the codebase, reporting +issues or bugs, enhancing the documentation, reviewing pull requests from other developers, +adding new matching methods, or expanding support for additional standards. +See the instructions below to get started! + + +Adding New Matching Methods +--------------------------- + +Contributors can add new methods for schema and value matching by following these steps: + +1. Create a Python module inside the `algorithms` folder (e.g., `bdikit/value_matching/algorithms`). + +2. Define a class in the module that implements either `BaseValueMatcher` (for value matching) or `BaseSchemaMatcher` (for schema matching). + +3. Register your class in `matcher_factory.py` (e.g., `bdikit/value_matching/matcher_factory.py`), which maps method names to matcher classes. Ensure your module is properly imported in the `__init__.py` file (e.g., `bdikit/value_matching/__init__.py`). See the sketch below for a minimal example. + + +Code of Conduct +--------------- + +We abide by the principles of openness, respect, and consideration of others +of the Python Software Foundation: https://www.python.org/psf/codeofconduct/. diff --git a/bdikit/api.py b/bdikit/api.py index 73548dd2..d3172d10 100644 --- a/bdikit/api.py +++ b/bdikit/api.py @@ -1,19 +1,16 @@ from __future__ import annotations import logging -from enum import Enum + from collections import defaultdict from os.path import join, dirname from typing import ( Union, - Type, List, Dict, TypedDict, - Set, Optional, Tuple, Callable, - Mapping, Any, ) import itertools @@ -22,37 +19,15 @@ import panel as pn from IPython.display import display, Markdown from bdikit.utils import get_gdc_data, get_gdc_metadata -from bdikit.mapping_algorithms.column_mapping.algorithms import ( - BaseSchemaMatcher, - SimFloodSchemaMatcher, - ComaSchemaMatcher, - CupidSchemaMatcher, - DistributionBasedSchemaMatcher, - JaccardSchemaMatcher, - GPTSchemaMatcher, - ContrastiveLearningSchemaMatcher, - TwoPhaseSchemaMatcher, - MaxValSimSchemaMatcher, -) -from bdikit.mapping_algorithms.value_mapping.value_mappers import ValueMapper -from bdikit.models.contrastive_learning.cl_api import ( - DEFAULT_CL_MODEL, -) -from bdikit.mapping_algorithms.column_mapping.topk_matchers import ( - TopkColumnMatcher, - CLTopkColumnMatcher, -) -from bdikit.mapping_algorithms.value_mapping.algorithms import ( - ValueMatch, - BaseValueMatcher, - TFIDFValueMatcher, - GPTValueMatcher, - EditDistanceValueMatcher, - EmbeddingValueMatcher, - AutoFuzzyJoinValueMatcher, - FastTextValueMatcher, -) -from bdikit.mapping_algorithms.value_mapping.value_mappers import ( + +from bdikit.schema_matching.best.base import BaseSchemaMatcher +from bdikit.schema_matching.best.matcher_factory import SchemaMatchers +from bdikit.schema_matching.topk.base import BaseTopkSchemaMatcher +from bdikit.schema_matching.topk.matcher_factory import TopkMatchers +from bdikit.value_matching.base import BaseValueMatcher, ValueMatch, ValueMatchingResult +from bdikit.value_matching.matcher_factory import ValueMatchers + +from bdikit.mapping_functions import ( ValueMapper, FunctionValueMapper, DictionaryMapper, @@ -67,37 +42,6 @@ logger = logging.getLogger(__name__) -class SchemaMatchers(Enum): - SIMFLOOD = ("similarity_flooding", SimFloodSchemaMatcher) - COMA = ("coma", ComaSchemaMatcher) - CUPID = ("cupid", CupidSchemaMatcher) - DISTRIBUTION_BASED = ("distribution_based",
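For reference, here is a minimal sketch of steps 1–3 from the CONTRIBUTING.md above, assuming the `bdikit.value_matching` layout introduced by this patch. The module name `exact.py`, the class `ExactValueMatcher`, and the method name `"exact"` are hypothetical, used only for illustration:

```python
# Hypothetical module bdikit/value_matching/algorithms/exact.py (step 1).
# ExactValueMatcher and the "exact" method name are illustrative only.
from typing import List

from bdikit.value_matching.base import BaseValueMatcher, ValueMatch


class ExactValueMatcher(BaseValueMatcher):  # step 2: implement the base class
    """Matches values that are identical after case-folding."""

    def match(
        self, source_values: List[str], target_values: List[str]
    ) -> List[ValueMatch]:
        # Index target values by their case-folded form for O(1) lookups.
        targets = {value.casefold(): value for value in target_values}
        matches = []
        for source in source_values:
            target = targets.get(source.casefold())
            if target is not None:
                # Exact matches receive the maximum similarity score of 1.0.
                matches.append(ValueMatch(source, target, 1.0))
        return matches
```

Step 3 would then add `EXACT = ("exact", ExactValueMatcher)` to the `ValueMatchers` enum in `bdikit/value_matching/matcher_factory.py` and import the new module in `bdikit/value_matching/__init__.py`.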
DistributionBasedSchemaMatcher) - JACCARD_DISTANCE = ("jaccard_distance", JaccardSchemaMatcher) - GPT = ("gpt", GPTSchemaMatcher) - CT_LEARGNING = ("ct_learning", ContrastiveLearningSchemaMatcher) - TWO_PHASE = ("two_phase", TwoPhaseSchemaMatcher) - MAX_VAL_SIM = ("max_val_sim", MaxValSimSchemaMatcher) - - def __init__(self, method_name: str, method_class: Type[BaseSchemaMatcher]): - self.method_name = method_name - self.method_class = method_class - - @staticmethod - def get_instance( - method_name: str, **method_kwargs: Mapping[str, Any] - ) -> BaseSchemaMatcher: - methods = {method.method_name: method.method_class for method in SchemaMatchers} - - try: - return methods[method_name](**method_kwargs) - except KeyError: - names = ", ".join(list(methods.keys())) - raise ValueError( - f"The {method_name} algorithm is not supported. " - f"Supported algorithms are: {names}" - ) - - def match_schema( source: pd.DataFrame, target: Union[str, pd.DataFrame] = "gdc", @@ -154,34 +98,12 @@ def _load_table_for_standard(name: str) -> pd.DataFrame: raise ValueError(f"The {name} standard is not supported") -class TopkMatchers(Enum): - CT_LEARNING = ("ct_learning", CLTopkColumnMatcher) - - def __init__(self, method_name: str, method_class: Type[TopkColumnMatcher]): - self.method_name = method_name - self.method_class = method_class - - @staticmethod - def get_instance( - method_name: str, **method_kwargs: Mapping[str, Any] - ) -> TopkColumnMatcher: - methods = {method.method_name: method.method_class for method in TopkMatchers} - try: - return methods[method_name](**method_kwargs) - except KeyError: - names = ", ".join(list(methods.keys())) - raise ValueError( - f"The {method_name} algorithm is not supported. " - f"Supported algorithms are: {names}" - ) - - def top_matches( source: pd.DataFrame, columns: Optional[List[str]] = None, target: Union[str, pd.DataFrame] = "gdc", top_k: int = 10, - method: Union[str, TopkColumnMatcher] = "ct_learning", + method: Union[str, BaseTopkSchemaMatcher] = "ct_learning", method_args: Optional[Dict[str, Any]] = None, ) -> pd.DataFrame: """ @@ -211,11 +133,11 @@ def top_matches( if method_args is None: method_args = {} topk_matcher = TopkMatchers.get_instance(method, **method_args) - elif isinstance(method, TopkColumnMatcher): + elif isinstance(method, BaseTopkSchemaMatcher): topk_matcher = method else: raise ValueError( - "The method must be a string or an instance of TopkColumnMatcher" + "The method must be a string or an instance of BaseTopkSchemaMatcher" ) top_k_matches = topk_matcher.get_recommendations( @@ -232,47 +154,11 @@ def top_matches( return pd.concat(dfs, ignore_index=True) -class ValueMatchers(Enum): - TFIDF = ("tfidf", TFIDFValueMatcher) - EDIT = ("edit_distance", EditDistanceValueMatcher) - EMBEDDINGS = ("embedding", EmbeddingValueMatcher) - AUTOFJ = ("auto_fuzzy_join", AutoFuzzyJoinValueMatcher) - FASTTEXT = ("fasttext", FastTextValueMatcher) - GPT = ("gpt", GPTValueMatcher) - - def __init__(self, method_name: str, method_class: Type[BaseValueMatcher]): - self.method_name = method_name - self.method_class = method_class - - @staticmethod - def get_instance( - method_name: str, **method_kwargs: Mapping[str, Any] - ) -> BaseValueMatcher: - methods = {method.method_name: method.method_class for method in ValueMatchers} - try: - return methods[method_name](**method_kwargs) - except KeyError: - names = ", ".join(list(methods.keys())) - raise ValueError( - f"The {method_name} algorithm is not supported. 
" - f"Supported algorithms are: {names}" - ) - - -class ValueMatchingResult(TypedDict): - source: str - target: str - matches: List[ValueMatch] - coverage: float - unique_values: Set[str] - unmatch_values: Set[str] - - def match_values( source: pd.DataFrame, target: Union[str, pd.DataFrame], column_mapping: Union[Tuple[str, str], pd.DataFrame], - method: str = DEFAULT_VALUE_MATCHING_METHOD, + method: Union[str, BaseValueMatcher] = DEFAULT_VALUE_MATCHING_METHOD, method_args: Optional[Dict[str, Any]] = None, ) -> Union[pd.DataFrame, List[pd.DataFrame]]: """ diff --git a/bdikit/mapping_algorithms/column_mapping/algorithms.py b/bdikit/mapping_algorithms/column_mapping/algorithms.py deleted file mode 100644 index d96b0bb8..00000000 --- a/bdikit/mapping_algorithms/column_mapping/algorithms.py +++ /dev/null @@ -1,319 +0,0 @@ -import pandas as pd -from typing import Dict, Optional, Callable -from valentine import valentine_match -from valentine.algorithms import ( - SimilarityFlooding, - Coma, - Cupid, - DistributionBased, - JaccardDistanceMatcher, - BaseMatcher, -) -from valentine.algorithms.matcher_results import MatcherResults -from valentine.algorithms.jaccard_distance import StringDistanceFunction -from openai import OpenAI -from bdikit.models.contrastive_learning.cl_api import ( - DEFAULT_CL_MODEL, -) -from bdikit.mapping_algorithms.column_mapping.topk_matchers import ( - TopkColumnMatcher, - CLTopkColumnMatcher, -) -from bdikit.mapping_algorithms.value_mapping.algorithms import ( - BaseValueMatcher, - TFIDFValueMatcher, -) - - -class BaseSchemaMatcher: - def map(self, dataset: pd.DataFrame, global_table: pd.DataFrame) -> Dict[str, str]: - raise NotImplementedError("Subclasses must implement this method") - - def _fill_missing_matches( - self, dataset: pd.DataFrame, matches: Dict[str, str] - ) -> Dict[str, str]: - for column in dataset.columns: - if column not in matches: - matches[column] = "" - return matches - - -class ValentineSchemaMatcher(BaseSchemaMatcher): - def __init__(self, matcher: BaseMatcher): - self.matcher = matcher - - def map(self, dataset: pd.DataFrame, global_table: pd.DataFrame) -> Dict[str, str]: - matches: MatcherResults = valentine_match(dataset, global_table, self.matcher) - mappings = {} - for match in matches.one_to_one(): - dataset_candidate = match[0][1] - global_table_candidate = match[1][1] - mappings[dataset_candidate] = global_table_candidate - return self._fill_missing_matches(dataset, mappings) - - -class SimFloodSchemaMatcher(ValentineSchemaMatcher): - def __init__( - self, coeff_policy: str = "inverse_average", formula: str = "formula_c" - ): - super().__init__(SimilarityFlooding(coeff_policy=coeff_policy, formula=formula)) - - -class ComaSchemaMatcher(ValentineSchemaMatcher): - def __init__( - self, max_n: int = 0, use_instances: bool = False, java_xmx: str = "1024m" - ): - super().__init__( - Coma(max_n=max_n, use_instances=use_instances, java_xmx=java_xmx) - ) - - -class CupidSchemaMatcher(ValentineSchemaMatcher): - def __init__( - self, - leaf_w_struct: float = 0.2, - w_struct: float = 0.2, - th_accept: float = 0.7, - th_high: float = 0.6, - th_low: float = 0.35, - c_inc: float = 1.2, - c_dec: float = 0.9, - th_ns: float = 0.7, - parallelism: int = 1, - ): - super().__init__( - Cupid( - leaf_w_struct=leaf_w_struct, - w_struct=w_struct, - th_accept=th_accept, - th_high=th_high, - th_low=th_low, - c_inc=c_inc, - c_dec=c_dec, - th_ns=th_ns, - parallelism=parallelism, - ) - ) - - -class DistributionBasedSchemaMatcher(ValentineSchemaMatcher): - def 
__init__( - self, - threshold1: float = 0.15, - threshold2: float = 0.15, - quantiles: int = 256, - process_num: int = 1, - ): - super().__init__( - DistributionBased( - threshold1=threshold1, - threshold2=threshold2, - quantiles=quantiles, - process_num=process_num, - ) - ) - - -class JaccardSchemaMatcher(ValentineSchemaMatcher): - def __init__( - self, - threshold_dist: float = 0.8, - distance_fun: Callable[[str, str], float] = StringDistanceFunction.Levenshtein, - process_num: int = 1, - ): - super().__init__( - JaccardDistanceMatcher( - threshold_dist=threshold_dist, - distance_fun=distance_fun, - process_num=process_num, - ) - ) - - -class MaxValSimSchemaMatcher(BaseSchemaMatcher): - def __init__( - self, - top_k: int = 20, - top_k_matcher: Optional[TopkColumnMatcher] = None, - value_matcher: Optional[BaseValueMatcher] = None, - ): - if top_k_matcher is None: - self.api = CLTopkColumnMatcher(DEFAULT_CL_MODEL) - elif isinstance(top_k_matcher, TopkColumnMatcher): - self.api = top_k_matcher - else: - raise ValueError( - f"Invalid top_k_matcher type: {type(top_k_matcher)}. " - "Must be a subclass of {TopkColumnMatcher.__name__}" - ) - - if value_matcher is None: - self.value_matcher = TFIDFValueMatcher() - elif isinstance(value_matcher, BaseValueMatcher): - self.value_matcher = value_matcher - else: - raise ValueError( - f"Invalid value_matcher type: {type(value_matcher)}. " - "Must be a subclass of {BaseValueMatcher.__name__}" - ) - - self.top_k = top_k - - def unique_string_values(self, column: pd.Series) -> pd.Series: - column = column.dropna() - if pd.api.types.is_string_dtype(column): - return pd.Series(column.unique(), name=column.name) - else: - return pd.Series(column.unique().astype(str), name=column.name) - - def map( - self, - dataset: pd.DataFrame, - global_table: pd.DataFrame, - ): - topk_column_matches = self.api.get_recommendations( - dataset, global_table, self.top_k - ) - - matches = {} - for source_column_name, scope in zip(dataset.columns, topk_column_matches): - - source_column_name = scope["source_column"] - top_k_columns = scope["top_k_columns"] - - source_column = dataset[source_column_name] - - if not pd.api.types.is_string_dtype(source_column): - matches[source_column_name] = top_k_columns[0].column_name - continue - - source_values = self.unique_string_values(source_column).to_list() - - scores = [] - for top_column in top_k_columns: - target_column_name = top_column.column_name - target_column = global_table[target_column_name] - target_values = self.unique_string_values(target_column).to_list() - value_matches = self.value_matcher.match(source_values, target_values) - score = sum([m.similarity for m in value_matches]) / len(target_values) - score = (top_column.score + score) / 2.0 - scores.append((source_column_name, target_column_name, score)) - - sorted_columns = sorted(scores, key=lambda it: it[2], reverse=True) - - matches[source_column_name] = sorted_columns[0][1] - - return self._fill_missing_matches(dataset, matches) - - -class GPTSchemaMatcher(BaseSchemaMatcher): - def __init__(self): - self.client = OpenAI() - - def map(self, dataset: pd.DataFrame, global_table: pd.DataFrame): - global_columns = global_table.columns - labels = ", ".join(global_columns) - candidate_columns = dataset.columns - mappings = {} - for column in candidate_columns: - col = dataset[column] - values = col.drop_duplicates().dropna() - if len(values) > 15: - rows = values.sample(15).tolist() - else: - rows = values.tolist() - serialized_input = f"{column}: {', '.join([str(row) for 
row in rows])}" - context = serialized_input.lower() - column_types = self.get_column_type(context, labels) - for column_type in column_types: - if column_type in global_columns: - mappings[column] = column_type - break - return self._fill_missing_matches(dataset, mappings) - - def get_column_type( - self, context: str, labels: str, m: int = 10, model: str = "gpt-4-turbo-preview" - ): - messages = [ - {"role": "system", "content": "You are an assistant for column matching."}, - { - "role": "user", - "content": """ Please select the top """ - + str(m) - + """ class from """ - + labels - + """ which best describes the context. The context is defined by the column name followed by its respective values. Please respond only with the name of the classes separated by semicolon. - \n CONTEXT: """ - + context - + """ \n RESPONSE: \n""", - }, - ] - col_type = self.client.chat.completions.create( - model=model, messages=messages, temperature=0.3 - ) - col_type_content = col_type.choices[0].message.content - return col_type_content.split(";") - - -class ContrastiveLearningSchemaMatcher(BaseSchemaMatcher): - def __init__(self, model_name: str = DEFAULT_CL_MODEL): - self.topk_matcher = CLTopkColumnMatcher(model_name=model_name) - - def map(self, dataset: pd.DataFrame, global_table: pd.DataFrame): - topk_matches = self.topk_matcher.get_recommendations( - dataset, global_table, top_k=1 - ) - matches = {} - for column, top_k_match in zip(dataset.columns, topk_matches): - candidate = top_k_match["top_k_columns"][0][0] - if candidate in global_table.columns: - matches[column] = candidate - return self._fill_missing_matches(dataset, matches) - - -class TwoPhaseSchemaMatcher(BaseSchemaMatcher): - def __init__( - self, - top_k: int = 20, - top_k_matcher: Optional[TopkColumnMatcher] = None, - schema_matcher: BaseSchemaMatcher = SimFloodSchemaMatcher(), - ): - if top_k_matcher is None: - self.api = CLTopkColumnMatcher(DEFAULT_CL_MODEL) - elif isinstance(top_k_matcher, TopkColumnMatcher): - self.api = top_k_matcher - else: - raise ValueError( - f"Invalid top_k_matcher type: {type(top_k_matcher)}. 
" - "Must be a subclass of {TopkColumnMatcher.__name__}" - ) - - self.schema_matcher = schema_matcher - self.top_k = top_k - - def map( - self, - dataset: pd.DataFrame, - global_table: pd.DataFrame, - ): - topk_column_matches = self.api.get_recommendations( - dataset, global_table, self.top_k - ) - - matches = {} - for column, scope in zip(dataset.columns, topk_column_matches): - candidates = [ - cand[0] - for cand in scope["top_k_columns"] - if cand[0] in global_table.columns - ] - reduced_dataset = dataset[[column]] - reduced_global_table = global_table[candidates] - partial_matches = self.schema_matcher.map( - reduced_dataset, reduced_global_table - ) - - if column in partial_matches: - matches[column] = partial_matches[column] - - return self._fill_missing_matches(dataset, matches) diff --git a/bdikit/mapping_algorithms/value_mapping/__init__.py b/bdikit/mapping_algorithms/value_mapping/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/bdikit/mapping_algorithms/value_mapping/algorithms.py b/bdikit/mapping_algorithms/value_mapping/algorithms.py deleted file mode 100644 index f288840f..00000000 --- a/bdikit/mapping_algorithms/value_mapping/algorithms.py +++ /dev/null @@ -1,260 +0,0 @@ -from typing import List, NamedTuple, Callable, Tuple, Union -import ast -from openai import OpenAI -from polyfuzz import PolyFuzz -from polyfuzz.models import EditDistance, TFIDF, Embeddings -from flair.embeddings import TransformerWordEmbeddings, WordEmbeddings -from rapidfuzz import fuzz -from autofj import AutoFJ -from Levenshtein import ratio -import pandas as pd -import flair -import torch -from bdikit.config import get_device, VALUE_MATCHING_THRESHOLD - -flair.device = torch.device(get_device()) - - -class ValueMatch(NamedTuple): - """ - Represents a match between a source value and a target value with a - similarity score. - """ - - source_value: str - target_value: str - similarity: float - - -class BaseValueMatcher: - """ - Base class for value matching algorithms, i.e., algorithms that match - values from a source domain to values from a target domain. - """ - - def match( - self, source_values: List[str], target_values: List[str] - ) -> List[ValueMatch]: - raise NotImplementedError("Subclasses must implement this method") - - -class PolyFuzzValueMatcher(BaseValueMatcher): - """ - Base class for value matching algorithms based on the PolyFuzz library. - """ - - def __init__(self, polyfuzz_model: PolyFuzz, threshold: float): - self.model = polyfuzz_model - self.threshold = threshold - - def match( - self, - source_values: List[str], - target_values: List[str], - ) -> List[ValueMatch]: - - self.model.match(source_values, target_values) - match_results = self.model.get_matches() - match_results.sort_values(by="Similarity", ascending=False, inplace=True) - - matches = [] - for _, row in match_results.iterrows(): - source = row[0] - top_matches = row[1:] - indexes = range(0, len(top_matches) - 1, 2) - - for index in indexes: - target = top_matches[index] - similarity = top_matches[index + 1] - if similarity >= self.threshold: - matches.append((source, target, similarity)) - - return matches - - -class TFIDFValueMatcher(PolyFuzzValueMatcher): - """ - Value matching algorithm based on the TF-IDF similarity between values. 
- """ - - def __init__( - self, - n_gram_range: Tuple[int, int] = (1, 3), - clean_string: bool = True, - threshold: float = VALUE_MATCHING_THRESHOLD, - top_n: int = 1, - cosine_method: str = "sparse", - ): - - super().__init__( - PolyFuzz( - method=TFIDF( - n_gram_range=n_gram_range, - clean_string=clean_string, - min_similarity=threshold, - top_n=top_n, - cosine_method=cosine_method, - ) - ), - threshold, - ) - - -class EditDistanceValueMatcher(PolyFuzzValueMatcher): - """ - Value matching algorithm based on the edit distance between values. - """ - - def __init__( - self, - scorer: Callable[[str, str], float] = fuzz.ratio, - n_jobs: int = -1, - threshold: float = VALUE_MATCHING_THRESHOLD, - ): - # Return scores between 0 and 1 - normalized_scorer = lambda str1, str2: scorer(str1, str2) / 100.0 - super().__init__( - PolyFuzz( - method=EditDistance( - n_jobs=n_jobs, scorer=normalized_scorer, normalize=False - ) - ), - threshold, - ) - - -class EmbeddingValueMatcher(PolyFuzzValueMatcher): - """ - Value matching algorithm based on the cosine similarity of value embeddings. - """ - - def __init__( - self, - model_name: str = "bert-base-multilingual-cased", - threshold: float = VALUE_MATCHING_THRESHOLD, - top_n: int = 1, - cosine_method: str = "sparse", - ): - embeddings = TransformerWordEmbeddings(model_name) - method = Embeddings( - embeddings, - min_similarity=threshold, - top_n=top_n, - cosine_method=cosine_method, - ) - super().__init__(PolyFuzz(method), threshold) - - -class FastTextValueMatcher(PolyFuzzValueMatcher): - """ - Value matching algorithm based on the cosine similarity of FastText embeddings. - """ - - def __init__( - self, - model_name: str = "en-crawl", - threshold: float = VALUE_MATCHING_THRESHOLD, - top_n: int = 1, - cosine_method: str = "sparse", - ): - embeddings = WordEmbeddings(model_name) - method = Embeddings( - embeddings, - min_similarity=threshold, - top_n=top_n, - cosine_method=cosine_method, - ) - super().__init__(PolyFuzz(method), threshold) - - -class GPTValueMatcher(BaseValueMatcher): - def __init__( - self, - threshold: float = VALUE_MATCHING_THRESHOLD, - ): - self.client = OpenAI() - self.threshold = threshold - - def match( - self, - source_values: List[str], - target_values: List[str], - ) -> List[ValueMatch]: - target_values_set = set(target_values) - matches = [] - - for source_value in source_values: - completion = self.client.chat.completions.create( - model="gpt-4-turbo-preview", - messages=[ - { - "role": "system", - "content": "You are an intelligent system that given a term, you have to choose a value from a list that best matches the term. " - "These terms belong to the medical domain, and the list contains terms in the Genomics Data Commons (GDC) format.", - }, - { - "role": "user", - "content": f'For the term: "{source_value}", choose a value from this list {target_values}. ' - "Return the value from the list with a similarity score, between 0 and 1, with 1 indicating the highest similarity. " - "DO NOT PROVIDE ANY OTHER OUTPUT TEXT OR EXPLANATION. " - 'Only provide a Python dictionary. 
For example {"term": "term from the list", "score": 0.8}.', - }, - ], - ) - - response_message = completion.choices[0].message.content - try: - response_dict = ast.literal_eval(response_message) - target_value = response_dict["term"] - score = float(response_dict["score"]) - if target_value in target_values_set and score >= self.threshold: - matches.append(ValueMatch(source_value, target_value, score)) - except: - print( - f'Errors parsing response for "{source_value}": {response_message}' - ) - - return matches - - -class AutoFuzzyJoinValueMatcher(BaseValueMatcher): - def __init__( - self, - threshold: float = VALUE_MATCHING_THRESHOLD, - ): - self.threshold = threshold - - def match( - self, - source_values: List[str], - target_values: List[str], - ) -> List[ValueMatch]: - - source_values = sorted(list(set(source_values))) - target_values = sorted(list(set(target_values))) - - df_source_values = pd.DataFrame( - {"id": range(1, len(source_values) + 1), "title": source_values} - ) - df_target_values = pd.DataFrame( - {"id": range(1, len(target_values) + 1), "title": target_values} - ) - - matches = [] - try: - autofj = AutoFJ( - precision_target=self.threshold, - join_function_space="autofj_md", - verbose=True, - ) - LR_joins = autofj.join(df_source_values, df_target_values, id_column="id") - if len(LR_joins) > 0: - for _, row in LR_joins.iterrows(): - title_l = row["title_l"] - title_r = row["title_r"] - similarity = ratio(title_l, title_r) - if similarity >= self.threshold: - matches.append(ValueMatch(title_l, title_r, similarity)) - except Exception as e: - return matches - return matches diff --git a/bdikit/mapping_algorithms/value_mapping/value_mappers.py b/bdikit/mapping_functions.py similarity index 100% rename from bdikit/mapping_algorithms/value_mapping/value_mappers.py rename to bdikit/mapping_functions.py diff --git a/bdikit/mapping_algorithms/__init__.py b/bdikit/schema_matching/__init__.py similarity index 100% rename from bdikit/mapping_algorithms/__init__.py rename to bdikit/schema_matching/__init__.py diff --git a/bdikit/schema_matching/best/__init__.py b/bdikit/schema_matching/best/__init__.py new file mode 100644 index 00000000..6aa90108 --- /dev/null +++ b/bdikit/schema_matching/best/__init__.py @@ -0,0 +1,5 @@ +from bdikit.schema_matching.best.algorithms.valentine import * +from bdikit.schema_matching.best.algorithms.gpt import * +from bdikit.schema_matching.best.algorithms.contrastivelearning import * +from bdikit.schema_matching.best.algorithms.twophase import * +from bdikit.schema_matching.best.algorithms.maxvalsim import * diff --git a/bdikit/schema_matching/best/algorithms/__init__.py b/bdikit/schema_matching/best/algorithms/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/bdikit/schema_matching/best/algorithms/__init__.py @@ -0,0 +1 @@ + diff --git a/bdikit/schema_matching/best/algorithms/contrastivelearning.py b/bdikit/schema_matching/best/algorithms/contrastivelearning.py new file mode 100644 index 00000000..8b8a5ca3 --- /dev/null +++ b/bdikit/schema_matching/best/algorithms/contrastivelearning.py @@ -0,0 +1,21 @@ +import pandas as pd + +from bdikit.schema_matching.best.base import BaseSchemaMatcher +from bdikit.models.contrastive_learning.cl_api import DEFAULT_CL_MODEL +from bdikit.schema_matching.topk.algorithms.contrastivelearning import ( + CLTopkSchemaMatcher, +) + + +class ContrastiveLearningSchemaMatcher(BaseSchemaMatcher): + def __init__(self, model_name: str = DEFAULT_CL_MODEL): + self.topk_matcher = 
CLTopkSchemaMatcher(model_name=model_name) + + def map(self, source: pd.DataFrame, target: pd.DataFrame): + topk_matches = self.topk_matcher.get_recommendations(source, target, top_k=1) + matches = {} + for column, top_k_match in zip(source.columns, topk_matches): + candidate = top_k_match["top_k_columns"][0][0] + if candidate in target.columns: + matches[column] = candidate + return self._fill_missing_matches(source, matches) diff --git a/bdikit/schema_matching/best/algorithms/gpt.py b/bdikit/schema_matching/best/algorithms/gpt.py new file mode 100644 index 00000000..68d27803 --- /dev/null +++ b/bdikit/schema_matching/best/algorithms/gpt.py @@ -0,0 +1,52 @@ +import pandas as pd +from openai import OpenAI +from bdikit.schema_matching.best.base import BaseSchemaMatcher + + +class GPTSchemaMatcher(BaseSchemaMatcher): + def __init__(self): + self.client = OpenAI() + + def map(self, source: pd.DataFrame, target: pd.DataFrame): + target_columns = target.columns + labels = ", ".join(target_columns) + candidate_columns = source.columns + mappings = {} + for column in candidate_columns: + col = source[column] + values = col.drop_duplicates().dropna() + if len(values) > 15: + rows = values.sample(15).tolist() + else: + rows = values.tolist() + serialized_input = f"{column}: {', '.join([str(row) for row in rows])}" + context = serialized_input.lower() + column_types = self.get_column_type(context, labels) + for column_type in column_types: + if column_type in target_columns: + mappings[column] = column_type + break + return self._fill_missing_matches(source, mappings) + + def get_column_type( + self, context: str, labels: str, m: int = 10, model: str = "gpt-4-turbo-preview" + ): + messages = [ + {"role": "system", "content": "You are an assistant for column matching."}, + { + "role": "user", + "content": """ Please select the top """ + + str(m) + + """ class from """ + + labels + + """ which best describes the context. The context is defined by the column name followed by its respective values. Please respond only with the name of the classes separated by semicolon. + \n CONTEXT: """ + + context + + """ \n RESPONSE: \n""", + }, + ] + col_type = self.client.chat.completions.create( + model=model, messages=messages, temperature=0.3 + ) + col_type_content = col_type.choices[0].message.content + return col_type_content.split(";") diff --git a/bdikit/schema_matching/best/algorithms/maxvalsim.py b/bdikit/schema_matching/best/algorithms/maxvalsim.py new file mode 100644 index 00000000..c8993912 --- /dev/null +++ b/bdikit/schema_matching/best/algorithms/maxvalsim.py @@ -0,0 +1,84 @@ +import pandas as pd +from typing import Optional +from bdikit.schema_matching.best.base import BaseSchemaMatcher +from bdikit.models.contrastive_learning.cl_api import DEFAULT_CL_MODEL +from bdikit.schema_matching.topk.base import BaseTopkSchemaMatcher +from bdikit.schema_matching.topk.algorithms.contrastivelearning import ( + CLTopkSchemaMatcher, +) +from bdikit.value_matching.algorithms.polyfuzz import TFIDFValueMatcher +from bdikit.value_matching.base import BaseValueMatcher + + +class MaxValSimSchemaMatcher(BaseSchemaMatcher): + def __init__( + self, + top_k: int = 20, + top_k_matcher: Optional[BaseTopkSchemaMatcher] = None, + value_matcher: Optional[BaseValueMatcher] = None, + ): + if top_k_matcher is None: + self.api = CLTopkSchemaMatcher(DEFAULT_CL_MODEL) + elif isinstance(top_k_matcher, BaseTopkSchemaMatcher): + self.api = top_k_matcher + else: + raise ValueError( + f"Invalid top_k_matcher type: {type(top_k_matcher)}. 
" + "Must be a subclass of {BaseTopkColumnMatcher.__name__}" + ) + + if value_matcher is None: + self.value_matcher = TFIDFValueMatcher() + elif isinstance(value_matcher, BaseValueMatcher): + self.value_matcher = value_matcher + else: + raise ValueError( + f"Invalid value_matcher type: {type(value_matcher)}. " + "Must be a subclass of {BaseValueMatcher.__name__}" + ) + + self.top_k = top_k + + def unique_string_values(self, column: pd.Series) -> pd.Series: + column = column.dropna() + if pd.api.types.is_string_dtype(column): + return pd.Series(column.unique(), name=column.name) + else: + return pd.Series(column.unique().astype(str), name=column.name) + + def map( + self, + source: pd.DataFrame, + target: pd.DataFrame, + ): + topk_column_matches = self.api.get_recommendations(source, target, self.top_k) + + matches = {} + for source_column_name, scope in zip(source.columns, topk_column_matches): + + source_column_name = scope["source_column"] + top_k_columns = scope["top_k_columns"] + + source_column = source[source_column_name] + + if not pd.api.types.is_string_dtype(source_column): + matches[source_column_name] = top_k_columns[0].column_name + continue + + source_values = self.unique_string_values(source_column).to_list() + + scores = [] + for top_column in top_k_columns: + target_column_name = top_column.column_name + target_column = target[target_column_name] + target_values = self.unique_string_values(target_column).to_list() + value_matches = self.value_matcher.match(source_values, target_values) + score = sum([m.similarity for m in value_matches]) / len(target_values) + score = (top_column.score + score) / 2.0 + scores.append((source_column_name, target_column_name, score)) + + sorted_columns = sorted(scores, key=lambda it: it[2], reverse=True) + + matches[source_column_name] = sorted_columns[0][1] + + return self._fill_missing_matches(source, matches) diff --git a/bdikit/schema_matching/best/algorithms/twophase.py b/bdikit/schema_matching/best/algorithms/twophase.py new file mode 100644 index 00000000..45aa1a7b --- /dev/null +++ b/bdikit/schema_matching/best/algorithms/twophase.py @@ -0,0 +1,52 @@ +import pandas as pd +from typing import Optional + +from bdikit.schema_matching.best.base import BaseSchemaMatcher +from bdikit.schema_matching.best.algorithms.valentine import SimFloodSchemaMatcher +from bdikit.models.contrastive_learning.cl_api import DEFAULT_CL_MODEL +from bdikit.schema_matching.topk.base import BaseTopkSchemaMatcher +from bdikit.schema_matching.topk.algorithms.contrastivelearning import ( + CLTopkSchemaMatcher, +) + + +class TwoPhaseSchemaMatcher(BaseSchemaMatcher): + def __init__( + self, + top_k: int = 20, + top_k_matcher: Optional[BaseTopkSchemaMatcher] = None, + schema_matcher: BaseSchemaMatcher = SimFloodSchemaMatcher(), + ): + if top_k_matcher is None: + self.api = CLTopkSchemaMatcher(DEFAULT_CL_MODEL) + elif isinstance(top_k_matcher, BaseTopkSchemaMatcher): + self.api = top_k_matcher + else: + raise ValueError( + f"Invalid top_k_matcher type: {type(top_k_matcher)}. 
" + "Must be a subclass of {BaseTopkColumnMatcher.__name__}" + ) + + self.schema_matcher = schema_matcher + self.top_k = top_k + + def map( + self, + source: pd.DataFrame, + target: pd.DataFrame, + ): + topk_column_matches = self.api.get_recommendations(source, target, self.top_k) + + matches = {} + for column, scope in zip(source.columns, topk_column_matches): + candidates = [ + cand[0] for cand in scope["top_k_columns"] if cand[0] in target.columns + ] + reduced_source = source[[column]] + reduced_target = target[candidates] + partial_matches = self.schema_matcher.map(reduced_source, reduced_target) + + if column in partial_matches: + matches[column] = partial_matches[column] + + return self._fill_missing_matches(source, matches) diff --git a/bdikit/schema_matching/best/algorithms/valentine.py b/bdikit/schema_matching/best/algorithms/valentine.py new file mode 100644 index 00000000..67f8fc25 --- /dev/null +++ b/bdikit/schema_matching/best/algorithms/valentine.py @@ -0,0 +1,106 @@ +import pandas as pd +from typing import Dict, Callable +from bdikit.schema_matching.best.base import BaseSchemaMatcher +from valentine import valentine_match +from valentine.algorithms.matcher_results import MatcherResults +from valentine.algorithms.jaccard_distance import StringDistanceFunction +from valentine.algorithms import ( + SimilarityFlooding, + Coma, + Cupid, + DistributionBased, + JaccardDistanceMatcher, + BaseMatcher, +) + + +class ValentineSchemaMatcher(BaseSchemaMatcher): + def __init__(self, matcher: BaseMatcher): + self.matcher = matcher + + def map(self, source: pd.DataFrame, target: pd.DataFrame) -> Dict[str, str]: + matches: MatcherResults = valentine_match(source, target, self.matcher) + mappings = {} + for match in matches.one_to_one(): + source_candidate = match[0][1] + target_candidate = match[1][1] + mappings[source_candidate] = target_candidate + return self._fill_missing_matches(source, mappings) + + +class SimFloodSchemaMatcher(ValentineSchemaMatcher): + def __init__( + self, coeff_policy: str = "inverse_average", formula: str = "formula_c" + ): + super().__init__(SimilarityFlooding(coeff_policy=coeff_policy, formula=formula)) + + +class ComaSchemaMatcher(ValentineSchemaMatcher): + def __init__( + self, max_n: int = 0, use_instances: bool = False, java_xmx: str = "1024m" + ): + super().__init__( + Coma(max_n=max_n, use_instances=use_instances, java_xmx=java_xmx) + ) + + +class CupidSchemaMatcher(ValentineSchemaMatcher): + def __init__( + self, + leaf_w_struct: float = 0.2, + w_struct: float = 0.2, + th_accept: float = 0.7, + th_high: float = 0.6, + th_low: float = 0.35, + c_inc: float = 1.2, + c_dec: float = 0.9, + th_ns: float = 0.7, + parallelism: int = 1, + ): + super().__init__( + Cupid( + leaf_w_struct=leaf_w_struct, + w_struct=w_struct, + th_accept=th_accept, + th_high=th_high, + th_low=th_low, + c_inc=c_inc, + c_dec=c_dec, + th_ns=th_ns, + parallelism=parallelism, + ) + ) + + +class DistributionBasedSchemaMatcher(ValentineSchemaMatcher): + def __init__( + self, + threshold1: float = 0.15, + threshold2: float = 0.15, + quantiles: int = 256, + process_num: int = 1, + ): + super().__init__( + DistributionBased( + threshold1=threshold1, + threshold2=threshold2, + quantiles=quantiles, + process_num=process_num, + ) + ) + + +class JaccardSchemaMatcher(ValentineSchemaMatcher): + def __init__( + self, + threshold_dist: float = 0.8, + distance_fun: Callable[[str, str], float] = StringDistanceFunction.Levenshtein, + process_num: int = 1, + ): + super().__init__( + JaccardDistanceMatcher( 
+ threshold_dist=threshold_dist, + distance_fun=distance_fun, + process_num=process_num, + ) + ) diff --git a/bdikit/schema_matching/best/base.py b/bdikit/schema_matching/best/base.py new file mode 100644 index 00000000..4be62cdd --- /dev/null +++ b/bdikit/schema_matching/best/base.py @@ -0,0 +1,15 @@ +import pandas as pd +from typing import Dict + + +class BaseSchemaMatcher: + def map(self, source: pd.DataFrame, target: pd.DataFrame) -> Dict[str, str]: + raise NotImplementedError("Subclasses must implement this method") + + def _fill_missing_matches( + self, dataset: pd.DataFrame, matches: Dict[str, str] + ) -> Dict[str, str]: + for column in dataset.columns: + if column not in matches: + matches[column] = "" + return matches diff --git a/bdikit/schema_matching/best/matcher_factory.py b/bdikit/schema_matching/best/matcher_factory.py new file mode 100644 index 00000000..4da08017 --- /dev/null +++ b/bdikit/schema_matching/best/matcher_factory.py @@ -0,0 +1,45 @@ +from enum import Enum +from typing import Mapping, Any, Type +from bdikit.schema_matching.best.base import BaseSchemaMatcher +from bdikit.schema_matching.best import ( + SimFloodSchemaMatcher, + ComaSchemaMatcher, + CupidSchemaMatcher, + DistributionBasedSchemaMatcher, + JaccardSchemaMatcher, + GPTSchemaMatcher, + ContrastiveLearningSchemaMatcher, + TwoPhaseSchemaMatcher, + MaxValSimSchemaMatcher, +) + + +class SchemaMatchers(Enum): + SIMFLOOD = ("similarity_flooding", SimFloodSchemaMatcher) + COMA = ("coma", ComaSchemaMatcher) + CUPID = ("cupid", CupidSchemaMatcher) + DISTRIBUTION_BASED = ("distribution_based", DistributionBasedSchemaMatcher) + JACCARD_DISTANCE = ("jaccard_distance", JaccardSchemaMatcher) + GPT = ("gpt", GPTSchemaMatcher) + CT_LEARNING = ("ct_learning", ContrastiveLearningSchemaMatcher) + TWO_PHASE = ("two_phase", TwoPhaseSchemaMatcher) + MAX_VAL_SIM = ("max_val_sim", MaxValSimSchemaMatcher) + + def __init__(self, method_name: str, method_class: Type[BaseSchemaMatcher]): + self.method_name = method_name + self.method_class = method_class + + @staticmethod + def get_instance( + method_name: str, **method_kwargs: Mapping[str, Any] + ) -> BaseSchemaMatcher: + methods = {method.method_name: method.method_class for method in SchemaMatchers} + + try: + return methods[method_name](**method_kwargs) + except KeyError: + names = ", ".join(list(methods.keys())) + raise ValueError( + f"The {method_name} algorithm is not supported. 
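The `SchemaMatchers` enum above, like the `TopkMatchers` and `ValueMatchers` factories later in this patch, pairs each registered method name with its matcher class and instantiates it on demand via `get_instance`, forwarding keyword arguments to the constructor. A usage sketch (`formula="formula_c"` is the `SimFloodSchemaMatcher` default from this patch):

```python
# Usage sketch of the factory pattern shared by SchemaMatchers,
# TopkMatchers, and ValueMatchers in this patch.
from bdikit.schema_matching.best.matcher_factory import SchemaMatchers

# Resolve a matcher by its registered name; extra keyword arguments are
# forwarded to the matcher's constructor.
matcher = SchemaMatchers.get_instance("similarity_flooding", formula="formula_c")

# Unknown names raise a ValueError listing the supported algorithms.
try:
    SchemaMatchers.get_instance("does_not_exist")
except ValueError as err:
    print(err)
```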
" + f"Supported algorithms are: {names}" + ) diff --git a/bdikit/schema_matching/topk/__init__.py b/bdikit/schema_matching/topk/__init__.py new file mode 100644 index 00000000..b068beda --- /dev/null +++ b/bdikit/schema_matching/topk/__init__.py @@ -0,0 +1 @@ +from bdikit.schema_matching.topk.algorithms.contrastivelearning import * diff --git a/bdikit/mapping_algorithms/column_mapping/topk_matchers.py b/bdikit/schema_matching/topk/algorithms/contrastivelearning.py similarity index 77% rename from bdikit/mapping_algorithms/column_mapping/topk_matchers.py rename to bdikit/schema_matching/topk/algorithms/contrastivelearning.py index 8b670650..a014bb49 100644 --- a/bdikit/mapping_algorithms/column_mapping/topk_matchers.py +++ b/bdikit/schema_matching/topk/algorithms/contrastivelearning.py @@ -1,7 +1,11 @@ -from abc import ABCMeta, abstractmethod -from typing import List, NamedTuple, TypedDict import pandas as pd import numpy as np +from typing import List +from bdikit.schema_matching.topk.base import ( + ColumnScore, + TopkMatching, + BaseTopkSchemaMatcher, +) from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances from bdikit.models.contrastive_learning.cl_api import ( ContrastiveLearningAPI, @@ -10,25 +14,7 @@ from bdikit.models import ColumnEmbedder -class ColumnScore(NamedTuple): - column_name: str - score: float - - -class TopkMatching(TypedDict): - source_column: str - top_k_columns: List[ColumnScore] - - -class TopkColumnMatcher(metaclass=ABCMeta): - @abstractmethod - def get_recommendations( - self, source: pd.DataFrame, target: pd.DataFrame, top_k: int - ) -> List[TopkMatching]: - pass - - -class EmbeddingSimilarityTopkColumnMatcher(TopkColumnMatcher): +class EmbeddingSimilarityTopkSchemaMatcher(BaseTopkSchemaMatcher): def __init__(self, column_embedder: ColumnEmbedder, metric: str = "cosine"): self.api = column_embedder self.metric = metric @@ -68,7 +54,7 @@ def get_recommendations( return top_k_results -class CLTopkColumnMatcher(EmbeddingSimilarityTopkColumnMatcher): +class CLTopkSchemaMatcher(EmbeddingSimilarityTopkSchemaMatcher): def __init__(self, model_name: str = DEFAULT_CL_MODEL, metric: str = "cosine"): super().__init__( column_embedder=ContrastiveLearningAPI(model_name=model_name), metric=metric diff --git a/bdikit/schema_matching/topk/base.py b/bdikit/schema_matching/topk/base.py new file mode 100644 index 00000000..87725f08 --- /dev/null +++ b/bdikit/schema_matching/topk/base.py @@ -0,0 +1,21 @@ +from abc import ABCMeta, abstractmethod +from typing import List, NamedTuple, TypedDict +import pandas as pd + + +class ColumnScore(NamedTuple): + column_name: str + score: float + + +class TopkMatching(TypedDict): + source_column: str + top_k_columns: List[ColumnScore] + + +class BaseTopkSchemaMatcher(metaclass=ABCMeta): + @abstractmethod + def get_recommendations( + self, source: pd.DataFrame, target: pd.DataFrame, top_k: int + ) -> List[TopkMatching]: + pass diff --git a/bdikit/schema_matching/topk/matcher_factory.py b/bdikit/schema_matching/topk/matcher_factory.py new file mode 100644 index 00000000..05be5bb3 --- /dev/null +++ b/bdikit/schema_matching/topk/matcher_factory.py @@ -0,0 +1,26 @@ +from enum import Enum +from typing import Mapping, Any, Type +from bdikit.schema_matching.topk.base import BaseTopkSchemaMatcher +from bdikit.schema_matching.topk import CLTopkSchemaMatcher + + +class TopkMatchers(Enum): + CT_LEARNING = ("ct_learning", CLTopkSchemaMatcher) + + def __init__(self, method_name: str, method_class: Type[BaseTopkSchemaMatcher]): + 
self.method_name = method_name + self.method_class = method_class + + @staticmethod + def get_instance( + method_name: str, **method_kwargs: Mapping[str, Any] + ) -> BaseTopkSchemaMatcher: + methods = {method.method_name: method.method_class for method in TopkMatchers} + try: + return methods[method_name](**method_kwargs) + except KeyError: + names = ", ".join(list(methods.keys())) + raise ValueError( + f"The {method_name} algorithm is not supported. " + f"Supported algorithms are: {names}" + ) diff --git a/bdikit/value_matching/__init__.py b/bdikit/value_matching/__init__.py new file mode 100644 index 00000000..e9409e5f --- /dev/null +++ b/bdikit/value_matching/__init__.py @@ -0,0 +1,2 @@ +from bdikit.value_matching.algorithms.polyfuzz import * +from bdikit.value_matching.algorithms.gpt import * diff --git a/bdikit/mapping_algorithms/column_mapping/__init__.py b/bdikit/value_matching/algorithms/__init__.py similarity index 100% rename from bdikit/mapping_algorithms/column_mapping/__init__.py rename to bdikit/value_matching/algorithms/__init__.py diff --git a/bdikit/value_matching/algorithms/gpt.py b/bdikit/value_matching/algorithms/gpt.py new file mode 100644 index 00000000..7fcde3e6 --- /dev/null +++ b/bdikit/value_matching/algorithms/gpt.py @@ -0,0 +1,55 @@ +import ast +from typing import List +from openai import OpenAI +from bdikit.value_matching.base import BaseValueMatcher, ValueMatch +from bdikit.config import VALUE_MATCHING_THRESHOLD + + +class GPTValueMatcher(BaseValueMatcher): + def __init__( + self, + threshold: float = VALUE_MATCHING_THRESHOLD, + ): + self.client = OpenAI() + self.threshold = threshold + + def match( + self, + source_values: List[str], + target_values: List[str], + ) -> List[ValueMatch]: + target_values_set = set(target_values) + matches = [] + + for source_value in source_values: + completion = self.client.chat.completions.create( + model="gpt-4-turbo-preview", + messages=[ + { + "role": "system", + "content": "You are an intelligent system that given a term, you have to choose a value from a list that best matches the term. " + "These terms belong to the medical domain, and the list contains terms in the Genomics Data Commons (GDC) format.", + }, + { + "role": "user", + "content": f'For the term: "{source_value}", choose a value from this list {target_values}. ' + "Return the value from the list with a similarity score, between 0 and 1, with 1 indicating the highest similarity. " + "DO NOT PROVIDE ANY OTHER OUTPUT TEXT OR EXPLANATION. " + 'Only provide a Python dictionary. 
For example {"term": "term from the list", "score": 0.8}.', + }, + ], + ) + + response_message = completion.choices[0].message.content + try: + response_dict = ast.literal_eval(response_message) + target_value = response_dict["term"] + score = float(response_dict["score"]) + if target_value in target_values_set and score >= self.threshold: + matches.append(ValueMatch(source_value, target_value, score)) + except: + print( + f'Errors parsing response for "{source_value}": {response_message}' + ) + + return matches diff --git a/bdikit/value_matching/algorithms/polyfuzz.py b/bdikit/value_matching/algorithms/polyfuzz.py new file mode 100644 index 00000000..4bee843b --- /dev/null +++ b/bdikit/value_matching/algorithms/polyfuzz.py @@ -0,0 +1,141 @@ +import flair +import torch +from rapidfuzz import fuzz +from polyfuzz import PolyFuzz +from typing import List, Callable, Tuple +from bdikit.value_matching.base import BaseValueMatcher, ValueMatch +from polyfuzz.models import EditDistance, TFIDF, Embeddings +from flair.embeddings import TransformerWordEmbeddings, WordEmbeddings +from bdikit.config import get_device, VALUE_MATCHING_THRESHOLD + + +flair.device = torch.device(get_device()) + + +class PolyFuzzValueMatcher(BaseValueMatcher): + """ + Base class for value matching algorithms based on the PolyFuzz library. + """ + + def __init__(self, polyfuzz_model: PolyFuzz, threshold: float): + self.model = polyfuzz_model + self.threshold = threshold + + def match( + self, + source_values: List[str], + target_values: List[str], + ) -> List[ValueMatch]: + + self.model.match(source_values, target_values) + match_results = self.model.get_matches() + match_results.sort_values(by="Similarity", ascending=False, inplace=True) + + matches = [] + for _, row in match_results.iterrows(): + source = row[0] + top_matches = row[1:] + indexes = range(0, len(top_matches) - 1, 2) + + for index in indexes: + target = top_matches[index] + similarity = top_matches[index + 1] + if similarity >= self.threshold: + matches.append((source, target, similarity)) + + return matches + + +class TFIDFValueMatcher(PolyFuzzValueMatcher): + """ + Value matching algorithm based on the TF-IDF similarity between values. + """ + + def __init__( + self, + n_gram_range: Tuple[int, int] = (1, 3), + clean_string: bool = True, + threshold: float = VALUE_MATCHING_THRESHOLD, + top_n: int = 1, + cosine_method: str = "sparse", + ): + + super().__init__( + PolyFuzz( + method=TFIDF( + n_gram_range=n_gram_range, + clean_string=clean_string, + min_similarity=threshold, + top_n=top_n, + cosine_method=cosine_method, + ) + ), + threshold, + ) + + +class EditDistanceValueMatcher(PolyFuzzValueMatcher): + """ + Value matching algorithm based on the edit distance between values. + """ + + def __init__( + self, + scorer: Callable[[str, str], float] = fuzz.ratio, + n_jobs: int = -1, + threshold: float = VALUE_MATCHING_THRESHOLD, + ): + # Return scores between 0 and 1 + normalized_scorer = lambda str1, str2: scorer(str1, str2) / 100.0 + super().__init__( + PolyFuzz( + method=EditDistance( + n_jobs=n_jobs, scorer=normalized_scorer, normalize=False + ) + ), + threshold, + ) + + +class EmbeddingValueMatcher(PolyFuzzValueMatcher): + """ + Value matching algorithm based on the cosine similarity of value embeddings. 
+ """ + + def __init__( + self, + model_name: str = "bert-base-multilingual-cased", + threshold: float = VALUE_MATCHING_THRESHOLD, + top_n: int = 1, + cosine_method: str = "sparse", + ): + embeddings = TransformerWordEmbeddings(model_name) + method = Embeddings( + embeddings, + min_similarity=threshold, + top_n=top_n, + cosine_method=cosine_method, + ) + super().__init__(PolyFuzz(method), threshold) + + +class FastTextValueMatcher(PolyFuzzValueMatcher): + """ + Value matching algorithm based on the cosine similarity of FastText embeddings. + """ + + def __init__( + self, + model_name: str = "en-crawl", + threshold: float = VALUE_MATCHING_THRESHOLD, + top_n: int = 1, + cosine_method: str = "sparse", + ): + embeddings = WordEmbeddings(model_name) + method = Embeddings( + embeddings, + min_similarity=threshold, + top_n=top_n, + cosine_method=cosine_method, + ) + super().__init__(PolyFuzz(method), threshold) diff --git a/bdikit/value_matching/base.py b/bdikit/value_matching/base.py new file mode 100644 index 00000000..f8124bf8 --- /dev/null +++ b/bdikit/value_matching/base.py @@ -0,0 +1,37 @@ +from typing import List, NamedTuple, TypedDict, Set + + +class ValueMatch(NamedTuple): + """ + Represents a match between a source value and a target value with a + similarity score. + """ + + source_value: str + target_value: str + similarity: float + + +class ValueMatchingResult(TypedDict): + """ + Represents the result of a value matching operation. + """ + + source: str + target: str + matches: List[ValueMatch] + coverage: float + unique_values: Set[str] + unmatch_values: Set[str] + + +class BaseValueMatcher: + """ + Base class for value matching algorithms, i.e., algorithms that match + values from a source domain to values from a target domain. + """ + + def match( + self, source_values: List[str], target_values: List[str] + ) -> List[ValueMatch]: + raise NotImplementedError("Subclasses must implement this method") diff --git a/bdikit/value_matching/matcher_factory.py b/bdikit/value_matching/matcher_factory.py new file mode 100644 index 00000000..a73c2c6c --- /dev/null +++ b/bdikit/value_matching/matcher_factory.py @@ -0,0 +1,36 @@ +from enum import Enum +from typing import Mapping, Any, Type +from bdikit.value_matching.base import BaseValueMatcher +from bdikit.value_matching import ( + GPTValueMatcher, + TFIDFValueMatcher, + EditDistanceValueMatcher, + EmbeddingValueMatcher, + FastTextValueMatcher, +) + + +class ValueMatchers(Enum): + TFIDF = ("tfidf", TFIDFValueMatcher) + EDIT = ("edit_distance", EditDistanceValueMatcher) + EMBEDDINGS = ("embedding", EmbeddingValueMatcher) + FASTTEXT = ("fasttext", FastTextValueMatcher) + GPT = ("gpt", GPTValueMatcher) + + def __init__(self, method_name: str, method_class: Type[BaseValueMatcher]): + self.method_name = method_name + self.method_class = method_class + + @staticmethod + def get_instance( + method_name: str, **method_kwargs: Mapping[str, Any] + ) -> BaseValueMatcher: + methods = {method.method_name: method.method_class for method in ValueMatchers} + try: + return methods[method_name](**method_kwargs) + except KeyError: + names = ", ".join(list(methods.keys())) + raise ValueError( + f"The {method_name} algorithm is not supported. 
" + f"Supported algorithms are: {names}" + ) diff --git a/docs/source/examples.rst b/docs/source/examples.rst index 590082d5..67a8c81f 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -5,5 +5,4 @@ Here can find different Jupyter notebook examples about how to use `bdi-kit`: - `Changing the parameters of the matching methods `__ - `Getting the top-k value matches `__ -- `Analyzing one attribute/column at a time `__ - +- `Analyzing one attribute/column at a time `__ \ No newline at end of file diff --git a/tests/test_api.py b/tests/test_api.py index 952de096..5b05fcdc 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -2,7 +2,7 @@ import numpy as np import pandas as pd import numpy as np -from bdikit.mapping_algorithms.value_mapping.value_mappers import ( +from bdikit.mapping_functions import ( FunctionValueMapper, IdentityValueMapper, ) diff --git a/tests/test_value_mapping.py b/tests/test_mapping_functions.py similarity index 96% rename from tests/test_value_mapping.py rename to tests/test_mapping_functions.py index 5a20f1fb..652456d3 100644 --- a/tests/test_value_mapping.py +++ b/tests/test_mapping_functions.py @@ -1,6 +1,6 @@ import pandas as pd import numpy as np -from bdikit.mapping_algorithms.value_mapping.value_mappers import ( +from bdikit.mapping_functions import ( FunctionValueMapper, DictionaryMapper, IdentityValueMapper, diff --git a/tests/test_schema_matching.py b/tests/test_schema_matching.py index 6b637727..03e31706 100644 --- a/tests/test_schema_matching.py +++ b/tests/test_schema_matching.py @@ -1,5 +1,5 @@ import pandas as pd -from bdikit.mapping_algorithms.column_mapping.algorithms import ( +from bdikit.schema_matching.best import ( SimFloodSchemaMatcher, JaccardSchemaMatcher, DistributionBasedSchemaMatcher, @@ -33,7 +33,7 @@ def test_basic_column_mapping_algorithms(): ) # when - mapping = column_matcher.map(dataset=table1, global_table=table2) + mapping = column_matcher.map(source=table1, target=table2) # then assert { diff --git a/tests/test_value_matching_algorithms.py b/tests/test_value_matching.py similarity index 96% rename from tests/test_value_matching_algorithms.py rename to tests/test_value_matching.py index c43a739b..9b180ff8 100644 --- a/tests/test_value_matching_algorithms.py +++ b/tests/test_value_matching.py @@ -1,6 +1,6 @@ import unittest import pandas as pd -from bdikit.mapping_algorithms.value_mapping.algorithms import ( +from bdikit.value_matching import ( TFIDFValueMatcher, EditDistanceValueMatcher, )