feat(webhook): webhook waiting for persistency. #20164

Open · wants to merge 25 commits into base: main
Changes from 5 commits
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff contents not rendered.)

18 changes: 18 additions & 0 deletions proto/batch_plan.proto
@@ -195,6 +195,24 @@ message InsertNode {
uint32 session_id = 7;
}

message FastInsertNode {
// Id of the table to perform inserting.
uint32 table_id = 1;
// Version of the table.
uint64 table_version_id = 2;
repeated uint32 column_indices = 3;
data.DataChunk data_chunk = 4;

// An optional field that will be `None` for tables without a user-defined pk.
// The `BatchInsertExecutor` should add a column of NULL values, which will
// be filled in streaming.
optional uint32 row_id_index = 5;

// Session id is used to ensure that DML data from the same session is
// sent to a fixed worker node and channel.
uint32 session_id = 6;
}

message DeleteNode {
// Id of the table to perform deleting.
uint32 table_id = 1;
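For orientation (not part of the diff): a minimal sketch of how the prost-generated `FastInsertNode` might be populated, assuming the usual `risingwave_pb` field mapping. The id values and the pre-built `chunk_pb` are illustrative placeholders, not taken from the PR.

    // Hypothetical sketch: filling in a FastInsertNode. All concrete values here
    // are assumptions for illustration only.
    use risingwave_pb::batch_plan::FastInsertNode;
    use risingwave_pb::data::DataChunk as PbDataChunk;

    fn build_fast_insert_node(chunk_pb: PbDataChunk) -> FastInsertNode {
        FastInsertNode {
            table_id: 42,               // target table id (assumed)
            table_version_id: 0,        // fresh table, cf. INITIAL_TABLE_VERSION_ID in the test
            column_indices: vec![0],    // insert into the first (jsonb) column
            data_chunk: Some(chunk_pb), // the rows to insert, already serialized
            row_id_index: Some(1),      // hidden row-id column appended by the executor
            session_id: 0,              // pins the DML data to one worker node and channel
        }
    }
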
1 change: 1 addition & 0 deletions proto/catalog.proto
@@ -100,6 +100,7 @@ message StreamSourceInfo {
message WebhookSourceInfo {
secret.SecretRef secret_ref = 1;
expr.ExprNode signature_expr = 2;
bool wait_for_persistence = 3;
}

message Source {
50 changes: 31 additions & 19 deletions proto/task_service.proto
@@ -44,21 +44,29 @@ message CreateTaskRequest {
plan_common.ExprContext expr_context = 5;
}

message CancelTaskRequest {
batch_plan.TaskId task_id = 1;
message FastInsertRequest {
bool wait_epoch = 1;
batch_plan.FastInsertNode fast_insert_node = 2;
// TODO(kexiang): add support for default columns. plan_common.ExprContext expr_context is needed for it.
}

message CancelTaskResponse {
common.Status status = 1;
message FastInsertResponse {
enum Status {
UNSPECIFIED = 0;
SUCCEEDED = 1;
DML_FAILED = 2;
}
Status status = 1;
string error_message = 2;
}

message GetTaskInfoRequest {
batch_plan.TaskId task_id = 1;
}
message CancelTaskRequest { batch_plan.TaskId task_id = 1; }

message GetDataResponse {
data.DataChunk record_batch = 2;
}
message CancelTaskResponse { common.Status status = 1; }

message GetTaskInfoRequest { batch_plan.TaskId task_id = 1; }

message GetDataResponse { data.DataChunk record_batch = 2; }

message ExecuteRequest {
batch_plan.TaskId task_id = 1;
@@ -70,14 +78,14 @@ message ExecuteRequest {

service TaskService {
rpc CreateTask(CreateTaskRequest) returns (stream TaskInfoResponse);
// Cancel an already-died (self execution-failure, previous aborted, completed) task will still succeed.
// Cancel an already-died (self execution-failure, previous aborted,
// completed) task will still succeed.
rpc CancelTask(CancelTaskRequest) returns (CancelTaskResponse);
rpc Execute(ExecuteRequest) returns (stream GetDataResponse);
rpc FastInsert(FastInsertRequest) returns (FastInsertResponse);
}

message GetDataRequest {
batch_plan.TaskOutputId task_output_id = 1;
}
message GetDataRequest { batch_plan.TaskOutputId task_output_id = 1; }

// The structure for permit-based back-pressure.
message Permits {
@@ -99,18 +107,22 @@ message GetStreamRequest {
}

oneof value {
// The first message, which tells the upstream which channel this exchange stream is for.
// The first message, which tells the upstream which channel this exchange
// stream is for.
Get get = 1;
// The following messages, which adds the permits back to the upstream to achieve back-pressure.
// The following messages, which adds the permits back to the upstream to
// achieve back-pressure.
Permits add_permits = 2;
}
}

message GetStreamResponse {
stream_plan.StreamMessage message = 1;
// The number of permits acquired for this message, which should be sent back to the upstream with `add_permits`.
// In theory, this can also be derived from the message itself by the receiver. Here we send it explicitly to
// avoid any sense of inconsistency for the derivation logic, so the receiver can simply send it back verbatim.
// The number of permits acquired for this message, which should be sent back
// to the upstream with `add_permits`. In theory, this can also be derived
// from the message itself by the receiver. Here we send it explicitly to
// avoid any sense of inconsistency for the derivation logic, so the receiver
// can simply send it back verbatim.
Permits permits = 2;
}

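For orientation (not part of the diff): a hedged sketch of how a caller might invoke the new `FastInsert` RPC and interpret `FastInsertResponse.status`. The `task_service_client::TaskServiceClient` path assumes tonic's usual codegen layout, and the `anyhow`-based error handling is illustrative only.

    // Hypothetical client-side sketch; not part of the PR.
    use risingwave_pb::batch_plan::FastInsertNode;
    use risingwave_pb::task_service::fast_insert_response::Status;
    use risingwave_pb::task_service::task_service_client::TaskServiceClient;
    use risingwave_pb::task_service::FastInsertRequest;

    async fn fast_insert(
        client: &mut TaskServiceClient<tonic::transport::Channel>,
        node: FastInsertNode,
        wait_epoch: bool,
    ) -> anyhow::Result<()> {
        let resp = client
            .fast_insert(FastInsertRequest {
                wait_epoch, // ask the worker to wait until the epoch is persisted
                fast_insert_node: Some(node),
            })
            .await?
            .into_inner();
        match resp.status() {
            Status::Succeeded => Ok(()),
            Status::DmlFailed | Status::Unspecified => {
                anyhow::bail!("fast insert failed: {}", resp.error_message)
            }
        }
    }
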
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
@@ -9,6 +9,7 @@ repository = { workspace = true }

[dependencies]
anyhow = "1"
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
either = "1"
2 changes: 1 addition & 1 deletion src/batch/executors/src/executor/insert.rs
@@ -387,7 +387,7 @@ mod tests {
assert_eq!(*chunk.columns()[2], array);
});

assert_matches!(reader.next().await.unwrap()?, TxnMsg::End(_));
assert_matches!(reader.next().await.unwrap()?, TxnMsg::End(..));
let epoch = u64::MAX;
let full_range = (Bound::Unbounded, Bound::Unbounded);
let store_content = store
242 changes: 242 additions & 0 deletions src/batch/src/executor/fast_insert.rs
@@ -0,0 +1,242 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::iter::repeat;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, SerialArray, StreamChunk};
use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
use risingwave_common::transaction::transaction_id::TxnId;
use risingwave_common::types::DataType;
use risingwave_common::util::epoch::Epoch;
use risingwave_dml::dml_manager::DmlManagerRef;
use risingwave_pb::batch_plan::FastInsertNode;

use crate::error::Result;

/// A fast insert executor specially designed for non-pgwire inserts like websockets and webhooks.
pub struct FastInsertExecutor {
/// Target table id.
table_id: TableId,
table_version_id: TableVersionId,
dml_manager: DmlManagerRef,
column_indices: Vec<usize>,

row_id_index: Option<usize>,
txn_id: TxnId,
session_id: u32,
}

impl FastInsertExecutor {
pub fn build(
dml_manager: DmlManagerRef,
insert_node: FastInsertNode,
) -> Result<(FastInsertExecutor, DataChunk)> {
let table_id = TableId::new(insert_node.table_id);
let column_indices = insert_node
.column_indices
.iter()
.map(|&i| i as usize)
.collect();
let mut schema = Schema::new(vec![Field::unnamed(DataType::Jsonb)]);
schema.fields.push(Field::unnamed(DataType::Serial)); // row_id column
let data_chunk_pb = insert_node
.data_chunk
.expect("no data_chunk found in fast insert node");

Ok((
FastInsertExecutor::new(
table_id,
insert_node.table_version_id,
dml_manager,
column_indices,
insert_node.row_id_index.as_ref().map(|index| *index as _),
insert_node.session_id,
),
DataChunk::from_protobuf(&data_chunk_pb)?,
))
}

#[allow(clippy::too_many_arguments)]
pub fn new(
table_id: TableId,
table_version_id: TableVersionId,
dml_manager: DmlManagerRef,
column_indices: Vec<usize>,
row_id_index: Option<usize>,
session_id: u32,
) -> Self {
let txn_id = dml_manager.gen_txn_id();
Self {
table_id,
table_version_id,
dml_manager,
column_indices,
row_id_index,
txn_id,
session_id,
}
}
}

impl FastInsertExecutor {
pub async fn do_execute(self, data_chunk_to_insert: DataChunk) -> Result<Epoch> {
let table_dml_handle = self
.dml_manager
.table_dml_handle(self.table_id, self.table_version_id)?;
let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

write_handle.begin()?;

// Transform the data chunk into a stream chunk, then write it to the source.
let write_txn_data = |chunk: DataChunk| async {
let cap = chunk.capacity();
let (mut columns, vis) = chunk.into_parts();

let mut ordered_columns = self
.column_indices
.iter()
.enumerate()
.map(|(i, idx)| (*idx, columns[i].clone()))
.collect_vec();

ordered_columns.sort_unstable_by_key(|(idx, _)| *idx);
columns = ordered_columns
.into_iter()
.map(|(_, column)| column)
.collect_vec();

// If the user does not specify the primary key, then we need to add a column as the
// primary key.
if let Some(row_id_index) = self.row_id_index {
let row_id_col = SerialArray::from_iter(repeat(None).take(cap));
columns.insert(row_id_index, Arc::new(row_id_col.into()))
}

let stream_chunk = StreamChunk::with_visibility(vec![Op::Insert; cap], columns, vis);

#[cfg(debug_assertions)]
table_dml_handle.check_chunk_schema(&stream_chunk);

write_handle.write_chunk(stream_chunk).await?;

Result::Ok(())
};
write_txn_data(data_chunk_to_insert).await?;
let epoch = write_handle.end_returning_epoch().await?;
Ok(epoch)
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::ops::Bound;

use assert_matches::assert_matches;
use futures::StreamExt;
use risingwave_common::array::{Array, JsonbArrayBuilder};
use risingwave_common::catalog::{ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID};
use risingwave_common::transaction::transaction_message::TxnMsg;
use risingwave_common::types::JsonbVal;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::store::{ReadOptions, StateStoreReadExt};
use serde_json::json;

use super::*;
use crate::risingwave_common::array::ArrayBuilder;
use crate::risingwave_common::types::Scalar;
use crate::*;

#[tokio::test]
async fn test_fast_insert() -> Result<()> {
let epoch = Epoch::now();
let dml_manager = Arc::new(DmlManager::for_test());
let store = MemoryStateStore::new();
// Schema of the table
let mut schema = Schema::new(vec![Field::unnamed(DataType::Jsonb)]);
schema.fields.push(Field::unnamed(DataType::Serial)); // row_id column

let row_id_index = Some(1);

let mut builder = JsonbArrayBuilder::with_type(1, DataType::Jsonb);

let mut header_map = HashMap::new();
header_map.insert("data".to_string(), "value1".to_string());

let json_value = json!(header_map);
let jsonb_val = JsonbVal::from(json_value);
builder.append(Some(jsonb_val.as_scalar_ref()));

// Use builder to obtain a single (List) column DataChunk
let data_chunk = DataChunk::new(vec![builder.finish().into_ref()], 1);

// Create the table.
let table_id = TableId::new(0);

// Create reader
let column_descs = schema
.fields
.iter()
.enumerate()
.map(|(i, field)| ColumnDesc::unnamed(ColumnId::new(i as _), field.data_type.clone()))
.collect_vec();
// We must create a variable to hold this `Arc<TableDmlHandle>` here, or it will be dropped
// due to the `Weak` reference in `DmlManager`.
let reader = dml_manager
.register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
.unwrap();
let mut reader = reader.stream_reader().into_stream();

// Insert
let insert_executor = Box::new(FastInsertExecutor::new(
table_id,
INITIAL_TABLE_VERSION_ID,
dml_manager,
vec![0], // Ignoring insertion order
row_id_index,
0,
));
let handle = tokio::spawn(async move {
let epoch_received = insert_executor.do_execute(data_chunk).await.unwrap();

assert_eq!(epoch, epoch_received);

});

// Read
assert_matches!(reader.next().await.unwrap()?, TxnMsg::Begin(_));

assert_matches!(reader.next().await.unwrap()?, TxnMsg::Data(_, chunk) => {
assert_eq!(chunk.columns().len(), 2);
let array = chunk.columns()[0].as_jsonb().iter().collect::<Vec<_>>();
assert_eq!(JsonbVal::from(array[0].unwrap()), jsonb_val);
});

assert_matches!(reader.next().await.unwrap()?, TxnMsg::End(_, Some(epoch_notifier)) => {
epoch_notifier.send(epoch.clone()).unwrap();
});
let epoch = u64::MAX;
let full_range = (Bound::Unbounded, Bound::Unbounded);
let store_content = store
.scan(full_range, epoch, None, ReadOptions::default())
.await?;
assert!(store_content.is_empty());

handle.await.unwrap();

Ok(())
}
}
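
For orientation (not part of the diff): a rough sketch of how a worker-side handler might wire a `FastInsertRequest` to the `FastInsertExecutor` above, reusing the file's `Result` and `Epoch` imports. The `wait_for_epoch` helper is hypothetical; the actual persistence-wait mechanism is not shown in this file.

    // Hypothetical server-side wiring; not part of the PR.
    use risingwave_pb::task_service::fast_insert_response::Status;
    use risingwave_pb::task_service::{FastInsertRequest, FastInsertResponse};

    // Placeholder: block until `epoch` is persisted. The real mechanism lives
    // outside this file.
    async fn wait_for_epoch(_epoch: Epoch) -> Result<()> {
        Ok(())
    }

    async fn handle_fast_insert(
        dml_manager: DmlManagerRef,
        request: FastInsertRequest,
    ) -> FastInsertResponse {
        let node = request.fast_insert_node.expect("missing fast_insert_node");
        let result: Result<()> = async {
            // Build the executor and decode the protobuf data chunk in one step.
            let (executor, chunk) = FastInsertExecutor::build(dml_manager, node)?;
            // Write the chunk through the DML channel and get the epoch it was sent in.
            let epoch = executor.do_execute(chunk).await?;
            if request.wait_epoch {
                // Only acknowledge once the epoch has been persisted.
                wait_for_epoch(epoch).await?;
            }
            Ok(())
        }
        .await;
        match result {
            Ok(()) => FastInsertResponse {
                status: Status::Succeeded as i32,
                error_message: String::new(),
            },
            Err(e) => FastInsertResponse {
                status: Status::DmlFailed as i32,
                error_message: e.to_string(),
            },
        }
    }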