[pypy-issue] [issue979] Pickle fails in pypy, works in CPython
Simon Hova
tracker at bugs.pypy.org
Wed Jan 4 19:34:47 CET 2012
Simon Hova <Simon at hova.net> added the comment:
EDI module attached.
________________________________________
PyPy bug tracker <tracker at bugs.pypy.org>
<https://bugs.pypy.org/issue979>
________________________________________
-------------- next part --------------
import logging
from array import array
from os.path import getsize, split
from datetime import date, datetime
from cStringIO import StringIO
class segment(dict):
    """A single delimited segment, exposed as a dict of field name -> value.

    The first field of the text is kept as the header.  The remaining fields
    become dictionary entries, keyed by ``FieldNames`` when a subclass
    supplies them, otherwise by ``<header><NN>`` where NN is a 1-based,
    two-digit field index.
    """

    def __init__(self, delimiters, text=None):
        """Store the delimiters and, when text is given, parse it at once.

        delimiters -- indexable; [0] terminates a segment, [1] separates fields.
        text       -- raw segment text, or None to create an empty segment.
        """
        super(segment, self).__init__()  # Create a blank dictionary.
        self.closed = False  # Initialize as open
        self._next = None
        self._delimiters = delimiters
        self._text = text
        if self.text:
            self.parse()

    def __hash__(self):
        return hash(tuple([self.text]))

    def __eq__(self, other):
        # Returns ``other`` itself when it is falsy, otherwise the result of
        # comparing the two texts.
        return other and self.text == other.text

    def __ne__(self, other):
        return not self.__eq__(other)

    def __iter__(self):
        # The segment is its own iterator; see next().
        return self

    def parse(self):
        """Split the text into header and fields and load them into the dict.

        Raises TypeError when FieldNames is set, the header is not the footer
        tag, and the number of fields does not match the number of names.
        """
        # Take only the first segment!
        self._text = self.text.partition(self.delimiters[0])[0]
        fields = self._text.split(self.delimiters[1])  # Splits our string into a list
        self._header = fields.pop(0)
        names = self.FieldNames
        if names and self.header != self.footer:
            if len(names) != len(fields):
                raise TypeError('Expected {} field names, got {}'.format(len(names), len(fields)))
            self.update(zip(names, fields))
        else:
            # No names available: fall back to positional keys such as 'REF01'.
            self.update(zip(('{}{:02d}'.format(self.header, FieldNum)
                             for FieldNum in range(1, len(fields) + 1)), fields))
        self.closed = False

    def add(self, text, delimiters=None):
        """Reject additions once the segment is closed.

        Raises TypeError when the segment is closed.  The original code
        returned the TypeError class instead of raising it, which made the
        check a silent no-op.
        """
        if self.closed:
            raise TypeError('Cannot add to a closed segment.')

    def next(self):
        """Return the current open child, or raise StopIteration if none."""
        if self._next and not self._next.closed:
            return self._next
        raise StopIteration

    def splitDate(self, datestring):
        """Convert a 10-, 6- or 8-character date string to a ``date``.

        10 characters: YYYY?MM?DD (characters 4 and 7 are skipped).
        6 characters:  YYMMDD.
        8 characters:  YYYYMMDD.
        Raises TypeError for any other length; errors from int() or date()
        propagate after being logged at debug level.
        """
        try:
            length = len(datestring)
            if length == 10:
                return date(year=int(datestring[:4]), month=int(datestring[5:7]), day=int(datestring[8:10]))
            elif length == 6:
                # NOTE(review): the two-digit year is passed to date()
                # unchanged (e.g. '12' -> year 12); confirm this is intended.
                return date(year=int(datestring[:2]), month=int(datestring[2:4]), day=int(datestring[4:6]))
            elif length == 8:
                return date(year=int(datestring[:4]), month=int(datestring[4:6]), day=int(datestring[6:8]))
            else:
                raise TypeError
        except Exception:
            # The placeholder is required: logging formats with msg % args.
            logging.debug('Date error. Date string: %s', datestring)
            raise

    def splitTime(self, timestring):
        """Convert a 10-character string to a ``date``.

        NOTE(review): despite the name this returns a date built exactly like
        the 10-character branch of splitDate; confirm the intended behaviour.
        Raises TypeError for any other length.
        """
        try:
            if len(timestring) == 10:
                return date(year=int(timestring[:4]), month=int(timestring[5:7]), day=int(timestring[8:10]))
            else:
                raise TypeError
        except Exception:
            logging.debug('Date error. Date string: %s', timestring)
            raise

    @property
    def text(self):
        return self._text

    @text.setter
    def text(self, value):
        """Replace the text and re-parse; raises TypeError when closed."""
        if self.closed:
            raise TypeError
        self._text = value
        if self.delimiters:
            self.parse()

    @property
    def delimiters(self):
        return self._delimiters

    @property
    def header(self):
        # Only available after parse() has run.
        return self._header

    @property
    def footer(self):
        # ``_footer`` is not assigned anywhere in this class; subclasses in
        # this file override ``footer`` instead.
        return self._footer

    @property
    def FieldNames(self):
        # The base segment has no named fields; subclasses return a tuple.
        return None
