[py-svn] r58385 - in py/trunk/py: . execnet/testing misc misc/testing path/svn/testing process process/testing

hpk at codespeak.net hpk at codespeak.net
Tue Sep 23 16:28:13 CEST 2008


Author: hpk
Date: Tue Sep 23 16:28:13 2008
New Revision: 58385

Added:
   py/trunk/py/process/killproc.py
      - copied, changed from r58363, py/trunk/py/misc/killproc.py
   py/trunk/py/process/testing/test_killproc.py
      - copied, changed from r58363, py/trunk/py/misc/testing/test_oskill.py
Removed:
   py/trunk/py/misc/killproc.py
   py/trunk/py/misc/testing/test_oskill.py
Modified:
   py/trunk/py/__init__.py
   py/trunk/py/execnet/testing/test_gateway.py
   py/trunk/py/path/svn/testing/test_auth.py
Log:
* move cross-platform process kill functionality to
  py.process.kill(pid)
* simplify test_auth.py
* use new functionality from some tests


Modified: py/trunk/py/__init__.py
==============================================================================
--- py/trunk/py/__init__.py	(original)
+++ py/trunk/py/__init__.py	Tue Sep 23 16:28:13 2008
@@ -99,6 +99,7 @@
 
     'process.__doc__'        : ('./process/__init__.py', '__doc__'),
     'process.cmdexec'        : ('./process/cmdexec.py', 'cmdexec'),
+    'process.kill'           : ('./process/killproc.py', 'kill'),
     'process.ForkedFunc'     : ('./process/forkedfunc.py', 'ForkedFunc'), 
 
     # path implementation

Modified: py/trunk/py/execnet/testing/test_gateway.py
==============================================================================
--- py/trunk/py/execnet/testing/test_gateway.py	(original)
+++ py/trunk/py/execnet/testing/test_gateway.py	Tue Sep 23 16:28:13 2008
@@ -516,8 +516,6 @@
             
     def test_waitclose_on_remote_killed(self):
         py.test.skip("fix needed: dying remote process does not cause waitclose() to fail")
-        if not hasattr(py.std.os, 'kill'):
-            py.test.skip("no os.kill")
         gw = py.execnet.PopenGateway()
         channel = gw.remote_exec("""
             import os
@@ -527,7 +525,7 @@
                 channel.send("#" * 100)
         """)
         remotepid = channel.receive()
-        os.kill(remotepid, 9)
+        py.process.kill(remotepid)
         py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)")
         py.test.raises(EOFError, channel.send, None)
         py.test.raises(EOFError, channel.receive)

Deleted: /py/trunk/py/misc/killproc.py
==============================================================================
--- /py/trunk/py/misc/killproc.py	Tue Sep 23 16:28:13 2008
+++ (empty file)
@@ -1,10 +0,0 @@
-
-import py
-import os, sys
-
-def killproc(pid):
-    if sys.platform == "win32":
-        py.process.cmdexec("taskkill /F /PID %d" %(pid,))
-    else:
-        os.kill(pid, 15)
-        

Deleted: /py/trunk/py/misc/testing/test_oskill.py
==============================================================================
--- /py/trunk/py/misc/testing/test_oskill.py	Tue Sep 23 16:28:13 2008
+++ (empty file)
@@ -1,23 +0,0 @@
-
-import py, sys
-
-from py.__.misc.killproc import killproc
-
-def test_win_killsubprocess():
-    if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'):
-        py.test.skip("you\'re using an older version of windows, which "
-                     "doesn\'t support 'taskkill' - py.misc.killproc is not "
-                     "available")
-    try:
-        import subprocess
-    except ImportError:
-        py.test.skip("no subprocess module")
-    tmp = py.test.ensuretemp("test_win_killsubprocess")
-    t = tmp.join("t.py")
-    t.write("import time ; time.sleep(100)")
-    proc = py.std.subprocess.Popen([sys.executable, str(t)])
-    assert proc.poll() is None # no return value yet
-    killproc(proc.pid)
-    ret = proc.wait()
-    assert ret != 0
-        

Modified: py/trunk/py/path/svn/testing/test_auth.py
==============================================================================
--- py/trunk/py/path/svn/testing/test_auth.py	(original)
+++ py/trunk/py/path/svn/testing/test_auth.py	Tue Sep 23 16:28:13 2008
@@ -3,7 +3,6 @@
 import svntestbase
 from threading import Thread
 import time
-from py.__.misc.killproc import killproc
 from py.__.conftest import option
 
 def make_repo_auth(repo, userdata):
