#----------------------------------------------------------------------------- # Name: plotter.py # Purpose: # # Author: Ray Schumacher # # Created: 2015/04/22 # RCS-ID: $Id: plotter.py $ # Copyright: (c) 2015 # Licence: #----------------------------------------------------------------------------- """ """ import numpy import time #------------ stand-alone test -------------------------------------------------- def intf(a, fs, f_lo=0.0, f_hi=1.0e12, times=1, winlen=1, unwin=False): """ Numerically integrate a time series in the frequency domain. This function integrates a time series in the frequency domain using 'Omega Arithmetic', over a defined frequency band. Parameters ---------- a : array_like Inumpyut time series. fs : int Sampling rate (Hz) of the inumpyut time series. f_lo : float, optional Lower frequency bound over which integration takes place. Defaults to 0 Hz. f_hi : float, optional Upper frequency bound over which integration takes place. Defaults to the Nyquist frequency ( = fs / 2). times : int, optional Number of times to integrate inumpyut time series a. Can be either 0, 1 or 2. If 0 is used, function effectively applies a 'brick wall' frequency domain filter to a. Defaults to 1. winlen : int, optional Number of seconds at the beginning and end of a file to apply half a Hanning window to. Limited to half the record length. Defaults to 1 second. unwin : Boolean, optional Whether or not to remove the window applied to the inumpyut time series from the output time series. Returns ------- out : complex ndarray The zero-, single- or double-integrated acceleration time series. Versions ---------- 1.1 First development version. Uses rfft to avoid complex return values. Checks for even length time series; if not, end-pad with single zero. 1.2 Zero-means time series to avoid spurious errors when applying Hanning window. """ t0 = time.clock() EPS = numpy.finfo(float).eps *2**8 a = a - a.mean() # Convert time series to zero-mean if numpy.mod(a.size,2) != 0: # Check for even length time series odd = True a = numpy.append(a, 0) # If not, append zero to array else: odd = False f_hi = min(fs/2, f_hi) # Upper frequency limited to Nyquist winlen = min(a.size/2, winlen) # Limit window to half record length ni = a.size # No. of points in data (int) nf = float(ni) # No. of points in data (float) fs = float(fs) # Sampling rate (Hz) df = fs/nf # Frequency increment in FFT stf_i = int(f_lo/df) # Index of lower frequency bound enf_i = int(f_hi/df) # Index of upper frequency bound window = numpy.ones(ni) # Create window function es = int(winlen*fs) # No. of samples to window from ends edge_win = numpy.hanning(es) # Hanning window edge window[:es/2] = edge_win[:es/2] window[-es/2:] = edge_win[-es/2:] a_w = a*window FFTspec_a = numpy.fft.rfft(a_w) # Calculate complex FFT of inumpyut FFTfreq = numpy.fft.fftfreq(ni, d=1/fs)[:ni/2+1] w = (2*numpy.pi*FFTfreq) # Omega iw = (0+1j)*w # i*Omega mask = numpy.zeros(ni/2+1) # Half-length mask for +ve freqs mask[stf_i:enf_i] = 1.0 # Mask = 1 for desired +ve freqs if times == 2: # Double integration FFTspec = -FFTspec_a*w / (w+EPS)**3 elif times == 1: # Single integration FFTspec = FFTspec_a*iw / (iw+EPS)**2 elif times == 0: # No integration FFTspec = FFTspec_a else: print 'Error' FFTspec *= mask # Select frequencies to use out_w = numpy.fft.irfft(FFTspec) # Return to time domain if unwin == True: out = out_w*window/(window+EPS)**2 # Remove window from time series else: out = out_w print 'elapsed', time.clock()-t0 if odd == True: # Check for even length time series return out[:-1] # If not, remove last entry else: return out def direct_diff(x,k=1,period=None): fx = numpy.fft.fft(x) n = len (fx) if period is None: period = 2*numpy.pi w = numpy.fft.fftfreq(n)*2j*numpy.pi/period*n if k<0: with numpy.errstate(divide="ignore", invalid="ignore"): w = 1 / w**k w[0] = 0.0 else: w = w**k if n>2000: w[250:n-250] = 0.0 return numpy.fft.ifft(w*fx).real def dc_int(sine, sample_rate): # fourier transform ft = numpy.fft.rfft(sine) N = len(sine) # bin frequencies bin_width = float(sample_rate) / N bin_freqs = numpy.arange(N//2 + 1) * bin_width # include extra frequency for Nyquist bin # OK, don't divide by zero, and the Nyquist bin actually has a negative frequency bin_freqs[0] = bin_freqs[1] bin_freqs[-1] *= -1 # integrate! integ_ft = ft / (2j*numpy.pi*bin_freqs) integ_ft[0] = 0 # zero out the DC bin # inverse rfft return numpy.fft.irfft(integ_ft) def int2(ys): N = ys.shape[0] w = (numpy.arange(N) - N /2.) / float(N) # integration Fys = numpy.fft.fft(ys) with numpy.errstate(divide="ignore", invalid="ignore"): modFys = numpy.fft.ifftshift(1./ (2 * numpy.pi * 1j * w) * numpy.fft.fftshift(Fys)) # modFys[0] will hold the result of dividing the DC component of y by 0, so it # will be nan or inf. Setting modFys[0] to 0 amounts to choosing a specific # constant of integration. modFys[0] = 0 return numpy.fft.ifft(modFys).real / float(N) def test(pth=r'C:\Users\Ray\Dropbox (Jan Medical)\SD Datasets\Normals\Other Subject Data\Data from JH Unit\001_2008Sep12_142457.csv'): """ run on a sample CSV file b: blue g: green r: red c: cyan m: magenta y: yellow k: black w: white """ import os.path import matplotlib.pyplot as plt from scipy.fftpack import diff from mpl_toolkits.axes_grid1 import host_subplot import mpl_toolkits.axisartist as AA length = 2.**14 fh=open(pth,'r') ## for this sample data sampleRate=1024 x = numpy.arange(length)*2*numpy.pi/length f = numpy.sin(x)*numpy.cos(4*x) plt.plot(f, label="data") intg_data = diff(f,-1) plt.plot(intg_data-.01, label="scipy diff - .01") intg_data1 = direct_diff(f, -1, period=-1) adj0 = intg_data.max()/intg_data1.max() plt.plot(intg_data1*adj0, label="direct_diff, adj:"+str(adj0)) # diverges wildly... intg_data3 = dc_int(f, sampleRate) adjdc = intg_data.max()/intg_data3.max() intg_data3 *= adjdc plt.plot(intg_data3+.01, label="int dc +.01, adj:"+str(adjdc)) int2_res = int2(f) adj = intg_data.max()/int2_res.max() int2_res *= adj plt.plot(int2_res, label="int2 adj:"+str(adj)) plt.legend() plt.draw() plt.show() intg_data = diff(f,-2) plt.plot(intg_data-.01, label="scipy diff - .01") intg_data1 = direct_diff(f, -2, period=1) adj0 = intg_data.max()/intg_data1.max() plt.plot(intg_data1*adj0, label="direct_diff, adj:"+str(adj0)) # diverges wildly... intg_data3 = dc_int(dc_int(f, sampleRate), sampleRate) adjdc = intg_data.max()/intg_data3.max() intg_data3 *= adjdc plt.plot(intg_data3+.01, label="int dc +.01, adj:"+str(adjdc)) int2_res = int2(int2(f)) adj = intg_data.max()/int2_res.max() int2_res *= adj plt.plot(int2_res, label="int2 adj:"+str(adj)) plt.legend() plt.draw() plt.show() if __name__ == '__main__': import sys if len(sys.argv)>1: test(sys.argv[1]) else: test()