unicode value

Alain Spineux aspineux at gmail.com
Thu May 24 21:16:26 CEST 2007


On 5/24/07, Michael Ströder <michael at stroeder.com> wrote:
>
> Alain Spineux wrote:
> >
> > Yes but what about unknown field type ?
>
> If you really want to dive into this look in directory
> pylib/w2lapp/schema/ of web2ldap's source. It works for me but I did not
> consider this whole framework mature enough to be incorporated into
> python-ldap.


I dont want to look at the schema:

Here are the sources and the results.
I use your more appropriate name for unicode testing :-)


#!/usr/bin/env python2.4

import sys, os, time
import ldap, ldapurl, ldap.modlist
import types
import datetime

host='localhost'
port=389
base_dn='dc=asxnet,dc=loc'

if True:
    who='cn=manager,cn=internal,dc=asxnet,dc=loc'
    cred=''********'
else:
    who='cn=nobody,cn=internal,dc=asxnet,dc=loc'
    cred='iMmTWz5pJ+lwY7i6M/BU61ngo1aBLyqQhRrrKbEc'


def unicode2utf8(st):
    """Convert unicode (and only unicode) string into utf-8 raw string as
expected by ldap"""

    if isinstance(st, types.UnicodeType):
        return st.encode('utf-8')
    else:
        return st

def utf82unicode(st):
    """encode st into utf-8"""
    return st.decode('utf-8')


def encode_modlist(modlist, no_op):
    """encode ldap modlist structure
       set no_op=True for Tuple of kind (int,str,[str,...])
       and False for (str, [str,...])
    """

    for i, mod in enumerate(modlist):
        if no_op:
            attr_name, attr_values=mod
        else:
            op, attr_name, attr_values=mod

        attr_name=unicode2utf8(attr_name)
        if isinstance(attr_values, (types.ListType, types.TupleType)):
            attr_values=map(unicode2utf8, attr_values)
        else:
            attr_values=unicode2utf8(attr_values)
        if no_op:
            modlist[i]=(attr_name, attr_values)
        else:
            modlist[i]=(op, attr_name, attr_values)

    return modlist

class UnicodeLDAPObject(ldap.ldapobject.LDAPObject):

    expiration_delay=300

    def __init__(self, uri, **kwargs):
        ldap.ldapobject.LDAPObject.__init__(self, uri, **kwargs)
        self.unicode_decoder={} # (msgid, expiration, decoder_data)
        # I use an expiration time to avoid the list to become to big when
the
        # server don't answere any request

    def search_ext(self,base,scope, filterstr, attrlist, *args, **kwargs):
        # base,scope,
filterstr='(objectClass=*)',attrlist=None,attrsonly=0,serverctrls=None,clientctrls=None,timeout=-1,sizelimit=0

        # convert filter
        filterstr=unicode2utf8(filterstr)

        # convert arglist and keep a copy of original values for later
decoding

        u_attrlist=attrlist
        decoder={}
        if u_attrlist!=None:
            attrlist=[]
            for attr in u_attrlist:
                if isinstance(attr, types.UnicodeType):
                    attr=attr.encode('utf-8')
                    # print 'ATTR', attr
                    decoder[attr]=True
                attrlist.append(attr)

        msgid=ldap.ldapobject.LDAPObject.search_ext(self,base,scope,
filterstr, attrlist, *args, **kwargs)

        if decoder:
            timeout=kwargs.get('timeout', None)
            if timeout==None or timeout<=0:
                timeout=self.expiration_delay
            self.unicode_decoder[msgid]=(msgid,
datetime.datetime.now()+datetime.timedelta(seconds=timeout),
decoder)
        return msgid

    def result3(self, *args, **kwargs):
        # kwargs=(self, msgid=_ldap.RES_ANY,all=1,timeout=None):
        rtype, rdata, rmsgid, decoded_serverctrls=
