[py-svn] r58385 - in py/trunk/py: . execnet/testing misc misc/testing path/svn/testing process process/testing
hpk at codespeak.net
hpk at codespeak.net
Tue Sep 23 16:28:13 CEST 2008
Author: hpk
Date: Tue Sep 23 16:28:13 2008
New Revision: 58385
Added:
py/trunk/py/process/killproc.py
- copied, changed from r58363, py/trunk/py/misc/killproc.py
py/trunk/py/process/testing/test_killproc.py
- copied, changed from r58363, py/trunk/py/misc/testing/test_oskill.py
Removed:
py/trunk/py/misc/killproc.py
py/trunk/py/misc/testing/test_oskill.py
Modified:
py/trunk/py/__init__.py
py/trunk/py/execnet/testing/test_gateway.py
py/trunk/py/path/svn/testing/test_auth.py
Log:
* move cross-platform process kill functionality to
py.process.kill(pid)
* simplify test_auth.py
* use new functionality from some tests
Modified: py/trunk/py/__init__.py
==============================================================================
--- py/trunk/py/__init__.py (original)
+++ py/trunk/py/__init__.py Tue Sep 23 16:28:13 2008
@@ -99,6 +99,7 @@
'process.__doc__' : ('./process/__init__.py', '__doc__'),
'process.cmdexec' : ('./process/cmdexec.py', 'cmdexec'),
+ 'process.kill' : ('./process/killproc.py', 'kill'),
'process.ForkedFunc' : ('./process/forkedfunc.py', 'ForkedFunc'),
# path implementation
Modified: py/trunk/py/execnet/testing/test_gateway.py
==============================================================================
--- py/trunk/py/execnet/testing/test_gateway.py (original)
+++ py/trunk/py/execnet/testing/test_gateway.py Tue Sep 23 16:28:13 2008
@@ -516,8 +516,6 @@
def test_waitclose_on_remote_killed(self):
py.test.skip("fix needed: dying remote process does not cause waitclose() to fail")
- if not hasattr(py.std.os, 'kill'):
- py.test.skip("no os.kill")
gw = py.execnet.PopenGateway()
channel = gw.remote_exec("""
import os
@@ -527,7 +525,7 @@
channel.send("#" * 100)
""")
remotepid = channel.receive()
- os.kill(remotepid, 9)
+ py.process.kill(remotepid)
py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)")
py.test.raises(EOFError, channel.send, None)
py.test.raises(EOFError, channel.receive)
Deleted: /py/trunk/py/misc/killproc.py
==============================================================================
--- /py/trunk/py/misc/killproc.py Tue Sep 23 16:28:13 2008
+++ (empty file)
@@ -1,10 +0,0 @@
-
-import py
-import os, sys
-
-def killproc(pid):
- if sys.platform == "win32":
- py.process.cmdexec("taskkill /F /PID %d" %(pid,))
- else:
- os.kill(pid, 15)
-
Deleted: /py/trunk/py/misc/testing/test_oskill.py
==============================================================================
--- /py/trunk/py/misc/testing/test_oskill.py Tue Sep 23 16:28:13 2008
+++ (empty file)
@@ -1,23 +0,0 @@
-
-import py, sys
-
-from py.__.misc.killproc import killproc
-
-def test_win_killsubprocess():
- if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'):
- py.test.skip("you\'re using an older version of windows, which "
- "doesn\'t support 'taskkill' - py.misc.killproc is not "
- "available")
- try:
- import subprocess
- except ImportError:
- py.test.skip("no subprocess module")
- tmp = py.test.ensuretemp("test_win_killsubprocess")
- t = tmp.join("t.py")
- t.write("import time ; time.sleep(100)")
- proc = py.std.subprocess.Popen([sys.executable, str(t)])
- assert proc.poll() is None # no return value yet
- killproc(proc.pid)
- ret = proc.wait()
- assert ret != 0
-
Modified: py/trunk/py/path/svn/testing/test_auth.py
==============================================================================
--- py/trunk/py/path/svn/testing/test_auth.py (original)
+++ py/trunk/py/path/svn/testing/test_auth.py Tue Sep 23 16:28:13 2008
@@ -3,7 +3,6 @@
import svntestbase
from threading import Thread
import time
-from py.__.misc.killproc import killproc
from py.__.conftest import option
def make_repo_auth(repo, userdata):
@@ -269,6 +268,10 @@
func_name))
self.auth = py.path.SvnAuth('johnny', 'foo', cache_auth=False,
interactive=False)
+ self.port, self.pid = self._start_svnserve()
+
+ def teardown_method(self, method):
+ py.process.kill(self.pid)
def _start_svnserve(self):
make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
@@ -279,203 +282,168 @@
class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase):
def test_checkout_constructor_arg(self):
- port, pid = self._start_svnserve()
- try:
- wc = py.path.svnwc(self.temppath, auth=self.auth)
- wc.checkout(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename))
- assert wc.join('.svn').check()
- finally:
- # XXX can we do this in a teardown_method too? not sure if that's
- # guaranteed to get called...
- killproc(pid)
+ port = self.port
+ wc = py.path.svnwc(self.temppath, auth=self.auth)
+ wc.checkout(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename))
+ assert wc.join('.svn').check()
def test_checkout_function_arg(self):
- port, pid = self._start_svnserve()
- try:
- wc = py.path.svnwc(self.temppath, auth=self.auth)
- wc.checkout(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename))
- assert wc.join('.svn').check()
- finally:
- killproc(pid)
+ port = self.port
+ wc = py.path.svnwc(self.temppath, auth=self.auth)
+ wc.checkout(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename))
+ assert wc.join('.svn').check()
def test_checkout_failing_non_interactive(self):
- port, pid = self._start_svnserve()
- try:
- auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False,
- interactive=False)
- wc = py.path.svnwc(self.temppath, auth)
- py.test.raises(Exception,
- ("wc.checkout('svn://localhost:%s/%s' % "
- "(port, self.repopath.basename))"))
- finally:
- killproc(pid)
+ port = self.port
+ auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False,
+ interactive=False)
+ wc = py.path.svnwc(self.temppath, auth)
+ py.test.raises(Exception,
+ ("wc.checkout('svn://localhost:%s/%s' % "
+ "(port, self.repopath.basename))"))
def test_log(self):
- port, pid = self._start_svnserve()
- try:
- wc = py.path.svnwc(self.temppath, self.auth)
- wc.checkout(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename))
- foo = wc.ensure('foo.txt')
- wc.commit('added foo.txt')
- log = foo.log()
- assert len(log) == 1
- assert log[0].msg == 'added foo.txt'
- finally:
- killproc(pid)
+ port = self.port
+ wc = py.path.svnwc(self.temppath, self.auth)
+ wc.checkout(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename))
+ foo = wc.ensure('foo.txt')
+ wc.commit('added foo.txt')
+ log = foo.log()
+ assert len(log) == 1
+ assert log[0].msg == 'added foo.txt'
def test_switch(self):
- port, pid = self._start_svnserve()
- try:
- wc = py.path.svnwc(self.temppath, auth=self.auth)
- svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename)
- wc.checkout(svnurl)
- wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
- wc.commit('added foo dir with foo.txt file')
- wc.ensure('bar', dir=True)
- wc.commit('added bar dir')
- bar = wc.join('bar')
- bar.switch(svnurl + '/foo')
- assert bar.join('foo.txt')
- finally:
- killproc(pid)
+ port = self.port
+ wc = py.path.svnwc(self.temppath, auth=self.auth)
+ svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename)
+ wc.checkout(svnurl)
+ wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
+ wc.commit('added foo dir with foo.txt file')
+ wc.ensure('bar', dir=True)
+ wc.commit('added bar dir')
+ bar = wc.join('bar')
+ bar.switch(svnurl + '/foo')
+ assert bar.join('foo.txt')
def test_update(self):
- port, pid = self._start_svnserve()
- try:
- wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True),
- auth=self.auth)
- wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True),
- auth=self.auth)
- wc1.checkout(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename))
- wc2.checkout(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename))
- wc1.ensure('foo', dir=True)
- wc1.commit('added foo dir')
- wc2.update()
- assert wc2.join('foo').check()
-
- auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
- wc2.auth = auth
- py.test.raises(Exception, 'wc2.update()')
- finally:
- killproc(pid)
+ port = self.port
+ wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True),
+ auth=self.auth)
+ wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True),
+ auth=self.auth)
+ wc1.checkout(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename))
+ wc2.checkout(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename))
+ wc1.ensure('foo', dir=True)
+ wc1.commit('added foo dir')
+ wc2.update()
+ assert wc2.join('foo').check()
+
+ auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
+ wc2.auth = auth
+ py.test.raises(Exception, 'wc2.update()')
def test_lock_unlock_status(self):
- port, pid = self._start_svnserve()
- try:
- wc = py.path.svnwc(self.temppath, auth=self.auth)
- wc.checkout(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename,))
- wc.ensure('foo', file=True)
- wc.commit('added foo file')
- foo = wc.join('foo')
- foo.lock()
- status = foo.status()
- assert status.locked
- foo.unlock()
- status = foo.status()
- assert not status.locked
-
- auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
- foo.auth = auth
- py.test.raises(Exception, 'foo.lock()')
- py.test.raises(Exception, 'foo.unlock()')
- finally:
- killproc(pid)
+ port = self.port
+ wc = py.path.svnwc(self.temppath, auth=self.auth)
+ wc.checkout(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename,))
+ wc.ensure('foo', file=True)
+ wc.commit('added foo file')
+ foo = wc.join('foo')
+ foo.lock()
+ status = foo.status()
+ assert status.locked
+ foo.unlock()
+ status = foo.status()
+ assert not status.locked
+
+ auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
+ foo.auth = auth
+ py.test.raises(Exception, 'foo.lock()')
+ py.test.raises(Exception, 'foo.unlock()')
def test_diff(self):
- port, pid = self._start_svnserve()
- try:
- wc = py.path.svnwc(self.temppath, auth=self.auth)
- wc.checkout(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename,))
- wc.ensure('foo', file=True)
- wc.commit('added foo file')
- wc.update()
- rev = int(wc.status().rev)
- foo = wc.join('foo')
- foo.write('bar')
- diff = foo.diff()
- assert '\n+bar\n' in diff
- foo.commit('added some content')
- diff = foo.diff()
- assert not diff
- diff = foo.diff(rev=rev)
- assert '\n+bar\n' in diff
-
- auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
- foo.auth = auth
- py.test.raises(Exception, 'foo.diff(rev=rev)')
- finally:
- killproc(pid)
+ port = self.port
+ wc = py.path.svnwc(self.temppath, auth=self.auth)
+ wc.checkout(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename,))
+ wc.ensure('foo', file=True)
+ wc.commit('added foo file')
+ wc.update()
+ rev = int(wc.status().rev)
+ foo = wc.join('foo')
+ foo.write('bar')
+ diff = foo.diff()
+ assert '\n+bar\n' in diff
+ foo.commit('added some content')
+ diff = foo.diff()
+ assert not diff
+ diff = foo.diff(rev=rev)
+ assert '\n+bar\n' in diff
+
+ auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
+ foo.auth = auth
+ py.test.raises(Exception, 'foo.diff(rev=rev)')
class TestSvnURLAuthFunctional(SvnAuthFunctionalTestBase):
def test_listdir(self):
- port, pid = self._start_svnserve()
- try:
- u = py.path.svnurl(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename),
- auth=self.auth)
- u.ensure('foo')
- paths = u.listdir()
- assert len(paths) == 1
- assert paths[0].auth is self.auth
-
- auth = SvnAuth('foo', 'bar', interactive=False)
- u = py.path.svnurl(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename),
- auth=auth)
- py.test.raises(Exception, 'u.listdir()')
- finally:
- killproc(pid)
+ port = self.port
+ u = py.path.svnurl(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename),
+ auth=self.auth)
+ u.ensure('foo')
+ paths = u.listdir()
+ assert len(paths) == 1
+ assert paths[0].auth is self.auth
+
+ auth = SvnAuth('foo', 'bar', interactive=False)
+ u = py.path.svnurl(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename),
+ auth=auth)
+ py.test.raises(Exception, 'u.listdir()')
def test_copy(self):
- port, pid = self._start_svnserve()
- try:
- u = py.path.svnurl(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename),
- auth=self.auth)
- foo = u.ensure('foo')
- bar = u.join('bar')
- foo.copy(bar)
- assert bar.check()
- assert bar.auth is self.auth
-
- auth = SvnAuth('foo', 'bar', interactive=False)
- u = py.path.svnurl(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename),
- auth=auth)
- foo = u.join('foo')
- bar = u.join('bar')
- py.test.raises(Exception, 'foo.copy(bar)')
- finally:
- killproc(pid)
+ port = self.port
+ u = py.path.svnurl(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename),
+ auth=self.auth)
+ foo = u.ensure('foo')
+ bar = u.join('bar')
+ foo.copy(bar)
+ assert bar.check()
+ assert bar.auth is self.auth
+
+ auth = SvnAuth('foo', 'bar', interactive=False)
+ u = py.path.svnurl(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename),
+ auth=auth)
+ foo = u.join('foo')
+ bar = u.join('bar')
+ py.test.raises(Exception, 'foo.copy(bar)')
def test_write_read(self):
- port, pid = self._start_svnserve()
- try:
- u = py.path.svnurl(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename),
- auth=self.auth)
- foo = u.ensure('foo')
- fp = foo.open()
- try:
- data = fp.read()
- finally:
- fp.close()
- assert data == ''
-
- auth = SvnAuth('foo', 'bar', interactive=False)
- u = py.path.svnurl(
- 'svn://localhost:%s/%s' % (port, self.repopath.basename),
- auth=auth)
- foo = u.join('foo')
- py.test.raises(Exception, 'foo.open()')
- finally:
- killproc(pid)
+ port = self.port
+ u = py.path.svnurl(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename),
+ auth=self.auth)
+ foo = u.ensure('foo')
+ fp = foo.open()
+ try:
+ data = fp.read()
+ finally:
+ fp.close()
+ assert data == ''
+
+ auth = SvnAuth('foo', 'bar', interactive=False)
+ u = py.path.svnurl(
+ 'svn://localhost:%s/%s' % (port, self.repopath.basename),
+ auth=auth)
+ foo = u.join('foo')
+ py.test.raises(Exception, 'foo.open()')
# XXX rinse, repeat... :|
Copied: py/trunk/py/process/killproc.py (from r58363, py/trunk/py/misc/killproc.py)
==============================================================================
--- py/trunk/py/misc/killproc.py (original)
+++ py/trunk/py/process/killproc.py Tue Sep 23 16:28:13 2008
@@ -1,10 +1,23 @@
-
import py
import os, sys
-def killproc(pid):
- if sys.platform == "win32":
- py.process.cmdexec("taskkill /F /PID %d" %(pid,))
- else:
- os.kill(pid, 15)
-
+if sys.platform == "win32":
+ try:
+ import ctypes
+ except ImportError:
+ def dokill(pid):
+ py.process.cmdexec("taskkill /F /PID %d" %(pid,))
+ else:
+ def dokill(pid):
+ PROCESS_TERMINATE = 1
+ handle = ctypes.windll.kernel32.OpenProcess(
+ PROCESS_TERMINATE, False, process.pid)
+ ctypes.windll.kernel32.TerminateProcess(handle, -1)
+ ctypes.windll.kernel32.CloseHandle(handle)
+else:
+ def dokill(pid):
+ os.kill(pid, 15)
+
+def kill(pid):
+ """ kill process by id. """
+ dokill(pid)
Copied: py/trunk/py/process/testing/test_killproc.py (from r58363, py/trunk/py/misc/testing/test_oskill.py)
==============================================================================
--- py/trunk/py/misc/testing/test_oskill.py (original)
+++ py/trunk/py/process/testing/test_killproc.py Tue Sep 23 16:28:13 2008
@@ -1,23 +1,13 @@
import py, sys
-from py.__.misc.killproc import killproc
-
-def test_win_killsubprocess():
- if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'):
- py.test.skip("you\'re using an older version of windows, which "
- "doesn\'t support 'taskkill' - py.misc.killproc is not "
- "available")
- try:
- import subprocess
- except ImportError:
- py.test.skip("no subprocess module")
- tmp = py.test.ensuretemp("test_win_killsubprocess")
+def test_kill():
+ subprocess = py.test.importorskip("subprocess")
+ tmp = py.test.ensuretemp("test_kill")
t = tmp.join("t.py")
t.write("import time ; time.sleep(100)")
proc = py.std.subprocess.Popen([sys.executable, str(t)])
assert proc.poll() is None # no return value yet
- killproc(proc.pid)
+ py.process.kill(proc.pid)
ret = proc.wait()
assert ret != 0
-
More information about the pytest-commit
mailing list