New to Programming: Adding custom functions with ipynotify classes

Saran A ahlusar.ahluwalia at gmail.com
Fri Apr 3 03:30:42 CEST 2015


Hello All:

Here is the program that I am trying to write (with specs):

* Monitors a folder for files that are dropped throughout the day 

* When a file is dropped in the folder the program should scan the file 

o IF all the records in the file have the same length (line length)

o THEN the file should be moved to a "success" folder and a text file written indicating the total number of records processed 

o IF the file is empty OR the records are not all of the same length 

o THEN the file should be moved to a "failure" folder and a text file written indicating the cause for failure (for example: Empty file or line 100 was not the same length as the rest).  

Many on forums suggest using ipynotify. I am wondering how to combine my current script and add it to the ipynotify.

Below is my original script (the ipynotify script is provided after this)
 
[code]
# # # Without data to examine here, I can only guess based on this requirement's language that 
# # fixed records are in the input.

##I made the assumption that the directories are in the same filesystem

# # Takes the function fileinfo as a starting point and demonstrates calling a function from within a function.  
# I tested this little sample on a small set of files created with MD5 checksums.  I wrote the Python in such a way as it 
# would work with Python 2.x or 3.x (note the __future__ at the top).

# # # There are so many wonderful ways of failure, so, from a development standpoint, I would probably spend a bit 
# # more time trying to determine which failure(s) I would want to report to the user, and how (perhaps creating my own Exceptions)

# # # The only other comments I would make are about safe-file handling.

# # #   #1:  Question: After a user has created a file that has failed (in
# # #        processing),can the user create a file with the same name?
# # #        If so, then you will probably want to look at some sort
# # #        of file-naming strategy to avoid overwriting evidence of
# # #        earlier failures.

# # # File naming is a tricky thing.  I referenced the tempfile module [1] and the Maildir naming scheme to see two different 
# # types of solutions to the problem of choosing a unique filename.

## I am assuming that all of my files are going to be specified in unicode  

## Utilized Spyder's Scientific Computing IDE to debug, check for indentation errors and test function suite

from __future__ import print_function

import os.path
import time
import difflib
import logging

