# Sophie
#
# distrib > CentOS > 5 > x86_64 > by-pkgid > c7d5af816329c171c658e5facf128c02 > files > 236
#
# m2crypto-0.16-9.el5.x86_64.rpm

# -*- Mode: Python; tab-width: 4 -*-
#	$Id: filesys.py 299 2005-06-09 17:32:28Z heikki $
#	Author: Sam Rushing <rushing@nightmare.com>
#
# Generic filesystem interface.
#

# We want to provide a complete wrapper around any and all
# filesystem operations.

# this class is really just for documentation,
# identifying the API for a filesystem object.

# opening files for reading, and listing directories, should
# return a producer.

class abstract_filesystem:
	"""Documentation-only outline of the filesystem object API.

	Every method below is an empty stub that returns None; the
	docstrings describe what an implementation is expected to do.
	"""

	def __init__ (self):
		"Nothing to initialise for the abstract interface."

	def current_directory (self):
		"Return a string representing the current directory."

	def listdir (self, path, long=0):
		"""Return a listing of the directory at 'path'.  The empty
		string indicates the current directory.  If 'long' is set,
		return a list of (name, stat_info) tuples instead.
		"""

	def open (self, path, mode):
		"Return an open file object."

	def stat (self, path):
		"Return the equivalent of os.stat() on the given path."

	def isdir (self, path):
		"Does the path represent a directory?"

	def isfile (self, path):
		"Does the path represent a plain file?"

	def cwd (self, path):
		"Change the working directory."

	def cdup (self):
		"Change to the parent of the current directory."

	def longify (self, path):
		"""Return a 'long' representation of the filename
		[for the output of the LIST command]."""

# standard wrapper around a unix-like filesystem, with a 'false root'
# capability.

# security considerations: can symbolic links be used to 'escape' the
# root?  should we allow it?  if not, then we could scan the
# filesystem on startup, but that would not help if they were added
# later.  We will probably need to check for symlinks in the cwd method.

# what to do if wd is an invalid directory?

import os
import re
import stat
import string

def safe_stat (path):
	"""Stat 'path', reporting failure as None instead of raising.

	Returns a (path, stat_result) tuple on success, or None when
	os.stat() cannot handle the path.
	"""
	try:
		return (path, os.stat (path))
	except (OSError, TypeError, ValueError):
		# OSError: the file is missing or inaccessible.
		# TypeError/ValueError: the name itself is malformed
		# (e.g. embedded NUL).  Anything else - notably
		# KeyboardInterrupt - is allowed to propagate.
		return None

import regsub
import glob

