diff --git a/go/libraries/doltcore/dconfig/envvars.go b/go/libraries/doltcore/dconfig/envvars.go
index f9df29d7e99..8f46bd1057d 100755
--- a/go/libraries/doltcore/dconfig/envvars.go
+++ b/go/libraries/doltcore/dconfig/envvars.go
@@ -43,6 +43,7 @@ const (
     EnvDoltAuthorDate = "DOLT_AUTHOR_DATE"
     EnvDoltCommitterDate = "DOLT_COMMITTER_DATE"
     EnvDbNameReplace = "DOLT_DBNAME_REPLACE"
+    EnvSkipInvalidJournalRecords = "DOLT_SKIP_INVALID_JOURNAL_RECORDS"
     EnvDoltRootHost = "DOLT_ROOT_HOST"
     EnvDoltRootPassword = "DOLT_ROOT_PASSWORD"
 )
diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go
index 33f81340e39..bab535adcde 100644
--- a/go/store/nbs/journal_record.go
+++ b/go/store/nbs/journal_record.go
@@ -21,8 +21,11 @@ import (
     "errors"
     "fmt"
     "io"
+    "math"
+    "os"
     "time"
 
+    "github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
     "github.com/dolthub/dolt/go/store/d"
     "github.com/dolthub/dolt/go/store/hash"
 )
@@ -101,14 +104,23 @@ var journalRecordTimestampGenerator = func() uint64 {
 }
 
 func chunkRecordSize(c CompressedChunk) (recordSz, payloadOff uint32) {
-    recordSz += journalRecLenSz
-    recordSz += journalRecTagSz + journalRecKindSz
-    recordSz += journalRecTagSz + journalRecAddrSz
-    recordSz += journalRecTagSz // payload tag
-    payloadOff = recordSz
+    payloadOff += journalRecLenSz
+    payloadOff += journalRecTagSz + journalRecKindSz
+    payloadOff += journalRecTagSz + journalRecAddrSz
+    payloadOff += journalRecTagSz // payload tag
+
+    // Make sure the size of the chunk wouldn't overflow the uint32 record length
+    maxCompressedChunkSize := math.MaxUint32 - int(payloadOff+journalRecChecksumSz)
+    if len(c.FullCompressedChunk) > maxCompressedChunkSize {
+        panic(fmt.Sprintf("compressed chunk size (%d) is larger than max size allowed "+
+            "for chunk record (%d)", len(c.FullCompressedChunk), maxCompressedChunkSize))
+    }
+
+    recordSz = payloadOff
     recordSz += uint32(len(c.FullCompressedChunk))
     recordSz += journalRecChecksumSz
-    return
+
+    return recordSz, payloadOff
 }
 
 func rootHashRecordSize() (recordSz int) {
@@ -121,7 +133,9 @@ func rootHashRecordSize() (recordSz int) {
 }
 
 func writeChunkRecord(buf []byte, c CompressedChunk) (n uint32) {
-    // length
+    // length – comes back as an unsigned 32-bit int, which aligns with the four bytes used
+    // in the journal storage protocol to store the total record length, ensuring that we can't
+    // produce a length that is too large to safely write.
     l, _ := chunkRecordSize(c)
     writeUint32(buf[:journalRecLenSz], l)
     n += journalRecLenSz
@@ -211,16 +225,27 @@ func readJournalRecord(buf []byte) (rec journalRec, err error) {
     return
 }
 
-func validateJournalRecord(buf []byte) bool {
+// validateJournalRecord performs some sanity checks on the buffer |buf| containing a journal
+// record, such as checking that the length of the record is not too short, and checking the
+// checksum. If any problems are detected, an error is returned; otherwise nil is returned.
+func validateJournalRecord(buf []byte) error {
     if len(buf) < (journalRecLenSz + journalRecChecksumSz) {
-        return false
+        return fmt.Errorf("invalid journal record: buffer length too small (%d < %d)", len(buf), (journalRecLenSz + journalRecChecksumSz))
     }
+
     off := readUint32(buf)
     if int(off) > len(buf) {
-        return false
+        return fmt.Errorf("invalid journal record: offset is greater than length of buffer (%d > %d)",
+            off, len(buf))
     }
+
     off -= indexRecChecksumSz
-    return crc(buf[:off]) == readUint32(buf[off:])
+    crcMatches := crc(buf[:off]) == readUint32(buf[off:])
+    if !crcMatches {
+        return fmt.Errorf("invalid journal record: CRC checksum does not match")
+    }
+
+    return nil
 }
 
 // processJournalRecords iterates over a chunk journal's records by reading from disk using |r|, starting at
@@ -241,36 +266,54 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
     for {
         // peek to read next record size
         if buf, err = rdr.Peek(uint32Size); err != nil {
-            break
+            if err == io.EOF {
+                break
+            } else {
+                return 0, err
+            }
         }
+
+        // The first 4 bytes in the journal record are the total length of the record (including
+        // these first four bytes)
         l := readUint32(buf)
-        if buf, err = rdr.Peek(int(l)); err != nil {
+
+        // The journal file data is initialized to a block of zero bytes, so if we read a record
+        // length of 0, we know we've reached the end of the journal records and are starting to
+        // read the zero padding.
+        if l == 0 {
             break
         }
-        if !validateJournalRecord(buf) {
-            // We read the journal file until we run into an invalid record.
-            break // stop if we can't validate |rec|
+        if buf, err = rdr.Peek(int(l)); err != nil {
+            return 0, err
+        }
+
+        if err = validateJournalRecord(buf); err != nil {
+            // If the DOLT_SKIP_INVALID_JOURNAL_RECORDS env var is set, then we stop reading the journal
+            // as soon as we hit an invalid record. This allows users to opt in to the previous behavior
+            // where we process as many journal records as we can, but stop once we hit an invalid record.
+            if os.Getenv(dconfig.EnvSkipInvalidJournalRecords) != "" {
+                break
+            } else {
+                return 0, err
+            }
         }
 
         var rec journalRec
         if rec, err = readJournalRecord(buf); err != nil {
-            break // failed to read valid record
+            return 0, err
         }
         if err = cb(off, rec); err != nil {
-            break
+            return 0, err
         }
 
         // advance |rdr| state by |l| bytes
         if _, err = io.ReadFull(rdr, buf); err != nil {
-            break
+            return 0, err
         }
         off += int64(len(buf))
     }
-    if err != nil && err != io.EOF {
-        return 0, err
-    }
+
     // reset the file pointer to end of the last
     // successfully processed journal record
     if _, err = r.Seek(off, 0); err != nil {
diff --git a/go/store/nbs/journal_record_test.go b/go/store/nbs/journal_record_test.go
index 0e8b1cc2cdc..d3ece60633f 100644
--- a/go/store/nbs/journal_record_test.go
+++ b/go/store/nbs/journal_record_test.go
@@ -18,12 +18,14 @@ import (
     "bytes"
     "context"
     "math/rand"
+    "os"
     "testing"
     "time"
 
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
+    "github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
     "github.com/dolthub/dolt/go/store/chunks"
     "github.com/dolthub/dolt/go/store/d"
     "github.com/dolthub/dolt/go/store/hash"
@@ -71,10 +73,10 @@ func TestUnknownJournalRecordTag(t *testing.T) {
     // test behavior encountering unknown tag
     buf := makeUnknownTagJournalRecord()
     // checksum is ok
-    ok := validateJournalRecord(buf)
-    assert.True(t, ok)
+    err := validateJournalRecord(buf)
+    assert.NoError(t, err)
     // reading record fails
-    _, err := readJournalRecord(buf)
+    _, err = readJournalRecord(buf)
     assert.Error(t, err)
 }
 
@@ -118,13 +120,25 @@ func TestProcessJournalRecords(t *testing.T) {
     assert.Equal(t, int(off), int(n))
     require.NoError(t, err)
 
+    // write a bogus record to the end and verify that we get an error
+    i, sum = 0, 0
+    writeCorruptJournalRecord(journal[off:])
+    n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check)
+    require.Error(t, err)
+    require.Contains(t, err.Error(), "CRC checksum does not match")
+    assert.Equal(t, cnt, i)
+    // Since an error was encountered, the returned offset is 0
+    assert.Equal(t, 0, int(n))
+
+    // Turn on the env setting to stop processing journal records once we hit an invalid record
+    require.NoError(t, os.Setenv(dconfig.EnvSkipInvalidJournalRecords, "1"))
     i, sum = 0, 0
     // write a bogus record to the end and process again
     writeCorruptJournalRecord(journal[off:])
     n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check)
+    require.NoError(t, err)
     assert.Equal(t, cnt, i)
     assert.Equal(t, int(off), int(n))
-    require.NoError(t, err)
 }
 
 func randomMemTable(cnt int) (*memTable, map[hash.Hash]chunks.Chunk) {
diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go
index 888a48833d8..caea111b147 100644
--- a/go/store/nbs/journal_writer.go
+++ b/go/store/nbs/journal_writer.go
@@ -35,13 +35,17 @@ import (
 )
 
 const (
+    // chunkJournalFileSize is the size we initialize the journal file to when it is first created. We
+    // create a 16KB block of zero-initialized data and then sync the file to the first byte. We do this
+    // to ensure that we can write to the journal file and that we have some space for initial records.
+    // This probably isn't strictly necessary, but it also doesn't hurt.
     chunkJournalFileSize = 16 * 1024
 
-    // todo(andy): buffer must be able to hold an entire record,
-    // but we don't have a hard limit on record size right now.
-    // JSON data has cases where it won't chunk down as small as other data,
-    // so we have increased this to 5MB. If/when JSON chunking handles those
-    // cases, we could decrease this size to 1MB again.
+    // journalWriterBuffSize is the size of the statically allocated buffer where journal records are
+    // built before being written to the journal file on disk. There is not a hard limit on the size
+    // of records – specifically, some newer data chunking formats (e.g. optimized JSON storage) can
+    // produce chunks (and therefore chunk records) that are megabytes in size. The current limit of
+    // 5MB should be large enough to cover all but the most extreme cases.
     journalWriterBuffSize = 5 * 1024 * 1024
 
     chunkJournalAddr = chunks.JournalFileID
@@ -118,6 +122,9 @@ func createJournalWriter(ctx context.Context, path string) (wr *journalWriter, e
         return nil, err
     }
 
+    // Open the journal file and initialize it with 16KB of zero bytes. This is intended to
+    // ensure that we can write to the journal and to allocate space for the first set of
+    // records, but probably isn't strictly necessary.
     if f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666); err != nil {
         return nil, err
     }
@@ -183,7 +190,7 @@ var _ io.Closer = &journalWriter{}
 // The journal index will bw truncated to the last valid batch of lookups. Lookups with offsets
 // larger than the position of the last valid lookup metadata are rewritten to the index as they
 // are added to the novel ranges map. If the number of novel lookups exceeds |wr.maxNovel|, we
-// extend the jounral index with one metadata flush before existing this function to save indexing
+// extend the journal index with one metadata flush before exiting this function to save indexing
 // progress.
 func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer) (last hash.Hash, err error) {
     wr.lock.Lock()
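Reviewer note: the framing that validateJournalRecord checks is easiest to see outside the nbs package. The sketch below is a minimal, self-contained illustration of the same length-prefix-plus-trailing-checksum validation; the constant names, big-endian byte order, and crc32.ChecksumIEEE are illustrative stand-ins, not the actual encoding or crc helper the journal uses.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

const (
	lenSz      = 4 // leading uint32: total record length, including these four bytes
	checksumSz = 4 // trailing uint32: checksum over every byte that precedes it
)

// validateRecord mirrors the shape of the checks in this PR: reject buffers that are
// too short, reject lengths that overrun the buffer, then verify the trailing checksum.
func validateRecord(buf []byte) error {
	if len(buf) < lenSz+checksumSz {
		return fmt.Errorf("record too small (%d < %d)", len(buf), lenSz+checksumSz)
	}
	l := binary.BigEndian.Uint32(buf)
	if int(l) > len(buf) {
		return fmt.Errorf("record length overruns buffer (%d > %d)", l, len(buf))
	}
	payloadEnd := l - checksumSz
	if crc32.ChecksumIEEE(buf[:payloadEnd]) != binary.BigEndian.Uint32(buf[payloadEnd:]) {
		return errors.New("checksum mismatch")
	}
	return nil
}

func main() {
	// Build a toy record: [length][payload][checksum].
	payload := []byte("hello")
	total := lenSz + len(payload) + checksumSz
	rec := make([]byte, total)
	binary.BigEndian.PutUint32(rec, uint32(total))
	copy(rec[lenSz:], payload)
	binary.BigEndian.PutUint32(rec[total-checksumSz:], crc32.ChecksumIEEE(rec[:total-checksumSz]))

	fmt.Println(validateRecord(rec)) // <nil>
	rec[lenSz] ^= 0xff               // flip a payload bit
	fmt.Println(validateRecord(rec)) // checksum mismatch
}
```

To opt back into the old stop-at-first-invalid-record behavior, set DOLT_SKIP_INVALID_JOURNAL_RECORDS to any non-empty value before starting dolt, as the updated test does with os.Setenv(dconfig.EnvSkipInvalidJournalRecords, "1").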