parallel (concurrent) eventlet
David Gabriel
davidgab283 at gmail.com
Mon Jan 18 06:03:13 EST 2016
Dear all,
I have an issue when I use the eventlet API to create parallel threads.
When I run the code below, only the part that synchronizes with the LDAP
database runs, and it continuously blocks the others from running.
When I use the 'thread' API instead, the program works fine without any
blocking issue. However, I cannot use the thread API; I must use eventlet.
So I am wondering how to get the thread API's behavior using the eventlet
API.
Could you please tell me how to fix this issue?
My code is below. Note that running it requires some configuration of the
LDAP server/client.
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
This script implements a syncrepl consumer which syncs data from an OpenLDAP
server to a local (shelve) database.
Notes:
The bound user needs read access to the attributes entryDN and entryCSN.
This needs the following software:
Python
pyasn1 0.1.4+
pyasn1-modules
python-ldap 2.4.10+
"""
# Import the python-ldap modules
import ldap,ldapurl
# Import specific classes from python-ldap
from ldap.ldapobject import ReconnectLDAPObject
from ldap.syncrepl import SyncreplConsumer
# Import modules from Python standard lib
import shelve,signal,time,sys,logging
import eventlet
#import thread
eventlet.monkey_patch()
# Global state
watcher_running = True
ldap_connection = False
class SyncReplConsumer(ReconnectLDAPObject,SyncreplConsumer):
    """
    Syncrepl Consumer interface
    """

    def __init__(self,db_path,*args,**kwargs):
        # Initialise the LDAP Connection first
        ldap.ldapobject.ReconnectLDAPObject.__init__(self, *args, **kwargs)
        # Now prepare the data store
        self.__data = shelve.open(db_path, 'c')
        # We need this for later internal use
        self.__presentUUIDs = dict()

    def __del__(self):
        # Close the data store properly to avoid corruption
        self.__data.close()

    def syncrepl_get_cookie(self):
        # Use the same 'ldap_cookie' key that the other methods check for
        if 'ldap_cookie' in self.__data:
            return self.__data['ldap_cookie']

    def syncrepl_set_cookie(self,cookie):
        self.__data['ldap_cookie'] = cookie

    def syncrepl_entry(self,dn,attributes,uuid):
        # First we determine the type of change we have here (and store
        # away the previous data for later if needed)
        previous_attributes = dict()
        if uuid in self.__data:
            change_type = 'modify'
            previous_attributes = self.__data[uuid]
        else:
            change_type = 'add'
        # Now we store our knowledge of the existence of this entry
        # (including the DN as an attribute for convenience)
        attributes['dn'] = dn
        self.__data[uuid] = attributes
        # Debugging
        print 'Detected', change_type, 'of entry:', dn
        # If we have a cookie then this is not our first time being run, so
        # it must be a change
        if 'ldap_cookie' in self.__data:
            self.perform_application_sync(dn, attributes, previous_attributes)

    def syncrepl_delete(self,uuids):
        # Make sure we know about the UUID being deleted, just in case...
        uuids = [uuid for uuid in uuids if uuid in self.__data]
        # Delete all the UUID values we know of
        for uuid in uuids:
            print 'Detected deletion of entry:', self.__data[uuid]['dn']
            del self.__data[uuid]

    def syncrepl_present(self,uuids,refreshDeletes=False):
        # If we have not been given any UUID values, then we have received
        # all the present controls...
        if uuids is None:
            # We only do things if refreshDeletes is false as the syncrepl
            # extension will call syncrepl_delete instead when it detects a
            # delete notice
            if refreshDeletes is False:
                deletedEntries = [uuid for uuid in self.__data.keys()
                                  if uuid not in self.__presentUUIDs
                                  and uuid != 'ldap_cookie']
                self.syncrepl_delete( deletedEntries )
            # Phase is now completed, reset the list
            self.__presentUUIDs = {}
        else:
            # Note down all the UUIDs we have been sent
            for uuid in uuids:
                self.__presentUUIDs[uuid] = True

    def perform_application_sync(self,dn,attributes,previous_attributes):
        print 'Performing application sync for:', dn
        return True

# Shutdown handler
#def commenceShutdown(signum, stack):
def commenceShutdown():
    # Declare the needed global variables
    global watcher_running, ldap_connection
    print 'Shutting down!'
    # We are no longer running
    watcher_running = False
    # Tear down the server connection
    if( ldap_connection ):
        del ldap_connection
    # Shutdown
    sys.exit(0)

def mainOfSyncrepl(threadName):
    # Time to actually begin execution
    # Install our signal handlers
    # signal.signal(signal.SIGTERM,commenceShutdown)
    # signal.signal(signal.SIGINT,commenceShutdown)
    try:
        ldap_url = ldapurl.LDAPUrl('ldap://localhost/dc=example,dc=org?*?sub?(objectClass=*)?bindname=cn=admin%2cdc=test%2cdc=com,X-BINDPW=myPassword')  #ldapurl.LDAPUrl(sys.argv[1])
        # ldap_url = ldapurl.LDAPUrl(link)
        database_path = 'test.com'  #sys.argv[2]
        # database_path = pathName
    except IndexError as e:
        print 'Usage: syncrepl-client.py <LDAP URL> <pathname of database>'
        sys.exit(1)
    except ValueError as e:
        print 'Error parsing command-line arguments:',str(e)
        sys.exit(1)

    while watcher_running:
        print 'Connecting to LDAP server now...'
        # Prepare the LDAP server connection (triggers the connection as well)
        ldap_connection = SyncReplConsumer(database_path,ldap_url.initializeUrl())
        # Now we login to the LDAP server
        try:
            ldap_connection.simple_bind_s(ldap_url.who,ldap_url.cred)
        except ldap.INVALID_CREDENTIALS as e:
            print 'Login to LDAP server failed: ', str(e)
            sys.exit(1)
        except ldap.SERVER_DOWN:
            continue
        # Commence the syncing
        print 'Commencing sync process'
        ldap_search = ldap_connection.syncrepl_search(
            ldap_url.dn or '',
            ldap_url.scope or ldap.SCOPE_SUBTREE,
            mode = 'refreshAndPersist',
            filterstr = ldap_url.filterstr or '(objectClass=*)')
        print 'After syncrepl_search.'
        try:
            while ldap_connection.syncrepl_poll( all = 1, msgid = ldap_search):
                pass
        except KeyboardInterrupt:
            # User asked to exit
            commenceShutdown()
            pass
        except Exception as e:
            # Handle any exception
            if watcher_running:
                print 'Encountered a problem, going to retry. Error:', str(e)
                eventlet.sleep(5)
            pass

# Define a function for the 2nd thread
def print_time(ThreadName):
    count = 0
    delay = 3
    while 1:#count < 5:
        count += 1
        print "%s: %s" % (ThreadName, time.ctime(time.time()) )
        eventlet.sleep(delay)

print 'Before call threads'
evt1 = eventlet.spawn(mainOfSyncrepl, "Thread-1",)
evt2 = eventlet.spawn(print_time, "Thread-2",)
evt3 = eventlet.spawn(print_time, "Thread-3",)
print 'After call threads'
evt1.wait()
evt2.wait()
evt3.wait()
print 'After wait'
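
For comparison, the behaviour reported above with the thread API can be
sketched without an LDAP server. This is only a minimal sketch using the
standard threading module, not the program above: blocking_poll, ticker and
run_demo are hypothetical names, and time.sleep merely stands in for the
blocking syncrepl_poll call. It shows two print_time-style workers continuing
to run while the poll loop blocks, which is the behaviour wanted from eventlet.

```python
import threading
import time


def blocking_poll(stop_event, log):
    """Hypothetical stand-in for a poll loop that blocks while waiting."""
    while not stop_event.is_set():
        time.sleep(0.05)  # pretend to wait for the server
        log.append("poll")


def ticker(name, stop_event, log, delay=0.02):
    """Plays the role of print_time: records a tick every `delay` seconds."""
    while not stop_event.is_set():
        log.append(name)
        time.sleep(delay)


def run_demo(duration=0.3):
    """Run one poller and two tickers as native threads; return the event log."""
    stop_event = threading.Event()
    log = []
    workers = [
        threading.Thread(target=blocking_poll, args=(stop_event, log)),
        threading.Thread(target=ticker, args=("Thread-2", stop_event, log)),
        threading.Thread(target=ticker, args=("Thread-3", stop_event, log)),
    ]
    for worker in workers:
        worker.start()
    time.sleep(duration)
    stop_event.set()
    for worker in workers:
        worker.join()
    return log


if __name__ == "__main__":
    # Show which workers managed to record at least one event
    print(sorted(set(run_demo())))
```

run_demo() returns the recorded events; with native threads the list holds
entries from all three workers. One possibly relevant point, stated as an
assumption rather than a diagnosis: python-ldap performs its network I/O
inside the C libldap library, so eventlet.monkey_patch(), which patches
Python-level modules, may not be able to make syncrepl_poll yield. The
eventlet documentation describes eventlet.tpool for running such blocking
calls in native threads.
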
2016-01-12 7:20 GMT-08:00 Ned Batchelder <ned at nedbatchelder.com>:
> David,
>
> We aren't going to be able to debug code that we can't see. Please post a
> link to the *actual* code that you are running.
>
> --Ned.
>
>
> On 1/12/16 7:00 AM, David Gabriel wrote:
>
> Dear all,
>
> For more details, I am using this code
> <https://github.com/rbarrois/python-ldap/blob/master/Demo/pyasn1/syncrepl.py>
> to receive the updates from my database.
> However, when I spawn a greenthread based on this code, my program blocks
> there and does not run the other greenthreads.
> Could you please advise me how to fix this issue?
>
> Thanks in advance.
> Regards
>
> 2016-01-11 7:29 GMT-08:00 David Gabriel <davidgab283 at gmail.com>:
>
>> Thanks so much, John.
>> Your proposal works fine for this simple example, but when I use it in
>> more complex code (a database client that receives all updates from the
>> db), my program runs this db client continuously and not the others.
>>
>> Any suggestions?
>> Thanks in advance.
>> Regards
>>
>> 2016-01-11 5:50 GMT-08:00 John Eskew <john.eskew at gmail.com>:
>>
>>> Add this line below your imports:
>>>
>>> eventlet.monkey_patch()
>>>
>>> Here's why that line should fix things:
>>>
>>> http://eventlet.net/doc/patching.html#greening-the-world
>>>
>>> On Mon, Jan 11, 2016 at 6:27 AM, David Gabriel <davidgab283 at gmail.com>
>>> wrote:
>>>
>>>> Dear all,
>>>> This is my first time developing with Python, and I want to execute
>>>> parallel threads using eventlet. When I run the code below, only one
>>>> thread is executed, not both. Could you please tell me how to fix this
>>>> and how to ensure concurrent behavior between evt1 and evt2?
>>>>
>>>> import eventlet
>>>> import time
>>>>
>>>> def print_time(ThreadName):
>>>>     count = 0
>>>>     delay = 3
>>>>     while 1:#count < 5:
>>>>         count += 1
>>>>         print "%s: %s" % (ThreadName, time.ctime(time.time()) )
>>>>         time.sleep(delay)
>>>>
>>>> print 'Before call threads'
>>>>
>>>> evt1 = eventlet.spawn(print_time, "Thread-1",)
>>>> evt2 = eventlet.spawn(print_time, "Thread-2",)
>>>>
>>>> print 'After call threads'
>>>>
>>>> evt1.wait()
>>>> evt2.wait()
>>>>
>>>>
>>>>
>>>> Any answer is welcome.
>>>> Thanks in advance.
>>>> Regards.
>>>>
>>>> _______________________________________________
>>>> Boston mailing list
>>>> Boston at python.org
>>>> https://mail.python.org/mailman/listinfo/boston
>>>>
>>>>
>>>
>>
>