[pypy-issue] [issue979] Pickle fails in pypy, works in CPython

Simon Hova tracker at bugs.pypy.org
Wed Jan 4 19:34:47 CET 2012


Simon Hova <Simon at hova.net> added the comment:

EDI module attached.

________________________________________
PyPy bug tracker <tracker at bugs.pypy.org>
<https://bugs.pypy.org/issue979>
________________________________________
-------------- next part --------------
import logging

from array import array
from os.path import getsize, split
from datetime import date, datetime
from cStringIO import StringIO

class segment(dict):
	"""A single delimited record, stored as a dict of field name -> field value.

	The raw text is cut at the segment terminator (``delimiters[0]``) and then
	split on the element separator (``delimiters[1]``).  The first piece is the
	header; the remaining pieces are keyed by ``FieldNames`` when a subclass
	supplies them, otherwise by generated names such as ``SE01``, ``SE02``.

	Note that ``__iter__`` is overridden to return the segment itself, with
	``next()`` (Python 2 iterator protocol) handing out the current open child.
	"""
	def __init__(self, delimiters, text=None):
		"""Store the delimiters and raw text; parse right away if text is non-empty."""
		super(segment,self).__init__() # Start from an empty mapping.
		self.closed=False # Segments start out open.
		self._next=None # Child that currently receives added segments, if any.
		self._delimiters=delimiters
		self._text=text

		if self.text:
			self.parse()

	def __hash__(self):
		"""Hash on the raw text only."""
		return hash(tuple([self.text]))
	def __eq__(self, other):
		"""Compare raw text; a falsy ``other`` is returned unchanged."""
		return other and self.text == other.text
	def __ne__(self, other):
		return not self.__eq__(other)
	def __iter__(self):
		return self

	def parse(self):
		"""Split the raw text into header and fields and load them into the dict.

		Raises:
			TypeError: when ``FieldNames`` is set, the header is not the footer
				tag, and the number of fields differs from the number of names.
		"""
		self._text=self.text.partition(self.delimiters[0])[0] # Keep only the first segment.
		fields=self._text.split(self.delimiters[1])
		self._header=fields.pop(0)
		# ``footer`` is only consulted when FieldNames is truthy, so the base
		# class (FieldNames is None) never touches the unset ``_footer``.
		if self.FieldNames and self.header!=self.footer:
			if len(self.FieldNames)!=len(fields):
				raise TypeError('Expected {} field names, got {}'.format(len(self.FieldNames),len(fields)))
			super(segment,self).__init__(zip(self.FieldNames,fields))
		else:
			# No names available: number the fields after the header, e.g. SE01.
			super(segment,self).__init__(zip(('{}{:02d}'.format(self.header,FieldNum) for FieldNum in range(1,len(fields)+1)),fields))
		self.closed=False

	def add(self, text, delimiters=None):
		"""Guard used by subclasses before adding content.

		Raises:
			TypeError: if this segment is already closed.  (The original code
				returned the exception class instead of raising it, so the
				guard never fired.)
		"""
		if self.closed:
			raise TypeError('Cannot add to a closed segment')

	def next(self):
		"""Return the current child if it is still open, else raise StopIteration."""
		if self._next and not self._next.closed:
			return self._next
		else:
			raise StopIteration

	def splitDate(self,datestring):
		"""Convert a 10-, 6- or 8-character date string into a ``date``.

		10 characters are read as ``YYYY?MM?DD`` (separators at offsets 4 and 7
		are skipped), 8 as ``YYYYMMDD`` and 6 as ``YYMMDD``.  Any other length
		raises TypeError; failures are logged at DEBUG level and re-raised.
		"""
		try:
			if len(datestring)==10:
				return date(year=int(datestring[:4]),month=int(datestring[5:7]),day=int(datestring[8:10]))
			if len(datestring)==6:
				# NOTE(review): the two-digit year is used literally (e.g. year 12),
				# no century is added - confirm this is intended.
				return date(year=int(datestring[:2]),month=int(datestring[2:4]),day=int(datestring[4:6]))
			elif len(datestring)==8:
				return date(year=int(datestring[:4]),month=int(datestring[4:6]),day=int(datestring[6:8]))
			else:
				raise TypeError
		except Exception:
			# %s placeholder is required; without it logging fails to format the record.
			logging.debug('Date error. Date string: %s',datestring)
			raise

	def splitTime(self,timestring):
		"""Convert a 10-character string into a ``date``.

		NOTE(review): despite the name this mirrors the 10-character branch of
		splitDate and returns a date, not a time - confirm against callers.
		Any other length raises TypeError; failures are logged and re-raised.
		"""
		try:
			if len(timestring)==10:
				return date(year=int(timestring[:4]),month=int(timestring[5:7]),day=int(timestring[8:10]))
			else:
				raise TypeError
		except Exception:
			logging.debug('Date error. Date string: %s',timestring)
			raise

	@property
	def text(self):
		"""Raw text of this segment (without the terminator once parsed)."""
		return self._text
	@text.setter
	def text(self, value):
		# A closed segment is read-only; otherwise re-parse when delimiters are known.
		if self.closed:
			raise TypeError
		else:
			self._text=value
			if self.delimiters:
				self.parse()

	@property
	def delimiters(self):
		"""Sequence of delimiters; index 0 ends a segment, index 1 separates fields."""
		return self._delimiters

	@property
	def header(self):
		"""First field of the parsed text (set by parse())."""
		return self._header

	@property
	def footer(self):
		"""Closing tag; ``_footer`` is never set here, subclasses override this."""
		return self._footer

	@property
	def FieldNames(self):
		"""Field names for parse(); None means fields are numbered from the header."""
		return None

