[Jython-checkins] jython: Refactor decompressing a bz2 file to stream the decompressed data.

jim.baker jython-checkins at python.org
Sat May 10 00:30:44 CEST 2014


http://hg.python.org/jython/rev/1241914a8648
changeset:   7228:1241914a8648
parent:      7174:b2890af7a5e8
user:        Indra Talip <indra.talip at gmail.com>
date:        Fri Jan 03 08:46:47 2014 +1100
summary:
  Refactor decompressing a bz2 file to stream the decompressed data.
Implemented decompression of bz2 files in terms of the PEP 3116 (?) I/O classes,
using streaming reads instead of fully decompressing the file in memory.

files:
  src/org/python/modules/bz2/PyBZ2File.java |  302 ++++-----
  1 files changed, 144 insertions(+), 158 deletions(-)


diff --git a/src/org/python/modules/bz2/PyBZ2File.java b/src/org/python/modules/bz2/PyBZ2File.java
--- a/src/org/python/modules/bz2/PyBZ2File.java
+++ b/src/org/python/modules/bz2/PyBZ2File.java
@@ -1,19 +1,21 @@
 package org.python.modules.bz2;
 
 import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.python.core.ArgParser;
 import org.python.core.Py;
+import org.python.core.PyException;
 import org.python.core.PyIterator;
 import org.python.core.PyList;
 import org.python.core.PyLong;
@@ -21,8 +23,13 @@
 import org.python.core.PyObject;
 import org.python.core.PySequence;
 import org.python.core.PyString;
-import org.python.core.PyTuple;
 import org.python.core.PyType;
+import org.python.core.io.BinaryIOWrapper;
+import org.python.core.io.BufferedReader;
+import org.python.core.io.IOBase;
+import org.python.core.io.StreamIO;
+import org.python.core.io.TextIOBase;
+import org.python.core.io.UniversalIOWrapper;
 import org.python.expose.ExposedGet;
 import org.python.expose.ExposedMethod;
 import org.python.expose.ExposedNew;
