unicode value
Alain Spineux
aspineux at gmail.com
Thu May 24 21:16:26 CEST 2007
On 5/24/07, Michael Ströder <michael at stroeder.com> wrote:
>
> Alain Spineux wrote:
> >
> > Yes but what about unknown field type ?
>
> If you really want to dive into this look in directory
> pylib/w2lapp/schema/ of web2ldap's source. It works for me but I did not
> consider this whole framework mature enough to be incorporated into
> python-ldap.
I don't want to look at the schema.
Here are the sources and the results.
I use your more appropriate name for unicode testing :-)
#!/usr/bin/env python2.4
import sys, os, time
import ldap, ldapurl, ldap.modlist
import types
import datetime
# Connection settings for the demo below.
host = 'localhost'
port = 389
base_dn = 'dc=asxnet,dc=loc'

# Bind identity.  Credentials are read from the environment instead of being
# hard-coded in the source; the previous always-true "if True:/else:" switch
# between two hard-coded accounts is gone.
#   LDAP_BIND_DN -- DN to bind as.  Defaults to the manager account; set it
#                   to e.g. 'cn=nobody,cn=internal,dc=asxnet,dc=loc' to test
#                   with the other account.
#   LDAP_BIND_PW -- password for that DN.  Defaults to the empty string.
#                   NOTE(review): a server will presumably refuse a bind
#                   with a DN and no password -- confirm for your setup.
who = os.environ.get('LDAP_BIND_DN', 'cn=manager,cn=internal,dc=asxnet,dc=loc')
cred = os.environ.get('LDAP_BIND_PW', '')
def unicode2utf8(st):
    """Return st UTF-8 encoded if it is a unicode string, else st unchanged.

    Only unicode objects are converted; byte strings and any other kind of
    value are handed back untouched, which is the raw form ldap expects.
    """
    # type(u'') is the unicode string type (same object as types.UnicodeType).
    if not isinstance(st, type(u'')):
        return st
    return st.encode('utf-8')
def utf82unicode(st):
    """Decode the UTF-8 byte string st into a unicode string.

    A value that is already unicode is returned unchanged, so decoding the
    same value twice is harmless (on Python 2, calling .decode() on a
    non-ASCII unicode object would raise UnicodeEncodeError).

    Raises UnicodeDecodeError if st is not valid UTF-8.
    """
    # type(u'') is the unicode string type.
    if isinstance(st, type(u'')):
        return st
    return st.decode('utf-8')
def encode_modlist(modlist, no_op):
    """Return a new modlist with unicode names and values UTF-8 encoded.

    modlist -- sequence of modification tuples.
    no_op   -- true when the tuples carry no leading operation code, i.e.
               they look like (attr_name, attr_values) as passed to
               add_ext; false when they look like
               (op, attr_name, attr_values) as passed to modify_ext.

    attr_values may be a single value or a list/tuple of values; a
    list/tuple always comes back as a list.  Every name and value goes
    through unicode2utf8, so non-unicode items are passed through unchanged.

    The input sequence is not modified: a fresh list is built and returned,
    so the caller's modlist is left intact and may also be a tuple.
    """
    encoded = []
    for mod in modlist:
        if no_op:
            op = None
            attr_name, attr_values = mod
        else:
            op, attr_name, attr_values = mod
        attr_name = unicode2utf8(attr_name)
        if isinstance(attr_values, (list, tuple)):
            attr_values = [unicode2utf8(value) for value in attr_values]
        else:
            attr_values = unicode2utf8(attr_values)
        if no_op:
            encoded.append((attr_name, attr_values))
        else:
            encoded.append((op, attr_name, attr_values))
    return encoded
