diff --git a/attic/src/api/v1/upload_path.rs b/attic/src/api/v1/upload_path.rs index 5b1231e5..cb90928c 100644 --- a/attic/src/api/v1/upload_path.rs +++ b/attic/src/api/v1/upload_path.rs @@ -25,7 +25,7 @@ pub const ATTIC_NAR_INFO_PREAMBLE_SIZE: &str = "X-Attic-Nar-Info-Preamble-Size"; /// Regardless of client compression, the server will always decompress /// the NAR to validate the NAR hash before applying the server-configured /// compression again. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct UploadPathNarInfo { /// The name of the binary cache to upload to. pub cache: CacheName, diff --git a/client/src/push.rs b/client/src/push.rs index 309bd4b6..f3951d2b 100644 --- a/client/src/push.rs +++ b/client/src/push.rs @@ -560,57 +560,83 @@ pub async fn upload_path( ); let bar = mp.add(ProgressBar::new(path_info.nar_size)); bar.set_style(style); - let nar_stream = NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar.clone()) - .map_ok(Bytes::from); - let start = Instant::now(); - match api - .upload_path(upload_info, nar_stream, force_preamble) - .await - { - Ok(r) => { - let r = r.unwrap_or(UploadPathResult { - kind: UploadPathResultKind::Uploaded, - file_size: None, - frac_deduplicated: None, - }); - - let info_string: String = match r.kind { - UploadPathResultKind::Deduplicated => "deduplicated".to_string(), - _ => { - let elapsed = start.elapsed(); - let seconds = elapsed.as_secs_f64(); - let speed = (path_info.nar_size as f64 / seconds) as u64; + // Create a new stream for each retry attempt + let bar_ref = &bar; + let nar_stream = move || { + NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar_ref.clone()) + .map_ok(Bytes::from) + }; - let mut s = format!("{}/s", HumanBytes(speed)); + let start = Instant::now(); + let mut retries = 0; + const MAX_RETRIES: u32 = 3; + const RETRY_DELAY: Duration = Duration::from_millis(250); - if let Some(frac_deduplicated) = r.frac_deduplicated { - if frac_deduplicated > 0.01f64 { - s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0); + loop { + let result = api + .upload_path(upload_info.clone(), nar_stream(), force_preamble) + .await; + match result { + Ok(r) => { + let r = r.unwrap_or(UploadPathResult { + kind: UploadPathResultKind::Uploaded, + file_size: None, + frac_deduplicated: None, + }); + + let info_string: String = match r.kind { + UploadPathResultKind::Deduplicated => "deduplicated".to_string(), + _ => { + let elapsed = start.elapsed(); + let seconds = elapsed.as_secs_f64(); + let speed = (path_info.nar_size as f64 / seconds) as u64; + + let mut s = format!("{}/s", HumanBytes(speed)); + + if let Some(frac_deduplicated) = r.frac_deduplicated { + if frac_deduplicated > 0.01f64 { + s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0); + } } + + s } + }; - s + mp.suspend(|| { + eprintln!( + "✅ {} ({})", + path.as_os_str().to_string_lossy(), + info_string + ); + }); + bar.finish_and_clear(); + + return Ok(()); + } + Err(e) => { + if retries < MAX_RETRIES { + retries += 1; + mp.suspend(|| { + eprintln!( + "❕ {}: Upload failed, retrying ({}/{})...", + path.as_os_str().to_string_lossy(), + retries, + MAX_RETRIES + ); + }); + tokio::time::sleep(RETRY_DELAY).await; + continue; } - }; - mp.suspend(|| { - eprintln!( - "✅ {} ({})", - path.as_os_str().to_string_lossy(), - info_string - ); - }); - bar.finish_and_clear(); + mp.suspend(|| { + eprintln!("❌ {}: {}", path.as_os_str().to_string_lossy(), e); + }); + bar.finish_and_clear(); - Ok(()) - } - Err(e) => { - mp.suspend(|| { - eprintln!("❌ {}: {}", path.as_os_str().to_string_lossy(), e); - }); - bar.finish_and_clear(); - Err(e) + return Err(e); + } } } }