Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vtorc: fetch shard names only every --topo-information-refresh-duration #17319

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
20 changes: 14 additions & 6 deletions go/vt/vtorc/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ CREATE TABLE vitess_keyspace (
keyspace varchar(128) NOT NULL,
keyspace_type smallint(5) NOT NULL,
durability_policy varchar(512) NOT NULL,
updated_timestamp timestamp NOT NULL,
PRIMARY KEY (keyspace)
)`,
`
Expand All @@ -305,24 +306,31 @@ CREATE TABLE vitess_shard (
shard varchar(128) NOT NULL,
primary_alias varchar(512) NOT NULL,
primary_timestamp varchar(512) NOT NULL,
updated_timestamp timestamp NOT NULL,
PRIMARY KEY (keyspace, shard)
)`,
`
CREATE INDEX source_host_port_idx_database_instance_database_instance on database_instance (source_host, source_port)
CREATE INDEX source_host_port_idx_database_instance_database_instance ON database_instance (source_host, source_port)
`,
`
CREATE INDEX keyspace_shard_idx_topology_recovery on topology_recovery (keyspace, shard)
CREATE INDEX keyspace_shard_idx_topology_recovery ON topology_recovery (keyspace, shard)
`,
`
CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recovery)
CREATE INDEX end_recovery_idx_topology_recovery ON topology_recovery (end_recovery)
`,
`
CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp)
CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog ON database_instance_analysis_changelog (alias, analysis_timestamp)
`,
`
CREATE INDEX detection_idx_topology_recovery on topology_recovery (detection_id)
CREATE INDEX detection_idx_topology_recovery ON topology_recovery (detection_id)
`,
`
CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps(recovery_id)
CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps (recovery_id)
`,
`
CREATE INDEX updated_timestamp_idx_vitess_keyspace ON vitess_keyspace (updated_timestamp)
`,
`
CREATE INDEX updated_timestamp_idx_vitess_shard ON vitess_shard (updated_timestamp)
`,
}
4 changes: 2 additions & 2 deletions go/vt/vtorc/inst/analysis_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ var (
`INSERT INTO vitess_tablet VALUES('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone2-0000000200','localhost',6756,'ks','0','zone2',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653222207569643a3230307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363735357d20706f72745f6d61703a7b6b65793a227674222076616c75653a363735347d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363735362064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_shard VALUES('ks','0','zone1-0000000101','2022-12-28 07:23:25.129898+00:00');`,
`INSERT INTO vitess_keyspace VALUES('ks',0,'semi_sync');`,
`INSERT INTO vitess_shard VALUES('ks','0','zone1-0000000101','2022-12-28 07:23:25.129898+00:00','2022-12-28 07:23:25.129898+00:00');`,
`INSERT INTO vitess_keyspace VALUES('ks',0,'semi_sync','2022-12-28 07:23:25.129898+00:00');`,
}
)

Expand Down
40 changes: 24 additions & 16 deletions go/vt/vtorc/inst/keyspace_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package inst

import (
"errors"
"time"

"vitess.io/vitess/go/vt/external/golib/sqlutils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -35,14 +36,12 @@ func ReadKeyspace(keyspaceName string) (*topo.KeyspaceInfo, error) {
return nil, err
}

query := `
select
keyspace_type,
durability_policy
from
vitess_keyspace
where keyspace=?
`
query := `SELECT
keyspace_type,
durability_policy
FROM
vitess_keyspace
WHERE keyspace = ?`
args := sqlutils.Args(keyspaceName)
keyspace := &topo.KeyspaceInfo{
Keyspace: &topodatapb.Keyspace{},
Expand All @@ -64,21 +63,30 @@ func ReadKeyspace(keyspaceName string) (*topo.KeyspaceInfo, error) {

// SaveKeyspace saves the keyspace record against the keyspace name.
func SaveKeyspace(keyspace *topo.KeyspaceInfo) error {
_, err := db.ExecVTOrc(`
replace
into vitess_keyspace (
keyspace, keyspace_type, durability_policy
) values (
?, ?, ?
)
`,
_, err := db.ExecVTOrc(`REPLACE
INTO vitess_keyspace (
keyspace, keyspace_type, durability_policy, updated_timestamp
) VALUES (
?, ?, ?, DATETIME('now')
)`,
keyspace.KeyspaceName(),
int(keyspace.KeyspaceType),
keyspace.GetDurabilityPolicy(),
)
return err
}

// DeleteStaleKeyspaces deletes keyspace records that have not been updated since a provided time.
func DeleteStaleKeyspaces(staleTime time.Time) error {
_, err := db.ExecVTOrc(`DELETE FROM vitess_keyspace
WHERE
updated_timestamp <= DATETIME(?, 'unixepoch')
`,
staleTime.Unix(),
)
return err
}

// GetDurabilityPolicy gets the durability policy for the given keyspace.
func GetDurabilityPolicy(keyspace string) (reparentutil.Durabler, error) {
ki, err := ReadKeyspace(keyspace)
Expand Down
35 changes: 35 additions & 0 deletions go/vt/vtorc/inst/keyspace_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package inst

import (
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -124,3 +125,37 @@ func TestSaveAndReadKeyspace(t *testing.T) {
})
}
}

func TestDeleteStaleKeyspaces(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

keyspaceInfo := &topo.KeyspaceInfo{
Keyspace: &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
DurabilityPolicy: "none",
BaseKeyspace: "baseKeyspace",
},
}
keyspaceInfo.SetKeyspaceName("ks1")
err := SaveKeyspace(keyspaceInfo)
require.NoError(t, err)

readKeyspaceInfo, err := ReadKeyspace("ks1")
require.NoError(t, err)
require.NotNil(t, readKeyspaceInfo)

// test a staletime before save causes no delete
require.NoError(t, DeleteStaleKeyspaces(time.Now().Add(-time.Hour)))
readKeyspaceInfo, err = ReadKeyspace("ks1")
require.NoError(t, err)
require.NotNil(t, readKeyspaceInfo)

// test statetime of now deletes everything
require.NoError(t, DeleteStaleKeyspaces(time.Now()))
readKeyspaceInfo, err = ReadKeyspace("ks1")
require.Error(t, err)
require.Nil(t, readKeyspaceInfo)
}
55 changes: 45 additions & 10 deletions go/vt/vtorc/inst/shard_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package inst

import (
"errors"
"time"

"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/external/golib/sqlutils"
Expand All @@ -38,13 +39,12 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s
return
}

query := `
select
query := `SELECT
primary_alias, primary_timestamp
from
FROM
vitess_shard
where keyspace=? and shard=?
`
WHERE
keyspace = ? AND shard = ?`
args := sqlutils.Args(keyspaceName, shardName)
shardFound := false
err = db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error {
Expand All @@ -62,14 +62,38 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s
return primaryAlias, primaryTimestamp, nil
}

// GetAllShardNames returns the names of all keyspace/shards.
func GetAllShardNames() (map[string][]string, error) {
shards := make(map[string][]string, 0)
query := `SELECT keyspace, shard FROM vitess_shard`
err := db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error {
keyspace := row.GetString("keyspace")
shards[keyspace] = append(shards[keyspace], row.GetString("shard"))
return nil
})
return shards, err
}

// GetKeyspaceShardNames returns the names of all shards in a keyspace.
func GetKeyspaceShardNames(keyspaceName string) ([]string, error) {
shards := make([]string, 0)
query := `SELECT shard FROM vitess_shard WHERE keyspace = ?`
args := sqlutils.Args(keyspaceName)
err := db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error {
shards = append(shards, row.GetString("shard"))
return nil
})
return shards, err
}

// SaveShard saves the shard record against the shard name.
func SaveShard(shard *topo.ShardInfo) error {
_, err := db.ExecVTOrc(`
replace
into vitess_shard (
keyspace, shard, primary_alias, primary_timestamp
) values (
?, ?, ?, ?
REPLACE
INTO vitess_shard (
keyspace, shard, primary_alias, primary_timestamp, updated_timestamp
) VALUES (
?, ?, ?, ?, DATETIME('now')
)
`,
shard.Keyspace(),
Expand All @@ -80,6 +104,17 @@ func SaveShard(shard *topo.ShardInfo) error {
return err
}

// DeleteStaleShards deletes shard records that have not been updated since a provided time.
func DeleteStaleShards(staleTime time.Time) error {
_, err := db.ExecVTOrc(`DELETE FROM vitess_shard
WHERE
updated_timestamp <= DATETIME(?, 'unixepoch')
`,
staleTime.Unix(),
)
return err
}

// getShardPrimaryAliasString gets the shard primary alias to be stored as a string in the database.
func getShardPrimaryAliasString(shard *topo.ShardInfo) string {
if shard.PrimaryAlias == nil {
Expand Down
60 changes: 60 additions & 0 deletions go/vt/vtorc/inst/shard_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,63 @@ func TestSaveAndReadShard(t *testing.T) {
})
}
}

func TestGetAllShardNames(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

shardInfo := topo.NewShardInfo("ks1", "-80", &topodatapb.Shard{}, nil)
err := SaveShard(shardInfo)
require.NoError(t, err)

shardNames, err := GetAllShardNames()
require.NoError(t, err)
require.Equal(t, map[string][]string{
"ks1": {"-80"},
}, shardNames)
}

func TestGetKeyspaceShardNames(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

for _, shardName := range []string{"-80", "80-"} {
shardInfo := topo.NewShardInfo("ks1", shardName, &topodatapb.Shard{}, nil)
err := SaveShard(shardInfo)
require.NoError(t, err)
}

shardNames, err := GetKeyspaceShardNames("ks1")
require.NoError(t, err)
require.Equal(t, []string{"-80", "80-"}, shardNames)
}

func TestDeleteStaleShards(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

shardInfo := topo.NewShardInfo("ks1", "-80", &topodatapb.Shard{}, nil)
err := SaveShard(shardInfo)
require.NoError(t, err)
shards, err := GetAllShardNames()
require.NoError(t, err)
require.Len(t, shards, 1)

// test a staletime before save causes no delete
require.NoError(t, DeleteStaleShards(time.Now().Add(-time.Hour)))
shards, err = GetAllShardNames()
require.NoError(t, err)
require.Len(t, shards, 1)

// test statetime of now deletes everything
require.NoError(t, DeleteStaleShards(time.Now()))
shards, err = GetAllShardNames()
require.NoError(t, err)
require.Len(t, shards, 0)
}
Loading
Loading