Patch attic-client to retry on push failure
Some checks failed: Check Flake / check-flake (push) — failing after 4m5s
Backport zhaofengli/attic#246 to work around a hyper connection pool race condition that causes spurious "connection closed before message completed" errors during cache uploads in CI.
This commit is contained in:
@@ -16,6 +16,14 @@ in
     nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ prev.writableTmpDirAsHomeHook ];
   });
 
+  # Retry on push failure to work around hyper connection pool race condition.
+  # https://github.com/zhaofengli/attic/pull/246
+  attic-client = prev.attic-client.overrideAttrs (old: {
+    patches = (old.patches or [ ]) ++ [
+      ../patches/attic-client-push-retry.patch
+    ];
+  });
+
   # Add --zeroconf-port support to Spotify Connect plugin so librespot
   # binds to a fixed port that can be opened in the firewall.
   music-assistant = prev.music-assistant.overrideAttrs (old: {
patches/attic-client-push-retry.patch — 143 lines added (new file)
@@ -0,0 +1,143 @@
diff --git a/attic/src/api/v1/upload_path.rs b/attic/src/api/v1/upload_path.rs
index 5b1231e5..cb90928c 100644
--- a/attic/src/api/v1/upload_path.rs
+++ b/attic/src/api/v1/upload_path.rs
@@ -25,7 +25,7 @@ pub const ATTIC_NAR_INFO_PREAMBLE_SIZE: &str = "X-Attic-Nar-Info-Preamble-Size";
 /// Regardless of client compression, the server will always decompress
 /// the NAR to validate the NAR hash before applying the server-configured
 /// compression again.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct UploadPathNarInfo {
     /// The name of the binary cache to upload to.
     pub cache: CacheName,
diff --git a/client/src/push.rs b/client/src/push.rs
index 309bd4b6..f3951d2b 100644
--- a/client/src/push.rs
+++ b/client/src/push.rs
@@ -560,57 +560,83 @@ pub async fn upload_path(
     );
     let bar = mp.add(ProgressBar::new(path_info.nar_size));
     bar.set_style(style);
-    let nar_stream = NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar.clone())
-        .map_ok(Bytes::from);
 
-    let start = Instant::now();
-    match api
-        .upload_path(upload_info, nar_stream, force_preamble)
-        .await
-    {
-        Ok(r) => {
-            let r = r.unwrap_or(UploadPathResult {
-                kind: UploadPathResultKind::Uploaded,
-                file_size: None,
-                frac_deduplicated: None,
-            });
-
-            let info_string: String = match r.kind {
-                UploadPathResultKind::Deduplicated => "deduplicated".to_string(),
-                _ => {
-                    let elapsed = start.elapsed();
-                    let seconds = elapsed.as_secs_f64();
-                    let speed = (path_info.nar_size as f64 / seconds) as u64;
+    // Create a new stream for each retry attempt
+    let bar_ref = &bar;
+    let nar_stream = move || {
+        NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar_ref.clone())
+            .map_ok(Bytes::from)
+    };
 
-                    let mut s = format!("{}/s", HumanBytes(speed));
+    let start = Instant::now();
+    let mut retries = 0;
+    const MAX_RETRIES: u32 = 3;
+    const RETRY_DELAY: Duration = Duration::from_millis(250);
 
-                    if let Some(frac_deduplicated) = r.frac_deduplicated {
-                        if frac_deduplicated > 0.01f64 {
-                            s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0);
+    loop {
+        let result = api
+            .upload_path(upload_info.clone(), nar_stream(), force_preamble)
+            .await;
+        match result {
+            Ok(r) => {
+                let r = r.unwrap_or(UploadPathResult {
+                    kind: UploadPathResultKind::Uploaded,
+                    file_size: None,
+                    frac_deduplicated: None,
+                });
+
+                let info_string: String = match r.kind {
+                    UploadPathResultKind::Deduplicated => "deduplicated".to_string(),
+                    _ => {
+                        let elapsed = start.elapsed();
+                        let seconds = elapsed.as_secs_f64();
+                        let speed = (path_info.nar_size as f64 / seconds) as u64;
+
+                        let mut s = format!("{}/s", HumanBytes(speed));
+
+                        if let Some(frac_deduplicated) = r.frac_deduplicated {
+                            if frac_deduplicated > 0.01f64 {
+                                s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0);
+                            }
                         }
+
+                        s
                     }
+                };
 
-                    s
+                mp.suspend(|| {
+                    eprintln!(
+                        "✅ {} ({})",
+                        path.as_os_str().to_string_lossy(),
+                        info_string
+                    );
+                });
+                bar.finish_and_clear();
+
+                return Ok(());
+            }
+            Err(e) => {
+                if retries < MAX_RETRIES {
+                    retries += 1;
+                    mp.suspend(|| {
+                        eprintln!(
+                            "❕ {}: Upload failed, retrying ({}/{})...",
+                            path.as_os_str().to_string_lossy(),
+                            retries,
+                            MAX_RETRIES
+                        );
+                    });
+                    tokio::time::sleep(RETRY_DELAY).await;
+                    continue;
                 }
-            };
 
-            mp.suspend(|| {
-                eprintln!(
-                    "✅ {} ({})",
-                    path.as_os_str().to_string_lossy(),
-                    info_string
-                );
-            });
-            bar.finish_and_clear();
+                mp.suspend(|| {
+                    eprintln!("❌ {}: {}", path.as_os_str().to_string_lossy(), e);
+                });
+                bar.finish_and_clear();
 
-            Ok(())
-        }
-        Err(e) => {
-            mp.suspend(|| {
-                eprintln!("❌ {}: {}", path.as_os_str().to_string_lossy(), e);
-            });
-            bar.finish_and_clear();
-            Err(e)
+                return Err(e);
+            }
         }
     }
 }
|
||||
Reference in New Issue
Block a user