def initialize_logger(output_dir):
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
     
    # create console handler and set level to info
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter("%(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
 
    # create error file handler and set level to error
    handler = logging.FileHandler(os.path.join(output_dir, "error.log"),"w", encoding=None, delay="true")
    handler.setLevel(logging.ERROR)
    formatter = logging.Formatter("%(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    # create debug file handler and set level to debug
    handler = logging.FileHandler(os.path.join(output_dir, "all.log"),"w")
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)


#This function's purpose is to obtain the filename, rootdir and filesize 

def fileinfo(f):
    filename = os.path.basename(f)
    rootdir = os.path.dirname(f)  
    filesize = os.path.getsize(f)
    return filename, rootdir, filesize

#This helper function returns the length of the file
def file_len(f):
    with open(f) as f:
        for i, l in enumerate(f):
            pass
            return i + 1

#This helper function attempts to copy file and move file to the respective directory
#I am assuming that the directories are in the same filesystem

# If directories ARE in different file systems, I would use the following helper function:

# def move(src, dest): 
#     shutil.move(src, dest)

def copy_and_move_file(src, dest):
    try:
        os.rename(src, dest)
        # eg. src and dest are the same file
    except IOError as e:
        print('Error: %s' % e.strerror)


path = "."
dirlist = os.listdir(path)


# Caveats of the "main" function is that it does not scale well 
#(although it is appropriate if one assumes that there will be few changes)

# It does not account for updated files existing in the directory - only new files "dropped" in
# (If this was included in the requirements, os.stat would be appropriate here)

 
def main(dirlist):   
    before = dict([(f, 0) for f in dirlist])
    while True:
        time.sleep(1) #time between update check
    after = dict([(f, None) for f in dirlist])
    added = [f for f in after if not f in before]
    if added:
        f = ''.join(added)
        print('Sucessfully added %s file - ready to validate') %(f)
        return validate_files(f)
    else:
        return move_to_failure_folder_and_return_error_file(f)


    
def validate_files(f):
    creation = time.ctime(os.path.getctime(f))
    lastmod = time.ctime(os.path.getmtime(f))
    if creation == lastmod and file_len(f) > 0:
        return move_to_success_folder_and_read(f)
    if file_len < 0 and creation != lastmod:
        return move_to_success_folder_and_read(f)
    else:
        return move_to_failure_folder_and_return_error_file(f)


# Failure/Success Folder Functions

def move_to_failure_folder_and_return_error_file():
    filename, rootdir, lastmod, creation, filesize = fileinfo(file)  
    os.mkdir('Failure')
    copy_and_move_file( 'Failure')
    initialize_logger('rootdir/Failure')
    logging.error("Either this file is empty or there are no lines")
     
             
def move_to_success_folder_and_read():
    filename, rootdir, lastmod, creation, filesize = fileinfo(file)  
    os.mkdir('Success')
    copy_and_move_file(rootdir, 'Success') #file name
    print("Success", file)
    return file_len(file)



if __name__ == '__main__':
   main(dirlist) 
[/code]

Here is my ipynotify script that I have tried writing, following the tutorial:

[code]

# My version for w: monitors events and logs them into a log file.
#
import os.path
from pyinotify import pyinotify

timestamp = datetime.today() #time_record
mask = pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO  #watched events

class EventHandler(pyinotify.ProcessEvent):
    def process_IN_CREATE(self, event):
        print "Created: %s " % os.path.join(event.path, event.name)
        event_log = open('/Users/sahluwalia/Desktop/', 'a')
        event_log.write(event.name + ' - ' + timestamp.strftime('%c') + '\n')
        event_log.close()

    def process_IN_MOVED_TO(self, event):
        print "Moved: %s " % os.path.join(event.path, event.name)
        event_log = open('/Users/sahluwalia/Desktop/', 'a')
        event_log.write(event.name + ' - ' + timestamp.strftime('%c') + '\n')
        event_log.close()


handler = EventHandler() #instantiated EventHandler Class
notifier = pyinotify.Notifier(wm, handler)

class Watcher(pyinotify.ProcessEvent): #I haave modified the Watcher class to process and read a new file creation or added file

    watchdir = '/tmp/watch'

    def __init__(self):
        pyinotify.ProcessEvent.__init__(self)
        wm = pyinotify.WatchManager()
        self.notifier = pyinotify.ThreadedNotifier(wm, self)
        wdd = wm.add_watch(self.watchdir, pyinotify.EventsCodes.IN_CREATE)
        print "Watching", self.watchdir
        self.notifier.start()

    def process_IN_CREATE(self, event):
        print "Seen:", event
        pathname = os.path.join(event.path, event.name)
        pfile = self._parse(pathname)
        print(pfile)

    def process_IN_MOVED_TO(self, event):
        print "Moved: %s " % os.path.join(event.path, event.name)
        pathname = os.path.join(event.path, event.name)
        pfile = self._parse(pathname)
        print(pfile)

    def _parse(self, filename):
        f = open(filename)
        file = [line.strip() for line in f]
        f.close()
        return file


class Log(pyinotify.ProcessEvent):
    def my_init(self, fileobj):
        """
        Method automatically called from ProcessEvent.__init__(). Additional
        keyworded arguments passed to ProcessEvent.__init__() are then
        delegated to my_init(). This is the case for fileobj.
        """
        self._fileobj = fileobj

    def process_default(self, event):
        self._fileobj.write(str(event) + '\n')
        self._fileobj.flush()

class TrackModifications(pyinotify.ProcessEvent):
    def process_IN_MODIFY(self, event):
        print 'IN_MODIFY'

class Empty(pyinotify.ProcessEvent): #Inherited class to display message  
    def my_init(self, msg):
        self._msg = msg

    def process_default(self, event): #writes decribing the event
        print self._msg


# pyinotify.log.setLevel(10)
filelog = file('/Failure', 'w')

while True:
    try:
        notifier.process_events()
        if notifier.check_events():
            notifier.read_events()
    try:
    # It is important to pass named extra arguments like 'fileobj'
        handler = Empty(TrackModifications(Log(fileobj=filelog)), msg='This is an error message or notificaiton that will be logged  ')
        notifier = pyinotify.Notifier(wm, default_proc_fun=handler)
        wm.add_watch('/tmp', pyinotify.ALL_EVENTS)
        notifier.loop()
        filelog.close()
    except KeyboardInterrupt:
        notifier.stop()
        break
    finally:
        filelog.close()


if __name__ == '__main__':
      Watcher()

[/code]



More information about the Python-list mailing list