class os_filesystem:
	"""Wrapper around the real filesystem with a 'false root'.

	Paths passed to the methods are virtual, '/'-separated paths.
	They are resolved against the virtual working directory
	'self.wd' and then mapped beneath the real directory
	'self.root' by translate().
	"""

	path_module = os.path

	# set this to zero if you want to disable pathname globbing.
	# [we currently don't glob, anyway]
	do_globbing = 1

	def __init__ (self, root, wd='/'):
		"""'root' is the real directory that acts as '/';
		'wd' is the initial virtual working directory."""
		self.root = root
		self.wd = wd

	def current_directory (self):
		"Return the virtual working directory."
		return self.wd

	def isfile (self, path):
		"Does the (virtual) path name a plain file?"
		p = self.normalize (self.path_module.join (self.wd, path))
		return self.path_module.isfile (self.translate (p))

	def isdir (self, path):
		"Does the (virtual) path name a directory?"
		p = self.normalize (self.path_module.join (self.wd, path))
		return self.path_module.isdir (self.translate (p))

	def cwd (self, path):
		"""Change the virtual working directory to 'path'.

		Returns 1 on success, 0 if the target is not a directory
		or the process is not permitted to enter it.
		"""
		p = self.normalize (self.path_module.join (self.wd, path))
		translated_path = self.translate (p)
		if not self.path_module.isdir (translated_path):
			return 0
		old_dir = os.getcwd()
		# temporarily change to that directory, in order
		# to see if we have permission to do so.
		try:
			os.chdir (translated_path)
		except OSError:
			return 0
		self.wd = p
		os.chdir (old_dir)
		return 1

	def cdup (self):
		"Change to the parent of the virtual working directory."
		return self.cwd ('..')

	def listdir (self, path, long=0):
		"""Return a list_producer for the directory at 'path'.

		With 'long' false the producer yields bare names; otherwise
		it yields the longify() form of each entry that could be
		stat'ed.
		"""
		p = self.translate (path)
		# I think we should glob, but limit it to the current
		# directory only.
		ld = os.listdir (p)
		if not long:
			return list_producer (ld, 0, None)
		old_dir = os.getcwd()
		try:
			# the names in 'ld' are relative, so stat them from
			# inside the directory itself.
			os.chdir (p)
			result = []
			for name in ld:
				entry = safe_stat (name)
				# if os.stat fails we ignore that file.
				if entry:
					result.append (entry)
		finally:
			os.chdir (old_dir)
		return list_producer (result, 1, self.longify)

	# TODO: implement a cache w/timeout for stat()
	def stat (self, path):
		"Return os.stat() of the real file behind 'path'."
		p = self.translate (path)
		return os.stat (p)

	def open (self, path, mode):
		"Open the real file behind 'path' with the given mode."
		p = self.translate (path)
		return open (p, mode)

	def unlink (self, path):
		"Remove the real file behind 'path'."
		p = self.translate (path)
		return os.unlink (p)

	def mkdir (self, path):
		"Create the real directory behind 'path'."
		p = self.translate (path)
		return os.mkdir (p)

	def rmdir (self, path):
		"Remove the real directory behind 'path'."
		p = self.translate (path)
		return os.rmdir (p)

	# utility methods
	def normalize (self, path):
		"""Collapse repeated slashes and '.'/'..' elements in 'path'.

		A path that would still climb above '/' is clamped to '/'.
		"""
		# watch for the ever-sneaky '/+' path element
		path = re.sub ('/+', '/', path)
		p = self.path_module.normpath (path)
		# remove 'dangling' cdup's.  Only a real '..' element
		# counts: a name that merely starts with two dots (such as
		# '/..odd') is an ordinary entry and must be left alone.
		if p == '/..' or p[:4] == '/../':
			p = '/'
		return p

	def translate (self, path):
		"Map a virtual path to the real path beneath self.root."
		# we need to join together three separate
		# path components, and do it safely.
		# <real_root>/<current_directory>/<path>
		# use the operating system's path separator.
		path = path.replace ('/', os.sep)
		p = self.normalize (self.path_module.join (self.wd, path))
		# drop the first character so that join() keeps the
		# result under self.root instead of restarting at '/'.
		p = self.normalize (self.path_module.join (self.root, p[1:]))
		return p

	def longify (self, path_and_stat):
		"""Format one (path, stat_info) pair, as produced by
		safe_stat(), with unix_longify()."""
		path, stat_info = path_and_stat
		return unix_longify (path, stat_info)

	def __repr__ (self):
		return '<unix-style fs root:%s wd:%s>' % (
			self.root,
			self.wd
			)

if os.name == 'posix':

	class unix_filesystem (os_filesystem):
		"POSIX filesystem; adds nothing to os_filesystem."
		pass

	class schizophrenic_unix_filesystem (os_filesystem):
		"""os_filesystem that switches effective uid/gid around
		cwd, cdup, open and listdir.

		'persona' is a (uid, gid) pair; the default (None, None)
		means no switching is done.
		"""

		# ids of the process, captured when the class is defined.
		# NOTE(review): only PROCESS_UID and PROCESS_GID are used
		# below; the EUID/EGID values are recorded but never read
		# in this class.
		PROCESS_UID		= os.getuid()
		PROCESS_EUID	= os.geteuid()
		PROCESS_GID		= os.getgid()
		PROCESS_EGID	= os.getegid()

		def __init__ (self, root, wd='/', persona=(None, None)):
			os_filesystem.__init__ (self, root, wd)
			self.persona = persona

		def become_persona (self):
			"Switch the effective gid/uid to the configured persona."
			# compare by value: an identity test against a fresh
			# tuple is always true and would pass None to setegid().
			if self.persona != (None, None):
				uid, gid = self.persona
				# the order of these is important!
				os.setegid (gid)
				os.seteuid (uid)

		def become_nobody (self):
			"Switch the effective uid/gid back to PROCESS_UID/PROCESS_GID."
			if self.persona != (None, None):
				os.seteuid (self.PROCESS_UID)
				os.setegid (self.PROCESS_GID)

		# cwd, cdup, open, listdir
		def cwd (self, path):
			"os_filesystem.cwd(), performed as the persona."
			try:
				self.become_persona()
				return os_filesystem.cwd (self, path)
			finally:
				self.become_nobody()

		def cdup (self, path=None):
			"""os_filesystem.cdup(), performed as the persona.

			'path' is ignored; it is optional so that this method
			can be called like the base-class cdup(), while still
			accepting the extra argument older callers passed.
			"""
			try:
				self.become_persona()
				return os_filesystem.cdup (self)
			finally:
				self.become_nobody()

		def open (self, filename, mode):
			"os_filesystem.open(), performed as the persona."
			try:
				self.become_persona()
				return os_filesystem.open (self, filename, mode)
			finally:
				self.become_nobody()

		def listdir (self, path, long=0):
			"os_filesystem.listdir(), performed as the persona."
			try:
				self.become_persona()
				return os_filesystem.listdir (self, path, long)
			finally:
				self.become_nobody()

