## The following table properties relate to the definition of data inside a table.
# A unique name identifying this table.
sleeper.table.name=example-table
# A boolean flag representing whether this table is online or offline.
# An offline table will not have any partition splitting or compaction jobs run automatically.
# Note that taking a table offline will not stop any partition splits or compaction jobs that are
# already running. You can still ingest data into an offline table and run queries against it.
sleeper.table.online=true
# Fully qualified class of a custom iterator to use when iterating over the values in this table.
# Defaults to nothing.
sleeper.table.iterator.class.name=sleeper.core.iterator.impl.AgeOffIterator
# Iterator configuration. An iterator will be initialised with the following configuration.
sleeper.table.iterator.config=b,3600000
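# For illustration only, assuming the AgeOffIterator configured above takes a field name and a
# maximum age in milliseconds: the value 'b,3600000' would age off any record whose 'b' value is an
# epoch timestamp more than 3600000 ms = 60 * 60 * 1000 ms = 1 hour old. The exact configuration
# format depends on the iterator class in use.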
## The following table properties relate to partition splitting.
# Splits file which will be used to initialise the partitions for this table. Defaults to nothing, in
# which case the table will be created with a single root partition.
sleeper.table.splits.file=example/full/splits.txt
# Flag to set if you have base64 encoded the split points (only used for string key types and defaults
# to false).
sleeper.table.splits.base64.encoded=false
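# For illustration only, assuming a splits file with one split point per line: a table with a string
# row key could be initialised with four leaf partitions using hypothetical contents of
# example/full/splits.txt such as:
#   aaa
#   mmm
#   ttt
# With base64 encoding enabled, each line would instead hold the base64-encoded split point.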
# Partitions in this table with more than the following number of records will be split.
sleeper.table.partition.splitting.threshold=1000000000
# If true, partition splits will be applied via asynchronous requests sent to the state store
# committer lambda. If false, the partition splitting lambda will apply splits synchronously.
# This is only applied if async commits are enabled for the table. The default value is set in an
# instance property.
sleeper.table.partition.splitting.commit.async=true
## The following table properties relate to the storage of data inside a table.
# The size of the row group in the Parquet files - defaults to the value in the instance properties.
sleeper.table.rowgroup.size=8388608
# The size of the page in the Parquet files - defaults to the value in the instance properties.
sleeper.table.page.size=131072
# Whether dictionary encoding should be used for row key columns in the Parquet files.
sleeper.table.parquet.dictionary.encoding.rowkey.fields=false
# Whether dictionary encoding should be used for sort key columns in the Parquet files.
sleeper.table.parquet.dictionary.encoding.sortkey.fields=false
# Whether dictionary encoding should be used for value columns in the Parquet files.
sleeper.table.parquet.dictionary.encoding.value.fields=false
# Used to set parquet.columnindex.truncate.length, see documentation here:
# https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md
# The length in bytes to truncate binary values in a column index.
sleeper.table.parquet.columnindex.truncate.length=128
# Used to set parquet.statistics.truncate.length, see documentation here:
# https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md
# The length in bytes to truncate the min/max binary values in row groups.
sleeper.table.parquet.statistics.truncate.length=2147483647
# Used to set parquet.writer.version, see documentation here:
# https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md
# Can be either v1 or v2. The v2 pages store levels uncompressed while v1 pages compress levels with
# the data.
sleeper.table.parquet.writer.version=v2
# Used during Parquet queries to determine whether the column indexes are used.
sleeper.table.parquet.query.column.index.enabled=false
# The S3 readahead range - defaults to the row group size.
sleeper.table.fs.s3a.readahead.range=8388608
# The compression codec to use for this table. Defaults to the value in the instance properties.
# Valid values are: [uncompressed, snappy, gzip, lzo, brotli, lz4, zstd]
sleeper.table.compression.codec=zstd
# A file will not be deleted until this number of minutes have passed after it has been marked as
# ready for garbage collection. The reason for not deleting files immediately after they have been
# marked as ready for garbage collection is that they may still be in use by queries. Defaults to the
# value set in the instance properties.
sleeper.table.gc.delay.minutes=15
# If true, deletion of files will be applied via asynchronous requests sent to the state store
# committer lambda. If false, the garbage collector lambda will apply deletions synchronously.
# This is only applied if async commits are enabled for the table. The default value is set in an
# instance property.
sleeper.table.gc.commit.async=true
## The following table properties relate to compactions.
# The name of the class that defines how compaction jobs should be created.
# This should implement sleeper.compaction.strategy.CompactionStrategy. Defaults to the strategy used
# by the whole instance (set in the instance properties).
sleeper.table.compaction.strategy.class=sleeper.compaction.core.job.creation.strategy.impl.SizeRatioCompactionStrategy
# The maximum number of files to read in a compaction job. Note that the state store must support
# atomic updates for this many files.
# The DynamoDBStateStore must be able to atomically apply 2 updates for each input file to remove the
# file references and update the file reference count, and another 2 updates for an output file to add
# a new file reference and update the reference count. There's a limit of 100 atomic updates, which
# equates to 49 files in a compaction.
# See also: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/transaction-apis.html.
# Also note that this many files may need to be open simultaneously. The value of
# 'sleeper.fs.s3a.max-connections' must be at least the value of this plus one. The extra one is for
# the output file.
sleeper.table.compaction.files.batch.size=12
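# As a worked example of the limits described above: with a batch size of 12 (as set above), a
# compaction commit needs 12 * 2 = 24 atomic updates for the input files plus 2 for the output file,
# i.e. 26 in total. The limit of 100 atomic updates allows at most (100 - 2) / 2 = 49 input files,
# and with a batch size of 12 the value of 'sleeper.fs.s3a.max-connections' would need to be at
# least 13 (12 inputs plus the output file).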
# The maximum number of compaction jobs that may be created during a single invocation. If this limit
# would be exceeded, the jobs to create are selected at random.
sleeper.table.compaction.job.creation.limit=100000
# The number of compaction jobs to send in a single batch.
# When compaction jobs are created, there is no limit on how many jobs can be created at once. A batch
# is a group of compaction jobs that will have their creation updates applied at the same time. For
# each batch, we send all compaction jobs to the SQS queue, then update the state store to assign job
# IDs to the input files.
sleeper.table.compaction.job.send.batch.size=1000
# The amount of time in seconds a batch of compaction jobs may be pending before it should not be
# retried. If the input files have not been successfully assigned to the jobs, and this much time has
# passed, then the batch will fail to send.
# Once a pending batch fails, the input files will never be compacted again without other intervention,
# so it's important to ensure file assignment will be done within this time. That depends on the
# throughput of state store commits.
# It's also necessary to ensure file assignment will be done before the next invocation of compaction
# job creation, otherwise invalid jobs will be created for the same input files. The rate of these
# invocations is set in `sleeper.compaction.job.creation.period.minutes`.
sleeper.table.compaction.job.send.timeout.seconds=90
# The amount of time in seconds to wait between attempts to send a batch of compaction jobs. The batch
# will be sent if all input files have been successfully assigned to the jobs, otherwise the batch
# will be retried after a delay.
sleeper.table.compaction.job.send.retry.delay.seconds=30
# If true, compaction job ID assignment commit requests will be sent to the state store committer
# lambda to be performed asynchronously. If false, compaction job ID assignments will be committed
# synchronously by the compaction job creation lambda.
# This is only applied if async commits are enabled for the table. The default value is set in an
# instance property.
sleeper.table.compaction.job.id.assignment.commit.async=true
# If true, compaction job commit requests will be sent to the state store committer lambda to be
# performed asynchronously. If false, compaction jobs will be committed synchronously by compaction
# tasks.
# This is only applied if async commits are enabled for the table. The default value is set in an
# instance property.
sleeper.table.compaction.job.commit.async=true
# This property affects whether commits of compaction jobs are batched before being sent to the state
# store commit queue to be applied by the committer lambda. If this property is true and asynchronous
# commits are enabled then commits of compactions will be batched. If this property is false and
# asynchronous commits are enabled then commits of compactions will not be batched and will be sent
# directly to the committer lambda.
sleeper.table.compaction.job.async.commit.batching=true
# Used by the SizeRatioCompactionStrategy to decide if a group of files should be compacted.
# If the file sizes are s_1, ..., s_n then the files are compacted if s_1 + ... + s_{n-1} >= ratio *
# s_n.
sleeper.table.compaction.strategy.sizeratio.ratio=3
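# As a worked example of the condition above (illustrative sizes only): with ratio = 3 and four files
# of sizes s_1 = 10, s_2 = 10, s_3 = 10, s_4 = 10, we have 10 + 10 + 10 = 30 >= 3 * 10 = 30, so the
# group would be compacted. With sizes 10, 10, 10 and s_4 = 100, 30 >= 300 is false, so the group
# would not be compacted.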
# Used by the SizeRatioCompactionStrategy to control the maximum number of jobs that can be running
# concurrently per partition.
sleeper.table.compaction.strategy.sizeratio.max.concurrent.jobs.per.partition=2147483647
# Select which compaction method to use for the table. DataFusion compaction support is experimental.
# Valid values are: [java, datafusion]
sleeper.table.compaction.method=JAVA
## The following table properties relate to storing and retrieving metadata for tables.
# The name of the class used for the state store. The default is DynamoDBTransactionLogStateStore.
# Options are:
# sleeper.statestore.transactionlog.DynamoDBTransactionLogStateStore
# sleeper.statestore.transactionlog.DynamoDBTransactionLogStateStoreNoSnapshots
# sleeper.statestore.s3.S3StateStore
# sleeper.statestore.dynamodb.DynamoDBStateStore
sleeper.table.statestore.classname=sleeper.statestore.transactionlog.DynamoDBTransactionLogStateStore
# Overrides whether or not to apply state store updates asynchronously via the state store committer.
# Usually this is decided based on the state store implementation used by the Sleeper table, but other
# default behaviour can be set for the Sleeper instance.
# This is separate from the properties that determine which state store updates will be done as
# asynchronous commits. Those properties will only be applied when asynchronous commits are enabled.
sleeper.table.statestore.commit.async.enabled=true
# When using the transaction log state store, this sets whether to update from the transaction log
# before adding a transaction in the asynchronous state store committer.
# If asynchronous commits are used for all or almost all state store updates, this can be false to
# avoid the extra queries.
# If the state store is commonly updated directly outside of the asynchronous committer, this can be
# true to avoid conflicts and retries.
sleeper.table.statestore.committer.update.every.commit=false
# When using the transaction log state store, this sets whether to update from the transaction log
# before adding a batch of transactions in the asynchronous state store committer.
sleeper.table.statestore.committer.update.every.batch=true
# The number of attempts to make when applying a transaction to the state store.
sleeper.table.statestore.transactionlog.add.transaction.max.attempts=10
# The maximum amount of time to wait before the first retry when applying a transaction to the state
# store. Full jitter will be applied so that the actual wait time will be a random period between 0
# and this value. This ceiling will increase exponentially on further retries. See the below article.
# https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
sleeper.table.statestore.transactionlog.add.transaction.first.retry.wait.ceiling.ms=200
# The maximum amount of time to wait before any retry when applying a transaction to the state store.
# Full jitter will be applied so that the actual wait time will be a random period between 0 and this
# value. This restricts the exponential increase of the wait ceiling while retrying the transaction.
# See the below article.
# https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
sleeper.table.statestore.transactionlog.add.transaction.max.retry.wait.ceiling.ms=30000
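# As an illustration of the retry behaviour described above, assuming the ceiling doubles on each
# retry (the exact growth factor is an implementation detail): with a first retry wait ceiling of
# 200 ms and a maximum ceiling of 30000 ms, successive ceilings would be roughly 200, 400, 800, 1600,
# ... until capped at 30000 ms, and the actual wait before each retry is drawn uniformly at random
# between 0 and the current ceiling (full jitter).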
# The number of seconds to wait after we've loaded a snapshot before looking for a new snapshot. This
# should relate to the rate at which new snapshots are created, configured in the instance property
# `sleeper.statestore.transactionlog.snapshot.creation.lambda.period.seconds`.
sleeper.table.statestore.transactionlog.time.between.snapshot.checks.seconds=60
# The number of milliseconds to wait after we've updated from the transaction log before checking for
# new transactions. The state visible to an instance of the state store can be out of date by this
# amount. This can avoid excessive queries by the same process, but can result in unwanted behaviour
# when using multiple state store objects. When adding a new transaction to update the state, this
# will be ignored and the state will be brought completely up to date.
sleeper.table.statestore.transactionlog.time.between.transaction.checks.ms=0
# The minimum number of transactions that a snapshot must be ahead of the local state, before we load
# the snapshot instead of updating from the transaction log.
sleeper.table.statestore.transactionlog.snapshot.load.min.transactions.ahead=10
# The number of days that transaction log snapshots remain in the snapshot store before being deleted.
sleeper.table.statestore.transactionlog.snapshot.expiry.days=2
# The minimum age in minutes of a snapshot in order to allow deletion of transactions leading up to
# it. When deleting old transactions, there's a chance that processes may still read transactions
# starting from an older snapshot. We need to avoid deletion of any transactions associated with a
# snapshot that may still be used as the starting point for reading the log.
sleeper.table.statestore.transactionlog.delete.behind.snapshot.min.age.minutes=2
# The minimum number of transactions that a transaction must be behind the latest snapshot before
# being deleted. This is the number of transactions that will be kept and protected from deletion,
# whenever old transactions are deleted. This includes the transaction that the latest snapshot was
# created against. Any transactions after the snapshot will never be deleted as they are still in
# active use.
# This should be configured in relation to the property which determines whether a process will load
# the latest snapshot or instead seek through the transaction log, since we need to preserve
# transactions that may still be read:
# sleeper.table.statestore.transactionlog.snapshot.load.min.transactions.ahead
# The snapshot that will be considered the latest snapshot is configured by a property to set the
# minimum age for it to count for this:
# sleeper.table.statestore.transactionlog.delete.behind.snapshot.min.age.minutes
sleeper.table.statestore.transactionlog.delete.number.behind.latest.snapshot=200
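# As a worked example of the behaviour described above (illustrative numbers only): if the latest
# snapshot old enough to count was created against transaction 1,000 and this property is 200, then
# transactions 801 to 1,000 are protected (200 transactions, including the snapshot's own
# transaction), any transactions after 1,000 are also kept, and only transactions up to 800 are
# eligible for deletion.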
# This specifies whether queries and scans against DynamoDB tables used in the state stores are
# strongly consistent.
sleeper.table.statestore.dynamo.consistent.reads=false
## The following table properties relate to ingest.
# Specifies the strategy that ingest uses to create files and references in partitions.
# Valid values are: [one_file_per_leaf, one_reference_per_leaf]
sleeper.table.ingest.file.writing.strategy=one_reference_per_leaf
# The way in which records are held in memory before they are written to a local store.
# Valid values are 'arraylist' and 'arrow'.
# The arraylist method is simpler, but it is slower and requires careful tuning of the number of
# records in each batch.
sleeper.table.ingest.record.batch.type=arrow
# The way in which partition files are written to the main Sleeper store.
# Valid values are 'direct' (which writes using the s3a Hadoop file system) and 'async' (which writes
# locally and then copies the completed Parquet file asynchronously into S3).
# The direct method is simpler but the async method should provide better performance when the number
# of partitions is large.
sleeper.table.ingest.partition.file.writer.type=async
# If true, ingest tasks will add files via requests sent to the state store committer lambda
# asynchronously. If false, ingest tasks will commit new files synchronously.
# This is only applied if async commits are enabled for the table. The default value is set in an
# instance property.
sleeper.table.ingest.job.files.commit.async=true
## The following table properties relate to bulk import, i.e. ingesting data using Spark jobs running
## on EMR or EKS.
# (Non-persistent EMR mode only) Which architecture to be used for EC2 instance types in the EMR
# cluster. Must be either "x86_64", "arm64" or "x86_64,arm64". For more information, see the Bulk
# import using EMR - Instance types section in docs/usage/ingest.md
sleeper.table.bulk.import.emr.instance.architecture=arm64
# (Non-persistent EMR mode only) The EC2 x86_64 instance types and weights to be used for the master
# node of the EMR cluster.
# For more information, see the Bulk import using EMR - Instance types section in docs/usage/ingest.md
sleeper.table.bulk.import.emr.master.x86.instance.types=m7i.xlarge
# (Non-persistent EMR mode only) The EC2 x86_64 instance types and weights to be used for the executor
# nodes of the EMR cluster.
# For more information, see the Bulk import using EMR - Instance types section in docs/usage/ingest.md
sleeper.table.bulk.import.emr.executor.x86.instance.types=m7i.4xlarge
# (Non-persistent EMR mode only) The EC2 ARM64 instance types and weights to be used for the master
# node of the EMR cluster.
# For more information, see the Bulk import using EMR - Instance types section in docs/usage/ingest.md
sleeper.table.bulk.import.emr.master.arm.instance.types=m7g.xlarge
# (Non-persistent EMR mode only) The EC2 ARM64 instance types and weights to be used for the executor
# nodes of the EMR cluster.
# For more information, see the Bulk import using EMR - Instance types section in docs/usage/ingest.md
sleeper.table.bulk.import.emr.executor.arm.instance.types=m7g.4xlarge
# (Non-persistent EMR mode only) The purchasing option to be used for the executor nodes of the EMR
# cluster.
# Valid values are ON_DEMAND or SPOT.
sleeper.table.bulk.import.emr.executor.market.type=SPOT
# (Non-persistent EMR mode only) The initial number of capacity units to provision as EC2 instances
# for executors in the EMR cluster.
# This is measured in instance fleet capacity units. These are declared alongside the requested
# instance types, as each type will count for a certain number of units. By default the units are the
# number of instances.
# This value overrides the default value in the instance properties. It can be overridden by a value
# in the bulk import job specification.
sleeper.table.bulk.import.emr.executor.initial.capacity=2
# (Non-persistent EMR mode only) The maximum number of capacity units to provision as EC2 instances
# for executors in the EMR cluster.
# This is measured in instance fleet capacity units. These are declared alongside the requested
# instance types, as each type will count for a certain number of units. By default the units are the
# number of instances.
# This value overrides the default value in the instance properties. It can be overridden by a value
# in the bulk import job specification.
sleeper.table.bulk.import.emr.executor.max.capacity=10
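# As an illustration of the capacity settings above, assuming the default weighting of one unit per
# instance: an initial capacity of 2 and a maximum capacity of 10 correspond to requesting 2 executor
# instances initially, up to a maximum of 10. If a heavier weight were declared for an instance type,
# each instance of that type would count for that many units instead.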
# (Non-persistent EMR mode only) The EMR release label to be used when creating an EMR cluster for
# bulk importing data using Spark running on EMR.
# This value overrides the default value in the instance properties. It can be overridden by a value
# in the bulk import job specification.
sleeper.table.bulk.import.emr.release.label=emr-7.2.0
# Specifies the minimum number of leaf partitions that are needed to run a bulk import job. If this
# minimum has not been reached, bulk import jobs will refuse to start.
sleeper.table.bulk.import.min.leaf.partitions=64
# If true, bulk import will add files via requests sent to the state store committer lambda
# asynchronously. If false, bulk import will commit new files at the end of the job synchronously.
# This is only applied if async commits are enabled for the table. The default value is set in an
# instance property.
sleeper.table.bulk.import.job.files.commit.async=true
## The following table properties relate to the ingest batcher.
# Specifies the minimum total file size required for an ingest job to be batched and sent. An ingest
# job will be created if the batcher runs while this much data is waiting, and the minimum number of
# files is also met.
sleeper.table.ingest.batcher.job.min.size=1G
# Specifies the maximum total file size for a job in the ingest batcher. If more data is waiting than
# this, it will be split into multiple jobs. If a single file exceeds this, it will still be ingested
# in its own job. It's also possible some data may be left for a future run of the batcher if some
# recent files overflow the size of a job but aren't enough to create a job on their own.
sleeper.table.ingest.batcher.job.max.size=5G
# Specifies the minimum number of files for a job in the ingest batcher. An ingest job will be created
# if the batcher runs while this many files are waiting, and the minimum size of files is also met.
sleeper.table.ingest.batcher.job.min.files=1
# Specifies the maximum number of files for a job in the ingest batcher. If more files are waiting
# than this, they will be split into multiple jobs. It's possible some data may be left for a future
# run of the batcher if some recent files overflow the size of a job but aren't enough to create a job
# on their own.
sleeper.table.ingest.batcher.job.max.files=100
# Specifies the maximum time in seconds that a file can be held in the batcher before it will be
# included in an ingest job. When any file has been waiting for longer than this, a job will be
# created with all the currently held files, even if other criteria for a batch are not met.
sleeper.table.ingest.batcher.file.max.age.seconds=300
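# As a worked example of the batcher settings above (illustrative file sizes only): with a minimum
# job size of 1G, a maximum job size of 5G, a minimum of 1 file and a maximum of 100 files, three
# waiting files totalling 1.2G would be sent as a single job, while 12G of waiting files would be
# split across at least three jobs of up to 5G each. A single 8G file would still be sent as its own
# job, and any file older than 300 seconds forces a job to be created with everything currently held.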
# Specifies the target ingest queue where batched jobs are sent.
# Valid values are: [standard_ingest, bulk_import_emr, bulk_import_persistent_emr, bulk_import_eks,
# bulk_import_emr_serverless]
sleeper.table.ingest.batcher.ingest.queue=bulk_import_emr_serverless
# The time in minutes that the tracking information is retained for a file before the records of its
# ingest are deleted (e.g. which ingest job it was assigned to, the time this occurred, the size of the
# file).
# The expiry time is fixed when a file is saved to the store, so changing this will only affect new
# data.
# Defaults to 1 week.
sleeper.table.ingest.batcher.file.tracking.ttl.minutes=10080
## The following table properties relate to query execution.
# The amount of time in minutes the query executor cache is valid for before it times out and needs
# refreshing.
sleeper.table.query.processor.cache.timeout=60