class document(object):
    """A container holding contents of a single EDI envelope."""

    class BadFile(Exception):
        """Raised by segments() when a character cannot be buffered."""

    def __init__(self, _input=None, headersOnly=False):
        """Optionally wrap ``_input`` (a string) and parse it immediately.

        _input      -- raw document text; when falsy nothing is parsed.
        headersOnly -- when true, add() stops forwarding segments once a
                       group and a document type are available.
        """
        super(document, self).__init__()
        self.headersOnly = headersOnly
        if _input:
            self.input = _input
            self.start()

    def __str__(self):
        return self.interchange.__str__()

    def __hash__(self):
        return hash(tuple([self.interchange]))

    def __eq__(self, other):
        # Returns ``other`` itself when it is falsy.
        return other and self.interchange == other.interchange

    def __ne__(self, other):
        return not self.__eq__(other)

    @property
    def delimiters(self):
        return self.interchange.delimiters

    @property
    def closed(self):
        return self.interchange.closed

    @property
    def input(self):
        """The underlying stream; raises OSError when missing or closed."""
        # getattr: ``_input`` does not exist until the setter (or a subclass)
        # assigns it, and that case must also surface as OSError.
        stream = getattr(self, '_input', None)
        if not stream or stream.closed:
            raise OSError
        return stream

    @input.setter
    def input(self, value):
        self._input = StringIO(value)

    def start(self):
        """Read the interchange header, then feed every segment to add().

        Raises OSError when the first 106 characters cannot be parsed as an
        interchange.
        """
        try:
            self.interchange = interchange(self.input.read(106))
        except Exception:
            raise OSError('Could not find delimiters.')
        for seg in self.segments():
            self.add(seg)

    def add(self, seg):
        """Route one raw segment string into the interchange.

        Raises TypeError when no interchange is populated and ``seg`` does
        not start one.
        """
        if self.interchange:
            if self.headersOnly:
                if self.Group and self.Document_Type:
                    # Headers are complete; ignore the rest of the input.
                    return
            self.interchange.add(segment(text=seg, delimiters=self.delimiters))
        else:
            if 'ISA' in seg[:106]:
                self.interchange = interchange(seg)
                if self.interchange.header != 'ISA':
                    raise TypeError
                return
            else:
                raise TypeError

    def segments(self):
        """Yield each segment of the remaining input as a string.

        NUL characters and newlines are dropped.  Text after the last
        segment terminator is not yielded.
        """
        seg = array('c')
        terminator = self.delimiters[0]  # loop-invariant
        for i in self.input.read():
            if i == '\0':
                continue
            if i == terminator:
                # End of segment found: hand it out and start a new buffer.
                yield seg.tostring()
                seg = array('c')
            elif i != '\n':
                try:
                    seg.append(i)
                except TypeError:
                    raise self.BadFile('Corrupt characters found in data or unexpected EOF')

    @property
    def Document_Type(self):
        """'Document Type' of the first transaction, or None if unavailable."""
        try:
            return self.Transactions[0]['Document Type']
        except Exception:
            return

    @property
    def Interchange(self):
        """The interchange, or None when it has not been created yet."""
        try:
            return self.interchange
        except Exception:
            return

    @property
    def Envelope(self):
        return self.Interchange

    @property
    def Group(self):
        """The first group of the interchange, or None if there is none."""
        try:
            return self.interchange.groups[0]
        except Exception:
            return

    @property
    def Groups(self):
        return self.interchange.groups

    @property
    def Transactions(self):
        """Transactions of the first group, or None if there is no group."""
        try:
            return self.Group.transactions
        except Exception:
            return
