-
Notifications
You must be signed in to change notification settings - Fork 5
/
SPLL.py
108 lines (88 loc) · 3.52 KB
/
SPLL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
##############################################################################
# Python/NumPy/Sklearn implementation of SPLL Change Detection Algorithm (Kuncheva, 2013)
# Based on Matlab code from https://github.com/LucyKuncheva/Change-detection
# This code was translated from Matlab to Python with minimal changes. Code is distributed under
# original Matlab code license (GPL-3.0)
# Author: Haidar Khan
# Date: 10/23/18
# Description: Implements the change point detection algorithm proposed by (Kuncheva, 2013) using a
# window based approach.
# Usage: SPLL(W1,W2,K) tests if a change occurs between data windows W1 and W2 with K-Means clustering
# parameter K
import numpy as np
from sklearn.cluster import KMeans
from scipy.stats import chi2
EPS = np.finfo(float).eps
def SPLL(W1, W2, K=3):
n1 = W1.shape[1]
n2 = W2.shape[1]
assert n1==n2, "The number of features must be the same for W1 and W2"
n = n1
s1 = log_LL(W1,W2,K)
s2 = log_LL(W2,W1,K)
st = max(s1,s2)
pst = min(chi2.cdf(st,n), 1-chi2.cdf(st,n))
Change = float(pst < 0.05)
return Change, pst, st
def log_LL(W1,W2,K):
M1, n = W1.shape
M2, _ = W2.shape
kmeans = KMeans(n_clusters=K).fit(W1)
labels = kmeans.predict(W1)
means = kmeans.cluster_centers_
SC = np.zeros((n*n, K))
ClassPriors = np.zeros((K,1))
for k in range(K):
ClassIndex = np.nonzero(labels==k)[0]
ClassPriors[k] = len(ClassIndex)
if len(ClassIndex) <= 1:
sc = np.zeros((n,n))
else:
sc = np.cov(W1[labels==k,:].T)
SC[:,k] = sc.flatten()
ClassCount = ClassPriors
ClassPriors = ClassPriors/M1
scov = np.sum(SC*np.tile(ClassPriors, (1,n*n)).T,1)
# Assuming the same full covariance
# Pooled (weighted by the priors)
scov = np.reshape(scov, (n,n))
z = np.diag(scov)
indexvarzero = z < EPS # define eps #identify features with very low variability
if sum(indexvarzero) == 0: #if no such features
invscov = np.linalg.inv(scov) # invert the covariance matrix
else: # otherwise
z[indexvarzero] = np.min(z[not indexvarzero]) # set their variance to the minimum variance of other features
invscov = np.diag(1/z) # invert the diagonal matrix only
# Calculate the MATCH from Window 2
LogLikelihoodTerm = np.zeros((M2))
for j in range(M2):
xx = W2[j,:]
DistanceToMeans = np.zeros((K))
for ii in range(K):
if ClassCount[ii] > 0:
DistanceToMeans[ii] = np.dot(np.dot(means[ii,:] - xx , invscov), means[ii,:]-xx)
else:
DistanceToMeans[ii] = np.inf
LogLikelihoodTerm[j] = np.min(DistanceToMeans)
# scale
st = np.mean(LogLikelihoodTerm)
return st
def test_SPLL():
window_size = 100
num_features = 10
change1 = np.zeros((10))
pvalue1 = np.zeros((10))
spllstat1 = np.zeros((10))
change2 = np.zeros((10))
pvalue2 = np.zeros((10))
spllstat2 = np.zeros((10))
for i in range(10):
W1 = np.random.randn(window_size, num_features)
W2 = np.random.randn(window_size, num_features)
W3 = np.random.randn(window_size, num_features)+2
change1[i], pvalue1[i], spllstat1[i] = SPLL(W1, W2)
change2[i], pvalue2[i], spllstat2[i] = SPLL(W1, W3)
print "No change test: change: %d pval: %f stat: %f" %(change1[i], pvalue1[i], spllstat1[i])
print "Change test: change: %d pval: %f stat: %f" %(change2[i], pvalue2[i], spllstat2[i])
if __name__ == '__main__':
test_SPLL()