Skip to content
169 changes: 107 additions & 62 deletions giskard_vision/core/detectors/metadata_scan_detector.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, Callable, List, Sequence
from copy import deepcopy
from typing import Any, Callable, Dict, List, Sequence

import numpy as np
import pandas as pd
Expand All @@ -17,17 +18,25 @@ class MetaDataScanDetector(DetectorVisionBase):
Detector based on Giskard scan that looks for issues based on metadata

Args:
surrogate_function: function
type_task: str
Type of the task for the scan, ["regression", "classification"]
surrogate_functions: Dict[str, Callable]
Function to transform the output of the model and the ground truth into one value
that will be used by the scan
metric: function
metric: MetricBase
Metric to evaluate the prediction with respect to the ground truth
type_task: str
Type of the task for the scan, ["regression", "classification"]
metric_type: str
"relative": relative difference will be computed to detect issues
"absolute": absolute difference will be computed to detect issues
metric_direction: str
"better_higher": higher metric means better result
"better_lower": lower metric means better result
issue_group: IssueGroup
Default issue group
"""

type_task: str = "classification"
surrogate_function: Callable = None
surrogate_functions: Dict[str, Callable] = {"no_surrogate": None}
metric: MetricBase = None
metric_type: str = None
metric_direction: str = "better_lower"
Expand Down Expand Up @@ -60,71 +69,101 @@ def get_results(self, model: Any, dataset: Any) -> List[ScanResult]:
# Get dataframe from metadata
df_for_scan = self.get_df_for_scan(model, dataset, list_metadata)

if self.type_task == "regression":

def prediction_function(df: pd.DataFrame) -> np.ndarray:
return pd.merge(df, df_for_scan, on="index", how="inner")["prediction"].values
list_scan_results = []
current_slices = []
for surrogate_name in self.surrogate_functions:

elif self.type_task == "classification":
class_to_index = {label: index for index, label in enumerate(model.classification_labels)}
n_classes = len(model.classification_labels)
if self.type_task == "regression":

def prediction_function(df: pd.DataFrame) -> np.ndarray:
array = pd.merge(df, df_for_scan, on="index", how="inner")["prediction"].values
one_hot_encoded = np.zeros((len(array), n_classes), dtype=float)
def prediction_function(df: pd.DataFrame) -> np.ndarray:
return pd.merge(df, df_for_scan, on="index", how="inner")[f"prediction_{surrogate_name}"].values

for i, label in enumerate(array):
class_index = class_to_index[label]
one_hot_encoded[i, class_index] = 1
elif self.type_task == "classification":

return one_hot_encoded
class_to_index = {label: index for index, label in enumerate(model.classification_labels)}
n_classes = len(model.classification_labels)

# Create Giskard dataset and model
giskard_dataset = Dataset(df=df_for_scan, target="target", cat_columns=list_categories + ["index"])
def prediction_function(df: pd.DataFrame) -> np.ndarray:
array = pd.merge(df, df_for_scan, on="index", how="inner")[f"prediction_{surrogate_name}"].values
one_hot_encoded = np.zeros((len(array), n_classes), dtype=float)

giskard_model = Model(
model=prediction_function,
model_type=self.type_task,
feature_names=list_metadata + ["index"],
classification_labels=model.classification_labels if self.type_task == "classification" else None,
)
for i, label in enumerate(array):
class_index = class_to_index[label]
one_hot_encoded[i, class_index] = 1

# Get scan results
results = scan(giskard_model, giskard_dataset, max_issues_per_detector=None, verbose=False)
return one_hot_encoded

list_scan_results = []

# For each slice found, get appropriate scan results with the metric
for issue in results.issues:
current_data_slice = giskard_dataset.slice(issue.slicing_fn)
indices = list(current_data_slice.df.sort_values(by="metric", ascending=False)["index"].values)
filenames = (
[dataset.get_image_path(int(idx)) for idx in indices[: self.num_images]]
if hasattr(dataset, "get_image_path")
else []
# Create Giskard dataset and model
giskard_dataset = Dataset(
df=df_for_scan, target=f"target_{surrogate_name}", cat_columns=list_categories + ["index"]
)
list_scan_results.append(
self.get_scan_result(
metric_value=current_data_slice.df["metric"].mean(),
metric_reference_value=giskard_dataset.df["metric"].mean(),
metric_name=self.metric.name,
filename_examples=filenames,
name=issue.slicing_fn.meta.display_name,
size_data=len(current_data_slice.df),
issue_group=meta.issue_group(issue.features[0]),
)

giskard_model = Model(
model=prediction_function,
model_type=self.type_task,
feature_names=list_metadata + ["index"],
classification_labels=model.classification_labels if self.type_task == "classification" else None,
)

# Get scan results
results = scan(giskard_model, giskard_dataset, max_issues_per_detector=None, verbose=False)

# For each slice found, get appropriate scan results with the metric
for issue in results.issues:
current_data_slice = giskard_dataset.slice(issue.slicing_fn)
indices = list(current_data_slice.df.sort_values(by="metric", ascending=False)["index"].values)
if not self.check_slice_already_selected(indices, current_slices):
current_slices.append(deepcopy(indices))
filenames = (
[dataset.get_image_path(int(idx)) for idx in indices[: self.num_images]]
if hasattr(dataset, "get_image_path")
else []
)
list_scan_results.append(
self.get_scan_result(
metric_value=current_data_slice.df["metric"].mean(),
metric_reference_value=giskard_dataset.df["metric"].mean(),
metric_name=self.metric.name,
filename_examples=filenames,
name=issue.slicing_fn.meta.display_name,
size_data=len(current_data_slice.df),
issue_group=meta.issue_group(issue.features[0]),
)
)

return list_scan_results

def check_slice_already_selected(self, slice, list_slices):
    """
    Check whether the slice is already present in list_slices.

    Two slices are considered identical when they have the same length and
    the same elements in the same order. Every saved slice is compared; the
    search does not stop at the first saved slice of matching length that
    happens to differ.

    Args:
        slice (list): Current slice (list of indices)
        list_slices (list[list]): List of previously saved slices

    Return:
        bool: True if an identical slice is found in list_slices, False
            otherwise (including when list_slices is empty).
    """
    # NOTE: the parameter name `slice` shadows the builtin, but it is part of
    # the method signature, so it is kept for backward compatibility.
    current = list(slice)
    len_slice = len(current)
    return any(
        # Cheap length check first, then element-wise comparison in order.
        len(saved_slice) == len_slice and all(a == b for a, b in zip(current, saved_slice))
        for saved_slice in list_slices
    )

def get_df_for_scan(self, model: Any, dataset: Any, list_metadata: Sequence[str]) -> pd.DataFrame:
# Create a dataframe containing each metadata and metric, surrogate target, surrogate prediction
# image path for display in html, and index
df = {name_metadata: [] for name_metadata in list_metadata}
df["metric"] = []
df["target"] = []
df["prediction"] = []
df["index"] = []
for surrogate_name in self.surrogate_functions:
df[f"target_{surrogate_name}"] = []
df[f"prediction_{surrogate_name}"] = []

# For now the DataFrame is built without a batch strategy because
# we need the metadata, labels and image path on an individual basis,
Expand All @@ -138,24 +177,30 @@ def get_df_for_scan(self, model: Any, dataset: Any, list_metadata: Sequence[str]
ground_truth = np.array([dataset.get_labels(i)]) # batch of 1 ground truth
metadata = dataset.get_meta(i)
metric_value = self.metric.get(model.prediction_result_cls(prediction), ground_truth) # expect batches
prediction_surrogate = (
self.surrogate_function(prediction, image) if self.surrogate_function is not None else prediction[0]
)
truth_surrogate = (
self.surrogate_function(ground_truth, image)
if self.surrogate_function is not None
else ground_truth[0]
)

for name_metadata in list_metadata:
try:
df[name_metadata].append(metadata.get(name_metadata))
except KeyError:
df[name_metadata].append(None)

for surrogate_name in self.surrogate_functions:

prediction_surrogate = (
self.surrogate_functions[surrogate_name](prediction, image)
if self.surrogate_functions[surrogate_name] is not None
else prediction[0]
)
truth_surrogate = (
self.surrogate_functions[surrogate_name](ground_truth, image)
if self.surrogate_functions[surrogate_name] is not None
else ground_truth[0]
)

df[f"target_{surrogate_name}"].append(truth_surrogate)
df[f"prediction_{surrogate_name}"].append(prediction_surrogate)

df["metric"].append(metric_value)
df["target"].append(truth_surrogate)
df["prediction"].append(prediction_surrogate)
df["index"].append(i)
except (KeyboardInterrupt, SystemExit):
raise
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from giskard_vision.core.detectors.metadata_scan_detector import MetaDataScanDetector
from giskard_vision.landmark_detection.detectors.surrogate_functions import (
nme_0,
relative_volume_convex_hull,
)
from giskard_vision.landmark_detection.tests.performance import NMEMean
Expand All @@ -9,7 +10,7 @@

@maybe_detector("metadata_landmark", tags=["vision", "face", "landmark", "metadata"])
class MetaDataScanDetectorLandmark(MetaDataScanDetector):
surrogate_function = relative_volume_convex_hull
surrogate_functions = {"relative_volume_convex_hull": relative_volume_convex_hull, "nme_0": nme_0}
metric = NMEMean
type_task = "regression"
metric_type = "relative"