Skip to content

Commit 8ffd3f7

Browse files
committed
fix migration
1 parent 7c88057 commit 8ffd3f7

2 files changed

Lines changed: 97 additions & 38 deletions

File tree

libsql-server/src/bottomless_migrate.rs

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
use std::path::{Path, PathBuf};
22
use std::sync::Arc;
33

4-
use libsql_replication::LIBSQL_PAGE_SIZE;
4+
use libsql_sys::ffi::Sqlite3DbHeader;
55
use libsql_sys::wal::Sqlite3WalManager;
66
use libsql_wal::io::StdIO;
77
use libsql_wal::replication::injector::Injector;
88
use libsql_wal::segment::{Frame, FrameHeader};
9-
use libsql_wal::storage::Storage;
10-
use libsql_wal::{registry::WalRegistry, segment::sealed::SealedSegment};
9+
use libsql_wal::storage::NoStorage;
10+
use libsql_wal::registry::WalRegistry;
1111
use tempfile::TempDir;
1212
use tokio::io::AsyncReadExt;
1313
use tokio_stream::StreamExt;
14-
use zerocopy::FromZeroes;
14+
use zerocopy::{FromZeroes, FromBytes};
1515

1616
#[cfg(not(feature = "durable-wal"))]
1717
use libsql_sys::wal::either::Either as EitherWAL;
@@ -27,14 +27,11 @@ use crate::namespace::NamespaceStore;
2727

2828
/// The process for migrating from bottomless to libsql wal is simple:
2929
/// 1) iteratate over all namespaces, and make sure that they
30-
pub async fn bottomless_migrate<S>(
30+
pub async fn bottomless_migrate(
3131
meta_store: MetaStore,
32-
storage: Arc<S>,
3332
base_config: BaseNamespaceConfig,
3433
primary_config: PrimaryConfig,
3534
) -> anyhow::Result<()>
36-
where
37-
S: Storage<Segment = SealedSegment<std::fs::File>>,
3835
{
3936
tracing::info!("attempting bottomless migration to libsql-wal");
4037

@@ -46,8 +43,21 @@ where
4643
let configs_stream = meta_store.namespaces();
4744
tokio::pin!(configs_stream);
4845

49-
let (sender, _) = tokio::sync::mpsc::channel(1);
50-
let tmp_registry = Arc::new(WalRegistry::new(tmp.path().join("wals"), storage, sender)?);
46+
let (sender, mut rcv) = tokio::sync::mpsc::channel(1);
47+
48+
tokio::spawn(async move {
49+
loop {
50+
match rcv.recv().await {
51+
Some(libsql_wal::checkpointer::CheckpointMessage::Shutdown) => {
52+
break
53+
}
54+
Some(_) => (),
55+
None => break,
56+
}
57+
}
58+
});
59+
60+
let tmp_registry = Arc::new(WalRegistry::new(tmp.path().join("wals"), NoStorage.into(), sender)?);
5161

5262
let mut configurators = NamespaceConfigurators::default();
5363

@@ -78,6 +88,14 @@ where
7888
.await?;
7989
}
8090

91+
tmp_registry.shutdown().await?;
92+
93+
// FIXME: this is not atomic!!
94+
tokio::fs::remove_dir_all(base_config.base_path.join("dbs")).await?;
95+
tokio::fs::rename(tmp.path().join("dbs"), base_config.base_path.join("dbs")).await?;
96+
tokio::fs::rename(tmp.path().join("wals"), base_config.base_path.join("wals")).await?;
97+
98+
8199
Ok(())
82100
}
83101

