Skip to content
This repository was archived by the owner on Feb 16, 2023. It is now read-only.

Commit ef2656e

Browse files
authored
Merge pull request #267 from secrethub/feature/mask-rewrite
Masking rewrite
2 parents e972b86 + 3485cec commit ef2656e

10 files changed

Lines changed: 1035 additions & 567 deletions

File tree

internals/cli/masker/masker.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package masker
2+
3+
import (
4+
"io"
5+
"time"
6+
)
7+
8+
// Masker handles the creation and synchronization of streams that have all their writes scanned for secrets and
9+
// have them redacted if any matches are found. Masking of secrets is a best effort attempt. Output on all streams is
10+
// buffered to increase the chance of finding secrets if they are spread across multiple writes, but it cannot be
11+
// guaranteed that these secrets are masked. The duration bytes spend in the buffer is constant.
12+
//
13+
// Usage:
14+
// 1. Create a new Masker using New()
15+
// 2. Add one more streams using AddStream()
16+
// 3. Run the Start() method in a separate goroutine
17+
// 4. After everything has been written to the io.Writers, flush all buffers using Stop()
18+
type Masker struct {
19+
bufferDelay time.Duration
20+
sequences [][]byte
21+
frames chan frame
22+
stopChan chan struct{}
23+
err error
24+
}
25+
26+
// Options for configuring masking behavior.
27+
type Options struct {
28+
// DisableBuffer completely disables the buffering of the masker. This increases output responsiveness
29+
// but also increases the chance of a secret not being masked.
30+
DisableBuffer bool
31+
32+
// BufferDelay is the constant duration for which input to a stream is buffered. A higher value increases
33+
// the chance of secrets being detected for masking. Especially when writes have a variable delay between them,
34+
// for example in the case data arrives over an unstable network connection.
35+
// Defaults to 50ms if not set.
36+
BufferDelay time.Duration
37+
38+
// FrameBufferLength is the number of frames that can be in the buffer simultaneously.
39+
// If the frame buffer is full, writing to a stream blocks until there is space.
40+
FrameBufferLength int
41+
}
42+
43+
// New creates a new Masker that scans all streams for the given sequences and masks them.
44+
func New(sequences [][]byte, opts *Options) *Masker {
45+
masker := &Masker{
46+
bufferDelay: time.Millisecond * 50,
47+
sequences: sequences,
48+
stopChan: make(chan struct{}),
49+
}
50+
frameChanlength := 1024
51+
if opts != nil {
52+
if opts.DisableBuffer {
53+
masker.bufferDelay = 0
54+
frameChanlength = 0
55+
} else {
56+
if opts.BufferDelay > 0 {
57+
masker.bufferDelay = opts.BufferDelay
58+
}
59+
if opts.FrameBufferLength > 0 {
60+
frameChanlength = opts.FrameBufferLength
61+
}
62+
}
63+
64+
}
65+
masker.frames = make(chan frame, frameChanlength)
66+
67+
return masker
68+
}
69+
70+
// AddStream takes in an io.Writer to mask secrets on and returns an io.Writer that has secrets on its output masked.
71+
func (m *Masker) AddStream(w io.Writer) io.Writer {
72+
s := stream{
73+
dest: w,
74+
registerFrame: m.registerFrame,
75+
matches: matches{},
76+
matcher: newMatcher(m.sequences),
77+
}
78+
return &s
79+
}
80+
81+
// Start continuously flushes the input buffer for each frame for which the buffer delay has passed.
82+
// This method blocks until Stop() is called.
83+
func (m *Masker) Start() {
84+
for {
85+
select {
86+
case <-m.stopChan:
87+
for t := range m.frames {
88+
err := t.stream.flush(t.length)
89+
if err != nil {
90+
m.handleErr(err)
91+
}
92+
}
93+
m.stopChan <- struct{}{}
94+
return
95+
case trigger := <-m.frames:
96+
<-trigger.timer.C
97+
98+
err := trigger.stream.flush(trigger.length)
99+
if err != nil {
100+
m.handleErr(err)
101+
}
102+
}
103+
}
104+
}
105+
106+
// Stop all pending frames and wait for this to complete.
107+
// This should be run after all input has been written to the io.Writers of the streams.
108+
// Calling Write() on a stream after calling Stop() will lead to a panic.
109+
func (m *Masker) Stop() error {
110+
m.stopChan <- struct{}{}
111+
close(m.frames)
112+
<-m.stopChan
113+
114+
return m.err
115+
}
116+
117+
// registerFrame adds a new frame to the frames channel with a timeout of bufferDelay plus the given offset.
118+
// After this timer has passed, the frame will be flushed to the output.
119+
func (m *Masker) registerFrame(s *stream, offset time.Duration, l int) {
120+
m.frames <- frame{
121+
length: l,
122+
stream: s,
123+
timer: time.NewTimer(offset + m.bufferDelay),
124+
}
125+
}
126+
127+
func (m *Masker) handleErr(err error) {
128+
if err != nil && m.err == nil {
129+
m.err = err
130+
}
131+
}
132+
133+
// frame represent a set of bytes in the buffer of a stream that were written in a single call of Write().
134+
// The bytes are written to the destination after the timer has expired.
135+
type frame struct {
136+
length int
137+
stream *stream
138+
timer *time.Timer
139+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
<mxfile host="Electron" modified="2020-04-01T14:17:44.267Z" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/12.6.5 Chrome/80.0.3987.141 Electron/8.1.1 Safari/537.36" etag="-4ZPzU0b57b_415PivDi" version="12.6.5" type="device"><diagram id="oNAHip3EKjUoHj94W4kH" name="Page-1">7Vttc6M2EP41nrYf7EESAvtjnNebadrOpZ27fFSMjLlg5Ao5tu/XV4BkXkRyxDaY3NRfghZJwO6zj3ZXygBdLre3nKwW98yj4QBa3naArgYQQsuB8k8i2WUSAPA4k/g88JQsFzwE36kSWkq6DjwalzoKxkIRrMrCGYsiOhMlGeGcbcrd5iwsP3VFfGoIHmYkNKVfAk8sMukYurn8jgb+Qj8ZOJPszpLozupL4gXx2KYgQtcDdMkZE9nVcntJw0R7Wi/ZuJtX7u5fjNNINBnwffP4zbqLbvGfjN3dfA44vVgPkbLGCwnX6ovvSfxMuXpnsdOK4GwdeTSZyxqg6WYRCPqwIrPk7kbaXsoWYhnKFpCXL5SLQCrxIgz8SMoESzqYb6w+IulOtwWR+oJbypZU8J3sou7aSGlT4Wls4ay9yY0DHNVnUTTMWAmJAoS/nzvXmbxQanuHCgE2VPiFS+38+puhQ+pJbKkm42LBfBaR8DqXTstazvv8zhINprr9RoXYKUcha8HKmqfbQHwtXD8mU42wal1t1cxpY6cbkdTD12KjMCpp5sPSlh73qj1jtuYz+pbOFC0Iwn0q3upoZx0Tzb2JD80vysqchkQEL2UnrjN9OtMF52RX6LBiQSTiwoP+SgQ5CgEuoxCDiuu9r7+8yN6gMlq/DpvPY6mlKlD3X38EdqGB3RVnMxrHv8QpdD22FsdRwSmcvqw927VMn7dqfL49l3f76PLWCOKC14OGPq9G7d2+Pae3mzq909DpFT7kJ5QAMkT4EBbo0vGg6Xg9QFDPF42m+NEzNscPQHrMUetIpwjC5+WaItVYHxUpGDVEyqE4UAhD+kEKXqi6NGVvqka1ABbHoBtO/SAWlN9wsqyjHbk2i7LtiQrmZ9IeMkEwo/xl4HkZzGgcfCdP6VSJJVU8JefF0wG+kpKQPNFwSmbPfgrJSxYynj4XzdNfrfnf9AMjnNjnhOpFBsW0q44GhtbIcly7vJTouY4EAHRGuLJGVeZoMcizDePHglOyPC6way3Hg1bv4r2JocF7ImYLGp+Xg91DSFiN0jwM2mNhp2m81zTJyx21EvDB3gd8wGTgZQqhIystLfibY53b33TJzFCW6W+dK6uaykNTWbhGV6g1XQFDV4HUyJZ60/V83gN4VWOfWjqHncLLLHp6NBZBJLmDRedT2D73wM6ZHRCZDvjP6m/2KQFWn1JUeFiOCttLPRBquOgh932LngxOoV0OTsH4uFVPL6egTGmoPEF7a6LdzzLaIUUQMOgotW2ML01y78AX1mnirgKM/oZVWh01uY0TJvnrk1wAHV+kOq9KOPXILH3PKFHESUKM1rIiMCl7KbBsYK4SdaEHAK0tE68FtT2I06rqAtBUV6eBmj3uD6u5vaM1t+myefKK3XFGNSsDT2nYLW0GraedqHGFj1JZ04A9SWUNlctfp6mqDcelSSddBS7IDFz6lXPZtrFY2OZiUZd07fOM07vKeTcsDtraGnW0NaqX8g9Hf2YAMA/X8SKJ4K15srPwcdnvZPsKSeZmuSV/HMKT8B+ulEH1UaQOUjez6uQz2X6QCBaJ9X8cfW9IkHzKXBpojxVLBMv+nj2rkOrYrjl71mkhS5P6R9oDHnW0+4AmDSlVQ7knlIrMiDJ1jvOnVNWzlxNcU8gFYOQUf67pDgi3pDpsVi17fZi1WhmHTo1Cuz3Mil/h9c/r6CdldeRUjmbU7X51yup6Vf+f1V8/sPPjkz39YnVs1il7wurG9lzdifqOWP1fMb3849lxV5g88oftEk/j1dBkpJskxQgiXzNM1yWWxHnizI+SqecsEsqtIGqn2Ou6NUYZt1S9rLUCNKzwKfLTTdOf2QzOZOS4Vv4r51uwZu9/b7kjrSKb+b8GZVlb/h9W6Po/</diagram></mxfile>
55.9 KB
Loading
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
package masker
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"strings"
8+
"testing"
9+
"time"
10+
11+
"github.com/secrethub/secrethub-go/internals/assert"
12+
"github.com/secrethub/secrethub-go/pkg/randchar"
13+
)
14+
15+
var maskString = "<redacted by SecretHub>"
16+
17+
func TestMasker(t *testing.T) {
18+
delay10s := time.Second * 10
19+
delay1us := time.Microsecond * 1
20+
21+
randomIn, err := randchar.NewGenerator(true).Generate(10000)
22+
assert.OK(t, err)
23+
24+
tests := map[string]struct {
25+
maskStrings []string
26+
inputFunc func(io.Writer)
27+
options *Options
28+
expected string
29+
}{
30+
"no_masking": {
31+
maskStrings: []string{"foo", "bar"},
32+
inputFunc: func(w io.Writer) {
33+
_, err := w.Write([]byte("test"))
34+
assert.OK(t, err)
35+
},
36+
expected: "test",
37+
},
38+
"single mask": {
39+
maskStrings: []string{"foo", "bar"},
40+
inputFunc: func(w io.Writer) {
41+
_, err := w.Write([]byte("test foo test"))
42+
assert.OK(t, err)
43+
},
44+
expected: "test " + maskString + " test",
45+
},
46+
"multiple masks": {
47+
maskStrings: []string{"foo", "bar"},
48+
inputFunc: func(w io.Writer) {
49+
_, err := w.Write([]byte("test foo bar"))
50+
assert.OK(t, err)
51+
},
52+
expected: "test " + maskString + " " + maskString,
53+
},
54+
"incomplete mask": {
55+
maskStrings: []string{"foobar"},
56+
inputFunc: func(w io.Writer) {
57+
_, err := w.Write([]byte("test foo"))
58+
assert.OK(t, err)
59+
},
60+
expected: "test foo",
61+
},
62+
"mask within a mask": {
63+
maskStrings: []string{"foo", "bar", "testfoobartestfoo"},
64+
inputFunc: func(w io.Writer) {
65+
_, err := w.Write([]byte("testfoobartestfoo bar foo"))
66+
assert.OK(t, err)
67+
},
68+
expected: maskString + " " + maskString + " " + maskString,
69+
},
70+
"across multiple writes": {
71+
maskStrings: []string{"foo", "bar"},
72+
inputFunc: func(w io.Writer) {
73+
_, err := w.Write([]byte("fo"))
74+
assert.OK(t, err)
75+
_, err = w.Write([]byte("o bar f"))
76+
assert.OK(t, err)
77+
_, err = w.Write([]byte("o"))
78+
assert.OK(t, err)
79+
},
80+
expected: maskString + " " + maskString + " fo",
81+
},
82+
"within buffer delay": {
83+
maskStrings: []string{"foo", "bar"},
84+
inputFunc: func(w io.Writer) {
85+
_, err := w.Write([]byte("fo"))
86+
assert.OK(t, err)
87+
time.Sleep(time.Nanosecond * 1)
88+
_, err = w.Write([]byte("o test"))
89+
assert.OK(t, err)
90+
},
91+
options: &Options{BufferDelay: delay10s},
92+
expected: maskString + " test",
93+
},
94+
"outside buffer delay": {
95+
maskStrings: []string{"foo", "bar"},
96+
inputFunc: func(w io.Writer) {
97+
_, err := w.Write([]byte("fo"))
98+
assert.OK(t, err)
99+
time.Sleep(time.Millisecond * 10)
100+
_, err = w.Write([]byte("o bar test"))
101+
assert.OK(t, err)
102+
},
103+
options: &Options{BufferDelay: delay1us},
104+
expected: "foo " + maskString + " test",
105+
},
106+
"no buffering": {
107+
maskStrings: []string{"foo", "bar"},
108+
inputFunc: func(w io.Writer) {
109+
_, err := w.Write([]byte("test foo test"))
110+
assert.OK(t, err)
111+
},
112+
options: &Options{DisableBuffer: true},
113+
expected: "test " + maskString + " test",
114+
},
115+
"long input": {
116+
maskStrings: []string{},
117+
inputFunc: func(w io.Writer) {
118+
for _, c := range randomIn {
119+
_, err := w.Write([]byte{c})
120+
assert.OK(t, err)
121+
}
122+
},
123+
expected: string(randomIn),
124+
},
125+
"reuse input buffer": {
126+
maskStrings: []string{},
127+
inputFunc: func(w io.Writer) {
128+
tmp := make([]byte, 1)
129+
for _, c := range randomIn {
130+
copy(tmp, []byte{c})
131+
_, err := w.Write(tmp)
132+
assert.OK(t, err)
133+
}
134+
},
135+
expected: string(randomIn),
136+
},
137+
"masking unicode": {
138+
maskStrings: []string{
139+
"ⓗⓔⓛⓛⓞ",
140+
},
141+
inputFunc: func(w io.Writer) {
142+
_, err := w.Write([]byte("ⓗⓔⓛⓛⓞ world"))
143+
assert.OK(t, err)
144+
},
145+
expected: maskString + " world",
146+
},
147+
}
148+
149+
for name, tc := range tests {
150+
t.Run(name, func(t *testing.T) {
151+
var buf bytes.Buffer
152+
153+
var maskStrings [][]byte
154+
for _, s := range tc.maskStrings {
155+
maskStrings = append(maskStrings, []byte(s))
156+
}
157+
158+
m := New(maskStrings, tc.options)
159+
160+
writer := m.AddStream(&buf)
161+
go m.Start()
162+
tc.inputFunc(writer)
163+
164+
err = m.Stop()
165+
166+
assert.OK(t, err)
167+
assert.Equal(t, buf.String(), tc.expected)
168+
})
169+
}
170+
}
171+
172+
type errWriter struct {
173+
err error
174+
}
175+
176+
func (w errWriter) Write(p []byte) (n int, err error) {
177+
return 0, w.err
178+
}
179+
180+
func TestMasker_WriteError(t *testing.T) {
181+
expectedErr := fmt.Errorf("test")
182+
183+
m := New([][]byte{[]byte("test")}, nil)
184+
writer := m.AddStream(&errWriter{err: expectedErr})
185+
186+
go m.Start()
187+
_, err := writer.Write([]byte{0x01})
188+
assert.OK(t, err)
189+
190+
err = m.Stop()
191+
assert.Equal(t, err, expectedErr)
192+
}
193+
194+
func TestMasker_MultipleStreams(t *testing.T) {
195+
sequences := [][]byte{
196+
[]byte("Gandalf"),
197+
[]byte("uruk-hai army"),
198+
[]byte("Aragorn, son of Arathorn"),
199+
[]byte("hobbit"),
200+
}
201+
202+
input := [][]byte{
203+
[]byte("line 1 "),
204+
[]byte("line 2 "),
205+
[]byte("line 3 "),
206+
[]byte("message from Gandalf the Grey "),
207+
[]byte("line 5 "),
208+
[]byte("an uruk-hai army appears "),
209+
[]byte("say hobbit hobbit hobbit "),
210+
}
211+
212+
bufferDelay := 10 * time.Millisecond
213+
214+
m := New(sequences, &Options{
215+
BufferDelay: bufferDelay,
216+
})
217+
218+
var outputBuffer bytes.Buffer
219+
var streams [3]io.Writer
220+
221+
for i := range streams {
222+
streams[i] = m.AddStream(&outputBuffer)
223+
}
224+
225+
go m.Start()
226+
227+
expected := ""
228+
229+
for i, b := range input {
230+
n, err := streams[i%3].Write(b)
231+
assert.OK(t, err)
232+
assert.Equal(t, n, len(b))
233+
234+
expected += string(b)
235+
}
236+
237+
err := m.Stop()
238+
assert.OK(t, err)
239+
240+
for _, sequence := range sequences {
241+
expected = strings.ReplaceAll(expected, string(sequence), maskString)
242+
}
243+
assert.Equal(t, outputBuffer.String(), expected)
244+
}

0 commit comments

Comments
 (0)