@@ -269,6 +268,10 @@
                                            func_name))
         self.auth = py.path.SvnAuth('johnny', 'foo', cache_auth=False,
                                     interactive=False)
+        self.port, self.pid = self._start_svnserve()
+
+    def teardown_method(self, method):
+        py.process.kill(self.pid)
 
     def _start_svnserve(self):
         make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
@@ -279,203 +282,168 @@
 
 class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase):
     def test_checkout_constructor_arg(self):
-        port, pid = self._start_svnserve()
-        try:
-            wc = py.path.svnwc(self.temppath, auth=self.auth)
-            wc.checkout(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename))
-            assert wc.join('.svn').check()
-        finally:
-            # XXX can we do this in a teardown_method too? not sure if that's
-            # guaranteed to get called...
-            killproc(pid)
+        port = self.port
+        wc = py.path.svnwc(self.temppath, auth=self.auth)
+        wc.checkout(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename))
+        assert wc.join('.svn').check()
 
     def test_checkout_function_arg(self):
-        port, pid = self._start_svnserve()
-        try:
-            wc = py.path.svnwc(self.temppath, auth=self.auth)
-            wc.checkout(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename))
-            assert wc.join('.svn').check()
-        finally:
-            killproc(pid)
+        port = self.port
+        wc = py.path.svnwc(self.temppath, auth=self.auth)
+        wc.checkout(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename))
+        assert wc.join('.svn').check()
 
     def test_checkout_failing_non_interactive(self):
-        port, pid = self._start_svnserve()
-        try:
-            auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False,
-                                   interactive=False)
-            wc = py.path.svnwc(self.temppath, auth)
-            py.test.raises(Exception,
-                ("wc.checkout('svn://localhost:%s/%s' % "
-                 "(port, self.repopath.basename))"))
-        finally:
-            killproc(pid)
+        port = self.port
+        auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False,
+                               interactive=False)
+        wc = py.path.svnwc(self.temppath, auth)
+        py.test.raises(Exception,
+            ("wc.checkout('svn://localhost:%s/%s' % "
+             "(port, self.repopath.basename))"))
 
     def test_log(self):
-        port, pid = self._start_svnserve()
-        try:
-            wc = py.path.svnwc(self.temppath, self.auth)
-            wc.checkout(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename))
-            foo = wc.ensure('foo.txt')
-            wc.commit('added foo.txt')
-            log = foo.log()
-            assert len(log) == 1
-            assert log[0].msg == 'added foo.txt'
-        finally:
-            killproc(pid)
+        port = self.port
+        wc = py.path.svnwc(self.temppath, self.auth)
+        wc.checkout(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename))
+        foo = wc.ensure('foo.txt')
+        wc.commit('added foo.txt')
+        log = foo.log()
+        assert len(log) == 1
+        assert log[0].msg == 'added foo.txt'
 
     def test_switch(self):
-        port, pid = self._start_svnserve()
-        try:
-            wc = py.path.svnwc(self.temppath, auth=self.auth)
-            svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename)
-            wc.checkout(svnurl)
-            wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
-            wc.commit('added foo dir with foo.txt file')
-            wc.ensure('bar', dir=True)
-            wc.commit('added bar dir')
-            bar = wc.join('bar')
-            bar.switch(svnurl + '/foo')
-            assert bar.join('foo.txt')
-        finally:
-            killproc(pid)
+        port = self.port
+        wc = py.path.svnwc(self.temppath, auth=self.auth)
+        svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename)
+        wc.checkout(svnurl)
+        wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
+        wc.commit('added foo dir with foo.txt file')
+        wc.ensure('bar', dir=True)
+        wc.commit('added bar dir')
+        bar = wc.join('bar')
+        bar.switch(svnurl + '/foo')
+        assert bar.join('foo.txt')
 
     def test_update(self):
-        port, pid = self._start_svnserve()
-        try:
-            wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True),
-                                auth=self.auth)
-            wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True),
-                                auth=self.auth)
-            wc1.checkout(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename))
-            wc2.checkout(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename))
-            wc1.ensure('foo', dir=True)
-            wc1.commit('added foo dir')
-            wc2.update()
-            assert wc2.join('foo').check()
-
-            auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
-            wc2.auth = auth
-            py.test.raises(Exception, 'wc2.update()')
-        finally:
-            killproc(pid)
+        port = self.port
+        wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True),
+                            auth=self.auth)
+        wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True),
+                            auth=self.auth)
+        wc1.checkout(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename))
+        wc2.checkout(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename))
+        wc1.ensure('foo', dir=True)
+        wc1.commit('added foo dir')
+        wc2.update()
+        assert wc2.join('foo').check()
+
+        auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
+        wc2.auth = auth
+        py.test.raises(Exception, 'wc2.update()')
 
     def test_lock_unlock_status(self):