# This hasn't been very reliable across different platforms.
# maybe think about a separate 'directory server'.
#
#	import posixpath
#	import fcntl
#	import FCNTL
#	import select
#	import asyncore
#
#	# pipes /bin/ls for directory listings.
#	class unix_filesystem (os_filesystem):
#		pass
# 		path_module = posixpath
#
# 		def listdir (self, path, long=0):
# 			p = self.translate (path)
# 			if not long:
# 				return list_producer (os.listdir (p), 0, None)
# 			else:
# 				command = '/bin/ls -l %s' % p
# 				print 'opening pipe to "%s"' % command
# 				fd = os.popen (command, 'rt')
# 				return pipe_channel (fd)
#
# 	# this is both a dispatcher, _and_ a producer
# 	class pipe_channel (asyncore.file_dispatcher):
# 		buffer_size = 4096
#
# 		def __init__ (self, fd):
# 			asyncore.file_dispatcher.__init__ (self, fd)
# 			self.fd = fd
# 			self.done = 0
# 			self.data = ''
#
# 		def handle_read (self):
# 			if len (self.data) < self.buffer_size:
# 				self.data = self.data + self.fd.read (self.buffer_size)
# 			#print '%s.handle_read() => len(self.data) == %d' % (self, len(self.data))
#
# 		def handle_expt (self):
# 			#print '%s.handle_expt()' % self
# 			self.done = 1
#
# 		def ready (self):
# 			#print '%s.ready() => %d' % (self, len(self.data))
# 			return ((len (self.data) > 0) or self.done)
#
# 		def more (self):
# 			if self.data:
# 				r = self.data
# 				self.data = ''
# 			elif self.done:
# 				self.close()
# 				self.downstream.finished()
# 				r = ''
# 			else:
# 				r = None
# 			#print '%s.more() => %s' % (self, (r and len(r)))
# 			return r

# For the 'real' root, we could obtain a list of drives, and then
# use that.  Doesn't win32 provide such a 'real' filesystem?
# [yes, I think something like this "\\.\c\windows"]

class msdos_filesystem (os_filesystem):
	"os_filesystem whose long listings use the MSDOS format."

	def longify (self, path_and_stat):
		"""Format one (path, stat_info) pair with msdos_longify().

		The pair is unpacked in the body rather than in the
		signature: tuple parameters are not valid syntax on
		Python 3.
		"""
		path, stat_info = path_and_stat
		return msdos_longify (path, stat_info)

# A merged filesystem will let you plug other filesystems together.
# We really need the equivalent of a 'mount' capability - this seems
# to be the most general idea.  So you'd use a 'mount' method to place
# another filesystem somewhere in the hierarchy.

# Note: this is most likely how I will handle ~user directories
# with the http server.

class merged_filesystem:
	"Placeholder for a filesystem assembled from others; not implemented."

	def __init__ (self, *fsys):
		"Accept any number of filesystems; they are currently ignored."

# this matches the output of NT's ftp server (when in
# MSDOS mode) exactly.

def msdos_longify (file, stat_info):
	"""Return one MSDOS-style listing line for 'file'.

	'stat_info' is indexed with the stat.ST_* constants.  The line
	holds the msdos_date() of the modification time, a '<DIR>'
	marker for directories (blank otherwise), the size and the name.
	"""
	marker = '     '
	if stat.S_ISDIR (stat_info[stat.ST_MODE]):
		marker = '<DIR>'
	stamp = msdos_date (stat_info[stat.ST_MTIME])
	fields = (stamp, marker, stat_info[stat.ST_SIZE], file)
	return '%s       %s %8d %s' % fields

