RFC compliant mimetools...
Lance Ellinghaus
Lance_Ellinghaus at marshall.com
Wed Sep 29 12:38:05 EDT 1999
There is no need to use uuencode/uudecode as a pipe. The functionality of the
uuencode and uudecode programs is available in the uu.py module, which will
run on any platform.
Lance Ellinghaus
-------From: dragondm on 9/29/99 7:10:07 AM-------
From: dragondm
To: python-list
cc:
Subject: RFC compliant mimetools...
Well, I've added RFC-compliant MIME-type parameter parsing
and Content-Disposition header support to the mimetools module.
It seems to work, but if someone else could examine it to see if my
sleep-deprived brain muddled something up, I'd be happy :>
I'm including a copy here...
(yes, it's o-dark-thirty, I'm lucky to operate knews much less diff :> )
--- begin mimetools.py
# Various tools used by MIME-reading or MIME-writing programs.
import os
import rfc822
import string
import tempfile
# A derived class of rfc822.Message that knows about MIME headers and
# contains some hooks for decoding encoded and multipart messages.
class Message(rfc822.Message):
    """An rfc822.Message that also understands the basic MIME headers.

    On construction the Content-Transfer-Encoding, Content-Type and
    Content-Disposition headers are read and the latter two are split
    into their main value and their parameter list.

    Attributes set by the constructor:
      encodingheader, typeheader, dispheader -- raw header values (or None)
      type, maintype, subtype -- normalized (stripped, lower-cased) type
      disposition -- normalized disposition, or None if there is no header
      plisttext, dplisttext -- unparsed parameter text (starting at ';')
      type_params, disp_params -- dictionaries built by parseplist()
      plist -- list of "name=value" strings for the type parameters
    """

    def __init__(self, fp, seekable = 1):
        rfc822.Message.__init__(self, fp, seekable)
        self.encodingheader = \
            self.getheader('content-transfer-encoding')
        self.typeheader = \
            self.getheader('content-type')
        self.dispheader = \
            self.getheader('content-disposition')
        self.parsetype()
        self.parseplist()

    def parsetype(self):
        """Split the type and disposition headers from their parameters."""
        ctype = self.typeheader
        if ctype is None:
            # No Content-Type header: fall back to text/plain.
            ctype = 'text/plain'
        if ';' in ctype:
            i = ctype.index(';')
            self.plisttext = ctype[i:]
            ctype = ctype[:i]
        else:
            self.plisttext = ''
        fields = [field.strip().lower() for field in ctype.split('/')]
        self.type = '/'.join(fields)
        self.maintype = fields[0]
        self.subtype = '/'.join(fields[1:])

        disp = self.dispheader
        if disp:
            if ';' in disp:
                i = disp.index(';')
                self.dplisttext = disp[i:]
                disp = disp[:i]
            else:
                self.dplisttext = ''
            # Normalize like the content type: disposition types are
            # case-insensitive and may be followed by blanks before ';'.
            self.disposition = disp.strip().lower()
        else:
            self.dplisttext = ''
            self.disposition = None

    def parseplist(self):
        """Parse both parameter lists with the module-level parseplist()."""
        self.type_params = parseplist(self.plisttext)
        # Kept for callers of getplist(): "name=value" strings.
        self.plist = ["%s=%s" % item for item in self.type_params.items()]
        self.disp_params = parseplist(self.dplisttext)

    def getplist(self):
        """Return the type parameters as a list of "name=value" strings."""
        return self.plist

    def gettype_params(self):
        """Return the dictionary of Content-Type parameters."""
        return self.type_params

    def getdisp_params(self):
        """Return the dictionary of Content-Disposition parameters."""
        return self.disp_params

    def gettype_param(self, name):
        """Return the Content-Type parameter *name*, or None if absent."""
        return self.type_params.get(name.lower())

    def getdisp_param(self, name):
        """Return the Content-Disposition parameter *name*, or None."""
        # Look the value up in disp_params; reading it from type_params
        # would raise KeyError or return an unrelated value.
        return self.disp_params.get(name.lower())

    def getdisp_paramnames(self):
        """Return the names of the Content-Disposition parameters."""
        return self.disp_params.keys()

    def gettype_paramnames(self):
        """Return the names of the Content-Type parameters."""
        return self.type_params.keys()

    # for backwards compatibility -ddm
    getparam = gettype_param
    getparamnames = gettype_paramnames

    def getencoding(self):
        """Return the lower-cased transfer encoding ('7bit' if no header)."""
        if self.encodingheader is None:
            return '7bit'
        return self.encodingheader.lower()

    def getdisposition(self):
        """Return the disposition, or None without such a header."""
        return self.disposition

    def gettype(self):
        """Return the full content type, e.g. 'text/plain'."""
        return self.type

    def getmaintype(self):
        """Return the part of the content type before the first '/'."""
        return self.maintype

    def getsubtype(self):
        """Return the part of the content type after the first '/'."""
        return self.subtype
