[Python-checkins] CVS: python/nondist/sandbox/Lib README,NONE,1.1 davlib.py,NONE,1.1 httpx.py,NONE,1.1

Greg Stein gstein@users.sourceforge.net
Mon, 10 Sep 2001 18:27:40 -0700

Update of /cvsroot/python/python/nondist/sandbox/Lib
In directory usw-pr-cvs1:/tmp/cvs-serv17467

Added Files:
	README davlib.py httpx.py 
Log Message:
Initial checkin of some files:

* README: describe this directory and its contents

* davlib.py: current, published davlib (only tweaked the header)

* httpx.py: initial draft from some coding over the weekend (incomplete,
  untested, and it doesn't even load :-)

This directory is for modules that are intended to go into the main Lib
directory of Python. They can be developed here until they are ready for
evaluation for inclusion into Python itself.

(this prevents iteration of development within the core, yet also provides
for public development of (new) modules)

Note: a module's presence here does not mean it *will* go into Lib, but
merely that (should it be accepted) the appropriate place is Lib.

--- NEW FILE: davlib.py ---
# DAV client library
# ### docco

import httplib
import urllib
import string
import types
import mimetypes
import qp_xml

# Symbolic depth value.  Presumably meant to be passed as a `depth` argument
# (giving "Depth: infinity"); it is not referenced elsewhere in this file --
# confirm against callers.
INFINITY = 'infinity'
# XML declaration prepended to every XML request body built by this module.
XML_DOC_HEADER = '<?xml version="1.0" encoding="utf-8"?>'
# Content-Type header value sent by the methods that carry an XML body.
XML_CONTENT_TYPE = 'text/xml; charset="utf-8"'

# block size for copying files up to the server

class HTTPConnectionAuth(httplib.HTTPConnection):
  """HTTPConnection that remembers a username/password pair.

  The credentials (and the initially empty nonce/opaque slots) are only
  stored by this class; nothing here sends them.
  """

  def __init__(self, *args, **kw):
    # Extended call syntax replaces the deprecated apply() builtin.
    httplib.HTTPConnection.__init__(self, *args, **kw)

    self.__username = None
    self.__password = None
    self.__nonce = None
    self.__opaque = None

  def setauth(self, username, password):
    "Record the username and password to use on this connection."
    self.__username = username
    self.__password = password

def _parse_status(elem):
  """Split the text of a status element into (status code, reason phrase).

  The text is expected to look like an HTTP status line, for example
  "HTTP/1.1 200 OK".  When the reason phrase is absent an empty string is
  returned for it (instead of chopping the last digit off the code).
  Raises ValueError if the status code is not an integer.
  """
  text = elem.textof()
  idx1 = text.find(' ')
  idx2 = text.find(' ', idx1 + 1)
  if idx2 == -1:
    # No second space: everything after the first one is the status code.
    return int(text[idx1 + 1:]), ''
  return int(text[idx1 + 1:idx2]), text[idx2 + 1:]

class _blank:
  "Simple attribute bag: every keyword argument becomes an attribute."
  def __init__(self, **kw):
    self.__dict__.update(kw)

# Result records built by the _extract_* functions below.
class _propstat(_blank): pass
class _response(_blank): pass
class _multistatus(_blank): pass

def _extract_propstat(elem):
  """Convert a propstat element into a _propstat record.

  The record carries `prop` (a dict mapping (namespace, name) to the
  property element), `status` (as returned by _parse_status, or None) and
  `responsedescription` (text, or None).  Children outside the DAV:
  namespace are ignored.
  """
  ps = _propstat(prop={}, status=None, responsedescription=None)
  for child in elem.children:
    if child.ns != 'DAV:':
      continue
    if child.name == 'prop':
      for prop in child.children:
        ps.prop[(prop.ns, prop.name)] = prop
    elif child.name == 'status':
      ps.status = _parse_status(child)
    elif child.name == 'responsedescription':
      ps.responsedescription = child.textof()
    ### unknown element name

  return ps

def _extract_response(elem):
  """Convert a response element into a _response record.

  The record carries `href` (list of href texts), `status` (as returned by
  _parse_status, or None), `responsedescription` (text, or None) and
  `propstat` (list of _propstat records).  Children outside the DAV:
  namespace are ignored.
  """
  resp = _response(href=[], status=None, responsedescription=None, propstat=[])
  for child in elem.children:
    if child.ns != 'DAV:':
      continue
    if child.name == 'href':
      resp.href.append(child.textof())
    elif child.name == 'status':
      resp.status = _parse_status(child)
    elif child.name == 'responsedescription':
      resp.responsedescription = child.textof()
    elif child.name == 'propstat':
      resp.propstat.append(_extract_propstat(child))
    ### unknown child element

  return resp

def _extract_msr(root):
  """Convert a parsed <DAV:multistatus> document into a _multistatus record.

  The record carries `responses` (list of _response records) and
  `responsedescription` (text, or None).  Children outside the DAV:
  namespace are ignored.

  Raises ValueError if ROOT is not a <DAV:multistatus> element.  (String
  exceptions, used previously, cannot be raised by current interpreters.)
  """
  if root.ns != 'DAV:' or root.name != 'multistatus':
    raise ValueError('invalid response: <DAV:multistatus> expected')

  msr = _multistatus(responses=[ ], responsedescription=None)

  for child in root.children:
    if child.ns != 'DAV:':
      continue
    if child.name == 'responsedescription':
      msr.responsedescription = child.textof()
    elif child.name == 'response':
      msr.responses.append(_extract_response(child))
    ### unknown child element

  return msr

def _extract_locktoken(root):
  """Return the lock token text from a parsed <DAV:prop> document.

  Descends prop -> lockdiscovery -> activelock -> locktoken -> href (all in
  the DAV: namespace) and returns the text of the href element.

  Raises ValueError naming the first element that is missing.  (String
  exceptions, used previously, cannot be raised by current interpreters.)
  """
  if root.ns != 'DAV:' or root.name != 'prop':
    raise ValueError('invalid response: <DAV:prop> expected')
  elem = root
  for name in ('lockdiscovery', 'activelock', 'locktoken', 'href'):
    elem = elem.find(name, 'DAV:')
    if not elem:
      raise ValueError('invalid response: <DAV:%s> expected' % name)
  return elem.textof()

class DAVResponse(httplib.HTTPResponse):
  """HTTPResponse whose XML body can be parsed into DAV result objects.

  Both parse methods hand the response object itself to qp_xml.Parser and
  keep the parsed tree in `self.root`.
  """

  def parse_multistatus(self):
    "Parse the body as a multistatus document; the result goes in self.msr."
    tree = qp_xml.Parser().parse(self)
    self.root = tree
    self.msr = _extract_msr(tree)

  def parse_lock_response(self):
    "Parse the body of a LOCK response; the token goes in self.locktoken."
    tree = qp_xml.Parser().parse(self)
    self.root = tree
    self.locktoken = _extract_locktoken(tree)

class DAV(HTTPConnectionAuth):
  """WebDAV client connection.

  Every request method issues exactly one request and returns the response
  object (an instance of `response_class`).  `extra_hdrs` is a mapping of
  additional request headers; it is copied before anything is added to it
  and is never modified.
  """

  response_class = DAVResponse

  def get(self, url, extra_hdrs={ }):
    return self._request('GET', url, extra_hdrs=extra_hdrs)

  def head(self, url, extra_hdrs={ }):
    return self._request('HEAD', url, extra_hdrs=extra_hdrs)

  def post(self, url, data={ }, body=None, extra_hdrs={ }):
    """POST either a raw BODY or form-encoded DATA (exactly one of them).

    DATA maps field names to values; a list value yields one name=value
    pair per item.  Values are URL-quoted, names are sent as given.
    """
    headers = extra_hdrs.copy()

    assert body or data, "body or data must be supplied"
    assert not (body and data), "cannot supply both body and data"
    if data:
      pairs = [ ]
      for key, value in data.items():
        if isinstance(value, list):
          for item in value:
            pairs.append(key + '=' + urllib.quote(str(item)))
        else:
          pairs.append(key + '=' + urllib.quote(str(value)))
      body = '&'.join(pairs)
      headers['Content-Type'] = 'application/x-www-form-urlencoded'

    return self._request('POST', url, body, headers)

  def options(self, url='*', extra_hdrs={ }):
    return self._request('OPTIONS', url, extra_hdrs=extra_hdrs)

  def trace(self, url, extra_hdrs={ }):
    return self._request('TRACE', url, extra_hdrs=extra_hdrs)

  def put(self, url, contents,
          content_type=None, content_enc=None, extra_hdrs={ }):
    """PUT CONTENTS to URL.

    When CONTENT_TYPE is not given, both the type and the encoding are
    guessed from URL (replacing any CONTENT_ENC that was passed).
    """
    if not content_type:
      content_type, content_enc = mimetypes.guess_type(url)

    headers = extra_hdrs.copy()
    if content_type:
      headers['Content-Type'] = content_type
    if content_enc:
      headers['Content-Encoding'] = content_enc
    return self._request('PUT', url, contents, headers)

  def delete(self, url, extra_hdrs={ }):
    return self._request('DELETE', url, extra_hdrs=extra_hdrs)

  def propfind(self, url, body=None, depth=None, extra_hdrs={ }):
    "Issue a PROPFIND; DEPTH (if not None) is sent as the Depth header."
    headers = extra_hdrs.copy()
    headers['Content-Type'] = XML_CONTENT_TYPE
    if depth is not None:
      headers['Depth'] = str(depth)
    return self._request('PROPFIND', url, body, headers)

  def proppatch(self, url, body, extra_hdrs={ }):
    headers = extra_hdrs.copy()
    headers['Content-Type'] = XML_CONTENT_TYPE
    return self._request('PROPPATCH', url, body, headers)

  def mkcol(self, url, extra_hdrs={ }):
    return self._request('MKCOL', url, extra_hdrs=extra_hdrs)

  def move(self, src, dst, extra_hdrs={ }):
    headers = extra_hdrs.copy()
    headers['Destination'] = dst
    return self._request('MOVE', src, extra_hdrs=headers)

  def copy(self, src, dst, depth=None, extra_hdrs={ }):
    headers = extra_hdrs.copy()
    headers['Destination'] = dst
    if depth is not None:
      headers['Depth'] = str(depth)
    return self._request('COPY', src, extra_hdrs=headers)

  def lock(self, url, owner='', timeout=None, depth=None,
           scope='exclusive', type='write', extra_hdrs={ }):
    """Issue a LOCK request built from SCOPE, TYPE and OWNER.

    OWNER is inserted into the lockinfo body verbatim, so it must already
    be an XML fragment (or empty).  TIMEOUT, when not None, is used as the
    Timeout header value as given.
    """
    headers = extra_hdrs.copy()
    headers['Content-Type'] = XML_CONTENT_TYPE
    if depth is not None:
      headers['Depth'] = str(depth)
    if timeout is not None:
      headers['Timeout'] = timeout
    body = XML_DOC_HEADER + \
           '<DAV:lockinfo xmlns:DAV="DAV:">' + \
           '<DAV:lockscope><DAV:%s/></DAV:lockscope>' % scope + \
           '<DAV:locktype><DAV:%s/></DAV:locktype>' % type + \
           owner + \
           '</DAV:lockinfo>'
    return self._request('LOCK', url, body, extra_hdrs=headers)

  def unlock(self, url, locktoken, extra_hdrs={ }):
    "Issue an UNLOCK; LOCKTOKEN is wrapped in <...> unless it already is."
    headers = extra_hdrs.copy()
    if locktoken[0] != '<':
      locktoken = '<' + locktoken + '>'
    headers['Lock-Token'] = locktoken
    return self._request('UNLOCK', url, extra_hdrs=headers)

  def _request(self, method, url, body=None, extra_hdrs={}):
    "Internal method for sending a request."

    self.request(method, url, body, extra_hdrs)
    return self.getresponse()

  def _pop_ns(self, kw):
    "Remove an optional 'ns' entry from KW; return (xmlns attribute, prefix)."
    if 'ns' in kw:
      xmlns = ' xmlns:NS="' + kw['ns'] + '"'
      del kw['ns']
      return xmlns, 'NS:'
    return '', ''

  # Higher-level methods for typical client use

  def allprops(self, url, depth=None):
    return self.propfind(url, depth=depth)

  def propnames(self, url, depth=None):
    body = XML_DOC_HEADER + \
           '<DAV:propfind xmlns:DAV="DAV:"><DAV:propname/></DAV:propfind>'
    return self.propfind(url, body, depth)

  def getprops(self, url, *names, **kw):
    """PROPFIND the properties NAMES.

    Keyword arguments: `ns` (namespace URI applied to every name) and
    `depth` (defaults to 0).
    """
    assert names, 'at least one property name must be provided'
    xmlns, ns = self._pop_ns(kw)
    if 'depth' in kw:
      depth = kw['depth']
      del kw['depth']
    else:
      depth = 0
    assert not kw, 'unknown arguments'
    body = XML_DOC_HEADER + \
           '<DAV:propfind xmlns:DAV="DAV:"' + xmlns + '><DAV:prop><' + ns + \
           ('/><' + ns).join(names) + \
           '/></DAV:prop></DAV:propfind>'
    return self.propfind(url, body, depth)

  def delprops(self, url, *names, **kw):
    """PROPPATCH that removes the properties NAMES.

    Keyword argument: `ns` (namespace URI applied to every name).
    """
    assert names, 'at least one property name must be provided'
    xmlns, ns = self._pop_ns(kw)
    assert not kw, 'unknown arguments'
    body = XML_DOC_HEADER + \
           '<DAV:propertyupdate xmlns:DAV="DAV:"' + xmlns + \
           '><DAV:remove><DAV:prop><' + ns + \
           ('/><' + ns).join(names) + \
           '/></DAV:prop></DAV:remove></DAV:propertyupdate>'
    return self.proppatch(url, body)

  def setprops(self, url, *xmlprops, **props):
    """PROPPATCH that sets properties.

    XMLPROPS are ready-made XML fragments.  Each remaining keyword becomes
    an element named after the keyword (empty when the value is false);
    values are inserted verbatim, without XML escaping.  The keyword `ns`
    is not a property: it gives the namespace URI for the keyword-built
    elements.
    """
    assert xmlprops or props, 'at least one property must be provided'
    xmlprops = list(xmlprops)
    xmlns, ns = self._pop_ns(props)
    for key, value in props.items():
      if value:
        xmlprops.append('<%s%s>%s</%s%s>' % (ns, key, value, ns, key))
      else:
        xmlprops.append('<%s%s/>' % (ns, key))
    elems = ''.join(xmlprops)
    body = XML_DOC_HEADER + \
           '<DAV:propertyupdate xmlns:DAV="DAV:"' + xmlns + \
           '><DAV:set><DAV:prop>' + \
           elems + \
           '</DAV:prop></DAV:set></DAV:propertyupdate>'
    return self.proppatch(url, body)

  def get_lock(self, url, owner='', timeout=None, depth=None):
    "Lock URL and return the lock token found in the response."
    response = self.lock(url, owner, timeout, depth)
    # .locktoken only exists once the response body has been parsed
    response.parse_lock_response()
    return response.locktoken

--- NEW FILE: httpx.py ---
"""HTTP Extended Functionality

### docco...
"""

import httplib
import re
import base64


# Reference security levels for ordering authentication schemes; a higher
# number means a stronger scheme. Authentication schemes can be relative to
# these values for fine-grained control of ordering. (For example, 310 would
# mark a scheme as stronger than Digest, but not as good as public key.)
SECURITY_NONE = 100	# plain text
SECURITY_LOW = 200	# obscured (Basic)
SECURITY_MEDIUM = 300	# shared secret (Digest)
SECURITY_HIGH = 400	# public key

class Credentials:
    """Store of authentication contexts, indexed by realm and by path.

    A context is whatever value was saved for a (host, port, scheme, realm)
    combination.  Paths are remembered separately, per (host, port), so the
    contexts that apply to a path can be looked up again later.
    """

    def __init__(self):
        self.__context = { }    # (host, port, scheme, realm) -> context
        self.__paths = { }      # (host, port) -> { path -> { scheme : realm }}

    def lookup_by_path(self, host, port, path):
        "Return a map of scheme:context available at this path."
        pathmap = self.__paths.get((host, port))
        if not pathmap:
            return { }
        ### need to do common-prefix type stuff here
        p = pathmap.get(path)
        if not p:
            return { }
        ctxs = { }
        for scheme, realm in p.items():
            c = self.__context.get((host, port, scheme, realm))
            if c:
                ctxs[scheme] = c
        return ctxs

    def lookup_by_realm(self, host, port, scheme, realm):
        "Return the saved context for this realm, or None."
        return self.__context.get((host, port, scheme, realm))

    def add_path(self, host, port, path, scheme, realm):
        """Remember that PATH on (HOST, PORT) uses SCHEME with REALM.

        Entries already recorded for other paths, or for other schemes on
        the same path, are kept.
        """
        pathmap = self.__paths.setdefault((host, port), { })
        pathmap.setdefault(path, { })[scheme] = realm

    def save_context(self, host, port, scheme, realm, context):
        "Store CONTEXT for this (host, port, scheme, realm)."
        self.__context[host, port, scheme, realm] = context

class SimpleCredentials(Credentials):
    "Credentials that hold one fixed username/password pair."

    def __init__(self, username, password):
        # Set up the base-class tables; without this the inherited
        # lookup_*/add_path/save_context methods raise AttributeError.
        Credentials.__init__(self)
        self.__username = username
        self.__password = password

    def get_userpass(self):
        "Return the (username, password) pair."
        return self.__username, self.__password

class Authenticator:
    """Base class for authentication schemes.

    Subclasses provide a `scheme` attribute.  Together with the
    connection's host and port and the realm, it forms the key under which
    a context is kept in a credentials object.
    """

    def get_context(self, conn, realm, cred):
        "Return what CRED has stored for this connection, scheme and realm."
        host, port = conn.host, conn.port
        return cred.lookup_by_realm(host, port, self.scheme, realm)

    def save_context(self, conn, realm, cred, ctx):
        "Store CTX in CRED for this connection, scheme and realm."
        host, port = conn.host, conn.port
        cred.save_context(host, port, self.scheme, realm, ctx)

class BasicAuthenticator(Authenticator):
    """The Basic scheme: the context is a ready-to-send header value,
    'Basic ' followed by the base64 encoding of "user:password"."""

    scheme = 'basic'
    security = SECURITY_LOW

    def authenticate(self, conn, response, auth_params, cred):
        """Make sure a context exists for the challenge's realm.

        AUTH_PARAMS must contain 'realm'.  If no context has been saved
        yet, one is built from cred.get_userpass() and saved.  Returns
        'handled' in both cases.
        """
        realm = auth_params['realm']
        ctx = self.get_context(conn, realm, cred)
        if ctx:
            # If we have already authenticated for this realm (a context is
            # present), then there is no hard work. The context will be used
            # during the next request.
            ### what to return? the context for storage?
            return 'handled'

        # we expect the credentials to support the 'get_userpass' method
        ### should we check for support? return 'unhandled' maybe? can None
        ### be returned, indicating "don't bother trying"?
        # ('pass' is a reserved word and cannot be used as a variable name)
        user, passwd = cred.get_userpass()
        raw = user + ':' + passwd
        if not isinstance(raw, bytes):
            # NOTE(review): UTF-8 is an assumption; the encoding of the
            # user-pass text is not fixed by the Basic scheme itself.
            raw = raw.encode('utf-8')
        # b64encode never inserts line breaks (encodestring wraps long
        # input, which would put a newline in the middle of the header).
        token = base64.b64encode(raw)
        if not isinstance(token, str):
            token = token.decode('ascii')
        ctx = 'Basic ' + token
        self.save_context(conn, realm, cred, ctx)
        ### what to return? the context for storage?
        return 'handled'

    def apply_origin_auth(self, conn, ctx):
        "Send CTX as the Authorization header of the request in progress."
        conn.putheader('Authorization', ctx)

    def apply_proxy_auth(self, conn, ctx):
        "Send CTX as the Proxy-Authorization header of the request in progress."
        conn.putheader('Proxy-Authorization', ctx)

class DigestAuthenticator(Authenticator):
    "The Digest scheme.  Only the header plumbing exists so far."

    scheme = 'digest'
    security = SECURITY_MEDIUM

    def authenticate(self, conn, response, auth_params, cred):
        """Respond to a Digest challenge -- not implemented.

        Nothing is computed and nothing is saved in CRED; None is returned.
        """
        ### TODO: compute the Digest response and save it as the context
        return None

    def apply_origin_auth(self, conn, ctx):
        "Send CTX as the Authorization header of the request in progress."
        conn.putheader('Authorization', ctx)

    def apply_proxy_auth(self, conn, ctx):
        "Send CTX as the Proxy-Authorization header of the request in progress."
        conn.putheader('Proxy-Authorization', ctx)

# Authenticator class for each scheme name, keyed by the class's `scheme`.
authenticators = {
    'basic' : BasicAuthenticator,
    'digest' : DigestAuthenticator,
    }

class HandleAuthentication:
    """Mixin that answers 401/407 challenges on behalf of the caller.

    Meant to be combined with an HTTP connection class: every method that
    touches the wire delegates to super(), and `self.host` / `self.port`
    are expected to come from that class.  Each request is recorded in a
    _SavedRequest while it is issued, so it can be replayed after an
    Authenticator has prepared credentials for the challenge.

    NOTE(review): parts of this class were unfinished; the statements that
    had to be filled in are marked with NOTE(review) comments.
    """

    body_save_limit = _DEFAULT_BODY_SAVE_LIMIT
    origin_retry_limit = _DEFAULT_RETRY_LIMIT
    proxy_retry_limit = _DEFAULT_RETRY_LIMIT

    def __init__(self):
        self.origin_cred = self.proxy_cred = self.__req = None
        self.authenticators = { }

    def set_origin_credentials(self, cred):
        "Use CRED to answer 401 challenges from the origin server."
        self.origin_cred = cred

    def set_proxy_credentials(self, cred):
        "Use CRED to answer 407 challenges from a proxy."
        self.proxy_cred = cred

    def getresponse(self):
        """Return the response, retrying after authentication challenges.

        A 401/407 response is returned unchanged when no credentials are
        set for it, or when the request body could not be saved for a
        resend.  Raises RetryLimitExceeded once a challenge has been seen
        more often than the applicable retry limit, MissingAuthHeader if a
        challenge response has no challenge header, and MissingAuthRealm if
        a challenge has no realm.
        """
        # Record the number of times we have seen each error response. We will
        # retry only a limited number of times. For each error, record the
        # (Proxy-)Authorization headers that we sent to avoid trying the
        # same value multiple times.
        # Note: It is entirely possible that the Credentials object will
        #       return different values each time (through prompts to the
        #       user, or trying different values from a database); thus, we
        #       can automatically retry multiple times.
        seen_401 = seen_407 = 0
        ### the used_* maps are not consulted yet
        used_401 = { }
        used_407 = { }

        # keep trying to get a valid response
        while 1:
            r = super().getresponse()

            ### look for Digest's Authentication-Info header and handle it

            ### what should be the generalized mechanism for this? we need to
            ### allow an Authenticator to examine the response headers (even
            ### on successful hits), and update the information in the
            ### Credentials object.

            # Remember the saved request in case we need it, and then clear
            # it from our instance data (so we don't error out the next time
            # a request is issued).
            saved_request = self.__req
            self.__req = None

            # If an origin/proxy challenge was issued, then simply exit if
            # we don't have the appropriate credentials. If we have exceeded
            # the allowable retries, then raise an exception.
            ### we could also return the error'd response, but I think the
            ### exception makes more sense (we tried and failed to handle it;
            ### we aren't just reporting a status code from the server)
            if r.status == 401:
                if not self.origin_cred:
                    return r
                seen_401 += 1
                if seen_401 > self.origin_retry_limit:
                    raise RetryLimitExceeded(r, self.origin_retry_limit)
                hdr_name = 'www-authenticate'
                cred = self.origin_cred
            elif r.status == 407:
                if not self.proxy_cred:
                    return r
                seen_407 += 1
                if seen_407 > self.proxy_retry_limit:
                    raise RetryLimitExceeded(r, self.proxy_retry_limit)
                hdr_name = 'proxy-authenticate'
                cred = self.proxy_cred
            else:
                # no authentication challenge was issued
                return r

            # consume the rest of the response -- we don't need its body
            r.read()

            # We should have information on the request that generated this
            # response.
            assert saved_request

            # Proxies do not have paths; putrequest() looks them up as '/'.
            if r.status == 407:
                path = '/'
            else:
                path = saved_request.url

            auth_hdrs = r.msg.getallmatchingheaders(hdr_name)
            if not auth_hdrs:
                raise MissingAuthHeader(hdr_name)
            challenges = _parse_challenges(_combine_header_lines(auth_hdrs))

            for scheme, params in challenges:
                # do this check so each authenticator doesn't have to
                realm = params.get('realm')
                if not realm:
                    raise MissingAuthRealm(scheme)

                # remember that this path is associated with this scheme/realm
                cred.add_path(self.host, self.port, path, scheme, realm)

                # get the authenticator and try it
                actr = self.get_authenticator(scheme)
                if actr:
                    handled = actr.authenticate(self, r, params, cred)
                    ### do something with 'handled'

            # If the body was cleared out (it fell over the limit), then we
            # cannot resend, so we should simply return to the caller.
            # However, we have computed the authentication header for
            # insertion into the request-sequence when the caller generates
            # a new request.
            # (self.__req was reset above, so it is the saved copy that has
            # to be examined here.)
            if saved_request.body is None:
                return r

            # Replay the request. resend() issues the request line and the
            # headers; finishing the headers and sending the body is done
            # here.
            # NOTE(review): filled in; the draft only said "more needed here?"
            saved_request.resend(self)
            self.endheaders()
            if saved_request.body:
                self.send(saved_request.body)

            # loop to get the response for the resent request

        # end -- while 1:
        # NOTREACHED

    def putrequest(self, method, url):
        """Start a request, record it, and apply any known credentials.

        Raises httplib.CannotSendRequest if an earlier request is still
        recorded.
        """
        super().putrequest(method, url)

        ### is it a problem if one is already there? shouldn't be --
        ### we're starting a new request, so whatever was there can't
        ### be applicable but maybe raise an error for out-of-sequence?
        ### it is quite possible that the prior request had no problems
        ### and we're simply seeing a second request. hmm... but
        ### getresponse should have cleared this. for now, let's raise
        ### an error, although I'm guessing we wouldn't reach here --
        ### httplib may have already raised a sequencing error. needs
        ### more thought...
        if self.__req:
            raise httplib.CannotSendRequest()

        self.__req = _SavedRequest(method, url, self.body_save_limit)

        if self.origin_cred:
            ctxs = self.origin_cred.lookup_by_path(self.host, self.port, url)
            choice = self._select_context(ctxs)
            if choice:
                actr, ctx = choice
                actr.apply_origin_auth(self, ctx)

        if self.proxy_cred:
            # Proxies do not have paths, so we just use '/'
            ctxs = self.proxy_cred.lookup_by_path(self.host, self.port, '/')
            choice = self._select_context(ctxs)
            if choice:
                actr, ctx = choice
                actr.apply_proxy_auth(self, ctx)

    def _select_context(self, ctxs):
        """Return (authenticator, context) for the strongest usable scheme.

        CTXS maps scheme names to contexts.  Schemes without an available
        authenticator are skipped; None is returned if nothing is usable.
        NOTE(review): ordering by the `security` attribute is an
        interpretation of the draft's "order them" remark.
        """
        best = None
        best_level = None
        for scheme, ctx in ctxs.items():
            actr = self.get_authenticator(scheme)
            if not actr:
                continue
            level = getattr(actr, 'security', SECURITY_NONE)
            if best is None or level > best_level:
                best = (actr, ctx)
                best_level = level
        return best

    def putheader(self, header, value):
        "Send a header and record it for a possible resend."
        super().putheader(header, value)

        # (Proxy-)Authorization headers are not recorded: putrequest() adds
        # fresh ones on a resend, and replaying the old ones as well would
        # send each of them twice.
        if header.lower() in ('authorization', 'proxy-authorization'):
            return
        self.__req.headers.append((header, value))

    def endheaders(self):
        "Finish the headers; body recording starts after this."
        # NOTE(review): filled in.  The saved request starts recording only
        # after the header block has gone out, so the header bytes passed
        # through send() are not mistaken for body data.
        super().endheaders()
        self.__req.endheaders()

    def send(self, data):
        "Send DATA and let the saved request (if any) record it."
        # NOTE(review): filled in.
        super().send(data)
        if self.__req is not None:
            self.__req.send(data)

    def get_authenticator(self, scheme):
        """Return the authenticator for SCHEME, or None if there is none.

        Instances registered with set_authenticator() take precedence;
        otherwise one is created from the module-level `authenticators`
        table and cached on this connection.
        """
        try:
            return self.authenticators[scheme]
        except KeyError:
            pass
        cls = authenticators.get(scheme)
        if cls:
            actr = self.authenticators[scheme] = cls()
            return actr
        return None

    def set_authenticator(self, scheme, actr):
        "Use ACTR for SCHEME on this connection."
        self.authenticators[scheme] = actr

class _SavedRequest:
    """Recording of one request, so that it can be issued again.

    `body` is None until endheaders() is called, and becomes None again
    (for good) once the recorded data would exceed `limit`.
    """

    def __init__(self, method, url, limit=_DEFAULT_BODY_SAVE_LIMIT):
        self.method = method
        self.url = url
        self.headers = [ ]
        self.body = None
        self.limit = limit

    def endheaders(self):
        "Start recording body data."
        ### use a cStringIO?
        self.body = ''

    def send(self, data):
        "Append DATA to the recorded body, unless recording is off."
        if self.body is None:
            return
        if len(self.body) + len(data) > self.limit:
            # Oops. We fell over the limit. Just shut off recording of
            # the body.
            self.body = None
        else:
            self.body += data

    def resend(self, conn):
        """Issue the recorded request line and headers on CONN.

        The headers are not terminated and the body is not sent; that is
        left to the caller.
        """
        conn.putrequest(self.method, self.url)
        for header, value in self.headers:
            ### filter out (Proxy-)Authorization headers?
            conn.putheader(header, value)

class UseProxy:
    "Placeholder: no behaviour has been written for this class yet."

def _combine_header_lines(lines):
    """Merge raw header lines into one comma-separated field value.

    LINES holds "name: value" lines, each optionally followed by
    continuation lines (lines that start with whitespace).  The header
    names are dropped; the result is an empty string for no lines.
    """
    parts = [ ]
    for h in lines:
        if h[0].isspace():
            # continuation line. RFC 2616, S2.2: replace LWS with SP.
            parts[-1] += ' ' + h.strip()
        else:
            # header line. RFC 2616, S4.2: combine with "," separator.
            i = h.index(':')
            parts.append(h[i+1:].strip())
    return ','.join(parts)

# consume commas and LWS (skip over null elements)
_commas = re.compile(',([ \t,]*)')

def _parse_challenges(hdr):
    """Parse a combined challenge header value.

    Returns a list of (scheme, params) tuples in the order they appear,
    where PARAMS maps auth-param names to values.  Scheme names, parameter
    names and unquoted values come back lower-cased (see _get_token).
    Raises IllegalAuthFormat when the text does not follow the grammar.
    """
    # hdr = 1#challenge
    ch = [ ]

    # challenge = auth-scheme 1*SP 1#auth-param
    scheme, hdr = _get_token(hdr, 'missing auth-scheme')

    params = { }
    # auth-param = token "=" ( token | quoted-string )
    while hdr:
        name, hdr = _get_token(hdr, 'missing auth-param name')
        if not hdr:
            raise IllegalAuthFormat('unknown token -- more text required')
        if hdr[0] != '=':
            # Completed a challenge. The token we just read was the scheme
            # for the next challenge.
            ch.append((scheme, params))
            scheme = name
            params = { }
            continue
        hdr = hdr[1:].lstrip()

        if hdr[:1] == '"':
            value, hdr = _get_quoted_string(hdr)
        else:
            value, hdr = _get_token(hdr, 'missing auth-param value')
        params[name] = value

        # nothing more. stop parsing.
        if not hdr:
            break

        # consume commas between auth-params and between challenges
        match = _commas.match(hdr)
        if not match:
            raise IllegalAuthFormat('missing comma')
        hdr = hdr[match.end():]

    # store the last challenge
    ch.append((scheme, params))
    return ch

# Build the RE for a token: one or more printable ASCII characters that are
# not separators, followed by optional linear whitespace.
_separators = '()<>@,;:\\"/[]?={} \t'
_token_chars = ''.join([chr(n) for n in range(32, 127)
                        if chr(n) not in _separators])
_lws = '[ \t]*'
# '-' has to lead the character class so it is not taken as a range
_token_chars = '-' + _token_chars.replace('-', '')
_token = re.compile('([' + _token_chars + ']+)' + _lws)

def _get_token(s, msg):
    """Return (token, remainder) for the token at the start of S.

    The token is lower-cased; whitespace after it is not part of the
    remainder.  Raises IllegalAuthFormat(MSG) if S does not start with a
    token.
    """
    found = _token.match(s)
    if found is None:
        raise IllegalAuthFormat(msg)
    token = found.group(1)
    rest = s[found.end():]
    return token.lower(), rest

# TEXT = OCTET - CTL + LWS, and qdtext = TEXT - <">. '\' is removed as well
# so that it can only be read as the start of a quoted-pair. DEL (127) is a
# CTL. Without these removals the character class would swallow the closing
# quote and everything up to the last '"' in the header.
_chars = [i for i in range(32, 256) if i not in (34, 92, 127)]
_qdtext = ''.join(map(chr, _chars))
# re.escape keeps '-', ']', '^' and friends literal inside the class
_quoted_string = re.compile('"(([' + re.escape(_qdtext) + '\t]|\\\\.)*)"' + _lws)
_quoted_pair = re.compile(r'\\(.)')

def _get_quoted_string(s):
    """Return (value, remainder) for the quoted-string at the start of S.

    The surrounding quotes are removed and each quoted-pair is replaced by
    the character it escapes.  Whitespace after the closing quote is not
    part of the remainder.  Raises IllegalAuthFormat if S does not start
    with a quoted-string.
    """
    match = _quoted_string.match(s)
    if not match:
        raise IllegalAuthFormat('illegal quoted-string')
    # need to strip the '\' characters, but watch for '\\'
    qs = _quoted_pair.sub(r'\1', match.group(1))
    return qs, s[match.end():]

class RetryLimitExceeded(httplib.HTTPException):
    """Signals that a challenge was seen more often than the retry limit.

    `response` is the response that went over the limit and `limit_was`
    the limit that applied (None when it was not supplied).
    """

    def __init__(self, response, limit_was=None):
        httplib.HTTPException.__init__(self, response, limit_was)
        self.limit_was = limit_was
        self.response = response

### this should probably be in httplib
class ProtocolViolation(httplib.HTTPException):
    "Base class for errors where the peer did not follow the protocol."

class MissingAuthHeader(ProtocolViolation):
    "A challenge response lacked the header named by `which`."
    def __init__(self, which):
        self.which = which

class IllegalAuthFormat(ProtocolViolation):
    "A challenge header could not be parsed; `msg` says what was wrong."
    def __init__(self, msg):
        self.msg = msg

class MissingAuthRealm(ProtocolViolation):
    "A challenge did not carry a realm parameter."

def test():
    "Print what the header-combining and challenge parsers make of samples."
    h = ['header: value1',
         ' value2a, value2b',
         '     \tvalue3',
         'header: value4a, value4b',
         'header: value6',
         ]
    # Single-argument print(...) gives the same output whether print is a
    # statement or a function.
    print(_combine_header_lines(h))
    print(_parse_challenges('basic a=b, c=d, e=f, digest g=h, i=j, , k=l'))
    print(_parse_challenges('basic a="hello",,,b="hello\\"there"'))
    print(_parse_challenges('basic a="hello",,,b="hello\\there"'))
    print(_parse_challenges('basic a="hello",,,b="hello there",'))
    print(_parse_challenges('basic a="hello",,,b="hello there"   ,,c=d,,,'))