def msdos_date (t):
	"""Format timestamp 't' (seconds since the epoch, UTC) as
	'MM-DD-YY  HH:MMAM' / 'MM-DD-YY  HH:MMPM'.

	A timestamp that time.gmtime() rejects is rendered as the epoch.
	"""
	try:
		info = time.gmtime (t)
	except (TypeError, ValueError, OverflowError, OSError):
		info = time.gmtime (0)
	# the time tuple is immutable, so work on local copies.
	year, month, day, hour, minute = info[:5]
	if hour > 11:
		merid = 'PM'
	else:
		merid = 'AM'
	# 12-hour clock: both midnight and noon display as 12.
	hour = hour % 12
	if hour == 0:
		hour = 12
	return '%02d-%02d-%02d  %02d:%02d%s' % (
		month,
		day,
		year % 100,
		hour,
		minute,
		merid
		)

# three-letter month names, indexed by (month number - 1).
months = 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()

# maps one octal permission digit (as a character) to its 'rwx' form.
mode_table = dict (zip (
	'01234567',
	('---', '--x', '-w-', '-wx', 'r--', 'r-x', 'rw-', 'rwx')
	))

import time

def unix_longify (file, stat_info):
	"""Return one 'ls -l'-style listing line for 'file'.

	'stat_info' is indexed with the stat.ST_* constants.  The line
	holds a 'd'/'-' type character, the rwx permission string, link
	count, numeric uid and gid, size, the ls_date() of the
	modification time and the name.
	"""
	mode_bits = stat_info[stat.ST_MODE]
	# for now, only pay attention to the lower bits.  The '%03o'
	# padding guarantees three digits even for a very small mode.
	digits = ('%03o' % mode_bits)[-3:]
	mode = ''.join ([mode_table[digit] for digit in digits])
	if stat.S_ISDIR (mode_bits):
		dirchar = 'd'
	else:
		dirchar = '-'
	date = ls_date (int (time.time()), stat_info[stat.ST_MTIME])
	return '%s%s %3d %-8d %-8d %8d %s %s' % (
		dirchar,
		mode,
		stat_info[stat.ST_NLINK],
		stat_info[stat.ST_UID],
		stat_info[stat.ST_GID],
		stat_info[stat.ST_SIZE],
		date,
		file
		)
		
# Emulate the unix 'ls' command's date field.
# it has two formats - if the date is more than 180
# days in the past, then it's like this:
# Oct 19  1995
# otherwise, it looks like this:
# Oct 19 17:33

def ls_date (now, t):
	"""Format timestamp 't' the way 'ls -l' shows dates.

	'now' and 't' are seconds since the epoch; the fields are taken
	from time.gmtime(t).  If 't' is more than 180 days before 'now'
	the result is 'Mon DD  YYYY', otherwise 'Mon DD HH:MM'.  A
	timestamp that time.gmtime() rejects is rendered as the epoch.
	"""
	try:
		info = time.gmtime (t)
	except (TypeError, ValueError, OverflowError, OSError):
		info = time.gmtime (0)
	# 180 days, in seconds (86,400 * 180 == 15,552,000)
	if (now - t) > 180 * 86400:
		return '%s %2d  %d' % (
			months[info[1]-1],
			info[2],
			info[0]
			)
	else:
		return '%s %2d %02d:%02d' % (
			months[info[1]-1],
			info[2],
			info[3],
			info[4]
			)

# ===========================================================================
# Producers
# ===========================================================================

class list_producer:
	"""Producer that hands out a directory listing in batches.

	'file_list' is the list of entries, 'long' says whether each
	entry must be passed through 'longify' before being emitted.
	"""

	# number of entries emitted per call to more()
	chunk_size = 50

	def __init__ (self, file_list, long, longify):
		self.file_list = file_list
		self.long = long
		self.longify = longify
		self.done = 0

	def ready (self):
		"""Return 1 while entries remain; otherwise mark the
		producer done and return 0."""
		if len (self.file_list):
			return 1
		if not self.done:
			self.done = 1
		return 0

	# this should do a pushd/popd
	def more (self):
		"""Return the next batch as CRLF-terminated lines, or ''
		when the list is exhausted."""
		if not self.file_list:
			return ''
		# do a few at a time
		bunch = self.file_list[:self.chunk_size]
		if self.long:
			bunch = [self.longify (entry) for entry in bunch]
		# advance only after formatting succeeded.
		self.file_list = self.file_list[self.chunk_size:]
		return '\r\n'.join (bunch) + '\r\n'