Source code for calpy.rqa.rqa

import numpy, math
#from .. import utilities

[docs]class phase_space(object): """Phase space class. """ def __init__(self, xs, tau=1, m=2, eps=.001): self.tau, self.m, self.eps = tau, m, eps N = int(len(xs)-m*tau+tau) self.matrix = numpy.empty([N,m],dtype=float) for i in range(N): self.matrix[i,:] = xs[i:i+1+int(m*tau-tau):tau] self.recurrence_matrix = None return None def __repr__(self): return "phase_space()" def __str__(self): return "{} with shape {} and (tau, m, eps) = ({}, {}, {})".format(type(self.matrix), self.matrix.shape, self.tau, self.m, self.eps) def __hash__(self): return id(self) def __eq__(self, other): return id(self) == id(other)
def _Theta(x, y, eps): """Theta tmp Args: x: y: eps: Returns: int: 0 or 1. """ sm = 0 for k in range(len(x)): sm += (x[k]-y[k])**2 if sm > eps: return 0 return 1 _recurrence_matrix_cache = dict()
[docs]def recurrence_matrix(xps, yps=None, joint=False): """Computes cross-reccurence matrix when two inputs are given and self-reccurence otherwise. Args: xps (numpy.array): Phase_space object(s). yps (numpy.array, optional): Phase_space object for cross reccurence. Defaults to none. joint (bool, optional): Should joint reccurence be calculated? Defaults to False. Returns: numpy.array : A 2D numpy matrix. """ if not yps: yps, cross = xps, False else: cross = True if (xps,yps,joint) in _recurrence_matrix_cache: return _recurrence_matrix_cache[xps, yps, joint] if (xps.matrix.shape, xps.tau, xps.m, xps.eps) != (yps.matrix.shape, yps.tau, yps.m, yps.eps): print("Error: Input phase spaces have different parameters.") return if joint: return numpy.multiply( recurrence_matrix(xps), recurrence_matrix(yps) ) BB, AA, tau, m, eps = yps.matrix, xps.matrix, xps.tau, xps.m, xps.eps N = AA.shape[0] ans = numpy.full([N, N],0) for i in range(N): for j in range(N if cross else i+1): #ans[i][j] = _Theta( AA[i], BB[j], eps) ans[i][j] = numpy.linalg.norm(AA[i]-BB[j]) _recurrence_matrix_cache[xps,yps,joint] = ans return _recurrence_matrix_cache[xps, yps, joint]
[docs]def cross_recurrence_matrix( xps, yps ): """Cross reccurence matrix. Args: xps (numpy.array): yps (numpy.array): Returns: numpy.array : A 2D numpy array. """ return recurrence_matrix( xps, yps )
[docs]def joint_recurrence_matrix( xps, yps ): """Joint reccurence matrix. Args: xps (numpy.array): yps (numpy.array): Returns: numpy.array : A 2D numpy array. """ return recurrence_matrix( xps, yps, joint=True )
[docs]def recurrence_rate( AA ): """Computes reccurence-rate from reccurence matrix. Args: AA (numpy.array): A reccurence matrix. Returns: numpy.array : A numpy array. """ isLower = utilities.is_lower_triangular(AA) N = AA.shape[0] ans = numpy.zeros( N, dtype=float ) for k in range(1,N): tmp = numpy.sum(AA[:k,:k]) ans[k] += tmp for i in range(1, N-k): if isLower: tmp += numpy.sum(AA[i+k-1,i:i+k]) - numpy.sum(AA[i-1:i-1+k,i-1]) else: tmp += numpy.sum( AA[i+k-1, i:i+k] ) \ + numpy.sum( AA[i:i+k-1, i+k-1] ) \ - numpy.sum( AA[i-1:i-1+k, i-1] ) \ - numpy.sum( AA[i-1, i:i-1+k] ) ans[k] += tmp ans[k] /= 0.5*(N-k)*k**2 if isLower else (N-k)*k**2 return ans
_measures_cache = dict()
[docs]def determinism( AA ): """Calculates percentage of recurrence points which form diagonal lines. Args: AA (numpy.array): A reccurence matrix. Returns: float: The determinism. """ if (id(AA),"determinism") in _measures_cache: return _measures_cache[id(AA),"determinism"] isLower = utilities.is_lower_triangular(AA) N = AA.shape[0] H = dict() for key in range(N): H[key] = 0 def lower_DET(x): for i in range(1, N): isPrev = False count = 0 for j in range(i, N): #search for consective lines in AA[idx1,idx1-idx] if x[j, j-i]: if isPrev: count += 1 else: count = 1 isPrev = True elif isPrev: isPrev = False H[count] += 1 if count > 1 else 0 count = 0 H[count] += 1 if count>1 else 0 return lower_DET(AA) if not isLower: lower_DET(numpy.transpose(AA)) num, avg, max_L = 0, 0, 0 for key, val in H.items(): max_L = key if val else max_L num += key*val avg += val dem = numpy.sum(AA) ENTR = 0 if avg: for key, val in H.items(): p = val/avg ENTR -= p*math.log(p) if p else 0 PRED = num/avg else: ENTR = None PRED = 0 DIV = 1/max_L if max_L else float('inf') _measures_cache[id(AA),"determinism"] = num/dem _measures_cache[id(AA),"pred"] = PRED _measures_cache[id(AA),"divergence"] = DIV _measures_cache[id(AA),"entropy"] = ENTR return _measures_cache[id(AA),"determinism"]
[docs]def divergence( AA ): """Divergence Args: AA (numpy.array): A numpy array. Returns: numpy.array: The answer. """ if (id(AA),"divergence") not in _measures_cache: determinism(AA) return _measures_cache[id(AA),"divergence"]
[docs]def entropy( AA ): """Entropy Args: AA (numpy.array): A numpy array. Returns: numpy.array: The answer. """ if (id(AA),"entropy") not in _measures_cache: determinism(AA) return _measures_cache[id(AA),"entropy"]
[docs]def pred( AA ): """Pred Args: AA (numpy.array): A numpy array. Returns: numpy.array: The answer. """ if (id(AA),"pred") not in _measures_cache: determinism(AA) return _measures_cache[id(AA),"pred"]
[docs]def trend( AA, longterm=False ): """Calculate the TREND of a give 1d numpy array R. Args: AA (numpy.array(float)): A 2D matrix. longterm (bool, optional): Should long-term trend be calculate? Defaults to False. Returns: float: The medium and long range trends a float tuple (Med, Long) """ N = AA.shape[0] R_med = R[:N//2] - np.mean(R[:N//2]) R_long = R[:-1] - np.mean(R[:-1]) coef = np.array([i - N//4 +1 for i in range(N//2)]) Med = np.dot(coef, R_med)/np.dot(coef, coef) coef = np.array([i - N//2 +1 for i in range(N-1)]) Long = np.dot(coef, R_long)/np.dot(coef, coef) return Long if longterm else Med
[docs]def laminarity( AA ): #+ Trapping """Laminarity. Calculates percentage of recurrence points which form verticle lines. This function calculates Trapping as a side effect. Args: AA (numpy.array(float)): A 2D matrix. Returns: float: The laminarity """ N = AA.shape[0] H = dict() for key in range(N): H[key] = 0 #Lower Lam for j in range(N): isPrev, count = False, 0 for i in range(j+1, N): #search for consecutive lines in M[i, j] if AA[i, j]: if isPrev: count += 1 else: isPrev, count = True, 1 elif isPrev: H[count] += 1 if count > 1 else 0 isPrev, count = False, 0 H[count] += 1 if count > 1 else 0 #Upper Lam if not utilities.is_lower_triangular(AA): for j in range(N): isPrev, count = False, 0 for i in range(j): #search for consecutive lines in M[idx1, idx] if AA[i,j]: if isPrev: count += 1 else: isPrev, count = True, 1 elif isPrev: H[count] += 1 if count > 1 else 0 isPrev, count = False, 0 H[count] += 1 if count > 1 else 0 num, avg= 0, 0 for key, val in H.items(): avg += val num += key*val dem = num + numpy.sum(AA) LAMI = num/dem TRAP = num/avg if avg else 0 _measures_cache[id(AA),"laminarity"] = LAMI _measures_cache[id(AA),"trapping"] = TRAP return _measures_cache[id(AA),"laminarity"]
[docs]def trapping( AA ): """Trapping. Calculates ... This function calculates Laminiarity as a side effect. Args: AA (numpy.array(float)): A 2D matrix. Returns: float: The trapping """ if (id(AA),"trapping") not in _measures_cache: return laminarity(AA) return _measures_cache[id(AA),"trapping"]