Skip to content

Commit

Permalink
Keep data in mem for local query (#381)
Browse files Browse the repository at this point in the history
When using the disk writer, keep a copy of record in 
a mutable memory writer. This allows querying hot data 
of past minute by cloning the records and provide to 
query table. 

The memory is discarded when finalise is called at 
the end of minute.
  • Loading branch information
trueleo authored Apr 26, 2023
1 parent d5769c3 commit 0ba1af0
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);

pub enum StreamWriter {
Mem(InMemWriter),
Disk(FileWriter),
Disk(FileWriter, InMemWriter),
}

impl StreamWriter {
Expand All @@ -58,8 +58,9 @@ impl StreamWriter {
StreamWriter::Mem(mem) => {
mem.push(rb);
}
StreamWriter::Disk(disk) => {
StreamWriter::Disk(disk, mem) => {
disk.push(stream_name, schema_key, &rb)?;
mem.push(rb);
}
}
Ok(())
Expand Down Expand Up @@ -112,7 +113,7 @@ impl WriterTable {
let mut writer = if CONFIG.parseable.in_mem_ingestion {
StreamWriter::Mem(InMemWriter::default())
} else {
StreamWriter::Disk(FileWriter::default())
StreamWriter::Disk(FileWriter::default(), InMemWriter::default())
};

writer.push(stream_name, schema_key, record)?;
Expand Down Expand Up @@ -146,7 +147,7 @@ impl WriterTable {
buf: rb,
});
}
StreamWriter::Disk(disk) => disk.close_all(),
StreamWriter::Disk(disk, _) => disk.close_all(),
}
}
}
Expand All @@ -155,13 +156,15 @@ impl WriterTable {
let hashmap_guard = self.read().unwrap();
let (writer, context) = hashmap_guard.get(stream_name)?;
let writer = writer.lock().unwrap();
match &*writer {
StreamWriter::Mem(mem) => Some(ReadBuf {
time: context.time,
buf: mem.recordbatch_cloned(),
}),
StreamWriter::Disk(_) => None,
}
let mem = match &*writer {
StreamWriter::Mem(mem) => mem,
StreamWriter::Disk(_, mem) => mem,
};

Some(ReadBuf {
time: context.time,
buf: mem.recordbatch_cloned(),
})
}
}

Expand Down

0 comments on commit 0ba1af0

Please sign in to comment.