class UnicodeLDAPObject(ldap.ldapobject.LDAPObject):
    """LDAPObject wrapper that accepts unicode and returns unicode on demand.

    Outgoing unicode arguments (DNs, filters, attribute names and values)
    are encoded to UTF-8 before being handed to the base class.  For
    searches, the values of every attribute that was requested with a
    *unicode* name are decoded back to unicode in result3(); attributes
    requested with a byte-string name are returned untouched.  Requesting
    u'dn' additionally makes the entry DN come back as unicode.
    """

    # Lifetime in seconds of the per-search decoding information when the
    # call gives no positive 'timeout' keyword.
    expiration_delay = 300

    def __init__(self, uri, **kwargs):
        ldap.ldapobject.LDAPObject.__init__(self, uri, **kwargs)
        # msgid -> (msgid, expiration, decoder), where decoder maps the
        # UTF-8 encoded name of each attribute requested as unicode to True.
        # The expiration time keeps this dict from growing without bound
        # when the server never answers a request.
        self.unicode_decoder = {}

    def _decoder_expiration(self, kwargs):
        """Return the datetime at which a decoder entry should expire.

        Uses the 'timeout' keyword of the current call when it is positive,
        otherwise expiration_delay.
        NOTE(review): only the keyword form is looked at; a timeout passed
        positionally is not seen here and expiration_delay is used instead.
        """
        timeout = kwargs.get('timeout', None)
        if timeout is None or timeout <= 0:
            timeout = self.expiration_delay
        return datetime.datetime.now() + datetime.timedelta(seconds=timeout)

    def _decode_entry(self, dn, attrs, decoder):
        """Decode one (dn, attrs) result item according to decoder.

        attrs is updated in place; the (possibly decoded) pair is returned.
        """
        if dn is not None and 'dn' in decoder:
            dn = utf82unicode(dn)
        # Presumably search references arrive with a non-mapping payload
        # (see python-ldap docs); anything without keys() is left as-is.
        if hasattr(attrs, 'keys'):
            for key in attrs.keys():
                if key in decoder:
                    attrs[key] = [utf82unicode(value) for value in attrs[key]]
        return dn, attrs

    def search_ext(self, base, scope, filterstr='(objectClass=*)',
                   attrlist=None, *args, **kwargs):
        """Start a search; return the msgid from the base class.

        Full base-class argument list, for reference:
        base, scope, filterstr='(objectClass=*)', attrlist=None,
        attrsonly=0, serverctrls=None, clientctrls=None, timeout=-1,
        sizelimit=0

        A unicode filterstr and unicode attribute names are UTF-8 encoded.
        The names that were given as unicode are remembered under the
        returned msgid so that result3() can decode their values.
        """
        filterstr = unicode2utf8(filterstr)
        decoder = {}
        if attrlist is not None:
            encoded_attrlist = []
            for attr in attrlist:
                if isinstance(attr, types.UnicodeType):
                    attr = attr.encode('utf-8')
                    decoder[attr] = True
                encoded_attrlist.append(attr)
            attrlist = encoded_attrlist
        msgid = ldap.ldapobject.LDAPObject.search_ext(
            self, base, scope, filterstr, attrlist, *args, **kwargs)
        if decoder:
            self.unicode_decoder[msgid] = (
                msgid, self._decoder_expiration(kwargs), decoder)
        return msgid

    def result3(self, *args, **kwargs):
        """Fetch a result and decode the attributes requested as unicode.

        Arguments are those of the base class:
        msgid=_ldap.RES_ANY, all=1, timeout=None

        Returns the base class 4-tuple
        (rtype, rdata, rmsgid, decoded_serverctrls); rdata is decoded in
        place when decoding information exists for rmsgid.
        """
        rtype, rdata, rmsgid, decoded_serverctrls = \
            ldap.ldapobject.LDAPObject.result3(self, *args, **kwargs)
        if rmsgid in self.unicode_decoder:
            msgid, expire, decoder = self.unicode_decoder[rmsgid]
            if rtype not in (ldap.RES_SEARCH_ENTRY,
                             ldap.RES_SEARCH_REFERENCE):
                # This was the last result for that search.
                del self.unicode_decoder[rmsgid]
            else:
                # More results may follow: push the expiration forward.
                self.unicode_decoder[rmsgid] = (
                    rmsgid, self._decoder_expiration(kwargs), decoder)
            if rdata and rtype in (ldap.RES_SEARCH_ENTRY,
                                   ldap.RES_SEARCH_REFERENCE,
                                   ldap.RES_SEARCH_RESULT):
                for i, (dn, attrs) in enumerate(rdata):
                    rdata[i] = self._decode_entry(dn, attrs, decoder)
        # else: no decoder registered for this msgid, nothing to decode.

        # Drop the decoding information of every search that has expired.
        # Iterate over a snapshot because entries are deleted in the loop.
        now = datetime.datetime.now()
        for msgid, entry in list(self.unicode_decoder.items()):
            if entry[1] < now:
                del self.unicode_decoder[msgid]
        return rtype, rdata, rmsgid, decoded_serverctrls

    def add_ext(self, dn, modlist, *args, **kwargs):
        """add_ext with unicode support for dn and modlist.

        Base-class arguments: dn, modlist, serverctrls=None,
        clientctrls=None.  modlist items are (attr_name, attr_values).
        """
        dn = unicode2utf8(dn)
        modlist = encode_modlist(modlist, True)
        return ldap.ldapobject.LDAPObject.add_ext(
            self, dn, modlist, *args, **kwargs)

    def modify_ext(self, dn, modlist, *args, **kwargs):
        """modify_ext with unicode support for dn and modlist.

        Base-class arguments: dn, modlist, serverctrls=None,
        clientctrls=None.  modlist items are (op, attr_name, attr_values).
        """
        dn = unicode2utf8(dn)
        modlist = encode_modlist(modlist, False)
        return ldap.ldapobject.LDAPObject.modify_ext(
            self, dn, modlist, *args, **kwargs)

    def delete_ext(self, dn, *args, **kwargs):
        """delete_ext with unicode support for dn.

        Base-class arguments: dn, serverctrls=None, clientctrls=None.
        """
        dn = unicode2utf8(dn)
        return ldap.ldapobject.LDAPObject.delete_ext(
            self, dn, *args, **kwargs)