class document(object):
	"""A container holding contents of a single EDI envelope.

	The first 106 characters of the input are handed to ``interchange`` to
	discover the delimiters; the rest is cut into segments which are fed to
	that interchange one at a time.
	"""

	class BadFile(OSError):
		"""Raised by segments() when a value cannot be stored as a character.

		segments() always referenced ``self.BadFile`` but the name was never
		defined, so the error path failed with AttributeError instead.
		"""

	def __init__(self, _input=None, headersOnly=False):
		"""Wrap ``_input`` (a string) and parse it immediately when given.

		With ``headersOnly`` set, add() stops forwarding segments once a group
		and a document type are known.
		"""
		super(document,self).__init__()
		self.headersOnly=headersOnly
		if _input:
			self.input=_input
			self.start()
	def __str__(self):
		return self.interchange.__str__()
	def __hash__(self):
		return hash(tuple([self.interchange]))
	def __eq__(self, other):
		"""Compare by interchange; a falsy ``other`` is returned unchanged."""
		return other and self.interchange == other.interchange
	def __ne__(self, other):
		return not self.__eq__(other)
	@property
	def delimiters(self):
		"""Delimiters discovered by the interchange header."""
		return self.interchange.delimiters
	@property
	def closed(self):
		"""True once the interchange has been closed."""
		return self.interchange.closed
	@property
	def input(self):
		"""The underlying stream; raises OSError when it is missing or closed."""
		if not self._input or self._input.closed:
			raise OSError
		return self._input
	@input.setter
	def input(self, value):
		self._input=StringIO(value)
	def start(self):
		"""Read the interchange header, then add every remaining segment.

		Raises:
			OSError: if the interchange cannot be built from the first 106
				characters (any underlying error is replaced by this one).
		"""
		try:
			self.interchange=interchange(self.input.read(106))
		except Exception:
			raise OSError('Could not find delimiters.')

		# Loop variable is deliberately not called ``segment`` so it does not
		# shadow the segment class.
		for seg in self.segments():
			self.add(seg)
	def add(self, seg):
		"""Forward one raw segment string to the interchange.

		When the interchange is falsy (it is a dict, so: empty), ``seg`` must
		itself contain an ISA header from which a new interchange is built.

		Raises:
			TypeError: if no usable ISA header is found in that fallback case.
		"""
		if self.interchange:
			if self.headersOnly:
				# Headers are complete once a group and a document type exist.
				if self.Group and self.Document_Type:
					return
			self.interchange.add(segment(text=seg, delimiters=self.delimiters))
		else:
			if 'ISA' in seg[:106]:
				self.interchange=interchange(seg)
				if self.interchange.header!='ISA':
					raise TypeError
				return
			else:
				raise TypeError
	def segments(self):
		"""Yield each segment of the remaining input as a string.

		NUL characters and newlines are dropped; a segment ends at
		``delimiters[0]``.  Text after the last terminator is not yielded.

		Raises:
			BadFile: if a value read from the input cannot be appended to the
				character buffer.
		"""
		seg = array('c') # NOTE: the 'c' typecode exists on Python 2 only.
		for i in self.input.read():
			if i == '\0': continue
			if i == self.delimiters[0]:
				# End of segment found: hand it out and start a fresh buffer.
				yield seg.tostring()
				seg = array('c')
			elif i != '\n':
				try:
					seg.append(i)
				except TypeError:
					raise self.BadFile('Corrupt characters found in data or unexpected EOF')
	@property
	def Document_Type(self):
		"""'Document Type' of the first transaction, or None if unavailable."""
		try:
			return self.Transactions[0]['Document Type']
		except Exception:
			return
	@property
	def Interchange(self):
		"""The interchange, or None if none has been created yet."""
		try:
			return self.interchange
		except Exception:
			return
	@property
	def Envelope(self):
		"""Alias of Interchange."""
		return self.Interchange
	@property
	def Group(self):
		"""First group of the interchange, or None if unavailable."""
		try:
			return self.interchange.groups[0]
		except Exception:
			return
	@property
	def Groups(self):
		"""All groups of the interchange."""
		return self.interchange.groups
	@property
	def Transactions(self):
		"""Transactions of the first group, or None if unavailable."""
		try:
			return self.Group.transactions
		except Exception:
			return

