Skip to content

Commit d9f7a46

Browse files
authored
Merge pull request #1711 from tursodatabase/libsql-wal-db-restore
libsql-wal: Sync dbs from remote storage on startup
2 parents 8208b4f + fb082ac commit d9f7a46

38 files changed

Lines changed: 647 additions & 311 deletions

File tree

bottomless/src/replicator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,6 +1652,7 @@ impl Replicator {
16521652
let frame = RpcFrame {
16531653
data: frame_to_inject.bytes(),
16541654
timestamp: None,
1655+
durable_frame_no: None,
16551656
};
16561657
injector.inject_frame(frame).await?;
16571658
applied_wal_frame = true;

libsql-replication/proto/replication_log.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ message Frame {
3636
// if this frame is a commit frame, then this can be set
3737
// to the time when the transaction was committed
3838
optional int64 timestamp = 2;
39+
optional uint64 durable_frame_no = 3;
3940
}
4041

4142
message Frames {

libsql-replication/src/generated/wal_log.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ pub struct Frame {
8383
/// to the time when the transaction was committed
8484
#[prost(int64, optional, tag = "2")]
8585
pub timestamp: ::core::option::Option<i64>,
86+
#[prost(uint64, optional, tag = "3")]
87+
pub durable_frame_no: ::core::option::Option<u64>,
8688
}
8789
#[allow(clippy::derive_partial_eq_without_eq)]
8890
#[derive(Clone, PartialEq, ::prost::Message)]

libsql-replication/src/injector/libsql_injector.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,8 @@ impl super::Injector for LibsqlInjector {
4949
.map_err(|e| Error::FatalInjectError(e.into()))?;
5050
Ok(None)
5151
}
52+
53+
fn durable_frame_no(&mut self, frame_no: u64) {
54+
self.injector.set_durable(frame_no);
55+
}
5256
}

libsql-replication/src/injector/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,6 @@ pub trait Injector {
2929
/// Trigger a dummy write, and flush the cache to trigger a call to xFrame. The buffer's frame
3030
/// are then injected into the wal.
3131
fn flush(&mut self) -> impl Future<Output = Result<Option<FrameNo>>> + Send;
32+
33+
fn durable_frame_no(&mut self, frame_no: u64);
3234
}

libsql-replication/src/injector/sqlite_injector/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ impl Injector for SqliteInjector {
4646
let inner = self.inner.clone();
4747
spawn_blocking(move || inner.lock().flush()).await.unwrap()
4848
}
49+
50+
#[inline]
51+
fn durable_frame_no(&mut self, _frame_no: u64) {}
4952
}
5053

