Skip to content

Commit

Permalink
idaholab#168; when in non-polling mode, don't expire files after 10 s…
Browse files Browse the repository at this point in the history
…econds of inactivity, be a little smarter
  • Loading branch information
mmguero committed Apr 25, 2023
1 parent f460fbe commit a5a5268
Showing 1 changed file with 36 additions and 24 deletions.
60 changes: 36 additions & 24 deletions shared/bin/watch_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from watchdog.events import (
FileSystemEventHandler,
FileSystemMovedEvent,
FileMovedEvent,
FileModifiedEvent,
FileCreatedEvent,
FileClosedEvent,
Expand All @@ -19,6 +19,7 @@
)

from multiprocessing.pool import ThreadPool
from threading import get_native_id
from watchdog.utils import WatchdogShutdown
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
Expand All @@ -41,6 +42,7 @@ def __init__(
super().__init__(*args, **kwargs)
self.polling = polling
self.logger = logger if logger else logging
self.workerPid = get_native_id()
self.updateTime()
# self.deck is a dictionary mapping filenames to a list of OperationEvent of length n,
# with [0] being the oldest timestamp/operation and [n-1] being the newest
Expand Down Expand Up @@ -69,14 +71,14 @@ def on_any_event(self, event):
self.updateTime()

# if this is a move event, we need to track the old and new filenames
if isinstance(event, FileSystemMovedEvent):
if isinstance(event, FileMovedEvent):
fName = event.dest_path
fNameOld = event.src_path
self.logger.debug(f"↦\t{event.event_type: >10}\t{event.src_path} {event.dest_path}")
self.logger.debug(f"↦\t{event.event_type: >10}\t{event.src_path} {event.dest_path}\t{self.workerPid}")
else:
fName = event.src_path
fNameOld = None
self.logger.debug(f"🗲\t{event.event_type: <10}\t{event.src_path}")
self.logger.debug(f"🗲\t{event.event_type: <10}\t{event.src_path}\t{self.workerPid}")

# This is a pain, but due to this watchdog issue (see
# https://github.com/gorakhargosh/watchdog/issues/260 and
Expand Down Expand Up @@ -108,10 +110,8 @@ def on_any_event(self, event):
# a file was simply renamed in the watched directory (not moved
# from some other directory) so remove the old filename from our list
# and a new one will get added
if fNameOld in d:
d.pop(fNameOld, None)
if fNameOld in self.modDeck:
self.modDeck(d.pop(fNameOld, None))
d.pop(fNameOld, None)
self.modDeck.pop(fNameOld, None)

# insert or update file event(s)

Expand Down Expand Up @@ -154,7 +154,7 @@ def on_any_event(self, event):
and (newOpLog.size != modifyOpSizes[-1])
):
# promote what's already in modDeck to the real deck, then append this new history item
self.logger.debug(f"𝦸\t{event.event_type: <10}\t{fName}")
self.logger.debug(f"𝦸\t{event.event_type: <10}\t{fName}\t{self.workerPid}")
d[fName] = self.modDeck.pop(fName)
d[fName].append(newOpLog)

Expand All @@ -175,31 +175,32 @@ def on_any_event(self, event):
isinstance(event, FileModifiedEvent)
or isinstance(event, FileClosedEvent)
or isinstance(event, FileCreatedEvent)
or isinstance(event, FileSystemMovedEvent)
or isinstance(event, FileMovedEvent)
):
# put FileClosedEvent events (which now have a timestamp of 0) at the front of
# the deck (to be processed first), and others to the back
deckInserted.move_to_end(fName, last=deckInserted[fName][-1].timestamp > 0)

elif isinstance(event, FileDeletedEvent):
# if a file is deleted I guess we don't need to track it any more
deckInserted.pop(fName, None)
d.pop(fName, None)
self.modDeck.pop(fName, None)
fName = None

else:
noop = True

