Skip to content

Commit

Permalink
Automated cherry pick of #21894: Fix/host load servers (#21897)
Browse files Browse the repository at this point in the history
* fix(host): load disks init scsi bus

* fix(region,host): delete instance snapshot progress incorrect
  • Loading branch information
wanyaoqi authored Dec 30, 2024
1 parent 8466820 commit af3bb67
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pkg/compute/models/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2937,7 +2937,7 @@ func (self *SDisk) CleanOverduedSnapshots(ctx context.Context, userCred mcclient
}
snapshot.SetModelManager(SnapshotManager, snapshot)
if snapshot.ExpiredAt.Before(now) {
err = snapshot.StartSnapshotDeleteTask(ctx, userCred, false, self.Id)
err = snapshot.StartSnapshotDeleteTask(ctx, userCred, false, self.Id, 0, 0)
if err != nil {
return err
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/compute/models/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,14 @@ func (self *SSnapshotManager) CreateSnapshot(ctx context.Context, owner mcclient
return snapshot, nil
}

func (self *SSnapshot) StartSnapshotDeleteTask(ctx context.Context, userCred mcclient.TokenCredential, reloadDisk bool, parentTaskId string) error {
func (self *SSnapshot) StartSnapshotDeleteTask(ctx context.Context, userCred mcclient.TokenCredential, reloadDisk bool, parentTaskId string, deleteSnapshotTotalCnt int, deletedSnapshotCnt int) error {
params := jsonutils.NewDict()
params.Set("reload_disk", jsonutils.NewBool(reloadDisk))
if deleteSnapshotTotalCnt <= 0 {
deleteSnapshotTotalCnt = 1
}
params.Set("snapshot_total_count", jsonutils.NewInt(int64(deleteSnapshotTotalCnt)))
params.Set("deleted_snapshot_count", jsonutils.NewInt(int64(deletedSnapshotCnt)))
self.SetStatus(ctx, userCred, api.SNAPSHOT_DELETING, "")
task, err := taskman.TaskManager.NewTask(ctx, "SnapshotDeleteTask", self, userCred, params, parentTaskId, "", nil)
if err != nil {
Expand Down Expand Up @@ -781,7 +786,7 @@ func (self *SSnapshot) GetRegionDriver() IRegionDriver {
}

func (self *SSnapshot) CustomizeDelete(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
return self.StartSnapshotDeleteTask(ctx, userCred, false, "")
return self.StartSnapshotDeleteTask(ctx, userCred, false, "", 0, 0)
}

func (self *SSnapshot) PerformDeleted(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
Expand All @@ -792,7 +797,7 @@ func (self *SSnapshot) PerformDeleted(ctx context.Context, userCred mcclient.Tok
if err != nil {
return nil, err
}
err = self.StartSnapshotDeleteTask(ctx, userCred, true, "")
err = self.StartSnapshotDeleteTask(ctx, userCred, true, "", 0, 0)
return nil, err
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/compute/regiondrivers/kvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,9 +656,15 @@ func (self *SKVMRegionDriver) RequestDeleteInstanceSnapshot(ctx context.Context,
}

params := jsonutils.NewDict()
taskParams := task.GetParams()
var deleteSnapshotTotalCnt int64 = 1
if taskParams.Contains("snapshot_total_count") {
deleteSnapshotTotalCnt, _ = taskParams.Int("snapshot_total_count")
}
deletedSnapshotCnt := deleteSnapshotTotalCnt - int64(len(snapshots))
params.Set("del_snapshot_id", jsonutils.NewString(snapshots[0].Id))
task.SetStage("OnKvmSnapshotDelete", params)
err = snapshots[0].StartSnapshotDeleteTask(ctx, task.GetUserCred(), false, task.GetTaskId())
err = snapshots[0].StartSnapshotDeleteTask(ctx, task.GetUserCred(), false, task.GetTaskId(), int(deleteSnapshotTotalCnt), int(deletedSnapshotCnt))
if err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/compute/storagedrivers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ func (self *SBaseStorageDriver) RequestDeleteSnapshot(ctx context.Context, snaps
} else {
params.Set("auto_deleted", jsonutils.JSONTrue)
}
taskParams := task.GetParams()
if taskParams.Contains("snapshot_total_count") {
totalCnt, _ := taskParams.Get("snapshot_total_count")
params.Set("snapshot_total_count", totalCnt)
deletedCnt, _ := taskParams.Get("deleted_snapshot_count")
params.Set("deleted_snapshot_count", deletedCnt)
}

guest.SetStatus(ctx, task.GetUserCred(), api.VM_SNAPSHOT_DELETE, "Start Delete Snapshot")
return drv.RequestDeleteSnapshot(ctx, guest, task, params)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/tasks/disk_backup_create_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (self *DiskBackupCreateTask) OnSave(ctx context.Context, backup *models.SDi
return nil
})
snapshot := snapshotModel.(*models.SSnapshot)
err = snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId())
err = snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId(), 0, 0)
if err != nil {
self.taskFailed(ctx, backup, jsonutils.NewString(err.Error()), api.BACKUP_STATUS_CLEANUP_SNAPSHOT_FAILED)
return
Expand All @@ -151,7 +151,7 @@ func (self *DiskBackupCreateTask) OnSaveFailed(ctx context.Context, backup *mode
}
snapshot := snapshotModel.(*models.SSnapshot)
self.taskFailed(ctx, backup, data, api.BACKUP_STATUS_SAVE_FAILED)
err = snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId())
err = snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId(), 0, 0)
if err != nil {
log.Errorf("unable to cleanup snapshot: %s", err.Error())
self.taskFailed(ctx, backup, data, api.BACKUP_STATUS_SAVE_FAILED)
Expand Down
2 changes: 1 addition & 1 deletion pkg/compute/tasks/disk_clean_overdued_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (self *SnapshotCleanupTask) StartSnapshotsDelete(ctx context.Context, snaps
}
self.SetStage("OnDeleteSnapshot", nil)
snapshot.SetModelManager(models.SnapshotManager, &snapshot)
err := snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId())
err := snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId(), 0, 0)
if err != nil {
self.OnDeleteSnapshotFailed(ctx, self.GetObject(), nil)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/compute/tasks/instance_snapshot_delete_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ func (self *InstanceSnapshotDeleteTask) OnInit(
ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {

isp := obj.(*models.SInstanceSnapshot)
sps, err := isp.GetSnapshots()
if err != nil {
self.taskFail(ctx, isp, jsonutils.NewString(err.Error()))
return
}
snapshotCnt := len(sps)
self.Params.Set("snapshot_total_count", jsonutils.NewInt(int64(snapshotCnt)))
self.SetStage("OnInstanceSnapshotDelete", nil)
if err := isp.GetRegionDriver().RequestDeleteInstanceSnapshot(ctx, isp, self); err != nil {
self.taskFail(ctx, isp, jsonutils.NewString(err.Error()))
Expand All @@ -84,6 +91,7 @@ func (self *InstanceSnapshotDeleteTask) OnKvmSnapshotDelete(
self.taskFail(ctx, isp, jsonutils.NewString(err.Error()))
return
}

if err := isp.GetRegionDriver().RequestDeleteInstanceSnapshot(ctx, isp, self); err != nil {
self.taskFail(ctx, isp, jsonutils.NewString(err.Error()))
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/compute/tasks/snapshot_delete_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (self *GuestDeleteSnapshotsTask) StartDeleteDiskSnapshots(
self.Params.Set("snapshots", jsonutils.Marshal(snapshots))
self.SetStage("OnSnapshotDelete", nil)
snapshot.SetModelManager(models.SnapshotManager, &snapshot)
snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.Id)
snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.Id, 0, 0)
return
}
self.SetStageComplete(ctx, nil)
Expand Down Expand Up @@ -304,7 +304,7 @@ func (self *DiskDeleteSnapshotsTask) StartDeleteDiskSnapshots(
self.Params.Set("snapshots", jsonutils.Marshal(snapshots))
self.SetStage("OnSnapshotDelete", nil)
snapshot.SetModelManager(models.SnapshotManager, &snapshot)
snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.Id)
snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.Id, 0, 0)
return
}
self.SetStageComplete(ctx, nil)
Expand Down
14 changes: 11 additions & 3 deletions pkg/hostman/guestman/guesthandlers/guesthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,12 @@ func guestDeleteSnapshot(ctx context.Context, userCred mcclient.TokenCredential,
if err != nil {
return nil, httperrors.NewMissingParameterError("disk_id")
}
var totalCnt, deletedCnt int64 = 1, 0
if body.Contains("snapshot_total_count") {
totalCnt, _ = body.Int("snapshot_total_count")
deletedCnt, _ = body.Int("deleted_snapshot_count")
}

guest, ok := guestman.GetGuestManager().GetServer(sid)
if !ok {
return nil, httperrors.NewNotFoundError("guest %s not found", sid)
Expand All @@ -725,9 +731,11 @@ func guestDeleteSnapshot(ctx context.Context, userCred mcclient.TokenCredential,
}

params := &guestman.SDeleteDiskSnapshot{
Sid: sid,
DeleteSnapshot: deleteSnapshot,
Disk: disk,
Sid: sid,
DeleteSnapshot: deleteSnapshot,
Disk: disk,
TotalDeleteSnapshotCount: int(totalCnt),
DeletedSnapshotCount: int(deletedCnt),
}

if body.Contains("encrypt_info") {
Expand Down
3 changes: 3 additions & 0 deletions pkg/hostman/guestman/guesthelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ type SDeleteDiskSnapshot struct {
ConvertSnapshot string
BlockStream bool
EncryptInfo apis.SEncryptInfo

TotalDeleteSnapshotCount int
DeletedSnapshotCount int
}

type SLibvirtServer struct {
Expand Down
38 changes: 27 additions & 11 deletions pkg/hostman/guestman/guesttasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (d *SGuestDiskSyncTask) syncDisksConf() {
}
if idxs := d.guest.GetNeedMergeBackingFileDiskIndexs(); len(idxs) > 0 {
d.guest.StreamDisks(context.Background(),
func() { d.guest.streamDisksComplete(context.Background()) }, idxs,
func() { d.guest.streamDisksComplete(context.Background()) }, idxs, -1, -1,
)
}
d.callback(d.errors...)
Expand Down Expand Up @@ -1635,7 +1635,7 @@ func (s *SGuestResumeTask) startStreamDisks(disksIdx []int) {
s.startTime = time.Time{}
s.detachStartupTask()
if s.IsMonitorAlive() {
s.StreamDisks(s.ctx, func() { s.onStreamComplete(disksIdx) }, disksIdx)
s.StreamDisks(s.ctx, func() { s.onStreamComplete(disksIdx) }, disksIdx, -1, -1)
}
}

Expand Down Expand Up @@ -1730,8 +1730,10 @@ func (s *SGuestBlockProgressBaseTask) onGetBlockJobs(jobs []monitor.BlockJob) {
}

diskCount := s.task.StreamingDiskCount()
streamedDiskCount := s.task.StreamingDiskCompletedCount()
if diskCount > 0 {
progress = float64(s.task.StreamingDiskCompletedCount())/float64(diskCount)*100.0 + 1.0/float64(diskCount)*progress
progress = float64(streamedDiskCount)/float64(diskCount)*100.0 + 1.0/float64(diskCount)*progress
log.Debugf("stream disk111111 progress %v, streamedDiskCount %v, diskCount %v ", progress, streamedDiskCount, diskCount)
}
hostutils.UpdateServerProgress(context.Background(), s.GetId(), progress, mbps)
s.task.OnGetBlockJobs(jobs)
Expand All @@ -1757,12 +1759,17 @@ type SGuestStreamDisksTask struct {
c chan struct{}
streamDevs []string
lvmBacking []string

progressTotalDiskCnt int
progressCompletedDiskCnt int
}

func NewGuestStreamDisksTask(ctx context.Context, guest *SKVMGuestInstance, callback func(), disksIdx []int) *SGuestStreamDisksTask {
func NewGuestStreamDisksTask(ctx context.Context, guest *SKVMGuestInstance, callback func(), disksIdx []int, totalCnt, completedCnt int) *SGuestStreamDisksTask {
task := &SGuestStreamDisksTask{
callback: callback,
disksIdx: disksIdx,
callback: callback,
disksIdx: disksIdx,
progressTotalDiskCnt: totalCnt,
progressCompletedDiskCnt: completedCnt,
}
task.SGuestBlockProgressBaseTask = NewGuestBlockProgressBaseTask(ctx, guest, task)
return task
Expand Down Expand Up @@ -1834,10 +1841,18 @@ func (s *SGuestStreamDisksTask) startDoBlockStream() {
}

func (s *SGuestStreamDisksTask) StreamingDiskCompletedCount() int {
return len(s.disksIdx) - len(s.streamDevs) - 1
completedCnt := len(s.disksIdx) - len(s.streamDevs) - 1
if s.progressCompletedDiskCnt > 0 {
completedCnt += s.progressCompletedDiskCnt
}
return completedCnt
}

func (s *SGuestStreamDisksTask) StreamingDiskCount() int {
if s.progressTotalDiskCnt > 0 {
return s.progressTotalDiskCnt
}

return len(s.disksIdx)
}

Expand Down Expand Up @@ -2060,9 +2075,9 @@ func NewGuestSnapshotDeleteTask(
}
}

func (s *SGuestSnapshotDeleteTask) Start() {
func (s *SGuestSnapshotDeleteTask) Start(totalDeleteSnapshotCount, deletedSnapshotCount int) {
if s.blockStream {
s.startBlockStream()
s.startBlockStream(totalDeleteSnapshotCount, deletedSnapshotCount)
return
}

Expand All @@ -2073,14 +2088,15 @@ func (s *SGuestSnapshotDeleteTask) Start() {
s.fetchDisksInfo(s.doReloadDisk)
}

func (s *SGuestSnapshotDeleteTask) startBlockStream() {
func (s *SGuestSnapshotDeleteTask) startBlockStream(totalDeleteSnapshotCount, deletedSnapshotCount int) {
diskIdx := []int{}
for i := range s.Desc.Disks {
if s.Desc.Disks[i].DiskId == s.disk.GetId() {
diskIdx = append(diskIdx, int(s.Desc.Disks[i].Index))
break
}
}
s.StreamDisks(s.ctx, s.onStreamDiskComplete, diskIdx)
s.StreamDisks(s.ctx, s.onStreamDiskComplete, diskIdx, totalDeleteSnapshotCount, deletedSnapshotCount)
}

func (s *SGuestSnapshotDeleteTask) onStreamDiskComplete() {
Expand Down
15 changes: 15 additions & 0 deletions pkg/hostman/guestman/pci.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,14 +417,29 @@ func (s *SKVMGuestInstance) initGuestDisks(pciRoot, pciBridge *desc.PCIControlle
for i := 0; i < len(s.Desc.Disks); i++ {
devType := qemu.GetDiskDeviceModel(s.Desc.Disks[i].Driver)
id := fmt.Sprintf("drive_%d", s.Desc.Disks[i].Index)
if s.Desc.Disks[i].Pci != nil || s.Desc.Disks[i].Scsi != nil {
log.Infof("guest %s disk %v has been init", s.Desc.Uuid, s.Desc.Disks[i].Index)
continue
}

switch s.Desc.Disks[i].Driver {
case DISK_DRIVER_VIRTIO:
if s.Desc.Disks[i].Pci == nil {
s.Desc.Disks[i].Pci = desc.NewPCIDevice(cont.CType, devType, id)
}
case DISK_DRIVER_SCSI:
if s.Desc.VirtioScsi == nil {
s.Desc.VirtioScsi = &desc.SGuestVirtioScsi{
PCIDevice: desc.NewPCIDevice(pciRoot.CType, "virtio-scsi-pci", "scsi"),
}
}
s.Desc.Disks[i].Scsi = desc.NewScsiDevice(s.Desc.VirtioScsi.Id, devType, id)
case DISK_DRIVER_PVSCSI:
if s.Desc.PvScsi == nil {
s.Desc.PvScsi = &desc.SGuestPvScsi{
PCIDevice: desc.NewPCIDevice(pciRoot.CType, "pvscsi", "scsi"),
}
}
s.Desc.Disks[i].Scsi = desc.NewScsiDevice(s.Desc.PvScsi.Id, devType, id)
case DISK_DRIVER_IDE:
s.Desc.Disks[i].Ide = desc.NewIdeDevice(devType, id)
Expand Down
10 changes: 6 additions & 4 deletions pkg/hostman/guestman/qemu-kvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3043,9 +3043,9 @@ func (s *SKVMGuestInstance) GetFuseTmpPath() string {
return path.Join(s.HomeDir(), "tmp")
}

func (s *SKVMGuestInstance) StreamDisks(ctx context.Context, callback func(), disksIdx []int) {
func (s *SKVMGuestInstance) StreamDisks(ctx context.Context, callback func(), disksIdx []int, totalCnt, completedCnt int) {
log.Infof("Start guest block stream task %s ...", s.GetName())
task := NewGuestStreamDisksTask(ctx, s, callback, disksIdx)
task := NewGuestStreamDisksTask(ctx, s, callback, disksIdx, totalCnt, completedCnt)
task.Start()
}

Expand Down Expand Up @@ -3126,7 +3126,8 @@ func (s *SKVMGuestInstance) StaticSaveSnapshot(
func (s *SKVMGuestInstance) DeleteSnapshot(ctx context.Context, delParams *SDeleteDiskSnapshot) (jsonutils.JSONObject, error) {
if len(delParams.ConvertSnapshot) > 0 || delParams.BlockStream {
return s.ExecDeleteSnapshotTask(ctx, delParams.Disk, delParams.DeleteSnapshot,
delParams.ConvertSnapshot, delParams.BlockStream, delParams.EncryptInfo)
delParams.ConvertSnapshot, delParams.BlockStream, delParams.EncryptInfo,
delParams.TotalDeleteSnapshotCount, delParams.DeletedSnapshotCount)
} else {
res := jsonutils.NewDict()
res.Set("deleted", jsonutils.JSONTrue)
Expand All @@ -3137,11 +3138,12 @@ func (s *SKVMGuestInstance) DeleteSnapshot(ctx context.Context, delParams *SDele
func (s *SKVMGuestInstance) ExecDeleteSnapshotTask(
ctx context.Context, disk storageman.IDisk,
deleteSnapshot string, convertSnapshot string, blockStream bool, encryptInfo apis.SEncryptInfo,
totalDeleteSnapshotCount, deletedSnapshotCount int,
) (jsonutils.JSONObject, error) {
if s.IsRunning() {
if s.isLiveSnapshotEnabled() {
task := NewGuestSnapshotDeleteTask(ctx, s, disk, deleteSnapshot, convertSnapshot, blockStream, encryptInfo)
task.Start()
task.Start(totalDeleteSnapshotCount, deletedSnapshotCount)
return nil, nil
} else {
return nil, fmt.Errorf("Guest dosen't support live snapshot delete")
Expand Down

0 comments on commit af3bb67

Please sign in to comment.