Skip to content

Commit 28adf72

Browse files
committed
add bottomless migration
1 parent d2893bd commit 28adf72

3 files changed

Lines changed: 169 additions & 3 deletions

File tree

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use std::path::{Path, PathBuf};
2+
use std::sync::Arc;
3+
4+
use libsql_replication::LIBSQL_PAGE_SIZE;
5+
use libsql_sys::wal::Sqlite3WalManager;
6+
use libsql_wal::io::StdIO;
7+
use libsql_wal::replication::injector::Injector;
8+
use libsql_wal::segment::{Frame, FrameHeader};
9+
use libsql_wal::storage::Storage;
10+
use libsql_wal::{registry::WalRegistry, segment::sealed::SealedSegment};
11+
use tempfile::TempDir;
12+
use tokio::io::AsyncReadExt;
13+
use tokio_stream::StreamExt;
14+
use zerocopy::FromZeroes;
15+
16+
#[cfg(not(feature = "durable-wal"))]
17+
use libsql_sys::wal::either::Either as EitherWAL;
18+
#[cfg(feature = "durable-wal")]
19+
use libsql_sys::wal::either::Either3 as EitherWAL;
20+
21+
use crate::namespace::broadcasters::BroadcasterRegistry;
22+
use crate::namespace::configurator::{
23+
BaseNamespaceConfig, NamespaceConfigurators, PrimaryConfig, PrimaryConfigurator,
24+
};
25+
use crate::namespace::meta_store::{MetaStore, MetaStoreHandle};
26+
use crate::namespace::NamespaceStore;
27+
28+
/// The process for migrating from bottomless to libsql wal is simple:
29+
/// 1) iteratate over all namespaces, and make sure that they
30+
pub async fn bottomless_migrate<S>(
31+
meta_store: MetaStore,
32+
storage: Arc<S>,
33+
base_config: BaseNamespaceConfig,
34+
primary_config: PrimaryConfig,
35+
) -> anyhow::Result<()>
36+
where
37+
S: Storage<Segment = SealedSegment<std::fs::File>>,
38+
{
39+
tracing::info!("attempting bottomless migration to libsql-wal");
40+
41+
let tmp = TempDir::new()?;
42+
43+
tokio::fs::create_dir_all(tmp.path().join("dbs")).await?;
44+
tokio::fs::create_dir_all(tmp.path().join("wals")).await?;
45+
46+
let configs_stream = meta_store.namespaces();
47+
tokio::pin!(configs_stream);
48+
49+
let (sender, _) = tokio::sync::mpsc::channel(1);
50+
let tmp_registry = Arc::new(WalRegistry::new(tmp.path().join("wals"), storage, sender)?);
51+
52+
let mut configurators = NamespaceConfigurators::default();
53+
54+
let make_wal_manager = Arc::new(|| EitherWAL::A(Sqlite3WalManager::default()));
55+
let primary_configurator =
56+
PrimaryConfigurator::new(base_config.clone(), primary_config, make_wal_manager);
57+
configurators.with_primary(primary_configurator);
58+
59+
let dummy_store = NamespaceStore::new(
60+
false,
61+
false,
62+
1000,
63+
meta_store.clone(),
64+
NamespaceConfigurators::default(),
65+
crate::database::DatabaseKind::Primary,
66+
)
67+
.await?;
68+
69+
while let Some(config) = configs_stream.next().await {
70+
migrate_one(
71+
&configurators,
72+
config,
73+
dummy_store.clone(),
74+
tmp.path(),
75+
tmp_registry.clone(),
76+
&base_config.base_path,
77+
)
78+
.await?;
79+
}
80+
81+
Ok(())
82+
}
83+
84+
/// this may not be the most efficient method to perform a migration, but it has the advantage of
85+
/// being atomic. when all namespaces are migrated, be rename the dbs and wals folders from the tmp
86+
/// directory, in that order. If we don't find a wals folder in the db directory, we'll just
87+
/// atttempt migrating again, because:
88+
/// - either the migration didn't happen
89+
/// - a crash happened before we could swap the directories
90+
#[tracing::instrument(skip_all, fields(namespace = config.namespace().as_str()))]
91+
async fn migrate_one<S>(
92+
configurators: &NamespaceConfigurators,
93+
config: MetaStoreHandle,
94+
dummy_store: NamespaceStore,
95+
tmp: &Path,
96+
tmp_registry: Arc<WalRegistry<StdIO, S>>,
97+
base_path: &Path,
98+
) -> anyhow::Result<()>
99+
where
100+
S: Storage<Segment = SealedSegment<std::fs::File>>,
101+
{
102+
let broadcasters = BroadcasterRegistry::default();
103+
// TODO: check if we already have a backup for this db from storage
104+
tracing::info!("started db migrating");
105+
// we load the namespace ensuring it's restored to the latest version
106+
configurators
107+
.configure_primary()?
108+
.setup(
109+
config.clone(),
110+
crate::namespace::RestoreOption::Latest,
111+
config.namespace(),
112+
// don't care about reset
113+
Box::new(|_| ()),
114+
// don't care about attach
115+
Arc::new(|_| Ok(PathBuf::new().into())),
116+
dummy_store.clone(),
117+
broadcasters.handle(config.namespace().clone()),
118+
)
119+
.await?;
120+
121+
let db_path = tmp
122+
.join("dbs")
123+
.join(config.namespace().as_str())
124+
.join("data");
125+
let registry = tmp_registry.clone();
126+
let namespace = config.namespace().clone();
127+
let shared = tokio::task::spawn_blocking(move || registry.open(&db_path, &namespace.into()))
128+
.await
129+
.unwrap()
130+
.unwrap();
131+
132+
let mut tx = shared.begin_read(0).into();
133+
shared.upgrade(&mut tx).unwrap();
134+
let guard = tx
135+
.into_write()
136+
.unwrap_or_else(|_| panic!("should be a write txn"))
137+
.into_lock_owned();
138+
let mut injector = Injector::new(shared.clone(), guard, 10)?;
139+
let orig_db_path = base_path
140+
.join("dbs")
141+
.join(config.namespace().as_str())
142+
.join("data");
143+
let mut orig_db_file = tokio::fs::File::open(orig_db_path).await?;
144+
let orig_db_file_len = orig_db_file.metadata().await?.len();
145+
for i in 0..(orig_db_file_len / LIBSQL_PAGE_SIZE as u64) {
146+
let mut frame: Box<Frame> = Frame::new_box_zeroed();
147+
*frame.header_mut() = FrameHeader {
148+
page_no: (i as u32 + 1).into(),
149+
size_after: 0.into(),
150+
frame_no: (i + 1).into(),
151+
};
152+
orig_db_file.read_exact(frame.data_mut()).await?;
153+
injector.insert_frame(frame).await?;
154+
}
155+
156+
tracing::info!("sucessfull migration");
157+
158+
Ok(())
159+
}

