Skip to content

Commit

Permalink
Use O_SYNC for filestore meta files when SyncAlways is enabled (#…
Browse files Browse the repository at this point in the history
…5729)

This will use `O_SYNC` when writing `meta.inf`, `meta.sum` and
`meta.key` files for both streams and consumers when the `SyncAlways`
filestore option is enabled. It will also do the same when writing the
consumer state file, or rewriting it if we are converting between
ciphers.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
derekcollison authored Jul 31, 2024
2 parents 896cf0f + 540468a commit 13f2738
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 33 deletions.
68 changes: 35 additions & 33 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"fmt"
"hash"
"io"
"io/fs"
"math"
"net"
"os"
Expand Down Expand Up @@ -765,9 +766,7 @@ func (fs *fileStore) setupAEK() error {
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err = os.WriteFile(keyFile, encrypted, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms)
if err != nil {
return err
}
Expand Down Expand Up @@ -803,19 +802,15 @@ func (fs *fileStore) writeStreamMeta() error {
b = fs.aek.Seal(nonce, nonce, b, nil)
}

<-dios
err = os.WriteFile(meta, b, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(meta, b, defaultFilePerms)
if err != nil {
return err
}
fs.hh.Reset()
fs.hh.Write(b)
checksum := hex.EncodeToString(fs.hh.Sum(nil))
sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
<-dios
err = os.WriteFile(sum, []byte(checksum), defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms)
if err != nil {
return err
}
Expand Down Expand Up @@ -1206,9 +1201,7 @@ func (mb *msgBlock) convertCipher() error {
// the old keyfile back.
if err := fs.genEncryptionKeysForBlock(mb); err != nil {
keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))
<-dios
os.WriteFile(keyFile, ekey, defaultFilePerms)
dios <- struct{}{}
fs.writeFileWithOptionalSync(keyFile, ekey, defaultFilePerms)
return err
}
mb.bek.XORKeyStream(buf, buf)
Expand Down Expand Up @@ -3401,9 +3394,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err = os.WriteFile(keyFile, encrypted, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms)
if err != nil {
return err
}
Expand Down Expand Up @@ -8695,9 +8686,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
if err != nil {
return nil, err
}
<-dios
err = os.WriteFile(o.ifn, state, defaultFilePerms)
dios <- struct{}{}
err = fs.writeFileWithOptionalSync(o.ifn, state, defaultFilePerms)
if err != nil {
if didCreate {
os.RemoveAll(odir)
Expand Down Expand Up @@ -9171,9 +9160,7 @@ func (o *consumerFileStore) writeState(buf []byte) error {
o.mu.Unlock()

// Lock not held here but we do limit number of outstanding calls that could block OS threads.
<-dios
err := os.WriteFile(ifn, buf, defaultFilePerms)
dios <- struct{}{}
err := o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms)

o.mu.Lock()
if err != nil {
Expand Down Expand Up @@ -9212,9 +9199,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err = os.WriteFile(keyFile, encrypted, defaultFilePerms)
dios <- struct{}{}
err = cfs.fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms)
if err != nil {
return err
}
Expand All @@ -9235,9 +9220,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
b = cfs.aek.Seal(nonce, nonce, b, nil)
}

<-dios
err = os.WriteFile(meta, b, defaultFilePerms)
dios <- struct{}{}
err = cfs.fs.writeFileWithOptionalSync(meta, b, defaultFilePerms)
if err != nil {
return err
}
Expand All @@ -9246,9 +9229,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
checksum := hex.EncodeToString(cfs.hh.Sum(nil))
sum := filepath.Join(cfs.odir, JetStreamMetaFileSum)

<-dios
err = os.WriteFile(sum, []byte(checksum), defaultFilePerms)
dios <- struct{}{}
err = cfs.fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms)
if err != nil {
return err
}
Expand Down Expand Up @@ -9543,9 +9524,7 @@ func (o *consumerFileStore) Stop() error {

if len(buf) > 0 {
o.waitOnFlusher()
<-dios
err = os.WriteFile(ifn, buf, defaultFilePerms)
dios <- struct{}{}
err = o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms)
}
return err
}
Expand Down Expand Up @@ -9779,3 +9758,26 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {

return output, reader.Close()
}

// writeFileWithOptionalSync is equivalent to os.WriteFile() but optionally
// sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is
// handled automatically by this function, so don't wrap calls to it in dios.
func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error {
<-dios
defer func() {
dios <- struct{}{}
}()
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
if fs.fcfg.SyncAlways {
flags |= os.O_SYNC
}
f, err := os.OpenFile(name, flags, perm)
if err != nil {
return err
}
if _, err = f.Write(data); err != nil {
_ = f.Close()
return err
}
return f.Close()
}
26 changes: 26 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7754,3 +7754,29 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) {
require_Error(b, err, ErrStoreEOF)
}
}

func Benchmark_FileStoreCreateConsumerStores(b *testing.B) {
for _, syncAlways := range []bool{true, false} {
b.Run(fmt.Sprintf("%v", syncAlways), func(b *testing.B) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: b.TempDir(), SyncAlways: syncAlways},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(b, err)
defer fs.Stop()

oconfig := ConsumerConfig{
DeliverSubject: "d",
FilterSubject: "foo",
AckPolicy: AckAll,
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
oname := fmt.Sprintf("obs22_%d", i)
ofs, err := fs.ConsumerStore(oname, &oconfig)
require_NoError(b, err)
require_NoError(b, ofs.Stop())
}
})
}
}

0 comments on commit 13f2738

Please sign in to comment.