Skip to content

Commit 30c026d

Browse files
committed
The cloudtask site has added runtime management, adjusts the cloudtask-center code, and modifies the node's on-line runtime processing flow.
1 parent e9d1ff7 commit 30c026d

16 files changed

Lines changed: 598 additions & 672 deletions

File tree

api/handler.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -191,18 +191,3 @@ func putJobAction(c *Context) error {
191191
response.SetContent(ErrRequestAccepted.Error())
192192
return c.JSON(http.StatusAccepted, response)
193193
}
194-
195-
func deleteRuntime(c *Context) error {
196-
197-
response := &ResponseImpl{}
198-
runtime := ResolveRemoveRuntimeRequest(c)
199-
if runtime == "" {
200-
response.SetContent(ErrRequestResolveInvaild.Error())
201-
return c.JSON(http.StatusBadRequest, response)
202-
}
203-
204-
cacheRepository := c.Get("CacheRepository").(*cache.CacheRepository)
205-
cacheRepository.RemoveLocation(runtime)
206-
response.SetContent(ErrRequestAccepted.Error())
207-
return c.JSON(http.StatusAccepted, response)
208-
}

api/messages.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import "github.com/cloudtask/libtools/gounits/logger"
88
import (
99
"encoding/json"
1010
"fmt"
11+
"strings"
1112
)
1213

1314
//ProcessSystemEventMessage is exported
@@ -20,6 +21,7 @@ func ProcessSystemEventMessage(request *MessageRequest) error {
2021
}
2122