# Utility functions
# -----------------
#parses a MIME parameter list
def parseplist(pstr):
    """Parse a MIME parameter list into a dictionary.

    Should comply with rfc2045 and rfc2231.
    -- The Dragon De Monsyne <dragondm at integral.org>

    Decodes rfc2231 % escapes and undoes 'paramname*<number>' folding.
    Rfc 822 comments are ignored, as is unquoted whitespace.  The parser
    tries to be pathologically tolerant of malformed input.

    pstr is the parameter text, e.g. '; charset="us-ascii"; foo=bar'.

    Returns a dictionary whose keys are the parameter names (mapped to
    lower case) and whose values are the matching parameter values.

    NOTES:
    (1) Parameter names may still end in '*' to indicate that the value
        uses the rfc2231 ' notation to specify charset and language.
        The % escapes in such values HAVE been decoded, though.
    (2) A dictionary is returned rather than a list of "param=value"
        strings: MIME parameter lists are explicitly unordered, and
        several parameters with the same name could not be reassembled
        unambiguously once rfc2231 folding is involved.
    (3) A backslash inside a quoted string or comment escapes exactly
        the next character, so '"C:\\\\"' ends at its final quote.
    (4) An '=' after the value has started is kept as part of the value.
    (5) A '%' in an extended value that is not followed by two hex
        digits is passed through as a literal '%'.
    """
    params = {}
    name = []       # characters of the parameter name being read
    value = []      # characters of the parameter value being read
    # 0: reading a name, 1: reading a regular value,
    # 2: reading an rfc2231 extended value (name ended in '*')
    isValue = 0
    chars = list(pstr)
    chars.reverse()     # so that pop() yields the text left to right
    while chars:
        thischr = chars.pop()
        if thischr == ';':
            # End of one parameter: store it and start over.
            key = ''.join(name).strip().lower()
            if key:
                params[key] = ''.join(value)
            name = []
            value = []
            isValue = 0
        elif thischr == '(':
            # Skip a (possibly nested) rfc 822 comment.
            depth = 1
            escaped = 0
            while chars and depth:
                thischr = chars.pop()
                if escaped:
                    escaped = 0
                elif thischr == '\\':
                    escaped = 1
                elif thischr == '(':
                    depth = depth + 1
                elif thischr == ')':
                    depth = depth - 1
        elif thischr == '=' and not isValue:
            # First '=' after a name switches to value mode; an '='
            # without any name in front of it is dropped.
            if name:
                if name[-1] == '*':
                    # rfc 2231 describes different semantics for
                    # extended param values.  Amongst other things
                    # quoted strings aren't allowed, but % escapes
                    # are. -ddm
                    isValue = 2
                else:
                    isValue = 1
        elif thischr == '"' and isValue == 1:
            # Quoted string: copy everything up to the closing quote.
            escaped = 0
            while chars:
                thischr = chars.pop()
                if escaped:
                    value.append(thischr)
                    escaped = 0
                elif thischr == '\\':
                    escaped = 1
                elif thischr == '"':
                    break
                else:
                    value.append(thischr)
        elif thischr == '%' and isValue == 2:
            # Peek before consuming so that a bogus or truncated escape
            # cannot swallow the characters that follow it.
            if (len(chars) >= 2 and chars[-1] in string.hexdigits
                    and chars[-2] in string.hexdigits):
                high = chars.pop()
                low = chars.pop()
                value.append(chr(int(high + low, 16)))
            else:
                value.append('%')
        elif thischr in string.whitespace:
            pass
        elif isValue:
            value.append(thischr)
        else:
            name.append(thischr)
    # The last parameter need not be terminated by ';'.
    key = ''.join(name).strip().lower()
    if key:
        params[key] = ''.join(value)

    # Undo rfc2231 param folding. -ddm
    # folded maps the unfolded name to {section number: section text}.
    folded = {}
    # Iterate over a copy of the keys: entries are deleted on the way.
    for each in list(params.keys()):
        extended = each[-1] == '*'
        if extended:
            paramname = each[:-1]
        else:
            paramname = each
        if '*' not in paramname:
            continue
        pl = paramname.split('*', 1)
        try:
            paramnum = int(pl[1], 10)
        except ValueError:
            # Not a section number; leave the parameter untouched.
            continue
        plain = pl[0]
        starred = pl[0] + '*'
        if paramnum == 0 and extended:
            # Section 0 decides that the result keeps its '*' marker;
            # adopt any sections already collected under the plain name.
            if starred not in folded:
                if plain in folded:
                    folded[starred] = folded[plain]
                    del folded[plain]
                else:
                    folded[starred] = {}
            folded[starred][0] = params[each]
        elif paramnum != 0 and starred in folded:
            folded[starred][paramnum] = params[each]
        else:
            if plain not in folded:
                folded[plain] = {}
            folded[plain][paramnum] = params[each]
        del params[each]
    for each in folded.keys():
        sections = folded[each]
        numbers = list(sections.keys())
        numbers.sort()
        params[each] = ''.join([sections[number] for number in numbers])
    return params
