Skip to content

Commit

Permalink
Use 'journalctl` for getting logs
Browse files Browse the repository at this point in the history
This removes the CGO deps and makes for a completely static Go binary.
The downside is that is fork/execs a binary to get to the logs. It
supports the same options as the current impl. A thing that's different
is the time/date output which doesn't match up.

No real intentation to get this merged (current binary runs on RH8 which
I care about atm), but also don't want to loose the code as there were
some tricky bits about tailing the logs and not creating zombie
journalctls.

See #84

Signed-off-by: Miek Gieben <[email protected]>
  • Loading branch information
miekg committed Feb 4, 2021
1 parent 4bdeaa0 commit d1d8f69
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 84 deletions.
119 changes: 119 additions & 0 deletions internal/provider/journalctl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package provider

import (
"bufio"
"errors"
"fmt"
"io"
"os/exec"
"strings"
"time"

nodeapi "github.com/virtual-kubelet/virtual-kubelet/node/api"
)

const journalctl = "journalctl"

func journalReader(namespace, name, container string, logOpts nodeapi.ContainerLogOpts) (io.ReadCloser, func() error, error) {
fnlog := log.
WithField("podNamespace", namespace).
WithField("podName", name).
WithField("containerName", container)

fnlog.Infof("calling for container logs with options %+v", logOpts)
cancel := func() error { return nil } // initialize as noop

unitName := strings.Join([]string{unitPrefix(namespace, name), container, "service"}, separator)

// Handle all the options.
args := []string{"-u", unitName, "--no-hostname"} // only works with -o short-xxx options.
if logOpts.Tail > 0 {
args = append(args, "-n")
args = append(args, fmt.Sprintf("%d", logOpts.Tail))
}
if logOpts.Follow {
args = append(args, "-f")
}
if !logOpts.Timestamps {
args = append(args, "-o")
args = append(args, "cat")
} else {
args = append(args, "-o")
args = append(args, "short-full") // this is _not_ the default Go timestamp output
}
if logOpts.SinceSeconds > 0 {
args = append(args, "-S")
args = append(args, fmt.Sprintf("-%ds", logOpts.SinceSeconds))
}
if !logOpts.SinceTime.IsZero() {
args = append(args, "-S")
args = append(args, logOpts.SinceTime.Format(time.RFC3339))
}
// Previous might not be possible to implement
// TODO(pires,miek) show logs from the current Pod alone https://github.com/virtual-kubelet/systemk/issues/5#issuecomment-765278538
// LimitBytes - unsure (maybe a io.CopyBuffer?)

fnlog.Debugf("getting container logs via: %q %v", journalctl, args)
cmd := exec.Command(journalctl, args...)
p, err := cmd.StdoutPipe()
if err != nil {
return nil, cancel, err
}

if err := cmd.Start(); err != nil {
return nil, cancel, err
}

cancel = func() error {
go func() {
if err := cmd.Wait(); err != nil {
fnlog.Debugf("wait for %q failed: %s", journalctl, err)
}
}()
return cmd.Process.Kill()
}

return p, cancel, nil
}

var ErrExpired = errors.New("timeout expired")

// journalFollow synchronously follows the io.Reader, writing each new journal entry to writer. The
// follow will continue until a single time.Time is received on the until channel (or it's closed).
func journalFollow(until <-chan time.Time, reader io.Reader, writer io.Writer) error {
scanner := bufio.NewScanner(reader)
bufch := make(chan []byte)
errch := make(chan error)

go func() {
for scanner.Scan() {
if err := scanner.Err(); err != nil {
errch <- err
return
}
bufch <- scanner.Bytes()
}
// When the context is Done() the 'until' channel is closed, this kicks in the defers in the GetContainerLogsHandler method.
// this cleans up the journalctl, and closes all file descripters. Scan() then stops with an error (before any reads,
// hence the above if err .. .isn't triggered). In the end this go-routine exits.
// the error here is "read |0: file already closed".
}()

for {
select {
case <-until:
return ErrExpired

case err := <-errch:
return err

case buf := <-bufch:
if _, err := writer.Write(buf); err != nil {
return err
}
if _, err := io.WriteString(writer, "\n"); err != nil {
return err
}
}
}
}
50 changes: 22 additions & 28 deletions internal/provider/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"time"

