Skip to content

Commit

Permalink
[fix]: new version filebeat collection log failed
Browse files Browse the repository at this point in the history
[fix]: new version filebeat collection log failed

[fix]: new version filebeat collection log failed
  • Loading branch information
Cairry committed May 21, 2024
1 parent 5f5b0f9 commit 14a4701
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 14 deletions.
2 changes: 1 addition & 1 deletion assets/filebeat/config.filebeat
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ output.elasticsearch:
${ELASTICSEARCH_PATH:+path: ${ELASTICSEARCH_PATH}}
${ELASTICSEARCH_BULK_MAX_SIZE:+bulk_max_size: ${ELASTICSEARCH_BULK_MAX_SIZE}}
indices:
- index: "${LOG_PREFIX}-%{+yyy-MM.dd}"
- index: "${LOG_PREFIX:-watchlog}-%{+yyy-MM.dd}"
EOF
}

Expand Down
4 changes: 1 addition & 3 deletions log/config/logConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ func parseLogConfig(name string, info *nodeInfo.LogInfoNode, jsonLogPath string)
rt := os.Getenv("RUNTIME_TYPE")
var lt string
switch rt {
case "docker":
lt = "log"
case "containerd":
case "docker", "containerd":
lt = "container"
}

Expand Down
13 changes: 6 additions & 7 deletions pkg/client/filebeat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (p *FilebeatPointer) canRemoveConf(container string, registry map[string]Re
log.Warnf("%s->%s registry not exist", container, logFile)
continue
}
if registry[logFile].Offset < info.Size() {
if registry[logFile].V.Offset < info.Size() {
if autoMount { // ephemeral logs
log.Infof("%s->%s does not finish to read", container, logFile)
return false
Expand Down Expand Up @@ -276,18 +276,17 @@ func (p *FilebeatPointer) getRegistryState() (map[string]RegistryState, error) {
defer f.Close()

decoder := json.NewDecoder(f)
states := make([]RegistryState, 0)
err = decoder.Decode(&states)
var state RegistryState
err = decoder.Decode(&state)
if err != nil {
return nil, err
}

statesMap := make(map[string]RegistryState, 0)
for _, state := range states {
if _, ok := statesMap[state.Source]; !ok {
statesMap[state.Source] = state
}
if _, ok := statesMap[state.V.Source]; !ok {
statesMap[state.V.Source] = state
}

return statesMap, nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/client/filebeat/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
const (
FilebeatBaseConf = "/usr/share/filebeat"
FilebeatExecCmd = FilebeatBaseConf + "/filebeat"
FilebeatRegistry = FilebeatBaseConf + "/data/registry"
FilebeatRegistry = FilebeatBaseConf + "/data/registry/filebeat/log.json"
FilebeatConfDir = FilebeatBaseConf + "/inputs.d"
FilebeatConfFile = FilebeatBaseConf + "/filebeat.yml"

Expand Down Expand Up @@ -44,9 +44,14 @@ type FileInode struct {

// RegistryState represents log offsets
type RegistryState struct {
K string `json:"k"`
V RegistryV `json:"v"`
}

type RegistryV struct {
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
Timestamp []time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
FileStateOS FileInode
Expand Down
2 changes: 1 addition & 1 deletion pkg/ctx/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewContext(baseDir string, p filebeat.InterFilebeatPointer) *Context {

logPrefix := "watchlog"
lp := os.Getenv("LOG_PREFIX")
if lp != "" {
if len(lp) > 1 {
logPrefix = lp
}

Expand Down

0 comments on commit 14a4701

Please sign in to comment.