#def test_parseplist():
# print parseplist('; this=that ;theotherthing="some thing here; really"; foo=bar')
# print parseplist('this=that; foo=bar')
# print parseplist(""";title*0*=us-ascii'en'This%20is%20even%20more%20
# ;title*1*=%2A%2A%2Afun%2A%2A%2A%20
# ;title*2="isn't it!" """)
# print parseplist('this=that (A comment; Really) ; foo=bar')
# print parseplist('this="that (hi!)"; (A comment; Really) ; foo=bar')
# Return a random string usable as a multipart boundary.
# The method used is so that it is *very* unlikely that the same
# string of characters will ever occur again in the Universe,
# so the caller needn't check the data it is packing for the
# occurrence of the boundary.
#
# The boundary contains dots so you have to quote it in the header.
_prefix = None

def choose_boundary():
    """Return a fresh string of the form host.uid.pid.timestamp.seed.

    The host/uid/pid prefix is computed on the first call and cached in
    the module-level _prefix; the timestamp and the random seed are
    generated anew on every call.
    """
    global _prefix
    import time
    import random
    if _prefix is None:
        import socket
        import os
        try:
            hostid = socket.gethostbyname(socket.gethostname())
        except socket.error:
            # The local host name may not be resolvable; any fixed
            # address will do, the remaining fields keep it unique.
            hostid = '127.0.0.1'
        try:
            uid = repr(os.getuid())
        except AttributeError:
            # os.getuid is not provided on every platform.
            uid = '1'
        try:
            pid = repr(os.getpid())
        except AttributeError:
            pid = '1'
        _prefix = hostid + '.' + uid + '.' + pid
    timestamp = '%.3f' % time.time()
    seed = repr(random.randint(0, 32767))
    return _prefix + '.' + timestamp + '.' + seed
# Subroutines for decoding some common content-transfer-types
def decode(input, output, encoding):
    """Decode the file object input into the file object output.

    encoding is a Content-Transfer-Encoding name: 'base64',
    'quoted-printable', one of the uuencode aliases, or one of the
    identity encodings '7bit', '8bit' and 'binary' (copied unchanged).
    Any other name found in decodetab is decoded by piping the data
    through the shell command registered there.

    Raises ValueError for an unknown encoding.
    """
    if encoding == 'base64':
        import base64
        return base64.decode(input, output)
    if encoding == 'quoted-printable':
        import quopri
        return quopri.decode(input, output)
    if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
        import uu
        return uu.decode(input, output)
    if encoding in ('7bit', '8bit', 'binary'):
        # Identity encodings: nothing to undo.  '7bit' is also what
        # Message.getencoding() reports when the header is missing.
        output.write(input.read())
        return
    if encoding in decodetab:
        pipethrough(input, decodetab[encoding], output)
    else:
        raise ValueError(
            'unknown Content-Transfer-Encoding: %s' % encoding)
