[Scipy-svn] r2608 - in trunk/Lib/io: . tests

scipy-svn at scipy.org scipy-svn at scipy.org
Wed Jan 24 18:27:48 EST 2007


Author: matthew.brett at gmail.com
Date: 2007-01-24 17:27:14 -0600 (Wed, 24 Jan 2007)
New Revision: 2608

Added:
   trunk/Lib/io/npfile.py
   trunk/Lib/io/tests/test_npfile.py
Modified:
   trunk/Lib/io/__init__.py
   trunk/Lib/io/fopen.py
Log:
Added npfile module with tests; still debugging

Modified: trunk/Lib/io/__init__.py
===================================================================
--- trunk/Lib/io/__init__.py	2007-01-24 14:35:01 UTC (rev 2607)
+++ trunk/Lib/io/__init__.py	2007-01-24 23:27:14 UTC (rev 2608)
@@ -9,6 +9,7 @@
      convert_objectarray
 from mio import *
 from fopen import *
+from npfile import npfile
 from recaster import sctype_attributes, Recaster
 from array_import import *
 from data_store import *

Modified: trunk/Lib/io/fopen.py
===================================================================
--- trunk/Lib/io/fopen.py	2007-01-24 14:35:01 UTC (rev 2607)
+++ trunk/Lib/io/fopen.py	2007-01-24 23:27:14 UTC (rev 2608)
@@ -10,8 +10,6 @@
 
 LittleEndian = (sys.byteorder == 'little')
 
-_unit_imag = {'f': array(1j,'F'), 'd': 1j}
-
 __all__ = ['fopen']
 
 def getsize_type(mtype):

Added: trunk/Lib/io/npfile.py
===================================================================
--- trunk/Lib/io/npfile.py	2007-01-24 14:35:01 UTC (rev 2607)
+++ trunk/Lib/io/npfile.py	2007-01-24 23:27:14 UTC (rev 2608)
@@ -0,0 +1,188 @@
+# Author: Matthew Brett, Travis Oliphant
+
+"""
+Class for reading and writing numpy arrays from / to files
+"""
+
+import sys
+
+import numpy as N
+
+sys_endian_code = (sys.byteorder == 'little') and '<' or '>'
+
+class npfile(object):
+    ''' Class for reading and writing numpy arrays to/from files
+    
+    Inputs:
+      file_name -- The complete path name to the file to open
+                   or an open file-like object
+      permission -- Open the file with given permissions: ('r', 'w', 'a')
+                    for reading, writing, or appending.  This is the same
+                    as the mode argument in the builtin open command.
+      format -- The byte-ordering of the file:
+                (['native', 'n'], ['ieee-le', 'l'], ['ieee-be', 'B']) for
+                native, little-endian, or big-endian respectively.
+
+    Attributes
+      endian   -- default endian code for reading / writing
+      order    -- default order for reading / writing ('C' or 'F')
+      file -- file object containing read / written data
+    '''
+
+    def __init__(self, file_name,
+                 permission='rb',
+                 endian = 'dtype',
+                 order = 'C'):
+        if 'b' not in permission: permission += 'b'
+        if isinstance(file_name, basestring):
+            self.file = file(file_name, permission)
+        else:
+            try:
+                closed = file_name.closed
+            except AttributeError:
+                raise TypeError, 'Need filename or file object as input'
+            if closed:
+                raise TypeError, 'File object should be open'
+            self.file = file_name
+            
+        self.endian = endian
+        self.order = order
+
+    def get_endian(self):
+        return self._endian
+    def set_endian(self, endian_code):
+        self._endian = self.parse_endian(endian_code)
+    endian = property(get_endian, set_endian, None, 'get/set endian code')
+                                     
+    def parse_endian(self, endian_code):
+        ''' Returns valid endian code from wider input options'''
+        if endian_code in ['native', 'n', 'N','default', '=']:
+            return sys_endian_code
+        elif endian_code in ['swapped', 's', 'S']:
+            return sys_endian_code == '<' and '>' or '<'
+        elif endian_code in ['ieee-le','l','L','little-endian',
+                             'little','le','<']:
+            return '<'
+        elif endian_code in ['ieee-be','B','b','big-endian',
+                             'big','be', '>']:
+            return '>'
+        elif endian_code == 'dtype':
+            return 'dtype'
+        else:
+            raise ValueError, "Unrecognized endian code: " + endian_code
+        return
+
+    def __del__(self):
+        try:
+            self.file.close()
+        except:
+            pass
+
+    def close(self):
+        self.file.close()
+
+    def seek(self, *args):
+        self.file.seek(*args)
+
+    def tell(self):
+        return self.file.tell()
+
+    def rewind(self,howmany=None):
+        """Rewind a file to its beginning or by a specified amount.
+        """
+        if howmany is None:
+            self.seek(0)
+        else:
+            self.seek(-howmany,1)
+
+    def size(self):
+        """Return the size of the file.
+
+        Cached once found
+        """
+        try:
+            sz = self.thesize
+        except AttributeError:
+            curpos = self.tell()
+            self.seek(0,2)
+            sz = self.tell()
+            self.seek(curpos)
+            self.thesize = sz
+        return sz
+
+    def raw_read(self, size=-1):
+        """Read raw bytes from file as string."""
+        return self.file.read(size)
+
+    def raw_write(self, str):
+        """Write string to file as raw bytes."""
+        return self.file.write(str)
+
+    def _endian_order(self, endian, order):
+        ''' Housekeeping function to return endian, order from input args '''
+        if endian is None:
+            endian = self.endian
+        else:
+            endian = self.parse_endian(endian)
+        if order is None:
+            order = self.order
+        return endian, order
+
+    def _endian_from_dtype(self, dt):
+        dt_endian = dt.byteorder
+        if dt_endian == '=':
+            dt_endian = sys_endian_code
+        return dt_endian
+    
+    def write(self, data, endian=None, order=None):
+        ''' Write to open file object the flattened numpy array data
+
+        Inputs
+        data      - numpy array or object convertible to array
+        endian    - endianness of written data
+                    (can be None, 'dtype', '<', '>')
+                    (default from self.endian)
+        order     - order of array to write (C, F)
+                    (default from self.order)
+        '''
+        endian, order = self._endian_order(endian, order)
+        data = N.asarray(data)
+        dt_endian = self._endian_from_dtype(data.dtype)
+        if not endian == 'dtype':
+            if dt_endian != endian:
+                data = data.byteswap()
+        self.file.write(data.tostring(order=order))
+        
+    fwrite = write
+    
+    def read(self, shape, dt, endian=None, order=None):
+        ''' Read data from file and return it in a numpy array
+        
+        Inputs
+        shape     - shape of output array, or number of elements
+        dt        - dtype of array to read
+        endian    - endianness of data to read
+                    (can be None, 'dtype', '<', '>')
+                    (default from self.endian)
+        order     - order of array to read (C, F)
+                    (default from self.order)
+        '''
+        endian, order = self._endian_order(endian, order)
+        try:
+            shape = tuple(shape)
+        except TypeError:
+            shape = (shape,)
+        dt = N.dtype(dt)
+        dt_endian = self._endian_from_dtype(dt)
+        if not endian == 'dtype':
+            if dt_endian != endian:
+                dt = dt.newbyteorder(endian)
+        sz = dt.itemsize * N.product(shape)
+        buf = self.file.read(sz)
+        return N.ndarray(shape=shape,
+                         dtype=dt,
+                         buffer=buf,
+                         order=order).copy()
+    
+
+    fread = read

