I discovered that some (all?) of the functions in
numarray.linear_algebra are very slow when operating on small matrices.
In particular, determinant and inverse are both more than 15 times
slower than their NumPy counterparts when operating on 4x4 matrices. I
assume that this is simply a result of numarray's higher overhead.
Normally the overhead of numarray is not much of a problem since when
I'm operating on lots of small data chunks I can usually agregate them
into larger chunks and operate on the big chunks. This is, of course,
the standard way to get decent performance in either numarray or NumPy.
However, because the functions in linear_algebra take only rank-2 (or 1
in some cases) arrays, their is no way to aggregate the small operations
and thus things run quite slow.
In order to address this I rewrote some of the functions in
linear_algebra to allow an additional, optional, dimension on the input
arrays. Rank-3 arrays are treated as being a set of matrices that are
indexed along the first axis of A. Thus determinant(A) is essentially
equivalent to array(map(determinant, A)) when A is rank-3. See the
attached file for more detail.
By this trick and by some relentless tuning, I got the numarray
functions to run at about the same speed as their NumPy counterparts
when computing the determinants and inverses of 1000 4x4 matrices.
That's a humungous speedup.
Is this approach worth pursuing for linear_algebra in general? I'll be
using these myself since I need the speed, although I may back out some
of the more aggresive tuning so I don't get bit if numarray's internals
change. I'll gladly donate this code to numarray if it's wanted, and I'm
willing to help convert the rest, although it probaly wouldn't happen as
fast as this stuff since I don't need it myself presently.
-tim
[Use this with caution at this point -- I just got finished with a
tuning spree and there may well be some bugs]
from numarray.linear_algebra import *
from numarray.linear_algebra import LinearAlgebra2
import numarray as na
LinAlgError = LinearAlgebra2.LinAlgError
def __setup__():
import sys
sys.float_output_suppress_small = True
__test__ = {
"determiniant" :
"""
>>> import LinearAlgebra as la, numarray.random_array as random_array
>>> A = random_array.random([100,5,5])
>>> _close(map(la.determinant, A), determinant(A))
True
>>> _close(la.determinant(A[0]), determinant(A[0]))
True
""",
"inverse" :
"""
>>> import LinearAlgebra as la, numarray.random_array as random_array
>>> A = []
>>> while len(A) < 100:
... candidate = random_array.random([5,5])
... if abs(la.determinant(candidate)) > 1e-3:
... A.append(candidate)
>>> A = na.asarray(A)
>>> _close(map(la.inverse, A), inverse(A))
True
>>> _close(la.inverse(A[0]), inverse(A[0]))
True
""",
"solve_linear_equations" :
"""
>>> import LinearAlgebra as la, numarray.random_array as random_array
>>> A = []
>>> while len(A) < 100:
... candidate = random_array.random([5,5])
... if abs(la.determinant(candidate)) > 1e-3:
... A.append(candidate)
>>> A = na.asarray(A)
>>> B = random_array.random([100,5])
>>> _close(map(la.solve_linear_equations, A, B), solve_linear_equations(A, B))
True
>>> _close(la.solve_linear_equations(A[0], B[0]), la.solve_linear_equations(A[0], B[0]))
True
"""
}
def solve_linear_equations(a, b):
"""solve_linear_equations(a, b) -> x such that dot(a,x) = b
*a* may be either rank-2 or rank-3, in either case, it
must be square along the last two axes *b* may either
have a rank one lower than *a*, in which case it
represents a vector or an array of vectors, or it may be
the same rank as *a* in which case it represents a matrix
or an array of matrices.
Since that may be a bit confusing let's look at some examples.
First the simplest case, a square matrix A, and a vector of
results B.
>>> A = [[1,2,3], [3,5,5], [5,6,7]]
>>> B = [1,1,1]
>>> x = solve_linear_equations(A, B)
>>> x
array([-0.5, 0. , 0.5])
>>> na.dot(A, x) # This should give us B
array([ 1., 1., 1.])
The next simplest case is a square matrix A and a matrix B.
>>> B = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
>>> solve_linear_equations(A, B)
array([[-0.625, -0.5 , 0.625],
[-0.5 , 1. , -0.5 ],
[ 0.875, -0.5 , 0.125]])
If *a* is rank-3, then the first dimension of *a* **and** *b* is
interpreted as selecting different submatrices or subvectors to operate
on. In this case, *b* will be rank-2 in the vector case and rank-3 in
the matrix case. Here is what is looks like in the vector case.
>>> A = [[[1, 3], [2j, 3j]],
... [[2, 4], [4j, 4j]],
... [[3, 5], [6j, 5j]]]
>>> B = [[1, 0], [0, 1], [1, 1]]
>>> solve_linear_equations(A, B)
array([[-1. +0.j , 0.66666667+0.j ],
[ 0. -0.5j , 0. +0.25j ],
[-0.33333333-0.33333333j, 0.4 +0.2j ]])
The first dimensions of *a* and *b* must either match or one of them must
be 1. In the latter case, the length-1 dimension is broadcast in the normal
way.
>>> B = [[1, 0], [0, 1]]
>>> solve_linear_equations(A, B)
Traceback (most recent call last):
...
LinearAlgebraError: first dimensions of a and b must match or be 1
>>> B = [[1, 0]]
>>> solve_linear_equations(A, B)
array([[-1. +0.j, 0.66666667+0.j],
[-0.5 +0.j, 0.5 +0.j],
[-0.33333333+0.j, 0.4 +0.j]])
"""
a = na.asarray(a)
b = na.asarray(b)
_assertRank((2,3), a)
_assertSubmatrixSquareness(a)
rank_a = len(a.shape)
_assertRank((rank_a-1, rank_a), b)
stretched = (rank_a == 2)
if stretched:
a = a[na.NewAxis,]
b = b[na.NewAxis,]
one_eq = (len(b.shape) == len(a.shape)-1)
if one_eq:
b = b[:,:,na.NewAxis]
broadcast_a = (a.shape[0] == 1)
broadcast_b = (b.shape[0] == 1)
if not (broadcast_a or broadcast_b or a.shape[0] == b.shape[0]):
raise LinAlgError, "first dimensions of a and b must match or be 1"
#
n_cases = max(a.shape[0], b.shape[0])
n_eq = a.shape[1]
n_rhs = b.shape[2]
if n_eq != b.shape[1]:
raise LinAlgError, 'Incompatible dimensions'
t = LinearAlgebra2._commonType(a, b)
if LinearAlgebra2._array_kind[t] == 1: # Complex routines take different arguments
lapack_routine = lapack_lite2.zgesv
else:
lapack_routine = lapack_lite2.dgesv
a, b = _castCopyAndTranspose(t, a, b, indices=(0,2,1))
result = na.zeros([n_cases, n_rhs, n_eq], b.type())
result[:] = b
b = result
pivots = na.zeros(n_eq, 'l')
a_stride = n_eq * n_eq * a.itemsize()
b_stride = n_eq * n_rhs * b.itemsize()
a_view = a[0]
b_view = b[0]
for i in range(n_cases):
if (i == 0) or (not broadcast_a):
a_i = a_view.copy()
b_i = b_view.copy()
outcome = lapack_routine(n_eq, n_rhs, a_i, n_eq, pivots, b_i, n_eq, 0)
if outcome['info'] > 0:
raise LinAlgError, 'Singular matrix'
b_view._copyFrom(b_i)
a_view._byteoffset += a_stride
b_view._byteoffset += b_stride
b = na.transpose(b, (0,2,1))
if one_eq:
b = b[...,0]
if stretched:
b = b[0]
return b
def inverse(a):
"""inverse(a) -> inverse matrix of a
*a* may be either rank-2 or rank-3. If it is rank-2, it must square.
>>> A = [[1,2,3], [3,5,5], [5,6,7]]
>>> Ainv = inverse(A)
>>> _close(na.dot(A, Ainv), na.identity(3))
True
If *a* is rank-3, it is treated as an array of rank-2 matrices and
must be square along the last 2 axes.
>>> A = [[[1, 3], [2j, 3j]],
... [[2, 4], [4j, 4j]],
... [[3, 5], [6j, 5j]]]
>>> Ainv = inverse(A)
>>> _close(map(na.dot, A, Ainv), [na.identity(2)]*3)
True
If *a* is not square along its last two axes, a LinAlgError is raised.
>>> inverse(na.asarray(A)[...,:1])
Traceback (most recent call last):
...
LinearAlgebraError: Array (or it submatrices) must be square
"""
a = na.asarray(a)
I = na.identity(a.shape[-2])
if len(a.shape) == 3:
I.shape = (1,) + I.shape
return solve_linear_equations(a, I)
def determinant(a):
"""determinant(a) -> ||a||
*a* may be either rank-2 or rank-3. If it is rank-2, it must square.
>>> A = [[1,2,3], [3,4,5], [5,6,7]]
>>> _close(determinant(A), 0)
True
If *a* is rank-3, it is treated as an array of rank-2 matrices and
must be square along the last 2 axes.
>>> A = [[[1, 3], [2j, 3j]], [[2, 4], [4j, 4j]], [[3, 5], [6j, 5j]]]
>>> _close(determinant(A), [-3j, -8j, -15j])
True
If *a* is not square along its last two axes, a LinAlgError is raised.
>>> determinant(na.asarray(A)[...,:1])
Traceback (most recent call last):
...
LinearAlgebraError: Array (or it submatrices) must be square
"""
a = na.asarray(a)
_assertRank((2,3), a)
_assertSubmatrixSquareness(a)
stretched = (len(a.shape) == 2)
if stretched:
a = a[na.NewAxis,]
t = LinearAlgebra2._commonType(a)
a = _castCopyAndTranspose(t, a, indices=(0,2,1))
n_cases, n = a.shape[:2]
if LinearAlgebra2._array_kind[t] == 1:
lapack_routine = lapack_lite2.zgetrf
else:
lapack_routine = lapack_lite2.dgetrf
no_pivoting = na.arrayrange(1, n+1)
pivots = na.zeros((n,), 'l')
all_pivots = na.zeros((n_cases, n,), 'l')
sum , not_equal = na.sum, na.not_equal
stride = n * n * a.itemsize()
pivots_stride = n * pivots.itemsize()
view = a[0].view()
view_pivots = all_pivots[0]
for i in range(n_cases):
a_i = view.copy()
outcome = lapack_routine(n, n, a_i, n, pivots, 0)
view_pivots._copyFrom(pivots)
view._copyFrom(a_i)
view._byteoffset += stride
view_pivots._byteoffset += pivots_stride
signs = na.where(sum(not_equal(all_pivots, no_pivoting), 1) % 2, -1, 1).astype(t)
for i in range(n):
signs *= a[:,i,i]
if stretched:
signs = signs[0]
return signs
def _assertRank(rank, *args):
for a in args:
if isinstance(rank, int):
rank = (rank,)
if len(a.shape) not in rank:
raise LinAlgError, 'Array must be rank %s' % (' or '.join([str(x) for x in rank]))
def _assertSubmatrixSquareness(*args):
for a in args:
if a.shape[-1] != a.shape[-2]:
raise LinAlgError, 'Array (or it submatrices) must be square'
def _castCopyAndTranspose(type, *numarray, **kargs):
indices = kargs.get("indices")
cast_numarray = []
for a in numarray:
if indices is None:
a = na.transpose(a)
else:
a = na.transpose(a, indices)
if a.type() == type:
cast_numarray.append(a.copy())
else:
cast_numarray.append(a.astype(type))
if len(cast_numarray) == 1:
return cast_numarray[0]
else:
return cast_numarray
def _close(a, b, *args, **kargs):
return bool(na.allclose(na.ravel(a), na.ravel(b), *args, **kargs))
def _time():
import timeit
setup = """
import linear_algebra_x
from numarray import linear_algebra
import numarray as na
A = na.array([((1,2),(2,1.0))]*1000)
try:
import LinearAlgebra as la
import Numeric
N = Numeric.array([((1,2),(2,1.0))]*1000)
except: pass
"""
N = 3
t_lx = timeit.Timer(stmt="linear_algebra_x.determinant(A)", setup=setup).timeit(N)
t_la = timeit.Timer(stmt="na.array(map(linear_algebra.determinant,A))", setup=setup).timeit(N)
t_np = timeit.Timer(stmt="Numeric.array(map(la.determinant,N))", setup=setup).timeit(N)
print "determinant:"
print " standard linear_algebra:", t_la
print " linear_algebra_x:", t_lx
print " numpy:", t_np
print " x version is", round(t_la/t_lx, 1), "times faster"
t_lx = timeit.Timer(stmt="linear_algebra_x.inverse(A)", setup=setup).timeit(N)
t_la = timeit.Timer(stmt="na.array(map(linear_algebra.inverse,A))", setup=setup).timeit(N)
t_np = timeit.Timer(stmt="Numeric.array(map(la.inverse,N))", setup=setup).timeit(N)
print "inverse:"
print " standard linear_algebra:", t_la
print " linear_algebra_x:", t_lx
print " numpy:", t_np
print " x version is", round(t_la/t_lx, 1), "times faster"
def _test():
__setup__()
import doctest, linear_algebra_x, linear_algebra_x
doctest.testmod(linear_algebra_x)
if __name__ == "__main__":
_test()
_time()