class interchange(segment):
	"""The ISA envelope segment; owns the list of groups it contains.

	The delimiters are not passed in but read from fixed offsets of the
	header text (see the ``delimiters`` property).
	"""
	def __init__(self, text):
		"""Parse the ISA header ``text`` and start with an empty group list."""
		super(interchange,self).__init__(text=None, delimiters=None)
		self._text=text
		self._delimiters=self.delimiters # Triggers discovery from the text.
		self.parse()

		self.groups=list()
	def __str__(self):
		return '{}->{},Ctrl={}'.format(self['Sender ID'],self['Receiver ID'],self['Control Number'])
	def __hash__(self):
		# The values live under dict keys (set in parse()); the previous code
		# read attributes that do not exist and raised AttributeError.
		return hash(tuple([self['Control Number'],self['Sender ID'],self['Receiver ID'],len(self.groups)]))
	def __eq__(self, other):
		"""Equal when control number, sender, receiver and group count match.

		A falsy ``other`` is returned unchanged.
		"""
		return other and self['Control Number'] == other['Control Number'] and self['Sender ID'] == other['Sender ID'] and self['Receiver ID'] == other['Receiver ID'] and len(self.groups) == len(other.groups)
	def __ne__(self, other):
		return not self.__eq__(other)
	@property
	def FieldNames(self):
		"""Names for the 16 fields that follow the ISA tag."""
		return ('Authorization Information Qualifier','Authorization Information','Security Information Qualifier','Security Information',
			'Sender Qualifier','Sender','Receiver Qualifier','Receiver','Date','Time','Standard',
			'Version','Control Number','Acknowledgement Requested','Test Indicator','Subelement Seperator')
	@property
	def footer(self):
		return 'IEA'
	def add(self, segment):
		"""Route a parsed segment to the open group, or close on the IEA footer.

		Raises:
			TypeError: if the IEA group count does not match the number of
				groups collected.
		"""
		super(interchange,self).add(segment)
		if segment.header==self.footer:
			if int(segment['IEA01']) == len(self.groups):
				self.closed=True
			else:
				# Report the real group count (was a non-existent ``interchanges``).
				raise TypeError('Failed interchange check. Found {} groups but should have {}'.format(len(self.groups),int(segment['IEA01'])))
		else:
			try:
				self.next().add(segment)
			except StopIteration:
				# No open group: this segment starts a new one.
				self.groups.append(group(segment.text,self.delimiters))
				self._next=self.groups[-1]
	def parse(self):
		"""Parse the header, then normalise the raw fields into friendlier keys."""
		super(interchange,self).parse()

		# Control number is kept as a zero-padded 9-character string.
		self['Control Number']=format(int(self['Control Number']),'>09')
		# Merge Date and Time into one datetime under 'DateTime'.
		self['DateTime']=datetime.strptime(' '.join([self['Date'],self['Time']]),'%y%m%d %H%M')
		del self['Date']
		del self['Time']

		# Qualifier '00' replaces the qualifier/value pair by a plain True.
		if self['Authorization Information Qualifier']=='00':
			del self['Authorization Information Qualifier']
			self['Authorization Information']=True

		if self['Security Information Qualifier']=='00':
			del self['Security Information Qualifier']
			self['Security Information']=True

		# Collapse qualifier + id into a single '<qualifier>-<id>' value.
		self['Receiver ID']='{}-{}'.format(self['Receiver Qualifier'],self['Receiver'].strip())
		del self['Receiver Qualifier']
		del self['Receiver']

		self['Sender ID']='{}-{}'.format(self['Sender Qualifier'],self['Sender'].strip())
		del self['Sender Qualifier']
		del self['Sender']

		del self['Subelement Seperator']

		# Anything other than '0' counts as "acknowledgement requested".
		self['Acknowledgement Requested']=self['Acknowledgement Requested']!='0'
		# Only 'P' clears the test flag; 'T' and any other value set it.
		self['Test Indicator']=self['Test Indicator']!='P'
	@property
	def delimiters(self):
		"""Delimiters read from fixed offsets of the header text, cached after first use.

		Index 0 comes from offset 105, index 1 from offset 3 and index 2 from
		offset 104.  When the version string at offsets 84-88 is '00405' or
		later, a fourth delimiter from offset 83 is appended.

		Raises:
			OSError: if any discovered delimiter is alphanumeric.
		"""
		if not self._delimiters:
			version = self.text[84:89]
			delimiters = [self.text[105], self.text[3], self.text[104]]

			if version >= '00405':
				delimiters = [self.text[105], self.text[3], self.text[104], self.text[83]]

			# Verify that the delimiters are valid.
			for delim in delimiters:
				if delim.isalnum():
					raise OSError('{} is not a valid delimiter'.format(delim))

			self._delimiters=delimiters
		return self._delimiters
	@delimiters.setter
	def delimiters(self, value):
		# Assignments are ignored; delimiters always come from the header text.
		pass
