Skip to content

Commit

Permalink
Merge pull request #229 from rafi/feature/konnectivity
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-wangxu authored Aug 29, 2023
2 parents 9a8cec3 + 0627451 commit 6582872
Show file tree
Hide file tree
Showing 20 changed files with 2,615 additions and 93 deletions.
9 changes: 9 additions & 0 deletions cmd/csi/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ func Start(opt *csiOption) error {
csi.WithDriverMode(opt.DriverMode),
csi.WithUseNodeHostname(opt.UseNodeHostname),
csi.WithEnableSpdk(opt.EnableSpdk),
csi.WithKonnectivity(
opt.KonnectivityUDS,
opt.KonnectivityProxyHost,
opt.KonnectivityProxyPort,
opt.KonnectivityProxyMode,
opt.KonnectivityClientCert,
opt.KonnectivityClientKey,
opt.KonnectivityCACert,
),
)
if err := driver.Run(); err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions cmd/csi/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type csiOption struct {
EnableSpdk bool
ExtenderSchedulerNames []string
FrameworkSchedulerNames []string
KonnectivityUDS string
KonnectivityProxyHost string
KonnectivityProxyPort int
KonnectivityProxyMode string
KonnectivityClientCert string
KonnectivityClientKey string
KonnectivityCACert string
}

func (option *csiOption) addFlags(fs *pflag.FlagSet) {
Expand All @@ -53,4 +60,11 @@ func (option *csiOption) addFlags(fs *pflag.FlagSet) {
fs.BoolVar(&option.EnableSpdk, "enable-spdk", true, "enable spdk or not")
fs.StringSliceVar(&option.ExtenderSchedulerNames, "extender-scheduler-names", []string{"default-scheduler"}, "extender scheduler names")
fs.StringSliceVar(&option.FrameworkSchedulerNames, "framework-scheduler-names", []string{}, "framework scheduler names")
fs.StringVar(&option.KonnectivityUDS, "konnectivity-uds", "", "apiserver-network-proxy unix socket path")
fs.StringVar(&option.KonnectivityProxyHost, "konnectivity-proxy-host", "", "apiserver-network-proxy server host")
fs.IntVar(&option.KonnectivityProxyPort, "konnectivity-proxy-port", 0, "apiserver-network-proxy server port")
fs.StringVar(&option.KonnectivityProxyMode, "konnectivity-proxy-mode", "", "apiserver-network-proxy proxy mode, can be either 'grpc' or 'http-connect'")
fs.StringVar(&option.KonnectivityClientCert, "konnectivity-client-cert", "", "apiserver-network-proxy client cert")
fs.StringVar(&option.KonnectivityClientKey, "konnectivity-client-key", "", "apiserver-network-proxy client key")
fs.StringVar(&option.KonnectivityCACert, "konnectivity-ca-cert", "", "apiserver-network-proxy CA cert")
}
9 changes: 9 additions & 0 deletions docs/commandline/open-local_csi.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ open-local csi [flags]
--master string URL/IP for master.
--nodeID string the id of node
--path.sysfs string Path of sysfs mountpoint (default "/host_sys")
--use-node-hostname use node hostname dns for grpc connection
--konnectivity-uds apiserver-network-proxy unix socket path
--konnectivity-proxy-host apiserver-network-proxy server host
--konnectivity-proxy-port apiserver-network-proxy server port
--konnectivity-proxy-mode apiserver-network-proxy proxy mode, can be either 'grpc' or 'http-connect'
--konnectivity-client-cert apiserver-network-proxy client cert
--konnectivity-client-key apiserver-network-proxy client key
--konnectivity-ca-cert apiserver-network-proxy CA cert
```

### Options inherited from parent commands
Expand Down
68 changes: 68 additions & 0 deletions docs/user-guide/apiserver-network-proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# apiserver-network-proxy (ANP)

The [apiserver-network-proxy](https://github.com/kubernetes-sigs/apiserver-network-proxy)
service, also named [Konnectivity setup](https://kubernetes.io/docs/tasks/extend-kubernetes/setup-konnectivity/),
provides a TCP level proxy for the control plane _to_ cluster communication.

Open-Local's CSI plugin runs an LVM daemon, by default on port `1736`, allowing
the controller and node plugins to communicate with worker nodes. However, in
some cases workers might be running at the edge, behind a NAT or other network
constraints. There are platforms like OpenYurt and SuperEdge that offer proxy
tunnels and various other edge solutions. With these, you might be interested in
the [`--use-node-hostname`](/docs/commandline/open-local_csi.md) argument, which
will use the node host-name DNS, instead of its IP, for the gRPC connection.

Konnectivity relies on an [`EgressSelectorConfiguration`](https://kubernetes.io/docs/reference/config-api/apiserver-config.v1alpha1/#apiserver-k8s-io-v1alpha1-EgressSelectorConfiguration)
to proxy traffic from the kube-apiserver (KAS) into the worker nodes. KAS can be
configured to send traffic (or not) to one or more of the proxies.

Open-Local supports the Konnectivity proxy using Unix socket or http-connect.
With this, Open-Local will communicate with the nodes through the Konnectivity
proxy and reach edge worker nodes.

Following are usage examples, with relevant changes to csi-plugin args:

## Using http-connect

```yaml
spec:
containers:
- name: csi-plugin
args:
- csi
- --konnectivity-proxy-host=rafi-konnectivity-server.rafi
- --konnectivity-proxy-port=8090
- --konnectivity-proxy-mode=http-connect
- --konnectivity-client-cert=/pki/konnectivity/tls.crt
- --konnectivity-client-key=/pki/konnectivity/tls.key
- --konnectivity-ca-cert=/pki/konnectivity/ca.crt
volumeMounts:
- mountPath: /pki/konnectivity/
name: konnectivity-client
readOnly: true
volumes:
- name: konnectivity-client
secret:
defaultMode: 420
secretName: rafi-pki-konnectivity-client
```
## GRPC socket
```yaml
spec:
containers:
- name: csi-plugin
args:
- csi
- --konnectivity-uds=/etc/kubernetes/konnectivity-server/konnectivity-server.socket
- --konnectivity-proxy-mode=grpc
volumeMounts:
- name: konnectivity-uds
mountPath: /etc/kubernetes/konnectivity-server
volumes:
- name: konnectivity-uds
hostPath:
path: /etc/kubernetes/konnectivity-server
type: DirectoryOrCreate
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
k8s.io/kubernetes v1.24.15
k8s.io/mount-utils v0.24.15
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.4
)

