diff --git a/README.md b/README.md
index b795166..e1fbe52 100644
--- a/README.md
+++ b/README.md
@@ -72,3 +72,5 @@ Almost C API, excepts:
 - [ ] compaction_filter/compaction_filter_factory/compaction_filter_context
 - [ ] transactiondb_property_value/transactiondb_property_int
 - [ ] optimistictransactiondb_property_value/optimistictransactiondb_property_int
+- [ ] writebatch_update_timestamps/writebatch_wi_update_timestamps
+
diff --git a/build.sh b/build.sh
index 1d20c05..ab75730 100755
--- a/build.sh
+++ b/build.sh
@@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version}
 
 # Note: if you don't have a good reason, please do not set -DPORTABLE=ON
 # This one is set here on purpose of compatibility with github action runtime processor
-rocksdb_version="9.3.1"
+rocksdb_version="9.4.0"
 cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \
 mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \
 -DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \
diff --git a/c.h b/c.h
index 6e20676..99d55a6 100644
--- a/c.h
+++ b/c.h
@@ -434,6 +434,9 @@ rocksdb_create_column_family_with_ttl(
 extern ROCKSDB_LIBRARY_API void rocksdb_drop_column_family(
     rocksdb_t* db, rocksdb_column_family_handle_t* handle, char** errptr);
 
+extern ROCKSDB_LIBRARY_API rocksdb_column_family_handle_t*
+rocksdb_get_default_column_family_handle(rocksdb_t* db);
+
 extern ROCKSDB_LIBRARY_API void rocksdb_column_family_handle_destroy(
     rocksdb_column_family_handle_t*);
 
@@ -502,6 +505,13 @@ extern ROCKSDB_LIBRARY_API char* rocksdb_get_cf_with_ts(
     rocksdb_column_family_handle_t* column_family, const char* key,
     size_t keylen, size_t* vallen, char** ts, size_t* tslen, char** errptr);
 
+/**
+ * Returns a malloc() buffer with the DB identity, assigning the length to
+ * *id_len. Returns NULL if an error occurred.
+ */
+extern ROCKSDB_LIBRARY_API char* rocksdb_get_db_identity(rocksdb_t* db,
+                                                         size_t* id_len);
+
 // if values_list[i] == NULL and errs[i] == NULL,
 // then we got status.IsNotFound(), which we will not return.
 // all errors except status status.ok() and status.IsNotFound() are returned.
@@ -727,6 +737,8 @@ extern ROCKSDB_LIBRARY_API const char* rocksdb_iter_timestamp(
     const rocksdb_iterator_t*, size_t* tslen);
 extern ROCKSDB_LIBRARY_API void rocksdb_iter_get_error(
     const rocksdb_iterator_t*, char** errptr);
+extern ROCKSDB_LIBRARY_API void rocksdb_iter_refresh(
+    const rocksdb_iterator_t* iter, char** errptr);
 
 extern ROCKSDB_LIBRARY_API void rocksdb_wal_iter_next(
     rocksdb_wal_iterator_t* iter);
@@ -747,6 +759,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_writebatch_t* rocksdb_writebatch_create(
     void);
 extern ROCKSDB_LIBRARY_API rocksdb_writebatch_t* rocksdb_writebatch_create_from(
     const char* rep, size_t size);
+extern ROCKSDB_LIBRARY_API rocksdb_writebatch_t*
+rocksdb_writebatch_create_with_params(size_t reserved_bytes, size_t max_bytes,
+                                      size_t protection_bytes_per_key,
+                                      size_t default_cf_ts_sz);
 extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_destroy(
     rocksdb_writebatch_t*);
 extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_clear(rocksdb_writebatch_t*);
@@ -842,6 +858,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_rollback_to_save_point(
     rocksdb_writebatch_t*, char** errptr);
 extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_pop_save_point(
     rocksdb_writebatch_t*, char** errptr);
+extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_update_timestamps(
+    rocksdb_writebatch_t* wb, const char* ts, size_t tslen, void* state,
+    size_t (*get_ts_size)(void*, uint32_t), char** errptr);
 
 /* Write batch with index */
 
@@ -850,6 +869,11 @@ rocksdb_writebatch_wi_create(size_t reserved_bytes,
     unsigned char overwrite_keys);
 extern ROCKSDB_LIBRARY_API rocksdb_writebatch_wi_t*
 rocksdb_writebatch_wi_create_from(const char* rep, size_t size);
+extern ROCKSDB_LIBRARY_API rocksdb_writebatch_wi_t*
+rocksdb_writebatch_wi_create_with_params(
+    rocksdb_comparator_t* backup_index_comparator, size_t reserved_bytes,
+    unsigned char overwrite_key, size_t max_bytes,
+    size_t protection_bytes_per_key);
 extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_wi_destroy(
     rocksdb_writebatch_wi_t*);
 extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_wi_clear(
@@ -960,6 +984,9 @@ extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
 rocksdb_writebatch_wi_create_iterator_with_base_cf(
     rocksdb_writebatch_wi_t* wbwi, rocksdb_iterator_t* base_iterator,
     rocksdb_column_family_handle_t* cf);
+extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_wi_update_timestamps(
+    rocksdb_writebatch_wi_t* wbwi, const char* ts, size_t tslen, void* state,
+    size_t (*get_ts_size)(void*, uint32_t), char** errptr);
 
 /* Options utils */
 
@@ -1588,6 +1615,17 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_plain_table_factory(
     rocksdb_options_t*, uint32_t, int, double, size_t, size_t, char,
     unsigned char, unsigned char);
 
+extern ROCKSDB_LIBRARY_API unsigned char
+rocksdb_options_get_write_dbid_to_manifest(rocksdb_options_t*);
+extern ROCKSDB_LIBRARY_API void rocksdb_options_set_write_dbid_to_manifest(
+    rocksdb_options_t*, unsigned char);
+
+extern ROCKSDB_LIBRARY_API unsigned char
+rocksdb_options_get_track_and_verify_wals_in_manifest(rocksdb_options_t*);
+extern ROCKSDB_LIBRARY_API void
+rocksdb_options_set_track_and_verify_wals_in_manifest(rocksdb_options_t*,
+                                                      unsigned char);
+
 extern ROCKSDB_LIBRARY_API void rocksdb_options_set_min_level_to_compress(
     rocksdb_options_t* opt, int level);
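Reviewer note: `rocksdb_get_db_identity` returns a `malloc()`-ed buffer that the caller owns, so a Go binding has to copy the bytes and free the C buffer. This diff does not add such a binding; below is a minimal sketch of what one could look like, assuming the usual cgo imports (`errors`, `unsafe`). The `GetDBIdentity` name and the error message are hypothetical, not part of this patch:

```go
// GetDBIdentity is a hypothetical wrapper around rocksdb_get_db_identity.
// It copies the identity into a Go string and releases the C buffer.
func (db *DB) GetDBIdentity() (string, error) {
	var idLen C.size_t
	cID := C.rocksdb_get_db_identity(db.c, &idLen)
	if cID == nil {
		// the C API signals failure by returning NULL
		return "", errors.New("rocksdb: get_db_identity failed")
	}
	defer C.rocksdb_free(unsafe.Pointer(cID))
	return C.GoStringN(cID, C.int(idLen)), nil
}
```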
diff --git a/db.go b/db.go
index f21efe7..fd3c3ff 100644
--- a/db.go
+++ b/db.go
@@ -1269,6 +1269,11 @@ func (db *DB) CreateColumnFamily(opts *Options, name string) (handle *ColumnFami
 	return
 }
 
+// GetDefaultColumnFamily returns the handle of the default column family.
+func (db *DB) GetDefaultColumnFamily() *ColumnFamilyHandle {
+	return newNativeColumnFamilyHandle(C.rocksdb_get_default_column_family_handle(db.c))
+}
+
 // CreateColumnFamilies creates new column families.
 func (db *DB) CreateColumnFamilies(opts *Options, names []string) (handles []*ColumnFamilyHandle, err error) {
 	if len(names) == 0 {
diff --git a/db_test.go b/db_test.go
index e07b0ea..16edd1d 100644
--- a/db_test.go
+++ b/db_test.go
@@ -59,6 +59,9 @@ func TestDBCRUD(t *testing.T) {
 		ro = NewDefaultReadOptions()
 	)
 
+	df := db.GetDefaultColumnFamily()
+	require.NotNil(t, df)
+
 	// create
 	require.Nil(t, db.Put(wo, givenKey, givenVal1))
 
@@ -68,6 +71,13 @@
 	require.Nil(t, err)
 	require.EqualValues(t, v1.Data(), givenVal1)
 
+	{
+		_v1, err := db.GetCF(ro, df, givenKey)
+		defer _v1.Free()
+		require.Nil(t, err)
+		require.EqualValues(t, _v1.Data(), givenVal1)
+	}
+
 	// retrieve bytes
 	_v1, err := db.GetBytes(ro, givenKey)
 	require.Nil(t, err)
diff --git a/iterator.go b/iterator.go
index 12d1bd5..43cfa68 100644
--- a/iterator.go
+++ b/iterator.go
@@ -124,6 +124,21 @@ func (iter *Iterator) Err() (err error) {
 	return
 }
 
+// Refresh updates, if supported, the DB state that the iterator reads from
+// to the latest state. The iterator is invalidated after the call.
+// Regardless of whether the iterator was created/refreshed previously
+// with or without a snapshot, it will be reading the
+// latest DB state after this call.
+// Note that you will need to call a Seek*() function to get the iterator
+// back into a valid state before calling a function that assumes the
+// state is already valid, like Next().
+func (iter *Iterator) Refresh() (err error) {
+	var cErr *C.char
+	C.rocksdb_iter_refresh(iter.c, &cErr)
+	err = fromCError(cErr)
+	return
+}
+
 // Close closes the iterator.
 func (iter *Iterator) Close() {
 	C.rocksdb_iter_destroy(iter.c)
diff --git a/iterator_test.go b/iterator_test.go
index 3de6122..204be88 100644
--- a/iterator_test.go
+++ b/iterator_test.go
@@ -31,6 +31,8 @@ func TestIterator(t *testing.T) {
 	}
 	require.Nil(t, iter.Err())
 	require.EqualValues(t, actualKeys, givenKeys)
+
+	require.NoError(t, iter.Refresh())
 }
 
 func TestIteratorWriteManyThenIter(t *testing.T) {
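Reviewer note: the intended pattern for `Iterator.Refresh` is to reuse a long-lived iterator instead of closing and re-creating it after new writes. A short usage fragment (assuming an open `db` with default `ro`/`wo` options as in the tests above, an enclosing function that returns `error`, and the usual imports):

```go
iter := db.NewIterator(ro)
defer iter.Close()

// writes that happened after the iterator was created...
_ = db.Put(wo, []byte("k2"), []byte("v2"))

// ...become visible after Refresh, which also invalidates the iterator,
if err := iter.Refresh(); err != nil {
	return err
}
// so it must be re-positioned with a Seek*() call before reading.
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
	k := iter.Key()
	fmt.Printf("key: %s\n", k.Data())
	k.Free()
}
```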
diff --git a/options.go b/options.go
index b38247f..5cdf245 100644
--- a/options.go
+++ b/options.go
@@ -2044,6 +2044,50 @@ func (opts *Options) SetPlainTableFactory(
 	)
 }
 
+// WriteDBIDToManifest toggles writing the DB ID to the MANIFEST.
+// Historically, the DB ID has been stored in the Identity file in the DB
+// folder. If this flag is true, the DB ID is written to the MANIFEST file
+// in addition to the Identity file. This solves two problems:
+//  1. The Identity file is not checksummed, whereas the MANIFEST file is.
+//  2. The MANIFEST file is the source of truth for the DB, so the DB ID
+//     sits with the source of truth. Previously, the Identity file could
+//     be copied independently of the MANIFEST, resulting in a wrong DB ID.
+//
+// We recommend setting this flag to true.
+//
+// Default: false
+func (opts *Options) WriteDBIDToManifest(v bool) {
+	C.rocksdb_options_set_write_dbid_to_manifest(opts.c, boolToChar(v))
+}
+
+// IsDBIDWrittenToManifest returns whether the DB ID is written to the
+// MANIFEST file in addition to the Identity file.
+func (opts *Options) IsDBIDWrittenToManifest() bool {
+	return charToBool(C.rocksdb_options_get_write_dbid_to_manifest(opts.c))
+}
+
+// ToggleTrackAndVerifyWALsInManifestFlag controls whether the log numbers
+// and sizes of the synced WALs are tracked in MANIFEST. If enabled, then
+// during DB recovery, if a synced WAL is missing from disk, or the WAL's
+// size does not match the recorded size in MANIFEST, an error is reported
+// and the recovery is aborted.
+//
+// This is one additional protection against WAL corruption besides the
+// per-WAL-entry checksum.
+//
+// Note that this option does not work with secondary instances.
+// Currently, only the syncing of closed WALs is tracked. Calling
+// `DB::SyncWAL()`, etc. or writing with `WriteOptions::sync=true` to sync
+// the live WAL is not tracked, for performance/efficiency reasons.
+//
+// Default: false
+func (opts *Options) ToggleTrackAndVerifyWALsInManifestFlag(v bool) {
+	C.rocksdb_options_set_track_and_verify_wals_in_manifest(opts.c, boolToChar(v))
+}
+
+// TrackAndVerifyWALsInManifestFlag returns whether the log numbers and
+// sizes of the synced WALs are tracked in MANIFEST.
+func (opts *Options) TrackAndVerifyWALsInManifestFlag() bool {
+	return charToBool(C.rocksdb_options_get_track_and_verify_wals_in_manifest(opts.c))
+}
+
 // SetWriteBufferManager binds with a WriteBufferManager.
 //
 // The memory usage of memtable will report to this object. The same object
diff --git a/options_test.go b/options_test.go
index 786d1c0..f18f4b5 100644
--- a/options_test.go
+++ b/options_test.go
@@ -297,6 +297,14 @@ func TestOptions(t *testing.T) {
 	opts.SetMaxSubcompactions(3)
 	require.EqualValues(t, 3, opts.GetMaxSubcompactions())
 
+	require.False(t, opts.IsDBIDWrittenToManifest())
+	opts.WriteDBIDToManifest(true)
+	require.True(t, opts.IsDBIDWrittenToManifest())
+
+	require.False(t, opts.TrackAndVerifyWALsInManifestFlag())
+	opts.ToggleTrackAndVerifyWALsInManifestFlag(true)
+	require.True(t, opts.TrackAndVerifyWALsInManifestFlag())
+
 	opts.SetMaxBytesForLevelMultiplierAdditional([]int{2 << 20})
 
 	opts.SetDbLogDir("./abc")
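Reviewer note: these two setters map onto RocksDB's `write_dbid_to_manifest` and `track_and_verify_wals_in_manifest` options; enabling both hardens recovery at negligible cost. A sketch of typical use when opening a DB (the path is illustrative; `NewDefaultOptions`, `SetCreateIfMissing`, and `OpenDb` are existing entry points in this wrapper, and the snippet assumes a `log` import):

```go
opts := NewDefaultOptions()
opts.SetCreateIfMissing(true)

// keep the DB ID in the checksummed MANIFEST, not only the Identity file
opts.WriteDBIDToManifest(true)
// fail recovery loudly if a synced WAL is missing or truncated
opts.ToggleTrackAndVerifyWALsInManifestFlag(true)

db, err := OpenDb(opts, "/tmp/example-db")
if err != nil {
	log.Fatal(err)
}
defer db.Close()
```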
diff --git a/write_batch.go b/write_batch.go
index 56fa5cc..aacdf21 100644
--- a/write_batch.go
+++ b/write_batch.go
@@ -19,6 +19,11 @@ func NewWriteBatch() *WriteBatch {
 	return newNativeWriteBatch(C.rocksdb_writebatch_create())
 }
 
+// NewWriteBatchWithParams creates a WriteBatch with the given parameters.
+func NewWriteBatchWithParams(reservedBytes, maxBytes, protectionBytesPerKey, defaultCFTs int) *WriteBatch {
+	return newNativeWriteBatch(C.rocksdb_writebatch_create_with_params(C.size_t(reservedBytes), C.size_t(maxBytes), C.size_t(protectionBytesPerKey), C.size_t(defaultCFTs)))
+}
+
 // NewNativeWriteBatch create a WriteBatch object.
 func newNativeWriteBatch(c *C.rocksdb_writebatch_t) *WriteBatch {
 	return &WriteBatch{
@@ -129,7 +134,7 @@ func (wb *WriteBatch) SingleDeleteCFWithTS(cf *ColumnFamilyHandle, key, ts []byt
 }
 
 // DeleteRange deletes keys that are between [startKey, endKey)
-func (wb *WriteBatch) DeleteRange(startKey []byte, endKey []byte) {
+func (wb *WriteBatch) DeleteRange(startKey, endKey []byte) {
 	cStartKey := refGoBytes(startKey)
 	cEndKey := refGoBytes(endKey)
 	C.rocksdb_writebatch_delete_range(wb.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey)))
@@ -137,7 +142,7 @@ func (wb *WriteBatch) DeleteRange(startKey []byte, endKey []byte) {
 
 // DeleteRangeCF deletes keys that are between [startKey, endKey) and
 // belong to a given column family
-func (wb *WriteBatch) DeleteRangeCF(cf *ColumnFamilyHandle, startKey []byte, endKey []byte) {
+func (wb *WriteBatch) DeleteRangeCF(cf *ColumnFamilyHandle, startKey, endKey []byte) {
 	cStartKey := refGoBytes(startKey)
 	cEndKey := refGoBytes(endKey)
 	C.rocksdb_writebatch_delete_range_cf(wb.c, cf.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey)))
diff --git a/write_batch_test.go b/write_batch_test.go
index 9384b6c..d1479f7 100644
--- a/write_batch_test.go
+++ b/write_batch_test.go
@@ -56,6 +56,55 @@ func TestWriteBatch(t *testing.T) {
 	require.True(t, v1.Data() == nil)
 }
 
+func TestWriteBatchWithParams(t *testing.T) {
+	t.Parallel()
+
+	db := newTestDB(t, nil)
+	defer db.Close()
+
+	var (
+		givenKey1 = []byte("key1")
+		givenVal1 = []byte("val1")
+		givenKey2 = []byte("key2")
+	)
+	wo := NewDefaultWriteOptions()
+	require.Nil(t, db.Put(wo, givenKey2, []byte("foo")))
+
+	// create and fill the write batch
+	wb := NewWriteBatchWithParams(10000, 200000, 10, 0)
+	defer wb.Destroy()
+	wb.Put(givenKey1, givenVal1)
+	wb.Delete(givenKey2)
+	require.EqualValues(t, wb.Count(), 2)
+
+	// perform the batch
+	require.Nil(t, db.Write(wo, wb))
+
+	// check changes
+	ro := NewDefaultReadOptions()
+	v1, err := db.Get(ro, givenKey1)
+	defer v1.Free()
+	require.Nil(t, err)
+	require.EqualValues(t, v1.Data(), givenVal1)
+
+	v2, err := db.Get(ro, givenKey2)
+	defer v2.Free()
+	require.Nil(t, err)
+	require.True(t, v2.Data() == nil)
+
+	// DeleteRange test
+	wb.Clear()
+	wb.DeleteRange(givenKey1, givenKey2)
+
+	// perform the batch
+	require.Nil(t, db.Write(wo, wb))
+
+	v1, err = db.Get(ro, givenKey1)
+	defer v1.Free()
+	require.Nil(t, err)
+	require.True(t, v1.Data() == nil)
+}
+
 func TestWriteBatchIterator(t *testing.T) {
 	t.Parallel()
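Reviewer note on the four parameters: they mirror the C++ `WriteBatch` constructor. Assuming the upstream semantics (worth double-checking against the RocksDB headers): `reservedBytes` pre-sizes the batch's internal buffer, `maxBytes` caps the batch size (0 means no limit), `protectionBytesPerKey` enables per-key-value checksum protection (upstream documents only 0 and 8 as supported values, so the `10` in the test above may be worth revisiting), and `defaultCFTs` is the user-defined timestamp size of the default column family.

```go
// ~10 KB preallocated, 200 KB hard cap, per-KV checksums on (8 bytes per
// key), no user-defined timestamps in the default column family
wb := NewWriteBatchWithParams(10000, 200000, 8, 0)
defer wb.Destroy()
wb.Put([]byte("key"), []byte("value"))
```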
diff --git a/write_batch_wi.go b/write_batch_wi.go
index 5dbef6b..3b6f5c5 100644
--- a/write_batch_wi.go
+++ b/write_batch_wi.go
@@ -19,6 +19,17 @@ func NewWriteBatchWI(reservedBytes uint, overwriteKeys bool) *WriteBatchWI {
 	return newNativeWriteBatchWI(cWB)
 }
 
+// NewWriteBatchWIWithParams creates a WriteBatchWI with the given parameters.
+func NewWriteBatchWIWithParams(cp *Comparator, reservedBytes int, overwriteKey bool, maxBytes, protectionBytesPerKey int) *WriteBatchWI {
+	return newNativeWriteBatchWI(C.rocksdb_writebatch_wi_create_with_params(
+		cp.c,
+		C.size_t(reservedBytes),
+		boolToChar(overwriteKey),
+		C.size_t(maxBytes),
+		C.size_t(protectionBytesPerKey),
+	))
+}
+
 // NewNativeWriteBatchWI create a WriteBatchWI object.
 func newNativeWriteBatchWI(c *C.rocksdb_writebatch_wi_t) *WriteBatchWI {
 	return &WriteBatchWI{c: c}
 }
@@ -99,7 +110,7 @@ func (wb *WriteBatchWI) SingleDeleteCF(cf *ColumnFamilyHandle, key []byte) {
 }
 
 // DeleteRange deletes keys that are between [startKey, endKey)
-func (wb *WriteBatchWI) DeleteRange(startKey []byte, endKey []byte) {
+func (wb *WriteBatchWI) DeleteRange(startKey, endKey []byte) {
 	cStartKey := refGoBytes(startKey)
 	cEndKey := refGoBytes(endKey)
 	C.rocksdb_writebatch_wi_delete_range(wb.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey)))
@@ -107,7 +118,7 @@ func (wb *WriteBatchWI) DeleteRange(startKey []byte, endKey []byte) {
 
 // DeleteRangeCF deletes keys that are between [startKey, endKey) and
 // belong to a given column family
-func (wb *WriteBatchWI) DeleteRangeCF(cf *ColumnFamilyHandle, startKey []byte, endKey []byte) {
+func (wb *WriteBatchWI) DeleteRangeCF(cf *ColumnFamilyHandle, startKey, endKey []byte) {
 	cStartKey := refGoBytes(startKey)
 	cEndKey := refGoBytes(endKey)
 	C.rocksdb_writebatch_wi_delete_range_cf(wb.c, cf.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey)))
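Reviewer note: per the C declaration, the comparator passed here is the backup index comparator, used to order the batch's internal index when no comparator is available from a column family. A usage sketch, assuming the wrapper's existing `NewComparator(name, compare)` helper and a `bytes` import:

```go
// comparator that orders the batch's backup index bytewise
cmp := NewComparator("bytewise", func(a, b []byte) int {
	return bytes.Compare(a, b)
})

wbwi := NewWriteBatchWIWithParams(cmp, 10000, true, 0, 0)
defer wbwi.Destroy()

// buffered writes are visible through the batch's index (read-your-writes)
wbwi.Put([]byte("key"), []byte("value"))
```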