Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Jan 29, 2025
2 parents f697da8 + 5294779 commit afca4ba
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 31 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ on:
- "helm/**"
- "assets/**"
- "**.md"
push:
branches:
- main

name: Ensure parseable builds on all release targets
jobs:
Expand Down
3 changes: 0 additions & 3 deletions .github/workflows/coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ on:
- "helm/**"
- "assets/**"
- "**.md"
push:
branches:
- main

name: Lint, Test and Coverage Report
jobs:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.debug
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ WORKDIR /parseable

# Cache dependencies
COPY Cargo.toml Cargo.lock build.rs .git ./
RUN mkdir src && echo "fn main() {}" > src/main.rs && cargo build --release && rm -rf src
RUN mkdir src && echo "fn main() {}" > src/main.rs && cargo build && rm -rf src

# Build the actual binary
COPY src ./src
Expand Down
16 changes: 10 additions & 6 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,8 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
.get(&stream_name)
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let stream_info: StreamInfo = StreamInfo {
stream_type: stream_meta.stream_type.clone(),
let stream_info = StreamInfo {
stream_type: stream_meta.stream_type,
created_at: stream_meta.created_at.clone(),
first_event_at: stream_first_event_at,
time_partition: stream_meta.time_partition.clone(),
Expand Down Expand Up @@ -626,8 +626,10 @@ pub async fn put_stream_hot_tier(
}
}

if PARSEABLE.streams.stream_type(&stream_name).unwrap()
== Some(StreamType::Internal.to_string())
if PARSEABLE
.streams
.stream_type(&stream_name)
.is_ok_and(|t| t == StreamType::Internal)
{
return Err(StreamError::Custom {
msg: "Hot tier can not be updated for internal stream".to_string(),
Expand Down Expand Up @@ -717,8 +719,10 @@ pub async fn delete_stream_hot_tier(
return Err(StreamError::HotTierNotEnabled(stream_name));
};

if PARSEABLE.streams.stream_type(&stream_name).unwrap()
== Some(StreamType::Internal.to_string())
if PARSEABLE
.streams
.stream_type(&stream_name)
.is_ok_and(|t| t == StreamType::Internal)
{
return Err(StreamError::Custom {
msg: "Hot tier can not be deleted for internal stream".to_string(),
Expand Down
6 changes: 4 additions & 2 deletions src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,10 @@ pub async fn get_stats(
let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let ingestor_stats = if PARSEABLE.streams.stream_type(&stream_name).unwrap()
== Some(StreamType::UserDefined.to_string())
let ingestor_stats = if PARSEABLE
.streams
.stream_type(&stream_name)
.is_ok_and(|t| t == StreamType::Internal)
{
Some(fetch_stats_from_ingestors(&stream_name).await?)
} else {
Expand Down
15 changes: 8 additions & 7 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct LogStreamMetadata {
pub custom_partition: Option<String>,
pub static_schema_flag: bool,
pub hot_tier_enabled: bool,
pub stream_type: Option<String>,
pub stream_type: StreamType,
pub log_source: LogSource,
}

Expand Down Expand Up @@ -326,7 +326,7 @@ impl StreamInfo {
} else {
static_schema
},
stream_type: Some(stream_type.to_string()),
stream_type,
schema_version,
log_source,
..Default::default()
Expand All @@ -351,16 +351,17 @@ impl StreamInfo {
self.read()
.expect(LOCK_EXPECT)
.iter()
.filter(|(_, v)| v.stream_type.clone().unwrap() == StreamType::Internal.to_string())
.filter(|(_, v)| v.stream_type == StreamType::Internal)
.map(|(k, _)| k.clone())
.collect()
}

pub fn stream_type(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
pub fn stream_type(&self, stream_name: &str) -> Result<StreamType, MetadataError> {
self.read()
.expect(LOCK_EXPECT)
.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| metadata.stream_type.clone())
.map(|metadata| metadata.stream_type)
}

pub fn update_stats(
Expand Down
7 changes: 2 additions & 5 deletions src/parseable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
option::Mode,
storage::{
object_storage::parseable_json_path, LogStream, ObjectStorageError, ObjectStorageProvider,
ObjectStoreFormat, StreamType,
ObjectStoreFormat,
},
};

Expand Down Expand Up @@ -190,10 +190,7 @@ impl Parseable {
.and_then(|limit| limit.parse().ok());
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
let static_schema_flag = stream_metadata.static_schema_flag;
let stream_type = stream_metadata
.stream_type
.map(|s| StreamType::from(s.as_str()))
.unwrap_or_default();
let stream_type = stream_metadata.stream_type;
let schema_version = stream_metadata.schema_version;
let log_source = stream_metadata.log_source;
self.streams.add_stream(
Expand Down
8 changes: 5 additions & 3 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ pub struct ObjectStoreFormat {
pub static_schema_flag: bool,
#[serde(default)]
pub hot_tier_enabled: bool,
pub stream_type: Option<String>,
#[serde(default)]
pub stream_type: StreamType,
#[serde(default)]
pub log_source: LogSource,
}
Expand All @@ -140,7 +141,8 @@ pub struct StreamInfo {
skip_serializing_if = "std::ops::Not::not"
)]
pub static_schema_flag: bool,
pub stream_type: Option<String>,
#[serde(default)]
pub stream_type: StreamType,
pub log_source: LogSource,
}

Expand Down Expand Up @@ -205,7 +207,7 @@ impl Default for ObjectStoreFormat {
version: CURRENT_SCHEMA_VERSION.to_string(),
schema_version: SchemaVersion::V1, // Newly created streams should be v1
objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
stream_type: Some(StreamType::UserDefined.to_string()),
stream_type: StreamType::UserDefined,
created_at: Local::now().to_rfc3339(),
first_event_at: None,
owner: Owner::new("".to_string(), "".to_string()),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
let format = ObjectStoreFormat {
created_at: Local::now().to_rfc3339(),
permissions: vec![Permisssion::new(PARSEABLE.options.username.clone())],
stream_type: Some(stream_type.to_string()),
stream_type,
time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()),
time_partition_limit: time_partition_limit.map(|limit| limit.to_string()),
custom_partition: (!custom_partition.is_empty()).then(|| custom_partition.to_string()),
Expand Down

0 comments on commit afca4ba

Please sign in to comment.