[Jython-checkins] jython: Refactor decompressing a bz2 file to stream the decompressed data.
jim.baker
jython-checkins at python.org
Sat May 10 00:30:44 CEST 2014
http://hg.python.org/jython/rev/1241914a8648
changeset: 7228:1241914a8648
parent: 7174:b2890af7a5e8
user: Indra Talip <indra.talip at gmail.com>
date: Fri Jan 03 08:46:47 2014 +1100
summary:
Refactor decompressing a bz2 file to stream the decompressed data.
Implemented decompression of bz2 files in terms of the PEP 3116 I/O classes, using
streaming reads instead of fully decompressing the file in memory.
files:
src/org/python/modules/bz2/PyBZ2File.java | 302 ++++-----
1 files changed, 144 insertions(+), 158 deletions(-)
diff --git a/src/org/python/modules/bz2/PyBZ2File.java b/src/org/python/modules/bz2/PyBZ2File.java
--- a/src/org/python/modules/bz2/PyBZ2File.java
+++ b/src/org/python/modules/bz2/PyBZ2File.java
@@ -1,19 +1,21 @@
package org.python.modules.bz2;
import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.python.core.ArgParser;
import org.python.core.Py;
+import org.python.core.PyException;
import org.python.core.PyIterator;
import org.python.core.PyList;
import org.python.core.PyLong;
@@ -21,8 +23,13 @@
import org.python.core.PyObject;
import org.python.core.PySequence;
import org.python.core.PyString;
-import org.python.core.PyTuple;
import org.python.core.PyType;
+import org.python.core.io.BinaryIOWrapper;
+import org.python.core.io.BufferedReader;
+import org.python.core.io.IOBase;
+import org.python.core.io.StreamIO;
+import org.python.core.io.TextIOBase;
+import org.python.core.io.UniversalIOWrapper;
import org.python.expose.ExposedGet;
import org.python.expose.ExposedMethod;
import org.python.expose.ExposedNew;
@@ -32,22 +39,22 @@
public class PyBZ2File extends PyObject {
public static final PyType TYPE = PyType.fromClass(PyBZ2File.class);
+ private int buffering;
- @ExposedGet
- public PyObject newlines = null;
+ @ExposedGet(name = "newlines")
+ public PyObject PyBZ2File_newlines() {
+ if (buffer != null) {
+ return buffer.getNewlines();
+ } else {
+ return Py.None;
+ }
+ }
- private byte[] fileData = null;
- private int offset = 0;
+ private TextIOBase buffer;
private String fileName = null;
private String fileMode = "";
private boolean inIterMode = false;
private boolean inUniversalNewlineMode = false;
- private final ArrayList<String> validNewlines = new ArrayList<String>();
- {
- validNewlines.add("\n");
- validNewlines.add("\r");
- validNewlines.add("\r\n");
- }
private BZip2CompressorOutputStream writeStream = null;
@@ -86,9 +93,9 @@
private void BZ2File___init__(PyString inFileName, String mode,
int buffering, int compresslevel) {
try {
-
fileName = inFileName.asString();
fileMode = mode;
+ this.buffering = buffering;
// check universal newline mode
if (mode.contains("U")) {
@@ -104,30 +111,34 @@
writeStream = new BZip2CompressorOutputStream(
new FileOutputStream(fileName), compresslevel);
} else {
- FileInputStream fin = new FileInputStream(fileName);
- BufferedInputStream bin = new BufferedInputStream(fin);
- BZip2CompressorInputStream bZin = new BZip2CompressorInputStream(
- bin);
-
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-
- final byte[] buf = new byte[100];
- int n = 0;
- while (-1 != (n = bZin.read(buf))) {
- buffer.write(buf, 0, n);
- }
- fileData = buffer.toByteArray();
-
- buffer.close();
- bZin.close();
- bin.close();
- fin.close();
+ makeReadBuffer();
}
} catch (IOException e) {
throw Py.IOError("File " + fileName + " not found,");
}
}
+ private void makeReadBuffer() {
+ try {
+ FileInputStream fin = new FileInputStream(fileName);
+ BufferedInputStream bin = new BufferedInputStream(fin);
+ BZip2CompressorInputStream bZin = new BZip2CompressorInputStream(
+ bin, true);
+ BufferedReader bufferedReader = new BufferedReader(
+ new SkippableStreamIO(bZin, true), buffering);
+
+ if (inUniversalNewlineMode) {
+ buffer = new UniversalIOWrapper(bufferedReader);
+ } else {
+ buffer = new BinaryIOWrapper(bufferedReader);
+ }
+ } catch (FileNotFoundException fileNotFoundException) {
+ throw Py.IOError(fileNotFoundException);
+ } catch (IOException io) {
+ throw Py.IOError(io);
+ }
+ }
+
@ExposedMethod
public void __del__() {
BZ2File_close();
@@ -135,8 +146,6 @@
@ExposedMethod
public void BZ2File_close() {
- fileData = null;
-
if (writeStream != null) {
BZ2File_flush();
try {
@@ -146,6 +155,9 @@
throw Py.IOError(e.getMessage());
}
}
+ if (buffer != null) {
+ buffer.close();
+ }
}
private void BZ2File_flush() {
@@ -158,16 +170,6 @@
}
}
- private byte[] peek() {
-
- byte[] buf = new byte[1];
- if (fileData.length > offset) {
- buf[0] = fileData[offset + 1];
- }
-
- return buf;
- }
-
@ExposedMethod
public PyObject BZ2File_read(PyObject[] args, String[] kwds) {
checkInIterMode();
@@ -176,59 +178,17 @@
new String[] { "size" }, 0);
int size = ap.getInt(0, -1);
+ final String data = buffer.read(size);
- byte[] buf = _BZ2File_read(size);
-
- return new PyString(new String(buf));
- }
-
- private byte[] _BZ2File_read(int size) {
- byte[] buf = null;
- if (size == 0) {
- return new byte[0];
- } else if (size > 0) {
- buf = new byte[size];
- } else {
- buf = new byte[fileData.length - offset];
- }
-
- int readbytes = 0;
- for (int i = offset, j = 0; i < fileData.length && j < buf.length; i++, j++) {
- buf[j] = fileData[i];
-
- String possibleNewline = new String(new byte[] { buf[j] });
- if (possibleNewline.equals("\r")) { // handle CRLF
- buf[j] = '\n';
- if (fileData[i + 1] == '\n') { // check if next character part
- // of newline
- possibleNewline = possibleNewline
- + new String(new byte[] { fileData[i + 1] });
- buf = Arrays.copyOf(buf, buf.length - 1); // adjust buffer
- // size
- i++;
- }
- }
- if (validNewlines.contains(possibleNewline)) {
- addNewlineMarker(possibleNewline);
- }
-
- offset++;
- readbytes++;
- }
-
- if (readbytes == 0) {
- return new byte[0];
- }
-
- return buf;
-
+ return new PyString(data);
}
@ExposedMethod
public PyObject BZ2File_next(PyObject[] args, String[] kwds) {
- if (fileData == null) {
+ if (buffer == null || buffer.closed()) {
throw Py.ValueError("Cannot call next() on closed file");
}
+
inIterMode = true;
return null;
}
@@ -242,57 +202,7 @@
int size = ap.getInt(0, -1);
- StringBuilder line = new StringBuilder();
-
- byte[] buf = null;
- int readSize = 0;
- while ((buf = _BZ2File_read(1)).length > 0) {
- line.append(new String(buf));
- // handle newlines
- boolean mustBreak = false;
- if (inUniversalNewlineMode) {
- if ((char) buf[0] == '\r') {
- if (peek()[0] == '\n') {
- buf = _BZ2File_read(1);
- mustBreak = true;
- }
- line.replace(line.length() - 1, line.length(), new String(
- "\n"));
- mustBreak = true;
- } else if ((char) buf[0] == '\n'
- || (size > -1 && (readSize >= size))) {
- mustBreak = true;
- }
-
- } else {
- if ((char) buf[0] == '\n' || (size > -1 && (readSize >= size))) {
- mustBreak = true;
- }
- }
-
- if (mustBreak) {
- break;
- }
- }
-
- return new PyString(line.toString());
- }
-
- private void addNewlineMarker(String newline) {
- if (newlines == null) {
- newlines = new PyString(newline);
- } else {
- if (newlines instanceof PyString) {
- if (!newlines.equals(new PyString(newline))) {
- newlines = new PyTuple(newlines, new PyString(newline));
- }
- } else {
- if (!newlines.__contains__(new PyString(newline))) {
- newlines = newlines.__add__(new PyTuple(new PyString(
- newline)));
- }
- }
- }
+ return new PyString(buffer.readline(size));
}
@ExposedMethod
@@ -300,10 +210,9 @@
checkInIterMode();
// make sure file data valid
- if (fileData == null) {
+ if (buffer == null || buffer.closed()) {
throw Py.ValueError("Cannot call readlines() on a closed file");
}
-
PyList lineList = new PyList();
PyString line = null;
@@ -335,38 +244,65 @@
int newOffset = ap.getInt(0);
int whence = ap.getInt(1, 0);
+ if (fileMode.contains("w")) {
+ Py.IOError("seek works only while reading");
+ }
+
// normalise offset
- int finalOffset = 0;
+ long currentPos = buffer.tell();
+
+ long finalOffset = 0;
switch (whence) {
case 0: // offset from start of file
- if (newOffset > fileData.length) {
- finalOffset = fileData.length;
- } else {
- finalOffset = newOffset;
- }
+ finalOffset = newOffset;
break;
case 1: // move relative to current position
- finalOffset = offset + newOffset;
+ finalOffset = currentPos + newOffset;
+
break;
case 2: // move relative to end of file
- finalOffset = fileData.length + newOffset;
+ long fileSize = currentPos;
+
+ // in order to seek from the end of the stream we need to fully read
+ // the decompressed stream to get the size
+ for (;;) {
+ final String data = buffer.read(IOBase.DEFAULT_BUFFER_SIZE);
+ if (data.isEmpty()) {
+ break;
+ }
+ fileSize += data.length();
+ }
+
+ finalOffset = fileSize + newOffset;
+
+ // close and reset the buffer
+ buffer.close();
+ makeReadBuffer();
+
+ break;
}
if (finalOffset < 0) {
finalOffset = 0;
- } else {
- if (finalOffset > fileData.length) {
- finalOffset = fileData.length;
- }
+ }
+
+ // can't seek backwards so close and reopen the stream at the start
+ if (whence != 2 && finalOffset < currentPos) {
+ buffer.close();
+ makeReadBuffer();
}
// seek operation
- offset = finalOffset;
+ buffer.seek(finalOffset, 0);
}
@ExposedMethod
public PyLong BZ2File_tell() {
- return new PyLong(offset);
+ if (buffer == null) {
+ return Py.newLong(0);
+ } else {
+ return Py.newLong(buffer.tell());
+ }
}
@ExposedMethod
@@ -447,7 +383,7 @@
throw Py.ValueError("Stream closed");
}
} else if (fileMode.contains("r")) {
- if (fileData == null) {
+ if (buffer == null || buffer.closed()) {
throw Py.ValueError("Stream closed");
}
}
@@ -461,4 +397,54 @@
BZ2File_close();
return false;
}
+
+ private static class SkippableStreamIO extends StreamIO {
+ private long position = 0;
+
+ public SkippableStreamIO(InputStream inputStream, boolean closefd) {
+ super(inputStream, closefd);
+ }
+
+ @Override
+ public int readinto(ByteBuffer buf) {
+ int bytesRead = 0;
+ try {
+ bytesRead = super.readinto(buf);
+ } catch (PyException pyex) {
+ // translate errors on read of decompressed stream to EOFError
+ throw Py.EOFError(pyex.value.asStringOrNull());
+ }
+
+ position += bytesRead;
+ return bytesRead;
+ }
+
+ @Override
+ public long tell() {
+ return position;
+ }
+
+ @Override
+ public long seek(long offset, int whence) {
+ long skipBytes = offset - position;
+ if (whence != 0 || skipBytes < 0) {
+ throw Py.IOError("can only seek forward");
+ }
+
+ if (skipBytes == 0) {
+ return position;
+ } else {
+ long skipped = 0;
+ try {
+ skipped = asInputStream().skip(skipBytes);
+ } catch (IOException ex) {
+ throw Py.IOError(ex);
+ }
+ long newPosition = position + skipped;
+ position = newPosition;
+
+ return newPosition;
+ }
+ }
+ }
}
--
Repository URL: http://hg.python.org/jython
More information about the Jython-checkins
mailing list