@@ -29,7 +29,7 @@ use std::sync::Arc;
2929use tokio:: fs:: { File , OpenOptions } ;
3030use tokio:: io:: AsyncWriteExt ;
3131use tokio:: sync:: watch:: { channel, Receiver , Sender } ;
32- use tokio:: sync:: Mutex ;
32+ use tokio:: sync:: { Mutex , Semaphore } ;
3333use tokio:: task:: JoinHandle ;
3434use tokio:: task:: JoinSet ;
3535use tokio:: time:: Duration ;
@@ -70,7 +70,7 @@ pub struct Replicator {
7070 use_compression : CompressionKind ,
7171 encryption_config : Option < EncryptionConfig > ,
7272 max_frames_per_batch : usize ,
73- s3_upload_max_parallelism : usize ,
73+ s3_max_parallelism : usize ,
7474 join_set : JoinSet < ( ) > ,
7575 upload_progress : Arc < Mutex < CompletionProgress > > ,
7676 last_uploaded_frame_no : Receiver < u32 > ,
@@ -117,7 +117,7 @@ pub struct Options {
117117 /// checkpoint never commits.
118118 pub max_batch_interval : Duration ,
119119 /// Maximum number of S3 file upload requests that may happen in parallel.
120- pub s3_upload_max_parallelism : usize ,
120+ pub s3_max_parallelism : usize ,
121121 /// Max number of retries for S3 operations
122122 pub s3_max_retries : u32 ,
123123 /// Skip snapshot upload per checkpoint.
@@ -198,7 +198,7 @@ impl Options {
198198 let region = env_var ( "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION" ) . ok ( ) ;
199199 let max_frames_per_batch =
200200 env_var_or ( "LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES" , 10000 ) . parse :: < usize > ( ) ?;
201- let s3_upload_max_parallelism =
201+ let s3_max_parallelism =
202202 env_var_or ( "LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX" , 32 ) . parse :: < usize > ( ) ?;
203203 let use_compression =
204204 CompressionKind :: parse ( & env_var_or ( "LIBSQL_BOTTOMLESS_COMPRESSION" , "zstd" ) )
@@ -246,7 +246,7 @@ impl Options {
246246 encryption_config,
247247 max_batch_interval,
248248 max_frames_per_batch,
249- s3_upload_max_parallelism ,
249+ s3_max_parallelism ,
250250 aws_endpoint,
251251 access_key_id,
252252 secret_access_key,
@@ -413,7 +413,7 @@ impl Replicator {
413413 let _s3_upload = {
414414 let client = client. clone ( ) ;
415415 let bucket = options. bucket_name . clone ( ) ;
416- let max_parallelism = options. s3_upload_max_parallelism ;
416+ let max_parallelism = options. s3_max_parallelism ;
417417 let upload_progress = upload_progress. clone ( ) ;
418418 let db_name = db_name. clone ( ) ;
419419 let shutdown_watch = Arc :: new ( shutdown_watch) ;
@@ -493,7 +493,7 @@ impl Replicator {
493493 use_compression : options. use_compression ,
494494 encryption_config : options. encryption_config ,
495495 max_frames_per_batch : options. max_frames_per_batch ,
496- s3_upload_max_parallelism : options. s3_upload_max_parallelism ,
496+ s3_max_parallelism : options. s3_max_parallelism ,
497497 skip_snapshot : options. skip_snapshot ,
498498 join_set,
499499 upload_progress,
@@ -1278,17 +1278,13 @@ impl Replicator {
12781278 timestamp : Option < NaiveDateTime > ,
12791279 ) -> Result < ( RestoreAction , bool ) > {
12801280 tracing:: debug!( "restoring from" ) ;
1281- if let Some ( tombstone) = self . get_tombstone ( ) . await ? {
1282- if let Some ( timestamp) = Self :: generation_to_timestamp ( & generation) {
1283- if tombstone. and_utc ( ) . timestamp ( ) as u64 >= timestamp. to_unix ( ) . 0 {
1284- bail ! (
1285- "Couldn't restore from generation {}. Database '{}' has been tombstoned at {}." ,
1286- generation,
1287- self . db_name,
1288- tombstone
1289- ) ;
1290- }
1291- }
1281+ if let Some ( tombstoned_at) = self . is_tombstoned ( generation) . await ? {
1282+ bail ! (
1283+ "Couldn't restore from generation {}. Database '{}' has been tombstoned at {}." ,
1284+ generation,
1285+ self . db_name,
1286+ tombstoned_at
1287+ ) ;
12921288 }
12931289
12941290 let start_ts = Instant :: now ( ) ;
@@ -1684,6 +1680,94 @@ impl Replicator {
16841680 Ok ( ( ) )
16851681 }
16861682
1683+ pub async fn copy ( & mut self , generation : Option < Uuid > , to_dir : String ) -> Result < ( ) > {
1684+ let generation = self
1685+ . choose_generation ( generation, None )
1686+ . await
1687+ . ok_or ( anyhow ! ( "generation not found" ) ) ?;
1688+
1689+ if let Some ( tombstoned_at) = self . is_tombstoned ( generation) . await ? {
1690+ tracing:: warn!( "generation was tombstoned at {tombstoned_at}" ) ;
1691+ }
1692+ std:: fs:: create_dir_all ( PathBuf :: from ( & to_dir) ) ?;
1693+
1694+ let prefix = format ! ( "{}-{}/" , self . db_name, generation) ;
1695+ let mut marker: Option < String > = None ;
1696+ tracing:: info!(
1697+ "ready to copy S3 content from directory {} to local directory {} (parallelism: {})" ,
1698+ & prefix,
1699+ & to_dir,
1700+ self . s3_max_parallelism
1701+ ) ;
1702+ loop {
1703+ let mut list_request = self . list_objects ( ) . prefix ( & prefix) ;
1704+ if let Some ( marker) = marker. take ( ) {
1705+ list_request = list_request. marker ( marker) ;
1706+ }
1707+ let semaphore = Arc :: new ( Semaphore :: new ( self . s3_max_parallelism ) ) ;
1708+ let mut group = JoinSet :: new ( ) ;
1709+ let list_response = list_request. send ( ) . await ?;
1710+ for entry in list_response. contents ( ) {
1711+ let key = String :: from ( entry. key ( ) . unwrap ( ) ) ;
1712+ marker = Some ( key. clone ( ) ) ;
1713+
1714+ let request = self
1715+ . client
1716+ . get_object ( )
1717+ . bucket ( & self . bucket )
1718+ . key ( key. clone ( ) ) ;
1719+ let to_dir = to_dir. clone ( ) ;
1720+ let entry_size = entry. size ( ) . unwrap_or ( 0 ) ;
1721+ let semaphore = semaphore. clone ( ) ;
1722+ group. spawn ( async move {
1723+ let acquired = semaphore. acquire ( ) . await . unwrap ( ) ;
1724+ if let Ok ( response) = request. send ( ) . await {
1725+ tracing:: debug!(
1726+ "start copy of entry {} (size {} bytes)" ,
1727+ & key,
1728+ entry_size,
1729+ ) ;
1730+ let entry_name = key. split ( "/" ) . last ( ) . unwrap ( ) ;
1731+ let mut entry_path = PathBuf :: from ( & to_dir) ;
1732+ entry_path. push ( entry_name) ;
1733+
1734+ let mut entry_file = OpenOptions :: new ( )
1735+ . create ( true )
1736+ . write ( true )
1737+ . read ( true )
1738+ . truncate ( true )
1739+ . open ( entry_path)
1740+ . await
1741+ . unwrap ( ) ;
1742+ let mut body_reader = response. body . into_async_read ( ) ;
1743+ tokio:: io:: copy ( & mut body_reader, & mut entry_file) . await . unwrap ( ) ;
1744+ tracing:: debug!( "finish copy of entry {}" , & key) ;
1745+ }
1746+ drop ( acquired) ;
1747+ } ) ;
1748+ }
1749+ while let Some ( _) = group. join_next ( ) . await { }
1750+ if !marker. is_some ( ) {
1751+ break ;
1752+ }
1753+ }
1754+ Ok ( ( ) )
1755+ }
1756+
1757+ async fn choose_generation (
1758+ & mut self ,
1759+ generation : Option < Uuid > ,
1760+ timestamp : Option < NaiveDateTime > ,
1761+ ) -> Option < Uuid > {
1762+ match generation {
1763+ Some ( gen) => Some ( gen) ,
1764+ None => match self . latest_generation_before ( timestamp. as_ref ( ) ) . await {
1765+ Some ( gen) => Some ( gen) ,
1766+ None => None ,
1767+ } ,
1768+ }
1769+ }
1770+
16871771 /// Restores the database state from newest remote generation
16881772 /// On success, returns the RestoreAction, and whether the database was recovered from backup.
16891773 pub async fn restore (
@@ -1692,15 +1776,12 @@ impl Replicator {
16921776 timestamp : Option < NaiveDateTime > ,
16931777 ) -> Result < ( RestoreAction , bool ) > {
16941778 tracing:: debug!( "restoring with {generation:?} at {timestamp:?}" ) ;
1695- let generation = match generation {
1696- Some ( gen) => gen,
1697- None => match self . latest_generation_before ( timestamp. as_ref ( ) ) . await {
1698- Some ( gen) => gen,
1699- None => {
1700- tracing:: debug!( "No generation found, nothing to restore" ) ;
1701- return Ok ( ( RestoreAction :: SnapshotMainDbFile , false ) ) ;
1702- }
1703- } ,
1779+ let generation = match self . choose_generation ( generation, timestamp) . await {
1780+ Some ( generation) => generation,
1781+ None => {
1782+ tracing:: debug!( "No generation found, nothing to restore" ) ;
1783+ return Ok ( ( RestoreAction :: SnapshotMainDbFile , false ) ) ;
1784+ }
17041785 } ;
17051786
17061787 let ( action, recovered) = self . restore_from ( generation, timestamp) . await ?;
@@ -1750,7 +1831,7 @@ impl Replicator {
17501831 let dir = format ! ( "{}/{}-{}" , self . bucket, self . db_name, generation) ;
17511832 if tokio:: fs:: try_exists ( & dir) . await ? {
17521833 let mut files = tokio:: fs:: read_dir ( & dir) . await ?;
1753- let sem = Arc :: new ( tokio:: sync:: Semaphore :: new ( self . s3_upload_max_parallelism ) ) ;
1834+ let sem = Arc :: new ( tokio:: sync:: Semaphore :: new ( self . s3_max_parallelism ) ) ;
17541835 while let Some ( file) = files. next_entry ( ) . await ? {
17551836 let fpath = file. path ( ) ;
17561837 if let Some ( key) = Self :: fpath_to_key ( & fpath, & prefix) {
@@ -1779,9 +1860,7 @@ impl Replicator {
17791860 }
17801861 }
17811862 // wait for all started upload tasks to finish
1782- let _ = sem
1783- . acquire_many ( self . s3_upload_max_parallelism as u32 )
1784- . await ?;
1863+ let _ = sem. acquire_many ( self . s3_max_parallelism as u32 ) . await ?;
17851864 if let Err ( e) = tokio:: fs:: remove_dir ( & dir) . await {
17861865 tracing:: warn!( "Couldn't remove backed up directory {}: {}" , dir, e) ;
17871866 }
@@ -1875,6 +1954,17 @@ impl Replicator {
18751954 Ok ( delete_task)
18761955 }
18771956
1957+ pub async fn is_tombstoned ( & self , generation : Uuid ) -> Result < Option < NaiveDateTime > > {
1958+ if let Some ( tombstone) = self . get_tombstone ( ) . await ? {
1959+ if let Some ( timestamp) = Self :: generation_to_timestamp ( & generation) {
1960+ if ( tombstone. and_utc ( ) . timestamp ( ) as u64 ) >= timestamp. to_unix ( ) . 0 {
1961+ return Ok ( Some ( tombstone) ) ;
1962+ } ;
1963+ }
1964+ }
1965+ Ok ( None )
1966+ }
1967+
18781968 /// Checks if current replicator database has been marked as deleted.
18791969 pub async fn get_tombstone ( & self ) -> Result < Option < NaiveDateTime > > {
18801970 let key = format ! ( "{}.tombstone" , self . db_name) ;
0 commit comments