diff --git a/overlays/default.nix b/overlays/default.nix index 9ffc29d..fad59b0 100644 --- a/overlays/default.nix +++ b/overlays/default.nix @@ -16,6 +16,14 @@ in nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ prev.writableTmpDirAsHomeHook ]; }); + # Retry on push failure to work around hyper connection pool race condition. + # https://github.com/zhaofengli/attic/pull/246 + attic-client = prev.attic-client.overrideAttrs (old: { + patches = (old.patches or [ ]) ++ [ + ../patches/attic-client-push-retry.patch + ]; + }); + # Add --zeroconf-port support to Spotify Connect plugin so librespot # binds to a fixed port that can be opened in the firewall. music-assistant = prev.music-assistant.overrideAttrs (old: { diff --git a/patches/attic-client-push-retry.patch b/patches/attic-client-push-retry.patch new file mode 100644 index 0000000..0078d71 --- /dev/null +++ b/patches/attic-client-push-retry.patch @@ -0,0 +1,143 @@ +diff --git a/attic/src/api/v1/upload_path.rs b/attic/src/api/v1/upload_path.rs +index 5b1231e5..cb90928c 100644 +--- a/attic/src/api/v1/upload_path.rs ++++ b/attic/src/api/v1/upload_path.rs +@@ -25,7 +25,7 @@ pub const ATTIC_NAR_INFO_PREAMBLE_SIZE: &str = "X-Attic-Nar-Info-Preamble-Size"; + /// Regardless of client compression, the server will always decompress + /// the NAR to validate the NAR hash before applying the server-configured + /// compression again. +-#[derive(Debug, Serialize, Deserialize)] ++#[derive(Debug, Serialize, Deserialize, Clone)] + pub struct UploadPathNarInfo { + /// The name of the binary cache to upload to. + pub cache: CacheName, +diff --git a/client/src/push.rs b/client/src/push.rs +index 309bd4b6..f3951d2b 100644 +--- a/client/src/push.rs ++++ b/client/src/push.rs +@@ -560,57 +560,83 @@ pub async fn upload_path( + ); + let bar = mp.add(ProgressBar::new(path_info.nar_size)); + bar.set_style(style); +- let nar_stream = NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar.clone()) +- .map_ok(Bytes::from); + +- let start = Instant::now(); +- match api +- .upload_path(upload_info, nar_stream, force_preamble) +- .await +- { +- Ok(r) => { +- let r = r.unwrap_or(UploadPathResult { +- kind: UploadPathResultKind::Uploaded, +- file_size: None, +- frac_deduplicated: None, +- }); +- +- let info_string: String = match r.kind { +- UploadPathResultKind::Deduplicated => "deduplicated".to_string(), +- _ => { +- let elapsed = start.elapsed(); +- let seconds = elapsed.as_secs_f64(); +- let speed = (path_info.nar_size as f64 / seconds) as u64; ++ // Create a new stream for each retry attempt ++ let bar_ref = &bar; ++ let nar_stream = move || { ++ NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar_ref.clone()) ++ .map_ok(Bytes::from) ++ }; + +- let mut s = format!("{}/s", HumanBytes(speed)); ++ let start = Instant::now(); ++ let mut retries = 0; ++ const MAX_RETRIES: u32 = 3; ++ const RETRY_DELAY: Duration = Duration::from_millis(250); + +- if let Some(frac_deduplicated) = r.frac_deduplicated { +- if frac_deduplicated > 0.01f64 { +- s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0); ++ loop { ++ let result = api ++ .upload_path(upload_info.clone(), nar_stream(), force_preamble) ++ .await; ++ match result { ++ Ok(r) => { ++ let r = r.unwrap_or(UploadPathResult { ++ kind: UploadPathResultKind::Uploaded, ++ file_size: None, ++ frac_deduplicated: None, ++ }); ++ ++ let info_string: String = match r.kind { ++ UploadPathResultKind::Deduplicated => "deduplicated".to_string(), ++ _ => { ++ let elapsed = start.elapsed(); ++ let seconds = elapsed.as_secs_f64(); ++ let speed = (path_info.nar_size as f64 / seconds) as u64; ++ ++ let mut s = format!("{}/s", HumanBytes(speed)); ++ ++ if let Some(frac_deduplicated) = r.frac_deduplicated { ++ if frac_deduplicated > 0.01f64 { ++ s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0); ++ } + } ++ ++ s + } ++ }; + +- s ++ mp.suspend(|| { ++ eprintln!( ++ "✅ {} ({})", ++ path.as_os_str().to_string_lossy(), ++ info_string ++ ); ++ }); ++ bar.finish_and_clear(); ++ ++ return Ok(()); ++ } ++ Err(e) => { ++ if retries < MAX_RETRIES { ++ retries += 1; ++ mp.suspend(|| { ++ eprintln!( ++ "❕ {}: Upload failed, retrying ({}/{})...", ++ path.as_os_str().to_string_lossy(), ++ retries, ++ MAX_RETRIES ++ ); ++ }); ++ tokio::time::sleep(RETRY_DELAY).await; ++ continue; + } +- }; + +- mp.suspend(|| { +- eprintln!( +- "✅ {} ({})", +- path.as_os_str().to_string_lossy(), +- info_string +- ); +- }); +- bar.finish_and_clear(); ++ mp.suspend(|| { ++ eprintln!("❌ {}: {}", path.as_os_str().to_string_lossy(), e); ++ }); ++ bar.finish_and_clear(); + +- Ok(()) +- } +- Err(e) => { +- mp.suspend(|| { +- eprintln!("❌ {}: {}", path.as_os_str().to_string_lossy(), e); +- }); +- bar.finish_and_clear(); +- Err(e) ++ return Err(e); ++ } + } + } + }