def print_ldap_result(ldap_result):
    """Dump search results to stdout.

    ldap_result is an iterable of (dn, attrs) pairs.  For each pair a
    'DN= <repr of dn>' line is printed, then one tab-indented
    '<name>: <repr of values>' line per attribute, then an empty line.
    """
    for entry_dn, entry_attrs in ldap_result:
        print('DN= %s' % repr(entry_dn))
        for name, values in entry_attrs.items():
            print('\t%s: %s' % (name, repr(values)))
        print('')
# Demo: create, modify and display one entry whose attributes contain
# non-ASCII characters, going through UnicodeLDAPObject.
ldap_url = ldapurl.LDAPUrl('ldap://%s:%d/%s' % (host, port, base_dn))
ldap_url.applyDefaults({
    'who': who,
    'cred': cred,
})

# To compare with the stock behaviour, build the connection with
# ldap.ldapobject.LDAPObject(ldap_url.initializeUrl()) instead.
l = UnicodeLDAPObject(ldap_url.initializeUrl())
try:
    l.simple_bind_s(ldap_url.who, ldap_url.cred)
    print('Connected as %s' % l.whoami_s())

    first_name = 'Michael'
    first_name2 = u'Micha\xebl'
    last_name = u'Str\xf6der'
    email = 'michael at stroeder.com'
    # NOTE(review): \xe1 is 'a' with acute; the sharp s of "Hauptstrasse"
    # would be \xdf.  Kept as posted so the output shown below still matches.
    street = u'Hauptstra\xe1e'
    country = 'Germany'
    cn = '%s %s' % (first_name, last_name)
    dn = 'cn=%s,%s' % (cn, base_dn)

    # Attributes keyed with a unicode name are the ones whose values come
    # back as unicode from the searches below.
    info = {
        u'cn': (cn, ),
        'mail': (email, ),
        'objectClass': ('top', 'inetOrgPerson', 'kolabInetOrgPerson',),
        u'sn': (last_name, ),
        u'givenName': (first_name, ),
        u'street': (street, ),
        'c': (country, ),
        'telephoneNumber': '+49 1111111111',
    }
    cn_filter = '(cn=%s)' % (cn,)

    # Start from a clean state: remove the entry if a previous run left it.
    ldap_result = l.search_s(base_dn, ldap.SCOPE_ONELEVEL, cn_filter,
                             info.keys())
    if ldap_result:
        print('== Found')
        print_ldap_result(ldap_result)
        l.delete_s(dn)
        print('== Deleted')

    l.add_s(dn, ldap.modlist.addModlist(info))
    print('== Created')
    ldap_result = l.search_s(base_dn, ldap.SCOPE_ONELEVEL, cn_filter,
                             info.keys())
    print_ldap_result(ldap_result)

    l.modify_s(dn, [
        (ldap.MOD_REPLACE, u'givenName', first_name2),
        (ldap.MOD_ADD, 'telephoneNumber', ('+49 1234567890', )),
    ])
    print('==Modified')
    ldap_result = l.search_s(base_dn, ldap.SCOPE_ONELEVEL, cn_filter,
                             info.keys())
    print_ldap_result(ldap_result)

    # Ask for everything, but only dn, givenName and creatorsName are
    # requested with unicode names.
    print('==Display once more')
    ldap_result = l.search_s(base_dn, ldap.SCOPE_ONELEVEL, cn_filter,
                             ['*', '+', u'dn', u'givenName', u'creatorsName'])
    print_ldap_result(ldap_result)