require (
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1757,6 +1757,7 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down Expand Up @@ -2478,6 +2479,8 @@ rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.37/go.mod h1:vfnxT4FXNT8eGvO+xi/DsyC/qHmdujqwrUa1WSspCsk=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.4 h1:1RSHUg/47zxbcYkN4r+zMS8ZObRFpyDDBkcmWjTD5vM=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.4/go.mod h1:e7I0gvW7fYKOqZDDsvaETBEyfM4dXh6DQ/SsqNInVC0=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY=
sigs.k8s.io/kustomize/api v0.11.4/go.mod h1:k+8RsqYbgpkIrJ4p9jcdPqe8DprLxFUUO0yNOq8C+xI=
Expand Down
24 changes: 19 additions & 5 deletions pkg/csi/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func MustRunThisWhenTest() {
}

// NewGrpcConnection lvm connection
func NewGrpcConnection(address string, timeout time.Duration) (Connection, error) {
conn, err := connect(address, timeout)
func NewGrpcConnection(address string, timeout time.Duration, konnectivityOpts GrpcProxyClientOptions) (Connection, error) {
conn, err := connect(address, timeout, konnectivityOpts)
if err != nil {
return nil, err
}
Expand All @@ -87,8 +87,10 @@ func (c *workerConnection) Close() error {
return c.conn.Close()
}

func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
func connect(address string, timeout time.Duration, proxyOpts GrpcProxyClientOptions) (*grpc.ClientConn, error) {
log.V(6).Infof("New Connecting to %s", address)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// only for unit test
var bufDialerFunc func(context.Context, string) (net.Conn, error)
dialOptions := []grpc.DialOption{
Expand All @@ -101,6 +103,20 @@ func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
}
dialOptions = append(dialOptions, grpc.WithContextDialer(bufDialerFunc))
}
// setup konnectivity proxy connection
if proxyOpts.ProxyHost != "" || proxyOpts.ProxyUDSName != "" {
var err error
var proxyDialer proxyFunc
if proxyOpts.ProxyUDSName == "" {
proxyDialer, err = getKonnectivityMTLSDialer(ctx, address, timeout, proxyOpts)
} else {
proxyDialer, err = getKonnectivityUDSDialer(ctx, address, timeout, proxyOpts)
}
if err != nil {
return nil, fmt.Errorf("failed to setup konnectivity dialer: %w", err)
}
dialOptions = append(dialOptions, grpc.WithContextDialer(proxyDialer))
}
// if strings.HasPrefix(address, "/") {
// dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
// return net.DialTimeout("unix", addr, timeout)
Expand All @@ -118,8 +134,6 @@ func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
log.Warningf("Connection to %s timed out", address)
Expand Down
Loading

0 comments on commit 6582872

Please sign in to comment.