@@ -32,22 +39,22 @@
 public class PyBZ2File extends PyObject {
 
     public static final PyType TYPE = PyType.fromClass(PyBZ2File.class);
+    private int buffering;
 
-    @ExposedGet
-    public PyObject newlines = null;
+    @ExposedGet(name = "newlines")
+    public PyObject PyBZ2File_newlines() {
+        if (buffer != null) {
+            return buffer.getNewlines();
+        } else {
+            return Py.None;
+        }
+    }
 
-    private byte[] fileData = null;
-    private int offset = 0;
+    private TextIOBase buffer;
     private String fileName = null;
     private String fileMode = "";
     private boolean inIterMode = false;
     private boolean inUniversalNewlineMode = false;
-    private final ArrayList<String> validNewlines = new ArrayList<String>();
-    {
-        validNewlines.add("\n");
-        validNewlines.add("\r");
-        validNewlines.add("\r\n");
-    }
 
     private BZip2CompressorOutputStream writeStream = null;
 
@@ -86,9 +93,9 @@
     private void BZ2File___init__(PyString inFileName, String mode,
             int buffering, int compresslevel) {
         try {
-
             fileName = inFileName.asString();
             fileMode = mode;
+            this.buffering = buffering;
 
             // check universal newline mode
             if (mode.contains("U")) {
@@ -104,30 +111,34 @@
                 writeStream = new BZip2CompressorOutputStream(
                         new FileOutputStream(fileName), compresslevel);
             } else {
-                FileInputStream fin = new FileInputStream(fileName);
-                BufferedInputStream bin = new BufferedInputStream(fin);
-                BZip2CompressorInputStream bZin = new BZip2CompressorInputStream(
-                        bin);
-
-                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-
-                final byte[] buf = new byte[100];
-                int n = 0;
-                while (-1 != (n = bZin.read(buf))) {
-                    buffer.write(buf, 0, n);
-                }
-                fileData = buffer.toByteArray();
-
-                buffer.close();
-                bZin.close();
-                bin.close();
-                fin.close();
+                makeReadBuffer();
             }
         } catch (IOException e) {
             throw Py.IOError("File " + fileName + " not found,");
         }
     }
 
+    private void makeReadBuffer() {
+        try {
+            FileInputStream fin = new FileInputStream(fileName);
+            BufferedInputStream bin = new BufferedInputStream(fin);
+            BZip2CompressorInputStream bZin = new BZip2CompressorInputStream(
+                    bin, true);
+            BufferedReader bufferedReader = new BufferedReader(
+                    new SkippableStreamIO(bZin, true), buffering);
+
+            if (inUniversalNewlineMode) {
+                buffer = new UniversalIOWrapper(bufferedReader);
+            } else {
+                buffer = new BinaryIOWrapper(bufferedReader);
+            }
+        } catch (FileNotFoundException fileNotFoundException) {
+            throw Py.IOError(fileNotFoundException);
+        } catch (IOException io) {
+            throw Py.IOError(io);
+        }
+    }
+
     @ExposedMethod
     public void __del__() {
         BZ2File_close();
@@ -135,8 +146,6 @@
 
     @ExposedMethod
     public void BZ2File_close() {
-        fileData = null;
-
         if (writeStream != null) {
             BZ2File_flush();
             try {
@@ -146,6 +155,9 @@
                 throw Py.IOError(e.getMessage());
             }
         }
+        if (buffer != null) {
+            buffer.close();
+        }
     }
 
     private void BZ2File_flush() {
@@ -158,16 +170,6 @@
         }
     }
 
-    private byte[] peek() {
-
-        byte[] buf = new byte[1];
-        if (fileData.length > offset) {
-            buf[0] = fileData[offset + 1];
-        }
-
-        return buf;
-    }
-
     @ExposedMethod
     public PyObject BZ2File_read(PyObject[] args, String[] kwds) {
         checkInIterMode();
@@ -176,59 +178,17 @@
                 new String[] { "size" }, 0);
 
         int size = ap.getInt(0, -1);
+        final String data = buffer.read(size);
 
-        byte[] buf = _BZ2File_read(size);
-
-        return new PyString(new String(buf));
-    }
-
-    private byte[] _BZ2File_read(int size) {
-        byte[] buf = null;
-        if (size == 0) {
-            return new byte[0];
-        } else if (size > 0) {
-            buf = new byte[size];
-        } else {
-            buf = new byte[fileData.length - offset];
-        }
-
-        int readbytes = 0;
-        for (int i = offset, j = 0; i < fileData.length && j < buf.length; i++, j++) {
-            buf[j] = fileData[i];
-
-            String possibleNewline = new String(new byte[] { buf[j] });
-            if (possibleNewline.equals("\r")) { // handle CRLF
-                buf[j] = '\n';
-                if (fileData[i + 1] == '\n') { // check if next character part
-                                               // of newline
-                    possibleNewline = possibleNewline
-                            + new String(new byte[] { fileData[i + 1] });
-                    buf = Arrays.copyOf(buf, buf.length - 1); // adjust buffer
-                                                              // size
-                    i++;
-                }
-            }
-            if (validNewlines.contains(possibleNewline)) {
-                addNewlineMarker(possibleNewline);
-            }
-
-            offset++;
-            readbytes++;
-        }
-
-        if (readbytes == 0) {
-            return new byte[0];
-        }
-
-        return buf;
-
+        return new PyString(data);
     }
 
     @ExposedMethod
     public PyObject BZ2File_next(PyObject[] args, String[] kwds) {
-        if (fileData == null) {
+        if (buffer == null || buffer.closed()) {
             throw Py.ValueError("Cannot call next() on closed file");
         }
+
         inIterMode = true;
         return null;
     }
@@ -242,57 +202,7 @@
 
         int size = ap.getInt(0, -1);
 
-        StringBuilder line = new StringBuilder();
-
-        byte[] buf = null;
-        int readSize = 0;
-        while ((buf = _BZ2File_read(1)).length > 0) {
-            line.append(new String(buf));
-            // handle newlines
-            boolean mustBreak = false;
-            if (inUniversalNewlineMode) {
-                if ((char) buf[0] == '\r') {
-                    if (peek()[0] == '\n') {
-                        buf = _BZ2File_read(1);
-                        mustBreak = true;
-                    }
-                    line.replace(line.length() - 1, line.length(), new String(
-                            "\n"));
-                    mustBreak = true;
-                } else if ((char) buf[0] == '\n'
-                        || (size > -1 && (readSize >= size))) {
-                    mustBreak = true;
-                }
-
-            } else {
-                if ((char) buf[0] == '\n' || (size > -1 && (readSize >= size))) {
-                    mustBreak = true;
-                }
-            }
-
-            if (mustBreak) {
-                break;
-            }
-        }
-
-        return new PyString(line.toString());
-    }
-
-    private void addNewlineMarker(String newline) {
-        if (newlines == null) {
-            newlines = new PyString(newline);
-        } else {
-            if (newlines instanceof PyString) {
-                if (!newlines.equals(new PyString(newline))) {
-                    newlines = new PyTuple(newlines, new PyString(newline));
-                }
-            } else {
-                if (!newlines.__contains__(new PyString(newline))) {
-                    newlines = newlines.__add__(new PyTuple(new PyString(
-                            newline)));
-                }
-            }
-        }
+        return new PyString(buffer.readline(size));
     }
 
     @ExposedMethod
