[Numpy-discussion] scipy.fftpack.diff operation question
R Schumacher
rays at blue-cove.com
Tue Apr 28 16:38:58 EDT 2015
We are looking to plot to time series accelerometer data as velocity
and displacement.
To this end we tried scipy.fftpack.diff, but in looking at the test
code direct_diff() we get odd results, and, why is the doc using
"sqrt(-1)*j" in its explanation?
So, I tried a few different integration methods.
We were first looking at the doc at
http://docs.scipy.org/doc/scipy-0.15.1/reference/generated/scipy.fftpack.diff.html
and code at
https://github.com/scipy/scipy/blob/v0.15.1/scipy/fftpack/pseudo_diffs.py#L26
as well as the tests at
https://github.com/scipy/scipy/blob/master/benchmarks/benchmarks/fftpack_pseudo_diffs.py
The direct_diff() test function in bench_pseudo_diffs.py seems odd,
since I have to pass period=-1 to get a matching sign plot for the
first integration.
And why the scaling differences required, even for diff() and direct_diff()?
Is my understanding fundamentally flawed?
Emacs!
Ray
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.python.org/pipermail/numpy-discussion/attachments/20150428/821d6519/attachment.html>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: 2a6b1958.jpg
Type: image/jpeg
Size: 362369 bytes
Desc: not available
URL: <http://mail.python.org/pipermail/numpy-discussion/attachments/20150428/821d6519/attachment.jpg>
-------------- next part --------------
#-----------------------------------------------------------------------------
# Name: plotter.py
# Purpose:
#
# Author: Ray Schumacher
#
# Created: 2015/04/22
# RCS-ID: $Id: plotter.py $
# Copyright: (c) 2015
# Licence:
#-----------------------------------------------------------------------------
"""
"""
import numpy
import time
#------------ stand-alone test --------------------------------------------------
def intf(a, fs, f_lo=0.0, f_hi=1.0e12, times=1, winlen=1, unwin=False):
"""
Numerically integrate a time series in the frequency domain.
This function integrates a time series in the frequency domain using
'Omega Arithmetic', over a defined frequency band.
Parameters
----------
a : array_like
Inumpyut time series.
fs : int
Sampling rate (Hz) of the inumpyut time series.
f_lo : float, optional
Lower frequency bound over which integration takes place.
Defaults to 0 Hz.
f_hi : float, optional
Upper frequency bound over which integration takes place.
Defaults to the Nyquist frequency ( = fs / 2).
times : int, optional
Number of times to integrate inumpyut time series a. Can be either
0, 1 or 2. If 0 is used, function effectively applies a 'brick wall'
frequency domain filter to a.
Defaults to 1.
winlen : int, optional
Number of seconds at the beginning and end of a file to apply half a
Hanning window to. Limited to half the record length.
Defaults to 1 second.
unwin : Boolean, optional
Whether or not to remove the window applied to the inumpyut time series
from the output time series.
Returns
-------
out : complex ndarray
The zero-, single- or double-integrated acceleration time series.
Versions
----------
1.1 First development version.
Uses rfft to avoid complex return values.
Checks for even length time series; if not, end-pad with single zero.
1.2 Zero-means time series to avoid spurious errors when applying Hanning
window.
"""
t0 = time.clock()
EPS = numpy.finfo(float).eps *2**8
a = a - a.mean() # Convert time series to zero-mean
if numpy.mod(a.size,2) != 0: # Check for even length time series
odd = True
a = numpy.append(a, 0) # If not, append zero to array
else:
odd = False
f_hi = min(fs/2, f_hi) # Upper frequency limited to Nyquist
winlen = min(a.size/2, winlen) # Limit window to half record length
ni = a.size # No. of points in data (int)
nf = float(ni) # No. of points in data (float)
fs = float(fs) # Sampling rate (Hz)
df = fs/nf # Frequency increment in FFT
stf_i = int(f_lo/df) # Index of lower frequency bound
enf_i = int(f_hi/df) # Index of upper frequency bound
window = numpy.ones(ni) # Create window function
es = int(winlen*fs) # No. of samples to window from ends
edge_win = numpy.hanning(es) # Hanning window edge
window[:es/2] = edge_win[:es/2]
window[-es/2:] = edge_win[-es/2:]
a_w = a*window
FFTspec_a = numpy.fft.rfft(a_w) # Calculate complex FFT of inumpyut
FFTfreq = numpy.fft.fftfreq(ni, d=1/fs)[:ni/2+1]
w = (2*numpy.pi*FFTfreq) # Omega
iw = (0+1j)*w # i*Omega
mask = numpy.zeros(ni/2+1) # Half-length mask for +ve freqs
mask[stf_i:enf_i] = 1.0 # Mask = 1 for desired +ve freqs
if times == 2: # Double integration
FFTspec = -FFTspec_a*w / (w+EPS)**3
elif times == 1: # Single integration
FFTspec = FFTspec_a*iw / (iw+EPS)**2
elif times == 0: # No integration
FFTspec = FFTspec_a
else:
print 'Error'
FFTspec *= mask # Select frequencies to use
out_w = numpy.fft.irfft(FFTspec) # Return to time domain
if unwin == True:
out = out_w*window/(window+EPS)**2 # Remove window from time series
else:
out = out_w
print 'elapsed', time.clock()-t0
if odd == True: # Check for even length time series
return out[:-1] # If not, remove last entry
else:
return out
def direct_diff(x,k=1,period=None):
fx = numpy.fft.fft(x)
n = len (fx)
if period is None:
period = 2*numpy.pi
w = numpy.fft.fftfreq(n)*2j*numpy.pi/period*n
if k<0:
with numpy.errstate(divide="ignore", invalid="ignore"):
w = 1 / w**k
w[0] = 0.0
else:
w = w**k
if n>2000:
w[250:n-250] = 0.0
return numpy.fft.ifft(w*fx).real
def dc_int(sine, sample_rate):
# fourier transform
ft = numpy.fft.rfft(sine)
N = len(sine)
# bin frequencies
bin_width = float(sample_rate) / N
bin_freqs = numpy.arange(N//2 + 1) * bin_width # include extra frequency for Nyquist bin
# OK, don't divide by zero, and the Nyquist bin actually has a negative frequency
bin_freqs[0] = bin_freqs[1]
bin_freqs[-1] *= -1
# integrate!
integ_ft = ft / (2j*numpy.pi*bin_freqs)
integ_ft[0] = 0 # zero out the DC bin
# inverse rfft
return numpy.fft.irfft(integ_ft)
def int2(ys):
N = ys.shape[0]
w = (numpy.arange(N) - N /2.) / float(N)
# integration
Fys = numpy.fft.fft(ys)
with numpy.errstate(divide="ignore", invalid="ignore"):
modFys = numpy.fft.ifftshift(1./ (2 * numpy.pi * 1j * w) * numpy.fft.fftshift(Fys))
# modFys[0] will hold the result of dividing the DC component of y by 0, so it
# will be nan or inf. Setting modFys[0] to 0 amounts to choosing a specific
# constant of integration.
modFys[0] = 0
return numpy.fft.ifft(modFys).real / float(N)
def test(pth=r'C:\Users\Ray\Dropbox (Jan Medical)\SD Datasets\Normals\Other Subject Data\Data from JH Unit\001_2008Sep12_142457.csv'):
""" run on a sample CSV file
b: blue
g: green
r: red
c: cyan
m: magenta
y: yellow
k: black
w: white
"""
import os.path
import matplotlib.pyplot as plt
from scipy.fftpack import diff
from mpl_toolkits.axes_grid1 import host_subplot
import mpl_toolkits.axisartist as AA
length = 2.**14
fh=open(pth,'r')
## for this sample data
sampleRate=1024
x = numpy.arange(length)*2*numpy.pi/length
f = numpy.sin(x)*numpy.cos(4*x)
plt.plot(f, label="data")
intg_data = diff(f,-1)
plt.plot(intg_data-.01, label="scipy diff - .01")
intg_data1 = direct_diff(f, -1, period=-1)
adj0 = intg_data.max()/intg_data1.max()
plt.plot(intg_data1*adj0, label="direct_diff, adj:"+str(adj0)) # diverges wildly...
intg_data3 = dc_int(f, sampleRate)
adjdc = intg_data.max()/intg_data3.max()
intg_data3 *= adjdc
plt.plot(intg_data3+.01, label="int dc +.01, adj:"+str(adjdc))
int2_res = int2(f)
adj = intg_data.max()/int2_res.max()
int2_res *= adj
plt.plot(int2_res, label="int2 adj:"+str(adj))
plt.legend()
plt.draw()
plt.show()
intg_data = diff(f,-2)
plt.plot(intg_data-.01, label="scipy diff - .01")
intg_data1 = direct_diff(f, -2, period=1)
adj0 = intg_data.max()/intg_data1.max()
plt.plot(intg_data1*adj0, label="direct_diff, adj:"+str(adj0)) # diverges wildly...
intg_data3 = dc_int(dc_int(f, sampleRate), sampleRate)
adjdc = intg_data.max()/intg_data3.max()
intg_data3 *= adjdc
plt.plot(intg_data3+.01, label="int dc +.01, adj:"+str(adjdc))
int2_res = int2(int2(f))
adj = intg_data.max()/int2_res.max()
int2_res *= adj
plt.plot(int2_res, label="int2 adj:"+str(adj))
plt.legend()
plt.draw()
plt.show()
if __name__ == '__main__':
import sys
if len(sys.argv)>1:
test(sys.argv[1])
else:
test()
More information about the NumPy-Discussion
mailing list