-        port, pid = self._start_svnserve()
-        try:
-            wc = py.path.svnwc(self.temppath, auth=self.auth)
-            wc.checkout(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename,))
-            wc.ensure('foo', file=True)
-            wc.commit('added foo file')
-            foo = wc.join('foo')
-            foo.lock()
-            status = foo.status()
-            assert status.locked
-            foo.unlock()
-            status = foo.status()
-            assert not status.locked
-
-            auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
-            foo.auth = auth
-            py.test.raises(Exception, 'foo.lock()')
-            py.test.raises(Exception, 'foo.unlock()')
-        finally:
-            killproc(pid)
+        port = self.port
+        wc = py.path.svnwc(self.temppath, auth=self.auth)
+        wc.checkout(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename,))
+        wc.ensure('foo', file=True)
+        wc.commit('added foo file')
+        foo = wc.join('foo')
+        foo.lock()
+        status = foo.status()
+        assert status.locked
+        foo.unlock()
+        status = foo.status()
+        assert not status.locked
+
+        auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
+        foo.auth = auth
+        py.test.raises(Exception, 'foo.lock()')
+        py.test.raises(Exception, 'foo.unlock()')
 
     def test_diff(self):
-        port, pid = self._start_svnserve()
-        try:
-            wc = py.path.svnwc(self.temppath, auth=self.auth)
-            wc.checkout(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename,))
-            wc.ensure('foo', file=True)
-            wc.commit('added foo file')
-            wc.update()
-            rev = int(wc.status().rev)
-            foo = wc.join('foo')
-            foo.write('bar')
-            diff = foo.diff()
-            assert '\n+bar\n' in diff
-            foo.commit('added some content')
-            diff = foo.diff()
-            assert not diff
-            diff = foo.diff(rev=rev)
-            assert '\n+bar\n' in diff
-
-            auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
-            foo.auth = auth
-            py.test.raises(Exception, 'foo.diff(rev=rev)')
-        finally:
-            killproc(pid)
+        port = self.port
+        wc = py.path.svnwc(self.temppath, auth=self.auth)
+        wc.checkout(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename,))
+        wc.ensure('foo', file=True)
+        wc.commit('added foo file')
+        wc.update()
+        rev = int(wc.status().rev)
+        foo = wc.join('foo')
+        foo.write('bar')
+        diff = foo.diff()
+        assert '\n+bar\n' in diff
+        foo.commit('added some content')
+        diff = foo.diff()
+        assert not diff
+        diff = foo.diff(rev=rev)
+        assert '\n+bar\n' in diff
+
+        auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
+        foo.auth = auth
+        py.test.raises(Exception, 'foo.diff(rev=rev)')
 
 class TestSvnURLAuthFunctional(SvnAuthFunctionalTestBase):
     def test_listdir(self):
-        port, pid = self._start_svnserve()
-        try:
-            u = py.path.svnurl(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename),
-                auth=self.auth)
-            u.ensure('foo')
-            paths = u.listdir()
-            assert len(paths) == 1
-            assert paths[0].auth is self.auth
-
-            auth = SvnAuth('foo', 'bar', interactive=False)
-            u = py.path.svnurl(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename),
-                auth=auth)
-            py.test.raises(Exception, 'u.listdir()')
-        finally:
-            killproc(pid)
+        port = self.port
+        u = py.path.svnurl(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename),
+            auth=self.auth)
+        u.ensure('foo')
+        paths = u.listdir()
+        assert len(paths) == 1
+        assert paths[0].auth is self.auth
+
+        auth = SvnAuth('foo', 'bar', interactive=False)
+        u = py.path.svnurl(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename),
+            auth=auth)
+        py.test.raises(Exception, 'u.listdir()')
 
     def test_copy(self):
