1- use anyhow:: Result ;
1+ use anyhow:: { anyhow , Result } ;
22use aws_sdk_s3:: Client ;
33use bytes:: Bytes ;
44use chrono:: NaiveDateTime ;
55use clap:: { Parser , Subcommand } ;
6+ use libsql_sys:: { connection:: NO_AUTOCHECKPOINT , wal:: Sqlite3WalManager } ;
7+ use rusqlite:: params;
68use std:: path:: PathBuf ;
79
810mod replicator_extras;
@@ -25,10 +27,24 @@ struct Cli {
2527 namespace : Option < String > ,
2628 #[ clap( long) ]
2729 encryption_key : Option < Bytes > ,
30+ #[ clap( long) ]
31+ db_name : Option < String > ,
2832}
2933
3034#[ derive( Debug , Subcommand ) ]
3135enum Commands {
36+ #[ clap( about = "Copy bottomless generation locally" ) ]
37+ Copy {
38+ #[ clap( long, short, long_help = "Generation to copy (latest by default)" ) ]
39+ generation : Option < uuid:: Uuid > ,
40+ #[ clap( long, short, long_help = "Target local directory" ) ]
41+ to_dir : String ,
42+ } ,
43+ #[ clap( about = "Create new generation from database" ) ]
44+ Create {
45+ #[ clap( long, short, long_help = "Path to the source database file" ) ]
46+ source_db_path : String ,
47+ } ,
3248 #[ clap( about = "List available generations" ) ]
3349 Ls {
3450 #[ clap( long, short, long_help = "List details about single generation" ) ]
@@ -113,6 +129,34 @@ enum Commands {
113129 } ,
114130}
115131
132+ async fn detect_database ( options : & Cli , namespace : & str ) -> Result < ( String , String ) > {
133+ let database = match options. database . clone ( ) {
134+ Some ( db) => db,
135+ None => {
136+ let client = Client :: from_conf ( {
137+ let mut loader = aws_config:: defaults ( aws_config:: BehaviorVersion :: latest ( ) ) ;
138+ if let Some ( endpoint) = options. endpoint . clone ( ) {
139+ loader = loader. endpoint_url ( endpoint) ;
140+ }
141+ aws_sdk_s3:: config:: Builder :: from ( & loader. load ( ) . await )
142+ . force_path_style ( true )
143+ . build ( )
144+ } ) ;
145+ let bucket = options. bucket . as_deref ( ) . unwrap_or ( "bottomless" ) ;
146+ match detect_db ( & client, bucket, namespace) . await {
147+ Some ( db) => db,
148+ None => {
149+ return Err ( anyhow ! ( "Could not autodetect the database. Please pass it explicitly with -d option" ) ) ;
150+ }
151+ }
152+ }
153+ } ;
154+ let database_dir = database + "/dbs/" + namespace. strip_prefix ( "ns-" ) . unwrap ( ) ;
155+ let database = database_dir. clone ( ) + "/data" ;
156+ tracing:: info!( "Database: '{}' (namespace: {})" , database, namespace) ;
157+ return Ok ( ( database, database_dir) ) ;
158+ }
159+
116160async fn run ( ) -> Result < ( ) > {
117161 tracing_subscriber:: fmt:: init ( ) ;
118162 let mut options = Cli :: parse ( ) ;
@@ -189,56 +233,109 @@ async fn run() -> Result<()> {
189233 std:: str:: from_utf8 ( encryption_key) ?,
190234 ) ;
191235 }
192- let namespace = options. namespace . as_deref ( ) . unwrap_or ( "ns-default" ) ;
193- std:: env:: set_var ( "LIBSQL_BOTTOMLESS_DATABASE_ID" , namespace) ;
194- let database = match options. database . clone ( ) {
195- Some ( db) => db,
196- None => {
197- let client = Client :: from_conf ( {
198- let mut loader = aws_config:: defaults ( aws_config:: BehaviorVersion :: latest ( ) ) ;
199- if let Some ( endpoint) = options. endpoint . clone ( ) {
200- loader = loader. endpoint_url ( endpoint) ;
201- }
202- aws_sdk_s3:: config:: Builder :: from ( & loader. load ( ) . await )
203- . force_path_style ( true )
204- . build ( )
205- } ) ;
206- let bucket = options. bucket . as_deref ( ) . unwrap_or ( "bottomless" ) ;
207- match detect_db ( & client, bucket, namespace) . await {
208- Some ( db) => db,
209- None => {
210- println ! ( "Could not autodetect the database. Please pass it explicitly with -d option" ) ;
211- return Ok ( ( ) ) ;
212- }
213- }
236+ let namespace_init = std:: env:: var ( "LIBSQL_BOTTOMLESS_DATABASE_ID" ) . unwrap_or ( String :: new ( ) ) ;
237+ if options. db_name . is_some ( ) && options. namespace . is_some ( ) {
238+ return Err ( anyhow ! (
239+ "only one of the arguments --db-name or --namespace is expected to be set"
240+ ) ) ;
241+ }
242+ if let Some ( ref db_name) = options. db_name {
243+ if namespace_init != "" {
244+ std:: env:: set_var (
245+ "LIBSQL_BOTTOMLESS_DATABASE_ID" ,
246+ format ! ( "ns-{}:{}" , & namespace_init, db_name) ,
247+ ) ;
248+ } else {
249+ return Err ( anyhow ! (
250+ "db_name can be set only if LIBSQL_BOTTOMLESS_DATABASE_ID env var has namespace ID"
251+ ) ) ;
214252 }
215- } ;
216- let database_dir = database + "/dbs/" + namespace. strip_prefix ( "ns-" ) . unwrap ( ) ;
217- let database = database_dir. clone ( ) + "/data" ;
218- tracing:: info!( "Database: '{}' (namespace: {})" , database, namespace) ;
219-
220- let mut client = Replicator :: new ( database. clone ( ) ) . await ?;
221-
253+ } else {
254+ let namespace = options. namespace . as_deref ( ) . unwrap_or ( "ns-default" ) ;
255+ std:: env:: set_var ( "LIBSQL_BOTTOMLESS_DATABASE_ID" , namespace) ;
256+ }
257+ let namespace = std:: env:: var ( "LIBSQL_BOTTOMLESS_DATABASE_ID" ) . unwrap ( ) ;
258+ if namespace_init != namespace {
259+ tracing:: info!(
260+ "LIBSQL_BOTTOMLESS_DATABASE_ID env var were updated: '{}' -> '{}'" ,
261+ namespace_init,
262+ namespace
263+ ) ;
264+ }
222265 match options. command {
266+ Commands :: Create { ref source_db_path } => {
267+ let mut client =
268+ Replicator :: new ( detect_database ( & options, & namespace) . await ?. 0 ) . await ?;
269+
270+ let db_path = PathBuf :: from ( client. db_path . clone ( ) ) ;
271+ let db_dir = db_path. parent ( ) . unwrap ( ) ;
272+ if db_dir. exists ( ) {
273+ return Err ( anyhow ! ( "directory for fresh generation must be empty" ) ) ;
274+ }
275+ if options. namespace . is_none ( ) {
276+ return Err ( anyhow ! ( "namespace must be specified explicitly" ) ) ;
277+ }
278+ std:: fs:: create_dir_all ( db_dir) ?;
279+ tracing:: info!(
280+ "created temporary directory for fresh generation: {}" ,
281+ db_dir. to_str( ) . unwrap( )
282+ ) ;
283+ let options = bottomless:: replicator:: Options :: from_env ( ) ?;
284+ if options. encryption_config . is_some ( ) {
285+ return Err ( anyhow ! ( "creation from encrypted DB is not supported" ) ) ;
286+ }
287+ let connection = libsql_sys:: Connection :: open (
288+ format ! ( "file:{}?mode=ro" , source_db_path) ,
289+ rusqlite:: OpenFlags :: SQLITE_OPEN_READ_ONLY
290+ | rusqlite:: OpenFlags :: SQLITE_OPEN_URI
291+ | rusqlite:: OpenFlags :: SQLITE_OPEN_NO_MUTEX ,
292+ Sqlite3WalManager :: new ( ) ,
293+ NO_AUTOCHECKPOINT ,
294+ None ,
295+ ) ?;
296+ tracing:: info!(
297+ "read to VACUUM source database file {} from read-only connection to the DB {}" ,
298+ & source_db_path,
299+ & client. db_path
300+ ) ;
301+ let _ = connection. execute ( "VACUUM INTO ?" , params ! [ & client. db_path] ) ?;
302+ let _ = client. new_generation ( ) . await ;
303+ tracing:: info!( "set generation {} for replicator" , client. generation( ) ?) ;
304+ client. snapshot_main_db_file ( true ) . await ?;
305+ client. wait_until_snapshotted ( ) . await ?;
306+ println ! ( "snapshot uploaded for generation: {}" , client. generation( ) ?) ;
307+ return Ok ( ( ) ) ;
308+ }
309+ Commands :: Copy { generation, to_dir } => {
310+ let temp = std:: env:: temp_dir ( ) . join ( "bottomless-copy-temp-dir" ) ;
311+ let mut client = Replicator :: new ( temp. display ( ) . to_string ( ) ) . await ?;
312+ client. copy ( generation, to_dir) . await ?;
313+ }
223314 Commands :: Ls {
224315 generation,
225316 limit,
226317 older_than,
227318 newer_than,
228319 verbose,
229- } => match generation {
230- Some ( gen) => client. list_generation ( gen) . await ?,
231- None => {
232- client
233- . list_generations ( limit, older_than, newer_than, verbose)
234- . await ?
320+ } => {
321+ let temp = std:: env:: temp_dir ( ) . join ( "bottomless-ls-temp-dir" ) ;
322+ let client = Replicator :: new ( temp. display ( ) . to_string ( ) ) . await ?;
323+ match generation {
324+ Some ( gen) => client. list_generation ( gen) . await ?,
325+ None => {
326+ client
327+ . list_generations ( limit, older_than, newer_than, verbose)
328+ . await ?
329+ }
235330 }
236- } ,
331+ }
237332 Commands :: Restore {
238333 generation,
239334 utc_time,
240335 ..
241336 } => {
337+ let ( database, database_dir) = detect_database ( & options, & namespace) . await ?;
338+ let mut client = Replicator :: new ( database. clone ( ) ) . await ?;
242339 tokio:: fs:: create_dir_all ( & database_dir) . await ?;
243340 client. restore ( generation, utc_time) . await ?;
244341 let db_path = PathBuf :: from ( & database) ;
@@ -252,9 +349,15 @@ async fn run() -> Result<()> {
252349 generation,
253350 utc_time,
254351 } => {
255- let temp = std:: env:: temp_dir ( ) . join ( "bottomless-verification-do-not-touch " ) ;
352+ let temp: PathBuf = std:: env:: temp_dir ( ) . join ( "bottomless-verify-temp-dir " ) ;
256353 let mut client = Replicator :: new ( temp. display ( ) . to_string ( ) ) . await ?;
257354 let _ = tokio:: fs:: remove_file ( & temp) . await ;
355+ tracing:: info!(
356+ "ready to restore DB from generation '{}'" ,
357+ & generation
358+ . map( |x| x. to_string( ) )
359+ . unwrap_or( String :: from( "" ) )
360+ ) ;
258361 client. restore ( generation, utc_time) . await ?;
259362 let size = tokio:: fs:: metadata ( & temp) . await ?. len ( ) ;
260363 println ! ( "Snapshot size: {size}" ) ;
@@ -270,15 +373,23 @@ async fn run() -> Result<()> {
270373 generation,
271374 older_than,
272375 verbose,
273- } => match ( generation, older_than) {
274- ( None , Some ( older_than) ) => client. remove_many ( older_than, verbose) . await ?,
275- ( Some ( generation) , None ) => client. remove ( generation, verbose) . await ?,
276- ( Some ( _) , Some ( _) ) => unreachable ! ( ) ,
277- ( None , None ) => println ! (
278- "rm command cannot be run without parameters; see -h or --help for details"
279- ) ,
280- } ,
376+ } => {
377+ let ( database, _) = detect_database ( & options, & namespace) . await ?;
378+ let client = Replicator :: new ( database. clone ( ) ) . await ?;
379+
380+ match ( generation, older_than) {
381+ ( None , Some ( older_than) ) => client. remove_many ( older_than, verbose) . await ?,
382+ ( Some ( generation) , None ) => client. remove ( generation, verbose) . await ?,
383+ ( Some ( _) , Some ( _) ) => unreachable ! ( ) ,
384+ ( None , None ) => println ! (
385+ "rm command cannot be run without parameters; see -h or --help for details"
386+ ) ,
387+ }
388+ }
281389 Commands :: Snapshot { generation } => {
390+ let ( database, database_dir) = detect_database ( & options, & namespace) . await ?;
391+ let mut client = Replicator :: new ( database. clone ( ) ) . await ?;
392+
282393 tokio:: fs:: create_dir_all ( & database_dir) . await ?;
283394 let generation = if let Some ( gen) = generation {
284395 gen
0 commit comments