Testing FAM with Python

There is a Python module python-fam that uses the client lib of FAM.

There is segmentation fault on deletion of python FAM obj, which can be checked using GDB. After replacing the code PyMem_DEL with PyObject_Del in _fam.c and rebuilt the extension, the segmentation fault goes away. (I submit this as a bug?)

Here is the test code test_fam.py I wrote.

# test_fam.py

r"""

"""

import _fam

import select

from os import path

import sys

# from fam.h

"""enum FAMCodes { FAMChanged=1, FAMDeleted=2, FAMStartExecuting=3,

FAMStopExecuting=4, FAMCreated=5, FAMMoved=6,

FAMAcknowledge=7, FAMExists=8, FAMEndExist=9 };"""

FAMChanged=1

FAMDeleted=2

FAMStartExecuting=3

FAMStopExecuting=4

FAMCreated=5

#FAMMoved=6

FAMAcknowledge=7

FAMExists=8

FAMEndExist=9

_FM = _fam.open()

_Monitors = {}

_MonitorReqs = {}

def _notify(filename, change, code):

print filename, change

def monitor(filename, notify):

if filename not in _Monitors:

_Monitors[filename] = [notify]

if path.isdir(filename):

fr = _FM.monitorDirectory(filename, filename)

else:

fr = _FM.monitorFile(filename, None)

_MonitorReqs[filename] = fr

else:

_Monitors[filename].append(notify)

def unmonitor(filename, notify):

if filename not in _Monitors:

return

notifys = _Monitors[filename]

try:

notifys.remove(notify)

if not notifys:

fr = _MonitorReqs[filename]

#print 'cancelMonitor', fr.requestID()

fr.cancelMonitor()

del _MonitorReqs[filename]

del _Monitors[filename]

except Exception, e:

print e

def _notify_event(fe):

filename = fe.filename

if filename not in _Monitors:

if path.isabs(filename):

print "? not monitor", filename

return

# for dir monitor

adir = fe.userData

if adir in _Monitors:

notifys = _Monitors[adir]

filename = path.join(adir, filename)

else:

return

else:

notifys = _Monitors[filename]

ch, code = fe.code2str(), fe.code

print code, ch, fe.requestID, fe.userData, fe.hostname

for n in notifys:

n(filename, ch, code)

def run():

inl, eml = [_FM], []

while 1:

try:

ri, ro, re = select.select(inl, eml, eml)

except select.error, e:

print e

break

while _FM.pending():

print "check nextEvent"

fe = _FM.nextEvent()

_notify_event(fe)

#----------------------------------------------------------------------

def _unmonitor_onnotify(ecode):

notify = lambda filename, change, code: (code == ecode and unmonitor(filename, notify)

or _notify(filename, change, code))

return notify

def _test(files):

for fn in files:

monitor(fn, _unmonitor_onnotify(FAMDeleted))

monitor(fn, _unmonitor_onnotify(FAMChanged))

run()

#----------------------------------------------------------------------

if __name__ == '__main__':

a = sys.argv

alen = len(a)

if alen <= 1:

print "test_fam.py file1 file2 .."

sys.exit(0)

_test(a[1:])

Manual pages for fam.

Related: FAM Non-superuser Hack.

Comments

blog comments powered by Disqus

Categories