[Python-checkins] CVS: python/nondist/sandbox/Lib README,NONE,1.1 davlib.py,NONE,1.1 httpx.py,NONE,1.1
Greg Stein
gstein@users.sourceforge.net
Mon, 10 Sep 2001 18:27:40 -0700
Update of /cvsroot/python/python/nondist/sandbox/Lib
In directory usw-pr-cvs1:/tmp/cvs-serv17467
Added Files:
README davlib.py httpx.py
Log Message:
Initial checkin of some files:
* README: describe this directory and its contents
* davlib.py: current, published davlib (only tweaked the header)
* httpx.py: initial draft from some coding over the weekend (incomplete,
untested, and it doesn't even load :-)
--- NEW FILE: README ---
This directory is for modules that are intended to go into the main Lib
directory of Python. They can be developed here until they are ready for
evaluation for inclusion into Python itself.
(this prevents iteration of development within the core, yet also provides
for public development of (new) modules)
Note: a module's presence here does not mean it *will* go into Lib, but
merely that (should it be accepted) the appropriate place is Lib.
--- NEW FILE: davlib.py ---
#
# DAV client library
#
# ### docco
#
import httplib
import urllib
import string
import types
import mimetypes
import qp_xml
INFINITY = 'infinity'
XML_DOC_HEADER = '<?xml version="1.0" encoding="utf-8"?>'
XML_CONTENT_TYPE = 'text/xml; charset="utf-8"'
# block size for copying files up to the server
BLOCKSIZE = 16384
class HTTPConnectionAuth(httplib.HTTPConnection):
def __init__(self, *args, **kw):
apply(httplib.HTTPConnection.__init__, (self,) + args, kw)
self.__username = None
self.__password = None
self.__nonce = None
self.__opaque = None
def setauth(self, username, password):
self.__username = username
self.__password = password
def _parse_status(elem):
text = elem.textof()
idx1 = string.find(text, ' ')
idx2 = string.find(text, ' ', idx1+1)
return int(text[idx1:idx2]), text[idx2+1:]
class _blank:
def __init__(self, **kw):
self.__dict__.update(kw)
class _propstat(_blank): pass
class _response(_blank): pass
class _multistatus(_blank): pass
def _extract_propstat(elem):
ps = _propstat(prop={}, status=None, responsedescription=None)
for child in elem.children:
if child.ns != 'DAV:':
continue
if child.name == 'prop':
for prop in child.children:
ps.prop[(prop.ns, prop.name)] = prop
elif child.name == 'status':
ps.status = _parse_status(child)
elif child.name == 'responsedescription':
ps.responsedescription = child.textof()
### unknown element name
return ps
def _extract_response(elem):
resp = _response(href=[], status=None, responsedescription=None, propstat=[])
for child in elem.children:
if child.ns != 'DAV:':
continue
if child.name == 'href':
resp.href.append(child.textof())
elif child.name == 'status':
resp.status = _parse_status(child)
elif child.name == 'responsedescription':
resp.responsedescription = child.textof()
elif child.name == 'propstat':
resp.propstat.append(_extract_propstat(child))
### unknown child element
return resp
def _extract_msr(root):
if root.ns != 'DAV:' or root.name != 'multistatus':
raise 'invalid response: <DAV:multistatus> expected'
msr = _multistatus(responses=[ ], responsedescription=None)
for child in root.children:
if child.ns != 'DAV:':
continue
if child.name == 'responsedescription':
msr.responsedescription = child.textof()
elif child.name == 'response':
msr.responses.append(_extract_response(child))
### unknown child element
return msr
def _extract_locktoken(root):
if root.ns != 'DAV:' or root.name != 'prop':
raise 'invalid response: <DAV:prop> expected'
elem = root.find('lockdiscovery', 'DAV:')
if not elem:
raise 'invalid response: <DAV:lockdiscovery> expected'
elem = elem.find('activelock', 'DAV:')
if not elem:
raise 'invalid response: <DAV:activelock> expected'
elem = elem.find('locktoken', 'DAV:')
if not elem:
raise 'invalid response: <DAV:locktoken> expected'
elem = elem.find('href', 'DAV:')
if not elem:
raise 'invalid response: <DAV:href> expected'
return elem.textof()
class DAVResponse(httplib.HTTPResponse):
def parse_multistatus(self):
self.root = qp_xml.Parser().parse(self)
self.msr = _extract_msr(self.root)
def parse_lock_response(self):
self.root = qp_xml.Parser().parse(self)
self.locktoken = _extract_locktoken(self.root)
class DAV(HTTPConnectionAuth):
response_class = DAVResponse
def get(self, url, extra_hdrs={ }):
return self._request('GET', url, extra_hdrs=extra_hdrs)
def head(self, url, extra_hdrs={ }):
return self._request('HEAD', url, extra_hdrs=extra_hdrs)
def post(self, url, data={ }, body=None, extra_hdrs={ }):
headers = extra_hdrs.copy()
assert body or data, "body or data must be supplied"
assert not (body and data), "cannot supply both body and data"
if data:
body = ''
for key, value in data.items():
if isinstance(value, types.ListType):
for item in value:
body = body + '&' + key + '=' + urllib.quote(str(item))
else:
body = body + '&' + key + '=' + urllib.quote(str(value))
body = body[1:]
headers['Content-Type'] = 'application/x-www-form-urlencoded'
return self._request('POST', url, body, headers)
def options(self, url='*', extra_hdrs={ }):
return self._request('OPTIONS', url, extra_hdrs=extra_hdrs)
def trace(self, url, extra_hdrs={ }):
return self._request('TRACE', url, extra_hdrs=extra_hdrs)
def put(self, url, contents,
content_type=None, content_enc=None, extra_hdrs={ }):
if not content_type:
content_type, content_enc = mimetypes.guess_type(url)
headers = extra_hdrs.copy()
if content_type:
headers['Content-Type'] = content_type
if content_enc:
headers['Content-Encoding'] = content_enc
return self._request('PUT', url, contents, headers)
def delete(self, url, extra_hdrs={ }):
return self._request('DELETE', url, extra_hdrs=extra_hdrs)
def propfind(self, url, body=None, depth=None, extra_hdrs={ }):
headers = extra_hdrs.copy()
headers['Content-Type'] = XML_CONTENT_TYPE
if depth is not None:
headers['Depth'] = str(depth)
return self._request('PROPFIND', url, body, headers)
def proppatch(self, url, body, extra_hdrs={ }):
headers = extra_hdrs.copy()
headers['Content-Type'] = XML_CONTENT_TYPE
return self._request('PROPPATCH', url, body, headers)
def mkcol(self, url, extra_hdrs={ }):
return self._request('MKCOL', url, extra_hdrs=extra_hdrs)
def move(self, src, dst, extra_hdrs={ }):
headers = extra_hdrs.copy()
headers['Destination'] = dst
return self._request('MOVE', src, extra_hdrs=headers)
def copy(self, src, dst, depth=None, extra_hdrs={ }):
headers = extra_hdrs.copy()
headers['Destination'] = dst
if depth is not None:
headers['Depth'] = str(depth)
return self._request('COPY', src, extra_hdrs=headers)
def lock(self, url, owner='', timeout=None, depth=None,
scope='exclusive', type='write', extra_hdrs={ }):
headers = extra_hdrs.copy()
headers['Content-Type'] = XML_CONTENT_TYPE
if depth is not None:
headers['Depth'] = str(depth)
if timeout is not None:
headers['Timeout'] = timeout
body = XML_DOC_HEADER + \
'<DAV:lockinfo xmlns:DAV="DAV:">' + \
'<DAV:lockscope><DAV:%s/></DAV:lockscope>' % scope + \
'<DAV:locktype><DAV:%s/></DAV:locktype>' % type + \
owner + \
'</DAV:lockinfo>'
return self._request('LOCK', url, body, extra_hdrs=headers)
def unlock(self, url, locktoken, extra_hdrs={ }):
headers = extra_hdrs.copy()
if locktoken[0] != '<':
locktoken = '<' + locktoken + '>'
headers['Lock-Token'] = locktoken
return self._request('UNLOCK', url, extra_hdrs=headers)
def _request(self, method, url, body=None, extra_hdrs={}):
"Internal method for sending a request."
self.request(method, url, body, extra_hdrs)
return self.getresponse()
#
# Higher-level methods for typical client use
#
def allprops(self, url, depth=None):
return self.propfind(url, depth=depth)
def propnames(self, url, depth=None):
body = XML_DOC_HEADER + \
'<DAV:propfind xmlns:DAV="DAV:"><DAV:propname/></DAV:propfind>'
return self.propfind(url, body, depth)
def getprops(self, url, *names, **kw):
assert names, 'at least one property name must be provided'
if kw.has_key('ns'):
xmlns = ' xmlns:NS="' + kw['ns'] + '"'
ns = 'NS:'
del kw['ns']
else:
xmlns = ns = ''
if kw.has_key('depth'):
depth = kw['depth']
del kw['depth']
else:
depth = 0
assert not kw, 'unknown arguments'
body = XML_DOC_HEADER + \
'<DAV:propfind xmlns:DAV="DAV:"' + xmlns + '><DAV:prop><' + ns + \
string.joinfields(names, '/><' + ns) + \
'/></DAV:prop></DAV:propfind>'
return self.propfind(url, body, depth)
def delprops(self, url, *names, **kw):
assert names, 'at least one property name must be provided'
if kw.has_key('ns'):
xmlns = ' xmlns:NS="' + kw['ns'] + '"'
ns = 'NS:'
del kw['ns']
else:
xmlns = ns = ''
assert not kw, 'unknown arguments'
body = XML_DOC_HEADER + \
'<DAV:propertyupdate xmlns:DAV="DAV:"' + xmlns + \
'><DAV:remove><DAV:prop><' + ns + \
string.joinfields(names, '/><' + ns) + \
'/></DAV:prop></DAV:remove></DAV:propertyupdate>'
return self.proppatch(url, body)
def setprops(self, url, *xmlprops, **props):
assert xmlprops or props, 'at least one property must be provided'
xmlprops = list(xmlprops)
if props.has_key('ns'):
xmlns = ' xmlns:NS="' + props['ns'] + '"'
ns = 'NS:'
del props['ns']
else:
xmlns = ns = ''
for key, value in props.items():
if value:
xmlprops.append('<%s%s>%s</%s%s>' % (ns, key, value, ns, key))
else:
xmlprops.append('<%s%s/>' % (ns, key))
elems = string.joinfields(xmlprops, '')
body = XML_DOC_HEADER + \
'<DAV:propertyupdate xmlns:DAV="DAV:"' + xmlns + \
'><DAV:set><DAV:prop>' + \
elems + \
'</DAV:prop></DAV:set></DAV:propertyupdate>'
return self.proppatch(url, body)
def get_lock(self, url, owner='', timeout=None, depth=None):
response = self.lock(url, owner, timeout, depth)
response.parse_lock_response()
return response.locktoken
--- NEW FILE: httpx.py ---
"""HTTP Extended Functionality
### docco...
"""
import httplib
import re
import base64
_DEFAULT_BODY_SAVE_LIMIT = 100000
_DEFAULT_RETRY_LIMIT = 3
#
# Some various security levels for ordering authentication schemes. These
# are for reference purposes. Authentication schemes can be relative to
# these values for fine-grained control of ordering. (for example, 310 would
# mark a scheme as stronger than Digest, but not as good as public key)
#
SECURITY_NONE = 100 # plain text
SECURITY_LOW = 200 # obscured (Basic)
SECURITY_MEDIUM = 300 # shared secret (Digest)
SECURITY_HIGH = 400 # public key
class Credentials:
def __init__(self):
self.__context = { } # (host, port, scheme, realm) -> context
self.__paths = { } # (host, port) -> { path -> { scheme : realm }}
def lookup_by_path(self, host, port, path):
"Return a map of scheme:context available at this path."
pathmap = self.__paths.get((host, port))
if not pathmap:
return { }
### need to do common-prefix type stuff here
p = pathmap.get(path)
if not p:
return { }
ctxs = { }
for scheme, realm in p.items():
c = self.__context.get((host, port, scheme, realm))
if c:
ctxs[scheme] = c
return ctxs
def lookup_by_realm(self, host, port, scheme, realm):
return self.__context.get((host, port, scheme, realm))
def add_path(self, host, port, path, scheme, realm):
pathmap = self.__paths.get((host, port))
if pathmap:
p = pathmap.get(path)
if p:
p[scheme] = realm
else:
pathmap[path] = { scheme : realm }
else:
self.__paths[host, port] = { path : { scheme : realm } }
def save_context(self, host, port, scheme, realm, context):
self.__context[host, port, scheme, realm] = context
class SimpleCredentials(Credentials):
def __init__(self, username, password):
self.__username = username
self.__password = password
def get_userpass(self):
return self.__username, self.__password
class Authenticator:
def get_context(self, conn, realm, cred):
return cred.lookup_by_realm(conn.host, conn.port, self.scheme, realm)
def save_context(self, conn, realm, cred, ctx):
cred.save_context(conn.host, conn.port, self.scheme, realm, ctx)
class BasicAuthenticator(Authenticator):
scheme = 'basic'
security = SECURITY_LOW
def authenticate(self, conn, response, auth_params, cred):
realm = auth_params['realm']
ctx = self.get_context(conn, realm, cred)
if ctx:
# If we have already authenticated for this realm (a context is
# present), then there is no hard work. The context will be used
# during the next request.
### what to return? the context for storage?
return 'handled'
# we expect the credentials to support the 'get_userpass' method
### should we check for support? return 'unhandled' maybe? can None
### be returned, indicating "don't bother trying"?
user, pass = cred.get_userpass()
ctx = 'Basic ' + base64.encodestring(user + ':' + pass).strip()
self.save_context(conn, realm, cred, ctx)
### what to return? the context for storage?
return 'handled'
def apply_origin_auth(self, conn, ctx):
conn.putheader('Authorization', ctx)
def apply_proxy_auth(self, conn, ctx):
conn.putheader('Proxy-Authorization', ctx)
class DigestAuthenticator(Authenticator):
scheme = 'digest'
security = SECURITY_MEDIUM
def authenticate(self, conn, response, auth_params, cred):
pass
def apply_origin_auth(self, conn, ctx):
conn.putheader('Authorization', ctx)
def apply_proxy_auth(self, conn, ctx):
conn.putheader('Proxy-Authorization', ctx)
authenticators = {
'basic' : BasicAuthenticator,
'digest' : DigestAuthenticator,
}
class HandleAuthentication:
body_save_limit = _DEFAULT_BODY_SAVE_LIMIT
origin_retry_limit = _DEFAULT_RETRY_LIMIT
proxy_retry_limit = _DEFAULT_RETRY_LIMIT
def __init__(self):
self.origin_cred = self.proxy_cred = self.__req = None
self.authenticators = { }
def set_origin_credentials(self, cred):
self.origin_cred = cred
def set_proxy_credentials(self, cred):
self.proxy_cred = cred
def getresponse(self):
#
# Record the number of times we have seen each error response. We will
# retry only a limited number of times. For each error, record the
# (Proxy-)Authorization headers that we sent to avoid trying the
# same value multiple times.
#
# Note: It is entirely possible that the Credentials object will
# return different values each time (through prompts to the
# user, or trying different values from a database); thus, we
# can automatically retry multiple times.
#
seen_401 = seen_407 = 0
used_401 = { }
used_407 = { }
# keep trying to get a valid response
while 1:
r = super().getresponse()
### look for Digest's Authentication-Info header and handle it
### what should be the generalized mechanism for this? we need to
### allow and Authenticator to examine the response headers (even
### on successful hits), and update the information in the
### Credentials object.
# Remember the saved request in case we need it, and then clear
# it from our instance data (so we don't error out the next time
# a request is issued).
saved_request = self.__req
self.__req = None
#
# If a origin/proxy challenge was issued, then simply exit if
# we don't have the appropriate credentials. If we have exceeded
# the allowable retries, then raise an exception.
### we could also return the error'd response, but I think the
### exception makes more sense (we tried and failed to handle it;
### we aren't just reporting a status code from the server)
#
if r.status == 401:
if not self.origin_cred:
return r
seen_401 += 1
if seen_401 > self.origin_retry_limit:
return RetryLimitExceeded(r, self.origin_retry_limit)
hdr_name = 'www-authenticate'
cred = self.origin_cred
elif r.status == 407:
if not self.proxy_cred:
return r
seen_407 += 1
if seen_407 > self.proxy_retry_limit:
return RetryLimitExceeded(r, self.proxy_retry_limit)
hdr_name = 'proxy-authenticate'
cred = self.proxy_cred
else:
# no authentication challenge was issued
return r
# consume the rest of the response -- we don't its body
r.read()
# We should have information on the request that generated this
# response.
assert saved_request
auth_hdrs = r.msg.getallmatchingheaders(hdr_name)
if not auth_hdrs:
raise MissingAuthHeader(hdr_name)
challenges = _parse_challenges(_combine_header_lines(auth_hdrs))
for scheme, params in challenges:
# do this check so each authenticator doesn't have to
realm = params.get('realm')
if not realm:
raise MissingAuthRealm(ProtocolViolation)
# remember that this path is associated with this scheme/realm
cred.add_path(self.host, self.port, saved_request.url, scheme,
realm)
# get the authenticator and try it
actr = self.get_authenticator(scheme)
if actr:
handled = actr.authenticate(self, r, params, cred)
### do something with 'handled'
# If we didn't save the request, or the body was cleared
# out (it fell over the limit), then we cannot resend, so
# we should simply return to the caller. However, we have
# computed the authentication header for insertion into
# the request-sequence when the caller generates a new
# request.
if self.__req is None or self.__req.body is None:
self.__req = None # ensure it is no longer present
return r
### resend the saved_request. more needed here?
saved_request.resend(self)
# loop to get the response for the resent request
# end -- while 1:
# NOTREACHED
def putrequest(self, method, url):
super().putrequest(method, url)
### is it a problem if one is already there? shouldn't be --
### we're starting a new request, so whatever was there can't
### be applicable but maybe raise an error for out-of-sequence?
### it is quite possible that the prior request had no problems
### and we're simply seeing a second request. hmm... but
### getresponse should have cleared this. for now, let's raise
### an error, although I'm guessing we wouldn't reach here --
### httplib may have already raised a sequencing error. needs
### more thought...
if self.__req:
raise httplib.CannotSendRequest()
self.__req = _SavedRequest(method, url, self.body_save_limit)
if self.origin_cred:
ctxs = self.origin_cred.lookup_by_path(self.host, self.port, url)
if ctxs:
### order them. select one with an available authenticator.
actr =
ctx =
actr.apply_origin_auth(self, ctx)
if self.proxy_cred:
# Proxies do not have paths, so we just use '/'
ctxs = self.proxy_cred.lookup_by_path(self.host, self.port, '/')
if ctxs:
### order them. select one with an available authenticator.
actr =
ctx =
actr.apply_proxy_auth(self, ctx)
def putheader(self, header, value):
super().putheader(header, value)
### filter out the (Proxy-)Authorization headers?
self.__req.headers.append((header, value))
def endheaders(self):
super().endheaders()
self.__req.endheaders()
def send(self, data):
super().send(data)
self.__req.send(data)
def get_authenticator(self, scheme):
try:
return self.authenticators[scheme]
except KeyError:
pass
cls = _authenticators.get(scheme)
if cls:
actr = self.authenticators[scheme] = cls()
return actr
return None
def set_authenticator(self, scheme, actr):
self.authenticators[scheme] = actr
class _SavedRequest:
def __init__(self, method, url, limit=_DEFAULT_BODY_SAVE_LIMIT):
self.method = method
self.url = url
self.headers = [ ]
self.body = None
self.limit = limit
def endheaders(self):
### use a cStringIO?
self.body = ''
def send(self, data):
if self.body is not None:
if len(self.body) + len(data) > self.limit:
# Oops. We fell over the limit. Just shut off recording of
# the body.
self.body = None
else:
self.body += data
def resend(self, conn):
conn.putrequest(self.method, self.url)
for header, value in self.headers:
### filter out (Proxy-)Authorization headers?
conn.putheader(header, value)
conn.endheaders()
conn.send(self.body)
class UseProxy:
pass
def _combine_header_lines(lines):
parts = [ ]
for h in lines:
if h[0].isspace():
# continuation line. RFC 2616, S2.2: replace LWS with SP.
parts[-1] += ' ' + h.strip()
else:
# header line. RFC 2616, S4.2: combine with "," separator.
i = h.index(':')
parts.append(h[i+1:].strip())
return ','.join(parts)
# consume commas and LWS (skip over null elements)
_commas = re.compile(',([ \t,]*)')
def _parse_challenges(hdr):
# hdr = 1#challenge
ch = [ ]
# challenge = auth-scheme 1*SP 1#auth-param
scheme, hdr = _get_token(hdr, 'missing auth-scheme')
params = { }
# auth-param = token "=" ( token | quoted-string )
while hdr:
name, hdr = _get_token(hdr, 'missing auth-param name')
if not hdr:
raise IllegalAuthFormat('unknown token -- more text required')
if hdr[0] != '=':
# Completed a challenge. The token we just read was the scheme
# for the next challenge.
ch.append((scheme, params))
scheme = name
params = { }
continue
hdr = hdr[1:].lstrip()
if hdr[:1] == '"':
value, hdr = _get_quoted_string(hdr)
else:
value, hdr = _get_token(hdr, 'missing auth-param value')
params[name] = value
# nothing more. stop parsing.
if not hdr:
break
# consume commas between auth-params and between challenges
match = _commas.match(hdr)
if not match:
raise IllegalAuthFormat('missing comma')
hdr = hdr[match.end():]
# store the last challenge
ch.append((scheme, params))
return ch
# construct an RE for parsing a valid token
_token_chars = ''
_separators = '()<>@,;:\\"/[]?={} \t'
for i in range(32, 127):
c = chr(i)
if c not in _separators:
_token_chars += c
_lws = '[ \t]*'
_token_chars = '-' + _token_chars.replace('-', '') # tweak for regex
_token = re.compile('([' + _token_chars + ']+)' + _lws)
def _get_token(s, msg):
"Return a lower-cased token from S, and the remainder text."
match = _token.match(s)
if not match:
raise IllegalAuthFormat(msg)
return match.group(1).lower(), s[match.end():]
# TEXT = OCTET - CTL + LWS. also remove '\' for proper quoted-pair parsing.
_chars = range(32, 256)
_chars.remove(127)
_chars.remove(ord('\\'))
_chars.remove(ord('"'))
_chars.remove(ord('-'))
_chars.remove(ord(']'))
_qdtext = ''.join(map(chr, _chars))
_quoted_string = re.compile('"(([-\\]' + _qdtext + '\t]|\\\\.)*)"' + _lws)
_quoted_pair = re.compile(r'\\(.)')
def _get_quoted_string(s):
"Return a quoted-string from S, and the remainder text."
match = _quoted_string.match(s)
if not match:
raise IllegalAuthFormat('illegal quoted-string')
# need to strip the '\' characters, but watch for '\\'
qs = _quoted_pair.sub(r'\1', match.group(1))
return qs, s[match.end():]
class RetryLimitExceeded(httplib.HTTPException):
def __init__(self, response, limit_was=None):
self.response = response
self.limit_was = limit_was
### this should probably be in httplib
class ProtocolViolation(httplib.HTTPException):
pass
class MissingAuthHeader(ProtocolViolation):
def __init__(self, which):
self.which = which
class IllegalAuthFormat(ProtocolViolation):
def __init__(self, msg):
self.msg = msg
class MissingAuthRealm(ProtocolViolation):
pass
def test():
h = ['header: value1',
' value2a, value2b',
' \tvalue3',
'header: value4a, value4b',
'\tvalue5',
'header: value6',
]
print _combine_header_lines(h)
print _parse_challenges('basic a=b, c=d, e=f, digest g=h, i=j, , k=l')
print _parse_challenges('basic a="hello",,,b="hello\\"there"')
print _parse_challenges('basic a="hello",,,b="hello\\there"')
print _parse_challenges('basic a="hello",,,b="hello there",')
print _parse_challenges('basic a="hello",,,b="hello there" ,,c=d,,,')