finally:
    # Always release the connection, even when one of the steps fails.
    l.unbind_s()
=============
Connected as dn:cn=manager,cn=internal,dc=asxnet,dc=loc
== Found
DN= 'cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc'
telephoneNumber: ['+49 1111111111', '+49 1234567890']
c: ['Germany']
cn: [u'Michael Str\xf6der']
objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
street: [u'Hauptstra\xe1e']
sn: [u'Str\xf6der']
mail: ['michael at stroeder.com']
givenName: [u'Micha\xebl']
== Deleted
== Created
DN= 'cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc'
telephoneNumber: ['+49 1111111111']
c: ['Germany']
cn: [u'Michael Str\xf6der']
objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
street: [u'Hauptstra\xe1e']
sn: [u'Str\xf6der']
mail: ['michael at stroeder.com']
givenName: [u'Michael']
==Modified
DN= 'cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc'
telephoneNumber: ['+49 1111111111', '+49 1234567890']
c: ['Germany']
cn: [u'Michael Str\xf6der']
objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
street: [u'Hauptstra\xe1e']
sn: [u'Str\xf6der']
mail: ['michael at stroeder.com']
givenName: [u'Micha\xebl']
==Display more
DN= 'cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc'
telephoneNumber: ['+49 1111111111', '+49 1234567890']
c: ['Germany']
cn: [u'Michael Str\xf6der']
objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
street: [u'Hauptstra\xe1e']
sn: [u'Str\xf6der']
mail: ['michael at stroeder.com']
givenName: [u'Micha\xebl']
==Display once more
DN= u'cn=Michael Str\xf6der,dc=asxnet,dc=loc'
telephoneNumber: ['+49 1111111111', '+49 1234567890']
c: ['Germany']
entryCSN: ['20070524191126Z#000002#00#000000']
cn: ['Michael Str\xc3\xb6der']
entryDN: ['cn=Michael Str\xc3\xb6der,dc=asxnet,dc=loc']
createTimestamp: ['20070524191126Z']
objectClass: ['top', 'inetOrgPerson', 'kolabInetOrgPerson']
creatorsName: [u'cn=manager,cn=internal,dc=asxnet,dc=loc']
entryUUID: ['5099e82e-9e76-102b-830b-0da78c7bd35e']
hasSubordinates: ['FALSE']
modifiersName: ['cn=manager,cn=internal,dc=asxnet,dc=loc']
street: ['Hauptstra\xc3\xa1e']
sn: ['Str\xc3\xb6der']
structuralObjectClass: ['inetOrgPerson']
subschemaSubentry: ['cn=Subschema']
mail: ['michael at stroeder.com']
givenName: [u'Micha\xebl']
modifyTimestamp: ['20070524191126Z']
--
--
Alain Spineux
aspineux gmail com
May the sources be with you
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.python.org/pipermail/python-ldap/attachments/20070524/f437afc6/attachment.html>
More information about the python-ldap
mailing list