class interchange(segment):
    """The outermost envelope segment (header 'ISA', footer 'IEA').

    Holds the list of groups it contains and derives the delimiters from
    fixed character positions of its own text.
    """

    def __init__(self, text):
        """Parse ``text`` as the interchange header.

        The base initialiser is called without text so that nothing is
        parsed before the delimiters have been derived from ``text``.
        """
        super(interchange, self).__init__(text=None, delimiters=None)
        self._text = text
        self._delimiters = self.delimiters
        self.parse()
        self.groups = list()

    def __str__(self):
        return '{}->{},Ctrl={}'.format(self['Sender ID'], self['Receiver ID'], self['Control Number'])

    def __hash__(self):
        # The values live under dict keys (see parse()); the attributes the
        # original code referenced (Control_Number, Sender_ID, Reciever_ID)
        # do not exist.
        return hash((self['Control Number'], self['Sender ID'], self['Receiver ID'], len(self.groups)))

    def __eq__(self, other):
        # Returns ``other`` itself when it is falsy.
        return (other
                and self['Control Number'] == other['Control Number']
                and self['Sender ID'] == other['Sender ID']
                and self['Receiver ID'] == other['Receiver ID']
                and len(self.groups) == len(other.groups))

    def __ne__(self, other):
        return not self.__eq__(other)

    @property
    def FieldNames(self):
        return ('Authorization Information Qualifier', 'Authorization Information',
                'Security Information Qualifier', 'Security Information',
                'Sender Qualifier', 'Sender', 'Receiver Qualifier', 'Receiver', 'Date', 'Time', 'Standard',
                'Version', 'Control Number', 'Acknowledgement Requested', 'Test Indicator',
                'Subelement Seperator')

    @property
    def footer(self):
        return 'IEA'

    def add(self, segment):
        """Consume one parsed segment.

        A footer segment closes the interchange after checking its group
        count (field 'IEA01').  Any other segment is passed to the currently
        open group, or starts a new group when none is open.

        Raises TypeError when the footer's group count does not match.
        """
        super(interchange, self).add(segment)
        if segment.header == self.footer:
            if int(segment['IEA01']) == len(self.groups):
                self.closed = True
            else:
                raise TypeError('Failed interchange check. Found {} groups but should have {}'.format(
                    len(self.groups), int(segment['IEA01'])))
        else:
            try:
                self.next().add(segment)
            except StopIteration:
                # No open group: this segment starts a new one.
                self.groups.append(group(segment.text, self.delimiters))
                self._next = self.groups[-1]

    def parse(self):
        """Parse the header fields, then normalise them in place.

        Date/Time are merged into 'DateTime'; sender and receiver are merged
        with their qualifiers into 'Sender ID' / 'Receiver ID'; the flag
        fields are converted to booleans.
        """
        super(interchange, self).parse()
        # Map fields
        self['Control Number'] = format(int(self['Control Number']), '>09')
        self['DateTime'] = datetime.strptime(' '.join([self['Date'], self['Time']]), '%y%m%d %H%M')
        del self['Date']
        del self['Time']
        if self['Authorization Information Qualifier'] == '00':
            del self['Authorization Information Qualifier']
            self['Authorization Information'] = True
        if self['Security Information Qualifier'] == '00':
            del self['Security Information Qualifier']
            self['Security Information'] = True
        self['Receiver ID'] = '{}-{}'.format(self['Receiver Qualifier'], self['Receiver'].strip())
        del self['Receiver Qualifier']
        del self['Receiver']
        self['Sender ID'] = '{}-{}'.format(self['Sender Qualifier'], self['Sender'].strip())
        del self['Sender Qualifier']
        del self['Sender']
        del self['Subelement Seperator']
        # '0' means False; every other value means True.
        self['Acknowledgement Requested'] = self['Acknowledgement Requested'] != '0'
        # 'P' means False; 'T' and every other value mean True.
        self['Test Indicator'] = self['Test Indicator'] != 'P'

    @property
    def delimiters(self):
        """Delimiters read from fixed positions of the text, cached on first use.

        Order: [text[105], text[3], text[104]], plus text[83] when the
        version string text[84:89] compares >= '00405'.
        Raises OSError when any candidate delimiter is alphanumeric.
        """
        if not self._delimiters:
            version = self.text[84:89]
            delimiters = [self.text[105], self.text[3], self.text[104]]
            if version >= '00405':
                # NOTE(review): confirm index 83 is the intended position
                # for the fourth delimiter.
                delimiters = [self.text[105], self.text[3], self.text[104], self.text[83]]
            # Verify that the delimiters are valid.
            for delim in delimiters:
                if delim.isalnum():
                    raise OSError('{} is not a valid delimiter'.format(delim))
            self._delimiters = delimiters
        return self._delimiters

    @delimiters.setter
    def delimiters(self, value):
        # Assignment is deliberately ignored; delimiters come from the text.
        pass