def encode(input, output, encoding):
    """Encode the file object input into the file object output.

    encoding is a Content-Transfer-Encoding name: 'base64',
    'quoted-printable', one of the uuencode aliases, or one of the
    identity encodings '7bit', '8bit' and 'binary' (copied unchanged).
    Any other name found in encodetab is encoded by piping the data
    through the shell command registered there.

    Raises ValueError for an unknown encoding.
    """
    if encoding == 'base64':
        import base64
        return base64.encode(input, output)
    if encoding == 'quoted-printable':
        import quopri
        # Third argument 0: do not quote embedded tabs and spaces.
        return quopri.encode(input, output, 0)
    if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
        import uu
        return uu.encode(input, output)
    if encoding in ('7bit', '8bit', 'binary'):
        # Identity encodings: the data is passed through untouched.
        output.write(input.read())
        return
    if encoding in encodetab:
        pipethrough(input, encodetab[encoding], output)
    else:
        raise ValueError(
            'unknown Content-Transfer-Encoding: %s' % encoding)
# Shell commands keyed by Content-Transfer-Encoding name.
# The following is no longer used for standard encodings
# XXX This requires that uudecode and mmencode are in $PATH

# Rewrites the "begin" line so that uudecode writes to a scratch file,
# then emits that file on stdout and removes it.
uudecode_pipe = (
    '(\n'
    'TEMP=/tmp/@uu.$$\n'
    'sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode\n'
    'cat $TEMP\n'
    'rm $TEMP\n'
    ')'
)

# Every uuencode alias decodes through the same pipeline.
decodetab = {}
for _name in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
    decodetab[_name] = uudecode_pipe
decodetab['quoted-printable'] = 'mmencode -u -q'
decodetab['base64'] = 'mmencode -u -b'

# Every uuencode alias encodes with the same command.
encodetab = {}
for _name in ('x-uuencode', 'uuencode', 'x-uue', 'uue'):
    encodetab[_name] = 'uuencode tempfile'
encodetab['quoted-printable'] = 'mmencode -q'
encodetab['base64'] = 'mmencode -b'
del _name
def pipeto(input, command):
    """Feed the file object input, line by line, to a shell command.

    command is run through os.popen(); its standard input receives the
    lines read from input.
    """
    pipe = os.popen(command, 'w')
    try:
        copyliteral(input, pipe)
    finally:
        # Close the pipe even if reading or writing fails.
        pipe.close()
def pipethrough(input, command, output):
    """Filter the file object input through a shell command into output.

    The data is first copied to a temporary file, command is then run
    through os.popen() with that file as its standard input, and
    whatever the command prints is copied to output.

    If the temporary file cannot be created a message is printed and
    nothing is written to output.
    """
    try:
        # mkstemp() creates the file atomically; picking a name with
        # mktemp() and opening it later leaves a window in which another
        # process can claim the name.
        fd, tempname = tempfile.mkstemp()
    except (IOError, OSError):
        print('*** Cannot create temp file')
        return
    try:
        temp = os.fdopen(fd, 'w')
        try:
            copyliteral(input, temp)
        finally:
            temp.close()
        pipe = os.popen(command + ' <' + tempname, 'r')
        try:
            copybinary(pipe, output)
        finally:
            pipe.close()
    finally:
        # Never leave the temporary file behind, even on errors.
        os.unlink(tempname)
def copyliteral(input, output):
    """Copy input to output one line at a time until readline() is empty."""
    line = input.readline()
    while line:
        output.write(line)
        line = input.readline()
def copybinary(input, output):
    """Copy input to output in chunks of up to BUFSIZE until read() is empty."""
    BUFSIZE = 8192
    chunk = input.read(BUFSIZE)
    while chunk:
        output.write(chunk)
        chunk = input.read(BUFSIZE)
--- end mimetools.py
--
-The Dragon De Monsyne
More information about the Python-list
mailing list