From 169aab788562bec15e2dacfe34acae1da3a26869 Mon Sep 17 00:00:00 2001 From: pudelkoM Date: Wed, 19 Jul 2017 04:14:18 +0200 Subject: [PATCH] Implement delayed flow expiration This might fix #6, we'll see after some tests. --- flowscope.lua | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/flowscope.lua b/flowscope.lua index cd6da49..7fb6d35 100644 --- a/flowscope.lua +++ b/flowscope.lua @@ -299,7 +299,7 @@ function continuousDumper(qq, id, path, filterPipe) local ruleList = {} -- Build from the ruleSet for performance local ruleQueue = prio.newPrioQueue() local rxCtr = stats:newManualRxCounter("Dumper Thread #" .. id, "plain") - local last_ts = 0 + local lastTS = 0 print(ruleQueue) while moon.running() do @@ -310,23 +310,27 @@ function continuousDumper(qq, id, path, filterPipe) print(event.action, event.filter, event.timestamp) if event.action == ev.create and ruleSet[event.id] == nil then local triggerWallTime = wallTime() - local pcapFileName = path .. ("/FlowScope-dump " .. os.date("%Y-%m-%d %H-%M-%S", triggerWallTime) .. " " .. event.filter .. ".pcap"):gsub(" ", "_") + local pcapFileName = path .. ("/FlowScope-dump " .. os.date("%Y-%m-%d %H-%M-%S", triggerWallTime) .. " " .. event.filter .. " part " .. id .. ".pcap"):gsub(" ", "_") local pcapWriter = pcap:newWriter(pcapFileName, triggerWallTime) ruleSet[event.id] = {pfFn = pf.compile_filter(event.filter), pcap = pcapWriter} --ruleSet[event.filter] = {pfFn = function() end, pcap = nil} elseif event.action == ev.delete and ruleSet[event.id] ~= nil then - if ruleSet[event.id].pcap then - ruleSet[event.id].pcap:close() - end - log:info("[Dumper %i#]: Rule deletion: %s", id, event.filter) - --print("Dumper #" .. id .. ": rule deletion:", event.filter) - ruleSet[event.id] = nil + ruleSet[event.id].timestamp = event.timestamp + log:info("[Dumper %i#]: Marked rule %s as expired", id, event.id) end -- Update ruleList ruleList = {} - for _, v in pairs(ruleSet) do - ruleList[#ruleList+1] = v + for k, v in pairs(ruleSet) do + if v.timestamp ~= nil and lastTS > v.timestamp then + if ruleSet[event.id].pcap then + ruleSet[event.id].pcap:close() + end + log:info("[Dumper %i#]: Expired rule %s, %i > %i", id, k, lastTS, v.timestamp) + ruleSet[k] = nil + else + ruleList[#ruleList+1] = v + end end print("Dumper #" .. id .. ": total number of rules:", #ruleList) end @@ -338,7 +342,7 @@ function continuousDumper(qq, id, path, filterPipe) for i = 0, storage:size() - 1 do local pkt = storage:getPacket(i) local timestamp = pkt:getTimestamp() - last_ts = pkt.ts_vlan + lastTS = tonumber(pkt.ts_vlan) rxCtr:updateWithSize(1, pkt:getLength()) for _, rule in ipairs(ruleList) do