class group(segment):
    """A functional group segment (footer 'GE') holding a list of transactions."""

    def __init__(self, text, delimiters):
        super(group, self).__init__(text=text, delimiters=delimiters)
        self.transactions = list()

    def __str__(self):
        sender = self['Sender ID']
        receiver = self['Receiver ID']
        control = self['Control Number']
        return '{}->{},Ctrl={}'.format(sender, receiver, control)

    def __hash__(self):
        identity = (self['Control Number'], self['Sender ID'], self['Receiver ID'], len(self.transactions))
        return hash(identity)

    def __eq__(self, other):
        # A falsy ``other`` is handed back unchanged.
        if not other:
            return other
        return (self['Control Number'] == other['Control Number']
                and self['Sender ID'] == other['Sender ID']
                and self['Receiver ID'] == other['Receiver ID']
                and len(self.transactions) == len(other.transactions))

    def __ne__(self, other):
        equal = self.__eq__(other)
        return not equal

    @property
    def FieldNames(self):
        return ('Functional ID', 'Sender ID', 'Receiver ID', 'Date', 'Time',
                'Control Number', 'Responsible Agency', 'Version')

    @property
    def footer(self):
        return 'GE'

    def parse(self):
        """Parse the fields, then convert the control number and timestamp."""
        super(group, self).parse()
        # Map fields
        self['Control Number'] = int(self['Control Number'])
        # Version '..3020' carries a two-digit year, everything else four.
        if self['Version'][2:6] == '3020':
            stamp_format = '%y%m%d %H%M'
        else:
            stamp_format = '%Y%m%d %H%M'
        stamp = ' '.join([self['Date'], self['Time']])
        self['DateTime'] = datetime.strptime(stamp, stamp_format)
        del self['Date']
        del self['Time']

    def add(self, segment):
        """Consume one parsed segment.

        The footer closes the group after its transaction count ('GE01') is
        checked; anything else goes to the open transaction or starts a new
        one.  Raises TypeError when the count does not match.
        """
        if segment.header == self.footer:
            if int(segment['GE01']) != len(self.transactions):
                raise TypeError('Failed group check. Found {} transactions but should have {}'.format(
                    len(self.transactions), int(segment['GE01'])))
            self.closed = True
            return
        try:
            super(group, self).next().add(segment)
        except StopIteration:
            # No open transaction: this segment starts a new one.
            self.transactions.append(transaction(segment.text, segment.delimiters))
            count = len(self.transactions)
            if count % 5 == 0:
                logging.info('Wrote {} transactions'.format(count))
            self._next = self.transactions[-1]
