Skip to content

Commit

Permalink
Merge pull request #222 from peter-wangxu/bugfix/enhance_throttle_io
Browse files Browse the repository at this point in the history
blkio throttling enhancement: support quantity in `bps`
  • Loading branch information
peter-wangxu authored Aug 10, 2023
2 parents 2fce649 + 9a101ea commit 04dfda1
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 86 deletions.
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ WORKDIR /go/src/github.com/alibaba/open-local
COPY . .
RUN make build && chmod +x bin/open-local

FROM alpine:3.9
FROM sting2me/open-local-base:latest
LABEL maintainers="Alibaba Cloud Authors"
LABEL description="open-local is a local disk management system"
RUN apk update && apk upgrade && apk add util-linux coreutils e2fsprogs e2fsprogs-extra xfsprogs xfsprogs-extra blkid file open-iscsi jq
COPY --from=builder /go/src/github.com/alibaba/open-local/bin/open-local /bin/open-local
COPY --from=thebeatles1994/open-local:tools /usr/local/bin/restic-amd64 /usr/local/bin/restic
ENTRYPOINT ["open-local"]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ English | [简体中文](./README_zh_CN.md)
- Volume snapshot
- Volume metrics
- Raw block volume
- IO Throttling
- IO Throttling(direct-io only)
- Ephemeral inline volume

