基于距离的离群点检测算法
异常检测简介
异常检测的实质是寻找观测值和参照值之间有意义的偏差。数据库中的数据由于各种原因常常会包含一些异常记录,对这些异常记录的检测和解释有很重要的意义。异常检测目前在入侵检测、金融欺诈、股票分析等领域都有着比较好的实际应用效果。
离群点检测是异常检测中最常用的方法之一。离群点检测的主要目的是为了检测出那些与正常数据行为或特征属性差别较大的异常数据或行为,在一些文献中,这些数据和行为又被叫做孤立点、噪音、异常点或离群点,这些叫法中离群点的叫法较为普遍。
离群点检测算法分类
目前主要的离群点检测技术包括:基于统计的离群检测方法、基于聚类的离群检测方法、基于分类的离群检测方法、基于距离的离群检测方法、基于密度的离群检测方法和基于信息熵的离群检测方法。
centos 7x
CentOS(Community Enterprise Operating System)是[Linux]发行版之一,它是来自于[Red Hat Enterprise Linux]依照[开放源代码]规定发布的源代码所编译而成。由于出自同样的源代码,因此有些要求高度稳定性的服务器以CentOS替代商业版的[Red Hat Enterprise Linux]使用。两者的不同,在于CentOS并不包含封闭源代码软件。CentOS 对上游代码的主要修改是为了移除不能自由使用的商标。2014年,CentOS宣布与Red Hat合作,但CentOS将会在新的委员会下继续运作,并不受RHEL的影响。
CentOS和[RHEL]一样,都可以使用[Fedora EPEL]来补足软件。
python 3.6
Python(英国发音:[/ˈpaɪθən/] 美国发音:[/ˈpaɪθɑːn/],是一种广泛使用的[解释型],[高级编程],[通用型编程语言],由[吉多·范罗苏姆]创造,第一版发布于1991年。可以视之为一种改良(加入一些其他编程语言的优点,如面向对象)的[LISP]。Python的设计哲学强调代码的[可读性]和简洁的语法(尤其是使用[空格缩进]划分代码块,而非使用大括号或者关键词)。相比于C++或Java,Python让开发者能够用更少的代码表达想法。不管是小型还是大型程序,该语言都试图让程序的结构清晰明暸。
与[Scheme]、[Ruby]、[Perl]、[Tcl]等动态类型编程语言一样,Python拥有[动态类型系统]和[垃圾回收]功能,能够自动管理内存使用,并且支持多种编程范式,包括面向对象、命令式、函数式和过程式编程。其本身拥有一个巨大而广泛的标准库。
Python 本身几乎可以在所有的[操作系统]中运行。Python的其中一个[CPython]是用[C语言]编写的、是一个由社群驱动的自由[软件],当前由[Python软件基金会]管理
步骤1:环境准备
本项目所需要的依赖包(已安装)
matplotlib
在有网的情况下可以使用以下命令安装上述依包。
pip3 install matplotlib
LOF算法的python实现
lof.py:
# -*- coding: utf8 -*-from __future__ import divisiondef distance_euclidean(instance1, instance2):
"""
Computes the distance between two instances. Instances should be tuples of equal length.
Returns: Euclidean distance
Signature: ((attr_1_1, attr_1_2, ...), (attr_2_1, attr_2_2, ...)) -> float
"""
def detect_value_type(attribute):
"""
Detects the value type (number or non-number).
Returns: (value type, value casted as detected type)
Signature: value -> (str or float type, str or float value)
"""
from numbers import Number
attribute_type = None
if isinstance(attribute, Number):
attribute_type = float
attribute = float(attribute) else:
attribute_type = str
attribute = str(attribute) return attribute_type, attribute # check if instances are of same length
if len(instance1) != len(instance2): raise AttributeError("Instances have different number of arguments.") # init differences vector
differences = [0] * len(instance1) # compute difference for each attribute and store it to differences vector
for i, (attr1, attr2) in enumerate(zip(instance1, instance2)):
type1, attr1 = detect_value_type(attr1)
type2, attr2 = detect_value_type(attr2) # raise error is attributes are not of same data type.
if type1 != type2: raise AttributeError("Instances have different data types.") if type1 is float: # compute difference for float
differences = attr1 - attr2 else: # compute difference for string
if attr1 == attr2:
differences = 0
else:
differences = 1
# compute RMSE (root mean squared error)
rmse = (sum(map(lambda x: x ** 2, differences)) / len(differences)) ** 0.5
return rmseclass LOF:
"""
Helper class for performing LOF computations and instances normalization.
"""
def __init__(self, instances, normalize=True, distance_function=distance_euclidean):
self.instances = instances
self.normalize = normalize
self.distance_function = distance_function if normalize:
self.normalize_instances() def compute_instance_attribute_bounds(self):
min_values = [float("inf")] * len(self.instances[0]) # n.ones(len(self.instances[0])) * n.inf
max_values = [float("-inf")] * len(self.instances[0]) # n.ones(len(self.instances[0])) * -1 * n.inf
for instance in self.instances:
min_values = tuple(map(lambda x, y: min(x, y), min_values, instance)) # n.minimum(min_values, instance)
max_values = tuple(map(lambda x, y: max(x, y), max_values, instance)) # n.maximum(max_values, instance)
self.max_attribute_values = max_values
self.min_attribute_values = min_values def normalize_instances(self):
"""
Normalizes the instances and stores the infromation for rescaling new instances.
"""
if not hasattr(self, "max_attribute_values"):
self.compute_instance_attribute_bounds()
new_instances = [] for instance in self.instances:
new_instances.append(
self.normalize_instance(instance)) # (instance - min_values) / (max_values - min_values)
self.instances = new_instances def normalize_instance(self, instance):
return tuple(map(lambda value, max, min: (value - min) / (max - min) if max - min > 0 else 0,
instance, self.max_attribute_values, self.min_attribute_values)) def local_outlier_factor(self, min_pts, instance):
"""
The (local) outlier factor of instance captures the degree to which we call instance an outlier.
min_pts is a parameter that is specifying a minimum number of instances to consider for computing LOF value.
Returns: local outlier factor
Signature: (int, (attr1, attr2, ...), ((attr_1_1, ...),(attr_2_1, ...), ...)) -> float
"""
if self.normalize:
instance = self.normalize_instance(instance) return local_outlier_factor(min_pts, instance, self.instances, distance_function=self.distance_function)def k_distance(k, instance, instances, distance_function=distance_euclidean):
# TODO: implement caching
"""
Computes the k-distance of instance as defined in paper. It also gatheres the set of k-distance neighbours.
Returns: (k-distance, k-distance neighbours)
Signature: (int, (attr1, attr2, ...), ((attr_1_1, ...),(attr_2_1, ...), ...)) -> (float, ((attr_j_1, ...),(attr_k_1, ...), ...))
"""
distances = {} for instance2 in instances:
distance_value = distance_function(instance, instance2) if distance_value in distances:
distances[distance_value].append(instance2) else:
distances[distance_value] = [instance2]
distances = sorted(distances.items())
neighbours = []
k_sero = 0
k_dist = None
for dist in distances:
k_sero += len(dist[1])
neighbours.extend(dist[1])
k_dist = dist[0] if k_sero >= k: break
return k_dist, neighboursdef reachability_distance(k, instance1, instance2, instances, distance_function=distance_euclidean):
"""
The reachability distance of instance1 with respect to instance2.
Returns: reachability distance
Signature: (int, (attr_1_1, ...),(attr_2_1, ...)) -> float
"""
(k_distance_value, neighbours) = k_distance(k, instance2, instances, distance_function=distance_function) return max([k_distance_value, distance_function(instance1, instance2)])def local_reachability_density(min_pts, instance, instances, **kwargs):
"""
Local reachability density of instance is the inverse of the average reachability
distance based on the min_pts-nearest neighbors of instance.
Returns: local reachability density
Signature: (int, (attr1, attr2, ...), ((attr_1_1, ...),(attr_2_1, ...), ...)) -> float
"""
(k_distance_value, neighbours) = k_distance(min_pts, instance, instances, **kwargs)
reachability_distances_array = [0] * len(neighbours) # n.zeros(len(neighbours))
for i, neighbour in enumerate(neighbours):
reachability_distances_array = reachability_distance(min_pts, instance, neighbour, instances, **kwargs)
sum_reach_dist = sum(reachability_distances_array) if sum_reach_dist == 0: return float('inf') return len(neighbours) / sum_reach_distdef local_outlier_factor(min_pts, instance, instances, **kwargs):
"""
The (local) outlier factor of instance captures the degree to which we call instance an outlier.
min_pts is a parameter that is specifying a minimum number of instances to consider for computing LOF value.
Returns: local outlier factor
Signature: (int, (attr1, attr2, ...), ((attr_1_1, ...),(attr_2_1, ...), ...)) -> float
"""
(k_distance_value, neighbours) = k_distance(min_pts, instance, instances, **kwargs)
instance_lrd = local_reachability_density(min_pts, instance, instances, **kwargs)
lrd_ratios_array = [0] * len(neighbours) for i, neighbour in enumerate(neighbours):
instances_without_instance = set(instances)
instances_without_instance.discard(neighbour)
neighbour_lrd = local_reachability_density(min_pts, neighbour, instances_without_instance, **kwargs)
lrd_ratios_array = neighbour_lrd / instance_lrd return sum(lrd_ratios_array) / len(neighbours)def outliers(k, instances, **kwargs):
"""
Simple procedure to identify outliers in the dataset.
"""
instances_value_backup = instances
outliers = [] for i, instance in enumerate(instances_value_backup):
instances = list(instances_value_backup)
instances.remove(instance)
l = LOF(instances, **kwargs)
value = l.local_outlier_factor(k, instance) if value > 1:
outliers.append({"lof": value, "instance": instance, "index": i})
outliers.sort(key=lambda o: o["lof"], reverse=True) return outliers
测试代码
测试上述代码,测试脚本看附件!
