Skip to content

Commit

Permalink
Progress towards #6
Browse files Browse the repository at this point in the history
  • Loading branch information
pudelkoM committed Jul 19, 2017
1 parent 57be7bb commit ab90d1f
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 32 deletions.
6 changes: 3 additions & 3 deletions event.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ local mod = {}
mod.create = 1
mod.delete = 2

function mod.newEvent(filterString, action, id)
function mod.newEvent(filterString, action, id, timestamp)
local id = id or filterString
if action ~= mod.create and action ~= mod.delete then
log:warn("Invalid event action: %i", action)
log:error("Invalid event action: %i", action)
return nil
end
return {action = action, filter = filterString, id = id}
return {action = action, filter = filterString, id = id, timestamp = timestamp}
end

return mod
44 changes: 21 additions & 23 deletions flowscope.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ local pipe = require "pipe"
local timer = require "timer"
local flowtracker = require "flowtracker"
local ev = require "event"
local prio = require "prioQueue"

local jit = require "jit"
jit.opt.start("maxrecord=10000", "maxirconst=1000", "loopunroll=40")
Expand All @@ -34,6 +35,7 @@ function configure(parser)
end

function master(args)
log:setLevel("INFO")
if not args.generate then
for i, dev in ipairs(args.dev) do
args.dev[i] = device.config{
Expand All @@ -49,7 +51,7 @@ function master(args)
for i, dev in ipairs(args.dev) do
for i = 0, args.rxThreads - 1 do
if args.generate then
moon.startTask("traffic_generator", qq, i, nil, 1, args.rate)
moon.startTask("traffic_generator", qq, i, nil, 50, args.rate)
else
moon.startTask("inserter", dev:getRxQueue(i), qq)
end
Expand Down Expand Up @@ -92,12 +94,13 @@ function inserter(rxQueue, qq)
end

function swapper(tracker, pipes)
local buf = ffi.new("struct ipv4_5tuple[?]", 128)
local sz = 256
local buf = ffi.new("struct expired_flow4[?]", sz)
while moon.running() do
local c = tracker:swapper(buf, 128)
local c = tracker:swapper(buf, sz)
for i = 0, tonumber(c) - 1 do
local event = ev.newEvent(filterExprFromTuple(buf[i]), ev.delete)
log:info("[Swapper]: Sending event: %i, %s", event.action, event.filter)
local event = ev.newEvent(filterExprFromTuple(buf[i].tpl), ev.delete, nil, tonumber(buf[i].last_seen))
log:info("[Swapper]: Sending event: %i, %s %i", event.action, event.filter, event.timestamp)
for _, pipe in ipairs(pipes) do
pipe:send(event)
end
Expand All @@ -109,7 +112,7 @@ end
function traffic_generator(qq, id, packetSize, newFlowRate, rate)
local packetSize = packetSize or 64
local newFlowRate = newFlowRate or 0.5 -- new flows/s
local concurrentFlows = 1000
local concurrentFlows = 100
local rate = rate or 20 -- buckets/s
local baseIP = parseIPAddress("10.0.0.2")
local txCtr = stats:newManualTxCounter("Generator Thread #" .. id, "plain")
Expand All @@ -131,7 +134,7 @@ function traffic_generator(qq, id, packetSize, newFlowRate, rate)

while moon.running() do
local s1 = qq:enqueue()
local ts = moon.getTime()
local ts = moon.getTime() * 10^6
repeat
-- pkt.ip4.dst:set(baseIP)
pkt.ip4.dst:set(baseIP + math.random(0, concurrentFlows - 1))
Expand Down Expand Up @@ -260,10 +263,9 @@ function TBBTrackerAnalyzer(qq, id, hashmap, pipes)
--local ano = math.random(0, 10000000) == 0 or 0
if ano ~= 0 then
ttlData.tracked = true
--local event = {action = "create", filter = buildFilterExpr(parsedPkt)}
--local event = {action = "create", filter = filterExprFromTuple(tuple)}
ttlData.last_seen = pkt.ts_vlan
local event = ev.newEvent(filterExprFromTuple(tuple), ev.create)
print(bred("[TBB Analyzer Thread #".. id .."]: ") .. "Anomalous TTL:", TTL, ano, event.filterString)
log:warn("[TBB Analyzer Thread #%i]: Anomalous TTL: %i != %i, %s, ts %f", id, TTL, tonumber(ano), event.filter, pkt:getTimestamp())
for _, pipe in ipairs(pipes) do
pipe:send(event)
end
Expand Down Expand Up @@ -295,24 +297,17 @@ end
function continuousDumper(qq, id, path, filterPipe)
local ruleSet = {} -- Used to maintain the rules
local ruleList = {} -- Build from the ruleSet for performance
local ruleQueue = prio.newPrioQueue()
local rxCtr = stats:newManualRxCounter("Dumper Thread #" .. id, "plain")
local ruleExpirationTimer = timer:new(30)
local last_ts = 0
print(ruleQueue)

while moon.running() do
-- if ruleExpirationTimer:expired() then
-- print("Filter rules expired")
-- for _, rule in ipairs(ruleList) do
-- if rule.pcap then
-- rule.pcap:close()
-- end
-- end
-- ruleExpirationTimer:reset()
-- end

-- Get new filters
-- TODO: loop until all messages are read
local event = filterPipe:tryRecv(0)
if event ~= nil then
print(event.action, event.filter, event.timestamp)
if event.action == ev.create and ruleSet[event.id] == nil then
local triggerWallTime = wallTime()
local pcapFileName = path .. ("/FlowScope-dump " .. os.date("%Y-%m-%d %H-%M-%S", triggerWallTime) .. " " .. event.filter .. ".pcap"):gsub(" ", "_")
Expand All @@ -323,7 +318,7 @@ function continuousDumper(qq, id, path, filterPipe)
if ruleSet[event.id].pcap then
ruleSet[event.id].pcap:close()
end
log:info("[Dumper %i#]: Rule deletion: %s", id, ecent.filter)
log:info("[Dumper %i#]: Rule deletion: %s", id, event.filter)
--print("Dumper #" .. id .. ": rule deletion:", event.filter)
ruleSet[event.id] = nil
end
Expand All @@ -343,6 +338,7 @@ function continuousDumper(qq, id, path, filterPipe)
for i = 0, storage:size() - 1 do
local pkt = storage:getPacket(i)
local timestamp = pkt:getTimestamp()
last_ts = pkt.ts_vlan
rxCtr:updateWithSize(1, pkt:getLength())

for _, rule in ipairs(ruleList) do
Expand All @@ -358,9 +354,11 @@ function continuousDumper(qq, id, path, filterPipe)
::skip::
end
rxCtr:finalize()
for _, rule in ipairs(ruleSet) do
for _, rule in pairs(ruleSet) do
if rule.pcap then
rule.pcap:close()
else
log:error("[Dumper #%i]: Rule got no pcap", id)
end
end
log:info("[Dumper]: Shutdown")
Expand Down
9 changes: 8 additions & 1 deletion flowtracker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ffi.cdef [[
struct ttl_flow_data {
uint64_t running_sum; // Sum of all seen TTL values
uint64_t packets; // Number of observed TTL values
uint64_t last_seen; // Timestamp of last packet in flow
bool tracked;
} __attribute__((__packed__));

Expand All @@ -33,6 +34,11 @@ ffi.cdef [[
uint8_t proto;
} __attribute__((__packed__));

struct expired_flow4 {
struct ipv4_5tuple tpl;
uint64_t last_seen;
} __attribute__((__packed__));

uint32_t ipv4_5tuple_hash(struct ipv4_5tuple* tpl);
uint32_t ipv6_5tuple_hash(struct ipv6_5tuple* tpl);

Expand All @@ -55,10 +61,11 @@ ffi.cdef [[
typedef struct tbb_tracker tbb_tracker;
typedef struct const_accessor4 const_accessor4;
typedef struct accessor4 accessor4;
typedef struct expired_flow4 expired_flow4;
tbb_tracker* tbb_tracker_create(size_t pre_alloc);
void tbb_tracker_clear(tbb_tracker* tr);
void tbb_tracker_delete(tbb_tracker* tr);
size_t tbb_tracker_swapper(tbb_tracker* tr, struct ipv4_5tuple* buf, size_t sz);
size_t tbb_tracker_swapper(tbb_tracker* tr, expired_flow4* buf, size_t sz);
const_accessor4* tbb_tracker_const_access4(tbb_tracker* tr, const struct ipv4_5tuple* tpl);
const D* tbb_tracker_const_get4(const_accessor4* a);
void tbb_tracker_const_release4(const_accessor4* a);
Expand Down
15 changes: 11 additions & 4 deletions src/tbb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ namespace tbb_wrapper {
using tbb_map_v4 = tbb::concurrent_hash_map<v4tpl, D, tbb_wrapper::v4_crc_hash>;
using tbb_map_v6 = tbb::concurrent_hash_map<v6tpl, D, hash_v6>;

struct expired_flow4 {
v4tpl tpl;
std::uint64_t last_seen;
} __attribute__((__packed__));

/*
* Keep interface generic, split value access and modification
* Using flowscope with own datatype should be possible with only changing D
Expand All @@ -84,8 +89,8 @@ namespace tbb_wrapper {
delete current4;
delete old4;
}
std::size_t iterative_swapper(v4tpl* buf, std::size_t sz) {

std::size_t iterative_swapper(expired_flow4* buf, std::size_t sz) {
/*
* Idea:
* 1. Wait 30 sec since last swap
Expand Down Expand Up @@ -126,8 +131,10 @@ namespace tbb_wrapper {
break;
}
if (it->second.tracked) {
buf[i++] = it->first;
buf[i].tpl = it->first;
buf[i].last_seen = it->second.last_seen;
++purge_counter;
++i;
}
++expire_counter;
}
Expand Down Expand Up @@ -165,7 +172,7 @@ extern "C" {
delete tr;
}

std::size_t tbb_tracker_swapper(tbb_tracker* tr, v4tpl* buf, std::size_t sz) {
std::size_t tbb_tracker_swapper(tbb_tracker* tr, expired_flow4* buf, std::size_t sz) {
return tr->iterative_swapper(buf, sz);
}

Expand Down
3 changes: 2 additions & 1 deletion src/tuple.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace flowtracker {
struct ttl_flow_data {
std::uint64_t running_sum; // Sum of all seen TTL values
std::uint64_t packets; // Number of observed TTL values
std::uint64_t last_seen; // Timestamp of last packet in flow
bool tracked;

inline std::uint16_t get_average_TTL() const noexcept {
Expand All @@ -54,7 +55,7 @@ namespace flowtracker {
update_TTL(ttl);
return get_average_TTL();
}
};
} __attribute__((__packed__));
}

extern "C" {
Expand Down

0 comments on commit ab90d1f

Please sign in to comment.