Skip to content

Commit

Permalink
freezer: support reset ancient head+tail;
Browse files Browse the repository at this point in the history
  • Loading branch information
0xbundler committed Dec 8, 2023
1 parent ec25525 commit 080828f
Show file tree
Hide file tree
Showing 17 changed files with 230 additions and 23 deletions.
8 changes: 1 addition & 7 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"math"
"math/big"
"math/rand"
"runtime/debug"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -417,7 +416,6 @@ func (p *Parlia) getParent(chain consensus.ChainHeaderReader, header *types.Head
}

if parent == nil || parent.Number.Uint64() != number-1 || parent.Hash() != header.ParentHash {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return nil, consensus.ErrUnknownAncestor
}
return parent, nil
Expand Down Expand Up @@ -540,7 +538,6 @@ func (p *Parlia) verifyHeader(chain consensus.ChainHeaderReader, header *types.H
// Ensure that the extra-data contains a signer list on checkpoint, but none otherwise
signersBytes := getValidatorBytesFromHeader(header, p.chainConfig, p.config)
if !isEpoch && len(signersBytes) != 0 {
log.Error("validate header err", "number", header.Number, "hash", header.Hash(), "chainconfig", p.chainConfig, "config", p.config, "bytes", len(signersBytes))
return errExtraValidators
}
if isEpoch && len(signersBytes) == 0 {
Expand Down Expand Up @@ -726,15 +723,13 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
// If we have explicit parents, pick from there (enforced)
header = parents[len(parents)-1]
if header.Hash() != hash || header.Number.Uint64() != number {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return nil, consensus.ErrUnknownAncestor
}
parents = parents[:len(parents)-1]
} else {
// No explicit parents (or no more left), reach out to the database
header = chain.GetHeader(hash, number)
if header == nil {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return nil, consensus.ErrUnknownAncestor
}
}
Expand Down Expand Up @@ -794,7 +789,7 @@ func (p *Parlia) findSnapFromHistorySegment(hash common.Hash, number uint64) (*S
tmp.config = p.config
tmp.sigCache = p.signatures
tmp.ethAPI = p.ethAPI
log.Info("found history segment snapshot", "number", number, "hash", hash, "segment", segment)
log.Debug("found history segment snapshot", "number", number, "hash", hash, "segment", segment)
return &tmp, true
}

Expand Down Expand Up @@ -999,7 +994,6 @@ func (p *Parlia) Prepare(chain consensus.ChainHeaderReader, header *types.Header
// Ensure the timestamp has the correct delay
parent := chain.GetHeader(header.ParentHash, number-1)
if parent == nil {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return consensus.ErrUnknownAncestor
}
header.Time = p.blockTimeForRamanujanFork(snap, header, parent)
Expand Down
2 changes: 0 additions & 2 deletions consensus/parlia/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime/debug"
"sort"

lru "github.com/hashicorp/golang-lru"
Expand Down Expand Up @@ -272,7 +271,6 @@ func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderRea
if number > 0 && number%s.config.Epoch == uint64(len(snap.Validators)/2) {
checkpointHeader := FindAncientHeader(header, uint64(len(snap.Validators)/2), chain, parents)
if checkpointHeader == nil {
log.Info("cannot find ancestor, FindAncientHeader", "number", number, "stack", string(debug.Stack()))
return nil, consensus.ErrUnknownAncestor
}

Expand Down
4 changes: 4 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,3 +470,7 @@ func (bc *BlockChain) WriteCanonicalHeaders(headers []*types.Header, tds []uint6
}
return nil
}

// FreezerDBReset resets the ancient store (freezer) so that it holds exactly
// the item range [tail, head), delegating to the underlying database's
// AncientReset implementation.
func (bc *BlockChain) FreezerDBReset(tail, head uint64) error {
	return bc.db.AncientReset(tail, head)
}
3 changes: 1 addition & 2 deletions core/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func SetupGenesisBlockWithOverride(db ethdb.Database, triedb *trie.Database, gen
log.Info("Writing default main-net genesis block")
genesis = DefaultGenesisBlock()
} else {
log.Info("Writing custom genesis block", "config", genesis.Config)
log.Info("Writing custom genesis block")
}
block, err := genesis.Commit(db, triedb)
if err != nil {
Expand Down Expand Up @@ -408,7 +408,6 @@ func LoadChainConfig(db ethdb.Database, genesis *Genesis) (*params.ChainConfig,
if stored != (common.Hash{}) {
storedcfg := rawdb.ReadChainConfig(db, stored)
if storedcfg != nil {
log.Info("found chain config", "hash", stored, "cfg", storedcfg)
return storedcfg, stored, nil
}
}
Expand Down
4 changes: 4 additions & 0 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ func (db *nofreezedb) AncientDatadir() (string, error) {
return "", errNotSupported
}

// AncientReset is not supported by a database without a freezer; it always
// returns errNotSupported.
func (db *nofreezedb) AncientReset(tail, head uint64) error {
	return errNotSupported
}

// NewDatabase creates a high level database on top of a given key-value data
// store without a freezer moving immutable chain segments into cold storage.
func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
Expand Down
29 changes: 29 additions & 0 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,3 +598,32 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
}
return nil
}

// AncientReset resets every table in the freezer so that the ancient store
// holds exactly the item range [tail, head). It fails on a read-only freezer.
//
// NOTE(review): if repair fails, the tables are closed and the freezer is left
// unusable; presumably callers treat that error as fatal — confirm.
func (f *Freezer) AncientReset(tail, head uint64) error {
	if f.readonly {
		return errReadOnly
	}
	// Serialize against concurrent freezer mutations.
	f.writeLock.Lock()
	defer f.writeLock.Unlock()

	// Reset each table to the new range. resetItems may return a brand new
	// table instance, so the stored reference must be replaced.
	for i := range f.tables {
		nt, err := f.tables[i].resetItems(tail, head)
		if err != nil {
			return err
		}
		f.tables[i] = nt
	}

	// Re-derive the freezer-wide frozen/tail counters from the tables.
	if err := f.repair(); err != nil {
		for _, table := range f.tables {
			table.Close()
		}
		return err
	}

	// repair works in table-local item numbers; shift the counters by the
	// freezer's global offset. The old write batch references the replaced
	// tables, so allocate a fresh one.
	f.frozen.Add(f.offset)
	f.tail.Add(f.offset)
	f.writeBatch = newFreezerBatch(f)
	log.Info("Ancient database reset", "tail", f.tail.Load(), "frozen", f.frozen.Load())
	return nil
}
7 changes: 7 additions & 0 deletions core/rawdb/freezer_resettable.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ func (f *ResettableFreezer) TruncateTail(tail uint64) (uint64, error) {
return f.freezer.TruncateTail(tail)
}

// AncientReset resets the head and tail of the underlying freezer to the
// given item numbers. The read lock only guards against a concurrent Reset
// swapping out the inner freezer instance.
func (f *ResettableFreezer) AncientReset(tail, head uint64) error {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.freezer.AncientReset(tail, head)
}

// Sync flushes all data tables to disk.
func (f *ResettableFreezer) Sync() error {
f.lock.RLock()
Expand Down
51 changes: 51 additions & 0 deletions core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,57 @@ func (t *freezerTable) truncateTail(items uint64) error {
return nil
}

// resetItems reset freezer table head & tail
func (t *freezerTable) resetItems(tail, head uint64) (*freezerTable, error) {
if t.readonly {
return nil, errors.New("resetItems in readonly mode")
}
itemHidden := t.itemHidden.Load()
items := t.items.Load()
if tail != head && (itemHidden > tail || items < head) {
return nil, errors.New("cannot reset to non-exist range")
}

var err error
if tail != head {
if err = t.truncateHead(head); err != nil {
return nil, err
}
if err = t.truncateTail(tail); err != nil {
return nil, err
}
return t, nil
}

// if tail == head, it means table reset to 0 item
t.releaseFilesAfter(t.tailId-1, true)
t.head.Close()
os.Remove(t.head.Name())
t.index.Close()
os.Remove(t.index.Name())
t.meta.Close()
os.Remove(t.meta.Name())

var idxName string
if t.noCompression {
idxName = fmt.Sprintf("%s.ridx", t.name) // raw index file
} else {
idxName = fmt.Sprintf("%s.cidx", t.name) // compressed index file
}
index, err := openFreezerFileForAppend(filepath.Join(t.path, idxName))
if err != nil {
return nil, err
}
tailIndex := indexEntry{
offset: uint32(tail),
}
if _, err = index.Write(tailIndex.append(nil)); err != nil {
return nil, err
}

return newFreezerTable(t.path, t.name, t.noCompression, t.readonly)
}

// Close closes all opened files.
func (t *freezerTable) Close() error {
t.lock.Lock()
Expand Down
97 changes: 97 additions & 0 deletions core/rawdb/freezer_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"testing"
"testing/quick"

"github.com/stretchr/testify/assert"

"github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/metrics"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1337,3 +1339,98 @@ func TestRandom(t *testing.T) {
t.Fatal(err)
}
}

// TestResetItems exercises freezerTable.resetItems: a no-op reset over the
// full range, shrinking to a sub-range, rejecting out-of-range resets,
// resetting to empty, appending after a reset, and persistence of the reset
// across a table reopen.
func TestResetItems(t *testing.T) {
	t.Parallel()
	rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
	fname := fmt.Sprintf("reset-items-%d", rand.Uint64())

	// Fill table
	f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
	if err != nil {
		t.Fatal(err)
	}

	// Write 7 x 20 bytes, splitting out into four files
	batch := f.newBatch(0)
	require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
	require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
	require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
	require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
	require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
	require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
	require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11)))
	require.NoError(t, batch.commit())

	// Resetting to the full existing range is a no-op: every item remains.
	f, err = f.resetItems(0, 7)
	assert.NoError(t, err)
	checkRetrieve(t, f, map[uint64][]byte{
		0: getChunk(20, 0xFF),
		1: getChunk(20, 0xEE),
		2: getChunk(20, 0xdd),
		3: getChunk(20, 0xcc),
		4: getChunk(20, 0xbb),
		5: getChunk(20, 0xaa),
		6: getChunk(20, 0x11),
	})

	// Shrink to [1, 5); resets outside the stored range must be rejected.
	f, err = f.resetItems(1, 5)
	assert.NoError(t, err)
	_, err = f.resetItems(0, 5)
	assert.Error(t, err)
	_, err = f.resetItems(1, 6)
	assert.Error(t, err)

	checkRetrieveError(t, f, map[uint64]error{
		0: errOutOfBounds,
	})
	checkRetrieve(t, f, map[uint64][]byte{
		1: getChunk(20, 0xEE),
		2: getChunk(20, 0xdd),
		3: getChunk(20, 0xcc),
		4: getChunk(20, 0xbb),
	})

	// tail == head empties the table entirely, positioned at item 4.
	f, err = f.resetItems(4, 4)
	assert.NoError(t, err)
	checkRetrieveError(t, f, map[uint64]error{
		4: errOutOfBounds,
	})

	// Appending below the new tail must fail; appending at the tail works.
	batch = f.newBatch(0)
	require.Error(t, batch.AppendRaw(0, getChunk(20, 0xa0)))
	require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xa4)))
	require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xa5)))
	require.NoError(t, batch.commit())

	// Reopen the table, the deletion information should be persisted as well
	require.NoError(t, f.Close())
	f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
	if err != nil {
		t.Fatal(err)
	}
	checkRetrieveError(t, f, map[uint64]error{
		0: errOutOfBounds,
	})
	checkRetrieve(t, f, map[uint64][]byte{
		4: getChunk(20, 0xa4),
		5: getChunk(20, 0xa5),
	})

	// truncate all, the entire freezer should be deleted
	require.NoError(t, f.truncateTail(6))
	checkRetrieveError(t, f, map[uint64]error{
		0: errOutOfBounds,
		1: errOutOfBounds,
		2: errOutOfBounds,
		3: errOutOfBounds,
		4: errOutOfBounds,
		5: errOutOfBounds,
		6: errOutOfBounds,
	})
	require.NoError(t, f.Close())
}
4 changes: 4 additions & 0 deletions core/rawdb/prunedfreezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ func (f *prunedfreezer) TruncateTail(tail uint64) (uint64, error) {
return 0, errNotSupported
}

