Skip to content

Commit

Permalink
Merge branch 'main' into correlations
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh authored Jan 27, 2025
2 parents 580fd07 + d58ed54 commit 14fbc50
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 34 deletions.
8 changes: 5 additions & 3 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ async fn create_manifest(
}
};
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
if let Err(err) =
STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap())
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
Expand Down Expand Up @@ -330,8 +332,8 @@ pub async fn remove_manifest_from_snapshot(
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
STREAM_INFO.reset_first_event_at(stream_name)?;
meta.first_event_at = None;
STREAM_INFO.set_first_event_at(stream_name, None)?;
storage.put_snapshot(stream_name, meta.snapshot).await?;
}
match CONFIG.options.mode {
Expand Down Expand Up @@ -391,7 +393,7 @@ pub async fn get_first_event(
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
meta.first_event_at = Some(first_event_at.clone());
storage.put_stream_manifest(stream_name, &meta).await?;
STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?;
STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ impl EventFormat for Event {
};

if value_arr
.iter()
.any(|value| fields_mismatch(&schema, value, schema_version))
.iter()
.any(|value| fields_mismatch(&schema, value, schema_version))
{
return Err(anyhow!(
"Could not process this event due to mismatch in datatype"
Expand Down
7 changes: 2 additions & 5 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,8 @@ pub trait EventFormat: Sized {
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first) = self.to_data(
storage_schema,
time_partition,
schema_version,
)?;
let (data, mut schema, is_first) =
self.to_data(storage_schema, time_partition, schema_version)?;

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
Expand Down
33 changes: 15 additions & 18 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, St
use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
use super::ingest::create_stream_if_not_exists;
use super::modal::utils::logstream_utils::{
create_stream_and_schema_from_storage, create_update_stream,
create_stream_and_schema_from_storage, create_update_stream, update_first_event_at,
};
use super::query::update_schema_when_distributed;
use crate::alerts::Alerts;
use crate::catalog::get_first_event;
use crate::event::format::{override_data_type, LogSource};
use crate::handlers::STREAM_TYPE_KEY;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
Expand Down Expand Up @@ -57,7 +56,7 @@ use std::fs;
use std::num::NonZeroU32;
use std::str::FromStr;
use std::sync::Arc;
use tracing::{error, warn};
use tracing::warn;

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
Expand Down Expand Up @@ -550,19 +549,19 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
return Err(StreamError::StreamNotFound(stream_name));
}
}

let store = CONFIG.storage().get_object_store();
let dates: Vec<String> = Vec::new();
if let Ok(Some(first_event_at)) = get_first_event(store, &stream_name, dates).await {
if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
let storage = CONFIG.storage().get_object_store();
// if first_event_at is not found in memory map, check if it exists in the storage
// if it exists in the storage, update the first_event_at in memory map
let stream_first_event_at =
if let Ok(Some(first_event_at)) = STREAM_INFO.get_first_event(&stream_name) {
Some(first_event_at)
} else if let Ok(Some(first_event_at)) =
storage.get_first_event_from_storage(&stream_name).await
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
}
}
update_first_event_at(&stream_name, &first_event_at).await
} else {
None
};

let hash_map = STREAM_INFO.read().unwrap();
let stream_meta = &hash_map
Expand All @@ -572,7 +571,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
let stream_info: StreamInfo = StreamInfo {
stream_type: stream_meta.stream_type.clone(),
created_at: stream_meta.created_at.clone(),
first_event_at: stream_meta.first_event_at.clone(),
first_event_at: stream_first_event_at,
time_partition: stream_meta.time_partition.clone(),
time_partition_limit: stream_meta
.time_partition_limit
Expand All @@ -582,8 +581,6 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
log_source: stream_meta.log_source.clone(),
};

// get the other info from

Ok((web::Json(stream_info), StatusCode::OK))
}

Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl IngestServer {
web::put()
.to(ingestor_logstream::put_stream)
.authorize_for_stream(Action::CreateStream),
)
),
)
.service(
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
Expand Down
54 changes: 54 additions & 0 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
storage::{LogStream, ObjectStoreFormat, StreamType},
validator,
};
use tracing::error;

pub async fn create_update_stream(
headers: &HeaderMap,
Expand Down Expand Up @@ -508,3 +509,56 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<

Ok(true)
}