5154
impl SqliteInjector {

libsql-replication/src/replicator.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,10 @@ where
328328
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<(), Error> {
329329
self.frames_synced += 1;
330330

331+
if let Some(frame_no) = frame.durable_frame_no {
332+
self.injector.durable_frame_no(frame_no);
333+
}
334+
331335
match self.injector.inject_frame(frame).await? {
332336
Some(commit_fno) => {
333337
self.client.commit_frame_no(commit_fno).await?;
@@ -772,6 +776,7 @@ mod test {
772776
.map(|f| RpcFrame {
773777
data: f.bytes(),
774778
timestamp: None,
779+
durable_frame_no: None,
775780
})
776781
.take(2)
777782
.map(Ok)
@@ -785,6 +790,7 @@ mod test {
785790
.map(|f| RpcFrame {
786791
data: f.bytes(),
787792
timestamp: None,
793+
durable_frame_no: None,
788794
})
789795
.map(Ok)
790796
.collect::<Vec<_>>();

libsql-server/src/bottomless_migrate.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,7 @@ async fn migrate_one(
162162
.await
163163
.unwrap()?;
164164

165-
let mut tx = shared.begin_read(0).into();
166-
shared.upgrade(&mut tx).unwrap();
167-
let guard = tx
168-
.into_write()
169-
.unwrap_or_else(|_| panic!("should be a write txn"))
170-
.into_lock_owned();
171-
let mut injector = Injector::new(shared.clone(), guard, 10)?;
165+
let mut injector = Injector::new(shared.clone(), 10)?;
172166
let orig_db_path = base_path
173167
.join("dbs")
174168
.join(config.namespace().as_str())

libsql-server/src/lib.rs

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,9 @@ pub struct Server<C = HttpConnector, A = AddrIncoming, D = HttpsConnector<HttpCo
177177
pub connector: Option<D>,
178178
pub migrate_bottomless: bool,
179179
pub enable_deadlock_monitor: bool,
180+
pub should_sync_from_storage: bool,
181+
pub force_load_wals: bool,
182+
pub sync_conccurency: usize,
180183
}
181184

182185
impl<C, A, D> Default for Server<C, A, D> {
@@ -203,6 +206,9 @@ impl<C, A, D> Default for Server<C, A, D> {
203206
connector: None,
204207
migrate_bottomless: false,
205208
enable_deadlock_monitor: false,
209+
should_sync_from_storage: false,
210+
force_load_wals: false,
211+
sync_conccurency: 8,
206212
}
207213
}
208214
}
@@ -781,10 +787,10 @@ where
781787
tokio::select! {
782788
_ = shutdown.notified() => {
783789
let shutdown = async {
790+
namespace_store.shutdown().await?;
784791
task_manager.shutdown().await?;
785792
// join_set.shutdown().await;
786793
service_shutdown.notify_waiters();
787-
namespace_store.shutdown().await?;
788794

789795
Ok::<_, crate::Error>(())
790796
};
@@ -958,19 +964,30 @@ where
958964
Ok(())
959965
});
960966

961-
// If we have performed the migration, load all shared wals to force flush to storage with
962-
// the new registry
963-
if did_migrate {
967+
// If we performed a migration from bottomless to libsql-wal earlier, then we need to
968+
// forcefully load all the wals, to trigger segment storage with the actual storage. This
969+
// is because migration didn't actually send anything to storage, but just created the
970+
// segments.
971+
if did_migrate || self.should_sync_from_storage || self.force_load_wals {
972+
// eagerly load all namespaces, then call sync_all on the registry
973+
// TODO: do concurrently
964974
let dbs_path = base_config.base_path.join("dbs");
965975
let stream = meta_store.namespaces();
966976
tokio::pin!(stream);
967977
while let Some(conf) = stream.next().await {
968978
let registry = registry.clone();
969979
let namespace = conf.namespace().clone();
970-
let path = dbs_path.join(namespace.as_str()).join("data");
971-
tokio::task::spawn_blocking(move || registry.open(&path, &namespace.into()))
972-
.await
973-
.unwrap()?;
980+
let path = dbs_path.join(namespace.as_str());
981+
tokio::fs::create_dir_all(&path).await?;
982+
tokio::task::spawn_blocking(move || {
983+
registry.open(&path.join("data"), &namespace.into())
984+
})
985+
.await
986+
.unwrap()?;
987+
}
988+
989+
if self.should_sync_from_storage {
990+
registry.sync_all(self.sync_conccurency).await?;
974991
}
975992
}
976993

@@ -1236,31 +1253,31 @@ where
12361253
base_config: &BaseNamespaceConfig,
12371254
primary_config: &PrimaryConfig,
12381255
) -> anyhow::Result<bool> {
1239-
let is_previous_migration_successful = self.check_previous_migration_success()?;
1240-
let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal));
1241-
let is_bottomless_enabled = self.db_config.bottomless_replication.is_some();
12421256
let is_primary = self.rpc_client_config.is_none();
1243-
let should_attempt_migration = self.migrate_bottomless
1244-
&& is_primary
1245-
&& is_bottomless_enabled
1246-
&& !is_previous_migration_successful
1247-
&& is_libsql_wal;
1248-
1249-
if should_attempt_migration {
1250-
bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?;
1251-
Ok(true)
1252-
} else {
1253-
// the wals directory is present and so is the _dbs. This means that a crash occured
1254-
// before we could remove it. clean it up now. see code in `bottomless_migrate.rs`
1255-
let tmp_dbs_path = base_config.base_path.join("_dbs");
1256-
if tmp_dbs_path.try_exists()? {
1257-
tracing::info!("removed dangling `_dbs` folder");
1258-
tokio::fs::remove_dir_all(&tmp_dbs_path).await?;
1259-
}
1257+
if self.migrate_bottomless && is_primary {
1258+
let is_previous_migration_successful = self.check_previous_migration_success()?;
1259+
let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal));
1260+
let is_bottomless_enabled = self.db_config.bottomless_replication.is_some();
1261+
let should_attempt_migration =
1262+
is_bottomless_enabled && !is_previous_migration_successful && is_libsql_wal;
1263+
1264+
if should_attempt_migration {
1265+
bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?;
1266+
return Ok(true);
1267+
} else {
1268+
// the wals directory is present and so is the _dbs. This means that a crash occurred
1269+
// before we could remove it. clean it up now. see code in `bottomless_migrate.rs`
1270+
let tmp_dbs_path = base_config.base_path.join("_dbs");
1271+
if tmp_dbs_path.try_exists()? {
1272+
tracing::info!("removed dangling `_dbs` folder");
1273+
tokio::fs::remove_dir_all(&tmp_dbs_path).await?;
1274+
}
12601275

1261-
tracing::info!("bottomless already migrated, skipping...");
1262-
Ok(false)
1276+
tracing::info!("bottomless already migrated, skipping...");
1277+
}
12631278
}
1279+
1280+
Ok(false)
12641281
}
12651282

12661283
fn check_previous_migration_success(&self) -> anyhow::Result<bool> {

libsql-server/src/main.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,27 @@ struct Cli {
280280
/// Auth key for the admin API
281281
#[clap(long, env = "LIBSQL_ADMIN_AUTH_KEY", requires = "admin_listen_addr")]
282282
admin_auth_key: Option<String>,
283+
284+
/// Whether to perform a sync of all namespaces with remote on startup
285+
#[clap(
286+
long,
287+
env = "LIBSQL_SYNC_FROM_STORAGE",
288+
requires = "enable_bottomless_replication"
289+
)]
290+
sync_from_storage: bool,
291+
/// Whether to force loading all WAL at startup, with libsql-wal
292+
/// By default, WALs are loaded lazily, as the databases are opened.
293+
/// Whether to force loading all wal at startup
294+
#[clap(long)]
295+
force_load_wals: bool,
296+
/// Sync conccurency
297+
#[clap(
298+
long,
299+
env = "LIBSQL_SYNC_CONCCURENCY",
300+
requires = "sync_from_storage",
301+
default_value = "8"
302+
)]
303+
sync_conccurency: usize,
283304
}
284305

285306
#[derive(clap::Subcommand, Debug)]
@@ -681,6 +702,9 @@ async fn build_server(config: &Cli) -> anyhow::Result<Server> {
681702
connector: Some(https),
682703
migrate_bottomless: config.migrate_bottomless,
683704
enable_deadlock_monitor: config.enable_deadlock_monitor,
705+
should_sync_from_storage: config.sync_from_storage,
706+
force_load_wals: config.force_load_wals,
707+
sync_conccurency: config.sync_conccurency,
684708
})
685709
}
686710

0 commit comments

Comments
 (0)