[Scipy-svn] r2608 - in trunk/Lib/io: . tests
scipy-svn at scipy.org
scipy-svn at scipy.org
Wed Jan 24 18:27:48 EST 2007
Author: matthew.brett at gmail.com
Date: 2007-01-24 17:27:14 -0600 (Wed, 24 Jan 2007)
New Revision: 2608
Added:
trunk/Lib/io/npfile.py
trunk/Lib/io/tests/test_npfile.py
Modified:
trunk/Lib/io/__init__.py
trunk/Lib/io/fopen.py
Log:
Added npfile module with tests; still debugging
Modified: trunk/Lib/io/__init__.py
===================================================================
--- trunk/Lib/io/__init__.py 2007-01-24 14:35:01 UTC (rev 2607)
+++ trunk/Lib/io/__init__.py 2007-01-24 23:27:14 UTC (rev 2608)
@@ -9,6 +9,7 @@
convert_objectarray
from mio import *
from fopen import *
+from npfile import npfile
from recaster import sctype_attributes, Recaster
from array_import import *
from data_store import *
Modified: trunk/Lib/io/fopen.py
===================================================================
--- trunk/Lib/io/fopen.py 2007-01-24 14:35:01 UTC (rev 2607)
+++ trunk/Lib/io/fopen.py 2007-01-24 23:27:14 UTC (rev 2608)
@@ -10,8 +10,6 @@
LittleEndian = (sys.byteorder == 'little')
-_unit_imag = {'f': array(1j,'F'), 'd': 1j}
-
__all__ = ['fopen']
def getsize_type(mtype):
Added: trunk/Lib/io/npfile.py
===================================================================
--- trunk/Lib/io/npfile.py 2007-01-24 14:35:01 UTC (rev 2607)
+++ trunk/Lib/io/npfile.py 2007-01-24 23:27:14 UTC (rev 2608)
@@ -0,0 +1,188 @@
+# Author: Matthew Brett, Travis Oliphant
+
+"""
+Class for reading and writing numpy arrays from / to files
+"""
+
+import sys
+
+import numpy as N
+
+sys_endian_code = (sys.byteorder == 'little') and '<' or '>'
+
+class npfile(object):
+ ''' Class for reading and writing numpy arrays to/from files
+
+ Inputs:
+ file_name -- The complete path name to the file to open
+ or an open file-like object
+ permission -- Open the file with given permissions: ('r', 'H', 'a')
+ for reading, writing, or appending. This is the same
+ as the mode argument in the builtin open command.
+ format -- The byte-ordering of the file:
+ (['native', 'n'], ['ieee-le', 'l'], ['ieee-be', 'B']) for
+ native, little-endian, or big-endian respectively.
+
+ Attributes
+ endian -- default endian code for reading / writing
+ order -- default order for reading writing ('C' or 'F')
+ file -- file object containing read / written data
+ '''
+
+ def __init__(self, file_name,
+ permission='rb',
+ endian = 'dtype',
+ order = 'C'):
+ if 'b' not in permission: permission += 'b'
+ if isinstance(file_name, basestring):
+ self.file = file(file_name, permission)
+ else:
+ try:
+ closed = file_name.closed
+ except AttributeError:
+ raise TypeError, 'Need filename or file object as input'
+ if closed:
+ raise TypeError, 'File object should be open'
+ self.file = file_name
+
+ self.endian = endian
+ self.order = order
+
+ def get_endian(self):
+ return self._endian
+ def set_endian(self, endian_code):
+ self._endian = self.parse_endian(endian_code)
+ endian = property(get_endian, set_endian, None, 'get/set endian code')
+
+ def parse_endian(self, endian_code):
+ ''' Returns valid endian code from wider input options'''
+ if endian_code in ['native', 'n', 'N','default', '=']:
+ return sys_endian_code
+ elif endian_code in ['swapped', 's', 'S']:
+ return sys_endian_code == '<' and '>' or '<'
+ elif endian_code in ['ieee-le','l','L','little-endian',
+ 'little','le','<']:
+ return '<'
+ elif endian_code in ['ieee-be','B','b','big-endian',
+ 'big','be', '>']:
+ return '>'
+ elif endian_code == 'dtype':
+ return 'dtype'
+ else:
+ raise ValueError, "Unrecognized endian code: " + endian_code
+ return
+
+ def __del__(self):
+ try:
+ self.file.close()
+ except:
+ pass
+
+ def close(self):
+ self.file.close()
+
+ def seek(self, *args):
+ self.file.seek(*args)
+
+ def tell(self):
+ return self.file.tell()
+
+ def rewind(self,howmany=None):
+ """Rewind a file to its beginning or by a specified amount.
+ """
+ if howmany is None:
+ self.seek(0)
+ else:
+ self.seek(-howmany,1)
+
+ def size(self):
+ """Return the size of the file.
+
+ Cached once found
+ """
+ try:
+ sz = self.thesize
+ except AttributeError:
+ curpos = self.tell()
+ self.seek(0,2)
+ sz = self.tell()
+ self.seek(curpos)
+ self.thesize = sz
+ return sz
+
+ def raw_read(self, size=-1):
+ """Read raw bytes from file as string."""
+ return self.file.read(size)
+
+ def raw_write(self, str):
+ """Write string to file as raw bytes."""
+ return self.file.write(str)
+
+ def _endian_order(self, endian, order):
+ ''' Housekeeping function to return endian, order from input args '''
+ if endian is None:
+ endian = self.endian
+ else:
+ endian = self.parse_endian(endian)
+ if order is None:
+ order = self.order
+ return endian, order
+
+ def _endian_from_dtype(self, dt):
+ dt_endian = dt.byteorder
+ if dt_endian == '=':
+ dt_endian = sys_endian_code
+ return dt_endian
+
+ def write(self, data, endian=None, order=None):
+ ''' Write to open file object the flattened numpy array data
+
+ Inputs
+ data - numpy array or object convertable to array
+ endian - endianness of written data
+ (can be None, 'dtype', '<', '>')
+ (default from self.endian)
+ order - order of array to write (C, F)
+ (default from self.order)
+ '''
+ endian, order = self._endian_order(endian, order)
+ data = N.asarray(data)
+ dt_endian = self._endian_from_dtype(data.dtype)
+ if not endian == 'dtype':
+ if dt_endian != endian:
+ data = data.byteswap()
+ self.file.write(data.tostring(order=order))
+
+ fwrite = write
+
+ def read(self, shape, dt, endian=None, order=None):
+ ''' Read data from file and return it in a numpy array
+
+ Inputs
+ shape - shape of output array, or number of elements
+ dt - dtype of array to write
+ endian - endianness of written data
+ (can be None, 'dtype', '<', '>')
+ (default from self.endian)
+ order - order of array to write (C, F)
+ (default from self.order)
+ '''
+ endian, order = self._endian_order(endian, order)
+ try:
+ shape = tuple(shape)
+ except TypeError:
+ shape = (shape,)
+ dt = N.dtype(dt)
+ dt_endian = self._endian_from_dtype(dt)
+ if not endian == 'dtype':
+ if dt_endian != endian:
+ dt = dt.newbyteorder(endian)
+ sz = dt.itemsize * N.product(shape)
+ buf = self.file.read(sz)
+ return N.ndarray(shape=shape,
+ dtype=dt,
+ buffer=buf,
+ order=order).copy()
+
+
+ fread = read
Added: trunk/Lib/io/tests/test_npfile.py
===================================================================
--- trunk/Lib/io/tests/test_npfile.py 2007-01-24 14:35:01 UTC (rev 2607)
+++ trunk/Lib/io/tests/test_npfile.py 2007-01-24 23:27:14 UTC (rev 2608)
@@ -0,0 +1,82 @@
+from StringIO import StringIO
+import os
+from tempfile import mkstemp
+from numpy.testing import *
+import numpy as N
+
+set_package_path()
+from io.npfile import npfile, sys_endian_code
+restore_path()
+
+class test_npfile(NumpyTestCase):
+
+ def test_init(self):
+ fd, fname = mkstemp()
+ npf = npfile(fname)
+ arr = N.reshape(N.arange(10), (5,2))
+ self.assertRaises(IOError, npf.write, arr)
+ npf.close()
+ npf = npfile(fname, 'w')
+ npf.write(arr)
+ npf.rewind()
+ self.assertRaises(IOError, npf.read,
+ arr.shape,
+ arr.dtype)
+ npf.close()
+ os.remove(fname)
+
+ npf = npfile(StringIO(), endian='>', order='F')
+ assert npf.endian == '>', 'Endian not set correctly'
+ assert npf.order == 'F', 'Order not set correctly'
+ npf.endian = '<'
+ assert npf.endian == '<', 'Endian not set correctly'
+
+ def test_parse_endian(self):
+ npf = npfile(StringIO())
+ swapped_code = sys_endian_code == '<' and '>' or '<'
+ assert npf.parse_endian('native') == sys_endian_code
+ assert npf.parse_endian('swapped') == swapped_code
+ assert npf.parse_endian('l') == '<'
+ assert npf.parse_endian('B') == '>'
+ assert npf.parse_endian('dtype') == 'dtype'
+ self.assertRaises(ValueError, npf.parse_endian, 'nonsense')
+
+ def test_raw_read_write(self):
+ npf = npfile(StringIO())
+ str = 'test me with this string'
+ npf.raw_write(str)
+ npf.rewind()
+ assert str == npf.raw_read(len(str))
+
+ def test_read_write(self):
+ npf = npfile(StringIO())
+ arr = N.reshape(N.arange(10), (5,2))
+ f_arr = arr.reshape((2,5)).T
+ bo = arr.dtype.byteorder
+ swapped_code = sys_endian_code == '<' and '>' or '<'
+ if bo in ['=', sys_endian_code]:
+ nbo = swapped_code
+ else:
+ nbo = sys_endian_code
+ bs_arr = arr.newbyteorder(nbo)
+ adt = arr.dtype
+ shp = arr.shape
+ npf.write(arr)
+ npf.rewind()
+ assert_array_equal(npf.read(shp, adt), arr)
+ npf.rewind()
+ assert_array_equal(npf.read(shp, adt, endian=swapped_code),
+ bs_arr)
+ npf.rewind()
+ assert_array_equal(npf.read(shp, adt, order='F'),
+ f_arr)
+
+ npf = npfile(StringIO(), endian='swapped', order='F')
+ npf.write(arr)
+ npf.rewind()
+ assert_array_equal(npf.read(shp, adt), arr)
+ npf.rewind()
+ assert_array_equal(npf.read(shp, adt, endian='dtype'), bs_arr)
+ npf.rewind()
+ # assert_array_equal(npf.read(shp, adt, order='C'), f_arr)
+
More information about the Scipy-svn
mailing list