/// Records `first_event_at` for a stream in both the object store and the stream info map.
///
/// Both writes are best-effort: if either one fails, the failure is logged with
/// `error!` and execution continues. The caller therefore always receives the
/// timestamp back, regardless of whether the writes succeeded.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream to update.
/// * `first_event_at` - The value of first-event-at to store.
///
/// # Returns
///
/// * `Option<String>` - Always `Some` holding an owned copy of `first_event_at`.
///   This function never returns `None`; write failures are only logged.
///
/// # Examples
/// ```ignore
/// use parseable::handlers::http::modal::utils::logstream_utils::update_first_event_at;
/// let result = update_first_event_at("my_stream", "2023-01-01T00:00:00Z").await;
/// assert_eq!(result.as_deref(), Some("2023-01-01T00:00:00Z"));
/// ```
pub async fn update_first_event_at(stream_name: &str, first_event_at: &str) -> Option<String> {
    let storage = CONFIG.storage().get_object_store();

    // Write to the object store first; a failure here must not prevent the
    // stream info update below.
    let persisted = storage
        .update_first_event_in_stream(stream_name, first_event_at)
        .await;
    if let Err(err) = persisted {
        error!(
            "Failed to update first_event_at in storage for stream {:?}: {err:?}",
            stream_name
        );
    }

    // Then refresh the stream info entry, again only logging on failure.
    match metadata::STREAM_INFO.set_first_event_at(stream_name, first_event_at) {
        Ok(_) => {}
        Err(err) => {
            error!(
                "Failed to update first_event_at in stream info for stream {:?}: {err:?}",
                stream_name
            );
        }
    }

    Some(first_event_at.to_owned())
}
37 changes: 35 additions & 2 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,46 @@ impl StreamInfo {
pub fn set_first_event_at(
&self,
stream_name: &str,
first_event_at: Option<String>,
first_event_at: &str,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.first_event_at = first_event_at;
metadata.first_event_at = Some(first_event_at.to_owned());
})
}

/// Clears the `first_event_at` timestamp held in the stream metadata map for the given stream.
///
/// Only the entry in this map is touched: the field is set to `None`. Nothing is
/// written to storage by this method; callers that also need the persisted
/// metadata updated must do that separately.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream whose `first_event_at` is to be cleared.
///
/// # Returns
///
/// * `Result<(), MetadataError>` - `Ok(())` once the timestamp has been cleared,
///   or `MetadataError::StreamMetaNotFound` if the map has no entry for `stream_name`.
///
/// # Panics
///
/// Panics (via `expect(LOCK_EXPECT)`) if the write lock cannot be acquired.
///
/// # Examples
/// ```ignore
/// let result = STREAM_INFO.reset_first_event_at("my_stream");
/// match result {
///     Ok(()) => println!("first-event-at removed successfully"),
///     Err(e) => eprintln!("Error removing first-event-at from STREAM_INFO: {}", e),
/// }
/// ```
pub fn reset_first_event_at(&self, stream_name: &str) -> Result<(), MetadataError> {
    let mut map = self.write().expect(LOCK_EXPECT);
    // `ok_or_else` builds the error (and its owned stream name) only when the
    // stream is actually missing, instead of allocating on every call.
    let metadata = map
        .get_mut(stream_name)
        .ok_or_else(|| MetadataError::StreamMetaNotFound(stream_name.to_string()))?;
    metadata.first_event_at = None;
    Ok(())
}

Expand Down
110 changes: 109 additions & 1 deletion src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use actix_web_prometheus::PrometheusMetrics;
use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Local;
use chrono::{DateTime, Local, Utc};
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use once_cell::sync::OnceCell;
use relative_path::RelativePath;
Expand Down Expand Up @@ -217,6 +217,42 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
Ok(())
}

/// Persists a new `first_event_at` value into the stream's metadata object in the object store.
///
/// The current object-store format for the stream is fetched, its
/// `first_event_at` field is overwritten with `first_event`, and the serialized
/// result is written back to the path returned by `stream_json_path`.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream to update.
/// * `first_event` - The timestamp of the first event to set.
///
/// # Returns
///
/// * `Result<(), ObjectStorageError>` - `Ok(())` when the write succeeds, or the
///   error raised while fetching the existing format or writing the updated one.
///
/// # Examples
/// ```ignore
/// let result = object_store
///     .update_first_event_in_stream("my_stream", "2023-01-01T00:00:00Z")
///     .await;
/// assert!(result.is_ok());
/// ```
async fn update_first_event_in_stream(
    &self,
    stream_name: &str,
    first_event: &str,
) -> Result<(), ObjectStorageError> {
    // Read-modify-write: start from whatever is currently stored for the stream.
    let mut stream_meta = self.get_object_store_format(stream_name).await?;
    stream_meta.first_event_at = Some(first_event.to_owned());

    let payload = to_bytes(&stream_meta);
    let target = stream_json_path(stream_name);
    self.put_object(&target, payload).await?;

    Ok(())
}