class group(segment):
	"""A functional group header segment together with its transactions."""
	def __init__(self, text, delimiters):
		super(group,self).__init__(text=text, delimiters=delimiters)
		self.transactions=list()
	def __str__(self):
		sender=self['Sender ID']
		receiver=self['Receiver ID']
		control=self['Control Number']
		return '{}->{},Ctrl={}'.format(sender,receiver,control)
	def __hash__(self):
		identity=(self['Control Number'],self['Sender ID'],self['Receiver ID'],len(self.transactions))
		return hash(identity)
	def __eq__(self, other):
		# A falsy ``other`` is handed back as-is, like an ``and`` chain would.
		if not other:
			return other
		return (self['Control Number'] == other['Control Number']
			and self['Sender ID'] == other['Sender ID']
			and self['Receiver ID'] == other['Receiver ID']
			and len(self.transactions) == len(other.transactions))
	def __ne__(self, other):
		return not self.__eq__(other)
	@property
	def FieldNames(self):
		"""Names of the eight fields following the group tag."""
		return ('Functional ID','Sender ID','Receiver ID','Date','Time','Control Number','Responsible Agency','Version')
	@property
	def footer(self):
		return 'GE'
	def parse(self):
		"""Parse the header, then convert the control number and timestamp."""
		super(group,self).parse()

		self['Control Number']=int(self['Control Number'])
		# Version '..3020..' carries a two-digit year, everything else four digits.
		if self['Version'][2:6]=='3020':
			layout='%y%m%d %H%M'
		else:
			layout='%Y%m%d %H%M'
		stamp=' '.join([self['Date'],self['Time']])
		self['DateTime']=datetime.strptime(stamp,layout)
		del self['Date']
		del self['Time']
	def add(self, segment):
		"""Pass a segment to the open transaction, or close on the GE footer.

		Raises TypeError when the GE01 count disagrees with the number of
		transactions collected.
		"""
		if segment.header!=self.footer:
			try:
				super(group,self).next().add(segment)
			except StopIteration:
				# No open transaction: this segment begins a new one.
				fresh=transaction(segment.text, segment.delimiters)
				self.transactions.append(fresh)
				count=len(self.transactions)
				if count % 5 == 0:
					logging.info('Wrote {} transactions'.format(count))
				self._next=fresh
			return

		if int(segment['GE01']) != len(self.transactions):
			raise TypeError('Failed group check. Found {} transactions but should have {}'.format(len(self.transactions),int(segment['GE01'])))
		self.closed=True