2223
cacheRepository := request.Context.Get("CacheRepository").(*cache.CacheRepository)
24+
scheduler := request.Context.Get("Scheduler").(*scheduler.Scheduler)
2325
switch systemEvent.Event {
2426
case models.RemoveGroupEvent:
2527
{ //只考虑删除组情况,创建和修改组不会对分配表造成改变.
@@ -33,7 +35,6 @@ func ProcessSystemEventMessage(request *MessageRequest) error {
3335
if len(systemEvent.JobIds) > 0 {
3436
jobId := systemEvent.JobIds[0]
3537
if job := cacheRepository.GetRawJob(jobId); job != nil {
36-
scheduler := request.Context.Get("Scheduler").(*scheduler.Scheduler)
3738
scheduler.SingleJobAlloc(systemEvent.Runtime, jobId)
3839
}
3940
}
@@ -54,7 +55,6 @@ func ProcessSystemEventMessage(request *MessageRequest) error {
5455
job := cacheRepository.GetRawJob(systemEvent.JobIds[0])
5556
if job != nil {
5657
if job.Enabled == 1 {
57-
scheduler := request.Context.Get("Scheduler").(*scheduler.Scheduler)
5858
jobData := cacheRepository.GetAllocJob(job.Location, job.JobId)
5959
if jobData == nil {
6060
scheduler.SingleJobAlloc(job.Location, job.JobId) //重新加入分配表
@@ -79,6 +79,36 @@ func ProcessSystemEventMessage(request *MessageRequest) error {
7979
}
8080
cacheRepository.UpdateAllocJobs(systemEvent.Runtime, systemEvent.JobIds)
8181
}
82+
case models.CreateRuntimeEvent:
83+
{
84+
if strings.TrimSpace(systemEvent.Runtime) != "" {
85+
logger.INFO("[#api#] ### %s, %+v", models.CreateRuntimeEvent, systemEvent)
86+
workLocation := cacheRepository.GetLocation(systemEvent.Runtime)
87+
if workLocation == nil {
88+
cacheRepository.CreateLocationAlloc(systemEvent.Runtime)
89+
}
90+
}
91+
}
92+
case models.ChangeRuntimeEvent:
93+
{
94+
if strings.TrimSpace(systemEvent.Runtime) != "" {
95+
logger.INFO("[#api#] ### %s, %+v", models.ChangeRuntimeEvent, systemEvent)
96+
workLocation := cacheRepository.GetLocation(systemEvent.Runtime)
97+
if workLocation != nil {
98+
cacheRepository.ChangeLocationAlloc(systemEvent.Runtime)
99+
}
100+
}
101+
}
102+
case models.RemoveRuntimeEvent:
103+
{
104+
if strings.TrimSpace(systemEvent.Runtime) != "" {
105+
logger.INFO("[#api#] ### %s, %+v", models.RemoveRuntimeEvent, systemEvent)
106+
workLocation := cacheRepository.GetLocation(systemEvent.Runtime)
107+
if workLocation != nil {
108+
cacheRepository.RemoveLocationAlloc(systemEvent.Runtime)
109+
}
110+
}
111+
}
82112
}
83113
return nil
84114
}

api/resolve.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,3 @@ func ResloveJogRequest(c *Context) *JobLogRequest {
101101
request.Context = c
102102
return request
103103
}
104-
105-
//ResolveRemoveRuntimeRequest is exported
106-
func ResolveRemoveRuntimeRequest(c *Context) string {
107-
108-
vars := mux.Vars(c.request)
109-
runtime := strings.TrimSpace(vars["runtime"])
110-
if len(runtime) == 0 {
111-
return ""
112-
}
113-
return runtime
114-
}

api/router.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ var routes = map[string]map[string]handler{
2424
"PUT": {
2525
"/cloudtask/v2/jobs/action": putJobAction,
2626
},
27-
"DELETE": {
28-
"/cloudtask/v2/runtimes/{runtime}": deleteRuntime,
29-
},
3027
}
3128

3229
func NewRouter(enableCors bool, store Store) *mux.Router {

cache/alloc.go

Lines changed: 58 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,41 @@ import (
77
"sync"
88
)
99

10+
//AllocEvent is exported
11+
type AllocEvent int
12+
13+
const (
14+
ALLOC_CREATED_EVENT AllocEvent = iota + 1
15+
ALLOC_CHANGED_EVENT
16+
ALLOC_REMOVED_EVENT
17+
)
18+
19+
func (event AllocEvent) String() string {
20+
21+
switch event {
22+
case ALLOC_CREATED_EVENT:
23+
return "ALLOC_CREATED_EVENT"
24+
case ALLOC_CHANGED_EVENT:
25+
return "ALLOC_CHANGED_EVENT"
26+
case ALLOC_REMOVED_EVENT:
27+
return "ALLOC_REMOVED_EVENT"
28+
}
29+
return ""
30+
}
31+
32+
//AllocCacheEventHandlerFunc is exported
33+
type AllocCacheEventHandlerFunc func(event AllocEvent, location string, data []byte, err error)
34+
1035
//AllocStore is exported
1136
type AllocStore struct {
1237
sync.RWMutex
1338
allocPool *sync.Pool
1439
tableData models.AllocMapper
15-
ICacheRepositoryHandler
40+
callback AllocCacheEventHandlerFunc
1641
}
1742

1843
//NewAllocStore is exported
19-
func NewAllocStore(handler ICacheRepositoryHandler) *AllocStore {
44+
func NewAllocStore(callback AllocCacheEventHandlerFunc) *AllocStore {
2045

2146
allocPool := &sync.Pool{
2247
New: func() interface{} { //数据编码池,默认分配128K
@@ -25,9 +50,9 @@ func NewAllocStore(handler ICacheRepositoryHandler) *AllocStore {
2550
}
2651

2752
return &AllocStore{
28-
allocPool: allocPool,
29-
tableData: make(models.AllocMapper, 0),
30-
ICacheRepositoryHandler: handler,
53+
allocPool: allocPool,
54+
tableData: make(models.AllocMapper, 0),
55+
callback: callback,
3156
}
3257
}
3358

@@ -54,42 +79,6 @@ func (store *AllocStore) GetAlloc(location string) *models.JobsAlloc {
5479
return nil
5580
}
5681

57-
//MakeAllocBuffer is exported
58-
//make empty alloc to buffer.
59-
func (store *AllocStore) MakeAllocBuffer() ([]byte, error) {
60-
61-
return models.JobsAllocEnCode(store.allocPool, &models.JobsAlloc{
62-
Version: 0,
63-
Jobs: make([]*models.JobData, 0),
64-
})
65-
}
66-
67-
//SetAllocBuffer is exported
68-
func (store *AllocStore) SetAllocBuffer(location string, data []byte) error {
69-
70-
store.Lock()
71-
defer store.Unlock()
72-
jobsAlloc := &models.JobsAlloc{Version: 0, Jobs: make([]*models.JobData, 0)}
73-
if err := models.JobsAllocDeCode(data, jobsAlloc); err != nil {
74-
return err
75-
}
76-
store.tableData[location] = jobsAlloc
77-
return nil
78-
}
79-
80-
//ClearAllocBuffer is exported
81-
func (store *AllocStore) ClearAllocBuffer(location string) {
82-
83-
store.Lock()
84-
defer store.Unlock()
85-
if jobsAlloc, ret := store.tableData[location]; ret {
86-
jobsAlloc.Jobs = []*models.JobData{}
87-
jobsAlloc.Version = jobsAlloc.Version + 1
88-
data, err := models.JobsAllocEnCode(store.allocPool, jobsAlloc)
89-
store.OnAllocCacheChangedHandlerFunc(location, data, err)
90-
}
91-
}
92-
9382
//HasAlloc is exported
9483
func (store *AllocStore) HasAlloc(location string) bool {
9584

@@ -160,7 +149,7 @@ func (store *AllocStore) SetAllocJobsKey(location string, jobs map[string]string
160149

161150
jobsAlloc.Version = jobsAlloc.Version + 1
162151
data, err := models.JobsAllocEnCode(store.allocPool, jobsAlloc)
163-
store.OnAllocCacheChangedHandlerFunc(location, data, err)
152+
store.callback(ALLOC_CHANGED_EVENT, location, data, err)
164153
}
165154

166155
//GetAllocJob is exported
@@ -185,6 +174,7 @@ func (store *AllocStore) CreateAllocJob(location string, key string, jobId strin
185174

186175
store.Lock()
187176
defer store.Unlock()
177+
var allocEvent AllocEvent
188178
jobsAlloc, ret := store.tableData[location]
189179
if !ret {
190180
jobsAlloc = &models.JobsAlloc{
@@ -198,6 +188,7 @@ func (store *AllocStore) CreateAllocJob(location string, key string, jobId strin
198188
},
199189
}
200190
store.tableData[location] = jobsAlloc
191+
allocEvent = ALLOC_CREATED_EVENT
201192
} else {
202193
found := false
203194
for _, jobData := range jobsAlloc.Jobs {
@@ -214,10 +205,11 @@ func (store *AllocStore) CreateAllocJob(location string, key string, jobId strin
214205
Version: 1})
215206
}
216207
jobsAlloc.Version = jobsAlloc.Version + 1
208+
allocEvent = ALLOC_CHANGED_EVENT
217209
}
218210

219211
data, err := models.JobsAllocEnCode(store.allocPool, jobsAlloc)
220-
store.OnAllocCacheChangedHandlerFunc(location, data, err)
212+
store.callback(allocEvent, location, data, err)
221213
}
222214

223215
//UpdateAllocJobs is exported
@@ -246,7 +238,7 @@ func (store *AllocStore) UpdateAllocJobs(location string, jobIds []string) {
246238

247239
jobsAlloc.Version = jobsAlloc.Version + 1
248240
data, err := models.JobsAllocEnCode(store.allocPool, jobsAlloc)
249-
store.OnAllocCacheChangedHandlerFunc(location, data, err)
241+
store.callback(ALLOC_CHANGED_EVENT, location, data, err)
250242
}
251243

252244
//RemoveAllocJob is exported
@@ -269,7 +261,7 @@ func (store *AllocStore) RemoveAllocJob(location string, jobId string) {
269261
jobsAlloc.Jobs = append(jobsAlloc.Jobs[:i], jobsAlloc.Jobs[i+1:]...)
270262
jobsAlloc.Version = jobsAlloc.Version + 1
271263
data, err := models.JobsAllocEnCode(store.allocPool, jobsAlloc)
272-
store.OnAllocCacheChangedHandlerFunc(location, data, err)
264+
store.callback(ALLOC_CHANGED_EVENT, location, data, err)
273265
break
274266
}
275267
}
@@ -303,17 +295,35 @@ func (store *AllocStore) RemoveAllocJobs(location string, jobIds []string) {
303295
if found {
304296
jobsAlloc.Version = jobsAlloc.Version + 1
305297
data, err := models.JobsAllocEnCode(store.allocPool, jobsAlloc)
306-
store.OnAllocCacheChangedHandlerFunc(location, data, err)
298+
store.callback(ALLOC_CHANGED_EVENT, location, data, err)
307299
}
308300
}
309301

302+
//CreateAlloc is exported
303+
func (store *AllocStore) CreateAlloc(location string, data []byte) {
304+
305+
store.Lock()
306+
if _, ret := store.tableData[location]; !ret {
307+
jobsAlloc := &models.JobsAlloc{}
308+
var err error
309+
if err = models.JobsAllocDeCode(data, jobsAlloc); err == nil {
310+
store.tableData[location] = jobsAlloc
311+
}
312+
store.callback(ALLOC_CREATED_EVENT, location, data, err)
313+
}
314+
store.Unlock()
315+
}
316+
310317
//RemoveAlloc is exported
311318
func (store *AllocStore) RemoveAlloc(location string) {
312319

313320
store.Lock()
314-
if _, ret := store.tableData[location]; ret {
321+
if jobsAlloc, ret := store.tableData[location]; ret {
315322
delete(store.tableData, location)
316-
store.OnAllocCacheLocationRemovedHandlerFunc(location)
323+
jobsAlloc.Jobs = []*models.JobData{}
324+
jobsAlloc.Version = jobsAlloc.Version + 1
325+
data, err := models.JobsAllocEnCode(store.allocPool, jobsAlloc)
326+
store.callback(ALLOC_REMOVED_EVENT, location, data, err)
317327
}
318328
store.Unlock()
319329
}

0 commit comments

Comments
 (0)