From f48178a4cdf9ebc9942f4e2b55a3f2a5e48fe16e Mon Sep 17 00:00:00 2001 From: xushiwei Date: Thu, 7 Sep 2023 08:11:46 +0800 Subject: [PATCH] kodofs/kodoutil --- go.mod | 5 + go.sum | 2 + internal/kodo/api/api.go | 44 + internal/kodo/auth/auth.go | 250 ++++ internal/kodo/bucket.go | 153 +++ internal/kodo/bucket_list.go | 246 ++++ internal/kodo/client/client.go | 274 +++++ internal/kodo/clientv2/client.go | 131 ++ internal/kodo/clientv2/interceptor.go | 70 ++ internal/kodo/clientv2/interceptor_auth.go | 39 + internal/kodo/clientv2/interceptor_debug.go | 210 ++++ .../clientv2/interceptor_default_header.go | 55 + .../kodo/clientv2/interceptor_retry_hosts.go | 124 ++ .../kodo/clientv2/interceptor_retry_simple.go | 194 +++ internal/kodo/clientv2/request.go | 92 ++ internal/kodo/clientv2/request_compatible.go | 95 ++ internal/kodo/conf/conf.go | 23 + internal/kodo/form_upload.go | 1070 +++++++++++++++++ internal/kodo/freezer/freezer.go | 48 + internal/kodo/hostprovider/host_provider.go | 53 + internal/kodo/log/logger.go | 76 ++ internal/kodo/log/logger_test.go | 23 + internal/kodo/reqid/reqid.go | 22 + kodofs.go | 5 + kodoutil/kodoutil.go | 99 ++ 25 files changed, 3403 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/kodo/api/api.go create mode 100644 internal/kodo/auth/auth.go create mode 100644 internal/kodo/bucket.go create mode 100644 internal/kodo/bucket_list.go create mode 100644 internal/kodo/client/client.go create mode 100644 internal/kodo/clientv2/client.go create mode 100644 internal/kodo/clientv2/interceptor.go create mode 100644 internal/kodo/clientv2/interceptor_auth.go create mode 100644 internal/kodo/clientv2/interceptor_debug.go create mode 100644 internal/kodo/clientv2/interceptor_default_header.go create mode 100644 internal/kodo/clientv2/interceptor_retry_hosts.go create mode 100644 internal/kodo/clientv2/interceptor_retry_simple.go create mode 100644 internal/kodo/clientv2/request.go create mode 100644 internal/kodo/clientv2/request_compatible.go create mode 100644 internal/kodo/conf/conf.go create mode 100644 internal/kodo/form_upload.go create mode 100644 internal/kodo/freezer/freezer.go create mode 100644 internal/kodo/hostprovider/host_provider.go create mode 100644 internal/kodo/log/logger.go create mode 100644 internal/kodo/log/logger_test.go create mode 100644 internal/kodo/reqid/reqid.go create mode 100644 kodofs.go create mode 100644 kodoutil/kodoutil.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..95dd643 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/xushiwei/kodofs + +go 1.16 + +require golang.org/x/sync v0.3.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..4c4db29 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= diff --git a/internal/kodo/api/api.go b/internal/kodo/api/api.go new file mode 100644 index 0000000..d0430c3 --- /dev/null +++ b/internal/kodo/api/api.go @@ -0,0 +1,44 @@ +package api + +import ( + "io" + "net/http" +) + +// ----------------------------------------------------------------------------------------- + +// BytesFromRequest 读取http.Request.Body的内容到slice中 +func BytesFromRequest(r *http.Request) (b []byte, err error) { + if r.ContentLength == 0 { + return + } + if r.ContentLength > 0 { + b = make([]byte, int(r.ContentLength)) + _, err = io.ReadFull(r.Body, b) + return + } + return io.ReadAll(r.Body) +} + 
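An aside on api.BytesFromRequest: it consumes req.Body, so a caller that still needs to send the request afterwards must put the body back, which is exactly what the signing helpers further down do via io.NopCloser. A minimal sketch of that pattern (a hypothetical caller, not part of this patch, and assuming it lives inside this module, since internal packages cannot be imported from outside):

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/xushiwei/kodofs/internal/kodo/api"
)

func main() {
	// A request whose body we want to capture without losing it.
	req, _ := http.NewRequest("POST", "http://example.com/upload", strings.NewReader("key=foo"))

	// Reads exactly ContentLength bytes when the length is known, otherwise the whole body.
	b, err := api.BytesFromRequest(req)
	if err != nil {
		panic(err)
	}

	// The body has been consumed; restore it so the request can still be executed.
	req.Body = io.NopCloser(bytes.NewReader(b))
	fmt.Printf("captured %d bytes: %q\n", len(b), b)
}
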
+// ----------------------------------------------------------------------------------------- + +// 可以根据Code判断是何种类型错误 +type QError struct { + Code string + Message string +} + +// Error 继承error接口 +func (e *QError) Error() string { + return e.Code + ": " + e.Message +} + +// NewError 返回QError指针 +func NewError(code, message string) *QError { + return &QError{ + Code: code, + Message: message, + } +} + +// ----------------------------------------------------------------------------------------- diff --git a/internal/kodo/auth/auth.go b/internal/kodo/auth/auth.go new file mode 100644 index 0000000..ccd7025 --- /dev/null +++ b/internal/kodo/auth/auth.go @@ -0,0 +1,250 @@ +package auth + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha1" + "encoding/base64" + "fmt" + "io" + "net/http" + "net/textproto" + "sort" + "strings" + + "github.com/xushiwei/kodofs/internal/kodo/api" + "github.com/xushiwei/kodofs/internal/kodo/conf" +) + +// 七牛签名算法的类型: +// QBoxToken, QiniuToken, BearToken, QiniuMacToken +type TokenType int + +const ( + TokenQiniu TokenType = iota + TokenQBox +) + +const ( + IAMKeyLen = 33 + IAMKeyPrefix = "IAM-" + + AuthorizationPrefixQiniu = "Qiniu " + AuthorizationPrefixQBox = "QBox " +) + +// ----------------------------------------------------------------------------------------- + +// AK/SK可以从 https://portal.qiniu.com/user/key 获取 +type Credentials struct { + AccessKey string + SecretKey []byte +} + +// 构建一个Credentials对象 +func New(accessKey, secretKey string) *Credentials { + return &Credentials{accessKey, []byte(secretKey)} +} + +// Sign 对数据进行签名,一般用于私有空间下载用途 +func (ath *Credentials) Sign(data []byte) (token string) { + h := hmac.New(sha1.New, ath.SecretKey) + h.Write(data) + + sign := base64.URLEncoding.EncodeToString(h.Sum(nil)) + return fmt.Sprintf("%s:%s", ath.AccessKey, sign) +} + +// SignWithData 对数据进行签名,一般用于上传凭证的生成用途 +func (ath *Credentials) SignWithData(b []byte) (token string) { + encodedData := base64.URLEncoding.EncodeToString(b) + sign := ath.Sign([]byte(encodedData)) + return fmt.Sprintf("%s:%s", sign, encodedData) +} + +// SignToken 根据t的类型对请求进行签名,并把token加入req中 +func (ath *Credentials) AddToken(t TokenType, req *http.Request) error { + switch t { + case TokenQiniu: + token, sErr := ath.SignRequestV2(req) + if sErr != nil { + return sErr + } + req.Header.Add("Authorization", AuthorizationPrefixQiniu+token) + default: + token, err := ath.SignRequest(req) + if err != nil { + return err + } + req.Header.Add("Authorization", AuthorizationPrefixQBox+token) + } + return nil +} + +// SignRequest 对数据进行签名,一般用于管理凭证的生成 +func (ath *Credentials) SignRequest(req *http.Request) (token string, err error) { + data, err := collectData(req) + if err != nil { + return + } + token = ath.Sign(data) + return +} + +// SignRequestV2 对数据进行签名,一般用于高级管理凭证的生成 +func (ath *Credentials) SignRequestV2(req *http.Request) (token string, err error) { + + data, err := collectDataV2(req) + if err != nil { + return + } + token = ath.Sign(data) + return +} + +type ( + xQiniuHeaderItem struct { + HeaderName string + HeaderValue string + } + xQiniuHeaders []xQiniuHeaderItem +) + +func (headers xQiniuHeaders) Len() int { + return len(headers) +} + +func (headers xQiniuHeaders) Less(i, j int) bool { + if headers[i].HeaderName < headers[j].HeaderName { + return true + } else if headers[i].HeaderName > headers[j].HeaderName { + return false + } else { + return headers[i].HeaderValue < headers[j].HeaderValue + } +} + +func (headers xQiniuHeaders) Swap(i, j int) { + headers[i], headers[j] = headers[j], 
headers[i] +} + +func collectDataV2(req *http.Request) (data []byte, err error) { + u := req.URL + + //write method path?query + s := fmt.Sprintf("%s %s", req.Method, u.Path) + if u.RawQuery != "" { + s += "?" + s += u.RawQuery + } + + //write host and post + s += "\nHost: " + req.Host + "\n" + + //write content type + contentType := req.Header.Get("Content-Type") + if contentType == "" { + contentType = "application/x-www-form-urlencoded" + req.Header.Set("Content-Type", contentType) + } + s += fmt.Sprintf("Content-Type: %s\n", contentType) + + xQiniuHeaders := make(xQiniuHeaders, 0, len(req.Header)) + for headerName := range req.Header { + if len(headerName) > len("X-Qiniu-") && strings.HasPrefix(headerName, "X-Qiniu-") { + xQiniuHeaders = append(xQiniuHeaders, xQiniuHeaderItem{ + HeaderName: textproto.CanonicalMIMEHeaderKey(headerName), + HeaderValue: req.Header.Get(headerName), + }) + } + } + + if len(xQiniuHeaders) > 0 { + sort.Sort(xQiniuHeaders) + for _, xQiniuHeader := range xQiniuHeaders { + s += fmt.Sprintf("%s: %s\n", xQiniuHeader.HeaderName, xQiniuHeader.HeaderValue) + } + } + s += "\n" + + data = []byte(s) + //write body + if incBodyV2(req) { + s2, rErr := api.BytesFromRequest(req) + if rErr != nil { + err = rErr + return + } + req.Body = io.NopCloser(bytes.NewReader(s2)) + data = append(data, s2...) + } + return +} + +func collectData(req *http.Request) (data []byte, err error) { + u := req.URL + s := u.Path + if u.RawQuery != "" { + s += "?" + s += u.RawQuery + } + s += "\n" + + data = []byte(s) + if incBody(req) { + s2, rErr := api.BytesFromRequest(req) + if rErr != nil { + err = rErr + return + } + req.Body = io.NopCloser(bytes.NewReader(s2)) + data = append(data, s2...) + } + return +} + +// 管理凭证生成时,是否同时对request body进行签名 +func incBody(req *http.Request) bool { + return req.Body != nil && req.Header.Get("Content-Type") == conf.CONTENT_TYPE_FORM +} + +func incBodyV2(req *http.Request) bool { + contentType := req.Header.Get("Content-Type") + return req.Body != nil && (contentType == conf.CONTENT_TYPE_FORM || contentType == conf.CONTENT_TYPE_JSON) +} + +// ----------------------------------------------------------------------------------------- + +// MacContextKey 是用户的密钥信息 +// context.Context中的键值不应该使用普通的字符串, 有可能导致命名冲突 +type macContextKey struct{} + +// tokenTypeKey 是签名算法类型key +type tokenTypeKey struct{} + +// WithCredentials 返回一个包含密钥信息的context +func WithCredentials(ctx context.Context, cred *Credentials) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, macContextKey{}, cred) +} + +// WithCredentialsType 返回一个context, 保存了密钥信息和token类型 +func WithCredentialsType(ctx context.Context, cred *Credentials, t TokenType) context.Context { + ctx = WithCredentials(ctx, cred) + return context.WithValue(ctx, tokenTypeKey{}, t) +} + +// CredentialsFromContext 从context获取密钥信息 +func CredentialsFromContext(ctx context.Context) (cred *Credentials, t TokenType, ok bool) { + cred, ok = ctx.Value(macContextKey{}).(*Credentials) + t, yes := ctx.Value(tokenTypeKey{}).(TokenType) + if !yes { + t = TokenQBox + } + return +} + +// ----------------------------------------------------------------------------------------- diff --git a/internal/kodo/bucket.go b/internal/kodo/bucket.go new file mode 100644 index 0000000..7cd9aff --- /dev/null +++ b/internal/kodo/bucket.go @@ -0,0 +1,153 @@ +package kodo + +import ( + "context" + "encoding/base64" + "fmt" + "strings" + "time" + + "github.com/xushiwei/kodofs/internal/kodo/auth" + clientv1 
"github.com/xushiwei/kodofs/internal/kodo/client" +) + +// 资源管理相关的默认域名 +const ( + DefaultRsHost = "rs.qiniu.com" + DefaultRsfHost = "rsf.qiniu.com" + DefaultAPIHost = "api.qiniu.com" + DefaultPubHost = "pu.qbox.me:10200" +) + +// ----------------------------------------------------------------------------------------- + +type BucketManagerOptions struct { + RetryMax int // 单域名重试次数,当前只有 uc 相关的服务有多域名 + // 主备域名冻结时间(默认:600s),当一个域名请求失败(单个域名会被重试 TryTimes 次),会被冻结一段时间,使用备用域名进行重试,在冻结时间内,域名不能被使用,当一个操作中所有域名竣备冻结操作不在进行重试,返回最后一次操作的错误。 + HostFreezeDuration time.Duration +} + +// BucketManager 提供了对资源进行管理的操作 +type BucketManager struct { + Client *clientv1.Client + Mac *auth.Credentials + Cfg *Config + // options BucketManagerOptions +} + +// NewBucketManager 用来构建一个新的资源管理对象 +func NewBucketManager(mac *auth.Credentials, cfg *Config) *BucketManager { + if cfg == nil { + cfg = &Config{} + } + if cfg.CentralRsHost == "" { + cfg.CentralRsHost = DefaultRsHost + } + + return &BucketManager{ + Client: &clientv1.DefaultClient, + Mac: mac, + Cfg: cfg, + } +} + +// FetchRet 资源抓取的返回值 +type FetchRet struct { + Hash string `json:"hash"` + Fsize int64 `json:"fsize"` + MimeType string `json:"mimeType"` + Key string `json:"key"` +} + +// EncodedEntry 生成URL Safe Base64编码的 Entry +func EncodedEntry(bucket, key string) string { + entry := fmt.Sprintf("%s:%s", bucket, key) + return base64.URLEncoding.EncodeToString([]byte(entry)) +} + +// 构建op的方法,非导出的方法无法用在Batch操作中 +func uriFetch(resURL, bucket, key string) string { + return fmt.Sprintf("/fetch/%s/to/%s", + base64.URLEncoding.EncodeToString([]byte(resURL)), EncodedEntry(bucket, key)) +} + +// Fetch 根据提供的远程资源链接来抓取一个文件到空间并已指定文件名保存 +func (m *BucketManager) Fetch(resURL, bucket, key string) (fetchRet FetchRet, err error) { + reqHost, rErr := m.IoReqHost(bucket) + if rErr != nil { + err = rErr + return + } + reqURL := fmt.Sprintf("%s%s", reqHost, uriFetch(resURL, bucket, key)) + err = m.Client.CredentialedCall(context.Background(), m.Mac, auth.TokenQiniu, &fetchRet, "POST", reqURL, nil) + return +} + +func (m *BucketManager) IoReqHost(bucket string) (reqHost string, err error) { + var reqErr error + + if m.Cfg.IoHost == "" { + reqHost, reqErr = m.IovipHost(bucket) + if reqErr != nil { + err = reqErr + return + } + } else { + reqHost = m.Cfg.IoHost + } + if !strings.HasPrefix(reqHost, "http") { + reqHost = endpoint(m.Cfg.UseHTTPS, reqHost) + } + return +} + +func (m *BucketManager) IovipHost(bucket string) (iovipHost string, err error) { + zone, err := m.Zone(bucket) + if err != nil { + return + } + + iovipHost = zone.GetIoHost(m.Cfg.UseHTTPS) + return +} + +func (m *BucketManager) RsfReqHost(bucket string) (reqHost string, err error) { + var reqErr error + + if m.Cfg.RsfHost == "" { + reqHost, reqErr = m.RsfHost(bucket) + if reqErr != nil { + err = reqErr + return + } + } else { + reqHost = m.Cfg.RsfHost + } + if !strings.HasPrefix(reqHost, "http") { + reqHost = endpoint(m.Cfg.UseHTTPS, reqHost) + } + return +} + +func (m *BucketManager) RsfHost(bucket string) (rsfHost string, err error) { + zone, err := m.Zone(bucket) + if err != nil { + return + } + + rsfHost = zone.GetRsfHost(m.Cfg.UseHTTPS) + return +} + +func (m *BucketManager) Zone(bucket string) (z *Zone, err error) { + + if m.Cfg.Zone != nil { + z = m.Cfg.Zone + return + } + + z, err = GetZone(m.Mac.AccessKey, bucket) + return +} + +// ----------------------------------------------------------------------------------------- diff --git a/internal/kodo/bucket_list.go b/internal/kodo/bucket_list.go new file mode 100644 index 
0000000..9d11263 --- /dev/null +++ b/internal/kodo/bucket_list.go @@ -0,0 +1,246 @@ +package kodo + +import ( + "context" + "errors" + "fmt" + "net/url" + "strconv" + + "github.com/xushiwei/kodofs/internal/kodo/auth" +) + +// ListItem 为文件列举的返回值 +type ListItem struct { + + // 资源名 + Key string `json:"key"` + + // 上传时间,单位:100纳秒,其值去掉低七位即为Unix时间戳。 + PutTime int64 `json:"putTime"` + + // 文件的HASH值,使用hash值算法计算。 + Hash string `json:"hash"` + + // 资源内容的大小,单位:字节。 + Fsize int64 `json:"fsize"` + + // 资源的 MIME 类型。 + MimeType string `json:"mimeType"` + + /** + * 文件上传时设置的endUser + */ + EndUser string `json:"endUser"` + + /** + * 资源的存储类型 + * 0 表示标准存储 + * 1 表示低频存储 + * 2 表示归档存储 + * 3 表示深度归档存储 + */ + Type int `json:"type"` + + /** + * 文件的存储状态,即禁用状态和启用状态间的的互相转换,请参考:文件状态。 + * 0 表示启用 + * 1 表示禁用 + */ + Status int `json:"status"` + + /** + * 文件的 md5 值 + */ + Md5 string `json:"md5"` +} + +// 接口可能返回空的记录 +func (l *ListItem) IsEmpty() (empty bool) { + if l == nil { + return true + } + + return l.Key == "" && l.Hash == "" && l.Fsize == 0 && l.PutTime == 0 +} + +func (l *ListItem) String() string { + str := "" + str += fmt.Sprintf("Hash: %s\n", l.Hash) + str += fmt.Sprintf("Fsize: %d\n", l.Fsize) + str += fmt.Sprintf("PutTime: %d\n", l.PutTime) + str += fmt.Sprintf("MimeType: %s\n", l.MimeType) + str += fmt.Sprintf("Type: %d\n", l.Type) + str += fmt.Sprintf("EndUser: %s\n", l.EndUser) + return str +} + +type ListFilesRet struct { + Marker string `json:"marker"` + Items []ListItem `json:"items"` + CommonPrefixes []string `json:"commonPrefixes"` +} + +// ListFiles 用来获取空间文件列表,可以根据需要指定文件的前缀 prefix,文件的目录 delimiter,循环列举的时候下次 +// 列举的位置 marker,以及每次返回的文件的最大数量limit,其中limit最大为1000。 +func (m *BucketManager) ListFiles(bucket, prefix, delimiter, marker string, + limit int) (entries []ListItem, commonPrefixes []string, nextMarker string, hasNext bool, err error) { + + ret, hNext, e := m.ListFilesWithContext(context.Background(), bucket, + ListInputOptionsPrefix(prefix), + ListInputOptionsDelimiter(delimiter), + ListInputOptionsMarker(marker), + ListInputOptionsLimit(limit)) + if e != nil { + return nil, nil, "", false, e + } + return ret.Items, ret.CommonPrefixes, ret.Marker, hNext, nil +} + +type listInputOptions struct { + prefix string + delimiter string + marker string + limit int +} + +type ListInputOption func(options *listInputOptions) + +func ListInputOptionsPrefix(prefix string) ListInputOption { + return func(input *listInputOptions) { + input.prefix = prefix + } +} + +func ListInputOptionsDelimiter(delimiter string) ListInputOption { + return func(input *listInputOptions) { + input.delimiter = delimiter + } +} + +func ListInputOptionsMarker(marker string) ListInputOption { + return func(input *listInputOptions) { + input.marker = marker + } +} + +func ListInputOptionsLimit(limit int) ListInputOption { + return func(input *listInputOptions) { + input.limit = limit + } +} + +// ListFilesWithContext +// +// @Description: 用来获取空间文件列表,可以根据需要指定文件的列举条件 +// @receiver m BucketManager +// @param ctx context +// @param bucket 列举的 bucket +// @param options 列举的可选条件 +// 列举条件-需要列举 Key 的前缀:ListInputOptionsPrefix(prefix) +// 列举条件-文件的目录分隔符:ListInputOptionsDelimiter(delimiter) +// 列举条件-下次列举的位置:ListInputOptionsMarker(marker) +// 列举条件-每次返回的文件的最大数量:ListInputOptionsLimit(limit) 范围:1~1000 +// @return ret 列举的对象数据 +// @return hasNext 是否还有数据未被列举 +// @return err 列举时的错误信息 +func (m *BucketManager) ListFilesWithContext(ctx context.Context, bucket string, options ...ListInputOption) (ret *ListFilesRet, hasNext bool, err error) { + if len(bucket) == 0 { + 
return nil, false, errors.New("bucket can't empty") + } + + inputOptions := listInputOptions{} + for _, option := range options { + option(&inputOptions) + } + + if inputOptions.limit <= 0 || inputOptions.limit > 1000 { + return nil, false, errors.New("invalid list limit, only allow [1, 1000]") + } + + ctx = auth.WithCredentialsType(ctx, m.Mac, auth.TokenQiniu) + host, reqErr := m.RsfReqHost(bucket) + if reqErr != nil { + return nil, false, reqErr + } + + ret = &ListFilesRet{} + reqURL := fmt.Sprintf("%s%s", host, uriListFiles(bucket, inputOptions.prefix, inputOptions.delimiter, inputOptions.marker, inputOptions.limit)) + err = m.Client.CredentialedCall(ctx, m.Mac, auth.TokenQiniu, ret, "POST", reqURL, nil) + if err != nil { + return nil, false, err + } + + return ret, len(ret.Marker) > 0, nil +} + +type listFilesRet2 struct { + Marker string `json:"marker"` + Item ListItem `json:"item"` + Dir string `json:"dir"` +} + +// ListBucket 用来获取空间文件列表,可以根据需要指定文件的前缀 prefix,文件的目录 delimiter,流式返回每条数据。 +// Deprecated +func (m *BucketManager) ListBucket(bucket, prefix, delimiter, marker string) (retCh chan listFilesRet2, err error) { + return m.ListBucketContext(context.Background(), bucket, prefix, delimiter, marker) +} + +// ListBucketContext 用来获取空间文件列表,可以根据需要指定文件的前缀 prefix,文件的目录 delimiter,流式返回每条数据。 +// 接受的context可以用来取消列举操作 +// Deprecated +func (m *BucketManager) ListBucketContext(ctx context.Context, bucket, prefix, delimiter, marker string) (retCh chan listFilesRet2, err error) { + + ret, _, lErr := m.ListFilesWithContext(ctx, bucket, + ListInputOptionsLimit(250), + ListInputOptionsPrefix(prefix), + ListInputOptionsDelimiter(delimiter), + ListInputOptionsMarker(marker)) + if lErr != nil { + return nil, lErr + } + + count := len(ret.CommonPrefixes) + len(ret.Items) + retCh = make(chan listFilesRet2, count) + defer close(retCh) + + if len(ret.CommonPrefixes) > 0 { + for _, commonPrefix := range ret.CommonPrefixes { + retCh <- listFilesRet2{ + Marker: ret.Marker, + Item: ListItem{}, + Dir: commonPrefix, + } + } + } + + if len(ret.Items) > 0 { + for _, item := range ret.Items { + retCh <- listFilesRet2{ + Marker: ret.Marker, + Item: item, + Dir: "", + } + } + } + + return retCh, err +} + +func uriListFiles(bucket, prefix, delimiter, marker string, limit int) string { + query := make(url.Values) + query.Add("bucket", bucket) + if prefix != "" { + query.Add("prefix", prefix) + } + if delimiter != "" { + query.Add("delimiter", delimiter) + } + if marker != "" { + query.Add("marker", marker) + } + if limit > 0 { + query.Add("limit", strconv.FormatInt(int64(limit), 10)) + } + return fmt.Sprintf("/list?%s", query.Encode()) +} diff --git a/internal/kodo/client/client.go b/internal/kodo/client/client.go new file mode 100644 index 0000000..7a6e037 --- /dev/null +++ b/internal/kodo/client/client.go @@ -0,0 +1,274 @@ +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "runtime" + "strings" + "time" + + "github.com/xushiwei/kodofs/internal/kodo/auth" + "github.com/xushiwei/kodofs/internal/kodo/conf" + "github.com/xushiwei/kodofs/internal/kodo/reqid" +) + +// 用来打印调试信息 +var DebugMode = false +var DeepDebugInfo = false + +func getUserAgentWithAppName(userApp string) string { + return fmt.Sprintf("QiniuGo/%s (%s; %s; %s) %s", + conf.Version, runtime.GOOS, runtime.GOARCH, userApp, runtime.Version()) +} + +// ----------------------------------------------------------------------------------------- + +type Client struct { + *http.Client +} + +var UserAgent = 
getUserAgentWithAppName("default") +var DefaultClient = Client{&http.Client{Transport: http.DefaultTransport}} + +func (r Client) CallWithBodyGetter(ctx context.Context, ret interface{}, method, reqUrl string, headers http.Header, body io.Reader, + getBody func() (io.ReadCloser, error), bodyLength int64) (err error) { + + resp, err := r.DoRequestWithBodyGetter(ctx, method, reqUrl, headers, body, getBody, bodyLength) + if err != nil { + return err + } + return CallRet(ctx, ret, resp) +} + +func (r Client) DoRequestWithBodyGetter(ctx context.Context, method, reqUrl string, headers http.Header, body io.Reader, + getBody func() (io.ReadCloser, error), bodyLength int64) (resp *http.Response, err error) { + + req, err := newRequest(ctx, method, reqUrl, headers, body) + if err != nil { + return + } + req.ContentLength = bodyLength + req.GetBody = getBody + return r.Do(ctx, req) +} + +func (r Client) Do(ctx context.Context, req *http.Request) (resp *http.Response, err error) { + reqctx := req.Context() + + if reqId, ok := reqid.ReqidFromContext(ctx); ok { + req.Header.Set("X-Reqid", reqId) + } else if reqId, ok = reqid.ReqidFromContext(reqctx); ok { + req.Header.Set("X-Reqid", reqId) + } + + if _, ok := req.Header["User-Agent"]; !ok { + req.Header.Set("User-Agent", UserAgent) + } + + resp, err = r.Client.Do(req) + return +} + +func (r Client) DoRequestWith(ctx context.Context, method, reqUrl string, headers http.Header, body io.Reader, + bodyLength int) (resp *http.Response, err error) { + + req, err := newRequest(ctx, method, reqUrl, headers, body) + if err != nil { + return + } + req.ContentLength = int64(bodyLength) + return r.Do(ctx, req) +} + +func (r Client) Call(ctx context.Context, ret interface{}, method, reqUrl string, headers http.Header) (err error) { + + resp, err := r.DoRequestWith(ctx, method, reqUrl, headers, nil, 0) + if err != nil { + return err + } + return CallRet(ctx, ret, resp) +} + +func (r Client) CredentialedCall(ctx context.Context, cred *auth.Credentials, tokenType auth.TokenType, ret interface{}, + method, reqUrl string, headers http.Header) error { + ctx = auth.WithCredentialsType(ctx, cred, tokenType) + return r.Call(ctx, ret, method, reqUrl, headers) +} + +// -------------------------------------------------------------------- + +type ErrorInfo struct { + Err string `json:"error,omitempty"` + Key string `json:"key,omitempty"` + Reqid string `json:"reqid,omitempty"` + Errno int `json:"errno,omitempty"` + Code int `json:"code"` +} + +func (r *ErrorInfo) ErrorDetail() string { + + msg, _ := json.Marshal(r) + return string(msg) +} + +func (r *ErrorInfo) Error() string { + + return r.Err +} + +func (r *ErrorInfo) RpcError() (code, errno int, key, err string) { + + return r.Code, r.Errno, r.Key, r.Err +} + +func (r *ErrorInfo) HttpCode() int { + + return r.Code +} + +func parseError(e *ErrorInfo, r io.Reader) { + + body, err1 := io.ReadAll(r) + if err1 != nil { + e.Err = err1.Error() + return + } + + var ret struct { + Err string `json:"error"` + Key string `json:"key"` + Errno int `json:"errno"` + } + if decodeJsonFromData(body, &ret) == nil && ret.Err != "" { + // qiniu error msg style returns here + e.Err, e.Key, e.Errno = ret.Err, ret.Key, ret.Errno + return + } + e.Err = string(body) +} + +func ResponseError(resp *http.Response) (err error) { + + e := &ErrorInfo{ + Reqid: resp.Header.Get("X-Reqid"), + Code: resp.StatusCode, + } + if resp.StatusCode > 299 { + if resp.ContentLength != 0 { + ct, ok := resp.Header["Content-Type"] + if ok && strings.HasPrefix(ct[0], 
"application/json") { + parseError(e, resp.Body) + } else { + bs, rErr := io.ReadAll(resp.Body) + if rErr != nil { + err = rErr + } + e.Err = strings.TrimRight(string(bs), "\n") + } + } + } + return e +} + +func CallRet(ctx context.Context, ret interface{}, resp *http.Response) (err error) { + + defer func() { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() + + if resp.StatusCode/100 == 2 { + if ret != nil && resp.ContentLength != 0 { + err = DecodeJsonFromReader(resp.Body, ret) + if err != nil { + return + } + } + return nil + } + return ResponseError(resp) +} + +func newRequest(ctx context.Context, method, reqUrl string, headers http.Header, body io.Reader) (req *http.Request, err error) { + req, err = http.NewRequest(method, reqUrl, body) + if err != nil { + return + } + + if headers == nil { + headers = http.Header{} + } + + err = addDefaultHeader(headers) + if err != nil { + return + } + + req.Header = headers + req = req.WithContext(ctx) + + //check access token + mac, t, ok := auth.CredentialsFromContext(ctx) + if ok { + err = mac.AddToken(t, req) + if err != nil { + return + } + } + return +} + +// ----------------------------------------------------------------------------------------- + +type jsonDecodeError struct { + original error + data []byte +} + +func (e jsonDecodeError) Error() string { return fmt.Sprintf("%s: %s", e.original.Error(), e.data) } + +func (e jsonDecodeError) Unwrap() error { return e.original } + +func decodeJsonFromData(data []byte, v interface{}) error { + err := json.Unmarshal(data, v) + if err != nil { + return jsonDecodeError{original: err, data: data} + } + return nil +} + +func DecodeJsonFromReader(reader io.Reader, v interface{}) error { + buf := new(bytes.Buffer) + t := io.TeeReader(reader, buf) + err := json.NewDecoder(t).Decode(v) + if err != nil { + return jsonDecodeError{original: err, data: buf.Bytes()} + } + return nil +} + +// ----------------------------------------------------------------------------------------- + +const ( + RequestHeaderKeyXQiniuDate = "X-Qiniu-Date" +) + +func addDefaultHeader(headers http.Header) error { + return addXQiniuDate(headers) +} + +func addXQiniuDate(headers http.Header) error { + if conf.IsDisableQiniuTimestampSignature() { + return nil + } + + timeString := time.Now().UTC().Format("20060102T150405Z") + headers.Set(RequestHeaderKeyXQiniuDate, timeString) + return nil +} + +// ----------------------------------------------------------------------------------------- diff --git a/internal/kodo/clientv2/client.go b/internal/kodo/clientv2/client.go new file mode 100644 index 0000000..4d1fb34 --- /dev/null +++ b/internal/kodo/clientv2/client.go @@ -0,0 +1,131 @@ +package clientv2 + +import ( + "net/http" + "sort" + + clientV1 "github.com/xushiwei/kodofs/internal/kodo/client" +) + +type Client interface { + Do(req *http.Request) (*http.Response, error) +} + +type Handler func(req *http.Request) (*http.Response, error) + +type client struct { + coreClient Client + interceptors []Interceptor +} + +func NewClient(cli Client, interceptors ...Interceptor) Client { + if cli == nil { + if clientV1.DefaultClient.Client != nil { + cli = NewClientWithClientV1(&clientV1.DefaultClient) + } else if http.DefaultClient != nil { + cli = http.DefaultClient + } else { + cli = &http.Client{} + } + } + + var is interceptorList = interceptors + is = append(is, newDefaultHeaderInterceptor()) + is = append(is, newDebugInterceptor()) + sort.Sort(is) + + // 反转 + for i, j := 0, len(is)-1; i < j; i, j = i+1, j-1 { + is[i], is[j] = 
is[j], is[i] + } + + return &client{ + coreClient: cli, + interceptors: is, + } +} + +func (c *client) Do(req *http.Request) (*http.Response, error) { + handler := func(req *http.Request) (*http.Response, error) { + return c.coreClient.Do(req) + } + + interceptors := c.interceptors + for _, interceptor := range interceptors { + h := handler + i := interceptor + handler = func(r *http.Request) (*http.Response, error) { + return i.Intercept(r, h) + } + } + + resp, err := handler(req) + if err != nil { + return resp, err + } + + if resp == nil { + return nil, &clientV1.ErrorInfo{ + Code: -999, + Err: "unknown error, no response", + } + } + + if resp.StatusCode/100 != 2 { + return resp, clientV1.ResponseError(resp) + } + + return resp, nil +} + +func Do(c Client, options RequestParams) (*http.Response, error) { + req, err := NewRequest(options) + if err != nil { + return nil, err + } + + return c.Do(req) +} + +func DoAndDecodeJsonResponse(c Client, options RequestParams, ret interface{}) (*http.Response, error) { + resp, err := Do(c, options) + if err != nil { + return resp, err + } + + if ret == nil || resp.ContentLength == 0 { + return resp, nil + } + + if err = clientV1.DecodeJsonFromReader(resp.Body, ret); err != nil { + return resp, err + } + + return resp, nil +} + +type clientV1Wrapper struct { + c *clientV1.Client +} + +func (c *clientV1Wrapper) Do(req *http.Request) (*http.Response, error) { + return c.c.Do(req.Context(), req) +} + +func NewClientWithClientV1(c *clientV1.Client) Client { + if c == nil { + c = &clientV1.DefaultClient + } + + if c.Client == nil { + if clientV1.DefaultClient.Client != nil { + c.Client = clientV1.DefaultClient.Client + } else { + c.Client = &http.Client{} + } + } + + return &clientV1Wrapper{ + c: c, + } +} diff --git a/internal/kodo/clientv2/interceptor.go b/internal/kodo/clientv2/interceptor.go new file mode 100644 index 0000000..0e54a71 --- /dev/null +++ b/internal/kodo/clientv2/interceptor.go @@ -0,0 +1,70 @@ +package clientv2 + +import ( + "net/http" +) + +const ( + InterceptorPriorityDefault InterceptorPriority = 100 + InterceptorPriorityRetryHosts InterceptorPriority = 200 + InterceptorPriorityRetrySimple InterceptorPriority = 300 + InterceptorPrioritySetHeader InterceptorPriority = 400 + InterceptorPriorityNormal InterceptorPriority = 500 + InterceptorPriorityAuth InterceptorPriority = 600 + InterceptorPriorityDebug InterceptorPriority = 700 +) + +type InterceptorPriority int + +type Interceptor interface { + // Priority 数字越小优先级越高 + Priority() InterceptorPriority + + // Intercept 拦截处理函数 + Intercept(req *http.Request, handler Handler) (*http.Response, error) +} + +type interceptorList []Interceptor + +func (l interceptorList) Less(i, j int) bool { + return l[i].Priority() < l[j].Priority() +} + +func (l interceptorList) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} + +func (l interceptorList) Len() int { + return len(l) +} + +type simpleInterceptor struct { + priority InterceptorPriority + handler func(req *http.Request, handler Handler) (*http.Response, error) +} + +func NewSimpleInterceptor(interceptorHandler func(req *http.Request, handler Handler) (*http.Response, error)) Interceptor { + return NewSimpleInterceptorWithPriority(InterceptorPriorityNormal, interceptorHandler) +} + +func NewSimpleInterceptorWithPriority(priority InterceptorPriority, interceptorHandler func(req *http.Request, handler Handler) (*http.Response, error)) Interceptor { + if priority <= 0 { + priority = InterceptorPriorityNormal + } + + return &simpleInterceptor{ + 
priority: priority, + handler: interceptorHandler, + } +} + +func (interceptor *simpleInterceptor) Priority() InterceptorPriority { + return interceptor.priority +} + +func (interceptor *simpleInterceptor) Intercept(req *http.Request, handler Handler) (*http.Response, error) { + if interceptor == nil || interceptor.handler == nil { + return handler(req) + } + return interceptor.handler(req, handler) +} diff --git a/internal/kodo/clientv2/interceptor_auth.go b/internal/kodo/clientv2/interceptor_auth.go new file mode 100644 index 0000000..da44623 --- /dev/null +++ b/internal/kodo/clientv2/interceptor_auth.go @@ -0,0 +1,39 @@ +package clientv2 + +import ( + "net/http" + + "github.com/xushiwei/kodofs/internal/kodo/auth" +) + +type AuthConfig struct { + Credentials auth.Credentials // + TokenType auth.TokenType // 不包含上传 +} + +type authInterceptor struct { + config AuthConfig +} + +func NewAuthInterceptor(config AuthConfig) Interceptor { + return &authInterceptor{ + config: config, + } +} + +func (interceptor *authInterceptor) Priority() InterceptorPriority { + return InterceptorPriorityAuth +} + +func (interceptor *authInterceptor) Intercept(req *http.Request, handler Handler) (*http.Response, error) { + if interceptor == nil || req == nil { + return handler(req) + } + + err := interceptor.config.Credentials.AddToken(interceptor.config.TokenType, req) + if err != nil { + return nil, err + } + + return handler(req) +} diff --git a/internal/kodo/clientv2/interceptor_debug.go b/internal/kodo/clientv2/interceptor_debug.go new file mode 100644 index 0000000..191a8f6 --- /dev/null +++ b/internal/kodo/clientv2/interceptor_debug.go @@ -0,0 +1,210 @@ +package clientv2 + +import ( + "crypto/tls" + "fmt" + "net/http" + "net/http/httptrace" + "net/http/httputil" + + clientV1 "github.com/xushiwei/kodofs/internal/kodo/client" + "github.com/xushiwei/kodofs/internal/kodo/log" +) + +type DebugLevel int + +const ( + DebugLevelPrintNone DebugLevel = 0 + DebugLevelPrintNormal DebugLevel = 1 + DebugLevelPrintDetail DebugLevel = 2 +) + +var ( + printRequestTrace = false + printRequestLevel *DebugLevel = nil + printResponseLevel *DebugLevel = nil +) + +func PrintRequestTrace(isPrint bool) { + printRequestTrace = isPrint +} + +func IsPrintRequestTrace() bool { + return printRequestTrace +} + +func PrintRequest(level DebugLevel) { + printRequestLevel = &level +} + +func IsPrintRequest() bool { + if printRequestLevel != nil { + return *printRequestLevel == DebugLevelPrintNormal || *printRequestLevel == DebugLevelPrintDetail + } + return clientV1.DebugMode +} + +func IsPrintRequestBody() bool { + if printRequestLevel != nil { + return *printRequestLevel == DebugLevelPrintDetail + } + return clientV1.DeepDebugInfo +} + +func PrintResponse(level DebugLevel) { + printResponseLevel = &level +} + +func IsPrintResponse() bool { + if printResponseLevel != nil { + return *printResponseLevel == DebugLevelPrintNormal || *printResponseLevel == DebugLevelPrintDetail + } + return clientV1.DebugMode +} + +func IsPrintResponseBody() bool { + if printResponseLevel != nil { + return *printResponseLevel == DebugLevelPrintDetail + } + return clientV1.DeepDebugInfo +} + +type debugInterceptor struct { +} + +func newDebugInterceptor() Interceptor { + return &debugInterceptor{} +} + +func (interceptor *debugInterceptor) Priority() InterceptorPriority { + return InterceptorPriorityDebug +} + +func (interceptor *debugInterceptor) Intercept(req *http.Request, handler Handler) (*http.Response, error) { + if interceptor == nil { + return 
handler(req) + } + + label := interceptor.requestLabel(req) + + if e := interceptor.printRequest(label, req); e != nil { + return nil, e + } + + req = interceptor.printRequestTrace(label, req) + + resp, err := handler(req) + + if e := interceptor.printResponse(label, resp); e != nil { + return nil, e + } + + return resp, err +} + +func (interceptor *debugInterceptor) requestLabel(req *http.Request) string { + if req == nil || req.URL == nil { + return "" + } + return fmt.Sprintf("Url:%s", req.URL.String()) +} + +func (interceptor *debugInterceptor) printRequest(label string, req *http.Request) error { + if req == nil { + return nil + } + + printReq := IsPrintRequest() + if !printReq { + return nil + } + + info := label + " request:\n" + d, dErr := httputil.DumpRequest(req, IsPrintRequestBody()) + if dErr != nil { + return dErr + } + info += string(d) + "\n" + + log.Debug(info) + return nil +} + +func (interceptor *debugInterceptor) printRequestTrace(label string, req *http.Request) *http.Request { + if !IsPrintRequestTrace() || req == nil { + return req + } + + label += "\n" + trace := &httptrace.ClientTrace{ + GetConn: func(hostPort string) { + log.Debug(label + fmt.Sprintf("GetConn, %s \n", hostPort)) + }, + GotConn: func(connInfo httptrace.GotConnInfo) { + remoteAddr := connInfo.Conn.RemoteAddr() + log.Debug(label + fmt.Sprintf("GotConn, Network:%s RemoteAddr:%s \n", remoteAddr.Network(), remoteAddr.String())) + }, + PutIdleConn: func(err error) { + log.Debug(label + fmt.Sprintf("PutIdleConn, err:%v \n", err)) + }, + GotFirstResponseByte: func() { + log.Debug(label + "GotFirstResponseByte \n") + }, + Got100Continue: func() { + log.Debug(label + "Got100Continue \n") + }, + DNSStart: func(info httptrace.DNSStartInfo) { + log.Debug(label + fmt.Sprintf("DNSStart, host:%s \n", info.Host)) + }, + DNSDone: func(info httptrace.DNSDoneInfo) { + log.Debug(label + fmt.Sprintf("DNSDone, addr:%+v \n", info.Addrs)) + }, + ConnectStart: func(network, addr string) { + log.Debug(label + fmt.Sprintf("ConnectStart, network:%+v ip:%s \n", network, addr)) + }, + ConnectDone: func(network, addr string, err error) { + log.Debug(label + fmt.Sprintf("ConnectDone, network:%s ip:%s err:%v \n", network, addr, err)) + }, + TLSHandshakeStart: func() { + log.Debug(label + "TLSHandshakeStart \n") + }, + TLSHandshakeDone: func(state tls.ConnectionState, err error) { + log.Debug(label + fmt.Sprintf("TLSHandshakeDone, state:%+v err:%s \n", state, err)) + }, + // go1.10 不支持 + //WroteHeaderField: func(key string, value []string) { + // log.Debug(label + fmt.Sprintf("WroteHeaderField, key:%s value:%s \n", key, value)) + //}, + WroteHeaders: func() { + log.Debug(label + "WroteHeaders \n") + }, + Wait100Continue: func() { + log.Debug(label + "Wait100Continue \n") + }, + WroteRequest: func(info httptrace.WroteRequestInfo) { + log.Debug(label + fmt.Sprintf("WroteRequest, err:%v \n", info.Err)) + }, + } + return req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) +} + +func (interceptor *debugInterceptor) printResponse(label string, resp *http.Response) error { + if resp == nil { + return nil + } + + printResp := IsPrintResponse() + if !printResp { + return nil + } + + info := label + " response:\n" + d, dErr := httputil.DumpResponse(resp, IsPrintResponseBody()) + if dErr != nil { + return dErr + } + info += string(d) + "\n" + + log.Debug(info) + return nil +} diff --git a/internal/kodo/clientv2/interceptor_default_header.go b/internal/kodo/clientv2/interceptor_default_header.go new file mode 100644 index 
0000000..2527154 --- /dev/null +++ b/internal/kodo/clientv2/interceptor_default_header.go @@ -0,0 +1,55 @@ +package clientv2 + +import ( + "net/http" + "time" + + clientV1 "github.com/xushiwei/kodofs/internal/kodo/client" + "github.com/xushiwei/kodofs/internal/kodo/conf" +) + +type defaultHeaderInterceptor struct { +} + +func newDefaultHeaderInterceptor() Interceptor { + return &defaultHeaderInterceptor{} +} + +func (interceptor *defaultHeaderInterceptor) Priority() InterceptorPriority { + return InterceptorPrioritySetHeader +} + +func (interceptor *defaultHeaderInterceptor) Intercept(req *http.Request, handler Handler) (resp *http.Response, err error) { + if interceptor == nil || req == nil { + return handler(req) + } + + if req.Header == nil { + req.Header = http.Header{} + } + + if e := addUseragent(req.Header); e != nil { + return nil, e + } + + if e := addXQiniuDate(req.Header); e != nil { + return nil, e + } + + return handler(req) +} + +func addUseragent(headers http.Header) error { + headers.Set("User-Agent", clientV1.UserAgent) + return nil +} + +func addXQiniuDate(headers http.Header) error { + if conf.IsDisableQiniuTimestampSignature() { + return nil + } + + timeString := time.Now().UTC().Format("20060102T150405Z") + headers.Set("X-Qiniu-Date", timeString) + return nil +} diff --git a/internal/kodo/clientv2/interceptor_retry_hosts.go b/internal/kodo/clientv2/interceptor_retry_hosts.go new file mode 100644 index 0000000..51aec4e --- /dev/null +++ b/internal/kodo/clientv2/interceptor_retry_hosts.go @@ -0,0 +1,124 @@ +package clientv2 + +import ( + "net/http" + "net/url" + "strings" + "time" + + "github.com/xushiwei/kodofs/internal/kodo/hostprovider" +) + +type HostsRetryConfig struct { + RetryConfig RetryConfig // 主备域名重试参数 + HostFreezeDuration time.Duration // 主备域名冻结时间(默认:600s),当一个域名请求失败被冻结的时间,最小 time.Millisecond + HostProvider hostprovider.HostProvider // 备用域名获取方法 + ShouldFreezeHost func(req *http.Request, resp *http.Response, err error) bool +} + +func (c *HostsRetryConfig) init() { + if c.RetryConfig.ShouldRetry == nil { + c.RetryConfig.ShouldRetry = func(req *http.Request, resp *http.Response, err error) bool { + return isHostRetryable(req, resp, err) + } + } + if c.RetryConfig.RetryMax < 0 { + c.RetryConfig.RetryMax = 1 + } + + c.RetryConfig.init() + + if c.HostFreezeDuration < time.Millisecond { + c.HostFreezeDuration = 600 * time.Second + } + + if c.ShouldFreezeHost == nil { + c.ShouldFreezeHost = func(req *http.Request, resp *http.Response, err error) bool { + return true + } + } +} + +type hostsRetryInterceptor struct { + options HostsRetryConfig +} + +func NewHostsRetryInterceptor(options HostsRetryConfig) Interceptor { + return &hostsRetryInterceptor{ + options: options, + } +} + +func (interceptor *hostsRetryInterceptor) Priority() InterceptorPriority { + return InterceptorPriorityRetryHosts +} + +func (interceptor *hostsRetryInterceptor) Intercept(req *http.Request, handler Handler) (resp *http.Response, err error) { + if interceptor == nil || req == nil { + return handler(req) + } + + interceptor.options.init() + + // 不重试 + if interceptor.options.RetryConfig.RetryMax <= 0 { + return handler(req) + } + + for i := 0; ; i++ { + // Clone 防止后面 Handler 处理对 req 有污染 + reqBefore := cloneReq(req.Context(), req) + resp, err = handler(req) + + if !interceptor.options.RetryConfig.ShouldRetry(reqBefore, resp, err) { + return resp, err + } + + // 尝试冻结域名 + oldHost := req.URL.Host + if interceptor.options.ShouldFreezeHost(req, resp, err) { + if fErr := 
interceptor.options.HostProvider.Freeze(oldHost, err, interceptor.options.HostFreezeDuration); fErr != nil { + break + } + } + + if i >= interceptor.options.RetryConfig.RetryMax { + break + } + + // 尝试更换域名 + newHost, pErr := interceptor.options.HostProvider.Provider() + if pErr != nil { + break + } + + if len(newHost) == 0 { + break + } + + if newHost != oldHost { + urlString := req.URL.String() + urlString = strings.Replace(urlString, oldHost, newHost, 1) + u, ppErr := url.Parse(urlString) + if ppErr != nil { + break + } + + reqBefore.Host = u.Host + reqBefore.URL = u + } + + req = reqBefore + + retryInterval := interceptor.options.RetryConfig.RetryInterval() + if retryInterval < time.Microsecond { + continue + } + time.Sleep(retryInterval) + } + return resp, err +} + +func isHostRetryable(req *http.Request, resp *http.Response, err error) bool { + return isRequestRetryable(req) && (isResponseRetryable(resp) || IsErrorRetryable(err)) +} diff --git a/internal/kodo/clientv2/interceptor_retry_simple.go b/internal/kodo/clientv2/interceptor_retry_simple.go new file mode 100644 index 0000000..301196d --- /dev/null +++ b/internal/kodo/clientv2/interceptor_retry_simple.go @@ -0,0 +1,194 @@ +package clientv2 + +import ( + "io" + "math/rand" + "net" + "net/http" + "net/url" + "os" + "strings" + "syscall" + "time" + + clientv1 "github.com/xushiwei/kodofs/internal/kodo/client" +) + +type RetryConfig struct { + RetryMax int // 最大重试次数 + RetryInterval func() time.Duration // 重试时间间隔 + ShouldRetry func(req *http.Request, resp *http.Response, err error) bool +} + +func (c *RetryConfig) init() { + if c == nil { + return + } + + if c.RetryMax < 0 { + c.RetryMax = 0 + } + + if c.RetryInterval == nil { + c.RetryInterval = func() time.Duration { + return time.Duration(50+rand.Int()%50) * time.Millisecond + } + } + + if c.ShouldRetry == nil { + c.ShouldRetry = func(req *http.Request, resp *http.Response, err error) bool { + return isSimpleRetryable(req, resp, err) + } + } +} + +type simpleRetryInterceptor struct { + config RetryConfig +} + +func NewSimpleRetryInterceptor(config RetryConfig) Interceptor { + return &simpleRetryInterceptor{ + config: config, + } +} + +func (interceptor *simpleRetryInterceptor) Priority() InterceptorPriority { + return InterceptorPriorityRetrySimple +} + +func (interceptor *simpleRetryInterceptor) Intercept(req *http.Request, handler Handler) (resp *http.Response, err error) { + if interceptor == nil || req == nil { + return handler(req) + } + + interceptor.config.init() + + // 不重试 + if interceptor.config.RetryMax <= 0 { + return handler(req) + } + + // 可能会被重试多次 + for i := 0; ; i++ { + // Clone 防止后面 Handler 处理对 req 有污染 + reqBefore := cloneReq(req.Context(), req) + resp, err = handler(req) + + if !interceptor.config.ShouldRetry(reqBefore, resp, err) { + return resp, err + } + req = reqBefore + + if i >= interceptor.config.RetryMax { + break + } + + retryInterval := interceptor.config.RetryInterval() + if retryInterval < time.Microsecond { + continue + } + time.Sleep(retryInterval) + } + return resp, err +} + +func isSimpleRetryable(req *http.Request, resp *http.Response, err error) bool { + return isRequestRetryable(req) && (isResponseRetryable(resp) || IsErrorRetryable(err)) +} + +func isRequestRetryable(req *http.Request) bool { + if req == nil { + return false + } + + if req.Body == nil { + return true + } + + if req.GetBody != nil { + b, err := req.GetBody() + if err != nil || b == nil { + return false + } + req.Body = b + return true + } + + seeker, ok := req.Body.(io.Seeker) + 
if !ok { + return false + } + + _, err := seeker.Seek(0, io.SeekStart) + return err == nil +} + +func isResponseRetryable(resp *http.Response) bool { + if resp == nil { + return false + } + return isStatusCodeRetryable(resp.StatusCode) +} + +func isStatusCodeRetryable(statusCode int) bool { + if statusCode < 500 { + return false + } + + if statusCode == 501 || statusCode == 509 || statusCode == 573 || statusCode == 579 || + statusCode == 608 || statusCode == 612 || statusCode == 614 || statusCode == 616 || statusCode == 618 || + statusCode == 630 || statusCode == 631 || statusCode == 632 || statusCode == 640 || statusCode == 701 { + return false + } + + return true +} + +func IsErrorRetryable(err error) bool { + if err == nil { + return false + } + + switch t := err.(type) { + case *net.OpError: + return isNetworkErrorWithOpError(t) + case *url.Error: + return IsErrorRetryable(t.Err) + case net.Error: + return t.Timeout() + case *clientv1.ErrorInfo: + return isStatusCodeRetryable(t.Code) + default: + if err == io.EOF { + return true + } + return false + } +} + +func isNetworkErrorWithOpError(err *net.OpError) bool { + if err == nil || err.Err == nil { + return false + } + + switch t := err.Err.(type) { + case *net.DNSError: + return true + case *os.SyscallError: + if errno, ok := t.Err.(syscall.Errno); ok { + return errno == syscall.ECONNABORTED || + errno == syscall.ECONNRESET || + errno == syscall.ECONNREFUSED || + errno == syscall.ETIMEDOUT + } + case *net.OpError: + return isNetworkErrorWithOpError(t) + default: + desc := err.Err.Error() + if strings.Contains(desc, "use of closed network connection") { + return true + } + } + + return false +} diff --git a/internal/kodo/clientv2/request.go b/internal/kodo/clientv2/request.go new file mode 100644 index 0000000..0cada2b --- /dev/null +++ b/internal/kodo/clientv2/request.go @@ -0,0 +1,92 @@ +package clientv2 + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/url" +) + +const ( + RequestMethodGet = "GET" + RequestMethodPut = "PUT" + RequestMethodPost = "POST" + RequestMethodHead = "HEAD" + RequestMethodDelete = "DELETE" +) + +type RequestBodyCreator func(options *RequestParams) (io.Reader, error) + +func RequestBodyCreatorOfJson(object interface{}) RequestBodyCreator { + body := object + return func(o *RequestParams) (io.Reader, error) { + reqBody, err := json.Marshal(body) + if err != nil { + return nil, err + } + o.Header.Add("Content-Type", "application/json") + return bytes.NewReader(reqBody), nil + } +} + +func RequestBodyCreatorForm(info map[string][]string) RequestBodyCreator { + body := FormStringInfo(info) + return func(o *RequestParams) (io.Reader, error) { + o.Header.Add("Content-Type", "application/x-www-form-urlencoded") + return bytes.NewBufferString(body), nil + } +} + +func FormStringInfo(info map[string][]string) string { + if len(info) == 0 { + return "" + } + return url.Values(info).Encode() +} + +type RequestParams struct { + Context context.Context + Method string + Url string + Header http.Header + BodyCreator RequestBodyCreator +} + +func (o *RequestParams) init() { + if o.Context == nil { + o.Context = context.Background() + } + + if len(o.Method) == 0 { + o.Method = RequestMethodGet + } + + if o.Header == nil { + o.Header = http.Header{} + } + + if o.BodyCreator == nil { + o.BodyCreator = func(options *RequestParams) (io.Reader, error) { + return nil, nil + } + } +} + +func NewRequest(options RequestParams) (*http.Request, error) { + options.init() + + body, cErr := 
options.BodyCreator(&options) + if cErr != nil { + return nil, cErr + } + + req, err := http.NewRequest(options.Method, options.Url, body) + if err != nil { + return nil, err + } + req = req.WithContext(options.Context) + req.Header = options.Header + return req, nil +} diff --git a/internal/kodo/clientv2/request_compatible.go b/internal/kodo/clientv2/request_compatible.go new file mode 100644 index 0000000..de10ae2 --- /dev/null +++ b/internal/kodo/clientv2/request_compatible.go @@ -0,0 +1,95 @@ +package clientv2 + +import ( + "context" + "mime/multipart" + "net/http" + "net/textproto" + "net/url" +) + +// 此处是为了版本兼容,sdk 支持最低版本为 go1.10, go1.13 提供 req.Clone 方法, +// 此处 copy 高版本的 go 标准库方法 +func cloneReq(ctx context.Context, r *http.Request) *http.Request { + if ctx == nil { + panic("nil context") + } + + r2 := r.WithContext(ctx) + if r.Header != nil { + r2.Header = cloneHeader(r.Header) + } + if r.Trailer != nil { + r2.Trailer = cloneHeader(r.Trailer) + } + if s := r.TransferEncoding; s != nil { + s2 := make([]string, len(s)) + copy(s2, s) + r2.TransferEncoding = s2 + } + r2.Form = cloneURLValues(r.Form) + r2.PostForm = cloneURLValues(r.PostForm) + r2.MultipartForm = cloneMultipartForm(r.MultipartForm) + return r2 +} + +func cloneHeader(h http.Header) http.Header { + if h == nil { + return nil + } + + // Find total number of values. + nv := 0 + for _, vv := range h { + nv += len(vv) + } + sv := make([]string, nv) // shared backing array for headers' values + h2 := make(http.Header, len(h)) + for k, vv := range h { + n := copy(sv, vv) + h2[k] = sv[:n:n] + sv = sv[n:] + } + return h2 +} + +func cloneURLValues(v url.Values) url.Values { + if v == nil { + return nil + } + + // http.Header and url.Values have the same representation, so temporarily + // treat it like http.Header, which does have a clone: + return url.Values(cloneHeader(http.Header(v))) +} + +func cloneMultipartForm(f *multipart.Form) *multipart.Form { + if f == nil { + return nil + } + f2 := &multipart.Form{ + Value: (map[string][]string)(cloneHeader(http.Header(f.Value))), + } + if f.File != nil { + m := make(map[string][]*multipart.FileHeader) + for k, vv := range f.File { + vv2 := make([]*multipart.FileHeader, len(vv)) + for i, v := range vv { + vv2[i] = cloneMultipartFileHeader(v) + } + m[k] = vv2 + } + f2.File = m + } + return f2 +} + +func cloneMultipartFileHeader(fh *multipart.FileHeader) *multipart.FileHeader { + if fh == nil { + return nil + } + fh2 := new(multipart.FileHeader) + *fh2 = *fh + fh2.Header = textproto.MIMEHeader(cloneHeader(http.Header(fh.Header))) + return fh2 +} diff --git a/internal/kodo/conf/conf.go b/internal/kodo/conf/conf.go new file mode 100644 index 0000000..1da4c7d --- /dev/null +++ b/internal/kodo/conf/conf.go @@ -0,0 +1,23 @@ +package conf + +import ( + "os" + "strings" +) + +const Version = "7.17.1" + +const ( + CONTENT_TYPE_JSON = "application/json" + CONTENT_TYPE_FORM = "application/x-www-form-urlencoded" + CONTENT_TYPE_OCTET = "application/octet-stream" + CONTENT_TYPE_MULTIPART = "multipart/form-data" + + disableQiniuTimestampSignatureEnvKey = "DISABLE_QINIU_TIMESTAMP_SIGNATURE" +) + +func IsDisableQiniuTimestampSignature() bool { + value := os.Getenv(disableQiniuTimestampSignatureEnvKey) + value = strings.ToLower(value) + return value == "true" || value == "yes" || value == "y" || value == "1" +} diff --git a/internal/kodo/form_upload.go b/internal/kodo/form_upload.go new file mode 100644 index 0000000..3e780bc --- /dev/null +++ b/internal/kodo/form_upload.go @@ -0,0 +1,1070 @@ +package kodo 
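Before the uploader itself, a hedged sketch of how the clientv2 pieces introduced above compose: NewClient stacks interceptors around a core client, RequestParams describes the call, and DoAndDecodeJsonResponse performs it and decodes the JSON reply. The access key, secret key, bucket and URL below are placeholders, and the snippet assumes it runs inside this module because the packages are internal:

package main

import (
	"context"
	"fmt"

	"github.com/xushiwei/kodofs/internal/kodo/auth"
	"github.com/xushiwei/kodofs/internal/kodo/clientv2"
)

func main() {
	cred := auth.New("<accessKey>", "<secretKey>") // placeholder credentials

	// Compose a client: request signing plus simple retries. The default-header
	// and debug interceptors are appended automatically by NewClient.
	cli := clientv2.NewClient(nil,
		clientv2.NewAuthInterceptor(clientv2.AuthConfig{
			Credentials: *cred,
			TokenType:   auth.TokenQiniu,
		}),
		clientv2.NewSimpleRetryInterceptor(clientv2.RetryConfig{RetryMax: 2}),
	)

	// Decode only the fields we care about from the /list reply.
	var ret struct {
		Items []struct {
			Key string `json:"key"`
		} `json:"items"`
	}
	resp, err := clientv2.DoAndDecodeJsonResponse(cli, clientv2.RequestParams{
		Context: context.Background(),
		Method:  clientv2.RequestMethodPost,
		Url:     "https://rsf.qiniu.com/list?bucket=<bucket>&limit=10", // placeholder bucket
	}, &ret)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("listed", len(ret.Items), "entries")
}
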
+ +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "hash" + "hash/crc32" + "io" + "mime/multipart" + "net/http" + "net/textproto" + "os" + "path" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/xushiwei/kodofs/internal/kodo/api" + "github.com/xushiwei/kodofs/internal/kodo/auth" + "github.com/xushiwei/kodofs/internal/kodo/client" + "github.com/xushiwei/kodofs/internal/kodo/clientv2" + "github.com/xushiwei/kodofs/internal/kodo/hostprovider" + "golang.org/x/sync/singleflight" +) + +const ( + // 获取下一个分片Reader失败 + ErrNextReader = "ErrNextReader" + // 超过了最大的重试上传次数 + ErrMaxUpRetry = "ErrMaxUpRetry" +) + +type Region struct { + // 上传入口 + SrcUpHosts []string `json:"src_up,omitempty"` + + // 加速上传入口 + CdnUpHosts []string `json:"cdn_up,omitempty"` + + // 获取文件信息入口 + RsHost string `json:"rs,omitempty"` + + // bucket列举入口 + RsfHost string `json:"rsf,omitempty"` + + ApiHost string `json:"api,omitempty"` + + // 存储io 入口 + IovipHost string `json:"io,omitempty"` + + // 源站下载入口 + IoSrcHost string `json:"io_src,omitempty"` +} + +// 获取io host +func (r *Region) GetIoHost(useHttps bool) string { + return endpoint(useHttps, r.IovipHost) +} + +// 获取rsfHost +func (r *Region) GetRsfHost(useHttps bool) string { + return endpoint(useHttps, r.RsfHost) +} + +// ----------------------------------------------------------------------------------------- + +type Config struct { + //兼容保留 + Zone *Region //空间所在的存储区域 + + Region *Region + + // 如果设置的Host本身是以http://开头的,又设置了该字段为true,那么优先使用该字段,使用https协议 + // 同理如果该字段为false, 但是设置的host以https开头,那么使用http协议通信 + UseHTTPS bool //是否使用https域名 + UseCdnDomains bool //是否使用cdn加速域名 + CentralRsHost string //中心机房的RsHost,用于list bucket + + // 兼容保留 + RsHost string + RsfHost string + UpHost string + ApiHost string + IoHost string +} + +// GetRegion返回一个Region指针 +// 默认返回最新的Region, 如果该字段没有,那么返回兼容保留的Zone, 如果都为nil, 就返回nil +func (c *Config) GetRegion() *Region { + if c.Region != nil { + return c.Region + } + if c.Zone != nil { + return c.Zone + } + return nil +} + +// PutRet 为七牛标准的上传回复内容。 +// 如果使用了上传回调或者自定义了returnBody,那么需要根据实际情况,自己自定义一个返回值结构体 +type PutRet struct { + Hash string `json:"hash"` + PersistentID string `json:"persistentId"` + Key string `json:"key"` +} + +type FormUploader struct { + Client *client.Client + Cfg *Config +} + +func NewFormUploader(cfg *Config) *FormUploader { + if cfg == nil { + cfg = &Config{} + } + + return &FormUploader{ + Client: &client.DefaultClient, + Cfg: cfg, + } +} + +func NewFormUploaderEx(cfg *Config, clt *client.Client) *FormUploader { + if cfg == nil { + cfg = &Config{} + } + + if clt == nil { + clt = &client.DefaultClient + } + + return &FormUploader{ + Client: clt, + Cfg: cfg, + } +} + +// ----------------------------------------------------------------------------------------- + +type PutExtra struct { + // 可选,用户自定义参数,必须以 "x:" 开头。若不以x:开头,则忽略。 + Params map[string]string + + UpHost string + + TryTimes int // 可选。尝试次数 + + // 主备域名冻结时间(默认:600s),当一个域名请求失败(单个域名会被重试 TryTimes 次),会被冻结一段时间,使用备用域名进行重试,在冻结时间内,域名不能被使用,当一个操作中所有域名竣备冻结操作不在进行重试,返回最后一次操作的错误。 + HostFreezeDuration time.Duration + + // 可选,当为 "" 时候,服务端自动判断。 + MimeType string + + // 上传事件:进度通知。这个事件的回调函数应该尽可能快地结束。 + OnProgress func(fsize, uploaded int64) +} + +const ( + defaultTryTimes = 3 +) + +func (extra *PutExtra) init() { + if extra.TryTimes == 0 { + extra.TryTimes = defaultTryTimes + } + if extra.HostFreezeDuration <= 0 { + extra.HostFreezeDuration = 10 * 60 * time.Second + } +} + +func (extra *PutExtra) getUpHost(useHttps bool) string { + return 
hostAddSchemeIfNeeded(useHttps, extra.UpHost) +} + +func hostAddSchemeIfNeeded(useHttps bool, host string) string { + if host == "" { + return "" + } else if strings.Contains(host, "://") { + return host + } else { + return endpoint(useHttps, host) + } +} + +func endpoint(useHttps bool, host string) string { + host = strings.TrimSpace(host) + if host == "" { + return "" + } + + if strings.HasPrefix(host, "http://") || + strings.HasPrefix(host, "https://") { + return host + } + + scheme := "http://" + if useHttps { + scheme = "https://" + } + return fmt.Sprintf("%s%s", scheme, host) +} + +// ----------------------------------------------------------------------------------------- + +type PutPolicy struct { + + // 指定上传的目标资源空间 Bucket 和资源键 Key(最大为 750 字节)。有三种格式: + // ,表示允许用户上传文件到指定的 bucket。在这种格式下文件只能新增(分片上传 v1 版 需要指定 insertOnly 为 1 才是新增,否则也为覆盖上传),若已存在同名资源(且文件内容/etag不一致),上传会失败;若已存在资源的内容/etag一致,则上传会返回成功。 + // :,表示只允许用户上传指定 key 的文件。在这种格式下文件默认允许修改,若已存在同名资源则会被覆盖。如果只希望上传指定 key 的文件,并且不允许修改,那么可以将下面的 insertOnly 属性值设为 1。 + // :,表示只允许用户上传指定以 keyPrefix 为前缀的文件,当且仅当 isPrefixalScope 字段为 1 时生效,isPrefixalScope 为 1 时无法覆盖上传。 + Scope string `json:"scope"` + + // 若为 1,表示允许用户上传以 scope 的 keyPrefix 为前缀的文件。 + IsPrefixalScope int `json:"isPrefixalScope,omitempty"` + + // 上传凭证有效截止时间。Unix时间戳,单位为秒。该截止时间为上传完成后,在七牛空间生成文件的校验时间,而非上传的开始时间, + // 一般建议设置为上传开始时间 + 3600s,用户可根据具体的业务场景对凭证截止时间进行调整。 + Expires uint64 `json:"deadline"` + + // 若非0, 即使Scope为 Bucket:Key 的形式也是insert only + InsertOnly uint16 `json:"insertOnly,omitempty"` + + // 唯一属主标识。特殊场景下非常有用,例如根据 App-Client 标识给图片或视频打水印。 + EndUser string `json:"endUser,omitempty"` + + // Web 端文件上传成功后,浏览器执行 303 跳转的 URL。通常用于表单上传。 + // 文件上传成功后会跳转到 ?upload_ret=包含 returnBody 内容。 + // 如不设置 returnUrl,则直接将 returnBody 的内容返回给客户端。 + ReturnURL string `json:"returnUrl,omitempty"` + + // 上传成功后,自定义七牛云最终返回給上传端(在指定 returnUrl 时是携带在跳转路径参数中)的数据。支持魔法变量和自定义变量。 + // returnBody 要求是合法的 JSON 文本。 + // 例如 {“key”: $(key), “hash”: $(etag), “w”: $(imageInfo.width), “h”: $(imageInfo.height)}。 + ReturnBody string `json:"returnBody,omitempty"` + + // 上传成功后,七牛云向业务服务器发送 POST 请求的 URL。必须是公网上可以正常进行 POST 请求并能响应 HTTP/1.1 200 OK 的有效 URL。 + // 另外,为了给客户端有一致的体验,我们要求 callbackUrl 返回包 Content-Type 为 “application/json”,即返回的内容必须是合法的 + // JSON 文本。出于高可用的考虑,本字段允许设置多个 callbackUrl(用英文符号 ; 分隔),在前一个 callbackUrl 请求失败的时候会依次 + // 重试下一个 callbackUrl。一个典型例子是:http:///callback;http:///callback,并同时指定下面的 callbackHost 字段。 + // 在 callbackUrl 中使用 ip 的好处是减少对 dns 解析的依赖,可改善回调的性能和稳定性。指定 callbackUrl,必须指定 callbackbody, + // 且值不能为空。 + CallbackURL string `json:"callbackUrl,omitempty"` + + // 上传成功后,七牛云向业务服务器发送回调通知时的 Host 值。与 callbackUrl 配合使用,仅当设置了 callbackUrl 时才有效。 + CallbackHost string `json:"callbackHost,omitempty"` + + // 上传成功后,七牛云向业务服务器发送 Content-Type: application/x-www-form-urlencoded 的 POST 请求。业务服务器可以通过直接读取 + // 请求的 query 来获得该字段,支持魔法变量和自定义变量。callbackBody 要求是合法的 url query string。 + // 例如key=$(key)&hash=$(etag)&w=$(imageInfo.width)&h=$(imageInfo.height)。如果callbackBodyType指定为application/json, + // 则callbackBody应为json格式,例如:{“key”:"$(key)",“hash”:"$(etag)",“w”:"$(imageInfo.width)",“h”:"$(imageInfo.height)"}。 + CallbackBody string `json:"callbackBody,omitempty"` + + // 上传成功后,七牛云向业务服务器发送回调通知 callbackBody 的 Content-Type。默认为 application/x-www-form-urlencoded,也可设置 + // 为 application/json。 + CallbackBodyType string `json:"callbackBodyType,omitempty"` + + // 资源上传成功后触发执行的预转持久化处理指令列表。fileType=2或3(上传归档存储或深度归档存储文件)时,不支持使用该参数。支持魔法变量和自 + // 定义变量。每个指令是一个 API 规格字符串,多个指令用;分隔。请参阅persistenOps详解与示例。同时添加 persistentPipeline 字段,使用专 + // 用队列处理,请参阅persistentPipeline。 + PersistentOps string 
`json:"persistentOps,omitempty"` + + // 接收持久化处理结果通知的 URL。必须是公网上可以正常进行 POST 请求并能响应 HTTP/1.1 200 OK 的有效 URL。该 URL 获取的内容和持久化处 + // 理状态查询的处理结果一致。发送 body 格式是 Content-Type 为 application/json 的 POST 请求,需要按照读取流的形式读取请求的 body + // 才能获取。 + PersistentNotifyURL string `json:"persistentNotifyUrl,omitempty"` + + // 转码队列名。资源上传成功后,触发转码时指定独立的队列进行转码。为空则表示使用公用队列,处理速度比较慢。建议使用专用队列。 + PersistentPipeline string `json:"persistentPipeline,omitempty"` + + // saveKey 的优先级设置。为 true 时,saveKey不能为空,会忽略客户端指定的key,强制使用saveKey进行文件命名。参数不设置时, + // 默认值为false + ForceSaveKey bool `json:"forceSaveKey,omitempty"` // + + // 自定义资源名。支持魔法变量和自定义变量。forceSaveKey 为false时,这个字段仅当用户上传的时候没有主动指定 key 时起作用; + // forceSaveKey 为true时,将强制按这个字段的格式命名。 + SaveKey string `json:"saveKey,omitempty"` + + // 限定上传文件大小最小值,单位Byte。小于限制上传文件大小的最小值会被判为上传失败,返回 403 状态码 + FsizeMin int64 `json:"fsizeMin,omitempty"` + + // 限定上传文件大小最大值,单位Byte。超过限制上传文件大小的最大值会被判为上传失败,返回 413 状态码。 + FsizeLimit int64 `json:"fsizeLimit,omitempty"` + + // 开启 MimeType 侦测功能,并按照下述规则进行侦测;如不能侦测出正确的值,会默认使用 application/octet-stream 。 + // 设为非 0 值,则忽略上传端传递的文件 MimeType 信息,并按如下顺序侦测 MimeType 值: + // 1. 侦测内容; 2. 检查文件扩展名; 3. 检查 Key 扩展名。 + // 默认设为 0 值,如上传端指定了 MimeType 则直接使用该值,否则按如下顺序侦测 MimeType 值: + // 1. 检查文件扩展名; 2. 检查 Key 扩展名; 3. 侦测内容。 + DetectMime uint8 `json:"detectMime,omitempty"` + + // 限定用户上传的文件类型。指定本字段值,七牛服务器会侦测文件内容以判断 MimeType,再用判断值跟指定值进行匹配,匹配成功则允许上传,匹配失败则返回 403 状态码。示例: + // image/* 表示只允许上传图片类型 + // image/jpeg;image/png 表示只允许上传 jpg 和 png 类型的图片 + // !application/json;text/plain 表示禁止上传 json 文本和纯文本。注意最前面的感叹号! + MimeLimit string `json:"mimeLimit,omitempty"` + + // 资源的存储类型,0表示标准存储,1 表示低频存储,2 表示归档存储,3 表示深度归档存储。 + FileType int `json:"fileType,omitempty"` + + CallbackFetchKey uint8 `json:"callbackFetchKey,omitempty"` + + DeleteAfterDays int `json:"deleteAfterDays,omitempty"` +} + +// UploadToken 方法用来进行上传凭证的生成 +// 该方法生成的过期时间是现对于现在的时间 +func (p *PutPolicy) UploadToken(cred *auth.Credentials) string { + return p.uploadToken(cred) +} + +func (p PutPolicy) uploadToken(cred *auth.Credentials) (token string) { + if p.Expires == 0 { + p.Expires = 3600 // 默认一小时过期 + } + p.Expires += uint64(time.Now().Unix()) + putPolicyJSON, _ := json.Marshal(p) + token = cred.SignWithData(putPolicyJSON) + return +} + +// ----------------------------------------------------------------------------------------- + +// Put 用来以表单方式上传一个文件。 +// +// ctx 是请求的上下文。 +// ret 是上传成功后返回的数据。如果 uptoken 中没有设置 callbackUrl 或 returnBody,那么返回的数据结构是 PutRet 结构。 +// uptoken 是由业务服务器颁发的上传凭证。 +// key 是要上传的文件访问路径。比如:"foo/bar.jpg"。注意我们建议 key 不要以 '/' 开头。另外,key 为空字符串是合法的。 +// data 是文件内容的访问接口(io.Reader)。 +// fsize 是要上传的文件大小。 +// extra 是上传的一些可选项。可以指定为nil。详细见 PutExtra 结构的描述。 +func (p *FormUploader) Put( + ctx context.Context, ret interface{}, uptoken, key string, data io.Reader, size int64, extra *PutExtra) (err error) { + err = p.put(ctx, ret, uptoken, key, true, data, size, extra, path.Base(key)) + return +} + +func (p *FormUploader) PutFile( + ctx context.Context, ret interface{}, uptoken, key, localFile string, extra *PutExtra) (err error) { + return p.putFile(ctx, ret, uptoken, key, true, localFile, extra) +} + +func (p *FormUploader) putFile( + ctx context.Context, ret interface{}, upToken string, + key string, hasKey bool, localFile string, extra *PutExtra) (err error) { + + f, err := os.Open(localFile) + if err != nil { + return + } + defer f.Close() + + fi, err := f.Stat() + if err != nil { + return + } + fsize := fi.Size() + + return p.put(ctx, ret, upToken, key, hasKey, f, fsize, extra, filepath.Base(localFile)) +} + +func (p *FormUploader) put( + ctx 
context.Context, ret interface{}, upToken string, + key string, hasKey bool, data io.Reader, size int64, extra *PutExtra, fileName string) error { + + if extra == nil { + extra = &PutExtra{} + } + extra.init() + + seekableData, ok := data.(io.ReadSeeker) + if !ok { + dataBytes, rErr := io.ReadAll(data) + if rErr != nil { + return rErr + } + if size <= 0 { + size = int64(len(dataBytes)) + } + seekableData = bytes.NewReader(dataBytes) + } + + return p.putSeekableData(ctx, ret, upToken, key, hasKey, seekableData, size, extra, fileName) +} + +func (p *FormUploader) putSeekableData(ctx context.Context, ret interface{}, upToken string, + key string, hasKey bool, data io.ReadSeeker, dataSize int64, extra *PutExtra, fileName string) error { + + formFieldBuff := new(bytes.Buffer) + formWriter := multipart.NewWriter(formFieldBuff) + // 写入表单头、token、key、fileName 等信息 + if wErr := writeMultipart(formWriter, upToken, key, hasKey, extra, fileName); wErr != nil { + return wErr + } + + // 计算文件 crc32 + crc32Hash := crc32.NewIEEE() + if _, cErr := io.Copy(crc32Hash, data); cErr != nil { + return cErr + } + crcReader := newCrc32Reader(formWriter.Boundary(), crc32Hash) + crcBytes, rErr := io.ReadAll(crcReader) + if rErr != nil { + return rErr + } + crcReader = nil + + // 表单写入文件 crc32 + if _, wErr := formFieldBuff.Write(crcBytes); wErr != nil { + return wErr + } + crcBytes = nil + + formHead := make(textproto.MIMEHeader) + formHead.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, + escapeQuotes(fileName))) + if extra.MimeType != "" { + formHead.Set("Content-Type", extra.MimeType) + } + if _, cErr := formWriter.CreatePart(formHead); cErr != nil { + return cErr + } + formHead = nil + + // 表单 Fields + formFieldData := formFieldBuff.Bytes() + formFieldBuff = nil + + // 表单最后一行 + formEndLine := []byte(fmt.Sprintf("\r\n--%s--\r\n", formWriter.Boundary())) + + // 不再重新构造 formBody ,避免内存峰值问题 + var formBodyLen int64 = -1 + if dataSize >= 0 { + formBodyLen = int64(len(formFieldData)) + dataSize + int64(len(formEndLine)) + } + + progress := newUploadProgress(extra.OnProgress) + getBodyReader := func() (io.Reader, error) { + if _, err := data.Seek(0, io.SeekStart); err != nil { + return nil, err + } + + var formReader = io.MultiReader(bytes.NewReader(formFieldData), data, bytes.NewReader(formEndLine)) + if extra.OnProgress != nil { + formReader = &readerWithProgress{reader: formReader, fsize: formBodyLen, onProgress: progress.onProgress} + } + return formReader, nil + } + getBodyReadCloser := func() (io.ReadCloser, error) { + reader, err := getBodyReader() + if err != nil { + return nil, err + } + return io.NopCloser(reader), nil + } + + var err error + var hostProvider hostprovider.HostProvider = nil + if extra.UpHost != "" { + hostProvider = hostprovider.NewWithHosts([]string{extra.getUpHost(p.Cfg.UseHTTPS)}) + } else { + hostProvider, err = p.getUpHostProviderFromUploadToken(upToken, extra) + if err != nil { + return err + } + } + + // 上传 + contentType := formWriter.FormDataContentType() + headers := http.Header{} + headers.Add("Content-Type", contentType) + err = doUploadAction(hostProvider, extra.TryTimes, extra.HostFreezeDuration, func(host string) error { + reader, gErr := getBodyReader() + if gErr != nil { + return gErr + } + + return p.Client.CallWithBodyGetter(ctx, ret, "POST", host, headers, reader, getBodyReadCloser, formBodyLen) + }) + if err != nil { + return err + } + if extra.OnProgress != nil { + extra.OnProgress(formBodyLen, formBodyLen) + } + return nil +} + +func (p 
*FormUploader) getUpHostProviderFromUploadToken(upToken string, extra *PutExtra) (hostprovider.HostProvider, error) { + ak, bucket, err := getAkBucketFromUploadToken(upToken) + if err != nil { + return nil, err + } + return getUpHostProvider(p.Cfg, extra.TryTimes, extra.HostFreezeDuration, ak, bucket) +} + +// retryMax: 为 0,使用默认值,每个域名只请求一次 +// hostFreezeDuration: 为 0,使用默认值:50ms ~ 100ms +func getUpHostProvider(config *Config, retryMax int, hostFreezeDuration time.Duration, ak, bucket string) (hostprovider.HostProvider, error) { + region := config.GetRegion() + var err error + if region == nil { + if region, err = GetRegionWithOptions(ak, bucket, UCApiOptions{ + RetryMax: retryMax, + HostFreezeDuration: hostFreezeDuration, + }); err != nil { + return nil, err + } + } + + hosts := make([]string, 0, 4) + if config.UseCdnDomains && len(region.CdnUpHosts) > 0 { + hosts = append(hosts, region.CdnUpHosts...) + } else if len(region.SrcUpHosts) > 0 { + hosts = append(hosts, region.SrcUpHosts...) + } + + for i := 0; i < len(hosts); i++ { + hosts[i] = endpoint(config.UseHTTPS, hosts[i]) + } + + return hostprovider.NewWithHosts(hosts), nil +} + +func getAkBucketFromUploadToken(token string) (ak, bucket string, err error) { + items := strings.Split(token, ":") + // KODO-11919 + if len(items) == 5 && items[0] == "" { + items = items[2:] + } else if len(items) != 3 { + err = errors.New("invalid upload token, format error") + return + } + + ak = items[0] + policyBytes, dErr := base64.URLEncoding.DecodeString(items[2]) + if dErr != nil { + err = errors.New("invalid upload token, invalid put policy") + return + } + + putPolicy := PutPolicy{} + uErr := json.Unmarshal(policyBytes, &putPolicy) + if uErr != nil { + err = errors.New("invalid upload token, invalid put policy") + return + } + + bucket = strings.Split(putPolicy.Scope, ":")[0] + return +} + +type UCApiOptions struct { + UseHttps bool // + RetryMax int // 单域名重试次数 + // 主备域名冻结时间(默认:600s),当一个域名请求失败(单个域名会被重试 TryTimes 次),会被冻结一段时间,使用备用域名进行重试,在冻结时间内,域名不能被使用,当一个操作中所有域名竣备冻结操作不在进行重试,返回最后一次操作的错误。 + HostFreezeDuration time.Duration +} + +// 此处废弃,但为了兼容老版本,单独放置一个文件 + +// UcQueryServerInfo 为查询请求回复中的上传域名信息 +type UcQueryServerInfo struct { + Main []string `json:"main,omitempty"` + Backup []string `json:"backup,omitempty"` + Info string `json:"info,omitempty"` +} + +type UcQueryUp = UcQueryServerInfo +type UcQueryIo = UcQueryServerInfo + +// UcQueryRet 为查询请求的回复 +type UcQueryRet struct { + TTL int `json:"ttl"` + Io map[string]map[string][]string `json:"-"` + IoInfo map[string]UcQueryIo `json:"io"` + IoSrcInfo map[string]UcQueryIo `json:"io_src"` + Up map[string]UcQueryUp `json:"up"` + RsInfo map[string]UcQueryServerInfo `json:"rs"` + RsfInfo map[string]UcQueryServerInfo `json:"rsf"` + ApiInfo map[string]UcQueryServerInfo `json:"api"` +} + +func (uc *UcQueryRet) getOneHostFromInfo(info map[string]UcQueryIo) string { + if len(info["src"].Main) > 0 { + return info["src"].Main[0] + } + + if len(info["acc"].Main) > 0 { + return info["acc"].Main[0] + } + + return "" +} + +var ucQueryV2Group singleflight.Group + +type regionV2CacheValue struct { + Region *Region `json:"region"` + Deadline time.Time `json:"deadline"` +} + +type regionV2CacheMap map[string]regionV2CacheValue + +const regionV2CacheFileName = "query_v2_00.cache.json" + +var ( + regionV2CachePath = filepath.Join(os.TempDir(), "qiniu-golang-sdk", regionV2CacheFileName) + regionV2Cache sync.Map + regionV2CacheLock sync.RWMutex + regionV2CacheSyncLock sync.Mutex + regionV2CacheLoaded bool = false +) + +func 
loadRegionV2Cache() { + cacheFile, err := os.Open(regionV2CachePath) + if err != nil { + return + } + defer cacheFile.Close() + + var cacheMap regionV2CacheMap + if err = json.NewDecoder(cacheFile).Decode(&cacheMap); err != nil { + return + } + for cacheKey, cacheValue := range cacheMap { + regionV2Cache.Store(cacheKey, cacheValue) + } +} + +func storeRegionV2Cache() { + err := os.MkdirAll(filepath.Dir(regionV2CachePath), 0700) + if err != nil { + return + } + + cacheFile, err := os.OpenFile(regionV2CachePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) + if err != nil { + return + } + defer cacheFile.Close() + + cacheMap := make(regionV2CacheMap) + regionV2Cache.Range(func(cacheKey, cacheValue interface{}) bool { + cacheMap[cacheKey.(string)] = cacheValue.(regionV2CacheValue) + return true + }) + if err = json.NewEncoder(cacheFile).Encode(cacheMap); err != nil { + return + } +} + +const ( + defaultApiHost = "api.qiniu.com" + defaultUcHost0 = "kodo-config.qiniuapi.com" + defaultUcHost1 = "uc.qbox.me" +) + +// UcHost 为查询空间相关域名的 API 服务地址 +// 设置 UcHost 时,如果不指定 scheme 默认会使用 https +// Deprecated 使用 SetUcHosts 替换 +var UcHost = "" + +// 公有云包括 defaultApiHost,非 uc query api 使用时需要移除 defaultApiHost +// 用户配置时,不能配置 api 域名 +var ucHosts = []string{defaultUcHost0, defaultUcHost1, defaultApiHost} + +func getUcHost(useHttps bool) string { + // 兼容老版本,优先使用 UcHost + host := "" + if len(UcHost) > 0 { + host = UcHost + } else if len(ucHosts) > 0 { + host = ucHosts[0] + } + return endpoint(useHttps, host) +} + +type ucClientConfig struct { + // 非 uc query api 需要去除默认域名 defaultApiHost + IsUcQueryApi bool + + // 单域名重试次数 + RetryMax int + + // 主备域名冻结时间(默认:600s),当一个域名请求失败(单个域名会被重试 TryTimes 次),会被冻结一段时间,使用备用域名进行重试,在冻结时间内,域名不能被使用,当一个操作中所有域名竣备冻结操作不在进行重试,返回最后一次操作的错误。 + HostFreezeDuration time.Duration + + Client *client.Client +} + +// 不带 scheme +func getUcBackupHosts() []string { + var hosts []string + if len(UcHost) > 0 { + hosts = append(hosts, removeHostScheme(UcHost)) + } + + for _, host := range ucHosts { + if len(host) > 0 { + hosts = append(hosts, removeHostScheme(host)) + } + } + + hosts = removeRepeatStringItem(hosts) + return hosts +} + +func removeRepeatStringItem(slc []string) []string { + var result []string + tempMap := map[string]uint8{} + for _, e := range slc { + l := len(tempMap) + tempMap[e] = 0 + if len(tempMap) != l { + result = append(result, e) + } + } + return result +} + +func removeHostScheme(host string) string { + host = strings.TrimPrefix(host, "http://") + host = strings.TrimPrefix(host, "https://") + return host +} + +func getUCClient(config ucClientConfig, mac *auth.Credentials) clientv2.Client { + allHosts := getUcBackupHosts() + var hosts []string = nil + if !config.IsUcQueryApi { + // 非 uc query api 去除 defaultApiHost + for _, host := range allHosts { + if host != defaultApiHost { + hosts = append(hosts, host) + } + } + } else { + hosts = allHosts + } + + is := []clientv2.Interceptor{ + clientv2.NewHostsRetryInterceptor(clientv2.HostsRetryConfig{ + RetryConfig: clientv2.RetryConfig{ + RetryMax: len(hosts), + RetryInterval: nil, + ShouldRetry: nil, + }, + ShouldFreezeHost: nil, + HostFreezeDuration: 0, + HostProvider: hostprovider.NewWithHosts(hosts), + }), + clientv2.NewSimpleRetryInterceptor(clientv2.RetryConfig{ + RetryMax: config.RetryMax, + RetryInterval: nil, + ShouldRetry: nil, + }), + } + + if mac != nil { + is = append(is, clientv2.NewAuthInterceptor(clientv2.AuthConfig{ + Credentials: *mac, + TokenType: auth.TokenQiniu, + })) + } + + return 
clientv2.NewClient(clientv2.NewClientWithClientV1(config.Client), is...) +} + +func getRegionByV2(ak, bucket string, options UCApiOptions) (*Region, error) { + + regionV2CacheLock.RLock() + if regionV2CacheLoaded { + regionV2CacheLock.RUnlock() + } else { + regionV2CacheLock.RUnlock() + func() { + regionV2CacheLock.Lock() + defer regionV2CacheLock.Unlock() + + if !regionV2CacheLoaded { + loadRegionV2Cache() + regionV2CacheLoaded = true + } + }() + } + + regionID := fmt.Sprintf("%s:%s", ak, bucket) + //check from cache + if v, ok := regionV2Cache.Load(regionID); ok && time.Now().Before(v.(regionV2CacheValue).Deadline) { + return v.(regionV2CacheValue).Region, nil + } + + newRegion, err, _ := ucQueryV2Group.Do(regionID, func() (interface{}, error) { + reqURL := fmt.Sprintf("%s/v2/query?ak=%s&bucket=%s", getUcHost(options.UseHttps), ak, bucket) + + var ret UcQueryRet + c := getUCClient(ucClientConfig{ + IsUcQueryApi: true, + RetryMax: options.RetryMax, + HostFreezeDuration: options.HostFreezeDuration, + }, nil) + _, err := clientv2.DoAndDecodeJsonResponse(c, clientv2.RequestParams{ + Context: context.Background(), + Method: clientv2.RequestMethodGet, + Url: reqURL, + Header: nil, + BodyCreator: nil, + }, &ret) + if err != nil { + return nil, fmt.Errorf("query region error, %s", err.Error()) + } + + ioHost := ret.getOneHostFromInfo(ret.IoInfo) + if len(ioHost) == 0 { + return nil, fmt.Errorf("empty io host list") + } + + ioSrcHost := ret.getOneHostFromInfo(ret.IoSrcInfo) + if len(ioHost) == 0 { + return nil, fmt.Errorf("empty io host list") + } + + rsHost := ret.getOneHostFromInfo(ret.RsInfo) + if len(rsHost) == 0 { + return nil, fmt.Errorf("empty rs host list") + } + + rsfHost := ret.getOneHostFromInfo(ret.RsfInfo) + if len(rsfHost) == 0 { + return nil, fmt.Errorf("empty rsf host list") + } + + apiHost := ret.getOneHostFromInfo(ret.ApiInfo) + if len(apiHost) == 0 { + return nil, fmt.Errorf("empty api host list") + } + + srcUpHosts := ret.Up["src"].Main + if ret.Up["src"].Backup != nil { + srcUpHosts = append(srcUpHosts, ret.Up["src"].Backup...) + } + cdnUpHosts := ret.Up["acc"].Main + if ret.Up["acc"].Backup != nil { + cdnUpHosts = append(cdnUpHosts, ret.Up["acc"].Backup...) 
+ } + + region := &Region{ + SrcUpHosts: srcUpHosts, + CdnUpHosts: cdnUpHosts, + IovipHost: ioHost, + RsHost: rsHost, + RsfHost: rsfHost, + ApiHost: apiHost, + IoSrcHost: ioSrcHost, + } + + regionV2Cache.Store(regionID, regionV2CacheValue{ + Region: region, + Deadline: time.Now().Add(time.Duration(ret.TTL) * time.Second), + }) + + regionV2CacheSyncLock.Lock() + defer regionV2CacheSyncLock.Unlock() + + storeRegionV2Cache() + return region, nil + }) + if newRegion == nil { + return nil, err + } + + return newRegion.(*Region), err +} + +// Zone 是Region的别名 +// 兼容保留 +type Zone = Region + +// GetZone 用来根据ak和bucket来获取空间相关的机房信息 +// 新版本使用GetRegion, 这个函数用来保持兼容 +func GetZone(ak, bucket string) (zone *Zone, err error) { + return GetRegion(ak, bucket) +} + +func DefaultUCApiOptions() UCApiOptions { + return UCApiOptions{ + UseHttps: true, + RetryMax: 0, + HostFreezeDuration: 0, + } +} + +// GetRegion 用来根据ak和bucket来获取空间相关的机房信息 +// 延用 v2, v2 结构和 v4 结构不同且暂不可替代 +// Deprecated 使用 GetRegionWithOptions 替换 +func GetRegion(ak, bucket string) (*Region, error) { + return GetRegionWithOptions(ak, bucket, DefaultUCApiOptions()) +} + +// GetRegionWithOptions 用来根据ak和bucket来获取空间相关的机房信息 +func GetRegionWithOptions(ak, bucket string, options UCApiOptions) (*Region, error) { + return getRegionByV2(ak, bucket, options) +} + +func shouldUploadRetryWithOtherHost(err error) bool { + return clientv2.IsErrorRetryable(err) +} + +func doUploadAction(hostProvider hostprovider.HostProvider, retryMax int, freezeDuration time.Duration, action func(host string) error) error { + for { + host, err := hostProvider.Provider() + if err != nil { + return api.NewError(ErrMaxUpRetry, err.Error()) + } + + for i := 0; ; i++ { + err = action(host) + + // 请求成功 + if err == nil { + return nil + } + + // 不可重试错误 + if !shouldUploadRetryWithOtherHost(err) { + return err + } + + // 超过重试次数退出 + if i >= retryMax { + break + } + } + + // 单个 host 失败,冻结此 host,换其他 host + _ = hostProvider.Freeze(host, err, freezeDuration) + } +} + +var quoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"") + +func escapeQuotes(s string) string { + return quoteEscaper.Replace(s) +} + +func writeMultipart(writer *multipart.Writer, uptoken, key string, hasKey bool, + extra *PutExtra, fileName string) (err error) { + + //token + if err = writer.WriteField("token", uptoken); err != nil { + return + } + + //key + if hasKey { + if err = writer.WriteField("key", key); err != nil { + return + } + } + + //extra.Params + if extra.Params != nil { + for k, v := range extra.Params { + if (strings.HasPrefix(k, "x:") || strings.HasPrefix(k, "x-qn-meta-")) && v != "" { + err = writer.WriteField(k, v) + if err != nil { + return + } + } + } + } + + return err +} + +// ----------------------------------------------------------------------------------------- + +type crc32Reader struct { + h hash.Hash32 + boundary string + r io.Reader + inited bool + nlDashBoundaryNl string + header string + crc32PadLen int64 +} + +func newCrc32Reader(boundary string, h hash.Hash32) *crc32Reader { + nlDashBoundaryNl := fmt.Sprintf("\r\n--%s\r\n", boundary) + header := `Content-Disposition: form-data; name="crc32"` + "\r\n\r\n" + return &crc32Reader{ + h: h, + boundary: boundary, + nlDashBoundaryNl: nlDashBoundaryNl, + header: header, + crc32PadLen: 10, + } +} + +func (r *crc32Reader) Read(p []byte) (int, error) { + if !r.inited { + crc32Sum := r.h.Sum32() + crc32Line := r.nlDashBoundaryNl + r.header + fmt.Sprintf("%010d", crc32Sum) //padding crc32 results to 10 digits + r.r = strings.NewReader(crc32Line) + 
r.inited = true + } + return r.r.Read(p) +} + +/* +func (r crc32Reader) length() (length int64) { + return int64(len(r.nlDashBoundaryNl+r.header)) + r.crc32PadLen +} +*/ + +// ----------------------------------------------------------------------------------------- + +type uploadProgress struct { + lastUploadedBytes int64 + progress func(totalBytes, uploadedBytes int64) +} + +func newUploadProgress(progressHandler func(totalBytes, uploadedBytes int64)) *uploadProgress { + return &uploadProgress{ + lastUploadedBytes: 0, + progress: progressHandler, + } +} + +func (p *uploadProgress) onProgress(totalBytes, uploadedBytes int64) { + if p.progress == nil { + return + } + + if p.lastUploadedBytes >= uploadedBytes { + // 过滤重新上传的场景 + return + } + p.lastUploadedBytes = uploadedBytes + p.progress(totalBytes, uploadedBytes) +} + +type readerWithProgress struct { + reader io.Reader + uploaded int64 + fsize int64 + onProgress func(fsize, uploaded int64) +} + +func (p *readerWithProgress) Read(b []byte) (n int, err error) { + if p.uploaded > 0 { + p.onProgress(p.fsize, p.uploaded) + } + + n, err = p.reader.Read(b) + p.uploaded += int64(n) + if p.fsize > 0 && p.uploaded > p.fsize { + p.uploaded = p.fsize + } + return +} + +// ----------------------------------------------------------------------------------------- diff --git a/internal/kodo/freezer/freezer.go b/internal/kodo/freezer/freezer.go new file mode 100644 index 0000000..3490d2a --- /dev/null +++ b/internal/kodo/freezer/freezer.go @@ -0,0 +1,48 @@ +package freezer + +import ( + "sync" + "time" +) + +type Freezer interface { + Available(itemId string) bool + Freeze(itemId string, duration time.Duration) error + Unfreeze(itemId string) error +} + +func New() Freezer { + return &freezer{ + freezerItems: &sync.Map{}, + } +} + +type freezer struct { + freezerItems *sync.Map +} + +func (i *freezer) Available(itemId string) bool { + unfreezeTime, ok := i.freezerItems.Load(itemId) + if !ok { + return true + } + + unfreezeTimeInt64, ok := unfreezeTime.(int64) + if !ok { + return false + } + + timestamp := time.Now().Unix() + return timestamp > unfreezeTimeInt64 +} + +func (i *freezer) Freeze(itemId string, duration time.Duration) error { + timestamp := time.Now().Unix() + i.freezerItems.Store(itemId, timestamp+int64(duration/time.Second)) + return nil +} + +func (i *freezer) Unfreeze(itemId string) error { + i.freezerItems.Delete(itemId) + return nil +} diff --git a/internal/kodo/hostprovider/host_provider.go b/internal/kodo/hostprovider/host_provider.go new file mode 100644 index 0000000..279b9f9 --- /dev/null +++ b/internal/kodo/hostprovider/host_provider.go @@ -0,0 +1,53 @@ +package hostprovider + +import ( + "errors" + "time" + + "github.com/xushiwei/kodofs/internal/kodo/freezer" +) + +type HostProvider interface { + Provider() (string, error) + Freeze(host string, cause error, duration time.Duration) error +} + +func NewWithHosts(hosts []string) HostProvider { + return &arrayHostProvider{ + hosts: hosts, + freezer: freezer.New(), + } +} + +type arrayHostProvider struct { + hosts []string + freezer freezer.Freezer + lastFreezeErr error +} + +func (a *arrayHostProvider) Provider() (string, error) { + if len(a.hosts) == 0 { + return "", errors.New("no host found") + } + + for _, host := range a.hosts { + if a.freezer.Available(host) { + return host, nil + } + } + + if a.lastFreezeErr != nil { + return "", a.lastFreezeErr + } else { + return "", errors.New("all hosts are frozen") + } +} + +func (a *arrayHostProvider) Freeze(host string, cause error, 
duration time.Duration) error { + if duration <= 0 { + return nil + } + + a.lastFreezeErr = cause + return a.freezer.Freeze(host, duration) +} diff --git a/internal/kodo/log/logger.go b/internal/kodo/log/logger.go new file mode 100644 index 0000000..6cffaa3 --- /dev/null +++ b/internal/kodo/log/logger.go @@ -0,0 +1,76 @@ +// Package log只是SDK本身自己使用,用来调试代码使用,比如输出HTTP请求和响应信息 +package log + +import ( + "io" + "log" + "os" +) + +type Logger struct { + *log.Logger + level LogLevel +} + +// New 返回一个Logger 指针 +func New(out io.Writer, prefix string, flag int, level LogLevel) *Logger { + return &Logger{ + Logger: log.New(out, prefix, flag), + level: level, + } +} + +var ( + std = New(os.Stdout, DebugPrefix, log.LstdFlags, LogDebug) + info = New(os.Stdout, InfoPrefix, log.LstdFlags, LogInfo) + warn = New(os.Stdout, WarnPrefix, log.LstdFlags, LogWarn) +) + +type LogLevel int + +const ( + // LogDebug 调试模式 + LogDebug LogLevel = iota + + // Info + LogInfo + + // Warn + LogWarn +) + +const ( + InfoPrefix = "[I] " + DebugPrefix = "[D] " + WarnPrefix = "[W] " +) + +func (l *Logger) Info(v ...interface{}) { + l.output(LogInfo, v...) +} + +func (l *Logger) output(level LogLevel, v ...interface{}) { + if l.level <= level { + l.Logger.Println(v...) + } +} + +func (l *Logger) Debug(v ...interface{}) { + l.output(LogDebug, v...) +} + +func (l *Logger) Warn(v ...interface{}) { + l.output(LogWarn, v...) +} + +func Debug(v ...interface{}) { + std.Debug(v...) +} + +func Info(v ...interface{}) { + info.Info(v...) +} + +func Warn(v ...interface{}) { + warn.Warn(v...) +} diff --git a/internal/kodo/log/logger_test.go b/internal/kodo/log/logger_test.go new file mode 100644 index 0000000..7e6bafc --- /dev/null +++ b/internal/kodo/log/logger_test.go @@ -0,0 +1,23 @@ +//go:build unit +// +build unit + +package log + +import ( + "bytes" + "log" + "strings" + "testing" +) + +func TestLogger(t *testing.T) { + b := bytes.Buffer{} + logger := New(&b, InfoPrefix, log.LstdFlags, LogInfo) + + logger.Info("hello world") + + splits := strings.Split(b.String(), " ") + if splits[0] != strings.Trim(InfoPrefix, " ") { + t.Errorf("got prefix: %q, want: %q\n", splits[0], InfoPrefix) + } +} diff --git a/internal/kodo/reqid/reqid.go b/internal/kodo/reqid/reqid.go new file mode 100644 index 0000000..93a3fed --- /dev/null +++ b/internal/kodo/reqid/reqid.go @@ -0,0 +1,22 @@ +package reqid + +import ( + "context" +) + +// ----------------------------------------------------------------------------------------- + +type reqidKey struct{} + +// WithReqid 把reqid加入context中 +func WithReqid(ctx context.Context, reqid string) context.Context { + return context.WithValue(ctx, reqidKey{}, reqid) +} + +// ReqidFromContext 从context中获取reqid +func ReqidFromContext(ctx context.Context) (reqid string, ok bool) { + reqid, ok = ctx.Value(reqidKey{}).(string) + return +} + +// ----------------------------------------------------------------------------------------- diff --git a/kodofs.go b/kodofs.go new file mode 100644 index 0000000..06852c4 --- /dev/null +++ b/kodofs.go @@ -0,0 +1,5 @@ +package kodofs + +// ----------------------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------------------- diff --git a/kodoutil/kodoutil.go b/kodoutil/kodoutil.go new file mode 100644 index 0000000..9046f65 --- /dev/null +++ b/kodoutil/kodoutil.go @@ -0,0 +1,99 @@ +package kodoutil + +import ( + "context" + "io" + "io/fs" + "path" + "strings" + "time" + + 
"github.com/xushiwei/kodofs/internal/kodo" + "github.com/xushiwei/kodofs/internal/kodo/auth" +) + +// ----------------------------------------------------------------------------------------- + +type Credentials = auth.Credentials + +func NewCredentials(accessKey, secretKey string) *Credentials { + return auth.New(accessKey, secretKey) +} + +// ----------------------------------------------------------------------------------------- + +func Upload(ctx context.Context, mac *Credentials, bucket, name string, r io.Reader, fi fs.FileInfo) (err error) { + name = strings.TrimPrefix(name, "/") + putPolicy := kodo.PutPolicy{ + Scope: bucket + ":" + name, + } + upToken := putPolicy.UploadToken(mac) + + var ret kodo.PutRet + formUploader := kodo.NewFormUploaderEx(nil, nil) + return formUploader.Put(ctx, &ret, upToken, name, r, fi.Size(), nil) +} + +// ----------------------------------------------------------------------------------------- + +type WalkFunc = func(path string, info fs.FileInfo, err error) error + +func Walk(ctx context.Context, mac *Credentials, bucket, dir string, fn WalkFunc) (err error) { + m := kodo.NewBucketManager(mac, nil) + if !strings.HasSuffix(dir, "/") { + dir += "/" + } + marker := "" + prefix := kodo.ListInputOptionsPrefix(dir) + for { + ret, hasNext, e := m.ListFilesWithContext(ctx, bucket, prefix, kodo.ListInputOptionsMarker(marker)) + if e != nil { + return e + } + for _, item := range ret.Items { + key := item.Key + if !strings.HasPrefix(key, "/") { + key = "/" + key + } + fn(key, &dataFileInfo{key, item.Fsize}, nil) + } + if !hasNext { + break + } + marker = ret.Marker + } + return +} + +// ----------------------------------------------------------------------------------------- + +type dataFileInfo struct { + name string + size int64 +} + +func (p *dataFileInfo) Name() string { + return path.Base(p.name) +} + +func (p *dataFileInfo) Size() int64 { + return p.size +} + +func (p *dataFileInfo) Mode() fs.FileMode { + return 0 +} + +func (p *dataFileInfo) ModTime() time.Time { + return time.Time{} // zero time +} + +func (p *dataFileInfo) IsDir() bool { + return false +} + +func (p *dataFileInfo) Sys() interface{} { + return nil +} + +// -----------------------------------------------------------------------------------------