libsql-server/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ use tonic::transport::Channel;
6969
use url::Url;
7070
use utils::services::idle_shutdown::IdleShutdownKicker;
7171

72+
use self::bottomless_migrate::bottomless_migrate;
7273
use self::config::MetaStoreConfig;
7374
use self::connection::connection_manager::InnerWalManager;
7475
use self::namespace::configurator::{
@@ -110,6 +111,7 @@ mod stats;
110111
#[cfg(test)]
111112
mod test;
112113
mod utils;
114+
mod bottomless_migrate;
113115

114116
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
115117
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;
@@ -1157,6 +1159,7 @@ where
11571159
&& is_libsql_wal;
11581160

11591161
if should_attempt_migration {
1162+
bottomless_migrate(meta_store, storage, base_config.clone(), primary_config.clone()).await?;
11601163
}
11611164

11621165
Ok(())

libsql-wal/src/segment/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,9 @@ impl<T: Segment> Segment for Arc<T> {
221221
#[repr(C)]
222222
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
223223
pub struct FrameHeader {
224-
page_no: U32,
225-
size_after: U32,
226-
frame_no: U64,
224+
pub page_no: U32,
225+
pub size_after: U32,
226+
pub frame_no: U64,
227227
}
228228

229229
impl FrameHeader {
@@ -303,6 +303,10 @@ impl Frame {
303303
let size_after = self.header().size_after.get();
304304
(size_after != 0).then_some(size_after)
305305
}
306+
307+
/// Returns a mutable byte slice over this frame's `data` field, so the contents can be
/// written in place.
pub fn data_mut(&mut self) -> &mut [u8] {
308+
&mut self.data
309+
}
306310
}
307311

308312
/// offset of the CheckedFrame in a current of sealed segment

0 commit comments

Comments
 (0)