Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve caching when fetching capabilities. #780

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 160 additions & 59 deletions capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ package signaling
import (
"context"
"encoding/json"
"fmt"
"errors"
"io"
"log"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/marcw/cachecontrol"
)

const (
Expand All @@ -41,16 +43,140 @@ const (
// Name of capability to enable the "v3" API for the signaling endpoint.
FeatureSignalingV3Api = "signaling-v3"

// Cache received capabilities for one hour.
CapabilitiesCacheDuration = time.Hour
// Cache capabilities for one minute if response does not contain a
// "Cache-Control" header.
defaultCapabilitiesCacheDuration = time.Minute

// Don't invalidate more than once per minute.
maxInvalidateInterval = time.Minute
)

var (
ErrUnexpectedHttpStatus = errors.New("unexpected_http_status")
)

type capabilitiesEntry struct {
nextUpdate time.Time
capabilities map[string]interface{}
mu sync.RWMutex
nextUpdate time.Time
etag string
mustRevalidate bool
capabilities map[string]interface{}
}

func newCapabilitiesEntry() *capabilitiesEntry {
return &capabilitiesEntry{}
}

func (e *capabilitiesEntry) valid(now time.Time) bool {
e.mu.RLock()
defer e.mu.RUnlock()

return e.nextUpdate.After(now)
}

func (e *capabilitiesEntry) updateRequest(r *http.Request) {
e.mu.RLock()
defer e.mu.RUnlock()

if e.etag != "" {
r.Header.Set("If-None-Match", e.etag)
}
}

func (e *capabilitiesEntry) invalidate() {
e.mu.Lock()
defer e.mu.Unlock()

e.nextUpdate = time.Now()
}

func (e *capabilitiesEntry) errorIfMustRevalidate(err error) error {
if !e.mustRevalidate {
return nil
}

e.capabilities = nil
return err
}

func (e *capabilitiesEntry) update(response *http.Response, now time.Time) error {
e.mu.Lock()
defer e.mu.Unlock()

url := response.Request.URL
e.etag = response.Header.Get("ETag")

var maxAge time.Duration
if cacheControl := response.Header.Get("Cache-Control"); cacheControl != "" {
cc := cachecontrol.Parse(cacheControl)
if nc, _ := cc.NoCache(); !nc {
maxAge = cc.MaxAge()
}
e.mustRevalidate = cc.MustRevalidate()
} else {
maxAge = defaultCapabilitiesCacheDuration
}
e.nextUpdate = now.Add(maxAge)

if response.StatusCode == http.StatusNotModified {
log.Printf("Capabilities %+v from %s have not changed", e.capabilities, url)
return nil
} else if response.StatusCode != http.StatusOK {
log.Printf("Received unexpected HTTP status from %s: %s", url, response.Status)
return e.errorIfMustRevalidate(ErrUnexpectedHttpStatus)
}

ct := response.Header.Get("Content-Type")
if !strings.HasPrefix(ct, "application/json") {
log.Printf("Received unsupported content-type from %s: %s (%s)", url, ct, response.Status)
return e.errorIfMustRevalidate(ErrUnsupportedContentType)
}

body, err := io.ReadAll(response.Body)
if err != nil {
log.Printf("Could not read response body from %s: %s", url, err)
return e.errorIfMustRevalidate(err)
}

var ocs OcsResponse
if err := json.Unmarshal(body, &ocs); err != nil {
log.Printf("Could not decode OCS response %s from %s: %s", string(body), url, err)
return e.errorIfMustRevalidate(err)
} else if ocs.Ocs == nil || len(ocs.Ocs.Data) == 0 {
log.Printf("Incomplete OCS response %s from %s", string(body), url)
return e.errorIfMustRevalidate(ErrIncompleteResponse)
}

var capaResponse CapabilitiesResponse
if err := json.Unmarshal(ocs.Ocs.Data, &capaResponse); err != nil {
log.Printf("Could not decode OCS response body %s from %s: %s", string(ocs.Ocs.Data), url, err)
return e.errorIfMustRevalidate(err)
}

capaObj, found := capaResponse.Capabilities[AppNameSpreed]
if !found || len(capaObj) == 0 {
log.Printf("No capabilities received for app spreed from %s: %+v", url, capaResponse)
e.capabilities = nil
return nil
}

var capa map[string]interface{}
if err := json.Unmarshal(capaObj, &capa); err != nil {
log.Printf("Unsupported capabilities received for app spreed from %s: %+v", url, capaResponse)
e.capabilities = nil
return nil
}

log.Printf("Received capabilities %+v from %s", capa, url)
e.capabilities = capa
return nil
}

func (e *capabilitiesEntry) GetCapabilities() map[string]interface{} {
e.mu.RLock()
defer e.mu.RUnlock()

return e.capabilities
}

type Capabilities struct {
Expand Down Expand Up @@ -92,42 +218,46 @@ type CapabilitiesResponse struct {
Capabilities map[string]json.RawMessage `json:"capabilities"`
}

func (c *Capabilities) getCapabilities(key string) (map[string]interface{}, bool) {
func (c *Capabilities) getCapabilities(key string) (*capabilitiesEntry, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

now := c.getNow()
if entry, found := c.entries[key]; found && entry.nextUpdate.After(now) {
return entry.capabilities, true
entry, found := c.entries[key]
if found && entry.valid(now) {
return entry, true
}

return nil, false
return entry, false
}

func (c *Capabilities) setCapabilities(key string, capabilities map[string]interface{}) {
func (c *Capabilities) invalidateCapabilities(key string) {
c.mu.Lock()
defer c.mu.Unlock()

now := c.getNow()
entry := &capabilitiesEntry{
nextUpdate: now.Add(CapabilitiesCacheDuration),
capabilities: capabilities,
if entry, found := c.nextInvalidate[key]; found && entry.After(now) {
return
}

c.entries[key] = entry
if entry, found := c.entries[key]; found {
entry.invalidate()
}

c.nextInvalidate[key] = now.Add(maxInvalidateInterval)
}

func (c *Capabilities) invalidateCapabilities(key string) {
func (c *Capabilities) newCapabilitiesEntry(key string) *capabilitiesEntry {
c.mu.Lock()
defer c.mu.Unlock()

now := c.getNow()
if entry, found := c.nextInvalidate[key]; found && entry.After(now) {
return
entry, found := c.entries[key]
if !found {
entry = newCapabilitiesEntry()
c.entries[key] = entry
}

delete(c.entries, key)
c.nextInvalidate[key] = now.Add(maxInvalidateInterval)
return entry
}

func (c *Capabilities) getKeyForUrl(u *url.URL) string {
Expand All @@ -137,8 +267,9 @@ func (c *Capabilities) getKeyForUrl(u *url.URL) string {

func (c *Capabilities) loadCapabilities(ctx context.Context, u *url.URL) (map[string]interface{}, bool, error) {
key := c.getKeyForUrl(u)
if caps, found := c.getCapabilities(key); found {
return caps, true, nil
entry, valid := c.getCapabilities(key)
if valid {
return entry.GetCapabilities(), true, nil
}

capUrl := *u
Expand Down Expand Up @@ -168,55 +299,25 @@ func (c *Capabilities) loadCapabilities(ctx context.Context, u *url.URL) (map[st
req.Header.Set("Accept", "application/json")
req.Header.Set("OCS-APIRequest", "true")
req.Header.Set("User-Agent", "nextcloud-spreed-signaling/"+c.version)
if entry != nil {
entry.updateRequest(req)
}

resp, err := client.Do(req)
if err != nil {
return nil, false, err
}
defer resp.Body.Close()

ct := resp.Header.Get("Content-Type")
if !strings.HasPrefix(ct, "application/json") {
log.Printf("Received unsupported content-type from %s: %s (%s)", capUrl.String(), ct, resp.Status)
return nil, false, ErrUnsupportedContentType
}

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Printf("Could not read response body from %s: %s", capUrl.String(), err)
return nil, false, err
}

var ocs OcsResponse
if err := json.Unmarshal(body, &ocs); err != nil {
log.Printf("Could not decode OCS response %s from %s: %s", string(body), capUrl.String(), err)
return nil, false, err
} else if ocs.Ocs == nil || len(ocs.Ocs.Data) == 0 {
log.Printf("Incomplete OCS response %s from %s", string(body), u)
return nil, false, fmt.Errorf("incomplete OCS response")
if entry == nil {
entry = c.newCapabilitiesEntry(key)
}

var response CapabilitiesResponse
if err := json.Unmarshal(ocs.Ocs.Data, &response); err != nil {
log.Printf("Could not decode OCS response body %s from %s: %s", string(ocs.Ocs.Data), capUrl.String(), err)
if err := entry.update(resp, c.getNow()); err != nil {
return nil, false, err
}

capaObj, found := response.Capabilities[AppNameSpreed]
if !found || len(capaObj) == 0 {
log.Printf("No capabilities received for app spreed from %s: %+v", capUrl.String(), response)
return nil, false, nil
}

var capa map[string]interface{}
if err := json.Unmarshal(capaObj, &capa); err != nil {
log.Printf("Unsupported capabilities received for app spreed from %s: %+v", capUrl.String(), response)
return nil, false, nil
}

log.Printf("Received capabilities %+v from %s", capa, capUrl.String())
c.setCapabilities(key, capa)
return capa, false, nil
return entry.GetCapabilities(), false, nil
}

func (c *Capabilities) HasCapabilityFeature(ctx context.Context, u *url.URL, feature string) bool {
Expand Down
Loading
Loading