if noop:
self.logger.debug(f"🗑\t{event.event_type: <10}\t{fName}")
self.logger.debug(f"🗑\t{event.event_type: <10}\t{fName}\t{self.workerPid}")

elif fName:
if fName in d:
self.logger.debug(f"➊\t{fName}\t{json.dumps(d[fName])}")
self.logger.debug(f"➊\t{fName}\t{json.dumps(d[fName])}\t{self.workerPid}")
if fName in self.modDeck:
self.logger.debug(f"➋\t{fName}\t{json.dumps(self.modDeck[fName])}")
self.logger.debug(f"➋\t{fName}\t{json.dumps(self.modDeck[fName])}\t{self.workerPid}")

except Exception as e:
self.logger.error(f"⨳\t{fName}\t{e}")
self.logger.error(f"⨳\t{fName}\t{e}\t{self.workerPid}")


###################################################################################################
Expand Down Expand Up @@ -227,7 +228,8 @@ def ProcessFileEventWorker(workerArgs):
logger = logging

with workerThreadCount as workerId:
logger.info(f"۞\tstarted\t[{workerId}]")
workerPid = get_native_id()
logger.info(f"۞\tstarted\t[{workerPid}:{workerId}]")

sleepInterval = 0.5
while (not shutDown[0]) and observer.is_alive():
Expand All @@ -238,20 +240,30 @@ def ProcessFileEventWorker(workerArgs):

with handler.deck as d:
for fileName, fileHistory in list(d.items()):
logger.debug(f"⏿ checking {fileName}\t{json.dumps(fileHistory)}\t[{workerId}]")
logger.debug(f"⏿ checking {fileName}\t{json.dumps(fileHistory)}\t[{workerPid}:{workerId}]")

if len(fileHistory) > 0:
if nowTime < fileHistory[-1].timestamp + assumeClosedSec:
# we can break because the list is ordered
logger.debug(
f"⎊\tbreaking early because {nowTime} < {fileHistory[-1].timestamp + assumeClosedSec}\t[{workerId}]"
f"⎊\tbreaking early because {nowTime} < {fileHistory[-1].timestamp + assumeClosedSec}\t[{workerPid}:{workerId}]"
)
break

if handler.polling or (fileHistory[-1].timestamp == 0):
# TODO is this ^ check accurate? I know it is for polling, but will we
# ALWAYS have a FileClosedEvent (timestamp == 0) in non-polling mode?

elif (
# - If we're polling, rely on the timestamp comparison done above and process this file
handler.polling
# - If we're not polling, but we have a timestamp == 0, then we had a FileClosedEvent and can be processed
or (fileHistory[-1].timestamp == 0)
# - If we're not polling, and the item has expired (timestamp comparison done above), and the only operations
# in this item's history are "created" or "moved", then the file was atomically moved in from another directory
# on the same filesystem and should be processed now
or (
not any(
set([x.operation for x in fileHistory if x.operation not in ('created', 'moved')])
)
)
):
del d[fileName]
if fileProcessor is not None:
extraArgs = (
Expand All @@ -264,12 +276,12 @@ def ProcessFileEventWorker(workerArgs):
**extraArgs,
)
logger.info(
f"🖄\tprocessed\t{fileName} at {(nowTime-fileHistory[-1].timestamp) if (fileHistory[-1].timestamp > 0) else 0} seconds\t[{workerId}]"
f"🖄\tprocessed\t{fileName} at {(nowTime-fileHistory[-1].timestamp) if (fileHistory[-1].timestamp > 0) else 0} seconds\t[{workerPid}:{workerId}]"
)
sleepInterval = 0.5

time.sleep(1)
logger.info(f"⛒\tfinished\t[{workerId}]")
logger.info(f"⛒\tfinished\t[{workerPid}:{workerId}]")


def WatchAndProcessDirectory(
Expand Down

0 comments on commit a5a5268

Please sign in to comment.