mssql to parquet as a batch #2274
-
Hello, currently I am trying to read data from a mssql db and store it in an azure_blob_storage as a parquet file. For this test I am trying to load 9 IDs and save them into one single parquet file. I tried to buffer the data before writing it, but this did not work. My final goal is to have a Benthos instance that I start once a day and that copies all data that accumulated on this day into a single parquet file. Do you have any ideas what I am doing wrong?

```yaml
input:
  label: "database_batched"
  broker:
    inputs:
      - sql_select:
          driver: "mssql"
          dsn: "<connection>"
          table: <table>
          columns: ["*"]
          where: ID > 64 AND ID < 74
    batching:
      count: 0
      byte_size: 0
      period: "5s"
      check: ""

pipeline:
  processors:
    - label: "parquet_encode"
      parquet_encode:
        schema:
          - name: ID
            type: INT64
        default_compression: uncompressed
        default_encoding: PLAIN

output:
  label: "testfile"
  file:
    path: /data/${!timestamp_unix_nano()}.parquet
    #path: /data/data.parquet
    codec: all-bytes
```
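Since the stated goal is Azure Blob Storage rather than a local file, the `file` output could presumably be swapped for Benthos's `azure_blob_storage` output along these lines (a sketch only; the account, key, and container values are placeholders, not from this post):

```yaml
output:
  azure_blob_storage:
    storage_account: "<account>"        # placeholder
    storage_access_key: "<access-key>"  # placeholder
    container: "<container>"            # placeholder
    path: ${!timestamp_unix_nano()}.parquet
```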
-
Hey @Someone894 👋 I think the example below should illustrate what's happening:

```yaml
input:
  generate:
    mapping: |
      root.id = count("foo")
    count: 2
    batch_size: 2
  processors:
    - log:
        message: ${! batch_size() }
    - parquet_encode:
        schema:
          - name: id
            type: INT64
        default_compression: uncompressed
        default_encoding: PLAIN
    - log:
        message: ${! error() }
    - log:
        message: ${! batch_size() }

output:
  file:
    path: ./output.parquet
```

Note that the logs will print the error if `parquet_encode` fails. In your case, you're setting a 5s batching period, so each flush produces a separate batch. It doesn't make sense to use a static name for the output file if you expect to have more than one batch, because then the parquet-encoded data will be appended to it, which I think results in a corrupt parquet file.
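To make the dynamic-name point concrete, here's a sketch of an output path that stays unique per batch (the `uuid_v4()` suffix is my addition, as a guard in case two batches share a nanosecond timestamp):

```yaml
output:
  file:
    # parquet_encode collapses each batch into one message,
    # so each batch lands in its own file here.
    path: /data/${! timestamp_unix_nano() }_${! uuid_v4() }.parquet
    codec: all-bytes
```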
-
By now I found my problem: `period: "5s"` is not working, but `period: 5s` is. But @mihaitodor your improved logging is very valuable to me, since it feels like Benthos is rather quiet about problems; for example, I only found out about the quotes by trying it out at random.

The database I get my data from is updated regularly (a new record every few secs), but when I set the period to `1h`, Benthos collects all available data and then just waits for the time to run out without ever checking for new data. Can Benthos be configured in a way that it collects all data as it occurs and starts a new parquet file once a day? Alternatively I just call Benthos once a day and then copy the data from the last day.

Of course @mihaitodor you are right about the static file name. Benthos just overwrites the old file; I added it just to keep the MWE short. Here is the corrected MWE:

```yaml
input:
  label: "database_batched"
  broker:
    inputs:
      - sql_select:
          driver: "mssql"
          dsn: "<connection>"
          table: <table>
          columns: ["*"]
          where: ID > 64 AND ID < 74
    batching:
      count: 0
      byte_size: 0
      period: 5s
      check: ""

pipeline:
  processors:
    - label: "parquet_encode"
      parquet_encode:
        schema:
          - name: ID
            type: INT64
        default_compression: uncompressed
        default_encoding: PLAIN

output:
  label: "testfile"
  file:
    path: /data/${!timestamp_unix_nano()}.parquet
    #path: /data/data.parquet
    codec: all-bytes
```
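On the open question above about starting a new parquet file once a day: one option (a sketch, not from the thread; the path layout is my assumption) is to keep writing one file per batch but group the files into a per-day directory, since, as noted above, appending multiple batches to a single parquet file corrupts it:

```yaml
output:
  file:
    # One parquet file per batch, grouped into a directory per day.
    # ts_format uses Go reference-time layouts, so "2006-01-02" means YYYY-MM-DD.
    path: /data/${! now().ts_format("2006-01-02") }/${! timestamp_unix_nano() }.parquet
    codec: all-bytes
```

A downstream job could then compact each day's directory into one file. As far as I can tell, `sql_select` runs its query once and then closes rather than re-polling, so continuous collection would need the input re-triggered, which matches the once-a-day invocation described above.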