diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 6cc0efbec..5dca2bbc9 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -44,7 +44,7 @@ pub static STREAM_WRITERS: Lazy = Lazy::new(WriterTable::default); pub enum StreamWriter { Mem(InMemWriter), - Disk(FileWriter), + Disk(FileWriter, InMemWriter), } impl StreamWriter { @@ -58,8 +58,9 @@ impl StreamWriter { StreamWriter::Mem(mem) => { mem.push(rb); } - StreamWriter::Disk(disk) => { + StreamWriter::Disk(disk, mem) => { disk.push(stream_name, schema_key, &rb)?; + mem.push(rb); } } Ok(()) @@ -112,7 +113,7 @@ impl WriterTable { let mut writer = if CONFIG.parseable.in_mem_ingestion { StreamWriter::Mem(InMemWriter::default()) } else { - StreamWriter::Disk(FileWriter::default()) + StreamWriter::Disk(FileWriter::default(), InMemWriter::default()) }; writer.push(stream_name, schema_key, record)?; @@ -146,7 +147,7 @@ impl WriterTable { buf: rb, }); } - StreamWriter::Disk(disk) => disk.close_all(), + StreamWriter::Disk(disk, _) => disk.close_all(), } } } @@ -155,13 +156,15 @@ impl WriterTable { let hashmap_guard = self.read().unwrap(); let (writer, context) = hashmap_guard.get(stream_name)?; let writer = writer.lock().unwrap(); - match &*writer { - StreamWriter::Mem(mem) => Some(ReadBuf { - time: context.time, - buf: mem.recordbatch_cloned(), - }), - StreamWriter::Disk(_) => None, - } + let mem = match &*writer { + StreamWriter::Mem(mem) => mem, + StreamWriter::Disk(_, mem) => mem, + }; + + Some(ReadBuf { + time: context.time, + buf: mem.recordbatch_cloned(), + }) } }