diff --git a/Cargo.lock b/Cargo.lock index dedfc6d..3fa04a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -572,6 +572,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -643,6 +653,7 @@ dependencies = [ "directories", "dlmgr", "duct", + "fs2", "futures-util", "hex-literal", "home", @@ -2464,6 +2475,22 @@ dependencies = [ "libc", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + [[package]] name = "winapi-util" version = "0.1.11" @@ -2473,6 +2500,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.62.2" diff --git a/Cargo.toml b/Cargo.toml index 689ee76..ca7b4f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,12 +21,13 @@ dialoguer = { version = "0.12.0" } directories = "6.0.0" dlmgr = "0.3.1" duct = { version = "1.1.1" } +fs2 = "0.4.3" futures-util = { version = "0.3" } hex-literal = "1.1.0" home = { version = "0.5.12" } humansize = "2" -qemu_img_cmd_types = { path = "libs/qemu_img_cmd_types" } pem = "3.0.6" +qemu_img_cmd_types = { path = "libs/qemu_img_cmd_types" } rcgen = { version = "0.14.7", features = ["crypto","ring", "x509-parser"] } reqwest = { version = "0.13.2", features = ["json", "query", "rustls"] } rustls = "0.23.37" @@ -37,10 +38,10 @@ tempfile = "3.27.0" time = "0.3.47" tokio = { version = "1.50.0", features = ["rt", "macros", "io-std", "fs"] } tokio-tungstenite = { version = "0.28.0", features = ["rustls-tls-webpki-roots", "stream", "connect"], default-features = false } -url = { version = "2.5.0" } -x509-parser = "0.18.1" unicode-segmentation = "1.12.0" +url = { version = "2.5.0" } which = "8.0.2" +x509-parser = "0.18.1" [build-dependencies] diff --git a/src/api/cluster_vm_api/entities.rs b/src/api/cluster_vm_api/entities.rs new file mode 100644 index 0000000..fcb502b --- /dev/null +++ b/src/api/cluster_vm_api/entities.rs @@ -0,0 +1,97 @@ +// ============================================================================= +// AUTO-GENERATED — DO NOT EDIT +// ============================================================================= + +use serde::{Deserialize, Serialize}; + +pub type Ipv4Network = String; + +/// If dhcp is enabled, ip and gateway must be null. If dhcp is disabled, ip and gateway must be provided and valid. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AddressConfig { + pub dhcp: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub gateway: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ip: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DnsConfiguration { + pub nameservers: Vec, +} + +/// An identifier that uniquely identifies an event, user, resource object, etc in Gallium's platform. +pub type GalliumSlug = String; + +/// Path parameters for ListVirtualMachines GET /cluster-api/{cluster_id}/vm/{kube_ns} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListVirtualMachinesPathParams { + pub cluster_id: GalliumSlug, + pub kube_ns: String, +} + +/// MAC address as a string +pub type MacAddress = String; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VmNetworkInterfaceConfiguration { + #[serde(skip_serializing_if = "Option::is_none")] + pub dns: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ip: Option, + #[serde(rename = "macAddr")] + #[serde(skip_serializing_if = "Option::is_none")] + pub mac_addr: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub network: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum VirtualMachineStatus { + Stopped, + Provisioning, + Starting, + Running, + Paused, + Stopping, + Terminating, + Migrating, + Unknown, + Error, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VirtualMachineVolume { + pub bootable: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub bus: Option, + #[serde(rename = "volumeKubeName")] + pub volume_kube_name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VirtualMachine { + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + pub interfaces: Vec, + #[serde(rename = "kubeName")] + pub kube_name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + pub status: VirtualMachineStatus, + pub volumes: Vec, +} + +/// Response for ListVirtualMachines GET /cluster-api/{cluster_id}/vm/{kube_ns} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VirtualMachineListResponse { + pub items: Vec, +} + +// ============================================================================= +// GET /cluster-api/{cluster_id}/vm/{kube_ns} ListVirtualMachines +// List existing virtual machines +// Security: bearerAuth +// ============================================================================= +// (no request body) diff --git a/src/api/cluster_vm_api/mod.rs b/src/api/cluster_vm_api/mod.rs new file mode 100644 index 0000000..ffe7791 --- /dev/null +++ b/src/api/cluster_vm_api/mod.rs @@ -0,0 +1,38 @@ +use crate::api::ApiClient; +use crate::api::cluster_vm_api::entities::{ + ListVirtualMachinesPathParams, VirtualMachineListResponse, +}; +use crate::api::errors::ApiClientError; +use derive_more::Constructor; +use reqwest::Method; +use std::sync::Arc; + +#[allow(unused)] +pub mod entities; +#[derive(Constructor)] +pub struct ClusterVmApi { + api_client: Arc, +} + +impl ClusterVmApi { + pub async fn list_virtual_machines( + &self, + path_params: &ListVirtualMachinesPathParams, + ) -> Result { + let response = self + .api_client + .request_authed( + Method::GET, + &[ + "cluster-api", + &path_params.cluster_id, + "vm", + &path_params.kube_ns, + ], + )? + .send() + .await?; + + self.api_client.deser_response(response).await + } +} diff --git a/src/api/mod.rs b/src/api/mod.rs index 8704f42..f075d88 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,6 +1,7 @@ use crate::api::common_api::entities::GalliumApiErrorResponse; use crate::api::errors::ApiClientError; +use crate::api::cluster_vm_api::ClusterVmApi; use crate::api::command_v2_api::CommandApi; use crate::api::login_api::LoginApi; use crate::api::storage_api::StorageApi; @@ -11,6 +12,7 @@ use serde::de::DeserializeOwned; use std::sync::Arc; use url::Url; +pub mod cluster_vm_api; pub mod command_v2_api; mod common_api; pub mod errors; @@ -114,6 +116,10 @@ impl ApiClient { } } + pub fn cluster_vm_api(self: &Arc) -> ClusterVmApi { + ClusterVmApi::new(self.clone()) + } + pub fn login_api(self: &Arc) -> LoginApi { LoginApi::new(self.clone()) } diff --git a/src/api/storage_api/entities.rs b/src/api/storage_api/entities.rs index 58769e3..7dadbec 100644 --- a/src/api/storage_api/entities.rs +++ b/src/api/storage_api/entities.rs @@ -78,6 +78,48 @@ pub struct ListDiskPoolsPathParams { pub cluster_id: GalliumSlug, } +/// Path parameters for ListVolumes GET /cluster-api/{cluster_id}/volume/{kube_ns} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListVolumesPathParams { + pub cluster_id: GalliumSlug, + pub kube_ns: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum VolumeTypeLabel { + TemplateImage, + Iso, + UserStorage, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Volume { + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + #[serde(rename = "kubeName")] + pub kube_name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + /// The total size of the volume + #[serde(rename = "sizeBytes")] + pub size_bytes: u64, + #[serde(rename = "storageClass")] + pub storage_class: String, + /// The amount of data that is actually allocated within the volume (actualSize in longhorn terminology). NOTE: Space free-ed by the OS may not be reliably reclaimed. + #[serde(rename = "usedSizeBytes")] + #[serde(skip_serializing_if = "Option::is_none")] + pub used_size_bytes: Option, + #[serde(rename = "volumeType")] + #[serde(skip_serializing_if = "Option::is_none")] + pub volume_type: Option, +} + +/// Response for ListVolumes GET /cluster-api/{cluster_id}/volume/{kube_ns} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VolumeListResponse { + pub volumes: Vec, +} + // ============================================================================= // POST /cluster-api/{cluster_id}/volume/{kube_ns}/{kube_name}/nbd/export ExportNbdVolume // Command to export a volume via NBD @@ -120,3 +162,10 @@ pub struct VolumeNbdImportRequest { // Security: bearerAuth // ============================================================================= // (no request body) + +// ============================================================================= +// GET /cluster-api/{cluster_id}/volume/{kube_ns} ListVolumes +// List existing volumes +// Security: bearerAuth +// ============================================================================= +// (no request body) diff --git a/src/api/storage_api/mod.rs b/src/api/storage_api/mod.rs index 6539f02..bbff2bc 100644 --- a/src/api/storage_api/mod.rs +++ b/src/api/storage_api/mod.rs @@ -3,7 +3,8 @@ use crate::api::errors::ApiClientError; use crate::api::ApiClient; use crate::api::storage_api::entities::{ CmdSubmitResponse, DiskPoolListResponse, ExportNbdVolumePathParams, ImportNbdVolumePathParams, - ListDiskPoolsPathParams, VolumeNbdExportRequest, VolumeNbdImportRequest, + ListDiskPoolsPathParams, ListVolumesPathParams, VolumeListResponse, VolumeNbdExportRequest, + VolumeNbdImportRequest, }; use derive_more::Constructor; use reqwest::Method; @@ -79,4 +80,25 @@ impl StorageApi { .await?; self.api_client.deser_response(response).await } + + pub async fn list_volumes( + &self, + path_params: &ListVolumesPathParams, + ) -> Result { + let response = self + .api_client + .request_authed( + Method::GET, + &[ + "cluster-api", + &path_params.cluster_id, + "volume", + &path_params.kube_ns, + ], + )? + .send() + .await?; + + self.api_client.deser_response(response).await + } } diff --git a/src/helpers/mod.rs b/src/helpers/mod.rs index e997304..0fee812 100644 --- a/src/helpers/mod.rs +++ b/src/helpers/mod.rs @@ -6,3 +6,4 @@ pub mod helper_cmd_error; pub mod mtls; pub mod nbd; pub mod qemu; +pub mod ui; diff --git a/src/helpers/qemu/mod.rs b/src/helpers/qemu/mod.rs index 4c18ddd..3e01e96 100644 --- a/src/helpers/qemu/mod.rs +++ b/src/helpers/qemu/mod.rs @@ -122,13 +122,11 @@ impl QemuImgConvert { } } -pub async fn qemu_img_convert( - qemu_img: QemuImgCmdProvider, - args: QemuImgConvert, -) -> ( - Arc, - JoinHandle, std::io::Error>>, -) { +pub struct ConvertTask { + pub progress: Arc, + pub handle: JoinHandle, std::io::Error>>, +} +pub async fn qemu_img_convert(qemu_img: QemuImgCmdProvider, args: QemuImgConvert) -> ConvertTask { let convert_progress_provider = Arc::new(QemuConvertProgressProvider::default()); let convert_progress_provider2 = convert_progress_provider.clone(); let task_handle = tokio::task::spawn_blocking(move || { @@ -136,5 +134,8 @@ pub async fn qemu_img_convert( report_progress(convert_progress_provider2, reader) }); - (convert_progress_provider, task_handle) + ConvertTask { + progress: convert_progress_provider, + handle: task_handle, + } } diff --git a/src/helpers/ui/mod.rs b/src/helpers/ui/mod.rs new file mode 100644 index 0000000..6d16470 --- /dev/null +++ b/src/helpers/ui/mod.rs @@ -0,0 +1 @@ +pub mod transfer_progress_ui; diff --git a/src/helpers/ui/transfer_progress_ui.rs b/src/helpers/ui/transfer_progress_ui.rs new file mode 100644 index 0000000..72ced85 --- /dev/null +++ b/src/helpers/ui/transfer_progress_ui.rs @@ -0,0 +1,78 @@ +use crate::api::command_v2_api::entities::ApiCmdStatus; +use crate::helpers::cmd::cmd_progress::CommandProgressUpdater; +use crate::helpers::qemu::{ConvertTask, QemuImgConvert}; +use crate::task_common::error::TaskError; +use cliclack::{MultiProgress, ProgressBar, multi_progress, progress_bar, spinner}; + +pub struct TransferProgressUi { + pub multi: MultiProgress, + pub spinner_init: ProgressBar, + pub pb: ProgressBar, + pub spinner_final: ProgressBar, +} + +impl TransferProgressUi { + pub fn init(task_description: String) -> Self { + let multi = multi_progress(task_description); + let spinner_init = multi.add(spinner()); + let pb = multi.add(progress_bar(10000)); + let spinner_final = multi.add(spinner()); + + Self { + multi, + spinner_init, + pb, + spinner_final, + } + } +} + +pub async fn transfer_progress_ui( + mut convert_task: ConvertTask, + progress_updater: CommandProgressUpdater, + ui: TransferProgressUi, +) -> Result<(), TaskError> { + let mut ui_tick = tokio::time::interval(tokio::time::Duration::from_millis(100)); + let mut backend_tick = tokio::time::interval(tokio::time::Duration::from_millis(5000)); + + // qemu-img can take some time after progress has reached 100% before it will actually terminate. + // (it is waiting for the file/volume to fsync, among other things) + // Rather than show a progress bar stuck at 100%, switch to a spinner. + let mut waiting_for_cmd_to_complete = false; + + loop { + tokio::select! { + _ = ui_tick.tick() => { + if !waiting_for_cmd_to_complete { + let p = convert_task.progress.read_progress(); + ui.pb.set_position(p as u64); + if p == 10000 { + ui.pb.stop("Sending data"); + waiting_for_cmd_to_complete = true; + ui.spinner_final.start("Waiting for completion"); + } + } + } + _ = backend_tick.tick() => { + progress_updater.update_progress(convert_task.progress.read_progress() as f64, 10000.0); + } + r = &mut convert_task.handle => { + return match QemuImgConvert::assert_ok(r) { + Ok(_) => { + progress_updater.complete(ApiCmdStatus::COMPLETE).await?; + ui.spinner_final.stop("Complete"); + ui.multi.stop(); + Ok(()) + } + Err(e) => { + progress_updater.complete(ApiCmdStatus::FAILED).await.ok(); + ui.spinner_final.error("Failed"); + ui.multi.stop(); + + Err(TaskError::HelperCommand { source: e }) + } + }; + } + } + } +} diff --git a/src/task_common/error.rs b/src/task_common/error.rs index cc2cd0d..ef2f833 100644 --- a/src/task_common/error.rs +++ b/src/task_common/error.rs @@ -15,6 +15,8 @@ pub enum TaskError { }, #[snafu(display("Invalid state: {reason}"))] InvalidState { reason: &'static str }, + #[snafu(display("Required parameter missing: {reason}"))] + RequiredParameterMissing { reason: &'static str }, #[snafu(display("Invalid state for {command}: {reason}"))] InvalidStateForCommand { command: &'static str, diff --git a/src/tasks/export/mod.rs b/src/tasks/export/mod.rs index 298a38e..ad24010 100644 --- a/src/tasks/export/mod.rs +++ b/src/tasks/export/mod.rs @@ -1,7 +1,8 @@ mod format; +mod ui_confirm; +mod volume_scan; use crate::api::ApiClient; -use crate::api::command_v2_api::entities::ApiCmdStatus; use crate::api::storage_api::entities::{ExportNbdVolumePathParams, VolumeNbdExportRequest}; use crate::args::GlobalArguments; use crate::helpers::auth::get_login_response_for_saved_credentials; @@ -12,26 +13,37 @@ use crate::task_common::error::HelperCommandSnafu; use snafu::ResultExt; use crate::helpers::qemu::{ConvertOperation, QemuImgConvert, qemu_img_convert}; +use crate::helpers::ui::transfer_progress_ui::{TransferProgressUi, transfer_progress_ui}; use crate::task_common::error::TaskError; use crate::tasks::export::format::ExportFormat; +use crate::tasks::export::ui_confirm::confirm_export; +use crate::tasks::export::volume_scan::{ScannedVolume, scan_volumes_for_export}; use crate::tasks_internal::qemu_img::ensure_qemu_img; -use cliclack::{multi_progress, progress_bar, spinner}; use std::path::PathBuf; use std::sync::Arc; #[derive(clap::Args)] +#[clap(arg_required_else_help = true)] pub struct ExportArguments { /// Deployment ID to export from #[arg(short, long)] pub source: String, - /// Volume ID to export - #[arg(short, long)] - pub vol: String, + /// Volume names / IDs to export + #[arg(short = 'n', long)] + pub vol: Vec, + + /// Virtual machine names / Ids to export + #[arg(short = 'v', long)] + pub vm: Vec, /// Format to export #[arg(short, long)] pub format: ExportFormat, + + #[arg(short, long)] + /// Do not ask for confirmation - just go ahead if parameters are valid + yes: bool, } pub async fn export_main( global_args: &GlobalArguments, @@ -43,36 +55,49 @@ pub async fn export_main( .try_into()?, ); - //TODO: confirm parameters are correct, source volume exists, - // free space for export, etc - - process(api_client, args.source, args.vol, args.format).await?; + let volumes = scan_volumes_for_export(&api_client, &args).await?; + if confirm_export(&volumes, &args)? { + for volume in volumes { + process(&api_client, args.source.clone(), volume, args.format).await?; + } + } Ok(()) } async fn process( - api_client: Arc, + api_client: &Arc, source: String, - vol_name: String, + volume: ScannedVolume, exp_format: ExportFormat, ) -> Result<(), TaskError> { let qemu_img = ensure_qemu_img().await.context(HelperCommandSnafu)?; let storage_api = api_client.storage_api(); - let export_filename = format!("{vol_name}_export{}", exp_format.as_ext()); + let export_filename = format!("{}_export{}", volume.kube_name, exp_format.as_ext()); + + let export_file: PathBuf; + if let Some(dir_name) = volume.vm_name.as_deref() { + tokio::fs::create_dir_all(&dir_name) + .await + .whatever_context::<_, TaskError>("create dir for vm exports")?; + export_file = PathBuf::from(dir_name).join(&export_filename); + } else { + export_file = PathBuf::from(&export_filename); + } - let multi = multi_progress(format!("Exporting {vol_name} to {export_filename}")); - let spinner_init = multi.add(spinner()); - let pb = multi.add(progress_bar(10000)); - let spinner_final = multi.add(spinner()); + let ui = TransferProgressUi::init(format!( + "Exporting {} to {}", + volume.kube_name, + export_file.display() + )); let mtls_helper = MtlsCredentialHelper::new().context(HelperCommandSnafu)?; let path_params = ExportNbdVolumePathParams { cluster_id: source, - kube_name: vol_name, + kube_name: volume.kube_name, kube_ns: "default".to_string(), }; @@ -97,11 +122,11 @@ async fn process( .context(HelperCommandSnafu)? .to_path_buf(); - spinner_init.start("Waiting for deployment"); + ui.spinner_init.start("Waiting for deployment"); let nbd = poll_for_nbd_response(&cmd_api, &submit_resp).await?; - spinner_init.stop("Deployment waiting for connection"); + ui.spinner_init.stop("Deployment waiting for connection"); let convert_cmd = QemuImgConvert { cert_dir, @@ -109,7 +134,7 @@ async fn process( nbd_host: nbd.host_ip, nbd_port: nbd.host_port, op: ConvertOperation::Export { - target_file: PathBuf::from(export_filename), + target_file: export_file, target_format: exp_format.as_qemu_img_fmt().to_string(), }, }; @@ -117,46 +142,6 @@ async fn process( let progress_updater = CommandProgressUpdater::build_and_spawn(cmd_api, &submit_resp, "AWAIT_NBD_COMPLETION")?; - //TODO: this is copy-pasted from import, it should be factored out. - // (but, does it need the same logic around waiting for completion?) - let (progress, mut task) = qemu_img_convert(qemu_img.clone(), convert_cmd).await; - - let mut ui_tick = tokio::time::interval(tokio::time::Duration::from_millis(100)); - let mut backend_tick = tokio::time::interval(tokio::time::Duration::from_millis(5000)); - let mut waiting_for_completion = false; - loop { - tokio::select! { - _ = ui_tick.tick() => { - if !waiting_for_completion { - let p = progress.read_progress(); - pb.set_position(p as u64); - if p == 10000 { - pb.stop("Sending data"); - waiting_for_completion = true; - spinner_final.start("Waiting for completion"); - } - } - } - _ = backend_tick.tick() => { - // TODO: inform backend of progress - } - r = &mut task => { - return match QemuImgConvert::assert_ok(r) { - Ok(_) => { - progress_updater.complete(ApiCmdStatus::COMPLETE).await?; - spinner_final.stop("Export complete"); - multi.stop(); - Ok(()) - } - Err(e) => { - progress_updater.complete(ApiCmdStatus::FAILED).await.ok(); - spinner_final.error("Export failed"); - multi.stop(); - - Err(TaskError::HelperCommand { source: e }) - } - }; - } - } - } + let convert_task = qemu_img_convert(qemu_img.clone(), convert_cmd).await; + transfer_progress_ui(convert_task, progress_updater, ui).await } diff --git a/src/tasks/export/ui_confirm.rs b/src/tasks/export/ui_confirm.rs new file mode 100644 index 0000000..c133c0c --- /dev/null +++ b/src/tasks/export/ui_confirm.rs @@ -0,0 +1,98 @@ +use crate::task_common::error::TaskError; +use crate::tasks::export::ExportArguments; +use crate::tasks::export::volume_scan::ScannedVolume; +use cliclack::{confirm, log}; +use fs2::available_space; +use humansize::{BINARY, format_size}; +use snafu::ResultExt; +use std::collections::BTreeMap; + +pub fn confirm_export( + export_vols: &[ScannedVolume], + args: &ExportArguments, +) -> Result { + if export_vols.is_empty() { + log::warning("Nothing to export.") + .whatever_context::<_, TaskError>("writing to terminal")?; + return Ok(false); + } + + let mut grouped: BTreeMap, Vec<&ScannedVolume>> = BTreeMap::new(); + for vol in export_vols { + grouped.entry(vol.vm_name.as_deref()).or_default().push(vol); + } + + let mut lines = Vec::new(); + let mut total_bytes: u64 = 0; + + for (vm_name, vols) in &grouped { + if let Some(name) = vm_name { + lines.push(format!(" VM: {}", name)); + } + for v in vols { + let size_str = match v.used_size_bytes { + Some(used) => { + total_bytes += used; + format_size(used, BINARY).to_string() + } + None => { + total_bytes += v.size_bytes; + format!("{} (allocated)", format_size(v.size_bytes, BINARY)) + } + }; + let indent = if vm_name.is_some() { " " } else { " " }; + lines.push(format!( + "{}{} (estimated: {})", + indent, v.kube_name, size_str + )); + } + } + + lines.push(format!( + "\n Total export size: {}", + format_size(total_bytes, BINARY) + )); + + log::info(format!( + "The following {} will be exported:\n{}", + if export_vols.len() == 1 { + "volume" + } else { + "volumes" + }, + lines.join("\n"), + )) + .whatever_context::<_, TaskError>("writing to terminal")?; + + maybe_print_disk_space_warning(total_bytes)?; + + if !args.yes { + let proceed = confirm("Proceed with export?") + .initial_value(true) + .interact() + .whatever_context::<_, TaskError>("reading confirmation")?; + if !proceed { + log::warning("Export cancelled.") + .whatever_context::<_, TaskError>("writing to terminal")?; + return Ok(false); + } + } + + Ok(true) +} + +fn maybe_print_disk_space_warning(total_bytes: u64) -> Result<(), TaskError> { + let free_space_bytes = + available_space(".").whatever_context::<_, TaskError>("checking available space")?; + + if free_space_bytes < total_bytes { + log::warning(format!( + "Insufficient free disk space: {} available, estimated {} required", + format_size(free_space_bytes, BINARY), + format_size(total_bytes, BINARY), + )) + .whatever_context::<_, TaskError>("writing to terminal")?; + } + + Ok(()) +} diff --git a/src/tasks/export/volume_scan.rs b/src/tasks/export/volume_scan.rs new file mode 100644 index 0000000..dd72b2f --- /dev/null +++ b/src/tasks/export/volume_scan.rs @@ -0,0 +1,165 @@ +use crate::api::ApiClient; +use crate::api::cluster_vm_api::entities::{ListVirtualMachinesPathParams, VirtualMachine}; +use crate::api::storage_api::entities::{ListVolumesPathParams, Volume}; +use crate::task_common::error::TaskError; +use crate::tasks::export::ExportArguments; +use std::collections::HashSet; +use std::sync::Arc; + +pub struct ScannedVolume { + pub vm_name: Option, + pub kube_name: String, + pub size_bytes: u64, + pub used_size_bytes: Option, +} + +pub async fn scan_volumes_for_export( + api_client: &Arc, + args: &ExportArguments, +) -> Result, TaskError> { + if args.vm.is_empty() && args.vol.is_empty() { + return Err(TaskError::RequiredParameterMissing { + reason: "At least one VM or volume must be specified for export.", + }); + } else if args.vm.is_empty() == args.vol.is_empty() { + // this restriction is imposed for now, only to avoid having to deal with the case where + // a user specifies a volume and the VM that volume is a part of in one command. + return Err(TaskError::InvalidState { + reason: "Cannot specify both VMs and volumes in one export command.", + }); + } + let storage_api = api_client.storage_api(); + let volume_list = storage_api + .list_volumes(&ListVolumesPathParams { + cluster_id: args.source.clone(), + kube_ns: "default".into(), + }) + .await? + .volumes; + + let vm_api = api_client.cluster_vm_api(); + let vm_list = vm_api + .list_virtual_machines(&ListVirtualMachinesPathParams { + cluster_id: args.source.clone(), + kube_ns: "default".into(), + }) + .await? + .items; + + let mut scanned_volumes = vec![]; + + let mut matched_vms = HashSet::new(); + + for vm_arg in args.vm.iter() { + let vm = match_vm_arg(vm_arg, &vm_list)?; + if !matched_vms.insert(vm.kube_name.to_string()) { + return Err(TaskError::UserInputInvalidValueReason { + val: vm_arg.to_string(), + field: "vm", + reason: "Same VM specified more than once", + }); + } + + for vm_vol in vm.volumes.iter() { + let vol = volume_list + .iter() + .find(|v| v.kube_name.as_str() == vm_vol.volume_kube_name.as_str()) + .ok_or_else(|| TaskError::InvalidState { + reason: "Volume present on VM missing from volume list", + })?; + scanned_volumes.push(ScannedVolume { + vm_name: Some(vm.kube_name.clone()), + kube_name: vol.kube_name.clone(), + size_bytes: vol.size_bytes, + used_size_bytes: vol.used_size_bytes, + }); + } + } + + let mut matched_vols = HashSet::new(); + for vol_arg in args.vol.iter() { + let vol = match_vol_arg(vol_arg, &volume_list)?; + if !matched_vols.insert(vol.kube_name.to_string()) { + return Err(TaskError::UserInputInvalidValueReason { + val: vol_arg.to_string(), + field: "vm", + reason: "Same volume specified more than once", + }); + } else { + scanned_volumes.push(ScannedVolume { + vm_name: None, + kube_name: vol.kube_name.clone(), + size_bytes: vol.size_bytes, + used_size_bytes: vol.used_size_bytes, + }); + } + } + + Ok(scanned_volumes) +} + +fn match_vm_arg<'a>( + vm_arg: &str, + vm_list: &'a [VirtualMachine], +) -> Result<&'a VirtualMachine, TaskError> { + let mut matched_vm = None; + let mut matched_count = 0; + for vm in vm_list.iter() { + if vm.kube_name.as_str() == vm_arg { + return Ok(vm); + } else if vm.name.as_deref() == Some(vm_arg) { + matched_vm = Some(vm); + matched_count += 1; + } + } + if matched_count == 1 + && let Some(vm) = matched_vm + { + Ok(vm) + } else if matched_count == 0 { + Err(TaskError::UserInputInvalidValueReason { + val: vm_arg.to_string(), + field: "vm", + reason: "Does not match any VMs on deployment", + }) + } else { + //TODO: this should log the specific VMs + Err(TaskError::UserInputInvalidValueReason { + val: vm_arg.to_string(), + field: "vm", + reason: "Ambiguously matches multiple VMs", + }) + } +} + +// Is it worth trying to build a generic matcher? I think we will end up with this pattern repeated more. +fn match_vol_arg<'a>(vol_arg: &str, vol_list: &'a [Volume]) -> Result<&'a Volume, TaskError> { + let mut matched_vol = None; + let mut matched_count = 0; + for vol in vol_list.iter() { + if vol.kube_name.as_str() == vol_arg { + return Ok(vol); + } else if vol.name.as_deref() == Some(vol_arg) { + matched_vol = Some(vol); + matched_count += 1; + } + } + + if matched_count == 1 + && let Some(vol) = matched_vol + { + Ok(vol) + } else if matched_count == 0 { + Err(TaskError::UserInputInvalidValueReason { + val: vol_arg.to_string(), + field: "vol", + reason: "Does not match any volumes on deployment", + }) + } else { + Err(TaskError::UserInputInvalidValueReason { + val: vol_arg.to_string(), + field: "vol", + reason: "Ambiguously matches multiple volumes", + }) + } +} diff --git a/src/tasks/import/mod.rs b/src/tasks/import/mod.rs index f88e6f4..08158eb 100644 --- a/src/tasks/import/mod.rs +++ b/src/tasks/import/mod.rs @@ -6,7 +6,6 @@ use crate::task_common::error::TaskError; use crate::tasks::import::source_scan::{ImportSource, ScanResult, scan_import_sources}; use crate::api::ApiClient; -use crate::api::command_v2_api::entities::ApiCmdStatus; use crate::api::storage_api::entities::{ImportNbdVolumePathParams, VolumeNbdImportRequest}; use crate::helpers::auth::get_login_response_for_saved_credentials; use crate::helpers::cmd::cmd_progress::CommandProgressUpdater; @@ -14,17 +13,19 @@ use crate::helpers::mtls::MtlsCredentialHelper; use crate::helpers::nbd::poll_for_nbd_response; use crate::helpers::qemu::qemu_img_cmd_provider::QemuImgCmdProvider; use crate::helpers::qemu::{ConvertOperation, QemuImgConvert, qemu_img_convert}; +use crate::helpers::ui::transfer_progress_ui::{TransferProgressUi, transfer_progress_ui}; use crate::task_common::error::HelperCommandSnafu; use crate::tasks::import::disk_pool::{DiskPoolDetermination, determine_disk_pool}; use crate::tasks::import::param_helpers::{description, truncate_name}; use crate::tasks_internal::qemu_img::ensure_qemu_img; -use cliclack::{confirm, log, multi_progress, progress_bar, spinner}; +use cliclack::{confirm, log}; use humansize::{BINARY, format_size}; use snafu::ResultExt; use std::path::PathBuf; use std::sync::Arc; #[derive(clap::Parser)] +#[clap(arg_required_else_help = true)] pub(crate) struct ImportArguments { #[arg(short, long)] /// The ID or name of the disk pool where the import should be stored @@ -134,12 +135,8 @@ async fn process( ) -> Result<(), TaskError> { let storage_api = api_client.storage_api(); - let multi = multi_progress(format!("Importing {}", source.name_part)); - let spinner_init = multi.add(spinner()); - let pb = multi.add(progress_bar(10000)); - let spinner_final = multi.add(spinner()); - - spinner_init.start("Preparing import"); + let ui = TransferProgressUi::init(format!("Importing {}", source.name_part)); + ui.spinner_init.start("Preparing import"); let mtls_helper = MtlsCredentialHelper::new().context(HelperCommandSnafu)?; @@ -158,7 +155,7 @@ async fn process( }; let submit_resp = storage_api.import_nbd_volume(&path_params, &req).await?; - spinner_init.start("Waiting for volume"); + ui.spinner_init.start("Waiting for volume"); //TODO: Poll all the commands to provide more detailed status as import progresses let cmd_api = api_client.command_api(); @@ -176,11 +173,11 @@ async fn process( .context(HelperCommandSnafu)? .to_path_buf(); - spinner_init.start("Waiting for deployment"); + ui.spinner_init.start("Waiting for deployment"); let nbd = poll_for_nbd_response(&cmd_api, &submit_resp).await?; - spinner_init.stop("Deployment ready to receive import"); + ui.spinner_init.stop("Deployment ready to receive import"); let convert_cmd = QemuImgConvert { cert_dir, @@ -196,49 +193,7 @@ async fn process( let progress_updater = CommandProgressUpdater::build_and_spawn(cmd_api, &submit_resp, "AWAIT_NBD_COMPLETION")?; - let (progress, mut task) = qemu_img_convert(qemu_img.clone(), convert_cmd).await; - - let mut ui_tick = tokio::time::interval(tokio::time::Duration::from_millis(100)); - let mut backend_tick = tokio::time::interval(tokio::time::Duration::from_millis(5000)); - - // qemu-img can take some time after progress has reached 100% before it will actually terminate. - // (it is waiting for the other side to fsync out the file, among other things) - // rather than show a progress bar stuck at 100%, switch to a spinner. - let mut waiting_for_cmd_to_complete = false; - - loop { - tokio::select! { - _ = ui_tick.tick() => { - if !waiting_for_cmd_to_complete { - let p = progress.read_progress(); - pb.set_position(p as u64); - if p == 10000 { - pb.stop("Sending data"); - waiting_for_cmd_to_complete = true; - spinner_final.start("Waiting for completion"); - } - } - } - _ = backend_tick.tick() => { - progress_updater.update_progress(progress.read_progress()as f64, 10000.0); - } - r = &mut task => { - return match QemuImgConvert::assert_ok(r) { - Ok(_) => { - progress_updater.complete(ApiCmdStatus::COMPLETE).await?; - spinner_final.stop("Import complete"); - multi.stop(); - Ok(()) - } - Err(e) => { - progress_updater.complete(ApiCmdStatus::FAILED).await.ok(); - spinner_final.error("Import failed"); - multi.stop(); - - Err(TaskError::HelperCommand { source: e }) - } - }; - } - } - } + let convert_task = qemu_img_convert(qemu_img.clone(), convert_cmd).await; + + transfer_progress_ui(convert_task, progress_updater, ui).await }