Added: trunk/Lib/io/tests/test_npfile.py
===================================================================
--- trunk/Lib/io/tests/test_npfile.py	2007-01-24 14:35:01 UTC (rev 2607)
+++ trunk/Lib/io/tests/test_npfile.py	2007-01-24 23:27:14 UTC (rev 2608)
@@ -0,0 +1,82 @@
+from StringIO import StringIO
+import os
+from tempfile import mkstemp
+from numpy.testing import *
+import numpy as N
+
+set_package_path()
+from io.npfile import npfile, sys_endian_code
+restore_path()
+
+class test_npfile(NumpyTestCase):
+
+    def test_init(self):
+        fd, fname = mkstemp()
+        npf = npfile(fname)
+        arr = N.reshape(N.arange(10), (5,2))
+        self.assertRaises(IOError, npf.write, arr)
+        npf.close()
+        npf = npfile(fname, 'w')
+        npf.write(arr)
+        npf.rewind()
+        self.assertRaises(IOError, npf.read,
+                          arr.shape,
+                          arr.dtype)
+        npf.close()
+        os.remove(fname)
+
+        npf = npfile(StringIO(), endian='>', order='F')
+        assert npf.endian == '>', 'Endian not set correctly'
+        assert npf.order == 'F', 'Order not set correctly'
+        npf.endian = '<'
+        assert npf.endian == '<', 'Endian not set correctly'
+        
+    def test_parse_endian(self):
+        npf = npfile(StringIO())
+        swapped_code = sys_endian_code == '<' and '>' or '<'
+        assert npf.parse_endian('native') == sys_endian_code
+        assert npf.parse_endian('swapped') == swapped_code
+        assert npf.parse_endian('l') == '<'
+        assert npf.parse_endian('B') == '>'
+        assert npf.parse_endian('dtype') == 'dtype'
+        self.assertRaises(ValueError, npf.parse_endian, 'nonsense')
+
+    def test_raw_read_write(self):
+        npf = npfile(StringIO())
+        str = 'test me with this string'
+        npf.raw_write(str)
+        npf.rewind()
+        assert str == npf.raw_read(len(str))
+        
+    def test_read_write(self):
+        npf = npfile(StringIO())
+        arr = N.reshape(N.arange(10), (5,2))
+        f_arr = arr.reshape((2,5)).T
+        bo = arr.dtype.byteorder
+        swapped_code = sys_endian_code == '<' and '>' or '<'
+        if bo in ['=', sys_endian_code]:
+            nbo = swapped_code
+        else:
+            nbo = sys_endian_code
+        bs_arr = arr.newbyteorder(nbo)
+        adt = arr.dtype
+        shp = arr.shape
+        npf.write(arr)
+        npf.rewind()
+        assert_array_equal(npf.read(shp, adt), arr)
+        npf.rewind()
+        assert_array_equal(npf.read(shp, adt, endian=swapped_code),
+                           bs_arr)
+        npf.rewind()
+        assert_array_equal(npf.read(shp, adt, order='F'),
+                           f_arr)
+        
+        npf = npfile(StringIO(), endian='swapped', order='F')
+        npf.write(arr)
+        npf.rewind()
+        assert_array_equal(npf.read(shp, adt), arr)
+        npf.rewind()
+        assert_array_equal(npf.read(shp, adt, endian='dtype'), bs_arr)
+        npf.rewind()
+        # assert_array_equal(npf.read(shp, adt, order='C'), f_arr)
+        




More information about the Scipy-svn mailing list