// AncientReset is not supported by the pruned freezer; it always returns
// errNotSupported.
func (f *prunedfreezer) AncientReset(tail, head uint64) error {
	return errNotSupported
}

// Sync flushes meta data tables to disk.
func (f *prunedfreezer) Sync() error {
WriteFrozenOfAncientFreezer(f.db, atomic.LoadUint64(&f.frozen))
Expand Down
4 changes: 4 additions & 0 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (t *table) TruncateTail(items uint64) (uint64, error) {
return t.db.TruncateTail(items)
}

// AncientReset is a noop passthrough that just forwards the request to the
// underlying database.
func (t *table) AncientReset(tail, head uint64) error {
	return t.db.AncientReset(tail, head)
}

// Sync is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) Sync() error {
Expand Down
5 changes: 2 additions & 3 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
if p, ok := eth.engine.(consensus.PoSA); ok {
log.Info("setup consensus engine history segment", "lastSegment", lastSegment)
p.SetupHistorySegment(hsm)
}
}
Expand Down Expand Up @@ -278,7 +277,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
bc.SetupHistorySegment(hsm)
log.Info("setup blockchain history segment", "lastSegment", lastSegment)
}
return bc, nil
})
Expand Down Expand Up @@ -418,7 +416,8 @@ func GetHistorySegmentAndLastSegment(db ethdb.Database, genesisHash common.Hash,

// check segment if match hard code
if err = rawdb.AvailableHistorySegment(db, lastSegment); err != nil {
return nil, nil, err
log.Warn("there is no available history to prune", "head", latestHeader.Number)
return hsm, nil, nil
}
return hsm, lastSegment, nil
}
Expand Down
Loading

0 comments on commit 080828f

Please sign in to comment.