## Open-Local Feature Matrix
Expand Down
4 changes: 2 additions & 2 deletions README_zh_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
- [ACK 发行版](https://github.com/AliyunContainerService/ackdistro)
- 阿里云 ADP (云原生应用交付平台)
- [云原生 CNStack 产品](https://github.com/alibaba/CNStackCommunityEdition)
- [蚂蚁 AntStack Plus 产品](https://help.aliyun.com/document_detail/294414.html)
- 蚂蚁金融分布式架构 SOFAStack

## 特性

Expand All @@ -24,7 +24,7 @@
- 存储卷快照
- 存储卷监控
- 原生块设备
- IO 限流
- IO 限流(仅支持direct-io)
- 临时卷

## Open-Local版本能力矩阵
Expand Down
4 changes: 2 additions & 2 deletions docs/design/io-throttling.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ metadata:
provisioner: local.csi.aliyun.com
parameters:
volumeType: "LVM"
bps: 1048576
iops: 1024
bps: "1048576" # same as 1Mi
iops: "1024"
reclaimPolicy: Delete
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
Expand Down
20 changes: 8 additions & 12 deletions docs/storageclass/param.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
# Storage Class

These parameters can be configured in StorageClass:

| Parameters | Values | Default | Description |
|-----------------------------|----------------------------------------|----------|---------------------|
| "csi.storage.k8s.io/fstype" | xfs, ext2, ext3, ext4 | ext4 | File system type that will be formatted during volume creation. This parameter is case sensitive! |
| "volumeType" | LVM, MountPoint, Device | | PV type that will be created by Open-Local. This parameter is case sensitive! |
| "mediaType" | hdd,ssd | | Media type that will be used when allocate Device for PV. The param only works when volumeType is MountPoint or Device. |
| "vgName" | | | The volume group name that the open-local will use to create the logical volume. This name must be contained in vg list, which can be found in .status.filteredStorageInfo in every [nls](../api/nls_zh_CN.md). If no value is set, open-local will choose a vg from vg list by itself. |
| "iops" | | | I/O operations per second. |
| "bps" | | | Throughput in KiB/s. |
| Parameters | Values | mandatory | Description |
|----------------------------- |------------------------- |----------- |-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| "csi.storage.k8s.io/fstype" | xfs, ext2, ext3, ext4 | No | File system type that will be formatted during volume creation. This parameter is case sensitive! Default value is `ext4` |
| "volumeType" | LVM, MountPoint, Device | Yes | PV type that will be created by Open-Local. This parameter is case sensitive! |
| "mediaType" | hdd,ssd | No | Media type that will be used when allocate Device for PV. The param only works when volumeType is MountPoint or Device. |
| "vgName" | | No | The volume group name that the open-local will use to create the logical volume. This name must be contained in vg list, which can be found in .status.filteredStorageInfo in every nls. If no value is set, open-local will choose a vg from vg list by itself. |
| "iops" | | No | I/O operations per second. |
| "bps" | | No | Throughput in byte/Ki/Mi/Gi per second The default unit is `byte`, `102400`, `1Mi`, `100Mi`, `1Gi` are all valid format since v0.7.2. |
1 change: 1 addition & 0 deletions docs/user-guide/type-lvm_zh_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ Events:
## IO 限流

Open-Local 支持为 PV 设置 IO 限流:
> 注意:默认的StorageClass `open-local-lvm-io-throttling` 不可用于生产环境,请按需调整parameters后再使用对应storageclass
```bash
# kubectl apply -f ./example/lvm/sts-io-throttling.yaml
Expand Down
14 changes: 14 additions & 0 deletions helm/templates/storage-class.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ reclaimPolicy: Delete
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: false
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: {{ .Values.storageclass.lvm_throttle.name }}
{{ include "local.labels" . | indent 2 }}
provisioner: local.csi.aliyun.com
parameters:
volumeType: "LVM"
bps: "1048576"
iops: "1024"
reclaimPolicy: Delete
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
---
apiVersion: snapshot.storage.k8s.io/v1
kind: VolumeSnapshotClass
metadata:
Expand Down
2 changes: 2 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ storageclass:
name: open-local-lvm
lvm_xfs:
name: open-local-lvm-xfs
lvm_throttle:
name: open-local-lvm-io-throttle
device_ssd:
name: open-local-device-ssd
device_hdd:
Expand Down
141 changes: 74 additions & 67 deletions pkg/csi/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,79 +600,86 @@ func (ns *nodeServer) resizeVolume(ctx context.Context, volumeID, targetPath str
return nil
}

func (ns *nodeServer) setIOThrottling(ctx context.Context, req *csi.NodePublishVolumeRequest) error {
func (ns *nodeServer) setIOThrottling(ctx context.Context, req *csi.NodePublishVolumeRequest) (err error) {
volumeID := req.VolumeId

containsValue, bps, iops, err := requireThrottleIO(req.VolumeContext)

if err != nil {
log.Errorf("invalid bps or iops parameter in storage class: %s", err)
return err
}
if !containsValue {
log.Infof("no need to set throttle for volume %s", volumeID)
return nil
}
volCap := req.GetVolumeCapability()
targetPath := req.GetTargetPath()
volumeID := req.VolumeId
iops, iopsExist := req.VolumeContext[localtype.VolumeIOPS]
bps, bpsExist := req.VolumeContext[localtype.VolumeBPS]
if iopsExist || bpsExist {
// get pod
var pod v1.Pod
var podUID string
switch volCap.GetAccessType().(type) {
case *csi.VolumeCapability_Block:
// /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/yoda-c018ff81-d346-452e-b7b8-a45f1d1c230e/76cf946e-d074-4455-a272-4d3a81264fab
podUID = strings.Split(targetPath, "/")[10]
case *csi.VolumeCapability_Mount:
// /var/lib/kubelet/pods/2a7bbb9c-c915-4006-84d7-0e3ac9d8d70f/volumes/kubernetes.io~csi/yoda-70597cb6-c08b-4bbb-8d41-c4afcfa91866/mount
podUID = strings.Split(targetPath, "/")[5]
}
log.Infof("pod(volume id %s) uuid is %s", volumeID, podUID)
namespace := req.VolumeContext[localtype.PVCNameSpace]
// set ResourceVersion to 0
// https://arthurchiao.art/blog/k8s-reliability-list-data-zh/
pods, err := ns.options.kubeclient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ResourceVersion: "0"})
for _, podItem := range pods.Items {
if podItem.UID == types.UID(podUID) {
pod = podItem
}
// get pod
var pod v1.Pod
var podUID string
switch volCap.GetAccessType().(type) {
case *csi.VolumeCapability_Block:
// /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/yoda-c018ff81-d346-452e-b7b8-a45f1d1c230e/76cf946e-d074-4455-a272-4d3a81264fab
podUID = strings.Split(targetPath, "/")[10]
case *csi.VolumeCapability_Mount:
// /var/lib/kubelet/pods/2a7bbb9c-c915-4006-84d7-0e3ac9d8d70f/volumes/kubernetes.io~csi/yoda-70597cb6-c08b-4bbb-8d41-c4afcfa91866/mount
podUID = strings.Split(targetPath, "/")[5]
}
log.Infof("pod(volume id %s) uuid is %s", volumeID, podUID)
namespace := req.VolumeContext[localtype.PVCNameSpace]
// set ResourceVersion to 0
// https://arthurchiao.art/blog/k8s-reliability-list-data-zh/
pods, err := ns.options.kubeclient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ResourceVersion: "0"})
for _, podItem := range pods.Items {
if podItem.UID == types.UID(podUID) {
pod = podItem
}
}
if err != nil {
return status.Errorf(codes.Internal, "NodePublishVolume: failed to get pod(uuid: %s): %s", podUID, err.Error())
}
// pod qosClass and blkioPath
qosClass := pod.Status.QOSClass
blkioPath := fmt.Sprintf("%s/fs/cgroup/blkio/%s%s%s", ns.options.sysPath, utils.CgroupPathFormatter.ParentDir, utils.CgroupPathFormatter.QOSDirFn(qosClass), utils.CgroupPathFormatter.PodDirFn(qosClass, podUID))
log.Infof("pod(volume id %s) qosClass: %s", volumeID, qosClass)
log.Infof("pod(volume id %s) blkio path: %s", volumeID, blkioPath)
// get lv lvpath
// todo: not support device kind
lvpath, _, err := ns.createLV(ctx, req)
if err != nil {
return status.Errorf(codes.Internal, "failed to get lv path %s: %s", volumeID, err.Error())
}
stat := syscall.Stat_t{}
_ = syscall.Stat(lvpath, &stat)
maj := uint64(stat.Rdev / 256)
min := uint64(stat.Rdev % 256)
log.Infof("volume %s maj:min: %d:%d", volumeID, maj, min)
log.Infof("volume %s path: %s", volumeID, lvpath)
if iops > 0 {
log.Infof("volume %s iops: %d", volumeID, iops)
cmdstr := fmt.Sprintf("echo %s > %s", fmt.Sprintf("%d:%d %d", maj, min, iops), fmt.Sprintf("%s/%s", blkioPath, localtype.IOPSReadFile))
_, err := exec.Command("sh", "-c", cmdstr).CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "NodePublishVolume: failed to get pod(uuid: %s): %s", podUID, err.Error())
}
// pod qosClass and blkioPath
qosClass := pod.Status.QOSClass
blkioPath := fmt.Sprintf("%s/fs/cgroup/blkio/%s%s%s", ns.options.sysPath, utils.CgroupPathFormatter.ParentDir, utils.CgroupPathFormatter.QOSDirFn(qosClass), utils.CgroupPathFormatter.PodDirFn(qosClass, podUID))
log.Infof("pod(volume id %s) qosClass: %s", volumeID, qosClass)
log.Infof("pod(volume id %s) blkio path: %s", volumeID, blkioPath)
// get lv lvpath
// todo: not support device kind
lvpath, _, err := ns.createLV(ctx, req)
return status.Errorf(codes.Internal, "failed to write blkio file %s: %s", fmt.Sprintf("%s%s", blkioPath, localtype.IOPSReadFile), err.Error())
}
cmdstr = fmt.Sprintf("echo %s > %s", fmt.Sprintf("%d:%d %d", maj, min, iops), fmt.Sprintf("%s%s", blkioPath, localtype.IOPSWriteFile))
_, err = exec.Command("sh", "-c", cmdstr).CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "failed to get lv path %s: %s", volumeID, err.Error())
}
stat := syscall.Stat_t{}
_ = syscall.Stat(lvpath, &stat)
maj := uint64(stat.Rdev / 256)
min := uint64(stat.Rdev % 256)
log.Infof("volume %s maj:min: %d:%d", volumeID, maj, min)
log.Infof("volume %s path: %s", volumeID, lvpath)
if iopsExist {
log.Infof("volume %s iops: %s", volumeID, iops)
cmdstr := fmt.Sprintf("echo %s > %s", fmt.Sprintf("%d:%d %s", maj, min, iops), fmt.Sprintf("%s/%s", blkioPath, localtype.IOPSReadFile))
_, err := exec.Command("sh", "-c", cmdstr).CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "failed to write blkio file %s: %s", fmt.Sprintf("%s%s", blkioPath, localtype.IOPSReadFile), err.Error())
}
cmdstr = fmt.Sprintf("echo %s > %s", fmt.Sprintf("%d:%d %s", maj, min, iops), fmt.Sprintf("%s%s", blkioPath, localtype.IOPSWriteFile))
_, err = exec.Command("sh", "-c", cmdstr).CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "failed to write blkio file %s: %s", fmt.Sprintf("%s%s", blkioPath, localtype.IOPSWriteFile), err.Error())
}
return status.Errorf(codes.Internal, "failed to write blkio file %s: %s", fmt.Sprintf("%s%s", blkioPath, localtype.IOPSWriteFile), err.Error())
}
if bpsExist {
log.Infof("volume %s bps: %s", volumeID, bps)
cmdstr := fmt.Sprintf("echo %s > %s", fmt.Sprintf("%d:%d %s", maj, min, bps), fmt.Sprintf("%s%s", blkioPath, localtype.BPSReadFile))
_, err := exec.Command("sh", "-c", cmdstr).CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "failed to write blkio file %s: %s", fmt.Sprintf("%s%s", blkioPath, localtype.BPSReadFile), err.Error())
}
cmdstr = fmt.Sprintf("echo %s > %s", fmt.Sprintf("%d:%d %s", maj, min, bps), fmt.Sprintf("%s%s", blkioPath, localtype.BPSWriteFile))
_, err = exec.Command("sh", "-c", cmdstr).CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "failed to write blkio file %s: %s", fmt.Sprintf("%s%s", blkioPath, localtype.BPSWriteFile), err.Error())
}
}
if bps > 0 {
log.Infof("volume %s bps: %d", volumeID, bps)
cmdstr := fmt.Sprintf("echo %s > %s", fmt.Sprintf("%d:%d %d", maj, min, bps), fmt.Sprintf("%s%s", blkioPath, localtype.BPSReadFile))
_, err := exec.Command("sh", "-c", cmdstr).CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "failed to write blkio file %s: %s", fmt.Sprintf("%s%s", blkioPath, localtype.BPSReadFile), err.Error())
}
cmdstr = fmt.Sprintf("echo %s > %s", fmt.Sprintf("%d:%d %d", maj, min, bps), fmt.Sprintf("%s%s", blkioPath, localtype.BPSWriteFile))
_, err = exec.Command("sh", "-c", cmdstr).CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "failed to write blkio file %s: %s", fmt.Sprintf("%s%s", blkioPath, localtype.BPSWriteFile), err.Error())
}
}
return nil
Expand Down
26 changes: 26 additions & 0 deletions pkg/csi/nodeutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"time"

"github.com/alibaba/open-local/pkg"
Expand Down Expand Up @@ -551,3 +552,28 @@ func labelRestored(path string) error {

return nil
}

// requireThrottleIO returns true if any throttle value and related values
// or else return false
func requireThrottleIO(volumeContext map[string]string) (r bool, bpsValue int64, iopsValue int64, err error) {
bps, ok1 := volumeContext[localtype.VolumeBPS]
iops, ok2 := volumeContext[localtype.VolumeIOPS]
if !ok1 && !ok2 {
return false, 0, 0, nil
}
if ok1 {
bpsQuantity, err := resource.ParseQuantity(bps)
if err != nil {
return false, 0, 0, err
}
bpsValue = bpsQuantity.Value()
}
if ok2 {
iopstmp, err := strconv.ParseInt(iops, 10, 64)
if err != nil {
return false, 0, 0, err
}
iopsValue = iopstmp
}
return true, bpsValue, iopsValue, nil
}
75 changes: 75 additions & 0 deletions pkg/csi/nodeutils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright © 2023 Alibaba Group Holding Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package csi

import (
"testing"
)

func Test_requireThrottleIO(t *testing.T) {
type args struct {
volumeContext map[string]string
}
tests := []struct {
name string
args args
wantR bool
wantBpsValue int64
wantIopsValue int64
wantErr bool
}{
{name: "test empty throttle value",
args: args{volumeContext: map[string]string{}},
wantR: false, wantBpsValue: 0, wantIopsValue: 0, wantErr: false},
{name: "test bps throttle value only",
args: args{volumeContext: map[string]string{"bps": "1024"}},
wantR: true, wantBpsValue: 1024, wantIopsValue: 0, wantErr: false},
{name: "test iops throttle value only",
args: args{volumeContext: map[string]string{"iops": "100"}},
wantR: true, wantBpsValue: 0, wantIopsValue: 100, wantErr: false},
{name: "test bps and iops throttle value",
args: args{volumeContext: map[string]string{"bps": "10240", "iops": "110"}},
wantR: true, wantBpsValue: 10240, wantIopsValue: 110, wantErr: false},
{name: "test bps quantity throttle value",
args: args{volumeContext: map[string]string{"bps": "1Mi", "iops": "110"}},
wantR: true, wantBpsValue: 1048576, wantIopsValue: 110, wantErr: false},
{name: "test invalid bps throttle value",
args: args{volumeContext: map[string]string{"bps": "abc"}},
wantR: false, wantBpsValue: 0, wantIopsValue: 0, wantErr: true},
{name: "test invalid iops throttle value",
args: args{volumeContext: map[string]string{"bps": "10240", "iops": "11b"}},
wantR: false, wantBpsValue: 0, wantIopsValue: 0, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotR, gotBpsValue, gotIopsValue, err := requireThrottleIO(tt.args.volumeContext)
if (err != nil) != tt.wantErr {
t.Errorf("requireThrottleIO() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotR != tt.wantR {
t.Errorf("requireThrottleIO() gotR = %v, want %v", gotR, tt.wantR)
}
if gotBpsValue != tt.wantBpsValue {
t.Errorf("requireThrottleIO() gotBpsValue = %v, want %v", gotBpsValue, tt.wantBpsValue)
}
if gotIopsValue != tt.wantIopsValue {
t.Errorf("requireThrottleIO() gotIopsValue = %v, want %v", gotIopsValue, tt.wantIopsValue)
}
})
}
}

0 comments on commit 04dfda1

Please sign in to comment.