class transaction(segment):
	"""A transaction set header segment together with its detail lines."""
	def __init__(self, text, delimiters):
		super(transaction,self).__init__(text=text, delimiters=delimiters)
		self.lines=list()
	def __str__(self):
		doc_type=self['Document Type']
		control=self['Control Number']
		return '{} Ctrl={}'.format(doc_type,control)
	def __hash__(self):
		identity=(self['Document Type'],self['Control Number'],len(self.lines))
		return hash(identity)
	def __eq__(self, other):
		# A falsy ``other`` is handed back as-is, like an ``and`` chain would.
		if not other:
			return other
		return (self['Document Type'] == other['Document Type']
			and self['Control Number'] == other['Control Number']
			and len(self.lines) == len(other.lines))
	def __ne__(self, other):
		return not self.__eq__(other)
	@property
	def FieldNames(self):
		"""Names of the two fields following the transaction tag."""
		return ('Document Type','Control Number')
	@property
	def footer(self):
		return 'SE'
	def parse(self):
		"""Parse the header, then store the control number as an int."""
		super(transaction,self).parse()

		self['Control Number']=int(self['Control Number'])
	def add(self, segment):
		"""Collect a detail line, or close on the SE footer.

		Raises TypeError when SE01 does not equal the number of collected
		lines plus two (the header and footer lines).
		"""
		if segment.header!=self.footer:
			self.lines.append(segment)
			collected=len(self.lines)
			if collected % 1000==0:
				logging.info('Wrote {} lines'.format(collected))
			return

		if int(segment['SE01']) != (len(self.lines) + 2):
			raise TypeError('Failed transaction check. Found {} lines but should have {}'.format(len(self.lines),int(segment['SE01'])))
		self.closed=True
		self._next=None
class record(document):
	"""A document created without input, exposing trading-partner and date helpers."""
	def __init__(self, headersOnly=False):
		super(record, self).__init__(_input=None, headersOnly=headersOnly)
	@property
	def Manafacturer_ID(self):
		"""Receiver ID of the first group for an 852, Sender ID for anything else."""
		key='Receiver ID' if self.Document_Type == '852' else 'Sender ID'
		return self.Groups[0][key]
	@property
	def Retailer_ID(self):
		"""Sender ID of the first group for an 852, Receiver ID for anything else."""
		key='Sender ID' if self.Document_Type == '852' else 'Receiver ID'
		return self.Groups[0][key]

# Disabled draft of per-document date extraction, kept for reference:
#	elif (elements[0] == 'XQ' and self.DocumentType == '852'):
#		self.StartDate=elements[2]
#		if len(elements) == 3: # If there is only one ship date.
#			self.EndDate=elements[2]
#		else:
#			self.EndDate=elements[3]
#		# break
#	elif (elements[0] == 'DTM' and self.DocumentType == '856'):
#		if elements[1] == '011': # This represents the ship date.
#			self.EndDate = self.StartDate = elements[2]
#			# break		# ignore the rest of the file.

	@property
	def Start_Date(self):
		"""The first group's DateTime formatted as YYYYMMDD."""
		stamp=self.Groups[0]['DateTime']
		return stamp.strftime('%Y%m%d')
	@property
	def End_Date(self):
		"""The first group's DateTime formatted as YYYYMMDD (same value as Start_Date)."""
		stamp=self.Groups[0]['DateTime']
		return stamp.strftime('%Y%m%d')
class file(record):
	"""A record loaded from a file on disk.

	Note: the class name shadows the Python 2 builtin ``file`` inside this module.
	"""
	def __init__(self, filepath, headersOnly=False):
		"""Open ``filepath`` in binary mode, record its path and size, and parse it.

		The handle is closed once parsing finishes (or fails); start() consumes
		the whole stream, so nothing is left to read afterwards.
		"""
		super(file,self).__init__(headersOnly)
		self._input=open(filepath,'rb')
		try:
			self.File_Path=filepath
			self.File_Size=getsize(self.File_Path)
			self.start()
		finally:
			# Previously the handle was never closed.
			self._input.close()
	@property
	def Directory_Name(self):
		"""Directory part of File_Path."""
		return split(self.File_Path)[0]
	@property
	def File_Name(self):
		"""Final path component of File_Path."""
		return split(self.File_Path)[1]
	def __hash__(self):
		# hash() accepts exactly one argument; hash the (path, size) pair.
		return hash((self.File_Path,self.File_Size))
	def __eq__(self, other):
		"""Equal when path and size match; a falsy ``other`` is returned unchanged."""
		return other and self.File_Path == other.File_Path and self.File_Size == other.File_Size
	def __ne__(self, other):
		return not self.__eq__(other)
	def __str__(self):
		return self.File_Path


More information about the pypy-issue mailing list