Real-Time Anomaly Detection (xStream, CMU, KDD'18)

xStream Code

This post mainly analyzes the Python implementation of xStream.

Dataset Distribution

The data contains six classes; the ranges below are row-index ranges (see the label-vector sketch after this list):

* Class 0: rows 0000-1000, sparse normal class
* Class 1: rows 1000-3000, dense normal class
* Class 2: rows 3000-3050, clustered anomalies
* Class 3: rows 3050-3075, sparse anomalies
* Class 4: rows 3076-3082, local anomalies
* Class 5: rows 3075-3076, a single point anomaly
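As a quick aid, here is a minimal sketch that reconstructs just the label vector from the ranges above. The total row count of 3082 and the variable name `y` are assumptions for illustration; the feature matrix itself is not described in this post.

import numpy as np

# Hypothetical label vector built from the index ranges listed above
# (assumption: the ranges are row indices of a 3082-row dataset).
y = np.zeros(3082, dtype=int)   # class 0: rows 0-999, sparse normal
y[1000:3000] = 1                # dense normal
y[3000:3050] = 2                # clustered anomalies
y[3050:3075] = 3                # sparse anomalies
y[3075:3076] = 5                # single point anomaly
y[3076:3082] = 4                # local anomalies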

class StreamhashProjection

import random

import mmh3
import numpy as np


class StreamhashProjection:
    def __init__(self, n_components, density=1 / 3.0, random_state=None):
        """Hashed random projection of streaming features.

        Args:
            n_components (int): number of projected features K.
            density (float, optional): density of the sparse projection. Defaults to 1/3.0.
            random_state (int, optional): random seed. Defaults to None.
        """
        # One key per output component; each key indexes the hash-function
        # family and doubles as the seed passed to mmh3.
        self.keys = np.arange(0, n_components, 1)
        # Corresponds to \sqrt{3/K} in the paper's formula (with density = 1/3).
        self.constant = np.sqrt(1.0 / density) / np.sqrt(n_components)
        self.density = density
        self.n_components = n_components
        random.seed(random_state)

    def fit_transform(self, X, feature_names=None):
        nsamples = X.shape[0]
        ndim = X.shape[1]
        if feature_names is None:
            feature_names = [str(i) for i in range(ndim)]

        # Hash every feature name with every hash function in the family,
        # producing a K x f matrix R whose entries follow the r[i]
        # distribution; projecting through R performs the dimensionality
        # reduction.
        R = np.array(
            [[self._hash_string(k, f) for f in feature_names] for k in self.keys]
        )
        # check if density matches
        # print("R", np.sum(R.ravel() == 0) / float(len(R.ravel())))

        Y = np.dot(X, R.T)  # Y is the data projected onto the K components
        return Y

    def transform(self, X, feature_names=None):
        return self.fit_transform(X, feature_names)

    def _hash_string(self, k, s):
        # Map the (seed, feature-name) pair to a uniform value in [0, 1];
        # cast the numpy key to a plain int for mmh3.
        hash_value = int(mmh3.hash(s, signed=False, seed=int(k))) / (2.0 ** 32 - 1)
        s = self.density
        # Based on the hash value, only ~1/3 of the entries are kept nonzero,
        # matching the piecewise function in the paper's StreamHash section.
        if hash_value <= s / 2.0:
            return -1 * self.constant
        elif hash_value <= s:
            return self.constant
        else:
            return 0
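To make the projection concrete, here is a minimal usage sketch; the shapes and random input are illustrative assumptions, not part of the original code. Each entry of R equals -\sqrt{3/K} or +\sqrt{3/K} with probability density/2 = 1/6 each, and 0 otherwise, so R is a sparse random projection in the style of Achlioptas.

import numpy as np
from StreamhashProjection import StreamhashProjection  # the class above

# Illustrative only: 10 samples with 100 raw features, projected to K = 50.
X = np.random.rand(10, 100)
projector = StreamhashProjection(n_components=50, density=1 / 3.0, random_state=42)
Y = projector.fit_transform(X)
print(Y.shape)  # (10, 50)
# Because R depends only on the feature names and the fixed keys, the same
# features always map to the same components, which is what makes the
# projection usable on an evolving stream.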

class Chain & class Chains

Chains: initialization of all the chains


#!/usr/bin/env python

import numpy as np
import tqdm

from StreamhashProjection import StreamhashProjection

tqdm.tqdm.monitor_interval = 0


class Chain:
    def __init__(self, deltamax, depth=25):
        """Initialize a single chain.

        Args:
            deltamax (np.ndarray): per-feature ranges of the projected data.
            depth (int, optional): number of levels. Defaults to 25.
        """
        k = len(deltamax)
        self.deltamax = deltamax  # feature ranges
        self.depth = depth
        # Randomly pick one feature to split at each level;
        # "fs" is presumably short for "feature select".
        self.fs = [np.random.randint(0, k) for d in range(depth)]
        self.cmsketches = [None] * depth  # count-min-sketch state, one per level
        self.shift = np.random.rand(k) * deltamax  # random shift per feature

    def fit(self, X, verbose=False, update=False):
        prebins = np.zeros(X.shape, dtype=float)
        depthcount = np.zeros(len(self.deltamax), dtype=int)
        for depth in range(self.depth):
            f = self.fs[depth]  # the feature to split at this level
            depthcount[f] += 1  # how many times this feature has been selected

            if depthcount[f] == 1:  # first time this feature is selected
                prebins[:, f] = (X[:, f] + self.shift[f]) / self.deltamax[f]
            else:
                # Looks different from the paper's formula, but it is an
                # equivalent iterative update after a split (see below).
                prebins[:, f] = (
                    2.0 * prebins[:, f] - self.shift[f] / self.deltamax[f]
                )
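                # Why the update is equivalent (not in the original comments):
                # after the first split, prebin = (x + s) / Δ; one update gives
                #   2 * (x + s) / Δ - s / Δ = (x + s/2) / (Δ/2),
                # so each repeated split of feature f halves both its bin
                # width Δ and its shift s, matching the paper's binning rule.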

if update:
cmsketch = self.cmsketches[depth]
else:
cmsketch = {}
for prebin in prebins: # 遍历每个组的特征,将其统计值通过count-min-sketch保存起来
l = tuple(np.floor(prebin).astype(np.int))
if not l in cmsketch:
cmsketch[l] = 0
cmsketch[l] += 1
self.cmsketches[depth] = cmsketch
return self

    def bincount(self, X):
        """Look up, at every level, how many fitted instances fell into the
        same bin as each instance in X.

        Args:
            X (np.ndarray): dataset.

        Returns:
            np.ndarray: per-instance, per-level bin counts, shape (n, depth).
        """
        scores = np.zeros((X.shape[0], self.depth))
        prebins = np.zeros(X.shape, dtype=float)
        depthcount = np.zeros(len(self.deltamax), dtype=int)
        for depth in range(self.depth):
            f = self.fs[depth]
            depthcount[f] += 1

            if depthcount[f] == 1:
                prebins[:, f] = (X[:, f] + self.shift[f]) / self.deltamax[f]
            else:
                prebins[:, f] = 2.0 * prebins[:, f] - self.shift[f] / self.deltamax[f]

            cmsketch = self.cmsketches[depth]
            for i, prebin in enumerate(prebins):
                l = tuple(np.floor(prebin).astype(int))
                if l not in cmsketch:
                    scores[i, depth] = 0.0
                else:
                    scores[i, depth] = cmsketch[l]

        return scores

    def score(self, X, adjusted=False):
        # Scale the score logarithmically to avoid overflow:
        #   score = min_d [ log2(bincount x 2^d) ] = min_d [ log2(bincount) + d ]
        scores = self.bincount(X)
        depths = np.array([d for d in range(1, self.depth + 1)])
        scores = np.log2(1.0 + scores) + depths  # add 1 to avoid log(0)
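        # Smaller scores mean the instance fell into sparser bins at some
        # depth, i.e. it is more anomalous. (Some implementations negate this
        # value so that larger scores flag anomalies; this one does not.)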
        return np.min(scores, axis=1)


class Chains:
    def __init__(self, k=50, nchains=100, depth=25, seed=42):
        """Initialize the ensemble of chains.

        Args:
            k (int, optional): number of projected features. Defaults to 50.
            nchains (int, optional): number of chains. Defaults to 100.
            depth (int, optional): depth, i.e. the number of levels in the paper. Defaults to 25.
            seed (int, optional): random seed. Defaults to 42.
        """
        self.nchains = nchains
        self.depth = depth
        self.chains = []
        self.projector = StreamhashProjection(
            n_components=k, density=1 / 3.0, random_state=seed
        )

    def fit(self, X):
        """Fit all chains on the dataset.

        Args:
            X (np.ndarray): dataset.
        """
        projected_X = self.projector.fit_transform(X)
        # Initial bin width per feature: half the range of the projected data.
        deltamax = np.ptp(projected_X, axis=0) / 2.0
        deltamax[deltamax == 0] = 1.0  # avoid zero denominators
        for i in tqdm.tqdm(range(self.nchains), desc="Fitting..."):
            c = Chain(deltamax, depth=self.depth)
            c.fit(projected_X)
            self.chains.append(c)

    def score(self, X, adjusted=False):
        """Score the dataset.

        Args:
            X (np.ndarray): dataset.
            adjusted (bool, optional): not used in this code. Defaults to False.

        Returns:
            np.ndarray: anomaly scores.
        """
        projected_X = self.projector.transform(X)
        scores = np.zeros(X.shape[0])
        for i in tqdm.tqdm(range(self.nchains), desc="Scoring..."):
            chain = self.chains[i]
            scores += chain.score(projected_X, adjusted)
        scores /= float(self.nchains)  # the final score is the mean over all chains
        return scores
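Putting the pieces together, here is a minimal end-to-end sketch. The synthetic data, the module name `Chains.py`, and the parameter choices are illustrative assumptions, not part of the original code.

import numpy as np
from Chains import Chains  # assumes the file above is saved as Chains.py

# Illustrative toy data only: 1000 roughly normal points plus 10 outliers.
rng = np.random.RandomState(0)
X = np.vstack([rng.randn(1000, 20), rng.randn(10, 20) * 5 + 10])

cf = Chains(k=50, nchains=100, depth=10, seed=42)
cf.fit(X)
scores = cf.score(X)
# In this implementation smaller scores mean sparser bins, i.e. more
# anomalous, so the 10 injected outliers should rank near the bottom.
print(np.argsort(scores)[:10])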