Merge branch 'main' into kafka-integration
hippalus committed Jan 26, 2025
2 parents 895929f + 1c001c8 commit 63b9bc5
Showing 48 changed files with 766 additions and 971 deletions.
234 changes: 119 additions & 115 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
@@ -27,6 +27,7 @@ actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
http = "0.2.7"
http-auth-basic = "0.3.3"
mime = "0.3.17"
tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.12.3"
tower-http = { version = "0.6.1", features = ["cors"] }
@@ -78,11 +79,9 @@ humantime = "2.1.0"
humantime-serde = "1.1"

# File System and I/O
bzip2 = { version = "*", features = ["static"] }
fs_extra = "1.3"
path-clean = "1.0.1"
relative-path = { version = "1.7", features = ["serde"] }
xz2 = { version = "*", features = ["static"] }

# CLI and System
clap = { version = "4.5", default-features = false, features = [
8 changes: 7 additions & 1 deletion helm/templates/ingestor-statefulset.yaml
@@ -18,7 +18,9 @@ metadata:
name: {{ include "parseable.fullname" . }}-ingestor
namespace: {{ .Release.Namespace }}
labels:
{{- include "parseable.ingestorLabels" . | nindent 4 }}
{{- with .Values.parseable.highAvailability.ingestor.extraLabels }}
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
selector:
matchLabels:
@@ -32,6 +34,10 @@ spec:
{{- .Values.parseable.highAvailability.ingestor.labels | toYaml | nindent 8 }}
{{- include "parseable.ingestorLabelsSelector" . | nindent 8 }}
spec:
{{- with .Values.parseable.highAvailability.ingestor.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
initContainers:
- name: wait-for-query-service
image: curlimages/curl:latest # Lightweight image with curl
4 changes: 4 additions & 0 deletions helm/templates/querier-statefulset.yaml
@@ -41,6 +41,10 @@ spec:
tolerations:
{{ toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.parseable.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
containers:
- name: {{ .Chart.Name }}
securityContext:
13 changes: 12 additions & 1 deletion helm/values.yaml
@@ -10,7 +10,17 @@ parseable:
highAvailability:
enabled: false
ingestor:
affinity: {}
# podAntiAffinity:
# requiredDuringSchedulingIgnoredDuringExecution:
# - labelSelector:
# matchLabels:
# app: parseable
# component: ingestor
# topologyKey: kubernetes.io/hostname
port: 8000
extraLabels:
app: parseable
podAnnotations: {}
nodeSelector: {}
tolerations: []
@@ -61,7 +71,7 @@ parseable:
## Note that Data directory is needed only for local mode
persistence:
staging:
enabled: false
enabled: true
storageClass: ""
accessMode: ReadWriteOnce
size: 5Gi
@@ -146,6 +156,7 @@ parseable:
create: true
name: "parseable"
annotations: {}
nodeSelector: {}
service:
type: ClusterIP
port: 80
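The Helm changes above introduce three new knobs: parseable.highAvailability.ingestor.affinity and parseable.highAvailability.ingestor.extraLabels (rendered into the ingestor StatefulSet) and a chart-level parseable.nodeSelector (rendered into the querier StatefulSet). A minimal values override exercising them might look like the sketch below; the label values, node selector, and topology key are illustrative rather than chart defaults.

    parseable:
      nodeSelector:
        kubernetes.io/os: linux          # illustrative; any node label works here
      highAvailability:
        enabled: true
        ingestor:
          extraLabels:
            team: ingest                 # example label, merged into the StatefulSet labels
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchLabels:
                      app: parseable
                      component: ingestor
                  topologyKey: kubernetes.io/hostname

The affinity block simply mirrors the commented podAntiAffinity hint that now ships in values.yaml, spreading ingestor pods across nodes while the nodeSelector constrains where the querier lands.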
5 changes: 1 addition & 4 deletions src/catalog/mod.rs
@@ -33,7 +33,6 @@ use crate::{
query::PartialTimeFilter,
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
};
use bytes::Bytes;
use chrono::{DateTime, Local, NaiveTime, Utc};
use relative_path::RelativePathBuf;
use std::io::Error as IOError;
@@ -412,13 +411,11 @@ pub async fn get_first_event(
base_path_without_preceding_slash(),
stream_name
);
// Convert dates vector to Bytes object
let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap());
let ingestor_first_event_at =
handlers::http::cluster::send_retention_cleanup_request(
&url,
ingestor.clone(),
dates_bytes,
&dates,
)
.await?;
if !ingestor_first_event_at.is_empty() {
8 changes: 2 additions & 6 deletions src/connectors/kafka/processor.rs
@@ -45,12 +45,8 @@ impl ParseableSinkProcessor {
) -> anyhow::Result<Option<ParseableEvent>> {
let stream_name = consumer_record.topic.as_str();

create_stream_if_not_exists(
stream_name,
&StreamType::UserDefined.to_string(),
LogSource::default(),
)
.await?;
create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::default())
.await?;
let schema = STREAM_INFO.schema_raw(stream_name)?;

match &consumer_record.payload {
8 changes: 3 additions & 5 deletions src/event/format/json.rs
@@ -44,7 +44,6 @@ impl EventFormat for Event {
fn to_data(
self,
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
@@ -94,10 +93,9 @@ impl EventFormat for Event {
}
};

if !static_schema_flag
&& value_arr
.iter()
.any(|value| fields_mismatch(&schema, value, schema_version))
if value_arr
.iter()
.any(|value| fields_mismatch(&schema, value, schema_version))
{
return Err(anyhow!(
"Could not process this event due to mismatch in datatype"
38 changes: 29 additions & 9 deletions src/event/format/mod.rs
@@ -19,6 +19,7 @@

use std::{
collections::{HashMap, HashSet},
fmt::Display,
sync::Arc,
};

@@ -29,7 +30,10 @@ use chrono::DateTime;
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::{metadata::SchemaVersion, utils::arrow::get_field};
use crate::{
metadata::SchemaVersion,
utils::arrow::{get_field, get_timestamp_array, replace_columns},
};

use super::DEFAULT_TIMESTAMP_KEY;

@@ -73,6 +77,20 @@ impl From<&str> for LogSource {
}
}

impl Display for LogSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
LogSource::Kinesis => "kinesis",
LogSource::OtelLogs => "otel-logs",
LogSource::OtelMetrics => "otel-metrics",
LogSource::OtelTraces => "otel-traces",
LogSource::Json => "json",
LogSource::Pmeta => "pmeta",
LogSource::Custom(custom) => custom,
})
}
}

// Global Trait for event format
// This trait is implemented by all the event formats
pub trait EventFormat: Sized {
@@ -81,7 +99,6 @@ pub trait EventFormat: Sized {
fn to_data(
self,
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(Self::Data, EventSchema, bool), AnyError>;
@@ -95,12 +112,8 @@
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first) = self.to_data(
storage_schema,
static_schema_flag,
time_partition,
schema_version,
)?;
let (data, mut schema, is_first) =
self.to_data(storage_schema, time_partition, schema_version)?;

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
@@ -126,7 +139,14 @@
}
new_schema =
update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
let rb = Self::decode(data, new_schema.clone())?;

let mut rb = Self::decode(data, new_schema.clone())?;
rb = replace_columns(
rb.schema(),
&rb,
&[0],
&[Arc::new(get_timestamp_array(rb.num_rows()))],
);

Ok((rb, is_first))
}
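The Display impl added above gives every LogSource variant a stable string form matching its serde name, complementing the existing From<&str> conversion. A tiny sketch of how it behaves (the module path is assumed from the file location; this test is not part of the change itself):

    #[cfg(test)]
    mod log_source_display_sketch {
        // Path assumed from src/event/format/mod.rs; adjust if the type is re-exported elsewhere.
        use crate::event::format::LogSource;

        #[test]
        fn display_matches_wire_names() {
            // These strings come straight from the match arms in the Display impl above.
            assert_eq!(LogSource::OtelLogs.to_string(), "otel-logs");
            assert_eq!(LogSource::Pmeta.to_string(), "pmeta");
            assert_eq!(LogSource::Json.to_string(), "json");
        }
    }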
40 changes: 10 additions & 30 deletions src/event/mod.rs
@@ -28,7 +28,7 @@ use tracing::error;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType};
use crate::{metadata, storage::StreamType};
use chrono::NaiveDateTime;
use std::collections::HashMap;

@@ -49,7 +49,7 @@ pub struct Event {

// Event holds the schema related to each event for a single log stream
impl Event {
pub async fn process(&self) -> Result<(), EventError> {
pub async fn process(self) -> Result<(), EventError> {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -69,10 +69,10 @@ impl Event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

Self::process_event(
STREAM_WRITERS.append_to_local(
&self.stream_name,
&key,
self.rb.clone(),
&self.rb,
self.parsed_timestamp,
&self.custom_partition_values,
&self.stream_type,
@@ -98,44 +98,24 @@ impl Event {
Ok(())
}

pub fn process_unchecked(&self) -> Result<(), PostError> {
pub fn process_unchecked(&self) -> Result<(), EventError> {
let key = get_schema_key(&self.rb.schema().fields);

Self::process_event(
STREAM_WRITERS.append_to_local(
&self.stream_name,
&key,
self.rb.clone(),
&self.rb,
self.parsed_timestamp,
&self.custom_partition_values,
&self.stream_type,
)
.map_err(PostError::Event)
)?;

Ok(())
}

pub fn clear(&self, stream_name: &str) {
STREAM_WRITERS.clear(stream_name);
}

// event process all events after the 1st event. Concatenates record batches
// and puts them in memory store for each event.
fn process_event(
stream_name: &str,
schema_key: &str,
rb: RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
stream_type: &StreamType,
) -> Result<(), EventError> {
STREAM_WRITERS.append_to_local(
stream_name,
schema_key,
rb,
parsed_timestamp,
custom_partition_values.clone(),
stream_type,
)?;
Ok(())
}
}

pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
12 changes: 2 additions & 10 deletions src/event/writer/file_writer.rs
@@ -29,8 +29,6 @@ use crate::storage::staging::StorageDir;
use chrono::NaiveDateTime;

pub struct ArrowWriter {
#[allow(dead_code)]
pub file_path: PathBuf,
pub writer: StreamWriter<File>,
}

@@ -54,20 +52,14 @@ impl FileWriter {
// entry is not present thus we create it
None => {
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
let (path, writer) = init_new_stream_writer_file(
let (_, writer) = init_new_stream_writer_file(
stream_name,
schema_key,
record,
parsed_timestamp,
custom_partition_values,
)?;
self.insert(
schema_key.to_owned(),
ArrowWriter {
file_path: path,
writer,
},
);
self.insert(schema_key.to_owned(), ArrowWriter { writer });
}
};

6 changes: 3 additions & 3 deletions src/event/writer/mem_writer.rs
@@ -50,7 +50,7 @@ impl<const N: usize> Default for MemWriter<N> {
}

impl<const N: usize> MemWriter<N> {
pub fn push(&mut self, schema_key: &str, rb: RecordBatch) {
pub fn push(&mut self, schema_key: &str, rb: &RecordBatch) {
if !self.schema_map.contains(schema_key) {
self.schema_map.insert(schema_key.to_owned());
self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap();
@@ -97,7 +97,7 @@ pub struct MutableBuffer<const N: usize> {
}

impl<const N: usize> MutableBuffer<N> {
fn push(&mut self, rb: RecordBatch) -> Option<Vec<RecordBatch>> {
fn push(&mut self, rb: &RecordBatch) -> Option<Vec<RecordBatch>> {
if self.rows + rb.num_rows() >= N {
let left = N - self.rows;
let right = rb.num_rows() - left;
@@ -121,7 +121,7 @@ impl<const N: usize> MutableBuffer<N> {
Some(inner)
} else {
self.rows += rb.num_rows();
self.inner.push(rb);
self.inner.push(rb.clone());
None
}
}
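Two things change in the mem_writer path: push now borrows the RecordBatch and clones only when the batch is actually stored (cheap, since Arrow record batches hold reference-counted columns), and the overflow branch keeps its existing split of an incoming batch across the N-row buffer boundary. A standalone sketch of that split arithmetic, with an assumed capacity purely for illustration:

    // Illustration only: mirrors the left/right split computed in MutableBuffer::push
    // when rows + rb.num_rows() >= N. The helper and the numbers are not part of the codebase.
    fn split_rows(buffered: usize, incoming: usize, capacity: usize) -> (usize, usize) {
        let left = capacity - buffered; // rows that top up the current buffer
        let right = incoming - left;    // rows carried over into a fresh buffer
        (left, right)
    }

    fn main() {
        // With a 16_384-row capacity and 16_000 rows already buffered, a 1_000-row
        // batch splits into 384 rows (flushed with the full buffer) and 616 kept rows.
        assert_eq!(split_rows(16_000, 1_000, 16_384), (384, 616));
    }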