Source code for local2global_embedding.outliers

#  Copyright (c) 2021. Lucas G. S. Jeub
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#  SOFTWARE.


import numpy as np
from collections.abc import Iterable

from tqdm.auto import tqdm
from sklearn.neighbors import LocalOutlierFactor
from sklearn.ensemble import IsolationForest

from local2global import AlignmentProblem
from local2global.utils import local_error
from local2global.patch import MeanAggregatorPatch


[docs] def all_errors(prob: AlignmentProblem): """ Compute un-normalised alignment errors for all patches Args: prob: Alignment problem Returns: errors """ errors = np.full((prob.n_nodes, prob.n_patches), float('nan')) reference = prob.mean_embedding() for i, p in enumerate(prob.patches): errors[p.nodes, i] = local_error(p, reference) return errors
[docs] def sliding_window_errors(prob: AlignmentProblem, window=7): errors = np.full((prob.n_nodes, prob.n_patches-window+1), float('nan')) for i in range(prob.n_patches-window+1): p = prob.patches[i+window-1] reference = np.full((prob.n_nodes, prob.dim), float('nan')) agg = MeanAggregatorPatch(prob.patches[i:i+window]) reference[agg.nodes] = agg.coordinates errors[p.nodes, i] = local_error(p, reference) return errors
[docs] def nan_z_score(errors, axis=1): m_errors = np.nanmean(errors, axis=axis, keepdims=True) std_errors = np.nanstd(errors, axis=axis, keepdims=True) return (errors-m_errors) / (std_errors + 1e-16)
[docs] def bootstrap_nan_z_score(errors, repeats=100, random_state=None): rg = np.random.default_rng(random_state) samples = rg.integers(errors.shape[1], size=(errors.shape[1], repeats)) b_errors = errors[:, samples] m_errors = np.nanmedian(np.nanmean(b_errors, axis=1, keepdims=True), axis=2) std_errors = np.nanmedian(np.nanstd(b_errors, axis=1, keepdims=True), axis=2) return (errors - m_errors) / (std_errors + 1e-16)
[docs] def leave_out_nan_z_score(errors): z_errors = np.full(errors.shape, float('nan')) for i in range(errors.shape[1]): mask = np.ones(errors.shape[1], dtype=bool) mask[i] = False m_e = np.nanmean(errors[:, mask], axis=1) s_e = np.nanstd(errors[:, mask], axis=1) z_errors[:, i] = (errors[:, i] - m_e) / (s_e + 1e-16) return z_errors
[docs] def leave_out_nan_rms_score(errors): z_errors = np.full(errors.shape, float('nan')) for i in range(errors.shape[1]): mask = np.ones(errors.shape[1], dtype=bool) mask[i] = False m_e = np.sqrt(np.nanmean((errors[:, mask]**2), axis=1)) z_errors[:, i] = (errors[:, i]) / (m_e + 1e-16) return z_errors
[docs] def z_score_errors(prob: AlignmentProblem): """ Compute z-score alignment errors using centroid over all patches as reference Args: prob: AlignmentProblem Returns: array of z-score errors Notes This function does not align the patches before computing the reference. """ errors = all_errors(prob) return nan_z_score(errors, axis=1)
[docs] def leave_out_z_score_errors(prob: AlignmentProblem): errors = all_errors(prob) return leave_out_nan_z_score(errors)
[docs] def z_score_boost_errors(prob: AlignmentProblem, threshold=4, positive_only=True): """ Boosted z-score errors Args: prob: Alignment problem threshold: boosting threshold Returns: boosted z-score errors This function first computes normal z-scores. It then uses `threshold` to identify outliers. If `positive_only=True` (the default), only positive z-scores are considered as outliers. The identified outliers are removed when computing the means and standard deviations and the z-scores are recomputed based on the outlier-corrected mean and standard deviation. """ errors = all_errors(prob) z_errors = nan_z_score(errors, axis=1) if positive_only: outliers = z_errors > threshold else: outliers = np.abs(z_errors) > threshold in_errors = errors.copy() in_errors[outliers] = float('nan') m_errors = np.nanmean(in_errors, axis=1, keepdims=True) std_errors = np.nanstd(in_errors, axis=1, keepdims=True) return (errors-m_errors) / (std_errors + 1e-16)
[docs] def quantile_errors(errors, quantile=0.75): """ errors scaled by provided quantile of the distribution """ if isinstance(quantile, Iterable): q = np.nanquantile(errors, max(quantile), axis=1, keepdims=True) - np.nanquantile(errors, min(quantile), axis=1, keepdims=True) else: q = np.nanquantile(errors, quantile, axis=1, keepdims=True) m = np.nanmedian(errors, axis=1, keepdims=True) return (errors-m) / q
[docs] def LOF_error(prob, min_points=21, use_min=False): if isinstance(min_points, Iterable): lof = [LocalOutlierFactor(n_neighbors=n) for n in min_points] min_points = max(min_points) else: lof = [LocalOutlierFactor(n_neighbors=min_points)] out = np.full((prob.n_nodes, prob.n_patches), np.nan) for i, pids in tqdm(enumerate(prob.patch_index), total=prob.n_nodes): if len(pids) > min_points: points = np.array([prob.patches[pid].get_coordinate(i) for pid in pids]) if use_min: out[i, pids] = -1 - np.max([l.fit(points).negative_outlier_factor_ for l in lof], axis=0) else: out[i, pids] = -1 - np.min([l.fit(points).negative_outlier_factor_ for l in lof], axis=0) return out
[docs] def iForest_error(prob): out = np.full((prob.n_nodes, prob.n_patches), np.nan) iforest = IsolationForest() for i, pids in tqdm(enumerate(prob.patch_index), total=prob.n_nodes): if pids: points = np.array([prob.patches[pid].get_coordinate(i) for pid in pids]) iforest.fit(points) out[i, pids] = -iforest.score_samples(points) return out
[docs] def plt_score(score, active, score_range=None): import matplotlib.pyplot as plt if score_range is not None: if score_range.shape[0] == 4: err = score_range[1:3] plt.plot(score_range[0], 'k^', markersize=1) plt.plot(score_range[-1], 'kv', markersize=1) else: err = score_range err[0] = score - err[0] err[1] -= score plt.errorbar(np.arange(len(score)), score, fmt='none', yerr=err, linewidth=0.1, capsize=2, capthick=0.1, ecolor='k') plt.plot(score, '.-k', linewidth=0.1) plt.plot(active, score[active], '.r', label='red-team active') plt.legend()