@@ -300,10 +210,9 @@
         checkInIterMode();
 
         // make sure file data valid
-        if (fileData == null) {
+        if (buffer == null || buffer.closed()) {
             throw Py.ValueError("Cannot call readlines() on a closed file");
         }
-
         PyList lineList = new PyList();
 
         PyString line = null;
@@ -335,38 +244,65 @@
         int newOffset = ap.getInt(0);
         int whence = ap.getInt(1, 0);
 
+        if (fileMode.contains("w")) {
+            Py.IOError("seek works only while reading");
+        }
+
         // normalise offset
-        int finalOffset = 0;
+        long currentPos = buffer.tell();
+
+        long finalOffset = 0;
         switch (whence) {
         case 0: // offset from start of file
-            if (newOffset > fileData.length) {
-                finalOffset = fileData.length;
-            } else {
-                finalOffset = newOffset;
-            }
+            finalOffset = newOffset;
             break;
         case 1: // move relative to current position
-            finalOffset = offset + newOffset;
+            finalOffset = currentPos + newOffset;
+
             break;
         case 2: // move relative to end of file
-            finalOffset = fileData.length + newOffset;
+            long fileSize = currentPos;
+
+            // in order to seek from the end of the stream we need to fully read
+            // the decompressed stream to get the size
+            for (;;) {
+                final String data = buffer.read(IOBase.DEFAULT_BUFFER_SIZE);
+                if (data.isEmpty()) {
+                    break;
+                }
+                fileSize += data.length();
+            }
+
+            finalOffset = fileSize + newOffset;
+
+            // close and reset the buffer
+            buffer.close();
+            makeReadBuffer();
+
+            break;
         }
 
         if (finalOffset < 0) {
             finalOffset = 0;
-        } else {
-            if (finalOffset > fileData.length) {
-                finalOffset = fileData.length;
-            }
+        }
+
+        // can't seek backwards so close and reopen the stream at the start
+        if (whence != 2 && finalOffset < currentPos) {
+            buffer.close();
+            makeReadBuffer();
         }
 
         // seek operation
-        offset = finalOffset;
+        buffer.seek(finalOffset, 0);
     }
 
     @ExposedMethod
     public PyLong BZ2File_tell() {
-        return new PyLong(offset);
+        if (buffer == null) {
+            return Py.newLong(0);
+        } else {
+            return Py.newLong(buffer.tell());
+        }
     }
 
     @ExposedMethod
@@ -447,7 +383,7 @@
                 throw Py.ValueError("Stream closed");
             }
         } else if (fileMode.contains("r")) {
-            if (fileData == null) {
+            if (buffer == null || buffer.closed()) {
                 throw Py.ValueError("Stream closed");
             }
         }
@@ -461,4 +397,54 @@
         BZ2File_close();
         return false;
     }
+
+    private static class SkippableStreamIO extends StreamIO {
+        private long position = 0;
+
+        public SkippableStreamIO(InputStream inputStream, boolean closefd) {
+            super(inputStream, closefd);
+        }
+
+        @Override
+        public int readinto(ByteBuffer buf) {
+            int bytesRead = 0;
+            try {
+                bytesRead = super.readinto(buf);
+            } catch (PyException pyex) {
+                // translate errors on read of decompressed stream to EOFError
+                throw Py.EOFError(pyex.value.asStringOrNull());
+            }
+
+            position += bytesRead;
+            return bytesRead;
+        }
+
+        @Override
+        public long tell() {
+            return position;
+        }
+
+        @Override
+        public long seek(long offset, int whence) {
+            long skipBytes = offset - position;
+            if (whence != 0 || skipBytes < 0) {
+                throw Py.IOError("can only seek forward");
+            }
+
+            if (skipBytes == 0) {
+                return position;
+            } else {
+                long skipped = 0;
+                try {
+                    skipped = asInputStream().skip(skipBytes);
+                } catch (IOException ex) {
+                    throw Py.IOError(ex);
+                }
+                long newPosition = position + skipped;
+                position = newPosition;
+
+                return newPosition;
+            }
+        }
+    }
 }

-- 
Repository URL: http://hg.python.org/jython


More information about the Jython-checkins mailing list