138 changes: 138 additions & 0 deletions categorical_robustness/detector.py
@@ -0,0 +1,138 @@
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
from typing import Optional, List


class CategoricalRobustnessDetector:
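    """Flags rows whose prediction changes when a categorical value is swapped.

    For every row and every categorical column recorded by fit(), each
    alternative value observed in the data is substituted in turn and the model
    is re-queried; label flips, large confidence shifts, and prediction
    failures are collected as issue dicts.
    """
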
def __init__(
self,
model,
task: str = "classification",
regression_threshold: float = 0.1,
prob_threshold: Optional[float] = None,
n_jobs: int = -1,
):
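        """
        Args:
            model: Fitted estimator exposing predict() and, optionally, predict_proba().
            task (str): "classification" or "regression".
            regression_threshold (float): Minimum absolute prediction change flagged in regression.
            prob_threshold (Optional[float]): Minimum top-class probability shift flagged in classification.
            n_jobs (int): Number of joblib workers used by scan().
        """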
self.model = model
self.task = task
self.regression_threshold = regression_threshold
self.prob_threshold = prob_threshold
self.n_jobs = n_jobs
self.categorical_columns = []
self.issues = []

        # Cache whether the model exposes the standard predict_proba() interface.
        self.has_predict_proba = hasattr(model, "predict_proba")

def fit(self, X: pd.DataFrame, categorical_columns: Optional[List[str]] = None):
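        """Record which columns to perturb; inferred from dtypes when not given."""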
if categorical_columns is not None:
self.categorical_columns = categorical_columns
else:
self.categorical_columns = X.select_dtypes(include=["object", "category"]).columns.tolist()

def scan(self, X: pd.DataFrame) -> List[dict]:
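        """Perturb each categorical value of each row and collect issue dicts."""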
def process_row(idx, row):
row_issues = []
            # Select by label to keep a one-row DataFrame with its original dtypes
            # (row.to_frame().T would coerce mixed-type rows to object columns).
            original_input = X.loc[[idx]]

try:
if self.task == "classification":
original_pred = self.model.predict(original_input)[0]
                    if self.has_predict_proba:
                        original_prob = self.model.predict_proba(original_input)[0]
original_conf = np.max(original_prob)
else:
original_conf = None
else:
original_pred = self.model.predict(original_input)[0]
except Exception as e:
return [{"row": idx, "error": str(e), "change_type": "prediction_failed"}]

for col in self.categorical_columns:
original_value = row[col]
unique_values = X[col].dropna().unique()

for val in unique_values:
if val == original_value:
continue

perturbed_input = original_input.copy()
perturbed_input[col] = val

try:
if self.task == "classification":
new_pred = self.model.predict(perturbed_input)[0]
                            if self.has_predict_proba:
                                new_prob = self.model.predict_proba(perturbed_input)[0]
new_conf = np.max(new_prob)
else:
new_conf = None

if new_pred != original_pred:
row_issues.append({
"row": idx,
"feature": col,
"from": original_value,
"to": val,
"original_pred": original_pred,
"new_pred": new_pred,
"original_conf": original_conf,
"new_conf": new_conf,
"confidence_drop": (original_conf - new_conf) if (original_conf is not None and new_conf is not None) else None,
"change_type": "label_changed"
})

                            elif (
                                self.prob_threshold is not None
                                and original_conf is not None
                                and new_conf is not None
                                and abs(original_conf - new_conf) >= self.prob_threshold
                            ):
row_issues.append({
"row": idx,
"feature": col,
"from": original_value,
"to": val,
"original_pred": original_pred,
"new_pred": new_pred,
"original_conf": original_conf,
"new_conf": new_conf,
"confidence_drop": original_conf - new_conf,
"change_type": "confidence_shift"
})

else: # regression
new_pred = self.model.predict(perturbed_input)[0]
delta = abs(new_pred - original_pred)
if delta >= self.regression_threshold:
row_issues.append({
"row": idx,
"feature": col,
"from": original_value,
"to": val,
"original_pred": original_pred,
"new_pred": new_pred,
"delta": delta,
"change_type": "prediction_shift"
})
except Exception as e:
row_issues.append({
"row": idx,
"feature": col,
"from": original_value,
"to": val,
"error": str(e),
"change_type": "perturbation_failed"
})

return row_issues

results = Parallel(n_jobs=self.n_jobs)(
delayed(process_row)(idx, row) for idx, row in X.iterrows()
)

self.issues = [item for sublist in results for item in sublist]
return self.issues

def report(self) -> pd.DataFrame:
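        """Return all recorded issues as a flat DataFrame."""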
return pd.DataFrame(self.issues)

def summary(self) -> pd.DataFrame:
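        """Return issue counts grouped by feature and change type."""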
if not self.issues:
return pd.DataFrame()
df = pd.DataFrame(self.issues)
return df.groupby(["feature", "change_type"]).size().reset_index(name="count")
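A minimal usage sketch for the detector above (not part of this diff). It assumes scikit-learn is installed, that the new categorical_robustness/ directory is importable as a package, and that the toy frame X, labels y, and pipeline clf are hypothetical stand-ins for a real dataset and model:

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

from categorical_robustness.detector import CategoricalRobustnessDetector

# Hypothetical toy data: one numeric and one categorical feature.
X = pd.DataFrame({
    "age": [25, 40, 33, 58, 47, 29],
    "contract": ["monthly", "yearly", "monthly", "yearly", "monthly", "yearly"],
})
y = [0, 1, 0, 1, 1, 0]

# A pipeline that accepts the raw DataFrame, so the detector can feed
# perturbed categorical values straight into predict()/predict_proba().
clf = Pipeline([
    ("encode", ColumnTransformer(
        [("onehot", OneHotEncoder(handle_unknown="ignore"), ["contract"])],
        remainder="passthrough",
    )),
    ("model", RandomForestClassifier(random_state=0)),
]).fit(X, y)

detector = CategoricalRobustnessDetector(clf, task="classification", prob_threshold=0.2)
detector.fit(X)            # infers "contract" as the categorical column
issues = detector.scan(X)  # one dict per flagged (row, feature, value) swap
print(detector.summary())  # issue counts grouped by feature and change type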
Empty file added list_extraction_qa.py
Empty file.
157 changes: 157 additions & 0 deletions numerical_robustness_detector.py
@@ -0,0 +1,157 @@


from giskard import Dataset, Model, Issue, IssueSeverity, IssueType
from typing import Optional, List, Tuple, Any
import numpy as np
import pandas as pd


class NumericalRobustnessScan:
"""
Giskard Scan that detects minimal numerical perturbations capable of
changing a model’s prediction or output significantly.
"""

def __init__(
self,
model: Model,
dataset: Dataset,
threshold: float = 0.1,
max_steps: int = 100,
verbose: bool = False
):
"""
Initialize the scan.

Args:
model (Model): Giskard Model object.
dataset (Dataset): Giskard Dataset object.
            threshold (float): Minimum absolute change in a regression prediction
                that is reported as an issue.
            max_steps (int): Number of perturbation steps between a feature's
                min and max values.
verbose (bool): If True, prints progress during scan.
"""
self.model = model
self.dataset = dataset
self.threshold = threshold
self.max_steps = max_steps
self.verbose = verbose

self.is_classification = self.model.is_classifier
self.X = self.dataset.get_features_dataframe()
self.feature_names = self.dataset.feature_names
self.X_np = self.X.to_numpy()
self.feature_bounds = self._get_feature_bounds()

def _get_feature_bounds(self) -> List[Tuple[float, float]]:
"""Extract min/max bounds for each numerical feature."""
bounds = []
for feature in self.dataset.features:
if feature.feature_type == "numerical":
min_val = feature.min if feature.min is not None else self.X[feature.name].min()
max_val = feature.max if feature.max is not None else self.X[feature.name].max()
bounds.append((min_val, max_val))
else:
bounds.append((np.nan, np.nan))
return bounds

    def _predict(self, sample: np.ndarray) -> Any:
"""Run prediction for a single sample."""
return self.model.predict(sample.reshape(1, -1))[0]

def _build_issue(
self,
feature_index: int,
perturb_value: float,
original_pred: Any,
new_pred: Any,
sample_idx: int
) -> Issue:
"""Create a Giskard Issue object."""
feature_name = self.feature_names[feature_index]
description = (
f"Perturbing '{feature_name}' by {abs(perturb_value):.4f} in sample {sample_idx} "
f"changed prediction from {original_pred} to {new_pred}."
)
return Issue(
type=IssueType.ROBUSTNESS,
severity=IssueSeverity.MEDIUM,
description=description,
feature=feature_name,
sample_index=sample_idx,
)

def _scan_feature(self, feature_index: int) -> Optional[Issue]:
"""Scan a single feature for robustness issues."""
min_val, max_val = self.feature_bounds[feature_index]
if np.isnan(min_val) or np.isnan(max_val):
return None

step_size = (max_val - min_val) / self.max_steps

for sample_idx in range(len(self.X_np)):
original_sample = self.X_np[sample_idx].copy()
original_pred = self._predict(original_sample)

for step in range(1, self.max_steps + 1):
for direction in [+1, -1]:
perturb = direction * step * step_size
new_val = original_sample[feature_index] + perturb

if not (min_val <= new_val <= max_val):
continue

perturbed_sample = original_sample.copy()
perturbed_sample[feature_index] = new_val
new_pred = self._predict(perturbed_sample)

if self.is_classification and new_pred != original_pred:
return self._build_issue(feature_index, perturb, original_pred, new_pred, sample_idx)
elif not self.is_classification and abs(new_pred - original_pred) > self.threshold:
return self._build_issue(feature_index, perturb, original_pred, new_pred, sample_idx)

return None

def run_scan(self) -> List[Issue]:
"""Run the full robustness scan across all numerical features."""
issues = []
for feature_index, feature_name in enumerate(self.feature_names):
if self.verbose:
print(f"Scanning feature: {feature_name} ({feature_index})")
issue = self._scan_feature(feature_index)
if issue:
issues.append(issue)
if self.verbose:
print(f"✔ Issue found on '{feature_name}'")
elif self.verbose:
print(f"✘ No issue on '{feature_name}'")
return issues


if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser(description="Run Numerical Robustness Scan.")
parser.add_argument("--model_path", required=True, help="Path to Giskard model file (YAML)")
parser.add_argument("--dataset_path", required=True, help="Path to Giskard dataset file (YAML)")
parser.add_argument("--threshold", type=float, default=0.1, help="Threshold for regression change")
parser.add_argument("--max_steps", type=int, default=100, help="Steps to scan perturbations")
parser.add_argument("--verbose", action="store_true", help="Enable verbose output")

args = parser.parse_args()

model = Model.load(args.model_path)
dataset = Dataset.load(args.dataset_path)

scan = NumericalRobustnessScan(
model=model,
dataset=dataset,
threshold=args.threshold,
max_steps=args.max_steps,
verbose=args.verbose
)

issues = scan.run_scan()

print(f"\nScan complete. Found {len(issues)} issue(s).")
for issue in issues:
print(f"- {issue.description}")