class transaction(segment):
    """A transaction set segment (footer 'SE') collecting its body segments."""

    def __init__(self, text, delimiters):
        super(transaction, self).__init__(text=text, delimiters=delimiters)
        self.lines = list()

    def __str__(self):
        doc_type = self['Document Type']
        control = self['Control Number']
        return '{} Ctrl={}'.format(doc_type, control)

    def __hash__(self):
        identity = (self['Document Type'], self['Control Number'], len(self.lines))
        return hash(identity)

    def __eq__(self, other):
        # A falsy ``other`` is handed back unchanged.
        if not other:
            return other
        return (self['Document Type'] == other['Document Type']
                and self['Control Number'] == other['Control Number']
                and len(self.lines) == len(other.lines))

    def __ne__(self, other):
        equal = self.__eq__(other)
        return not equal

    @property
    def FieldNames(self):
        return ('Document Type', 'Control Number')

    @property
    def footer(self):
        return 'SE'

    def parse(self):
        """Parse the fields and store the control number as an int."""
        super(transaction, self).parse()
        # Map fields
        self['Control Number'] = int(self['Control Number'])

    def add(self, segment):
        """Store a body segment, or close the transaction on its footer.

        Raises TypeError when the footer's count ('SE01') is not the number
        of stored lines plus two.
        """
        if segment.header != self.footer:
            self.lines.append(segment)
            stored = len(self.lines)
            if stored % 1000 == 0:
                logging.info('Wrote {} lines'.format(stored))
            return
        # Add two lines for the header and footer lines.
        if int(segment['SE01']) != (len(self.lines) + 2):
            raise TypeError('Failed transaction check. Found {} lines but should have {}'.format(
                len(self.lines), int(segment['SE01'])))
        self.closed = True
        self._next = None
class record(document):
    """A document without initial input, with party and date accessors."""

    def __init__(self, headersOnly=False):
        super(record, self).__init__(_input=None, headersOnly=headersOnly)

    @property
    def Manafacturer_ID(self):
        """Receiver ID of the first group for '852' documents, else its Sender ID."""
        key = 'Receiver ID' if self.Document_Type == '852' else 'Sender ID'
        return self.Groups[0][key]

    @property
    def Retailer_ID(self):
        """Sender ID of the first group for '852' documents, else its Receiver ID."""
        key = 'Sender ID' if self.Document_Type == '852' else 'Receiver ID'
        return self.Groups[0][key]

    # Disabled legacy logic, kept for reference:
    # elif (elements[0] == 'XQ' and self.DocumentType == '852'):
    # self.StartDate=elements[2]
    # if len(elements) == 3: # If there is only one ship date.
    # self.EndDate=elements[2]
    # else:
    # self.EndDate=elements[3]
    # # break
    # elif (elements[0] == 'DTM' and self.DocumentType == '856'):
    # if elements[1] == '011': # This represents the ship date.
    # self.EndDate = self.StartDate = elements[2]
    # # break # ignore the rest of the file.

    @property
    def Start_Date(self):
        """'DateTime' of the first group, formatted as YYYYMMDD."""
        first_group = self.Groups[0]
        return first_group['DateTime'].strftime('%Y%m%d')

    @property
    def End_Date(self):
        """'DateTime' of the first group, formatted as YYYYMMDD."""
        first_group = self.Groups[0]
        return first_group['DateTime'].strftime('%Y%m%d')
class file(record):
    """A record read from a file on disk and parsed on construction."""

    def __init__(self, filepath, headersOnly=False):
        """Open ``filepath`` in binary mode, record its size and parse it.

        NOTE(review): the file handle is stored in ``_input`` and is not
        closed by this class.
        """
        super(file, self).__init__(headersOnly)
        self._input = open(filepath, 'rb')
        self.File_Path = filepath
        self.File_Size = getsize(self.File_Path)
        self.start()

    @property
    def Directory_Name(self):
        return split(self.File_Path)[0]

    @property
    def File_Name(self):
        return split(self.File_Path)[1]

    def __hash__(self):
        # hash() takes exactly one argument, so hash the pair as a tuple.
        # This uses the same fields that __eq__ compares.
        return hash((self.File_Path, self.File_Size))

    def __eq__(self, other):
        # Returns ``other`` itself when it is falsy.
        return other and self.File_Path == other.File_Path and self.File_Size == other.File_Size

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        return self.File_Path
More information about the pypy-issue
mailing list