diff --git a/.env.example b/.env.example index 6fa23fc..8a8157d 100644 --- a/.env.example +++ b/.env.example @@ -10,4 +10,7 @@ DB_HOST= REDIS_HOST= CORES= GUACAMOLE_BASE_URL= -CMS_HOST= \ No newline at end of file +CMS_HOST= +CPU_OVERCOMMIT= +MEM_RESERVE_PCT= +DISK_RESERVE_PCT= \ No newline at end of file diff --git a/api/create_vm.go b/api/create_vm.go index 932b9be..25d6645 100644 --- a/api/create_vm.go +++ b/api/create_vm.go @@ -5,75 +5,16 @@ import ( "errors" "net/http" + "github.com/easy-cloud-Knet/KWS_Control/client/model" "github.com/easy-cloud-Knet/KWS_Control/service" - "github.com/easy-cloud-Knet/KWS_Control/structure" "github.com/easy-cloud-Knet/KWS_Control/util" ) -// ApiCreateVmRequest for POST /vm HTTP Request Body contract. -type ApiCreateVmRequest struct { - DomType string `json:"domType"` - DomName string `json:"domName"` - UUID structure.UUID `json:"uuid"` - OS string `json:"os"` - HWInfo ApiHardwareInfo `json:"HWInfo"` - Network ApiNetworkInfo `json:"network"` - Users []ApiUserInfo `json:"users"` - SubnetType string `json:"Subnettype"` -} - -type ApiHardwareInfo struct { - CPU uint32 `json:"cpu"` - Memory uint32 `json:"memory"` // MiB - Disk uint32 `json:"disk"` // MiB -} - -type ApiNetworkInfo struct { - IPs []string `json:"ips"` - // NetType은 내부에서 0 고정 — API 클라이언트가 전송하더라도 무시됨 -} - -type ApiUserInfo struct { - Name string `json:"name"` - Groups string `json:"groups"` - Password string `json:"passWord"` - SSHAuthorizedKeys []string `json:"ssh"` -} - -// ToServiceInput은 HTTP DTO를 서비스 계층 DTO로 변환 -func (r *ApiCreateVmRequest) ToServiceInput() service.CreateVMInput { - users := make([]service.UserSpec, len(r.Users)) - for i, u := range r.Users { - users[i] = service.UserSpec{ - Name: u.Name, - Groups: u.Groups, - Password: u.Password, - SSHAuthorizedKeys: u.SSHAuthorizedKeys, - } - } - return service.CreateVMInput{ - UUID: r.UUID, - DomType: r.DomType, - DomName: r.DomName, - OS: r.OS, - HardwareInfo: service.HardwareSpec{ - CPU: r.HWInfo.CPU, - 
Memory: r.HWInfo.Memory, - Disk: r.HWInfo.Disk, - }, - Network: service.NetworkSpec{ - IPs: r.Network.IPs, - }, - Users: users, - SubnetType: r.SubnetType, - } -} - func (c *handlerContext) createVm(w http.ResponseWriter, r *http.Request) { log := util.GetLogger() defer r.Body.Close() - var req ApiCreateVmRequest + var req model.CreateVMRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { log.Error("createVm: failed to parse request body: %v", err, true) var syntaxErr *json.SyntaxError @@ -85,12 +26,13 @@ func (c *handlerContext) createVm(w http.ResponseWriter, r *http.Request) { return } - if req.HWInfo.Memory == 0 || req.HWInfo.CPU == 0 || req.HWInfo.Disk == 0 { + if req.HardwareInfo.Memory == 0 || req.HardwareInfo.CPU == 0 || req.HardwareInfo.Disk == 0 { util.RespondError(w, http.StatusBadRequest, "Memory, CPU, and Disk must be non-zero") return } - if err := service.CreateVM(req.ToServiceInput(), c.context, c.rdb); err != nil { + err := service.CreateVM(req, c.context, c.rdb) + if err != nil { log.Error("createVm: failed to create VM: %v", err, true) util.RespondError(w, http.StatusInternalServerError, err.Error()) return diff --git a/api/get_vm_status.go b/api/get_vm_status.go index 9a28e84..46fe339 100644 --- a/api/get_vm_status.go +++ b/api/get_vm_status.go @@ -14,26 +14,6 @@ type ApiVmStatusRequest struct { Type string `json:"type"` // "cpu", "memory", or "disk" } -type ApiVmCpuStatusResponse struct { - System float64 `json:"system_time"` - Idle float64 `json:"idle_time"` - Usage float64 `json:"usage_percent"` -} - -type ApiVmMemoryStatusResponse struct { - Total uint64 `json:"total_gb"` - Used uint64 `json:"used_gb"` - Available uint64 `json:"available_gb"` - UsedPercent float64 `json:"used_percent"` -} - -type ApiVmDiskStatusResponse struct { - Total uint64 `json:"total_gb"` - Used uint64 `json:"used_gb"` - Free uint64 `json:"free_gb"` - UsedPercent float64 `json:"used_percent"` -} - func (c *handlerContext) vmStatus(w http.ResponseWriter, 
r *http.Request) { log := util.GetLogger() defer r.Body.Close() @@ -56,14 +36,11 @@ func (c *handlerContext) vmStatus(w http.ResponseWriter, r *http.Request) { switch statusType { case "cpu": - cpu, e := service.GetVMCpuInfo(req.UUID, c.context) - data, err = ApiVmCpuStatusResponse{System: cpu.System, Idle: cpu.Idle, Usage: cpu.Usage}, e + data, err = service.GetVMCpuInfo(req.UUID, c.context) case "memory": - mem, e := service.GetVMMemoryInfo(req.UUID, c.context) - data, err = ApiVmMemoryStatusResponse{Total: mem.Total, Used: mem.Used, Available: mem.Available, UsedPercent: mem.UsedPercent}, e + data, err = service.GetVMMemoryInfo(req.UUID, c.context) case "disk": - disk, e := service.GetVMDiskInfo(req.UUID, c.context) - data, err = ApiVmDiskStatusResponse{Total: disk.Total, Used: disk.Used, Free: disk.Free, UsedPercent: disk.UsedPercent}, e + data, err = service.GetVMDiskInfo(req.UUID, c.context) } if err != nil { diff --git a/client/cms.go b/client/cms.go index 59701b6..629aaa8 100644 --- a/client/cms.go +++ b/client/cms.go @@ -11,102 +11,54 @@ import ( "github.com/easy-cloud-Knet/KWS_Control/util" ) -// CmsClient는 CMS서비스에 서브넷/인스턴스 생성을 요청하는 HTTP 클라이언트. 
type CmsClient struct { baseURL string client *http.Client } -type CmsNewInstanceResponse struct { - IP string `json:"IP"` +type NewSubnetRequest struct { + IP string `json:"ip"` MacAddr string `json:"macAddr"` SdnUUID string `json:"sdnUUID"` } -type CmsDeleteInstanceResponse struct { - Detail string `json:"detail,omitempty"` -} - -type cmsNewInstanceRequestBody struct { +type subnetRequest struct { Subnet string `json:"Subnet"` } -type cmsDeleteInstanceRequestBody struct { - IP string `json:"IP"` -} - func NewCmsClient() *CmsClient { - host := os.Getenv("CMS_HOST") - if host == "" { + CMS_HOST := os.Getenv("CMS_HOST") + if CMS_HOST == "" { log := util.GetLogger() log.Error("CMS_HOST Re:Check your env variable", true) - host = "localhost:8080" - log.Warn("CMS_HOST set: %s", host, true) + CMS_HOST = "localhost:8080" + log.Warn("CMS_HOST set: %s", CMS_HOST, true) } return &CmsClient{ - baseURL: host, + baseURL: CMS_HOST, client: &http.Client{ Timeout: 10 * time.Second, }, } } -func (c *CmsClient) RequestDeleteInstance(ip string) (*CmsDeleteInstanceResponse, error) { - log := util.GetLogger() - - reqURL := fmt.Sprintf("http://%s/New/Instance", c.baseURL) - jsonBody, err := json.Marshal(cmsDeleteInstanceRequestBody{IP: ip}) - if err != nil { - log.Error("CmsClient.RequestDeleteInstance : failed to marshal JSON: %v", err) - return nil, fmt.Errorf("CmsClient.RequestDeleteInstance: failed to marshal JSON: %w", err) - } - req, err := http.NewRequest("DELETE", reqURL, bytes.NewBuffer(jsonBody)) - if err != nil { - log.Error("CmsClient.RequestDeleteInstance : failed to NewRequest: %v", err) - return nil, fmt.Errorf("CmsClient.RequestDeleteInstance: failed to create HTTP request: %w", err) - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Accept", "application/json") - - log.DebugInfo("Making request to: %s", reqURL) - log.DebugInfo("Request body: %s", string(jsonBody)) - - resp, err := c.client.Do(req) - if err != nil { - 
log.Error("CmsClient.RequestDeleteInstance : failed to send request: %v", err) - return nil, fmt.Errorf("CmsClient.RequestDeleteInstance: failed to send request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - log.Error("CmsClient.RequestDeleteInstance : CMS returned status: %s", resp.Status) - return nil, fmt.Errorf("CmsClient.RequestDeleteInstance: CMS server returned non-OK status: %s", resp.Status) - } - - var addrResp CmsDeleteInstanceResponse - if err := json.NewDecoder(resp.Body).Decode(&addrResp); err != nil { - log.Error("CmsClient.RequestDeleteInstance : failed to decode CMS response: %v", err) - return nil, fmt.Errorf("CmsClient.RequestDeleteInstance: failed to decode response: %w", err) - } - return &addrResp, nil -} - -// RequestNewInstance는 주어진 서브넷에 대해 CMS에 새 인스턴스 할당을 요청한다. -func (c *CmsClient) RequestNewInstance(subnet string) (*CmsNewInstanceResponse, error) { +func (c *CmsClient) RequestSubnet(subnet string) (*NewSubnetRequest, error) { log := util.GetLogger() reqURL := fmt.Sprintf("http://%s/New/Instance", c.baseURL) - jsonBody, err := json.Marshal(cmsNewInstanceRequestBody{Subnet: subnet}) + reqBody := subnetRequest{Subnet: subnet} + jsonBody, err := json.Marshal(reqBody) if err != nil { - log.Error("CmsClient.RequestNewInstance : failed to marshal JSON: %v", err) - return nil, fmt.Errorf("CmsClient.RequestNewInstance: failed to marshal JSON: %w", err) + log.Error("CMS : failed to marshal JSON: %v", err) + return nil, fmt.Errorf("RequestSubnet: failed to marshal JSON: %w", err) } req, err := http.NewRequest("POST", reqURL, bytes.NewBuffer(jsonBody)) if err != nil { - log.Error("CmsClient.RequestNewInstance : failed to NewRequest: %v", err) - return nil, fmt.Errorf("CmsClient.RequestNewInstance: failed to create HTTP request: %w", err) + log.Error("CMS : failed to NewRequest: %v", err) + return nil, fmt.Errorf("RequestSubnet: failed to create HTTP request: %w", err) } + req.Header.Set("Content-Type", 
"application/json") req.Header.Set("Accept", "application/json") @@ -115,20 +67,21 @@ func (c *CmsClient) RequestNewInstance(subnet string) (*CmsNewInstanceResponse, resp, err := c.client.Do(req) if err != nil { - log.Error("CmsClient.RequestNewInstance : failed to send request: %v", err) - return nil, fmt.Errorf("CmsClient.RequestNewInstance: failed to send request: %w", err) + log.Error("CMS : failed to send request: %v", err) + return nil, fmt.Errorf("RequestSubnet: failed to send request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - log.Error("CmsClient.RequestNewInstance : CMS returned status: %s", resp.Status) - return nil, fmt.Errorf("CmsClient.RequestNewInstance: CMS server returned non-OK status: %s", resp.Status) + log.Error("CMS : CMS returned status: %s", resp.Status) + return nil, fmt.Errorf("CMS server returned non-OK status: %s", resp.Status) } - var addrResp CmsNewInstanceResponse + var addrResp NewSubnetRequest if err := json.NewDecoder(resp.Body).Decode(&addrResp); err != nil { - log.Error("CmsClient.RequestNewInstance : failed to decode CMS response: %v", err) - return nil, fmt.Errorf("CmsClient.RequestNewInstance: failed to decode response: %w", err) + log.Error("CMS : failed to decode CMS response: %v", err) + return nil, fmt.Errorf("RequestSubnet: failed to decode response: %w", err) } + return &addrResp, nil } diff --git a/client/model/vm.go b/client/model/vm.go index a495ced..9ec2e63 100644 --- a/client/model/vm.go +++ b/client/model/vm.go @@ -86,6 +86,13 @@ type CoreMachineCpuInfoResponse struct { System float64 `json:"system_time"` Idle float64 `json:"idle_time"` Usage float64 `json:"usage_percent"` + // Desc는 호스트 /getStatusHost(host_dataType=0) 응답에만 존재(runtime.NumCPU()). + // VM별 /getStatusUUID 응답에는 없으므로 포인터로 두어 미존재를 nil로 감지한다. 
+ Desc *VCPUStatus `json:"vcpu_status"` +} + +type VCPUStatus struct { + Total int `json:"total"` // 코어의 논리 CPU 총 개수 } type CoreMachineMemoryInfoResponse struct { diff --git a/client/vm.go b/client/vm.go index 923f6fb..93e0242 100644 --- a/client/vm.go +++ b/client/vm.go @@ -90,9 +90,8 @@ func (c *CoreClient) DeleteVM(context context.Context, req model.DeleteVMRequest return response, nil } -// 현재 미사용 중 -// 코어 vcpu 갯수 가져오는 함수 -// 코어에 문의 해봐야함 옛날에 구현 안됬다고 해서 컨트롤에서 9999로 하드코딩 했던거 같음 +// 코어의 논리 CPU 총 개수를 /getStatusHost(host_dataType=0)에서 가져온다. +// 응답의 vcpu_status.total(=runtime.NumCPU())을 CoreInfoIdx.Cpu 캐시로 사용. func (c *CoreClient) GetCoreMachineCpuInfo(context context.Context) (*model.CoreMachineCpuInfoResponse, error) { var response model.CoreResponse[model.CoreMachineCpuInfoResponse] err := c.doRequest(context, http.MethodGet, "/getStatusHost", model.GetMachineStatusRequest{ diff --git a/main.go b/main.go index 1816d29..c826d0c 100755 --- a/main.go +++ b/main.go @@ -3,10 +3,12 @@ package main import ( "context" "fmt" + "time" "github.com/easy-cloud-Knet/KWS_Control/structure" "github.com/easy-cloud-Knet/KWS_Control/api" + "github.com/easy-cloud-Knet/KWS_Control/service" "github.com/easy-cloud-Knet/KWS_Control/startup" "github.com/easy-cloud-Knet/KWS_Control/util" ) @@ -16,7 +18,7 @@ func main() { ctx := context.Background() - //Redis 초기화 + //Redis 초기화 (VM status 저장 + 코어별 할당 집계 공용) rdb, err := startup.InitializeRedis(ctx) if err != nil { log.Error("Failed to initialize Redis: %v", err, true) @@ -30,7 +32,15 @@ func main() { log.Error("Failed to initialize: %v", err, true) panic(err) } - printCores(contextStruct.Resources.Cores) + printCores(contextStruct.Cores) + + // DB 인스턴스 합계로 코어별 할당(core:{ip}:{port}:alloc) 재구성(시작 시 1회, 멱등). 실패해도 기동은 계속. 
+ if err := service.RebuildCoreAllocFromDB(ctx, &contextStruct, rdb); err != nil { + log.Error("Failed to rebuild core alloc from DB: %v", err, true) + } + + // 주기적 헬스체크(코어 가용성/용량 갱신) + go service.StartHealthcheck(ctx, &contextStruct, 30*time.Second) go func() { err := api.Server(contextStruct.Config.Port, &contextStruct, rdb) diff --git a/resources/config.yaml b/resources/config.yaml index 0e2ca65..29d410f 100644 --- a/resources/config.yaml +++ b/resources/config.yaml @@ -8,6 +8,11 @@ cores: port: 8081 +# 코어 선택 알고리즘 파라미터 +cpu_overcommit: 4.0 # vCPU 오버커밋 배수 (logical_cpu * cpu_overcommit = 가용 vCPU) +mem_reserve_pct: 0.1 # 메모리 여유분 비율 (0..1) +disk_reserve_pct: 0.1 # 디스크 여유분 비율 (0..1) + db: user: "root" password: "password" diff --git a/service/alloc_redis.go b/service/alloc_redis.go new file mode 100644 index 0000000..94f9ae6 --- /dev/null +++ b/service/alloc_redis.go @@ -0,0 +1,135 @@ +package service + +import ( + "context" + "fmt" + "strconv" + + "github.com/easy-cloud-Knet/KWS_Control/util" + "github.com/redis/go-redis/v9" + + vms "github.com/easy-cloud-Knet/KWS_Control/structure" +) + +// 코어별 할당 집계는 기존 Redis(VM status와 동일 인스턴스)에 코어당 하나의 HASH로 저장한다. +// VM status 키(=UUID 문자열)와 네임스페이스가 겹치지 않도록 core: 접두어를 쓴다. +// key = core:{ip}:{port}:alloc +// field = cpu | mem | disk (mem/disk는 MiB, cpu는 논리 코어 수) +// +// 이 값이 자원 회계의 단일 진실 소스다(인메모리 Free*는 표시용 캐시로 격하). + +// CoreAlloc은 한 코어에 현재 할당된 자원 합계. +type CoreAlloc struct { + CPU int64 + Mem int64 + Disk int64 +} + +func coreAllocKey(ip string, port uint16) string { + return fmt.Sprintf("core:%s:%d:alloc", ip, port) +} + +func parseAllocField(s string) int64 { + if s == "" { + return 0 + } + v, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0 + } + return v +} + +// GetCoreAlloc은 코어의 현재 할당 합계를 읽는다. 키/필드가 없으면 0. 
+func GetCoreAlloc(ctx context.Context, rdb *redis.Client, ip string, port uint16) (CoreAlloc, error) { + res, err := rdb.HGetAll(ctx, coreAllocKey(ip, port)).Result() + if err != nil { + return CoreAlloc{}, fmt.Errorf("GetCoreAlloc %s:%d: %w", ip, port, err) + } + return CoreAlloc{ + CPU: parseAllocField(res["cpu"]), + Mem: parseAllocField(res["mem"]), + Disk: parseAllocField(res["disk"]), + }, nil +} + +// IncrCoreAlloc은 코어 할당을 차원별로 가감한다(예약 +, 회수/롤백 -). +// 3개 HIncrBy를 TxPipeline으로 묶어 한 차원만 반영되는 일을 막는다. +// (read-then-decide의 직렬화는 호출자의 contextStruct.Lock()이 담당 — alloc_redis.go 주석 참고.) +func IncrCoreAlloc(ctx context.Context, rdb *redis.Client, ip string, port uint16, dCPU, dMem, dDisk int64) error { + key := coreAllocKey(ip, port) + pipe := rdb.TxPipeline() + pipe.HIncrBy(ctx, key, "cpu", dCPU) + pipe.HIncrBy(ctx, key, "mem", dMem) + pipe.HIncrBy(ctx, key, "disk", dDisk) + if _, err := pipe.Exec(ctx); err != nil { + return fmt.Errorf("IncrCoreAlloc %s:%d: %w", ip, port, err) + } + return nil +} + +// SetCoreAlloc은 코어 할당을 절댓값으로 덮어쓴다(rebuild 전용). +func SetCoreAlloc(ctx context.Context, rdb *redis.Client, ip string, port uint16, cpu, mem, disk int64) error { + key := coreAllocKey(ip, port) + if err := rdb.HSet(ctx, key, "cpu", cpu, "mem", mem, "disk", disk).Err(); err != nil { + return fmt.Errorf("SetCoreAlloc %s:%d: %w", ip, port, err) + } + return nil +} + +// RebuildCoreAllocFromDB는 DB의 인스턴스 합계로 alloc-Redis를 재구성한다(시작 시 1회, 멱등). +// inst_loc.core(코어 인덱스)별 합계를 구해 IP:Port 키로 매핑하고, 결과에 없는 코어는 0으로 +// 초기화해 이전 실행에서 남은 stale 값을 제거한다. 
+func RebuildCoreAllocFromDB(ctx context.Context, contextStruct *vms.ControlContext, rdb *redis.Client) error { + log := util.GetLogger() + + rows, err := contextStruct.DB.QueryContext(ctx, + "SELECT loc.core, COALESCE(SUM(info.inst_vcpu),0), COALESCE(SUM(info.inst_mem),0), COALESCE(SUM(info.inst_disk),0) "+ + "FROM inst_loc loc JOIN inst_info info ON loc.uuid = info.uuid GROUP BY loc.core") + if err != nil { + return fmt.Errorf("RebuildCoreAllocFromDB: query failed: %w", err) + } + defer rows.Close() + + sums := make(map[int]CoreAlloc) + for rows.Next() { + var coreIdx int + var a CoreAlloc + if err := rows.Scan(&coreIdx, &a.CPU, &a.Mem, &a.Disk); err != nil { + return fmt.Errorf("RebuildCoreAllocFromDB: scan failed: %w", err) + } + sums[coreIdx] = a + } + if err := rows.Err(); err != nil { + return fmt.Errorf("RebuildCoreAllocFromDB: rows error: %w", err) + } + + // Snapshot the core list (idx → IP:Port). This runs once at startup, but RLock is taken to guard against racing with the healthcheck. + contextStruct.RLock() + type coreAddr struct { + ip string + port uint16 + } + addrs := make([]coreAddr, len(contextStruct.Cores)) + for i := range contextStruct.Cores { + addrs[i] = coreAddr{ip: contextStruct.Cores[i].IP, port: contextStruct.Cores[i].Port} + } + contextStruct.RUnlock() + + for i, addr := range addrs { + a := sums[i] // a core missing from sums yields the zero value → its alloc is reset to 0 (clears stale data) + if err := SetCoreAlloc(ctx, rdb, addr.ip, addr.port, a.CPU, a.Mem, a.Disk); err != nil { + return fmt.Errorf("RebuildCoreAllocFromDB: %w", err) + } + log.Info("rebuilt alloc for core %s:%d -> cpu=%d mem=%d disk=%d", addr.ip, addr.port, a.CPU, a.Mem, a.Disk, true) + } + + // Warn when a DB core index falls outside the current core list (it cannot be mapped, e.g. after a configuration change). 
+ for idx := range sums { + if idx < 0 || idx >= len(addrs) { + log.Warn("RebuildCoreAllocFromDB: DB core index %d has no matching core, alloc ignored", idx, true) + } + } + + return nil +} diff --git a/service/cleanup.go b/service/cleanup.go deleted file mode 100644 index 7b19d56..0000000 --- a/service/cleanup.go +++ /dev/null @@ -1,20 +0,0 @@ -package service - -// cleanupChain will execute registered cleanup functions in reverse order when run() is called. -// cleanup chain will register cleanup functions for each step of the process, -// so that if any step fails, all previous steps can be undone to maintain system consistency. -type cleanupChain struct { - steps []func() -} - -func (c *cleanupChain) push(fn func()) { - c.steps = append(c.steps, fn) -} - -func (c *cleanupChain) run() { - for i := len(c.steps) - 1; i >= 0; i-- { - c.steps[i]() - } - //for Idempotency - c.steps = nil -} diff --git a/service/core_allocation.go b/service/core_allocation.go index 564e5fa..081570a 100644 --- a/service/core_allocation.go +++ b/service/core_allocation.go @@ -1,5 +1,194 @@ package service -//현재 미사용중 -//이거 인스턴스 생성할 때 라운드로빈으로 코어에 할당해주는애 아니었나? -//확인 필요 +import ( + "context" + "fmt" + "math" + + "github.com/easy-cloud-Knet/KWS_Control/client/model" + "github.com/easy-cloud-Knet/KWS_Control/util" + "github.com/redis/go-redis/v9" + + vms "github.com/easy-cloud-Knet/KWS_Control/structure" +) + +// 코어 선택 = 하드 필터(가용성 AND 조건) → 코사인 유사도 가중치. +// +// 단위: mem/disk는 전 구간 MiB, cpu는 논리 코어 수. 추가 변환 금지. + +// vec3는 (cpu, mem, disk) 3차원 벡터. +type vec3 struct { + cpu, mem, disk float64 +} + +func (v vec3) magnitude() float64 { + return math.Sqrt(v.cpu*v.cpu + v.mem*v.mem + v.disk*v.disk) +} + +func dot(a, b vec3) float64 { + return a.cpu*b.cpu + a.mem*b.mem + a.disk*b.disk +} + +// coreCap은 한 코어의 차원별 용량. +// - cpu: logical_cpu * cpu_overcommit (오버커밋이 이미 반영된 가용 vCPU) +// - mem/disk: 코어의 전체 용량(MiB) +type coreCap struct { + cpu, mem, disk float64 +} + +// candidate는 하드 필터를 통과한 후보 코어와 점수. 
+type candidate struct { + coreIndex int + score float64 + remaining vec3 // 남은 자원 비율 벡터(코사인은 크기 무시 → 타이브레이커로 magnitude 사용) +} + +// cosineSimilarity는 두 벡터의 코사인 유사도. 영벡터(zero-magnitude)는 0을 반환(NaN 방지). +func cosineSimilarity(a, b vec3) float64 { + ma, mb := a.magnitude(), b.magnitude() + if ma == 0 || mb == 0 { + return 0 + } + return dot(a, b) / (ma * mb) +} + +// ratio는 num/den. den이 0 이하이거나 num이 음수면 0(정의 불가/음수 비율 방지). +func ratio(num, den float64) float64 { + if den <= 0 || num < 0 { + return 0 + } + return num / den +} + +// feasible은 차원별로 req가 (용량 - 할당 - reserve) 안에 들어가는지 판정한다. +// 하드 필터 정의와 정확히 일치: +// - cpu : req.cpu ≤ cap.cpu - alloc.cpu (cap.cpu = logical*overcommit) +// - mem : req.mem ≤ cap.mem - (alloc.mem + cap.mem*memReservePct) +// - disk: req.disk ≤ cap.disk - (alloc.disk + cap.disk*diskReservePct) +func feasible(req vec3, cap coreCap, alloc vec3, memReservePct, diskReservePct float64) (cpuOk, memOk, diskOk bool) { + cpuOk = req.cpu <= cap.cpu-alloc.cpu + memOk = req.mem <= cap.mem-(alloc.mem+cap.mem*memReservePct) + diskOk = req.disk <= cap.disk-(alloc.disk+cap.disk*diskReservePct) + return +} + +// coreRemainingVector는 차원별 남은 비율 ((capacity - used)/capacity). reserve는 미반영(필터에서만 적용). +func coreRemainingVector(cap coreCap, alloc vec3) vec3 { + return vec3{ + cpu: ratio(cap.cpu-alloc.cpu, cap.cpu), + mem: ratio(cap.mem-alloc.mem, cap.mem), + disk: ratio(cap.disk-alloc.disk, cap.disk), + } +} + +// requestVector는 해당 코어 기준 요청 비율 (req.cpu/cap.cpu, req.mem/cap.mem, req.disk/cap.disk). +func requestVector(req vec3, cap coreCap) vec3 { + return vec3{ + cpu: ratio(req.cpu, cap.cpu), + mem: ratio(req.mem, cap.mem), + disk: ratio(req.disk, cap.disk), + } +} + +// scoreCore는 코어의 점수와 남은 비율 벡터를 계산한다. +// 어느 차원이든 남은 값이 0 이하이면 강한 음수 점수(가드) — 하드 필터의 이중 안전장치. 
+func scoreCore(req vec3, cap coreCap, alloc vec3) (score float64, remaining vec3) { + remaining = coreRemainingVector(cap, alloc) + if remaining.cpu <= 0 || remaining.mem <= 0 || remaining.disk <= 0 { + return -1, remaining + } + return cosineSimilarity(remaining, requestVector(req, cap)), remaining +} + +// chooseBest는 후보 중 최적 인덱스를 결정적으로 고른다. 빈 슬라이스면 -1. +// 타이브레이커: ① 코사인 최대 → ② 남은 자원 magnitude 최대(더 빈 코어 우선) +// → ③ 낮은 인덱스(전방 스캔 + strict `>`로 자연 보장). +func chooseBest(cands []candidate) int { + best := -1 + for i := range cands { + switch { + case best == -1: + best = i + case cands[i].score > cands[best].score: + best = i + case cands[i].score == cands[best].score && + cands[i].remaining.magnitude() > cands[best].remaining.magnitude(): + best = i + } + } + return best +} + +// SelectCore는 살아있는 코어 중 하드 필터를 통과한 후보를 코사인 유사도로 평가해 최적 코어를 고른다. +// +// 동시성: 호출자는 contextStruct.Lock()을 SelectCore + 예약(IncrCoreAlloc) 구간 전체에 유지해야 한다. +// SelectCore 자체는 contextStruct.Cores를 락 없이 읽으므로(읽기 전용) 호출자 락에 의존한다. 
+func SelectCore(ctx context.Context, req model.HardwareInfo, + contextStruct *vms.ControlContext, rdb *redis.Client) (*vms.Core, int, error) { + log := util.GetLogger() + + overcommit := contextStruct.Config.CpuOvercommit + if overcommit <= 0 { + overcommit = 1.0 + } + memReserve := contextStruct.Config.MemReservePct + diskReserve := contextStruct.Config.DiskReservePct + + reqVec := vec3{ + cpu: float64(req.CPU), + mem: float64(req.Memory), + disk: float64(req.Disk), + } + + var cands []candidate + aliveCount := 0 + + for i := range contextStruct.Cores { + core := &contextStruct.Cores[i] + if !core.IsAlive { + continue + } + aliveCount++ + + cap := coreCap{ + cpu: float64(core.CoreInfoIdx.Cpu) * overcommit, + mem: float64(core.CoreInfoIdx.Memory), + disk: float64(core.CoreInfoIdx.Disk), + } + + alloc, err := GetCoreAlloc(ctx, rdb, core.IP, core.Port) + if err != nil { + log.Warn("SelectCore: failed to read alloc for core %s:%d, skipping: %v", core.IP, core.Port, err) + continue + } + allocVec := vec3{ + cpu: float64(alloc.CPU), + mem: float64(alloc.Mem), + disk: float64(alloc.Disk), + } + + cpuOk, memOk, diskOk := feasible(reqVec, cap, allocVec, memReserve, diskReserve) + if !(cpuOk && memOk && diskOk) { + log.DebugInfo("core %s:%d rejected by filter: cpu=%t mem=%t disk=%t (alloc cpu=%d mem=%d disk=%d)", + core.IP, core.Port, cpuOk, memOk, diskOk, alloc.CPU, alloc.Mem, alloc.Disk) + continue + } + + score, remaining := scoreCore(reqVec, cap, allocVec) + cands = append(cands, candidate{coreIndex: i, score: score, remaining: remaining}) + log.DebugInfo("core %s:%d candidate: score=%.4f remaining=(%.3f,%.3f,%.3f)", + core.IP, core.Port, score, remaining.cpu, remaining.mem, remaining.disk) + } + + if len(cands) == 0 { + return nil, -1, fmt.Errorf("SelectCore: no feasible core (alive=%d, req cpu=%d mem=%d disk=%d)", + aliveCount, req.CPU, req.Memory, req.Disk) + } + + best := chooseBest(cands) + chosen := cands[best] + core := &contextStruct.Cores[chosen.coreIndex] + 
log.Info("SelectCore: chose core %s:%d (score=%.4f) for req cpu=%d mem=%d disk=%d", + core.IP, core.Port, chosen.score, req.CPU, req.Memory, req.Disk, true) + return core, chosen.coreIndex, nil +} diff --git a/service/core_allocation_test.go b/service/core_allocation_test.go new file mode 100644 index 0000000..6fba9a5 --- /dev/null +++ b/service/core_allocation_test.go @@ -0,0 +1,156 @@ +package service + +import ( + "math" + "testing" +) + +const eps = 1e-9 + +func almostEqual(a, b float64) bool { + return math.Abs(a-b) < eps +} + +func TestCosineSimilarity(t *testing.T) { + tests := []struct { + name string + a, b vec3 + want float64 + }{ + {"identical direction", vec3{1, 1, 1}, vec3{2, 2, 2}, 1.0}, + {"same vector", vec3{0.5, 0.3, 0.2}, vec3{0.5, 0.3, 0.2}, 1.0}, + {"orthogonal", vec3{1, 0, 0}, vec3{0, 1, 0}, 0.0}, + {"zero a", vec3{0, 0, 0}, vec3{1, 1, 1}, 0.0}, + {"zero b", vec3{1, 1, 1}, vec3{0, 0, 0}, 0.0}, + {"both zero", vec3{0, 0, 0}, vec3{0, 0, 0}, 0.0}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := cosineSimilarity(tt.a, tt.b) + if math.IsNaN(got) { + t.Fatalf("cosineSimilarity(%v,%v) = NaN, want %v", tt.a, tt.b, tt.want) + } + if !almostEqual(got, tt.want) { + t.Errorf("cosineSimilarity(%v,%v) = %v, want %v", tt.a, tt.b, got, tt.want) + } + }) + } +} + +func TestRatio(t *testing.T) { + tests := []struct { + name string + num, den float64 + want float64 + }{ + {"normal fraction", 1, 4, 0.25}, + {"den zero", 5, 0, 0}, + {"den negative", 5, -2, 0}, + {"num negative", -3, 10, 0}, + {"both zero", 0, 0, 0}, + {"num zero", 0, 10, 0}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ratio(tt.num, tt.den); !almostEqual(got, tt.want) { + t.Errorf("ratio(%v,%v) = %v, want %v", tt.num, tt.den, got, tt.want) + } + }) + } +} + +func TestFeasibleBoundaries(t *testing.T) { + // logical=4, overcommit=2 → cpu cap = 8 + // mem total = 1000, mem reserve = 0.1 → reserve 100 + // disk total = 2000, disk 
reserve = 0.25 → reserve 500 + cap := coreCap{cpu: 8, mem: 1000, disk: 2000} + alloc := vec3{cpu: 2, mem: 200, disk: 300} + memReserve, diskReserve := 0.1, 0.25 + + // available per dim: + // cpu : 8 - 2 = 6 + // mem : 1000 - (200 + 1000*0.1) = 700 + // disk: 2000 - (300 + 2000*0.25) = 1200 + + // exact boundary: all pass + if c, m, d := feasible(vec3{6, 700, 1200}, cap, alloc, memReserve, diskReserve); !(c && m && d) { + t.Errorf("boundary req should pass, got cpu=%t mem=%t disk=%t", c, m, d) + } + + // cpu + 1 fails, others pass + if c, m, d := feasible(vec3{7, 700, 1200}, cap, alloc, memReserve, diskReserve); c || !m || !d { + t.Errorf("cpu+1 should fail only cpu, got cpu=%t mem=%t disk=%t", c, m, d) + } + // mem + 1 fails, others pass + if c, m, d := feasible(vec3{6, 701, 1200}, cap, alloc, memReserve, diskReserve); !c || m || !d { + t.Errorf("mem+1 should fail only mem, got cpu=%t mem=%t disk=%t", c, m, d) + } + // disk + 1 fails, others pass + if c, m, d := feasible(vec3{6, 700, 1201}, cap, alloc, memReserve, diskReserve); !c || !m || d { + t.Errorf("disk+1 should fail only disk, got cpu=%t mem=%t disk=%t", c, m, d) + } +} + +func TestFeasibleOvercommit(t *testing.T) { + // Without overcommit (cap.cpu = logical = 4), req 6 fails. + // With overcommit x2 (cap.cpu = 8), req 6 passes. 
+ noOC := coreCap{cpu: 4, mem: 1000, disk: 1000} + withOC := coreCap{cpu: 8, mem: 1000, disk: 1000} + alloc := vec3{} + + if c, _, _ := feasible(vec3{6, 0, 0}, noOC, alloc, 0, 0); c { + t.Errorf("req cpu=6 should fail without overcommit (cap=4)") + } + if c, _, _ := feasible(vec3{6, 0, 0}, withOC, alloc, 0, 0); !c { + t.Errorf("req cpu=6 should pass with overcommit (cap=8)") + } +} + +func TestChooseBest(t *testing.T) { + t.Run("empty returns -1", func(t *testing.T) { + if got := chooseBest(nil); got != -1 { + t.Errorf("chooseBest(nil) = %d, want -1", got) + } + }) + + t.Run("highest score (shape) wins", func(t *testing.T) { + cands := []candidate{ + {coreIndex: 0, score: 0.5, remaining: vec3{0.9, 0.9, 0.9}}, + {coreIndex: 1, score: 0.95, remaining: vec3{0.2, 0.2, 0.2}}, + {coreIndex: 2, score: 0.7, remaining: vec3{0.5, 0.5, 0.5}}, + } + if got := chooseBest(cands); got != 1 { + t.Errorf("chooseBest = %d, want 1 (highest score)", got) + } + }) + + t.Run("guard negative excluded when feasible exists", func(t *testing.T) { + cands := []candidate{ + {coreIndex: 0, score: -1, remaining: vec3{0, 0.5, 0.5}}, + {coreIndex: 1, score: 0.3, remaining: vec3{0.4, 0.4, 0.4}}, + } + if got := chooseBest(cands); got != 1 { + t.Errorf("chooseBest = %d, want 1 (positive over guard-negative)", got) + } + }) + + t.Run("tie on score breaks by magnitude (emptier core)", func(t *testing.T) { + cands := []candidate{ + {coreIndex: 0, score: 0.8, remaining: vec3{0.3, 0.3, 0.3}}, // smaller magnitude + {coreIndex: 1, score: 0.8, remaining: vec3{0.6, 0.6, 0.6}}, // larger magnitude + } + if got := chooseBest(cands); got != 1 { + t.Errorf("chooseBest = %d, want 1 (larger remaining magnitude)", got) + } + }) + + t.Run("tie on score and magnitude breaks by lowest index", func(t *testing.T) { + cands := []candidate{ + {coreIndex: 3, score: 0.8, remaining: vec3{0.5, 0.5, 0.5}}, + {coreIndex: 7, score: 0.8, remaining: vec3{0.5, 0.5, 0.5}}, + } + if got := chooseBest(cands); got != 0 { + 
t.Errorf("chooseBest = %d, want 0 (lowest index via forward scan)", got) + } + }) +} diff --git a/service/dto.go b/service/dto.go deleted file mode 100644 index c217392..0000000 --- a/service/dto.go +++ /dev/null @@ -1,69 +0,0 @@ -package service - -import ( - vms "github.com/easy-cloud-Knet/KWS_Control/structure" -) - -// Service Layer DTO. -// DTOs for service layer to decouple API layer from internal implementation details. -// API Handler will convert API layer DTOs to service layer DTOs. -// Service layer will convert to internal client/model structures as needed. -// - -type CreateVMInput struct { - UUID vms.UUID - DomType string - DomName string - OS string - HardwareInfo HardwareSpec - Network NetworkSpec - Users []UserSpec - SubnetType string // "Add" 또는 그 외(=신규 서브넷) -} - -type HardwareSpec struct { - CPU uint32 - Memory uint32 // MiB - Disk uint32 // MiB -} - -type NetworkSpec struct { - IPs []string -} - -type UserSpec struct { - Name string - Groups string - Password string - SSHAuthorizedKeys []string -} - -// Output DTOs -type VMInfo struct { - UUID vms.UUID - CPU uint32 - Memory uint32 // MiB - Disk uint32 // MiB - IP string - Status string -} - -type VMCpuStatus struct { - System float64 - Idle float64 - Usage float64 -} - -type VMMemoryStatus struct { - Total uint64 - Used uint64 - Available uint64 - UsedPercent float64 -} - -type VMDiskStatus struct { - Total uint64 - Used uint64 - Free uint64 - UsedPercent float64 -} diff --git a/service/guacamole.go b/service/guacamole.go index 7a6e747..9febb6e 100644 --- a/service/guacamole.go +++ b/service/guacamole.go @@ -14,9 +14,9 @@ func GetGuacamoleToken(uuid structure.UUID, ctx *structure.ControlContext) (stri return "", structure.ErrCoreNotFound(uuid) } - ctx.Resources.RLock() + ctx.RLock() vm, exists := core.VMInfoIdx[uuid] - ctx.Resources.RUnlock() + ctx.RUnlock() if exists { guacClient := client.NewGuacamoleClient(&ctx.Config) diff --git a/service/healthcheck.go b/service/healthcheck.go new file mode 
100644 index 0000000..127d3c8 --- /dev/null +++ b/service/healthcheck.go @@ -0,0 +1,113 @@ +package service + +import ( + "context" + "time" + + "github.com/easy-cloud-Knet/KWS_Control/client" + "github.com/easy-cloud-Knet/KWS_Control/util" + + vms "github.com/easy-cloud-Knet/KWS_Control/structure" +) + +// StartHealthcheck는 주기적으로 모든 코어의 상태를 갱신한다(기본 30s). +// ctx.Done()이면 종료. main에서 go로 실행. +func StartHealthcheck(ctx context.Context, contextStruct *vms.ControlContext, interval time.Duration) { + log := util.GetLogger() + + if interval <= 0 { + interval = 30 * time.Second + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + log.Info("healthcheck started (interval=%s)", interval, true) + + for { + select { + case <-ctx.Done(): + log.Info("healthcheck stopped", true) + return + case <-ticker.C: + checkAllCores(ctx, contextStruct) + } + } +} + +// checkAllCores는 코어 목록을 스냅샷한 뒤 락 없이 /getStatusHost를 호출하고, +// 결과만 Lock 하에 반영한다(CreateVM과 동일 규율: 네트워크 I/O는 락 밖, 쓰기만 락 안). +func checkAllCores(ctx context.Context, contextStruct *vms.ControlContext) { + log := util.GetLogger() + + type snap struct { + ip string + port uint16 + cpuCached uint32 + } + + contextStruct.RLock() + snaps := make([]snap, len(contextStruct.Cores)) + for i := range contextStruct.Cores { + snaps[i] = snap{ + ip: contextStruct.Cores[i].IP, + port: contextStruct.Cores[i].Port, + cpuCached: contextStruct.Cores[i].CoreInfoIdx.Cpu, + } + } + contextStruct.RUnlock() + + for _, s := range snaps { + tmp := &vms.Core{IP: s.ip, Port: s.port} + coreClient := client.NewCoreClient(tmp) + + memResp, memErr := coreClient.GetCoreMachineMemoryInfo(ctx) + diskResp, diskErr := coreClient.GetCoreMachineDiskInfo(ctx) + + // CPU 총량은 캐시가 비었을 때(==0)만 새로 측정. 평소엔 mem/disk만 갱신. 
+ var newCpu uint32 + if s.cpuCached == 0 { + if cpuResp, cpuErr := coreClient.GetCoreMachineCpuInfo(ctx); cpuErr == nil && + cpuResp != nil && cpuResp.Desc != nil && cpuResp.Desc.Total > 0 { + newCpu = uint32(cpuResp.Desc.Total) + } + } + + // 양방향: mem/disk 모두 성공해야 alive(복귀 코어 재활성화 포함). + // 응답 본문(Information 포인터)이 nil이면 데이터가 없는 것이므로 not-alive 처리. + alive := memErr == nil && diskErr == nil && memResp != nil && diskResp != nil + + contextStruct.Lock() + core := findCoreByAddr(contextStruct.Cores, s.ip, s.port) + if core != nil { + wasAlive := core.IsAlive + core.IsAlive = alive + if alive { + core.CoreInfoIdx.Memory = uint32(memResp.Total * 1024) + core.CoreInfoIdx.Disk = uint32(diskResp.Total * 1024) + core.FreeMemory = uint32(memResp.Available * 1024) + core.FreeDisk = uint32(diskResp.Free * 1024) + if newCpu > 0 { + core.CoreInfoIdx.Cpu = newCpu + } + } + if wasAlive != alive { + log.Info("healthcheck: core %s:%d alive %t -> %t", s.ip, s.port, wasAlive, alive, true) + } + } + contextStruct.Unlock() + + if !alive { + log.DebugWarn("healthcheck: core %s:%d unreachable (memErr=%v, diskErr=%v)", s.ip, s.port, memErr, diskErr) + } + } +} + +func findCoreByAddr(cores []vms.Core, ip string, port uint16) *vms.Core { + for i := range cores { + if cores[i].IP == ip && cores[i].Port == port { + return &cores[i] + } + } + return nil +} diff --git a/service/network.go b/service/network.go index 33c7e93..eadf55d 100644 --- a/service/network.go +++ b/service/network.go @@ -9,8 +9,7 @@ import ( "github.com/easy-cloud-Knet/KWS_Control/util" ) -// AddCmsSubnet은 기존 VM의 IP가 속한 서브넷으로 CMS에 새 인스턴스 할당을 요청 -func AddCmsSubnet(c *client.CmsClient, ctx *vms.ControlContext, uuid vms.UUID) (*client.CmsNewInstanceResponse, error) { +func AddCmsSubnet(c *client.CmsClient, ctx *vms.ControlContext, uuid vms.UUID) (*client.NewSubnetRequest, error) { log := util.GetLogger() ip, err := GetVMIPByUUID(ctx, uuid) @@ -23,64 +22,51 @@ func AddCmsSubnet(c *client.CmsClient, ctx *vms.ControlContext, uuid 
vms.UUID) ( log.Error("AddCmsSubnet : GetSubnetFromIP: %v", err) return nil, fmt.Errorf("AddCmsSubnet: failed to get subnet: %w", err) } - resp, err := c.RequestNewInstance(subnet) + resp, err := c.RequestSubnet(subnet) if err != nil { - log.Error("AddCmsSubnet : RequestNewInstance: %v", err) - return nil, fmt.Errorf("AddCmsSubnet: CMS request failed: %w", err) + log.Error("AddCmsSubnet : RequestSubnet: %v", err) + return nil, fmt.Errorf("AddCmsSubnet: RequestSubnet failed: %w", err) } + return resp, nil } -// NewCmsSubnet은 다음 서브넷을 계산하여 DB에 선점한 뒤 CMS에 인스턴스 할당을 요청 -func NewCmsSubnet(c *client.CmsClient, ctx *vms.ControlContext) (*client.CmsNewInstanceResponse, error) { +func NewCmsSubnet(c *client.CmsClient, ctx *vms.ControlContext) (*client.NewSubnetRequest, error) { log := util.GetLogger() - lastSubnet := ctx.Last_subnet - nextLastSubnet := pkgnetwork.FindSubnet(lastSubnet) - log.Info("NewCmsSubnet : next_last_subnet: %s", nextLastSubnet) + last_subnet := ctx.Last_subnet + next_last_subnet := pkgnetwork.FindSubnet(last_subnet) + log.Info("NewCmsSubnet : next_last_subnet: %s", next_last_subnet) - //CMS 호출 전에 다음 서브넷을 선점하여 동시 호출 시 중복 할당 방지 - _, err := ctx.DB.Exec("UPDATE subnet SET last_subnet = ? WHERE id = 1", nextLastSubnet) + // DB를 먼저 업데이트하여 서브넷을 선점한다. + // CMS 호출 전에 선점해야 실패 시 동일 서브넷이 중복 할당되는 것을 방지할 수 있다. + _, err := ctx.DB.Exec("UPDATE subnet SET last_subnet = ? WHERE id = 1", next_last_subnet) if err != nil { log.Error("Failed to update last_subnet in database: %v", err) return nil, fmt.Errorf("NewCmsSubnet: failed to update last_subnet in DB: %w", err) } - ctx.Last_subnet = nextLastSubnet + ctx.Last_subnet = next_last_subnet - resp, err := c.RequestNewInstance(nextLastSubnet) + resp, err := c.RequestSubnet(next_last_subnet) if err != nil { - log.Error("NewCmsSubnet : RequestNewInstance: %v", err) + log.Error("NewCmsSubnet : RequestSubnet: %v", err) // CMS 호출 실패 시 DB를 원래 값으로 롤백 - if _, rbErr := ctx.DB.Exec("UPDATE subnet SET last_subnet = ? 
WHERE id = 1", lastSubnet); rbErr != nil { + if _, rbErr := ctx.DB.Exec("UPDATE subnet SET last_subnet = ? WHERE id = 1", last_subnet); rbErr != nil { log.Error("NewCmsSubnet : failed to rollback last_subnet: %v", rbErr) - return nil, fmt.Errorf("NewCmsSubnet: CMS request failed: %w, rollback also failed: %v", err, rbErr) + return nil, fmt.Errorf("NewCmsSubnet: RequestSubnet failed: %w, rollback also failed: %v", err, rbErr) } - ctx.Last_subnet = lastSubnet - return nil, fmt.Errorf("NewCmsSubnet: CMS request failed: %w", err) + ctx.Last_subnet = last_subnet + return nil, fmt.Errorf("NewCmsSubnet: RequestSubnet failed: %w", err) } - return resp, nil -} -func DeleteCmsSubnet(c *client.CmsClient, ctx *vms.ControlContext, uuid vms.UUID) error { - log := util.GetLogger() - ip, err := GetVMIPByUUID(ctx, uuid) - if err != nil { - log.Error("DeleteCmsSubnet : GetVMIPByUUID: %v", err) - return fmt.Errorf("DeleteCmsSubnet: failed to get VM IP: %w", err) - } - _, err = c.RequestDeleteInstance(ip) - if err != nil { - log.Error("DeleteCmsSubnet : RequestDeleteInstance: %v", err) - return fmt.Errorf("DeleteCmsSubnet: CMS request failed: %w", err) - } - return nil + return resp, nil } func GetVMIPByUUID(ctx *vms.ControlContext, uuid vms.UUID) (string, error) { - ctx.Resources.RLock() - defer ctx.Resources.RUnlock() + ctx.RLock() + defer ctx.RUnlock() - core, ok := ctx.Resources.VMLocation[uuid] + core, ok := ctx.VMLocation[uuid] if !ok { return "", fmt.Errorf("UUID %s not found in VMLocation", uuid) } diff --git a/service/redis.go b/service/redis.go index 9335a7a..e953dcb 100644 --- a/service/redis.go +++ b/service/redis.go @@ -11,46 +11,12 @@ import ( "github.com/redis/go-redis/v9" ) -// redisVMInfo is the internal structure for storing instance info in Redis. -// For Separation of Concerns, this is not exposed outside the service layer. -// service DTOs are converted to/from this format for Redis operations. 
-type redisVMInfo struct { - UUID structure.UUID `json:"uuid"` - CPU uint32 `json:"cpu"` - Memory uint32 `json:"memory"` // MiB - Disk uint32 `json:"disk"` // MiB - IP string `json:"ip"` - Status string `json:"status"` - Time int64 `json:"time"` -} - -func (r redisVMInfo) toServiceDTO() VMInfo { - return VMInfo{ - UUID: r.UUID, - CPU: r.CPU, - Memory: r.Memory, - Disk: r.Disk, - IP: r.IP, - Status: r.Status, - } -} - -// StoreVMInfoToRedis will serialize the given VMInfo and store it in Redis under the key of its UUID. -func StoreVMInfoToRedis(ctx context.Context, rdb *redis.Client, vmInfo VMInfo, timestamp int64) error { +func StoreVMInfoToRedis(ctx context.Context, rdb *redis.Client, vmInfo model.VMRedisInfo) error { log := util.GetLogger() key := string(vmInfo.UUID) - stored := redisVMInfo{ - UUID: vmInfo.UUID, - CPU: vmInfo.CPU, - Memory: vmInfo.Memory, - Disk: vmInfo.Disk, - IP: vmInfo.IP, - Status: vmInfo.Status, - Time: timestamp, - } - jsonData, err := json.Marshal(stored) + jsonData, err := json.Marshal(vmInfo) if err != nil { log.Error("failed to marshal vm for redis %v", err, true) return fmt.Errorf("failed to marshal vm for redis: %w", err) @@ -62,7 +28,8 @@ func StoreVMInfoToRedis(ctx context.Context, rdb *redis.Client, vmInfo VMInfo, t } log.Info("vm info stored in redis: UUID=%s, CPU=%d, Memory=%d, Disk=%d, IP=%s, Status=%s, Time=%d", - key, stored.CPU, stored.Memory, stored.Disk, stored.IP, stored.Status, stored.Time, true) + key, vmInfo.CPU, vmInfo.Memory, vmInfo.Disk, vmInfo.IP, vmInfo.Status, vmInfo.Time, true) + return nil } @@ -70,6 +37,7 @@ func RemoveVMInfoFromRedis(ctx context.Context, rdb *redis.Client, uuid structur log := util.GetLogger() key := string(uuid) + result, err := rdb.Del(ctx, key).Result() if err != nil { log.Error("failed to delete vm from redis: UUID=%s, error=%v", key, err, true) @@ -81,60 +49,51 @@ func RemoveVMInfoFromRedis(ctx context.Context, rdb *redis.Client, uuid structur } else { log.Info("vm info removed from 
redis: UUID=%s", key, true) } - return nil -} -// GetVMInfoFromRedis는 service DTO를 반환 -func GetVMInfoFromRedis(ctx context.Context, rdb *redis.Client, uuid structure.UUID) (VMInfo, error) { - log := util.GetLogger() - - stored, err := loadRedisVMInfo(ctx, rdb, uuid) - if err != nil { - return VMInfo{}, err - } - log.DebugInfo("vm info retrieved from redis: UUID=%s", string(uuid)) - return stored.toServiceDTO(), nil + return nil } -// loadRedisVMInfo는 Redis에서 raw 저장 포맷을 읽음 (service 계층) -// UpdateVMStatusInRedis에서의 Time 필드 보존을 위해 필요 -func loadRedisVMInfo(ctx context.Context, rdb *redis.Client, uuid structure.UUID) (redisVMInfo, error) { +func GetVMInfoFromRedis(ctx context.Context, rdb *redis.Client, uuid structure.UUID) (model.VMRedisInfo, error) { log := util.GetLogger() key := string(uuid) - var stored redisVMInfo + var vmInfo model.VMRedisInfo jsonData, err := rdb.Get(ctx, key).Result() if err != nil { if err == redis.Nil { log.Warn("vm info not found in redis: UUID=%s", key, true) - return stored, fmt.Errorf("vm info not found in redis: %s", key) + return vmInfo, fmt.Errorf("vm info not found in redis: %s", key) } log.Error("failed to get vm from redis: UUID=%s, error=%v", key, err, true) - return stored, fmt.Errorf("failed to get vm from redis: %w", err) + return vmInfo, fmt.Errorf("failed to get vm from redis: %w", err) } - if err := json.Unmarshal([]byte(jsonData), &stored); err != nil { + if err := json.Unmarshal([]byte(jsonData), &vmInfo); err != nil { log.Error("failed to unmarshal vm from redis: UUID=%s, error=%v", key, err, true) - return stored, fmt.Errorf("failed to unmarshal vm from redis: %w", err) + return vmInfo, fmt.Errorf("failed to unmarshal vm from redis: %w", err) } - return stored, nil + + log.DebugInfo("vm info retrieved from redis: UUID=%s", key) + return vmInfo, nil } +// VM status랑 time 한 번에 업뎃.. 
통합 필요 func UpdateVMStatusInRedis(ctx context.Context, rdb *redis.Client, uuid structure.UUID, status string, timestamp int64) error { log := util.GetLogger() key := string(uuid) - stored, err := loadRedisVMInfo(ctx, rdb, uuid) + + vmInfo, err := GetVMInfoFromRedis(ctx, rdb, uuid) if err != nil { log.Error("failed to get existing vm info from redis: UUID=%s, error=%v", key, err, true) return fmt.Errorf("failed to get existing vm info from redis: %w", err) } - stored.Status = status - stored.Time = timestamp + vmInfo.Status = status + vmInfo.Time = timestamp - jsonData, err := json.Marshal(stored) + jsonData, err := json.Marshal(vmInfo) if err != nil { log.Error("failed to marshal updated vm for redis: UUID=%s, error=%v", key, err, true) return fmt.Errorf("failed to marshal updated vm for redis: %w", err) @@ -148,15 +107,3 @@ func UpdateVMStatusInRedis(ctx context.Context, rdb *redis.Client, uuid structur log.Info("vm status updated in redis: UUID=%s, Status=%s, Time=%d", key, status, timestamp, true) return nil } - -// VM 상태 상수는 client/model에서 재노출 (api/update_redis.go도 동일 상수 정의, 추후 통합 예정) -const ( - VMStatusPrepareBegin = model.VMStatusPrepareBegin - VMStatusStartBegin = model.VMStatusStartBegin - VMStatusStarted = model.VMStatusStarted - VMStatusStopped = model.VMStatusStopped - VMStatusRelease = model.VMStatusRelease - VMStatusMigrate = model.VMStatusMigrate - VMStatusRestore = model.VMStatusRestore - VMStatusUnknown = model.VMStatusUnknown -) diff --git a/service/vm.go b/service/vm.go index 7904d3c..b0d6745 100644 --- a/service/vm.go +++ b/service/vm.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "slices" "time" "github.com/easy-cloud-Knet/KWS_Control/client" @@ -15,215 +16,198 @@ import ( vms "github.com/easy-cloud-Knet/KWS_Control/structure" ) -func CreateVM(input CreateVMInput, contextStruct *vms.ControlContext, rdb *redis.Client) error { +// 새 VM 만드는 무언가. 
+// 하드 필터+코사인으로 적합 코어를 고르고(SelectCore), alloc-Redis에 예약한 뒤 +// CMS/Guacamole/Core에 VM을 생성한다. 실패 시 cleanup()이 예약을 롤백한다. +func CreateVM(req model.CreateVMRequest, contextStruct *vms.ControlContext, rdb *redis.Client) error { log := util.GetLogger() - uuid := input.UUID - hwReq := vms.HardwareRequirement{ - Memory: input.HardwareInfo.Memory, - CPU: input.HardwareInfo.CPU, - Disk: input.HardwareInfo.Disk, - } - - log.Info("func CreateVM() memory=%d GiB, cpu=%d, disk=%d GiB", hwReq.Memory, hwReq.CPU, hwReq.Disk, true) + ctx := context.Background() - // 1) 코어 선택 - selectedCore, selectedCoreIndex, err := selectCoreOrFail(contextStruct, hwReq) - if err != nil { - return err - } + log.Info("func CreateVM() memory=%d MiB, cpu=%d, disk=%d MiB", req.HardwareInfo.Memory, req.HardwareInfo.CPU, req.HardwareInfo.Disk, true) - // 2) SSH 키 생성 + // SSH 키는 예약 전에 생성 — 실패해도 롤백할 예약이 없도록. privateKeyPEM, publicKeyOpenSSH, err := internalssh.GenerateSSHKey() if err != nil { log.Error("GenerateSshKey() failed: %v", err, true) return fmt.Errorf("CreateVM: failed to generate SSH key: %w", err) } - // 단계별 롤백 등록을 위한 chain - cleanup := &cleanupChain{} - - //사용자 수 확인 - if len(input.Users) == 0 { - cleanup.run() - log.Error("CreateVM: at least one user is required for Guacamole configuration", true) - return fmt.Errorf("CreateVM: at least one user is required") + uuid := vms.UUID(req.UUID.String().(string)) + + // ---- 코어 선택 + 예약 (단일 임계구역) ---- + // Lock → SelectCore → IncrCoreAlloc(+) → Unlock 을 한 임계구역으로 묶어, + // 두 CreateVM이 같은 할당값을 읽고 같은 코어를 더블 부킹하는 것을 막는다. + // HINCRBY 원자성만으론 read-then-decide가 안전하지 않으므로 인메모리 mutex가 직렬화 지점. + // (단일 Control 프로세스 가정 — 수평 확장 시 Lua CAS로 교체.) + // 느린 I/O(CMS/Guacamole/Core /createVM)는 락을 잡지 않고 임계구역 밖에서 수행. 
+ contextStruct.Lock() + selectedCore, selectedCoreIndex, selErr := SelectCore(ctx, req.HardwareInfo, contextStruct, rdb) + if selErr != nil { + contextStruct.Unlock() + log.Error("CreateVM: %v", selErr, true) + return fmt.Errorf("CreateVM: %w", selErr) + } + if reserveErr := IncrCoreAlloc(ctx, rdb, selectedCore.IP, selectedCore.Port, + int64(req.HardwareInfo.CPU), int64(req.HardwareInfo.Memory), int64(req.HardwareInfo.Disk)); reserveErr != nil { + contextStruct.Unlock() + log.Error("CreateVM: failed to reserve alloc on core %s:%d: %v", selectedCore.IP, selectedCore.Port, reserveErr, true) + return fmt.Errorf("CreateVM: failed to reserve allocation: %w", reserveErr) + } + contextStruct.Unlock() + + // 롤백 플래그 + allocReserved := true // alloc-Redis 예약 완료(IncrCoreAlloc+) + guacamoleConfigured := false // Guacamole 설정 완료 + vmTracked := false // VMInfoIdx 등록 + Free* 표시 캐시 반영 완료 + newSubnetAllocated := false + + cleanup := func() { + if guacamoleConfigured { + log.Info("clean up clean up") + if cleanupErr := guacamole.Cleanup(string(uuid), contextStruct.GuacDB); cleanupErr != nil { + log.Error("Failed to cleanup Guacamole config during rollback: %v", cleanupErr) + } + } + if vmTracked { + // VMInfoIdx와 Free* 표시 캐시는 여러 핸들러가 동시에 접근하므로 Lock 필요 + contextStruct.Lock() + delete(selectedCore.VMInfoIdx, uuid) + selectedCore.FreeMemory += req.HardwareInfo.Memory + selectedCore.FreeCPU += req.HardwareInfo.CPU + selectedCore.FreeDisk += req.HardwareInfo.Disk + contextStruct.Unlock() + } + if allocReserved { + // 예약을 감산해 alloc-Redis를 이전 합으로 되돌린다. 
+ if relErr := IncrCoreAlloc(ctx, rdb, selectedCore.IP, selectedCore.Port, + -int64(req.HardwareInfo.CPU), -int64(req.HardwareInfo.Memory), -int64(req.HardwareInfo.Disk)); relErr != nil { + log.Error("Failed to release alloc reservation during rollback on core %s:%d: %v", selectedCore.IP, selectedCore.Port, relErr) + } + allocReserved = false + } + if newSubnetAllocated { + //subnet-- + } } - // 3) CMS 서브넷 할당 (Add: 기존 서브넷 / New: 신규 서브넷) - cmsResp, isNewSubnet, err := allocateCmsSubnet(contextStruct, input.SubnetType, uuid) + // add : back -> vm uuid -> cms 다른 api + // new : subnet 찾기 -> cms + var subnetReq *client.NewSubnetRequest + cmsClient := client.NewCmsClient() + + if req.Subnettype == "Add" { + subnetReq, err = AddCmsSubnet(cmsClient, contextStruct, uuid) + } else { + subnetReq, err = NewCmsSubnet(cmsClient, contextStruct) + newSubnetAllocated = true + } if err != nil { - //TODO AllocateCmsSubnet cleanup logic implement - log.Error("CreateVM: failed to allocate CMS subnet: %v", err, true) - return fmt.Errorf("CreateVM: failed to allocate CMS subnet: %w", err) + log.Error("CreateVM: failed to configure cms: %v", err, true) + cleanup() + return fmt.Errorf("CreateVM: failed to configure cms: %w", err) } - log.DebugInfo("CMS allocated: ip=%s, mac=%s, sdn=%s", cmsResp.IP, cmsResp.MacAddr, cmsResp.SdnUUID) + fmt.Printf("%s\n", subnetReq.IP) + fmt.Printf("%s\n", subnetReq.MacAddr) + fmt.Printf("%s\n", subnetReq.SdnUUID) + + userPass := guacamole.Configure(req.Users[0].Name, string(req.UUID), subnetReq.IP, privateKeyPEM, contextStruct.GuacDB) - // 4) Guacamole 사용자/커넥션 설정 - userPass := guacamole.Configure(input.Users[0].Name, string(uuid), cmsResp.IP, privateKeyPEM, contextStruct.GuacDB) if userPass == "" { - cleanup.run() log.Error("CreateVM: failed to configure Guacamole", true) + cleanup() return fmt.Errorf("CreateVM: failed to configure Guacamole") } - cleanup.push(func() { - log.Info("clean up: guacamole") - if err := guacamole.Cleanup(string(uuid), 
contextStruct.GuacDB); err != nil { - log.Error("Failed to cleanup Guacamole config during rollback: %v", err) - } - }) + guacamoleConfigured = true newVM := &vms.VMInfo{ UUID: uuid, GuacPassword: userPass, - MacAddr: cmsResp.MacAddr, - Memory: hwReq.Memory, - Cpu: hwReq.CPU, - Disk: hwReq.Disk, - IP_VM: cmsResp.IP, + MacAddr: subnetReq.MacAddr, + Memory: req.HardwareInfo.Memory, + Cpu: req.HardwareInfo.CPU, + Disk: req.HardwareInfo.Disk, + IP_VM: subnetReq.IP, + } + + // VMInfoIdx 등록 + Free* 표시 캐시 감소(자원 회계의 진실 소스는 위 IncrCoreAlloc). + contextStruct.Lock() + if selectedCore.VMInfoIdx == nil { + selectedCore.VMInfoIdx = make(map[vms.UUID]*vms.VMInfo) + } + selectedCore.VMInfoIdx[uuid] = newVM + selectedCore.FreeMemory -= req.HardwareInfo.Memory + selectedCore.FreeCPU -= req.HardwareInfo.CPU + selectedCore.FreeDisk -= req.HardwareInfo.Disk + vmTracked = true + contextStruct.Unlock() + // 이후 HTTP 전송은 락 밖에서 — 네트워크 콜 중 락을 잡으면 다른 요청 전체가 블로킹됨 + + log.DebugInfo("core %s reserved alloc+ cpu=%d mem=%d disk=%d", selectedCore.IP, req.HardwareInfo.CPU, req.HardwareInfo.Memory, req.HardwareInfo.Disk) + + req.NetConf.Ips = []string{subnetReq.IP} + req.SdnUUID = subnetReq.SdnUUID + req.MacAddr = subnetReq.MacAddr + req.NetConf.NetType = 0 + req.Users[0].SSHAuthorizedKeys = []string{publicKeyOpenSSH} + + vmRedisInfo := model.VMRedisInfo{ + UUID: uuid, + CPU: req.HardwareInfo.CPU, + Memory: req.HardwareInfo.Memory, + Disk: req.HardwareInfo.Disk, + IP: subnetReq.IP, + Status: model.VMStatusUnknown, // prepare begin 으로 초기화 함 + Time: time.Now().Unix(), } - // 5) 코어 자원 할당 (VMInfoIdx + Free* 원자적 갱신) - contextStruct.Resources.AllocateResources(selectedCore, uuid, newVM, hwReq) - cleanup.push(func() { - contextStruct.Resources.DeallocateResources(selectedCore, uuid, hwReq) - }) - log.DebugInfo("core %s updated: FreeMemory=%d, FreeCPU=%d, FreeDisk=%d", - selectedCore.IP, selectedCore.FreeMemory, selectedCore.FreeCPU, selectedCore.FreeDisk) - - // 6) Redis에 초기 상태 저장 (Core 호출 전에 — Core가 상태 갱신할 
수 있도록) - if err := StoreVMInfoToRedis(context.Background(), rdb, VMInfo{ - UUID: uuid, - CPU: hwReq.CPU, - Memory: hwReq.Memory, - Disk: hwReq.Disk, - IP: cmsResp.IP, - Status: VMStatusUnknown, - }, time.Now().Unix()); err != nil { + // HTTP 전송 전에 저장을 완료하여 Core에서 업데이트할 수 있도록 순서 보장 + if err := StoreVMInfoToRedis(ctx, rdb, vmRedisInfo); err != nil { log.Warn("failed to store VM info to redis: %v", err, true) - // redis 저장 실패는 vm 생성 실패로 처리하지 않음 + // redis 저장 실패를 vm생성 실패로 처리하지는 않음 } - // 7) Core에 VM 생성 요청 — service DTO를 client contract로 변환 - coreReq := buildCoreCreateVMRequest(input, cmsResp, publicKeyOpenSSH) + // Redis 저장 완료 후 HTTP 전송 (Core에서 Redis 업데이트 가능) coreClient := client.NewCoreClient(selectedCore) - if _, err := coreClient.CreateVM(context.Background(), coreReq); err != nil { + _, err = coreClient.CreateVM(ctx, req) + if err != nil { log.Error("Error creating VM on core %s: %v", selectedCore.IP, err, true) - cleanup.run() + cleanup() // 직접 지우지 말고 요 함수 하나로-- return fmt.Errorf("CreateVM: failed to create VM on core %s: %w", selectedCore.IP, err) } - // 8) DB에 인스턴스 정보 영속화 - if err := contextStruct.VMRepo.AddInstance(newVM, selectedCoreIndex); err != nil { + err = contextStruct.AddInstance(newVM, selectedCoreIndex) + if err != nil { log.Error("Error database instance insertion failed: %v", err, true) - cleanup.run() - return fmt.Errorf("CreateVM: failed to persist instance: %w", err) + cleanup() // 직접 지우지 말고 요 함수 하나로-- + return fmt.Errorf("CreateVM: failed to persist instance %s: %w", uuid, err) } - // 8-1) 신규 서브넷 할당이었다면 last_subnet을 실제 할당된 IP로 갱신 - // (NewCmsSubnet에서 계산값으로 선점한 것을 CMS 응답값으로 정정) - if isNewSubnet { - if _, err := contextStruct.DB.Exec("UPDATE subnet SET last_subnet = ? WHERE id = '1'", cmsResp.IP); err != nil { - log.Error("Error database Subnet update failed: %v", err, true) + if newSubnetAllocated { + _, err := contextStruct.DB.Exec("UPDATE subnet SET last_subnet = ? 
WHERE id = '1'", subnetReq.IP) + if err != nil { + log.Error("Error database Subnet insertion failed: %v", err, true) } } - contextStruct.Resources.RegisterVM(uuid, &contextStruct.Resources.Cores[selectedCoreIndex], newVM) + // VMLocation map과 AliveVM slice를 하나의 Lock으로 묶어 일관성 보장 + // (VMLocation에는 있는데 AliveVM에는 없는 중간 상태가 노출되지 않도록) + contextStruct.Lock() + if contextStruct.VMLocation == nil { + contextStruct.VMLocation = make(map[vms.UUID]*vms.Core) + } + contextStruct.VMLocation[uuid] = &contextStruct.Cores[selectedCoreIndex] + contextStruct.AliveVM = append(contextStruct.AliveVM, newVM) + contextStruct.Unlock() log.Info("VM %s added to ControlContext", uuid, true) log.Info("UUID %s CreateVM request success on core %s", uuid, selectedCore.IP, true) return nil } -// 추후 Mapper 계층이 필요할 것으로 예상됨 -func buildCoreCreateVMRequest(input CreateVMInput, cmsResp *client.CmsNewInstanceResponse, publicKeyOpenSSH string) model.CreateVMRequest { - users := make([]model.UserInfoVM, len(input.Users)) - for i, u := range input.Users { - users[i] = model.UserInfoVM{ - Name: u.Name, - Groups: u.Groups, - Password: u.Password, - SSHAuthorizedKeys: u.SSHAuthorizedKeys, - } - } - // 첫 번째 사용자는 생성 시 발급한 SSH 공개키를 사용 - if len(users) > 0 { - users[0].SSHAuthorizedKeys = []string{publicKeyOpenSSH} - } - return model.CreateVMRequest{ - DomType: input.DomType, - DomName: input.DomName, - UUID: input.UUID, - OS: input.OS, - HardwareInfo: model.HardwareInfo{CPU: input.HardwareInfo.CPU, Memory: input.HardwareInfo.Memory, Disk: input.HardwareInfo.Disk}, - NetConf: model.NetDefine{Ips: []string{cmsResp.IP}, NetType: 0}, - Users: users, - SdnUUID: cmsResp.SdnUUID, - MacAddr: cmsResp.MacAddr, - Subnettype: input.SubnetType, - } -} - -// selectCoreOrFail은 코어 선택 + 실패 시 진단 로그 출력 캡슐화를 진행 -func selectCoreOrFail(contextStruct *vms.ControlContext, req vms.HardwareRequirement) (*vms.Core, int, error) { - log := util.GetLogger() - - log.DebugInfo("core selection process. 
req: memory=%d GiB, cpu=%d, disk=%d", req.Memory, req.CPU, req.Disk) - result := contextStruct.Resources.SelectCore(req) - - if result.Core != nil { - log.DebugInfo("core found: %s (idx=%d)", result.Core.IP, result.Index) - return result.Core, result.Index, nil - } - - log.Error("No suitable core found! Total cores: %d, Alive cores: %d, Required: Memory=%d CPU=%d Disk=%d", - result.TotalCores, result.AliveCount, req.Memory, req.CPU, req.Disk, true) - - if result.AliveCount > 0 { - log.DebugError("alive cores:") - contextStruct.Resources.RLock() - for i := range contextStruct.Resources.Cores { - core := &contextStruct.Resources.Cores[i] - if core.IsAlive { - log.DebugError(" %s: Memory=%d/%d, CPU=%d/%d, Disk=%d/%d", - core.IP, core.FreeMemory, core.CoreInfoIdx.Memory, - core.FreeCPU, core.CoreInfoIdx.Cpu, - core.FreeDisk, core.CoreInfoIdx.Disk) - } - } - contextStruct.Resources.RUnlock() - } else { - log.DebugError("no alive cores available") - } - - return nil, -1, fmt.Errorf("CreateVM: no suitable core found") -} - -func allocateCmsSubnet(contextStruct *vms.ControlContext, subnetType string, uuid vms.UUID) (*client.CmsNewInstanceResponse, bool, error) { - log := util.GetLogger() - cmsClient := client.NewCmsClient() - - if subnetType == "Add" { - resp, err := AddCmsSubnet(cmsClient, contextStruct, uuid) - if err != nil { - log.Error("CreateVM: failed to configure cms (Add): %v", err, true) - return nil, false, fmt.Errorf("CreateVM: failed to configure cms: %w", err) - } - return resp, false, nil - } - - resp, err := NewCmsSubnet(cmsClient, contextStruct) - if err != nil { - log.Error("CreateVM: failed to configure cms (New): %v", err, true) - return nil, false, fmt.Errorf("CreateVM: failed to configure cms: %w", err) - } - // 신규 서브넷 할당 시: 후속 단계 실패에 대한 별도 DB 롤백 로직은 미구현 상태 (TODO) - - return resp, true, nil -} - func DeleteVM(uuid vms.UUID, contextStruct *vms.ControlContext, rdb *redis.Client) error { log := util.GetLogger() + ctx := context.Background() core := 
contextStruct.FindCoreByVmUUID(uuid) if core == nil { @@ -231,41 +215,70 @@ func DeleteVM(uuid vms.UUID, contextStruct *vms.ControlContext, rdb *redis.Clien return fmt.Errorf("VM with UUID %s not found", string(uuid)) } - // 1) Core에 VM 삭제 요청 + // 삭제 전에 인메모리 VMInfoIdx에서 회수할 자원 크기를 확보. + contextStruct.RLock() + var dCPU, dMem, dDisk int64 + sizeKnown := false + if vmInfo, ok := core.VMInfoIdx[uuid]; ok { + dCPU, dMem, dDisk = int64(vmInfo.Cpu), int64(vmInfo.Memory), int64(vmInfo.Disk) + sizeKnown = true + } + contextStruct.RUnlock() + coreClient := client.NewCoreClient(core) - if _, err := coreClient.DeleteVM(context.Background(), model.DeleteVMRequest{ + _, err := coreClient.DeleteVM(ctx, model.DeleteVMRequest{ UUID: uuid, Type: model.HardDelete, - }); err != nil { + }) + if err != nil { log.Error("error deleting VM %s on core %s: %v", uuid, core.IP, err) return fmt.Errorf("DeleteVM: failed to delete VM %s on core %s: %w", uuid, core.IP, err) } - cmsClient := client.NewCmsClient() - if err := DeleteCmsSubnet(cmsClient, contextStruct, uuid); err != nil { - log.Error("DeleteVM: failed to delete CMS subnet for VM %s: %v", uuid, err) - // CMS 삭제 실패는 로그만 남기고 삭제 자체는 성공으로 처리 (추가적인 수동 정리 필요) + err = contextStruct.DeleteInstance(uuid) + if err != nil { + log.Error("error deleting instance %s from ControlContext: %v", uuid, err) + return fmt.Errorf("DeleteVM: failed to delete instance %s: %w", uuid, err) + } + if cleanupErr := guacamole.Cleanup(string(uuid), contextStruct.GuacDB); cleanupErr != nil { + log.Error("Failed to cleanup Guacamole config during rollback: %v", cleanupErr) } - // 2) DB에서 인스턴스 정보 삭제 - if err := contextStruct.VMRepo.DeleteInstance(uuid); err != nil { - log.Error("error deleting instance %s from DB: %v", uuid, err) - return fmt.Errorf("DeleteVM: failed to delete instance %s: %w", uuid, err) + if err := RemoveVMInfoFromRedis(ctx, rdb, uuid); err != nil { + log.Warn("failed to remove vm info from redis (vm deletion succeeded but..): %v", err, true) + // 
얘도 create할 때처럼 redis 값 삭제 실패를 했을 때 del vm 자체를 실패했다고 처리하지는 않음 + // 물론 어케 처리할지 고민을.. } - // 3) Guacamole 사용자/커넥션 정리 - if err := guacamole.Cleanup(string(uuid), contextStruct.GuacDB); err != nil { - log.Error("Failed to cleanup Guacamole config: %v", err) + // Core + DB 삭제 성공 후 alloc-Redis 회수(회수된 용량 즉시 재사용 가능). + if sizeKnown { + if err := IncrCoreAlloc(ctx, rdb, core.IP, core.Port, -dCPU, -dMem, -dDisk); err != nil { + log.Warn("DeleteVM: failed to release alloc for %s on core %s:%d: %v", uuid, core.IP, core.Port, err, true) + } + } else { + log.Warn("DeleteVM: size for %s unknown (not in VMInfoIdx); alloc not adjusted, rebuild on restart will correct", uuid, true) } - // 4) Redis 정리 (실패해도 삭제 자체는 성공으로 처리 추가 처리 로직 필요) - if err := RemoveVMInfoFromRedis(context.Background(), rdb, uuid); err != nil { - log.Warn("failed to remove vm info from redis (vm deletion succeeded but..): %v", err, true) + // 인메모리 상태 정리(기존 누락 갭 보완): VMInfoIdx / VMLocation / AliveVM + Free* 표시 캐시 복원. + contextStruct.Lock() + delete(core.VMInfoIdx, uuid) + delete(contextStruct.VMLocation, uuid) + for i, vm := range contextStruct.AliveVM { + if vm.UUID == uuid { + contextStruct.AliveVM = slices.Delete(contextStruct.AliveVM, i, i+1) + break + } + } + if sizeKnown { + core.FreeMemory += uint32(dMem) + core.FreeCPU += uint32(dCPU) + core.FreeDisk += uint32(dDisk) } + contextStruct.Unlock() + log.Info("VM %s deleted from core %s", uuid, core.IP, true) return nil } - func StartVM(uuid vms.UUID, contextStruct *vms.ControlContext) error { log := util.GetLogger() @@ -275,7 +288,10 @@ func StartVM(uuid vms.UUID, contextStruct *vms.ControlContext) error { } coreClient := client.NewCoreClient(core) - if _, err := coreClient.StartVM(context.Background(), model.StartVMRequest{UUID: uuid}); err != nil { + _, err := coreClient.StartVM(context.Background(), model.StartVMRequest{ + UUID: uuid, + }) + if err != nil { return fmt.Errorf("StartVM: failed to start VM %s: %w", uuid, err) } @@ -290,85 +306,96 @@ func 
ShutdownVM(uuid vms.UUID, contextStruct *vms.ControlContext, rdb *redis.Cli } coreClient := client.NewCoreClient(core) - if _, err := coreClient.ForceShutdownVM(context.Background(), model.ForceShutdownVMRequest{UUID: uuid}); err != nil { + _, err := coreClient.ForceShutdownVM(context.Background(), model.ForceShutdownVMRequest{ + UUID: uuid, + }) + + if err != nil { return fmt.Errorf("ShutdownVM: failed to shutdown VM %s: %w", uuid, err) } - contextStruct.Resources.UnregisterAlive(uuid) + foundIndex := -1 + // 탐색과 삭제를 하나의 Lock 안에서 — 탐색 후 삭제 전에 다른 goroutine이 AliveVM을 바꾸면 index가 틀어짐 + contextStruct.Lock() + for i, vm := range contextStruct.AliveVM { + if vm.UUID == uuid { + foundIndex = i + break + } + } + + if foundIndex != -1 { + contextStruct.AliveVM = slices.Delete(contextStruct.AliveVM, foundIndex, foundIndex+1) + } + contextStruct.Unlock() - if err := UpdateVMStatusInRedis(context.Background(), rdb, uuid, VMStatusStopped, time.Now().Unix()); err != nil { - util.GetLogger().Warn("failed to update vm status in redis %v", err, true) + if err := UpdateVMStatusInRedis(context.Background(), rdb, uuid, model.VMStatusStopped, time.Now().Unix()); err != nil { + log := util.GetLogger() + log.Warn("failed to update vm status in redis %v", err, true) } return nil } -func GetVMCpuInfo(uuid vms.UUID, contextStruct *vms.ControlContext) (VMCpuStatus, error) { +func GetVMCpuInfo(uuid vms.UUID, contextStruct *vms.ControlContext) (model.CoreMachineCpuInfoResponse, error) { log := util.GetLogger() core := contextStruct.FindCoreByVmUUID(uuid) if core == nil { log.Error("GetVMCpuInfo: VM with UUID %s not found", string(uuid), true) - return VMCpuStatus{}, fmt.Errorf("GetVMCpuInfo: VM with UUID %s not found", string(uuid)) + return model.CoreMachineCpuInfoResponse{}, fmt.Errorf("GetVMCpuInfo: VM with UUID %s not found", string(uuid)) } coreClient := client.NewCoreClient(core) + cpuInfo, err := coreClient.GetVMCpuInfo(context.Background(), uuid) if err != nil { 
log.Error("GetVMCpuInfo: error getting CPU info for VM %s on core %s: %v", uuid, core.IP, err, true) - return VMCpuStatus{}, fmt.Errorf("GetVMCpuInfo: error getting CPU info for VM %s on core %s: %w", uuid, core.IP, err) + return model.CoreMachineCpuInfoResponse{}, fmt.Errorf("GetVMCpuInfo: error getting CPU info for VM %s on core %s: %w", uuid, core.IP, err) } log.DebugInfo("Retrieved CPU status for VM %s on core %s", uuid, core.IP) - return VMCpuStatus{System: cpuInfo.System, Idle: cpuInfo.Idle, Usage: cpuInfo.Usage}, nil + return cpuInfo, nil } -func GetVMMemoryInfo(uuid vms.UUID, contextStruct *vms.ControlContext) (VMMemoryStatus, error) { +func GetVMMemoryInfo(uuid vms.UUID, contextStruct *vms.ControlContext) (model.CoreMachineMemoryInfoResponse, error) { log := util.GetLogger() core := contextStruct.FindCoreByVmUUID(uuid) if core == nil { log.Error("GetVMMemoryInfo: VM with UUID %s not found", string(uuid), true) - return VMMemoryStatus{}, fmt.Errorf("GetVMMemoryInfo: VM with UUID %s not found", string(uuid)) + return model.CoreMachineMemoryInfoResponse{}, fmt.Errorf("GetVMMemoryInfo: VM with UUID %s not found", string(uuid)) } coreClient := client.NewCoreClient(core) + memoryInfo, err := coreClient.GetVMMemoryInfo(context.Background(), uuid) if err != nil { log.Error("GetVMMemoryInfo: error getting memory info for VM %s on core %s: %v", uuid, core.IP, err, true) - return VMMemoryStatus{}, fmt.Errorf("GetVMMemoryInfo: error getting memory info for VM %s on core %s: %w", uuid, core.IP, err) + return model.CoreMachineMemoryInfoResponse{}, fmt.Errorf("GetVMMemoryInfo: error getting memory info for VM %s on core %s: %w", uuid, core.IP, err) } log.DebugInfo("Retrieved Memory status for VM %s on core %s", uuid, core.IP) - return VMMemoryStatus{ - Total: memoryInfo.Total, - Used: memoryInfo.Used, - Available: memoryInfo.Available, - UsedPercent: memoryInfo.UsedPercent, - }, nil + return memoryInfo, nil } -func GetVMDiskInfo(uuid vms.UUID, contextStruct 
*vms.ControlContext) (VMDiskStatus, error) { +func GetVMDiskInfo(uuid vms.UUID, contextStruct *vms.ControlContext) (model.CoreMachineDiskInfoResponse, error) { log := util.GetLogger() core := contextStruct.FindCoreByVmUUID(uuid) if core == nil { log.Error("GetVMDiskInfo: VM with UUID %s not found", string(uuid), true) - return VMDiskStatus{}, fmt.Errorf("GetVMDiskInfo: VM with UUID %s not found", string(uuid)) + return model.CoreMachineDiskInfoResponse{}, fmt.Errorf("GetVMDiskInfo: VM with UUID %s not found", string(uuid)) } coreClient := client.NewCoreClient(core) + diskInfo, err := coreClient.GetVMDiskInfo(context.Background(), uuid) if err != nil { log.Error("GetVMDiskInfo: error getting disk info for VM %s on core %s: %v", uuid, core.IP, err, true) - return VMDiskStatus{}, fmt.Errorf("GetVMDiskInfo: error getting disk info for VM %s on core %s: %w", uuid, core.IP, err) + return model.CoreMachineDiskInfoResponse{}, fmt.Errorf("GetVMDiskInfo: error getting disk info for VM %s on core %s: %w", uuid, core.IP, err) } log.DebugInfo("Retrieved Disk status for VM %s on core %s", uuid, core.IP) - return VMDiskStatus{ - Total: diskInfo.Total, - Used: diskInfo.Used, - Free: diskInfo.Free, - UsedPercent: diskInfo.UsedPercent, - }, nil + return diskInfo, nil } diff --git a/startup/core_ip_config.go b/startup/core_ip_config.go index 0f38e0f..997c76b 100644 --- a/startup/core_ip_config.go +++ b/startup/core_ip_config.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "github.com/easy-cloud-Knet/KWS_Control/structure" "github.com/easy-cloud-Knet/KWS_Control/util" @@ -55,5 +56,50 @@ func tryReadConfig(path string) (structure.Config, error) { return structure.Config{}, fmt.Errorf("failed to decode config file: %w", err) } + applyAllocDefaults(&config) + return config, nil } + +// applyAllocDefaults는 코어 선택 파라미터에 env 오버라이드를 적용한 뒤 안전한 기본값으로 보정한다. +// env 패턴은 init.go의 DB 설정과 동일(env 우선, 없으면 config 값 사용). 
+// - cpu_overcommit ≤ 0 → 1.0 (오버커밋 없음) +// - *_reserve_pct 범위 밖([0,1)) → 0.0 (여유분 없음) +func applyAllocDefaults(config *structure.Config) { + log := util.GetLogger() + + if v := os.Getenv("CPU_OVERCOMMIT"); v != "" { + if f, err := strconv.ParseFloat(v, 64); err == nil { + config.CpuOvercommit = f + } else { + log.Warn("invalid CPU_OVERCOMMIT=%q, ignoring: %v", v, err) + } + } + if v := os.Getenv("MEM_RESERVE_PCT"); v != "" { + if f, err := strconv.ParseFloat(v, 64); err == nil { + config.MemReservePct = f + } else { + log.Warn("invalid MEM_RESERVE_PCT=%q, ignoring: %v", v, err) + } + } + if v := os.Getenv("DISK_RESERVE_PCT"); v != "" { + if f, err := strconv.ParseFloat(v, 64); err == nil { + config.DiskReservePct = f + } else { + log.Warn("invalid DISK_RESERVE_PCT=%q, ignoring: %v", v, err) + } + } + + if config.CpuOvercommit <= 0 { + config.CpuOvercommit = 1.0 + } + if config.MemReservePct < 0 || config.MemReservePct >= 1 { + config.MemReservePct = 0.0 + } + if config.DiskReservePct < 0 || config.DiskReservePct >= 1 { + config.DiskReservePct = 0.0 + } + + log.DebugInfo("alloc params: cpu_overcommit=%.2f, mem_reserve_pct=%.2f, disk_reserve_pct=%.2f", + config.CpuOvercommit, config.MemReservePct, config.DiskReservePct) +} diff --git a/startup/init.go b/startup/init.go index 3a2cdc1..1b025ed 100644 --- a/startup/init.go +++ b/startup/init.go @@ -146,8 +146,6 @@ func InitializeCoreData(configPath string) (structure.ControlContext, error) { infra.Config = config infra.GuacDB = db infra.DB = mainDB - infra.VMRepo = structure.NewMySQLVMRepository(mainDB) - infra.Resources = structure.NewResourceManager() var last_subnet_db string err = infra.DB.QueryRow("SELECT last_subnet from core_base.subnet where id = '1'").Scan(&last_subnet_db) @@ -156,11 +154,12 @@ func InitializeCoreData(configPath string) (structure.ControlContext, error) { } infra.Last_subnet = last_subnet_db // 모든 Core 정의 - for i := range infra.Resources.Cores { - for vmUUID := range 
infra.Resources.Cores[i].VMInfoIdx { - infra.Resources.VMLocation[vmUUID] = &infra.Resources.Cores[i] + infra.VMLocation = make(map[structure.UUID]*structure.Core) + for i := range infra.Cores { + for vmUUID := range infra.Cores[i].VMInfoIdx { + infra.VMLocation[vmUUID] = &infra.Cores[i] } - infra.Resources.Cores[i].IsAlive = false + infra.Cores[i].IsAlive = false } // config에 설정된 코어에 대해서 정보 업데이트 @@ -186,15 +185,15 @@ func InitializeCoreData(configPath string) (structure.ControlContext, error) { return structure.ControlContext{}, fmt.Errorf("error converting port number from %s: %w", coreAddress, err) } - core := findCore(infra.Resources.Cores, ip, uint16(port)) + core := findCore(infra.Cores, ip, uint16(port)) if core == nil { newCore := structure.Core{ IP: ip, Port: uint16(port), IsAlive: true, } - infra.Resources.Cores = append(infra.Resources.Cores, newCore) - core = &infra.Resources.Cores[len(infra.Resources.Cores)-1] + infra.Cores = append(infra.Cores, newCore) + core = &infra.Cores[len(infra.Cores)-1] log.DebugInfo("Added new core: %s:%d", ip, port) } else { core.IsAlive = true @@ -220,12 +219,23 @@ func InitializeCoreData(configPath string) (structure.ControlContext, error) { freeMemoryMiB := uint32(memResp.Available * 1024) freeDiskMiB := uint32(diskResp.Free * 1024) + // CPU 총량은 코어별로 한 번만 실측해 캐시한다(재확인 X). + // 이미 캐시된 값(>0)이 있으면 그대로 쓰고, 없을 때만 /getStatusHost로 받아온다. 
var totalCpuCores uint32 if currentCore.CoreInfoIdx.Cpu > 0 { totalCpuCores = currentCore.CoreInfoIdx.Cpu } else { - log.DebugInfo("currentCore.CoreInfoIdx.Cpu: %d", currentCore.CoreInfoIdx.Cpu) - totalCpuCores = 9999 // 음 코어를 현재 반환받지 못하는- + cpuResp, err := coreClient.GetCoreMachineCpuInfo(ctx) + if err != nil { + currentCore.IsAlive = false + return fmt.Errorf("failed to get CPU info for core %s:%d: %w", currentCore.IP, currentCore.Port, err) + } + if cpuResp == nil || cpuResp.Desc == nil || cpuResp.Desc.Total <= 0 { + currentCore.IsAlive = false + return fmt.Errorf("core %s:%d returned no vcpu_status.total", currentCore.IP, currentCore.Port) + } + totalCpuCores = uint32(cpuResp.Desc.Total) + log.DebugInfo("core %s:%d measured CPU total=%d", currentCore.IP, currentCore.Port, totalCpuCores) } currentCore.CoreInfoIdx.Cpu = totalCpuCores @@ -244,23 +254,23 @@ func InitializeCoreData(configPath string) (structure.ControlContext, error) { return structure.ControlContext{}, fmt.Errorf("failed to get core info: %w", err) } - vmInfoList, coreIdxList, err := infra.VMRepo.GetAllInstanceInfo() + vmInfoList, coreIdxList, err := infra.GetAllInstanceInfo() if err != nil { return structure.ControlContext{}, fmt.Errorf("failed to get all instance info: %w", err) } for i, vmInfo := range vmInfoList { coreIdx := coreIdxList[i] - if coreIdx < 0 || coreIdx >= len(infra.Resources.Cores) { + if coreIdx < 0 || coreIdx >= len(infra.Cores) { return structure.ControlContext{}, fmt.Errorf("core index %d out of range for core list", coreIdx) } - core := &infra.Resources.Cores[coreIdx] + core := &infra.Cores[coreIdx] if core.VMInfoIdx == nil { core.VMInfoIdx = make(map[structure.UUID]*structure.VMInfo) } if _, exists := core.VMInfoIdx[vmInfo.UUID]; !exists { core.VMInfoIdx[vmInfo.UUID] = &vmInfo - infra.Resources.VMLocation[vmInfo.UUID] = core + infra.VMLocation[vmInfo.UUID] = core } else { log.DebugInfo("VM %s already exists in core %d, skipping", vmInfo.UUID, coreIdx) } diff --git 
a/structure/control_infra.go b/structure/control_infra.go index cce24e3..9a1c57a 100644 --- a/structure/control_infra.go +++ b/structure/control_infra.go @@ -1,45 +1,210 @@ package structure import ( + "context" "database/sql" + "sync" + "time" "github.com/easy-cloud-Knet/KWS_Control/util" ) type ControlContext struct { - VMRepo VMRepository - Resources *ResourceManager + mu sync.RWMutex Config Config - DB *sql.DB // subnet 등 직접 쿼리용 (추후 별도 Repository로 분리 예정) + DB *sql.DB GuacDB *sql.DB Last_subnet string + Cores []Core // 모든 코어를 관리 + AliveVM []*VMInfo //현재 가동중인 VM의 정보 + VMLocation map[UUID]*Core //UUID를 기반으로 어떤 VM이 어느 Core에 있는지 확인하는 포인터 + // => 여기에다가는 사실 core에 ip주소를 기반으로 어느 ip에 어느 UUID를 가진 VM이 있는지만 확인할 수 있으면 될거같아요. + // => 근데 이거 자료형을 어떤걸 써야할지 모르겠어서 일단 이렇게 map[UUID]*Core로 해놓은거지 2차원 벡터로 해도 무방할거 같습니다. + // => 이렇게 한 이유는 이전 버전에서 VMPool map[UUID]*VM 이렇게 해놓았던데 이렇게 하면 VM이 많아졌을 때 너무 오래 걸릴거 같아서 IP를 기반으로 먼저 찾고 해당 core로 넘어가서 처리하면 더 좋지않을까? 생각했어요. } -// FindCoreByVmUUID search cores for the given VM UUID and returns the corresponding Core. 
+func (c *ControlContext) Lock() { c.mu.Lock() } +func (c *ControlContext) Unlock() { c.mu.Unlock() } +func (c *ControlContext) RLock() { c.mu.RLock() } +func (c *ControlContext) RUnlock() { c.mu.RUnlock() } + func (c *ControlContext) FindCoreByVmUUID(uuid UUID) *Core { log := util.GetLogger() - // Searching in-memory cache - c.Resources.RLock() - if core, ok := c.Resources.VMLocation[uuid]; ok { - c.Resources.RUnlock() - return core - } - c.Resources.RUnlock() - - // If not found in cache, query the repository - coreIdx, err := c.VMRepo.GetInstanceLocation(uuid) + coreIdx, err := c.GetInstanceLocation(uuid) if err != nil { log.Error("Core not found for VM UUID %s", uuid, true) return nil } - c.Resources.Lock() - defer c.Resources.Unlock() - if coreIdx < 0 || coreIdx >= len(c.Resources.Cores) { - log.Error("Core index %d out of range for VM UUID %s", coreIdx, uuid, true) - return nil + return &c.Cores[coreIdx] +} + +func (contextStructure *ControlContext) AddInstance(instanceInfo *VMInfo, coreIdx int) error { + log := util.GetLogger() + tx, err := contextStructure.DB.Begin() + if err != nil { + log.Error("Failed to start transaction %v", err) + return err + } + defer tx.Rollback() + + // Set timeout for the transaction + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = tx.ExecContext(ctx, "INSERT INTO inst_info (uuid, inst_ip, guac_pass, inst_mem, inst_vcpu, inst_disk) VALUES (?, ?, ?, ?, ?, ?)", + string(instanceInfo.UUID), + instanceInfo.IP_VM, + instanceInfo.GuacPassword, + instanceInfo.Memory, + instanceInfo.Cpu, + instanceInfo.Disk) + if err != nil { + log.Error("Failed to insert instance info: %v", err) + return err + } + _, err = tx.ExecContext(ctx, "INSERT INTO inst_loc (uuid, core) VALUES (?, ?)", + string(instanceInfo.UUID), + coreIdx) + if err != nil { + log.Error("Failed to insert instance core relation: %v", err) + return err + } + return tx.Commit() +} + +// 현재 미사용중 +// 이건 미사용중이면 안되지않나? 
인스턴스정보 DB에 업데이트 하는거같은데 +func (contextStructure *ControlContext) UpdateInstance(instanceInfo *VMInfo) error { + log := util.GetLogger() + tx, err := contextStructure.DB.Begin() + if err != nil { + log.Error("Failed to start transaction %v", err) + return err + } + defer tx.Rollback() + + //Set timeout for the transaction + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + _, err = tx.ExecContext(ctx, "UPDATE inst_info SET inst_ip = ?, inst_mem = ?, inst_vcpu = ?, inst_disk = ? WHERE uuid = ?", + instanceInfo.IP_VM, + instanceInfo.Memory, + instanceInfo.Cpu, + instanceInfo.Disk, + string(instanceInfo.UUID)) + if err != nil { + log.Error("Failed to update instance info: %v", err) + return err + } + return tx.Commit() +} + +func (contextStructure *ControlContext) DeleteInstance(uuid UUID) error { + log := util.GetLogger() + tx, err := contextStructure.DB.Begin() + if err != nil { + log.Error("Failed to start transaction: %v", err) + return err + } + defer tx.Rollback() + // Set timeout for the transaction + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = tx.ExecContext(ctx, "DELETE FROM inst_info WHERE uuid = ?", uuid) + if err != nil { + return err + } + _, err = tx.ExecContext(ctx, "DELETE FROM inst_loc WHERE uuid = ?", uuid) + if err != nil { + return err + } + return tx.Commit() +} + +func (contextStructure *ControlContext) GetInstance(uuid UUID) (*VMInfo, error) { + log := util.GetLogger() + tx, err := contextStructure.DB.Begin() + if err != nil { + log.Error("Failed to start transaction: %v", err) + return nil, err + } + defer tx.Rollback() + //set timeout for the transaction + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + var instance VMInfo + err = tx.QueryRowContext(ctx, "SELECT uuid, inst_ip, guac_pass, inst_mem, inst_vcpu, inst_disk FROM inst_info WHERE uuid = ?", uuid).Scan( + &instance.UUID, + &instance.IP_VM, + 
&instance.GuacPassword, + &instance.Memory, + &instance.Cpu, + &instance.Disk) + if err != nil { + log.Error("Failed to get instance info: %v", err) + return nil, err + } + return &instance, tx.Commit() +} + +func (contextStructure *ControlContext) GetInstanceLocation(uuid UUID) (int, error) { + log := util.GetLogger() + tx, err := contextStructure.DB.Begin() + if err != nil { + log.Error("Failed to start transaction: %v", err) + return 0, err + } + defer tx.Rollback() + //set timeout for the transaction + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + var coreIdx int + err = tx.QueryRowContext(ctx, "SELECT core FROM inst_loc WHERE uuid = ?", uuid).Scan(&coreIdx) + if err != nil { + log.Error("Failed to get instance location: %v", err) + return 0, err + } + return coreIdx, tx.Commit() +} + +func (contextStructure *ControlContext) GetAllInstanceInfo() ([]VMInfo, []int, error) { + log := util.GetLogger() + tx, err := contextStructure.DB.Begin() + if err != nil { + log.Error("Failed to start transaction: %v", err) + return nil, nil, err + } + defer tx.Rollback() + //set timeout for the transaction + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + var rows *sql.Rows + rows, err = tx.QueryContext(ctx, "SELECT info.uuid, loc.core, info.inst_ip, info.guac_pass, info.inst_vcpu, info.inst_mem, info.inst_disk FROM inst_loc loc JOIN inst_info info ON loc.uuid = info.uuid") + if err != nil { + log.Error("Failed to get joined instance info: %v", err) + return nil, nil, err + } + + var coreIdxList []int + var VMInfoList []VMInfo + + for rows.Next() { + var coreIdx int + var info VMInfo + + if err := rows.Scan(&info.UUID, &coreIdx, &info.IP_VM, &info.GuacPassword, &info.Cpu, &info.Memory, &info.Disk); err != nil { + log.Error("Failed to scan instance location: %v", err) + return nil, nil, err + } + log.DebugInfo("Found instance: %s on core %d", info.UUID, coreIdx) + VMInfoList = 
append(VMInfoList, info) + coreIdxList = append(coreIdxList, coreIdx) } - core := &c.Resources.Cores[coreIdx] - c.Resources.VMLocation[uuid] = core - return core + return VMInfoList, coreIdxList, tx.Commit() } diff --git a/structure/mysql_vm_repository.go b/structure/mysql_vm_repository.go deleted file mode 100644 index 16edd5b..0000000 --- a/structure/mysql_vm_repository.go +++ /dev/null @@ -1,221 +0,0 @@ -package structure - -import ( - "context" - "database/sql" - "time" - - "github.com/easy-cloud-Knet/KWS_Control/util" -) - -// MySQL VMRepository 구현체 -type MySQLVMRepository struct { - DB *sql.DB -} - -func NewMySQLVMRepository(db *sql.DB) *MySQLVMRepository { - return &MySQLVMRepository{DB: db} -} - -// AddInstance adds a new instance to the DB and associates it with a core index. -func (r *MySQLVMRepository) AddInstance(instanceInfo *VMInfo, coreIdx int) error { - log := util.GetLogger() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - tx, err := r.DB.BeginTx(ctx, nil) - if err != nil { - log.Error("Failed to start transaction %v", err) - return err - } - defer func() { - if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { - log.Error("Failed to rollback transaction: %v", err) - } - }() - - _, err = tx.ExecContext(ctx, "INSERT INTO inst_info (uuid, inst_ip, guac_pass, inst_mem, inst_vcpu, inst_disk) VALUES (?, ?, ?, ?, ?, ?)", - string(instanceInfo.UUID), - instanceInfo.IP_VM, - instanceInfo.GuacPassword, - instanceInfo.Memory, - instanceInfo.Cpu, - instanceInfo.Disk) - if err != nil { - log.Error("Failed to insert instance info: %v", err) - return err - } - _, err = tx.ExecContext(ctx, "INSERT INTO inst_loc (uuid, core) VALUES (?, ?)", - string(instanceInfo.UUID), - coreIdx) - if err != nil { - log.Error("Failed to insert instance core relation: %v", err) - return err - } - return tx.Commit() -} - -// UpdateInstance updates the instance information in DB for the given VM UUID. 
-// Note: Guacamole password is not updated here as it is generated only once at instance creation. If needed, it can be added as well. -func (r *MySQLVMRepository) UpdateInstance(instanceInfo *VMInfo) error { - log := util.GetLogger() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - tx, err := r.DB.BeginTx(ctx, nil) - if err != nil { - log.Error("Failed to start transaction %v", err) - return err - } - defer func() { - if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { - log.Error("Failed to rollback transaction: %v", err) - } - }() - - _, err = tx.ExecContext(ctx, "UPDATE inst_info SET inst_ip = ?, inst_mem = ?, inst_vcpu = ?, inst_disk = ? WHERE uuid = ?", - instanceInfo.IP_VM, - instanceInfo.Memory, - instanceInfo.Cpu, - instanceInfo.Disk, - string(instanceInfo.UUID)) - if err != nil { - log.Error("Failed to update instance info: %v", err) - return err - } - return tx.Commit() -} - -// DeleteInstance deletes the instance in DB for the given VM UUID. -func (r *MySQLVMRepository) DeleteInstance(uuid UUID) error { - log := util.GetLogger() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - tx, err := r.DB.BeginTx(ctx, nil) - if err != nil { - log.Error("Failed to start transaction: %v", err) - return err - } - defer func() { - if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { - log.Error("Failed to rollback transaction: %v", err) - } - }() - - _, err = tx.ExecContext(ctx, "DELETE FROM inst_info WHERE uuid = ?", uuid) - if err != nil { - return err - } - _, err = tx.ExecContext(ctx, "DELETE FROM inst_loc WHERE uuid = ?", uuid) - if err != nil { - return err - } - return tx.Commit() -} - -// GetInstance returns the Instance information for the given VM UUID. 
-func (r *MySQLVMRepository) GetInstance(uuid UUID) (*VMInfo, error) { - log := util.GetLogger() - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - tx, err := r.DB.BeginTx(ctx, nil) - if err != nil { - log.Error("Failed to start transaction: %v", err) - return nil, err - } - defer func() { - if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { - log.Error("Failed to rollback transaction: %v", err) - } - }() - - var instance VMInfo - err = tx.QueryRowContext(ctx, "SELECT uuid, inst_ip, guac_pass, inst_mem, inst_vcpu, inst_disk FROM inst_info WHERE uuid = ?", uuid).Scan( - &instance.UUID, - &instance.IP_VM, - &instance.GuacPassword, - &instance.Memory, - &instance.Cpu, - &instance.Disk) - if err != nil { - log.Error("Failed to get instance info: %v", err) - return nil, err - } - return &instance, tx.Commit() -} - -// GetInstanceLocation returns the core index for the given VM UUID. -func (r *MySQLVMRepository) GetInstanceLocation(uuid UUID) (int, error) { - log := util.GetLogger() - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - tx, err := r.DB.BeginTx(ctx, nil) - if err != nil { - log.Error("Failed to start transaction: %v", err) - return 0, err - } - defer func() { - if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { - log.Error("Failed to rollback transaction: %v", err) - } - }() - - var coreIdx int - err = tx.QueryRowContext(ctx, "SELECT core FROM inst_loc WHERE uuid = ?", uuid).Scan(&coreIdx) - if err != nil { - log.Error("Failed to get instance location: %v", err) - return 0, err - } - return coreIdx, tx.Commit() -} - -// GetAllInstanceInfo returns the list of all instance information and their corresponding core indices. 
-func (r *MySQLVMRepository) GetAllInstanceInfo() ([]VMInfo, []int, error) { - log := util.GetLogger() - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - tx, err := r.DB.BeginTx(ctx, nil) - if err != nil { - log.Error("Failed to start transaction: %v", err) - return nil, nil, err - } - defer func() { - if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { - log.Error("Failed to rollback transaction: %v", err) - } - }() - - var rows *sql.Rows - rows, err = tx.QueryContext(ctx, "SELECT info.uuid, loc.core, info.inst_ip, info.guac_pass, info.inst_vcpu, info.inst_mem, info.inst_disk FROM inst_loc loc JOIN inst_info info ON loc.uuid = info.uuid") - if err != nil { - log.Error("Failed to get joined instance info: %v", err) - return nil, nil, err - } - - var coreIdxList []int - var VMInfoList []VMInfo - - for rows.Next() { - var coreIdx int - var info VMInfo - - if err := rows.Scan(&info.UUID, &coreIdx, &info.IP_VM, &info.GuacPassword, &info.Cpu, &info.Memory, &info.Disk); err != nil { - log.Error("Failed to scan instance location: %v", err) - return nil, nil, err - } - log.DebugInfo("Found instance: %s on core %d", info.UUID, coreIdx) - VMInfoList = append(VMInfoList, info) - coreIdxList = append(coreIdxList, coreIdx) - } - - if err := rows.Err(); err != nil { - log.Error("Failed to iterate instance information rows: %v", err) - return nil, nil, err - } - - return VMInfoList, coreIdxList, tx.Commit() -} diff --git a/structure/repository.go b/structure/repository.go deleted file mode 100644 index d727b13..0000000 --- a/structure/repository.go +++ /dev/null @@ -1,12 +0,0 @@ -package structure - -// VM 인스턴스의 데이터베이스 영속성 인터페이스 -// ControlContext에서 DB 관련 책임을 분리 -type VMRepository interface { - AddInstance(instanceInfo *VMInfo, coreIdx int) error - UpdateInstance(instanceInfo *VMInfo) error - DeleteInstance(uuid UUID) error - GetInstance(uuid UUID) (*VMInfo, error) - GetInstanceLocation(uuid UUID) (int, error) - 
GetAllInstanceInfo() ([]VMInfo, []int, error) -} diff --git a/structure/resource_manager.go b/structure/resource_manager.go deleted file mode 100644 index 553ee31..0000000 --- a/structure/resource_manager.go +++ /dev/null @@ -1,117 +0,0 @@ -package structure - -import ( - "slices" - "sync" -) - -// ResourceManager는 코어/VM의 런타임 인메모리 상태를 관리 -// ControlContext에서 뮤텍스와 상태 필드를 분리 -type ResourceManager struct { - mu sync.RWMutex - Cores []Core // 모든 코어를 리스트 - AliveVM []*VMInfo // 현재 가동중인 VM의 정보 - VMLocation map[UUID]*Core // UUID 기반 VM 코어 위치 확인 -} - -func NewResourceManager() *ResourceManager { - return &ResourceManager{ - VMLocation: make(map[UUID]*Core), - } -} - -func (rm *ResourceManager) Lock() { rm.mu.Lock() } -func (rm *ResourceManager) Unlock() { rm.mu.Unlock() } -func (rm *ResourceManager) RLock() { rm.mu.RLock() } -func (rm *ResourceManager) RUnlock() { rm.mu.RUnlock() } - -// HardwareRequirement는 코어 선택 시 필요한 자원 요구량 구조체 -type HardwareRequirement struct { - Memory uint32 // MiB - CPU uint32 // logical cores - Disk uint32 // MiB -} - -// CoreSelectionResult는 SelectCore의 반환값으로 진단 정보를 내포 -type CoreSelectionResult struct { - Core *Core - Index int - AliveCount int - TotalCores int -} - -// SelectCore는 요청한 자원을 만족하는 첫 번째 살아있는 코어를 탐색. -// RLock 범위 내에서 코어 슬라이스를 순회 (네트워크 콜은 호출자가 락 밖에서 수행). 
-// 적합한 코어가 없으면 Core==nil로 반환하며, 진단 로그를 위한 카운트 정보를 함께 제공 -func (rm *ResourceManager) SelectCore(req HardwareRequirement) CoreSelectionResult { - rm.mu.RLock() - defer rm.mu.RUnlock() - - result := CoreSelectionResult{ - Index: -1, - TotalCores: len(rm.Cores), - } - - for i := range rm.Cores { - core := &rm.Cores[i] - if !core.IsAlive { - continue - } - result.AliveCount++ - - if core.FreeMemory >= req.Memory && core.FreeCPU >= req.CPU && core.FreeDisk >= req.Disk { - result.Core = core - result.Index = i - return result - } - } - return result -} - -// AllocateResources는 코어의 VMInfoIdx 맵에 VM을 등록하고 Free* 필드를 차감 -func (rm *ResourceManager) AllocateResources(core *Core, uuid UUID, vm *VMInfo, req HardwareRequirement) { - rm.mu.Lock() - defer rm.mu.Unlock() - - if core.VMInfoIdx == nil { - core.VMInfoIdx = make(map[UUID]*VMInfo) - } - core.VMInfoIdx[uuid] = vm - core.FreeMemory -= req.Memory - core.FreeCPU -= req.CPU - core.FreeDisk -= req.Disk -} - -// DeallocateResources는 AllocateResources의 역연산 -func (rm *ResourceManager) DeallocateResources(core *Core, uuid UUID, req HardwareRequirement) { - rm.mu.Lock() - defer rm.mu.Unlock() - - delete(core.VMInfoIdx, uuid) - core.FreeMemory += req.Memory - core.FreeCPU += req.CPU - core.FreeDisk += req.Disk -} - -// RegisterVM은 VMLocation 맵과 AliveVM 슬라이스에 VM을 동시에 등록 -func (rm *ResourceManager) RegisterVM(uuid UUID, core *Core, vm *VMInfo) { - rm.mu.Lock() - defer rm.mu.Unlock() - - rm.VMLocation[uuid] = core - rm.AliveVM = append(rm.AliveVM, vm) -} - -// UnregisterAlive는 AliveVM 슬라이스에서 해당 UUID를 제거 -func (rm *ResourceManager) UnregisterAlive(uuid UUID) bool { - rm.mu.Lock() - defer rm.mu.Unlock() - - for i, vm := range rm.AliveVM { - if vm.UUID == uuid { - rm.AliveVM = slices.Delete(rm.AliveVM, i, i+1) - return true - } - } - return false -} diff --git a/structure/vm.go b/structure/vm.go index 0627ebc..f6d2182 100644 --- a/structure/vm.go +++ b/structure/vm.go @@ -14,6 +14,11 @@ type Config struct { Redis string `yaml:"redis"` DB 
DBConfig `yaml:"db"` GuacDB DBConfig `yaml:"guac_db"` + + // 코어 선택 알고리즘 파라미터 (기본값은 startup.applyAllocDefaults에서 보정) + CpuOvercommit float64 `yaml:"cpu_overcommit"` // vCPU 오버커밋 배수, ≤0이면 1.0 + MemReservePct float64 `yaml:"mem_reserve_pct"` // 0..1, 메모리 여유분 비율 + DiskReservePct float64 `yaml:"disk_reserve_pct"` // 0..1, 디스크 여유분 비율 } type DBConfig struct { diff --git a/tests/docker-compose.test.yml b/tests/docker-compose.test.yml index 04348a2..f596fd7 100644 --- a/tests/docker-compose.test.yml +++ b/tests/docker-compose.test.yml @@ -7,11 +7,10 @@ services: volumes: - ./init-test-db.sql:/docker-entrypoint-initdb.d/init.sql healthcheck: - test: ["CMD", "mysql", "-uroot", "-ptestpass", "-h", "127.0.0.1", "-P", "3306"] + test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-ptestpass"] interval: 3s timeout: 5s - retries: 30 - start_period: 50s + retries: 20 redis-test: image: redis:7-alpine @@ -40,7 +39,6 @@ services: context: .. dockerfile: Dockerfile container_name: kws-test-control - restart: on-failure:3 ports: - "18081:8081" environment: