Skip to content

Commit

Permalink
fix:change file will not notify to client (#1255)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Sep 17, 2023
1 parent 3378169 commit d4e54ff
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
12 changes: 12 additions & 0 deletions common/utils/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,18 @@ func (s *SyncMap[K, V]) Range(f func(key K, val V) bool) {
}
}

// Values
func (s *SyncMap[K, V]) Values() []V {
s.lock.RLock()
defer s.lock.RUnlock()

ret := make([]V, 0, len(s.m))
for _, v := range s.m {
ret = append(ret, v)
}
return ret
}

// Delete
func (s *SyncMap[K, V]) Delete(key K) {
s.lock.Lock()
Expand Down
24 changes: 15 additions & 9 deletions config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ const (
type FileReleaseCallback func(clientId string, rsp *apiconfig.ConfigClientResponse) bool

type watchContext struct {
fileReleaseCb FileReleaseCallback
ClientVersion uint64
watchConfigFiles []*apiconfig.ClientConfigFileInfo
clientId string
fileReleaseCb FileReleaseCallback
ClientVersion uint64
}

// watchCenter 处理客户端订阅配置请求,监听配置文件发布事件通知客户端
type watchCenter struct {
subCtx *eventhub.SubscribtionContext
lock sync.Mutex
connManager *connManager
subCtx *eventhub.SubscribtionContext
lock sync.Mutex
// fileId -> clientId -> watchContext
configFileWatchers *utils.SyncMap[string, *utils.SyncMap[string, *watchContext]]
}
Expand Down Expand Up @@ -95,8 +98,10 @@ func (wc *watchCenter) AddWatcher(clientId string, watchConfigFiles []*apiconfig
return newWatchers
})
watchers.Store(clientId, &watchContext{
fileReleaseCb: fileReleaseCb,
ClientVersion: file.Version.GetValue(),
clientId: clientId,
fileReleaseCb: fileReleaseCb,
ClientVersion: file.Version.GetValue(),
watchConfigFiles: watchConfigFiles,
})
}
}
Expand Down Expand Up @@ -129,7 +134,9 @@ func (wc *watchCenter) notifyToWatchers(publishConfigFile *model.SimpleConfigFil
response := GenConfigFileResponse(publishConfigFile.Namespace, publishConfigFile.Group,
publishConfigFile.FileName, "", publishConfigFile.Md5, publishConfigFile.Version)

watchers.Range(func(clientId string, watchCtx *watchContext) bool {
waitNotifiers := watchers.Values()
for _, watchCtx := range waitNotifiers {
clientId := watchCtx.clientId
if watchCtx.ClientVersion < publishConfigFile.Version {
watchCtx.fileReleaseCb(clientId, response)
log.Info("[Config][Watcher] notify to client.",
Expand All @@ -141,6 +148,5 @@ func (wc *watchCenter) notifyToWatchers(publishConfigFile *model.SimpleConfigFil
zap.Uint64("client-version", watchCtx.ClientVersion),
zap.Uint64("version", publishConfigFile.Version))
}
return true
})
}
}

0 comments on commit d4e54ff

Please sign in to comment.