ldap.ldapobject.LDAPObject.result3(self, *args, **kwargs)

        if self.unicode_decoder.has_key(rmsgid):
            msgid, expire, decoder=self.unicode_decoder[rmsgid]
            if rtype not in [ ldap.RES_SEARCH_ENTRY,
ldap.RES_SEARCH_REFERENCE ]:
                # this was the last result
                del self.unicode_decoder[rmsgid]
            else:
                # reset the timeout
                timeout=kwargs.get('timeout', None)
                if timeout==None or timeout<=0:
                    timeout=self.expiration_delay
                self.unicode_decoder[msgid]=(msgid,
datetime.datetime.now()+datetime.timedelta(seconds=timeout),
decoder)

            # now decode the result
            if rdata:
                if rtype in [ldap.RES_SEARCH_ENTRY,
ldap.RES_SEARCH_REFERENCE, ldap.RES_SEARCH_RESULT]:
                    # FIXME: I dont know what is a RES_SEARCH_REFERENCE
                    rdata_u=[]
                    for i, (dn, attrs) in enumerate(rdata):
                        # FIXME: should I handle the 'dn' the same way
                        if decoder.has_key('dn'):
                            dn=utf82unicode(dn)
                        for key in attrs.keys():
                            if decoder.has_key(key):
                                attrs[key]=map(utf82unicode, attrs[key])
                        # print '\tITEM=', dn, attrs
                        rdata[i]=(dn, attrs)

        else:
            # no decoder for this => nothing to decode
            pass

        # remove other expired decoder info
        now=datetime.datetime.now()
        for msgid in self.unicode_decoder.keys():
            if self.unicode_decoder[rmsgid][1]<now:
                del self.unicode_decoder[rmsgid]

        return rtype, rdata, rmsgid, decoded_serverctrls

    def add_ext(self, dn, modlist, *args, **kwargs):
        # args=(self,dn,modlist,serverctrls=None,clientctrls=None)
        dn=unicode2utf8(dn)
        # print 'MODLIST', modlist
        modlist=encode_modlist(modlist, True)
        # print 'MODLIST unicode', modlist
        return ldap.ldapobject.LDAPObject.add_ext(self, dn, modlist, *args,
**kwargs)

    def modify_ext(self, dn, modlist, *args, **kwargs):
        # args=(self,dn,modlist,serverctrls=None,clientctrls=None)
        dn=unicode2utf8(dn)
        # print 'MODLIST', modlist
        modlist=encode_modlist(modlist, False)
        # print 'MODLIST unicode', modlist
        return ldap.ldapobject.LDAPObject.modify_ext(self, dn, modlist,
*args, **kwargs)

    def delete_ext(self, dn, *args, **kwargs):
        # args=(self,dn,serverctrls=None,clientctrls=None)
        dn=unicode2utf8(dn)
        return ldap.ldapobject.LDAPObject.delete_ext(self, dn, *args,
**kwargs)



def print_ldap_result(ldap_result):
    for dn, item in ldap_result:
        print 'DN=', repr(dn)
        for k, v in item.iteritems():
            print '\t%s: %s' % (k, repr(v))
        print

ldap_url=ldapurl.LDAPUrl('ldap://%s:%d/%s' % (host, port, base_dn))
ldap_url.applyDefaults({
   'who': who,
   'cred' : cred, })
#l=ldap.ldapobject.LDAPObject(ldap_url.initializeUrl())
l=UnicodeLDAPObject(ldap_url.initializeUrl())
l.simple_bind_s(ldap_url.who, ldap_url.cred)
print 'Connected as', l.whoami_s()


first_name='Michael'
first_name2=u'Micha\xebl'
last_name=u'Str\xf6der'
email='michael at stroeder.com'
street=u'Hauptstra\xe1e'
country='Germany'

cn='%s %s' %(first_name, last_name)
dn='cn=%s,%s' %(cn, base_dn)
info={
    u'cn' : (cn, ),
    'mail' : (email, ),
    'objectClass' : ('top', 'inetOrgPerson', 'kolabInetOrgPerson',),
    u'sn' : (last_name, ),
    u'givenName' : (first_name, ),
    u'street': (street, ),
    'c': (country, ),
    'telephoneNumber': '+49 1111111111',
}