async fn put_alerts(
&self,
stream_name: &str,
Expand Down Expand Up @@ -623,6 +659,78 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
Ok(())
}

/// Loads every `stream.json` metadata object stored under the stream's root directory.
///
/// Objects are listed beneath `<stream_name>/<STREAM_ROOT_DIRECTORY>` and only
/// those whose file name ends with `stream.json` are fetched and deserialized.
///
/// # Returns
///
/// * `Ok(Vec<ObjectStoreFormat>)` - One entry per metadata object found. If the
///   listing/fetch itself fails, the failure is ignored and an empty vector is
///   returned.
/// * `Err(ObjectStorageError)` - If any fetched object cannot be deserialized as
///   `ObjectStoreFormat`; processing stops at the first such object.
async fn get_stream_meta_from_storage(
    &self,
    stream_name: &str,
) -> Result<Vec<ObjectStoreFormat>, ObjectStorageError> {
    let stream_dir = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
    let fetched = self
        .get_objects(
            Some(&stream_dir),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;

    match fetched {
        Ok(raw_metas) => {
            let parsed = raw_metas
                .into_iter()
                .map(|raw| serde_json::from_slice::<ObjectStoreFormat>(&raw))
                .collect::<Result<Vec<_>, _>>()?;
            Ok(parsed)
        }
        // A failed listing is treated the same as "no metadata present".
        Err(_) => Ok(Vec::new()),
    }
}

/// Retrieves the earliest `first_event_at` recorded in storage for the specified stream.
///
/// All metadata objects returned by `get_stream_meta_from_storage` are inspected,
/// their `first_event_at` values are parsed as RFC 3339 timestamps, normalized to
/// UTC, and the minimum is returned formatted as RFC 3339.
///
/// The lookup is best-effort:
/// * if the stream metadata cannot be loaded, it is treated as absent;
/// * a `first_event_at` value that is not valid RFC 3339 is skipped rather than
///   causing a panic, so one corrupt metadata object cannot take down the caller.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved.
///
/// # Returns
///
/// * `Result<Option<String>, ObjectStorageError>` - `Ok(Some(String))` with the
///   earliest first event timestamp (in UTC) if at least one valid timestamp is
///   found, `Ok(None)` otherwise. The current implementation does not produce
///   an `Err`; the `Result` is kept for interface compatibility.
///
/// # Examples
/// ```ignore
/// let result = object_store.get_first_event_from_storage("my_stream").await;
/// match result {
///     Ok(Some(first_event)) => println!("first-event-at: {}", first_event),
///     Ok(None) => println!("first-event-at not found"),
///     Err(err) => println!("Error: {:?}", err),
/// }
/// ```
async fn get_first_event_from_storage(
    &self,
    stream_name: &str,
) -> Result<Option<String>, ObjectStorageError> {
    // Failure to load the metadata is deliberately treated as "nothing stored".
    let stream_metas = self
        .get_stream_meta_from_storage(stream_name)
        .await
        .unwrap_or_default();

    let earliest = stream_metas
        .iter()
        .filter_map(|stream_meta| stream_meta.first_event_at.as_deref())
        // Stored values come from persisted JSON and may be malformed; skip
        // anything that is not valid RFC 3339 instead of unwrapping.
        .filter_map(|first_event| DateTime::parse_from_rfc3339(first_event).ok())
        // Compare in a single timezone so offsets do not affect ordering output.
        .map(|first_event| first_event.with_timezone(&Utc))
        .min();

    Ok(earliest.map(|first_event| first_event.to_rfc3339()))
}

// pick a better name
fn get_bucket_name(&self) -> String;
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/retention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ mod action {
return;
}
}
if let Ok(first_event_at) = res_remove_manifest {
if let Ok(Some(first_event_at)) = res_remove_manifest {
if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at)
metadata::STREAM_INFO.set_first_event_at(&stream_name, &first_event_at)
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
Expand Down

0 comments on commit 14fbc50

Please sign in to comment.