-        port, pid = self._start_svnserve()
-        try:
-            u = py.path.svnurl(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename),
-                auth=self.auth)
-            foo = u.ensure('foo')
-            bar = u.join('bar')
-            foo.copy(bar)
-            assert bar.check()
-            assert bar.auth is self.auth
-
-            auth = SvnAuth('foo', 'bar', interactive=False)
-            u = py.path.svnurl(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename),
-                auth=auth)
-            foo = u.join('foo')
-            bar = u.join('bar')
-            py.test.raises(Exception, 'foo.copy(bar)')
-        finally:
-            killproc(pid)
+        port = self.port
+        u = py.path.svnurl(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename),
+            auth=self.auth)
+        foo = u.ensure('foo')
+        bar = u.join('bar')
+        foo.copy(bar)
+        assert bar.check()
+        assert bar.auth is self.auth
+
+        auth = SvnAuth('foo', 'bar', interactive=False)
+        u = py.path.svnurl(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename),
+            auth=auth)
+        foo = u.join('foo')
+        bar = u.join('bar')
+        py.test.raises(Exception, 'foo.copy(bar)')
 
     def test_write_read(self):
-        port, pid = self._start_svnserve()
-        try:
-            u = py.path.svnurl(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename),
-                auth=self.auth)
-            foo = u.ensure('foo')
-            fp = foo.open()
-            try:
-                data = fp.read()
-            finally:
-                fp.close()
-            assert data == ''
-
-            auth = SvnAuth('foo', 'bar', interactive=False)
-            u = py.path.svnurl(
-                'svn://localhost:%s/%s' % (port, self.repopath.basename),
-                auth=auth)
-            foo = u.join('foo')
-            py.test.raises(Exception, 'foo.open()')
-        finally:
-            killproc(pid)
+        port = self.port
+        u = py.path.svnurl(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename),
+            auth=self.auth)
+        foo = u.ensure('foo')
+        fp = foo.open()
+        try:
+            data = fp.read()
+        finally:
+            fp.close()
+        assert data == ''
+
+        auth = SvnAuth('foo', 'bar', interactive=False)
+        u = py.path.svnurl(
+            'svn://localhost:%s/%s' % (port, self.repopath.basename),
+            auth=auth)
+        foo = u.join('foo')
+        py.test.raises(Exception, 'foo.open()')
 
     # XXX rinse, repeat... :|

Copied: py/trunk/py/process/killproc.py (from r58363, py/trunk/py/misc/killproc.py)
==============================================================================
--- py/trunk/py/misc/killproc.py	(original)
+++ py/trunk/py/process/killproc.py	Tue Sep 23 16:28:13 2008
@@ -1,10 +1,23 @@
-
 import py
 import os, sys
 
-def killproc(pid):
-    if sys.platform == "win32":
-        py.process.cmdexec("taskkill /F /PID %d" %(pid,))
-    else:
-        os.kill(pid, 15)
-        
+if sys.platform == "win32":
+    try:
+        import ctypes
+    except ImportError:
+        def dokill(pid):  # ctypes not importable: shell out to taskkill
+            py.process.cmdexec("taskkill /F /PID %d" %(pid,))
+    else: 
+        def dokill(pid):  # terminate through kernel32 via ctypes
+            PROCESS_TERMINATE = 1  # access right requested from OpenProcess
+            handle = ctypes.windll.kernel32.OpenProcess(
+                        PROCESS_TERMINATE, False, pid)
+            ctypes.windll.kernel32.TerminateProcess(handle, -1)
+            ctypes.windll.kernel32.CloseHandle(handle)
+else:
+    def dokill(pid):  # non-win32 platforms: send signal 15 to the pid
+        os.kill(pid, 15)     
+
+def kill(pid):
+    """ kill process by id. """
+    dokill(pid)

Copied: py/trunk/py/process/testing/test_killproc.py (from r58363, py/trunk/py/misc/testing/test_oskill.py)
==============================================================================
--- py/trunk/py/misc/testing/test_oskill.py	(original)
+++ py/trunk/py/process/testing/test_killproc.py	Tue Sep 23 16:28:13 2008
@@ -1,23 +1,13 @@
 
 import py, sys
 
-from py.__.misc.killproc import killproc
-
-def test_win_killsubprocess():
-    if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'):
-        py.test.skip("you\'re using an older version of windows, which "
-                     "doesn\'t support 'taskkill' - py.misc.killproc is not "
-                     "available")
-    try:
-        import subprocess
-    except ImportError:
-        py.test.skip("no subprocess module")
-    tmp = py.test.ensuretemp("test_win_killsubprocess")
+def test_kill():
+    subprocess = py.test.importorskip("subprocess")
+    tmp = py.test.ensuretemp("test_kill")
     t = tmp.join("t.py")
     t.write("import time ; time.sleep(100)")
     proc = py.std.subprocess.Popen([sys.executable, str(t)])
     assert proc.poll() is None # no return value yet
-    killproc(proc.pid)
+    py.process.kill(proc.pid)
     ret = proc.wait()
     assert ret != 0
-        



More information about the pytest-commit mailing list