Skip to content

Commit 4cc07b7

Browse files
authored
feat: dev mode commands (#69)
1 parent c070106 commit 4cc07b7

13 files changed

Lines changed: 198 additions & 43 deletions

File tree

.github/workflows/pr.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ jobs:
2323
# uses: mxschmitt/action-tmate@v3
2424
- run: make test-integration-docker
2525
name: Run integration tests inside Docker
26+
27+
#- name: Setup tmate session
28+
# uses: mxschmitt/action-tmate@v3
2629
- run: make test
2730
name: Unit tests
2831
- run: make build

cmd/run.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ var RunCmd = &cobra.Command{
4545
queueManager := services.NewQueueManager(scrollService, processLauncher)
4646
snapshotService := snapshotService.NewSnapshotService()
4747

48-
_, err = initScroll(scrollService, snapshotService, processLauncher, queueManager)
48+
go queueManager.Work()
49+
_, err = initScroll(scrollService, snapshotService, processLauncher)
4950
if err != nil {
5051
return fmt.Errorf("error initializing scroll: %w", err)
5152
}

cmd/serve.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ to interact and monitor the Scroll Application`,
128128
coldStarter := services.NewColdStarter(portService, queueManager, snapshotService, scrollService.GetDir())
129129

130130
uiService := services.NewUiService(scrollService)
131-
uiDevService := services.NewUiDevService()
131+
uiDevService := services.NewUiDevService(
132+
queueManager, scrollService,
133+
)
132134

133135
scrollHandler := handler.NewScrollHandler(scrollService, pluginManager, processLauncher, queueManager, processManager)
134136
processHandler := handler.NewProcessHandler(processManager)
@@ -163,6 +165,9 @@ to interact and monitor the Scroll Application`,
163165
go portService.StartMonitoring(ctx, watchPortsInterfaces, currentScroll.KeepAlivePPM)
164166
}
165167

168+
logger.Log().Info("Starting queue manager")
169+
go queueManager.Work()
170+
166171
if !idleScroll {
167172

168173
doneChan := make(chan error, 1)
@@ -306,7 +311,7 @@ func startup(scrollService *services.ScrollService, snapshotService ports.Snapsh
306311

307312
logger.Log().Info("Initializing scroll")
308313

309-
newScroll, err := initScroll(scrollService, snapshotService, processLauncher, queueManager)
314+
newScroll, err := initScroll(scrollService, snapshotService, processLauncher)
310315

311316
if err != nil {
312317
doneChan <- err
@@ -380,7 +385,7 @@ func startup(scrollService *services.ScrollService, snapshotService ports.Snapsh
380385

381386
}
382387

383-
func initScroll(scrollService *services.ScrollService, snapshotService ports.SnapshotService, processLauncher *services.ProcedureLauncher, queueManager *services.QueueManager) (bool, error) {
388+
func initScroll(scrollService *services.ScrollService, snapshotService ports.SnapshotService, processLauncher *services.ProcedureLauncher) (bool, error) {
384389

385390
lock, err := scrollService.ReloadLock(ignoreVersionCheck)
386391
if err != nil {
@@ -450,8 +455,5 @@ func initScroll(scrollService *services.ScrollService, snapshotService ports.Sna
450455
}
451456
}
452457

453-
logger.Log().Info("Starting queue manager")
454-
go queueManager.Work()
455-
456458
return newScroll, nil
457459
}

internal/core/domain/queue_item.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package domain
22

33
type QueueItem struct {
4+
Name string
45
Status ScrollLockStatus
56
Error error
67
UpdateLockStatus bool
78
RunAfterExecution func()
9+
DoneChan chan struct{}
810
}

internal/core/domain/scroll.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import (
1515
type RunMode string
1616

1717
const (
18-
RunModeAlways RunMode = "always" //default
19-
RunModeOnce RunMode = "once"
20-
RunModeRestart RunMode = "restart"
18+
RunModeAlways RunMode = "always" //default
19+
RunModeOnce RunMode = "once" //runs only once
20+
RunModeRestart RunMode = "restart" //restarts on failure
2121
RunModePersistent RunMode = "persistent" //restarts on failure and on program restart
2222
)
2323

internal/core/ports/services_ports.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type ScrollServiceInterface interface {
3636
GetCommand(cmd string) (*domain.CommandInstructionSet, error)
3737
InitFiles(files ...string) error
3838
InitTemplateFiles(files ...string) error
39+
AddTemporaryCommand(cmd string, instructions *domain.CommandInstructionSet)
3940
}
4041

4142
type ProcedureLauchnerInterface interface {
@@ -107,6 +108,7 @@ type QueueManagerInterface interface {
107108
AddAndRememberItem(cmd string) error
108109
AddTempItem(cmd string) error
109110
AddShutdownItem(cmd string) error
111+
AddTempItemWithWait(cmd string) error
110112
GetQueue() map[string]domain.ScrollLockStatus
111113
}
112114

@@ -180,4 +182,5 @@ type UiDevServiceInterface interface {
180182
Unsubscribe(client chan *[]byte)
181183
GetWatchedPaths() []string
182184
IsWatching() bool
185+
SetCommands(procs map[string]*domain.CommandInstructionSet)
183186
}

internal/core/services/process_manager.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,6 @@ func (po *ProcessManager) Run(commandName string, command []string, dir string)
242242
// Wait for goroutine to print everything (watchdog closes stdin)
243243
exitCode := process.Cmd.ProcessState.ExitCode()
244244

245-
println("Exit code", exitCode)
246245
console.MarkExited(exitCode)
247246

248247
close(combinedChannel)

internal/core/services/queue_manager.go

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ var ErrCommandDoneOnce = fmt.Errorf("command is already done and has run mode on
1616

1717
type AddItemOptions struct {
1818
Remember bool
19+
Wait bool
1920
RunAfterExecution func()
2021
}
2122

@@ -40,8 +41,8 @@ func NewQueueManager(
4041
scrollService: scrollService,
4142
processLauncher: processLauncher,
4243
commandQueue: make(map[string]*domain.QueueItem),
43-
taskChan: make(chan string),
44-
taskDoneChan: make(chan struct{}),
44+
taskChan: make(chan string, 100), // FIXED: Buffered channel
45+
taskDoneChan: make(chan struct{}, 1), // FIXED: Buffered channel
4546
shutdownChan: make(chan struct{}),
4647
notifierChan: make([]chan []string, 0),
4748
callbacksPostRun: make(map[string]func()),
@@ -66,15 +67,20 @@ func (sc *QueueManager) workItem(cmd string) error {
6667
}
6768

6869
func (sc *QueueManager) notify() {
70+
sc.mu.Lock()
6971
queuedCommands := make([]string, 0)
7072

71-
for cmd, _ := range sc.commandQueue {
72-
if sc.getStatus(cmd) != domain.ScrollLockStatusDone && sc.getStatus(cmd) != domain.ScrollLockStatusError {
73+
for cmd, item := range sc.commandQueue {
74+
if item.Status != domain.ScrollLockStatusDone && item.Status != domain.ScrollLockStatusError {
7375
queuedCommands = append(queuedCommands, cmd)
7476
}
7577
}
7678

77-
for _, notifier := range sc.notifierChan {
79+
notifiers := make([]chan []string, len(sc.notifierChan))
80+
copy(notifiers, sc.notifierChan)
81+
sc.mu.Unlock()
82+
83+
for _, notifier := range notifiers {
7884
select {
7985
case notifier <- queuedCommands:
8086
// Successfully sent queuedCommands to the notifier channel
@@ -111,9 +117,15 @@ func (sc *QueueManager) AddItemWithCallback(cmd string, cb func()) error {
111117
})
112118
}
113119

120+
func (sc *QueueManager) AddTempItemWithWait(cmd string) error {
121+
return sc.addQueueItem(cmd, AddItemOptions{
122+
Remember: false,
123+
Wait: true,
124+
})
125+
}
126+
114127
func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error {
115128
sc.mu.Lock()
116-
defer sc.mu.Unlock()
117129

118130
setLock := options.Remember
119131

@@ -124,6 +136,7 @@ func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error {
124136
command, err := sc.scrollService.GetCommand(cmd)
125137

126138
if err != nil {
139+
sc.mu.Unlock()
127140
return err
128141
}
129142

@@ -135,17 +148,25 @@ func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error {
135148
if value, ok := sc.commandQueue[cmd]; ok {
136149

137150
if value.Status != domain.ScrollLockStatusDone && value.Status != domain.ScrollLockStatusError {
151+
sc.mu.Unlock()
138152
return ErrAlreadyInQueue
139153
}
140154

141155
if value.Status == domain.ScrollLockStatusDone && command.Run == domain.RunModeOnce {
156+
sc.mu.Unlock()
142157
return ErrCommandDoneOnce
143158
}
144159
}
145160

161+
var doneChan chan struct{}
162+
if options.Wait {
163+
doneChan = make(chan struct{})
164+
}
165+
146166
item := &domain.QueueItem{
147167
Status: domain.ScrollLockStatusWaiting,
148168
UpdateLockStatus: setLock,
169+
DoneChan: doneChan,
149170
}
150171

151172
if options.RunAfterExecution != nil {
@@ -157,12 +178,27 @@ func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error {
157178
if setLock {
158179
lock, err := sc.scrollService.GetLock()
159180
if err != nil {
181+
sc.mu.Unlock()
160182
return err
161183
}
162184
lock.SetStatus(cmd, domain.ScrollLockStatusWaiting, nil)
163185
}
186+
187+
sc.mu.Unlock()
188+
189+
// FIXED: Non-blocking send to buffered channel
164190
sc.taskChan <- cmd
165191

192+
// Wait for completion if requested
193+
if options.Wait {
194+
<-doneChan
195+
// Return error if command failed
196+
item := sc.GetQueueItem(cmd)
197+
if item != nil && item.Error != nil {
198+
return item.Error
199+
}
200+
}
201+
166202
return nil
167203
}
168204

@@ -303,13 +339,19 @@ func (sc *QueueManager) RunQueue() {
303339
logger.Log().Info("Running command", zap.String("command", cmd))
304340
go func(c string, i *domain.QueueItem) {
305341
defer func() {
342+
// Signal completion if someone is waiting
343+
if i.DoneChan != nil {
344+
close(i.DoneChan)
345+
}
346+
306347
if i.RunAfterExecution != nil {
307348
i.RunAfterExecution()
308349
}
309350
if callback, ok := sc.callbacksPostRun[c]; ok && callback != nil {
310351
callback()
311352
}
312353

354+
// FIXED: Non-blocking send to buffered channel
313355
sc.taskDoneChan <- struct{}{}
314356
}()
315357
err := sc.workItem(c)
@@ -340,33 +382,30 @@ func (sc *QueueManager) Shutdown() {
340382
}
341383

342384
func (sc *QueueManager) WaitUntilEmpty() {
343-
notifier := make(chan []string)
385+
notifier := make(chan []string, 10) // FIXED: Buffered channel
386+
387+
sc.mu.Lock()
344388
sc.notifierChan = append(sc.notifierChan, notifier)
345-
println("WaitUntilEmpty")
346-
for k, n := range sc.commandQueue {
347-
if n.Status == domain.ScrollLockStatusError {
348-
println(k + "---: " + string(n.Status) + " " + n.Error.Error())
349-
} else {
350-
println(k + "---: " + string(n.Status))
351-
}
352-
}
389+
sc.mu.Unlock()
390+
353391
for {
392+
sc.mu.Lock()
393+
for cmd, item := range sc.commandQueue {
394+
println(cmd + ": " + string(item.Status))
395+
}
396+
sc.mu.Unlock()
397+
354398
cmds := <-notifier
355399
if len(cmds) == 0 {
356400
// remove notifier
401+
sc.mu.Lock()
357402
for i, n := range sc.notifierChan {
358403
if n == notifier {
359404
sc.notifierChan = append(sc.notifierChan[:i], sc.notifierChan[i+1:]...)
360405
break
361406
}
362407
}
363-
for k, n := range sc.commandQueue {
364-
if n.Status == domain.ScrollLockStatusError {
365-
println(k + ": " + string(n.Status) + " " + n.Error.Error())
366-
} else {
367-
println(k + ": " + string(n.Status))
368-
}
369-
}
408+
sc.mu.Unlock()
370409
return
371410
}
372411
}

internal/core/services/scroll_service.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,3 +266,11 @@ func (sc *ScrollService) GetCommand(cmd string) (*domain.CommandInstructionSet,
266266
return nil, errors.New("command " + cmd + " not found")
267267
}
268268
}
269+
270+
func (sc *ScrollService) AddTemporaryCommand(cmd string, instructions *domain.CommandInstructionSet) {
271+
scroll := sc.GetFile()
272+
if scroll.Commands == nil {
273+
scroll.Commands = make(map[string]*domain.CommandInstructionSet)
274+
}
275+
scroll.Commands[cmd] = instructions
276+
}

internal/core/services/ui_dev_service.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,30 @@ type UiDevService struct {
3333
ctx context.Context
3434
cancel context.CancelFunc
3535
isWatching bool
36+
commands map[string]*domain.CommandInstructionSet
37+
queueManager ports.QueueManagerInterface
38+
scrollService ports.ScrollServiceInterface
39+
events uint32
3640
}
3741

3842
// NewUiDevService creates a new instance of UiDevService
39-
func NewUiDevService() ports.UiDevServiceInterface {
43+
func NewUiDevService(
44+
queueManager ports.QueueManagerInterface, scrollService ports.ScrollServiceInterface,
45+
) ports.UiDevServiceInterface {
4046
return &UiDevService{
41-
watchPaths: make([]string, 0),
42-
isWatching: false,
47+
watchPaths: make([]string, 0),
48+
isWatching: false,
49+
queueManager: queueManager,
50+
scrollService: scrollService,
51+
}
52+
}
53+
54+
func (uds *UiDevService) SetCommands(commands map[string]*domain.CommandInstructionSet) {
55+
uds.mu.Lock()
56+
defer uds.mu.Unlock()
57+
uds.commands = commands
58+
for key, cmd := range commands {
59+
uds.scrollService.AddTemporaryCommand(key, cmd)
4360
}
4461
}
4562

@@ -215,7 +232,10 @@ func (uds *UiDevService) processEvents() {
215232
logger.Log().Info("File watcher events channel closed")
216233
return
217234
}
218-
235+
if event.Op&fsnotify.Chmod == fsnotify.Chmod {
236+
// Ignore chmod events
237+
continue
238+
}
219239
uds.handleFileEvent(event)
220240

221241
case err, ok := <-uds.watcher.Errors:
@@ -238,6 +258,18 @@ func (uds *UiDevService) handleFileEvent(event fsnotify.Event) {
238258
relativePath = relPath
239259
}
240260
}
261+
uds.events++
262+
if uds.commands != nil && len(uds.commands) > 0 {
263+
go func() {
264+
e := uds.events - 1
265+
for uds.events != e {
266+
e = uds.events
267+
for key, _ := range uds.commands {
268+
uds.queueManager.AddTempItemWithWait(key)
269+
}
270+
}
271+
}()
272+
}
241273

242274
changeEvent := FileChangeEvent{
243275
Path: relativePath,

0 commit comments

Comments
 (0)