-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcmdio.go
More file actions
executable file
·317 lines (277 loc) · 7.27 KB
/
cmdio.go
File metadata and controls
executable file
·317 lines (277 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
/*
Copyright © 2020 streamz <bytecodenerd@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmdio
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"os/user"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
)
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
// Options configures how a command will be executed.
type Options struct {
In io.Reader // Input stream (stdin)
Out io.Writer // Output stream (stdout)
Err io.Writer // Error stream (stderr)
Env []string // Environment variables
Usr *user.User // User to run command as
}
// Info contains information about a command's execution status and result.
type Info struct {
Error error
RunT time.Duration
Pid int
Exit int
StartT int64
EndT int64
Finished bool
Signaled bool
}
type status int
const (
_uninitialized status = iota
_exited
_running
_signaled
)
// CmdIo manages command execution with I/O redirection and process control.
// It provides both synchronous and asynchronous command execution with
// proper cleanup and signal handling.
type CmdIo struct {
in io.Reader
out io.Writer
err io.Writer
env []string
lok *sync.Mutex
usr *user.User
ini *sync.Once
sta atomic.Int32 // status using atomic for thread safety
inf Info
str time.Time
ech chan Info
sch chan bool
syn chan struct{}
ncp noCopy
}
// New creates a new CmdIo instance with the provided options.
// If no user is specified in options, it defaults to the current user.
func New(optFn func() *Options) *CmdIo {
opts := optFn()
usr := opts.Usr
if usr == nil {
usr, _ = user.Current()
}
c := &CmdIo{
in: opts.In,
out: opts.Out,
err: opts.Err,
env: opts.Env,
usr: usr,
lok: &sync.Mutex{},
ini: &sync.Once{},
inf: Info{Pid: 0, Exit: -1},
ech: make(chan Info, 1),
sch: make(chan bool, 1),
syn: make(chan struct{}),
}
c.sta.Store(int32(_uninitialized))
return c
}
// Start asynchronously starts a command and returns channels for monitoring.
// The first channel signals whether the command started successfully.
// The second channel receives the final execution info when the command completes.
func (c *CmdIo) Start(name string, args ...string) (<-chan bool, <-chan Info) {
init := false
c.ini.Do(func() {
init = true
go signalHandler()
go c.runFn(name, args...)
})
if !init {
c.ech <- Info{
Error: fmt.Errorf("already executed, cannot reuse CmdIo"),
RunT: 0,
Pid: 0,
Exit: 0,
StartT: 0,
EndT: 0,
Finished: true,
Signaled: false,
}
}
return c.sch, c.ech
}
// Run synchronously executes a command and waits for it to complete.
// It returns the execution info containing exit status and any errors.
func (c *CmdIo) Run(name string, args ...string) *Info {
_, complete := c.Start(name, args...)
info := <-complete
return &info
}
// RunContext synchronously executes a command with context support.
// The command will be terminated if the context is cancelled.
// It returns the execution info containing exit status and any errors.
func (c *CmdIo) RunContext(ctx context.Context, name string, args ...string) *Info {
started, complete := c.StartContext(ctx, name, args...)
<-started
info := <-complete
return &info
}
// StartContext asynchronously starts a command with context support.
// The command will be terminated if the context is cancelled.
// Returns channels for monitoring start success and completion.
func (c *CmdIo) StartContext(ctx context.Context, name string, args ...string) (<-chan bool, <-chan Info) {
started, complete := c.Start(name, args...)
// Monitor context cancellation
go func() {
select {
case <-ctx.Done():
// Context cancelled, terminate the command
_ = c.Terminate()
case <-c.syn:
// Command completed naturally
}
}()
return started, complete
}
// Terminate sends a SIGTERM signal to the running command and all its children.
// Returns nil if the command is not running or has already finished.
func (c *CmdIo) Terminate() error {
c.lok.Lock()
defer c.lok.Unlock()
if status(c.sta.Load()) == _uninitialized || c.inf.Finished {
return nil
}
c.sta.Store(int32(_signaled))
c.inf.Signaled = true
return syscall.Kill(-c.inf.Pid, syscall.SIGTERM)
}
// Info returns a copy of the current execution state of a command.
// The runtime is updated if the command is still running.
func (c *CmdIo) Info() Info {
c.lok.Lock()
defer c.lok.Unlock()
switch status(c.sta.Load()) {
case _running:
c.inf.RunT = time.Since(c.str)
case _exited:
c.inf.Finished = true
case _signaled:
// Already marked as signaled, runtime was captured at termination
case _uninitialized:
// Command not yet started
}
return c.inf
}
// Join returns a channel that is closed when the command completes.
// This can be used to wait for command completion in a select statement.
func (c *CmdIo) Join() <-chan struct{} {
return c.syn
}
func (c *CmdIo) runFn(name string, args ...string) {
defer func() {
c.ech <- c.Info()
close(c.syn)
}()
cmd := c.newCmd(name, args...)
now := time.Now()
if e := cmd.Start(); e != nil {
c.complete(&now, e)
c.sch <- false
return
}
c.init(&now, cmd)
c.sch <- true
e := cmd.Wait()
c.complete(&now, e)
}
func (c *CmdIo) newCmd(name string, args ...string) *exec.Cmd {
uid, _ := strconv.Atoi(c.usr.Uid)
gid, _ := strconv.Atoi(c.usr.Gid)
cred := &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
NoSetGroups: true,
}
cmd := exec.Command(name, args...)
cmd.SysProcAttr = syscallAttrs(cred)
// wire IO
cmd.Stdin = os.Stdin
if c.in != nil && c.in != os.Stdin {
cmd.Stdin = c.in
}
cmd.Stdout = os.Stdout
if c.out != nil && c.out != os.Stdout {
cmd.Stdout = io.MultiWriter(c.out, os.Stdout)
}
cmd.Stderr = os.Stderr
if c.err != nil && c.err != os.Stderr {
cmd.Stderr = io.MultiWriter(c.err, os.Stderr)
}
cmd.Dir = os.Getenv("PWD")
cmd.Env = os.Environ()
if len(c.env) > 0 {
cmd.Env = c.env
}
return cmd
}
func (c *CmdIo) init(t *time.Time, cmd *exec.Cmd) {
c.lok.Lock()
defer c.lok.Unlock()
c.inf.Pid = cmd.Process.Pid
c.inf.StartT = t.UnixNano()
c.sta.Store(int32(_running))
}
func (c *CmdIo) complete(t *time.Time, err error) {
code := 0
if err != nil {
code = exitErr(err)
}
c.endState(t, code, err)
}
func (c *CmdIo) endState(t *time.Time, code int, err error) {
c.lok.Lock()
defer c.lok.Unlock()
c.inf.Error = err
c.inf.Exit = code
c.inf.StartT = t.UnixNano()
c.inf.EndT = time.Now().UnixNano()
if status(c.sta.Load()) != _signaled {
c.inf.Finished = true
c.sta.Store(int32(_exited))
}
}
func exitErr(err error) int {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
if ws, ok := exitErr.Sys().(syscall.WaitStatus); ok {
if sig := ws.Signal(); sig > 0 {
return int(sig)
}
return ws.ExitStatus()
}
// If type assertion fails, return a generic error code
return 1
}
return 0
}