ldap_result=l.search_s(base_dn, ldap.SCOPE_ONELEVEL, '(cn=%s)' % (cn,) ,
info.keys())
if ldap_result:
    print '== Found'
    print_ldap_result(ldap_result)
    l.delete_s(dn)
    print '== Deleted'

l.add_s(dn, ldap.modlist.addModlist(info))
print '== Created'
ldap_result=l.search_s(base_dn, ldap.SCOPE_ONELEVEL, '(cn=%s)' % (cn,) ,
info.keys())
print_ldap_result(ldap_result)

l.modify_s(dn, [(ldap.MOD_REPLACE, u'givenName', first_name2),
                (ldap.MOD_ADD, 'telephoneNumber', ( '+49 1234567890', )),
                 ])

print '==Modified'
ldap_result=l.search_s(base_dn, ldap.SCOPE_ONELEVEL, '(cn=%s)' % (cn,) ,
info.keys())
print_ldap_result(ldap_result)

print '==Display once more'
ldap_result=l.search_s(base_dn, ldap.SCOPE_ONELEVEL, '(cn=%s)' % (cn,) ,
['*', '+', u'dn', u'givenName', u'creatorsName'] )
print_ldap_result(ldap_result)



=============


Connected as dn:cn=manager,cn=internal,dc=asxnet,dc=loc
== Found
DN= 'cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc'
        telephoneNumber: ['+49 1111111111', '+49 1234567890']
        c: ['Germany']
        cn: [u'Michael Str\xf6der']
        objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
        street: [u'Hauptstra\xe1e']
        sn: [u'Str\xf6der']
        mail: ['michael at stroeder.com']
        givenName: [u'Micha\xebl']

== Deleted
== Created
DN= 'cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc'
        telephoneNumber: ['+49 1111111111']
        c: ['Germany']
        cn: [u'Michael Str\xf6der']
        objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
        street: [u'Hauptstra\xe1e']
        sn: [u'Str\xf6der']
        mail: ['michael at stroeder.com']
        givenName: [u'Michael']

==Modified
DN= 'cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc'
        telephoneNumber: ['+49 1111111111', '+49 1234567890']
        c: ['Germany']
        cn: [u'Michael Str\xf6der']
        objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
        street: [u'Hauptstra\xe1e']
        sn: [u'Str\xf6der']
        mail: ['michael at stroeder.com']
        givenName: [u'Micha\xebl']

==Display more
DN= 'cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc'
        telephoneNumber: ['+49 1111111111', '+49 1234567890']
        c: ['Germany']
        cn: [u'Michael Str\xf6der']
        objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
        street: [u'Hauptstra\xe1e']
        sn: [u'Str\xf6der']
        mail: ['michael at stroeder.com']
        givenName: [u'Micha\xebl']

==Display once more
DN= u'cn=Michael Str\xf6der,dc=asxnet,dc=loc'
        telephoneNumber: ['+49 1111111111', '+49 1234567890']
        c: ['Germany']
        entryCSN: ['20070524191126Z#000002#00#000000']
        cn: ['Michael Str\xc3\xb6der']
        entryDN: ['cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc']
        createTimestamp: ['20070524191126Z']
        objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
        creatorsName: [u'cn=manager,cn=internal,dc=asxnet,dc=loc']
        entryUUID: ['5099e82e-9e76-102b-830b-0da78c7bd35e']
        hasSubordinates: ['FALSE']
        modifiersName: ['cn=manager,cn=internal,dc=asxnet,dc=loc']
        street: ['Hauptstra\xc3\xa1e']
        sn: ['Str\xc3\xb6der']
        structuralObjectClass: ['inetOrgPerson']
        subschemaSubentry: ['cn=Subschema']
        mail: ['michael at stroeder.com']
        givenName: [u'Micha\xebl']
        modifyTimestamp: ['20070524191126Z']





-- 
--
Alain Spineux
aspineux gmail com
May the sources be with you
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.python.org/pipermail/python-ldap/attachments/20070524/f437afc6/attachment.html>


More information about the python-ldap mailing list