Skip to content

Commit

Permalink
fix: handle child process stream correctly to avoid broken pipe (#5835)
Browse files Browse the repository at this point in the history
  • Loading branch information
rangoo94 authored Sep 6, 2024
1 parent 612ef01 commit ea56c53
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 9 deletions.
8 changes: 5 additions & 3 deletions cmd/testworkflow-init/obfuscator/obfuscator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (s *obfuscator) Write(p []byte) (n int, err error) {
s.writeMu.Lock()
defer s.writeMu.Unlock()

size := len(p)

// Read data from the previous cycle
if s.currentBuffer != nil {
p = append(s.currentBuffer, p...)
Expand Down Expand Up @@ -114,7 +116,7 @@ func (s *obfuscator) Write(p []byte) (n int, err error) {
s.currentEnd = currentEnd

// End a call
return n + len(p), nil
return size, nil
}

// Flush the acknowledged sensitive data
Expand All @@ -134,9 +136,9 @@ func (s *obfuscator) Write(p []byte) (n int, err error) {
// Write the rest of data
if len(p) > 0 {
nn, err = s.dst.Write(p)
return n + nn, err
return size, err
}
return n, nil
return size, nil
}

func (s *obfuscator) Flush() error {
Expand Down
16 changes: 12 additions & 4 deletions cmd/testworkflow-init/obfuscator/obfuscator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ func TestObfuscator_Partial(t *testing.T) {
"scope",
})

_, _ = passthrough.Write([]byte("there is some sensitiv"))
_, _ = passthrough.Write([]byte("e content in scope of testkube"))
content1 := []byte("there is some sensitiv")
content2 := []byte("e content in scope of testkube")
n1, _ := passthrough.Write(content1)
n2, _ := passthrough.Write(content2)

result, err := io.ReadAll(buf)

assert.NoError(t, err)
assert.Equal(t, len(content1), n1)
assert.Equal(t, len(content2), n2)
assert.Equal(t, "there is some ***** content in ***** of testkube", string(result))
}

Expand Down Expand Up @@ -95,12 +99,14 @@ func TestObfuscator_FlushDoubleHit(t *testing.T) {
"tiv",
})

_, _ = passthrough.Write([]byte("sensitiv"))
content := []byte("sensitiv")
n, _ := passthrough.Write(content)
passthrough.Flush()

result, err := io.ReadAll(buf)

assert.NoError(t, err)
assert.Equal(t, n, len(content))
assert.Equal(t, "*****i*****", string(result))
}

Expand All @@ -111,11 +117,13 @@ func TestObfuscator_Order(t *testing.T) {
"sens",
})

_, _ = passthrough.Write([]byte("there is some sensitive content in scope of testkube"))
content := []byte("there is some sensitive content in scope of testkube")
n, _ := passthrough.Write(content)

result, err := io.ReadAll(buf)

assert.NoError(t, err)
assert.Equal(t, n, len(content))
assert.Equal(t, "there is some ***** content in scope of testkube", string(result))
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/testworkflow-init/orchestration/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (e *execution) Run() (*executionResult, error) {
// Handle edge case, when i.e. EPIPE happened
if !aborted {
aborted = true
e.cmd.Stderr.Write([]byte(fmt.Sprintf("\nThe process has been corrupted: %s\nIt may be caused by lack of resources on node (i.e. memory or disk space), or external issues.\n", exitDetails)))
e.cmd.Stderr.Write([]byte(fmt.Sprintf("\nThe process has been corrupted: %s.\n", exitDetails)))
}
}
} else {
Expand Down
8 changes: 7 additions & 1 deletion cmd/testworkflow-init/output/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ type printer struct {

// Write sends bytes, sanitizing it
func (s *printer) Write(p []byte) (n int, err error) {
return s.through.Write(p)
n, err = s.through.Write(p)
if err != nil {
return n, err
}
// On success, the stream needs to return same number of bytes if used as command pipe,
// otherwise, the child process will receive SIGPIPE.
return len(p), err
}

// Printf sends a formatted string via stream, sanitizing it
Expand Down

0 comments on commit ea56c53

Please sign in to comment.