"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
Expand All @@ -33,48 +32,43 @@ func (p *p) GetContainerLogsHandler(w http.ResponseWriter, r *http.Request) {

r.Header.Set("Transfer-Encoding", "chunked")

// Retrieve the actual systemd journal reader given:
// * it implements io.ReadCloser, and
// * exposes other functionality, like follow mode.
logsReader, err := p.getJournalReader(namespace, pod, container, opts)
logsReader, cancel, err := journalReader(namespace, pod, container, opts)
if err != nil {
return errors.Wrap(err, "failed to get systemd journal logs reader")
}
defer logsReader.Close()
defer cancel()

// ResponseWriter must be flushed after each write.
if _, ok := w.(writeFlusher); !ok {
log.Warn("HTTP response writer does not support flushes")
}
fw := flushOnWrite(w)

if !opts.Follow {
io.Copy(fw, logsReader)
return nil
}

// If in follow mode, follow until interrupted.
if opts.Follow {
untilTime := make(chan time.Time, 1)
errChan := make(chan error, 1)

go func(w io.Writer, errChan chan error) {
err := logsReader.Follow(untilTime, w)
if err != nil && err != sdjournal.ErrExpired {
err = errors.Wrap(err, "failed to follow systemd journal logs")
}
errChan <- err
}(fw, errChan)

// Stop following logs if request context is completed.
select {
case err := <-errChan:
return err
case <-r.Context().Done():
close(untilTime)
untilTime := make(chan time.Time, 1)
errChan := make(chan error, 1)

go func(w io.Writer, errChan chan error) {
err := journalFollow(untilTime, logsReader, w)
if err != nil && err != ErrExpired {
err = errors.Wrap(err, "failed to follow systemd journal logs")
}
return nil
errChan <- err
}(fw, errChan)

// Otherwise, just pipe the journal reader.
} else {
io.Copy(fw, logsReader)
// Stop following logs if request context is completed.
select {
case err := <-errChan:
return err
case <-r.Context().Done():
close(untilTime)
}

return nil
})(w, r)
}
Expand Down
56 changes: 0 additions & 56 deletions internal/provider/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/pkg/errors"
"github.com/virtual-kubelet/systemk/internal/ospkg"
"github.com/virtual-kubelet/systemk/internal/unit"
Expand Down Expand Up @@ -290,60 +288,6 @@ func (p *p) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.P
return &pod.Status, nil
}

// getJournalReader returns the actual journal reader.
// This is useful when an io.ReadCloser is not enough, eg we need Follow().
//
// TODO(pires) show logs from the current Pod alone https://github.com/virtual-kubelet/systemk/issues/5#issuecomment-765278538
func (p *p) getJournalReader(namespace, name, container string, logOpts nodeapi.ContainerLogOpts) (*sdjournal.JournalReader, error) {
fnlog := log.
WithField("podNamespace", namespace).
WithField("podName", name).
WithField("containerName", container)

fnlog.Infof("calling for container logs with options %+v", logOpts)

unitName := strings.Join([]string{unitPrefix(namespace, name), container, "service"}, separator)
journalConfig := sdjournal.JournalReaderConfig{
Matches: []sdjournal.Match{
{
// Filter by unit.
Field: sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT,
Value: unitName,
},
},
}
if logOpts.SinceSeconds > 0 {
// Since duration must be negative so we get logs from the past.
journalConfig.Since = -time.Second * time.Duration(logOpts.SinceSeconds)
}
// By default, SinceTime is "0001-01-01 00:00:00 +0000 UTC".
if !logOpts.SinceTime.IsZero() {
journalConfig.Since = time.Since(logOpts.SinceTime)
}
if logOpts.Tail > 0 {
journalConfig.NumFromTail = uint64(logOpts.Tail)
}
// By default, timestamps are present in journal entries.
// Kubernetes defaults to not having timestamps, so we adapt.
if !logOpts.Timestamps {
journalConfig.Formatter = func(entry *sdjournal.JournalEntry) (string, error) {
msg, ok := entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE]
if !ok {
return "", fmt.Errorf("no %q field present in journal entry", sdjournal.SD_JOURNAL_FIELD_MESSAGE)
}

return fmt.Sprintf("%s\n", msg), nil
}
}

journalReader, err := sdjournal.NewJournalReader(journalConfig)
if err != nil {
fnlog.Error("failed to retrieve logs from journald, for unit %q", unitName, err)
}

return journalReader, err
}

// UpdatePod is a noop,
func (p *p) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
log.
Expand Down

0 comments on commit d1d8f69

Please sign in to comment.