Medusa on Macintosh

Samuel Reynolds reynol@p...
Wed, 18 Apr 2001 11:00:34 -0600


---------------------- multipart/mixed attachment
Okay. I finally got back to testing the Mac-port/filesys.py
changes. I've updated my modified filesys.py and tested it
on MacOS 9, Windows NT, and SunOS (Solaris?) 5.7, and it
works like a champ. I even dropped it into Zope/ZServer/medusa
and ran Zope on top of it, without a hitch. The changes are as
described in a previous post
(http://groups.yahoo.com/group/medusa/message/286).
I have made some additional tweaks to os_filesystem.translate,
and improved the test script, so the files attached to the
earlier message are outdated.

While there is platform-specific code (unavoidable, I think),
it is only in os_filesystem.translate.

I've attached two files:
o filesys.py -- The modified file, tested on MacOS, WinNT, and Sun.
o filesys_test.py -- The test script.
These files have Windows-style (\r\n) line endings, so you
may need to convert them for your system (e.g., using
dos2unix on Unices).

Is anyone other than me interested in a Mac port and/or
improving filesys.py? Or should I keep changes like this
to myself in the future?

If there is interest, I'd appreciate it if others would try this
out. (I renamed the original filesys.py to filesys_original.py
and dropped the modified file in its place; that makes it easy to
revert.) If you try it, I suggest you start by running the
test script. It generates absolute (native file system) paths
from standard relative paths within the Python tree for posix,
Dos/Windows/OS2, or Mac, as appropriate, then exercises
os_filesystem and compares the results to the reference paths.
If you test on any other system, you may need to add a new
block to the test script to generate the native absolute paths
for your system; any changes needed in filesys.py itself should
affect only os_filesystem.translate.

- Sam
---------------------- multipart/mixed attachment
# filesys_test.py
#
#	SCR 2001.04.17
#

import os
import filesys
import string

# Utility for converting path separator
def ConvertSep(s, fromSep, toSep):
    """Return a copy of `s` with every `fromSep` replaced by `toSep`.

    Used to turn the '/'-separated virtual paths of the test into paths
    using the native separator.  Implemented with string methods rather
    than the deprecated `string.split` / `string.join` module functions.
    """
    return toSep.join(s.split(fromSep))


print("**** Testing filesys.os_filesystem ****")
print("")

# Get working directory
workingDir = os.getcwd()

# Must be run from within Python directory
if workingDir.find("Python") < 0:
    raise RuntimeError(
        "Test must be run from within or below the Python main directory")

# Determine Python directory path
# We will use this as the root for the virtual filesystem.
# Walk up from the working directory until the last path component
# contains "Python"; the check above guarantees one exists.
bits = workingDir.split(os.sep)
while bits[-1].find("Python") < 0:
    bits = bits[:-1]
pythonDir = os.sep.join(bits)
if os.name == 'mac':
    # Mac directory paths carry a trailing separator.
    pythonDir = pythonDir + os.sep

# Create filesys for testing
fs = filesys.os_filesystem(pythonDir, '/')

# Pick some subpaths to work with.
relPaths = ["Lib", "Lib/distutils", "Lib/distutils/command", "Lib/asyncore.py"]
realPaths = []

# Pull out longest directory path for cd-up/cd-down testing
deepPath = ""
deepPathDepth = 0
for relPath in relPaths:
    if fs.isfile(relPath):
        continue
    tempD = len(relPath.split('/'))
    if tempD > deepPathDepth:
        deepPathDepth = tempD
        deepPath = relPath

# Since we're testing the conversion code, we have to
# determine the absolute paths the hard way for comparison.
if os.name == 'posix':
    # Root, separator, then the (already '/'-separated) sub-path.
    # The original referenced an undefined name here and left out the
    # separator between the root and the sub-path.
    for relPath in relPaths:
        realPaths.append(pythonDir + os.sep + relPath)
elif os.name == 'mac':
    # Explicitly add trailing separator for directory paths
    # (with prior knowledge that the last path is a file)
    onlyFilePath = relPaths[-1]
    for relPath in relPaths:
        temp = pythonDir + ConvertSep(relPath, '/', os.sep)
        if relPath != onlyFilePath:
            temp = temp + os.sep
        realPaths.append(temp)
elif os.name == 'nt':
    # Explicitly add trailing separator to root directory path.
    for relPath in relPaths:
        realPaths.append(pythonDir + os.sep + ConvertSep(relPath, '/', os.sep))
else:
    # os.name not specifically addressed.
    # This is a test error!
    raise RuntimeError("Test paths not computed for os.name = " + os.name)

print("** Reference paths for test **")
print("%-21s ==> %s" % ("Python directory", pythonDir))
for relPath, realPath in zip(relPaths, realPaths):
    print("%-21s ==> %s" % (relPath, realPath))
print(" ")

# Core test & statistics routines
# Running totals, updated by the CHK* routines below and reported at
# the end of the script.
passCount = 0
failCount = 0


def _tally(passed):
    """Add one to passCount when `passed` is true, else to failCount."""
    global passCount, failCount
    if passed:
        passCount += 1
    else:
        failCount += 1


def CHKEQ(desc, testValue, refValue):
    """Record a pass when testValue == refValue; return the comparison.

    On failure the expected and actual values are printed as well.
    """
    passesTest = (testValue == refValue)
    if passesTest:
        print("CHKEQ: %-50s - PASS" % desc)
    else:
        print("CHKEQ: %-50s - FAIL" % desc)
        print("\tExpected: %s" % (refValue,))
        print("\tActual : %s" % (testValue,))
    _tally(passesTest)
    return passesTest


def CHKNE(desc, testValue, refValue):
    """Record a pass when testValue != refValue; return the comparison.

    On failure the offending value is printed as well.
    """
    passesTest = (testValue != refValue)
    if passesTest:
        print("CHKNE: %-50s - PASS" % desc)
    else:
        print("CHKNE: %-50s - FAIL" % desc)
        print("\tValue: %s" % (testValue,))
    _tally(passesTest)
    return passesTest


def CHKPASS(desc):
    """Unconditionally record a passing check described by `desc`."""
    print("CHKPASS: %-50s - PASS" % desc)
    _tally(1)


def CHKFAIL(desc):
    """Unconditionally record a failing check described by `desc`."""
    print("CHKFAIL: %-50s - FAIL" % desc)
    _tally(0)

#### Translation from virtual to absolute path

def CHK_TRANSLATE(subPath, refValue):
    """Check that fs.translate() maps virtual `subPath` to `refValue`."""
    CHKEQ("%-25s ==> absolute" % subPath, fs.translate(subPath), refValue)


# Every virtual test path must translate to its precomputed native path.
for subPath, refValue in zip(relPaths, realPaths):
    CHK_TRANSLATE(subPath, refValue)


#### Relative movement within directory structure

# The filesystem was created with wd='/', so that is where we start.
CHKEQ("Initial virtual directory", fs.current_directory(), "/")

# Changing to '/' while already at '/' is a no-op.
fs.cwd('/')
CHKEQ("After cwd('/')", fs.current_directory(), "/")

# Enter each test path from the root, then return to the root.
for relPath in relPaths:
    fs.cwd(relPath)
    if fs.isfile(relPath):
        # A file is not a directory: the working directory must not move.
        expectedDir = "/"
    else:
        # A directory: the working directory must now be that path.
        expectedDir = "/" + relPath
    CHKEQ("After cwd('" + relPath + "')", fs.current_directory(), expectedDir)
    # Change working directory back to root
    fs.cwd('/')
    CHKEQ("After cwd('/')", fs.current_directory(), "/")

# Create up/down paths list:
#   upDownPaths1 -- the components of deepPath, one per step down
#   upDownPaths2 -- the virtual directory expected after each step
bits = deepPath.split('/')
upDownPaths1 = bits[:]
upDownPaths2 = []
for depth in range(1, len(bits) + 1):
    upDownPaths2.append("/" + "/".join(bits[:depth]))

# Start at virtual root
fs.cwd('/')

# CD up one level, and up two levels (should not move)
fs.cwd("..")
CHKEQ("After cwd('..') from '/' ==> '/'", fs.current_directory(), "/")
fs.cwd('/')
fs.cwd("../..")
CHKEQ("After cwd('../..') from '/' ==> '/'", fs.current_directory(), "/")

print("------------")

# Walk down deepPath one component at a time, checking each stop.
for stepName, stepDir in zip(upDownPaths1, upDownPaths2):
    fs.cwd(stepName)
    CHKEQ("After cwd('" + stepName + "')", fs.current_directory(), stepDir)
fs.cwd('/')

# CD into sub_lib_distutils_cmd as virtual "absolute" path
#fs.cwd( '/' + sub_lib_distutils_cmd )
#CHKEQ( "After cwd('/"+sub_lib_distutils_cmd+"')", \
#	fs.current_directory(), "/" + sub_lib_distutils_cmd )

# CD up two levels
#fs.cwd( "../.." )
#CHKEQ( "After cwd('../..')", \
#	fs.current_directory(), "/" + sub_lib )

# CD back to top
#fs.cwd('/')
#CHKEQ( "After cwd('/')", fs.current_directory(), "/" )

# Try to CD above root
#fs.cwd('/')
#fs.cwd('..')
#CHKEQ( "After cwd('..') when wd='/'", fs.current_directory(), "/" )

# CD up two levels above root
#fs.cwd('/')
#fs.cwd( "../.." )
#CHKEQ( "After cwd('../..') when wd='/'", fs.current_directory(), "/" )

# CD up past root from subdirectory
#fs.cwd('/' + sub_lib)
##CHKEQ( "After cwd('../..') when wd='/" + sub_lib + "'", \
#	fs.current_directory(), "/" )


#### Informational Tests

#	isfile/isdir
# Each test path must be reported as exactly one of file / directory.
for relPath in relPaths:
    isDir = fs.isdir(relPath)
    isFile = fs.isfile(relPath)
    if isFile:
        expected = "d:0 f:1"
    else:
        expected = "d:1 f:0"
    CHKEQ("%-30s isdir:%d isfile:%d" % (relPath, isDir, isFile),
          "d:%d f:%d" % (isDir, isFile),
          expected)

#	exists
for relPath in relPaths:
    # Real paths
    CHKEQ("%-30s exists:%d" % (relPath, fs.exists(relPath)),
          fs.exists(relPath), 1)
    # Fake paths
    fakePath = relPath + '_XX'
    CHKEQ("%-30s exists:%d" % (fakePath, fs.exists(fakePath)),
          fs.exists(fakePath), 0)

#	stat
for relPath in relPaths:
    # Real paths: stat() must succeed and hand back a tuple.  isinstance
    # is used (rather than comparing type objects) so that a tuple
    # subclass returned by os.stat still counts.
    desc = "stat('%s') returns tuple" % relPath
    try:
        temp = fs.stat(relPath)
    except Exception:
        CHKFAIL(desc)
    else:
        CHKEQ(desc, isinstance(temp, type(())), 1)
    # Fake paths: stat() must raise; returning normally is a failure.
    fakePath = relPath + '_XX'
    desc = "stat('%s') throws" % fakePath
    try:
        fs.stat(fakePath)
    except Exception:
        CHKPASS(desc)
    else:
        CHKFAIL(desc)


# Summary of all checks recorded above.
print("")
print(' ' + '=' * 11)
print(" PASS: %5d" % passCount)
print(" FAIL: %5d" % failCount)
print(' ' + '=' * 11)
print("")

---------------------- multipart/mixed attachment
# -*- Mode: Python; tab-width: 4 -*-
#	$Id: filesys.py,v 1.9 1998/06/22 05:36:05 rushing Exp $
#	Author: Sam Rushing <rushing@n...>
#
# Generic filesystem interface.
#

# We want to provide a complete wrapper around any and all
# filesystem operations.

# this class is really just for documentation,
# identifying the API for a filesystem object.

# opening files for reading, and listing directories, should
# return a producer.

import posixpath

class abstract_filesystem:
    """Documentation-only outline of the filesystem object API.

    Every method here is a stub that does nothing and returns None.
    """

    def __init__(self):
        pass

    def current_directory(self):
        """Return a string representing the current directory."""

    def listdir(self, path, long=0):
        """Return a listing of the directory at 'path'.

        The empty string indicates the current directory.  If 'long'
        is set, instead return a list of (name, stat_info) tuples.
        """

    def open(self, path, mode):
        """Return an open file object."""

    def stat(self, path):
        """Return the equivalent of os.stat() on the given path."""

    def isdir(self, path):
        """Does the path represent a directory?"""

    def isfile(self, path):
        """Does the path represent a plain file?"""

    def cwd(self, path):
        """Change the working directory."""

    def cdup(self):
        """Change to the parent of the current directory."""

    def longify(self, path):
        """Return a 'long' representation of the filename
        [for the output of the LIST command]."""

# standard wrapper around a unix-like filesystem, with a 'false root'
# capability.

# security considerations: can symbolic links be used to 'escape' the
# root? should we allow it? if not, then we could scan the
# filesystem on startup, but that would not help if they were added
# later. We will probably need to check for symlinks in the cwd method.

# what to do if wd is an invalid directory?

import os
import stat

import string

def safe_stat(path):
    """Return (path, os.stat(path)), or None when the path cannot be stat'ed.

    Only failures of the stat call itself are swallowed; the previous
    bare ``except:`` also hid KeyboardInterrupt and SystemExit.
    """
    try:
        return (path, os.stat(path))
    except (OSError, TypeError, ValueError):
        # OSError: missing or inaccessible entry.
        # TypeError / ValueError: a path os.stat refuses outright.
        return None

import regex
import regsub
import glob

class os_filesystem:
    """Wrapper presenting the directory `root` as a virtual '/'-rooted tree.

    Callers use unix-style paths ('/' separators), absolute or relative
    to the virtual working directory `self.wd`.  Pure path manipulation
    is delegated to `posixpath`; anything that touches the disk first
    converts the virtual path to a native one with `translate` and then
    uses `path_module` (os.path) or `os`.
    """

    path_module = os.path

    # set this to zero if you want to disable pathname globbing.
    # [we currently don't glob, anyway]
    do_globbing = 1

    def __init__(self, root, wd='/'):
        # root: native path that the virtual '/' maps to.
        # wd:   virtual working directory.
        self.root = root
        self.wd = wd

    def _native(self, path):
        """Return the native path for virtual `path` (relative to wd)."""
        return self.translate(self.abspath(path))

    ### Provide replacements for path methods

    def abspath(self, path):
        """Return `path` as a normalized absolute virtual path."""
        return self.normalize(posixpath.join(self.wd, path))

    def basename(self, path):
        return posixpath.basename(path)

    def commonprefix(self, pathlist):
        return posixpath.commonprefix(pathlist)

    def dirname(self, path):
        return posixpath.dirname(path)

    def exists(self, path):
        return self.path_module.exists(self._native(path))

    def expanduser(self, path):
        return self.path_module.expanduser(self._native(path))

    def expandvars(self, path):
        return self.path_module.expandvars(self._native(path))

    def getatime(self, path):
        return self.path_module.getatime(self._native(path))

    def getmtime(self, path):
        return self.path_module.getmtime(self._native(path))

    def getsize(self, path):
        return self.path_module.getsize(self._native(path))

    def isabs(self, path):
        return posixpath.isabs(path)

    def isdir(self, path):
        return self.path_module.isdir(self._native(path))

    def isfile(self, path):
        return self.path_module.isfile(self._native(path))

    def islink(self, path):
        return self.path_module.islink(self._native(path))

    def ismount(self, path):
        return self.path_module.ismount(self._native(path))

    def normcase(self, path):
        return posixpath.normcase(path)

    def normpath(self, path):
        return posixpath.normpath(path)

    def samefile(self, p1, p2):
        return self.path_module.samefile(self._native(p1), self._native(p2))

    def sameopenfile(self, fp1, fp2):
        return self.path_module.sameopenfile(fp1, fp2)

    def samestat(self, s1, s2):
        return self.path_module.samestat(s1, s2)

    def split(self, path):
        return posixpath.split(path)

    def stat(self, path):
        """Return os.stat() of the native file behind virtual `path`."""
        return os.stat(self._native(path))

    # splitdrive NOT IMPLEMENTED

    def splitext(self, path):
        """Split virtual `path` into (root, extension)."""
        return posixpath.splitext(path)

    # Historical (misspelled) name, kept for existing callers.  It used
    # to call the non-existent posixpath.splittext and always failed.
    splittext = splitext

    ### Provide Unix filesystem commands

    def cwd(self, path):
        """Change the virtual working directory to `path`.

        Returns 1 on success.  Returns 0, leaving `self.wd` unchanged,
        when the target is not a directory or the process cannot chdir
        into it.
        """
        p = self.abspath(path)
        translated_path = self.translate(p)
        if not self.path_module.isdir(translated_path):
            return 0
        old_dir = os.getcwd()
        # temporarily change to that directory, in order
        # to see if we have permission to do so.
        try:
            os.chdir(translated_path)
        except os.error:
            return 0
        self.wd = p
        os.chdir(old_dir)
        return 1

    def cdup(self):
        """Change to the parent of the virtual working directory."""
        return self.cwd('..')

    def current_directory(self):
        """Return the virtual working directory."""
        return self.wd

    def listdir(self, path, long=0):
        """Return a list_producer over the entries of directory `path`.

        With `long` false the producer yields bare names.  With `long`
        true each entry is stat'ed (entries whose stat fails are
        dropped) and rendered through `self.longify`.
        """
        p = self.translate(path)
        # I think we should glob, but limit it to the current
        # directory only.
        ld = os.listdir(p)
        if not long:
            return list_producer(ld, 0, None)
        old_dir = os.getcwd()
        try:
            # stat relative names from inside the directory itself
            os.chdir(p)
            # if os.stat fails we ignore that file.
            result = [info for info in map(safe_stat, ld) if info]
        finally:
            os.chdir(old_dir)
        return list_producer(result, 1, self.longify)

    # TODO: implement a cache w/timeout for stat()
    def open(self, path, mode):
        """Open the native file behind virtual `path` with `mode`."""
        return open(self.translate(path), mode)

    def unlink(self, path):
        return os.unlink(self.translate(path))

    def mkdir(self, path):
        return os.mkdir(self.translate(path))

    def rmdir(self, path):
        return os.rmdir(self.translate(path))

    # utility methods
    def normalize(self, path):
        """Collapse repeated '/' and '.'/'..' elements in a virtual path.

        A result that still starts with '/..' is replaced by '/'.
        """
        # Imported locally: this replaces the obsolete `regsub` module
        # without depending on a file-level `import re`.
        import re
        # watch for the ever-sneaky '/+' path element
        path = re.sub('/+', '/', path)
        p = posixpath.normpath(path)
        # remove 'dangling' cdup's.
        if p[:3] == '/..':
            p = '/'
        return p

    def translate(self, path):
        """Convert virtual `path` to an absolute native path under root.

        posix, nt, dos, os2: the leading separator is stripped so that
            joining cannot discard `self.root`.
        mac: the leading separator is kept, and a trailing separator is
            appended when the result is a directory.
        other platforms: the path is joined to the root as-is.
            NOTE(review): with a join that lets an absolute second
            argument win, this would ignore root -- confirm before
            relying on it for a new platform.
        """
        # Get full (virtual-unix) path
        p = self.abspath(path)
        # Convert separators for the current OS
        if os.sep != '/':
            p = p.replace('/', os.sep)
        if os.name in ('posix', 'nt', 'dos', 'os2'):
            if p[:1] == os.sep:
                p = p[1:]
        p = self.path_module.join(self.root, p)
        # Make platform-specific post-adjustments, if any
        if os.name == 'mac':
            # slice (not index) so an empty result cannot raise
            if self.path_module.isdir(p) and p[-1:] != os.sep:
                p = p + os.sep
        return p

    def longify(self, path_stat):
        """Render one (path, stat_info) pair as a unix 'ls -l' style line."""
        path, stat_info = path_stat
        return unix_longify(path, stat_info)

    def __repr__(self):
        return '<unix-style fs root:%s wd:%s>' % (
            self.root,
            self.wd
            )

if os.name == 'posix':

    class unix_filesystem(os_filesystem):
        pass

    class schizophrenic_unix_filesystem(os_filesystem):
        """os_filesystem that performs selected operations as another user.

        `persona` is a (uid, gid) pair.  cwd, cdup, open and listdir
        switch the effective gid/uid to the persona for the duration of
        the call and switch back afterwards.  With the default persona
        (None, None) no switching takes place.
        """

        # Process identity captured when the class is defined.
        PROCESS_UID = os.getuid()
        PROCESS_EUID = os.geteuid()
        PROCESS_GID = os.getgid()
        PROCESS_EGID = os.getegid()

        def __init__(self, root, wd='/', persona=(None, None)):
            os_filesystem.__init__(self, root, wd)
            self.persona = persona

        def become_persona(self):
            """Switch the effective gid and uid to the persona, if one is set."""
            # Compare by value: the previous `is not (None, None)` was an
            # identity test that was always true, so the default persona
            # ended up calling os.setegid(None).
            if self.persona != (None, None):
                uid, gid = self.persona
                # the order of these is important!
                os.setegid(gid)
                os.seteuid(uid)

        def become_nobody(self):
            """Switch the effective uid and gid back to PROCESS_UID / PROCESS_GID."""
            if self.persona != (None, None):
                os.seteuid(self.PROCESS_UID)
                os.setegid(self.PROCESS_GID)

        # cwd, cdup, open, listdir
        def cwd(self, path):
            try:
                self.become_persona()
                return os_filesystem.cwd(self, path)
            finally:
                self.become_nobody()

        def cdup(self, path=None):
            # `path` is unused.  It is kept (now optional) so existing
            # callers that pass it still work, while callers using the
            # base-class signature cdup() no longer fail.
            try:
                self.become_persona()
                return os_filesystem.cdup(self)
            finally:
                self.become_nobody()

        def open(self, filename, mode):
            try:
                self.become_persona()
                return os_filesystem.open(self, filename, mode)
            finally:
                self.become_nobody()

        def listdir(self, path, long=0):
            try:
                self.become_persona()
                return os_filesystem.listdir(self, path, long)
            finally:
                self.become_nobody()

# This hasn't been very reliable across different platforms.
# maybe think about a separate 'directory server'.
#
#	import posixpath
#	import fcntl
#	import FCNTL
#	import select
#	import asyncore
#
#	# pipes /bin/ls for directory listings.
#	class unix_filesystem (os_filesystem):
#	pass
# path_module = posixpath
#
# def listdir (self, path, long=0):
# p = self.translate (path)
# if not long:
# return list_producer (os.listdir (p), 0, None)
# else:
# command = '/bin/ls -l %s' % p
# print 'opening pipe to "%s"' % command
# fd = os.popen (command, 'rt')
# return pipe_channel (fd)
#
# # this is both a dispatcher, _and_ a producer
# class pipe_channel (asyncore.file_dispatcher):
# buffer_size = 4096
#
# def __init__ (self, fd):
# asyncore.file_dispatcher.__init__ (self, fd)
# self.fd = fd
# self.done = 0
# self.data = ''
#
# def handle_read (self):
# if len (self.data) < self.buffer_size:
# self.data = self.data + self.fd.read (self.buffer_size)
# #print '%s.handle_read() => len(self.data) == %d' % (self, len(self.data))
#
# def handle_expt (self):
# #print '%s.handle_expt()' % self
# self.done = 1
#
# def ready (self):
# #print '%s.ready() => %d' % (self, len(self.data))
# return ((len (self.data) > 0) or self.done)
#
# def more (self):
# if self.data:
# r = self.data
# self.data = ''
# elif self.done:
# self.close()
# self.downstream.finished()
# r = ''
# else:
# r = None
# #print '%s.more() => %s' % (self, (r and len(r)))
# return r

# For the 'real' root, we could obtain a list of drives, and then
# use that. Doesn't win32 provide such a 'real' filesystem?
# [yes, I think something like this "\\.\c\windows"]

class msdos_filesystem(os_filesystem):
    """os_filesystem whose long listings are rendered by msdos_longify."""

    def longify(self, path_and_stat):
        # Takes one (path, stat_info) pair, as before.
        path, stat_info = path_and_stat
        return msdos_longify(path, stat_info)

# A merged filesystem will let you plug other filesystems together.
# We really need the equivalent of a 'mount' capability - this seems
# to be the most general idea. So you'd use a 'mount' method to place
# another filesystem somewhere in the hierarchy.

# Note: this is most likely how I will handle ~user directories
# with the http server.

class merged_filesystem:
    """Placeholder for a filesystem assembled from other filesystems.

    Not implemented: the constructor accepts any number of filesystem
    objects and ignores them.
    """

    def __init__(self, *fsys):
        return None

# this matches the output of NT's ftp server (when in
# MSDOS mode) exactly.

def msdos_longify(file, stat_info):
    """Return one MSDOS-style listing line for `file`.

    The line holds the modification date (via msdos_date), a '<DIR>'
    marker for directories, the size, and the name, taken from the
    os.stat()-style sequence `stat_info`.
    """
    if stat.S_ISDIR(stat_info[stat.ST_MODE]):
        dir_marker = '<DIR>'
    else:
        dir_marker = ' '
    stamp = msdos_date(stat_info[stat.ST_MTIME])
    fields = (stamp, dir_marker, stat_info[stat.ST_SIZE], file)
    return '%s %s %8d %s' % fields

def msdos_date(t):
    """Format timestamp `t` (seconds since the epoch) as 'MM-DD-YY HH:MMAM/PM'.

    The time is taken in UTC (time.gmtime) and shown on a 12-hour
    clock.  If `t` cannot be converted, the epoch (t = 0) is used.
    """
    try:
        info = time.gmtime(t)
    except (TypeError, ValueError, OverflowError, OSError):
        info = time.gmtime(0)
    # year, month, day, hour, minute, second, ...
    # Work on a copy of the hour: the time tuple itself is immutable,
    # so the old in-place `info[3] = ...` raised for every PM time.
    hour = info[3]
    if hour > 11:
        merid = 'PM'
    else:
        merid = 'AM'
    # 12-hour clock: 0 and 12 are both shown as 12.
    hour = hour % 12
    if hour == 0:
        hour = 12
    return '%02d-%02d-%02d %02d:%02d%s' % (
        info[1],
        info[2],
        info[0] % 100,
        hour,
        info[4],
        merid
        )

# Abbreviated month names, indexed by (month number - 1); ls_date uses
# this table to render the month field of a time tuple.
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

# Maps one octal permission digit (as a character) to its three-letter
# read/write/execute string; unix_longify looks up each of the last
# three octal digits of the file mode here.
mode_table = {
'0':'---',
'1':'--x',
'2':'-w-',
'3':'-wx',
'4':'r--',
'5':'r-x',
'6':'rw-',
'7':'rwx'
}

import time

def unix_longify(file, stat_info):
    """Return one unix 'ls -l' style listing line for `file`.

    `stat_info` is an os.stat()-style sequence.  The line holds the
    type/permission string, link count, numeric uid and gid, size,
    modification date (via ls_date) and the name.
    """
    st_mode = stat_info[stat.ST_MODE]
    # for now, only pay attention to the lower bits
    octal_digits = ('%o' % st_mode)[-3:]
    perms = ''.join([mode_table[digit] for digit in octal_digits])
    if stat.S_ISDIR(st_mode):
        dirchar = 'd'
    else:
        dirchar = '-'
    stamp = ls_date(long(time.time()), stat_info[stat.ST_MTIME])
    fields = (
        dirchar,
        perms,
        stat_info[stat.ST_NLINK],
        stat_info[stat.ST_UID],
        stat_info[stat.ST_GID],
        stat_info[stat.ST_SIZE],
        stamp,
        file,
        )
    return '%s%s %3d %-8d %-8d %8d %s %s' % fields

# Emulate the unix 'ls' command's date field.
# it has two formats - if the date is more than 180
# days in the past, then it's like this:
# Oct 19 1995
# otherwise, it looks like this:
# Oct 19 17:33

def ls_date(now, t):
    """Format timestamp `t` the way the unix 'ls' date field looks.

    `now` and `t` are seconds since the epoch; `t` is rendered in UTC.
    When `t` is more than 180 days before `now` the result is
    'Mon DD YYYY', otherwise 'Mon DD HH:MM'.  If `t` cannot be
    converted, the epoch (t = 0) is rendered instead.
    """
    try:
        info = time.gmtime(t)
    except (TypeError, ValueError, OverflowError, OSError):
        info = time.gmtime(0)
    month = months[info[1] - 1]
    # 180 days in seconds.  The previous literal, 15,600,000, did not
    # equal 86,400 * 180 as its comment claimed.
    if (now - t) > 86400 * 180:
        return '%s %2d %d' % (
            month,
            info[2],
            info[0]
            )
    return '%s %2d %02d:%02d' % (
        month,
        info[2],
        info[3],
        info[4]
        )

# ===========================================================================
# Producers
# ===========================================================================

class list_producer:
    """Producer that hands out a directory listing a chunk at a time.

    file_list -- the entries still to be sent
    long      -- when true, each entry is passed through `longify`
    longify   -- callable turning one entry into its display string
    """

    def __init__(self, file_list, long, longify):
        self.file_list = file_list
        self.long = long
        self.longify = longify
        self.done = 0

    def ready(self):
        """Return 1 while entries remain; otherwise mark done and return 0."""
        if len(self.file_list):
            return 1
        if not self.done:
            self.done = 1
        return 0

    # this should do a pushd/popd
    def more(self):
        """Return the next chunk of up to 50 entries, or '' when exhausted.

        Entries are joined with, and the chunk ends with, CRLF.
        """
        if not self.file_list:
            return ''
        # do a few at a time
        bunch = self.file_list[:50]
        if self.long:
            # a real list, so the join below never sees a lazy map object
            bunch = [self.longify(entry) for entry in bunch]
        self.file_list = self.file_list[50:]
        return '\r\n'.join(bunch) + '\r\n'


---------------------- multipart/mixed attachment--