@@ -88,20 +106,18 @@ where
88106
/// - either the migration didn't happen
89107
/// - a crash happened before we could swap the directories
90108
#[tracing::instrument(skip_all, fields(namespace = config.namespace().as_str()))]
91-
async fn migrate_one<S>(
109+
async fn migrate_one(
92110
configurators: &NamespaceConfigurators,
93111
config: MetaStoreHandle,
94112
dummy_store: NamespaceStore,
95113
tmp: &Path,
96-
tmp_registry: Arc<WalRegistry<StdIO, S>>,
114+
tmp_registry: Arc<WalRegistry<StdIO, NoStorage>>,
97115
base_path: &Path,
98116
) -> anyhow::Result<()>
99-
where
100-
S: Storage<Segment = SealedSegment<std::fs::File>>,
101117
{
102118
let broadcasters = BroadcasterRegistry::default();
103119
// TODO: check if we already have a backup for this db from storage
104-
tracing::info!("started db migrating");
120+
tracing::info!("started namespace migration");
105121
// we load the namespace ensuring it's restored to the latest version
106122
configurators
107123
.configure_primary()?
@@ -118,13 +134,16 @@ where
118134
)
119135
.await?;
120136

121-
let db_path = tmp
137+
let db_dir = tmp
122138
.join("dbs")
123-
.join(config.namespace().as_str())
124-
.join("data");
139+
.join(config.namespace().as_str());
140+
tokio::fs::create_dir_all(&db_dir).await?;
141+
let db_path = db_dir.join("data");
125142
let registry = tmp_registry.clone();
126143
let namespace = config.namespace().clone();
127-
let shared = tokio::task::spawn_blocking(move || registry.open(&db_path, &namespace.into()))
144+
let shared = tokio::task::spawn_blocking({
145+
let registry = registry.clone();
146+
move || registry.open(&db_path, &namespace.into()) })
128147
.await
129148
.unwrap()
130149
.unwrap();
@@ -141,18 +160,35 @@ where
141160
.join(config.namespace().as_str())
142161
.join("data");
143162
let mut orig_db_file = tokio::fs::File::open(orig_db_path).await?;
144-
let orig_db_file_len = orig_db_file.metadata().await?.len();
145-
for i in 0..(orig_db_file_len / LIBSQL_PAGE_SIZE as u64) {
163+
let mut db_size = usize::MAX;
164+
let mut current = 0;
165+
while current < db_size {
146166
let mut frame: Box<Frame> = Frame::new_box_zeroed();
167+
orig_db_file.read_exact(frame.data_mut()).await?;
168+
if current == 0 {
169+
let header: Sqlite3DbHeader = Sqlite3DbHeader::read_from_prefix(frame.data()).unwrap();
170+
db_size = header.db_size.get() as usize;
171+
}
172+
let size_after = if current == db_size - 1 {
173+
db_size as u32
174+
} else {
175+
0
176+
};
147177
*frame.header_mut() = FrameHeader {
148-
page_no: (i as u32 + 1).into(),
149-
size_after: 0.into(),
150-
frame_no: (i + 1).into(),
178+
page_no: (current as u32 + 1).into(),
179+
size_after: size_after.into(),
180+
frame_no: (current as u64 + 1).into(),
151181
};
152-
orig_db_file.read_exact(frame.data_mut()).await?;
153182
injector.insert_frame(frame).await?;
183+
current += 1;
154184
}
155185

186+
drop(injector);
187+
188+
tokio::task::spawn_blocking(move || {
189+
shared.seal_current()
190+
}).await.unwrap()?;
191+
156192
tracing::info!("sucessfull migration");
157193

158194
Ok(())

libsql-server/src/lib.rs

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use libsql_wal::registry::WalRegistry;
5454
use libsql_wal::segment::sealed::SealedSegment;
5555
use libsql_wal::storage::async_storage::{AsyncStorage, AsyncStorageInitConfig};
5656
use libsql_wal::storage::backend::s3::S3Backend;
57-
use libsql_wal::storage::{NoStorage, Storage};
57+
use libsql_wal::storage::NoStorage;
5858
use namespace::meta_store::MetaStoreHandle;
5959
use namespace::NamespaceName;
6060
use net::Connector;
@@ -65,6 +65,7 @@ use tokio::runtime::Runtime;
6565
use tokio::sync::{mpsc, Notify, Semaphore};
6666
use tokio::task::JoinSet;
6767
use tokio::time::Duration;
68+
use tokio_stream::StreamExt as _;
6869
use tonic::transport::Channel;
6970
use url::Url;
7071
use utils::services::idle_shutdown::IdleShutdownKicker;
@@ -878,24 +879,42 @@ where
878879
};
879880

880881
// perform migration before creating the actual registry creation
881-
self.maybe_migrate_bottomless(
882-
meta_store,
883-
storage.clone(),
882+
let did_migrate = self.maybe_migrate_bottomless(
883+
meta_store.clone(),
884884
&base_config,
885885
&primary_config,
886886
).await?;
887887

888+
888889
if self.rpc_server_config.is_some() && matches!(*storage, Either::B(_)) {
889890
anyhow::bail!("replication without bottomless not supported yet");
890891
}
891892

893+
892894
let registry = Arc::new(WalRegistry::new(wal_path, storage, sender)?);
893895
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 8);
894896
task_manager.spawn_with_shutdown_notify(|_| async move {
895897
checkpointer.run().await;
896898
Ok(())
897899
});
898900

901+
// If we have performed the migration, load all shared wals to force flush to storage with
902+
// the new registry
903+
if did_migrate {
904+
let dbs_path = base_config.base_path.join("dbs");
905+
let stream = meta_store.namespaces();
906+
tokio::pin!(stream);
907+
while let Some(conf) = stream.next().await {
908+
let registry = registry.clone();
909+
let namespace = conf.namespace().clone();
910+
let path = dbs_path.join(namespace.as_str()).join("data");
911+
tokio::task::spawn_blocking(move || {
912+
registry.open(&path, &namespace.into())
913+
}).await.unwrap()?;
914+
}
915+
}
916+
917+
899918
let namespace_resolver = Arc::new(|path: &Path| {
900919
NamespaceName::from_string(
901920
path.parent()
@@ -1139,15 +1158,13 @@ where
11391158
/// - migrate_bottomless flag is raised
11401159
/// - there hasn't been a previous successfull migration (wals directory is either absent,
11411160
/// or emtpy)
1142-
async fn maybe_migrate_bottomless<S>(
1161+
/// returns whether the migration was performed
1162+
async fn maybe_migrate_bottomless(
11431163
&self,
11441164
meta_store: MetaStore,
1145-
storage: Arc<S>,
11461165
base_config: &BaseNamespaceConfig,
11471166
primary_config: &PrimaryConfig,
1148-
) -> anyhow::Result<()>
1149-
where S: Storage<Segment = SealedSegment<std::fs::File>>,
1150-
{
1167+
) -> anyhow::Result<bool> {
11511168
let is_previous_migration_successful = self.check_previous_migration_success()?;
11521169
let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal));
11531170
let is_bottomless_enabled = self.db_config.bottomless_replication.is_some();
@@ -1159,18 +1176,24 @@ where
11591176
&& is_libsql_wal;
11601177

11611178
if should_attempt_migration {
1162-
bottomless_migrate(meta_store, storage, base_config.clone(), primary_config.clone()).await?;
1179+
bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?;
1180+
Ok(true)
1181+
} else {
1182+
tracing::info!("bottomless already migrated, skipping...");
1183+
Ok(false)
11631184
}
1164-
1165-
Ok(())
11661185
}
11671186

11681187
fn check_previous_migration_success(&self) -> anyhow::Result<bool> {
11691188
let wals_path = self.path.join("wals");
1189+
if !wals_path.try_exists()? {
1190+
return Ok(false)
1191+
}
1192+
11701193
let dir = std::fs::read_dir(&wals_path)?;
11711194

11721195
// wals dir exist and is not empty
1173-
Ok(wals_path.try_exists()? && dir.count() != 0)
1196+
Ok(dir.count() != 0)
11741197
}
11751198
}
11761199

0 commit comments

Comments
 (0)