|
| 1 | +use std::path::{Path, PathBuf}; |
| 2 | +use std::sync::Arc; |
| 3 | + |
| 4 | +use libsql_sys::ffi::Sqlite3DbHeader; |
| 5 | +use libsql_sys::wal::Sqlite3WalManager; |
| 6 | +use libsql_wal::io::StdIO; |
| 7 | +use libsql_wal::registry::WalRegistry; |
| 8 | +use libsql_wal::replication::injector::Injector; |
| 9 | +use libsql_wal::segment::{Frame, FrameHeader}; |
| 10 | +use libsql_wal::storage::NoStorage; |
| 11 | +use tempfile::TempDir; |
| 12 | +use tokio::io::AsyncReadExt; |
| 13 | +use tokio_stream::StreamExt; |
| 14 | +use zerocopy::{FromBytes, FromZeroes}; |
| 15 | + |
| 16 | +#[cfg(not(feature = "durable-wal"))] |
| 17 | +use libsql_sys::wal::either::Either as EitherWAL; |
| 18 | +#[cfg(feature = "durable-wal")] |
| 19 | +use libsql_sys::wal::either::Either3 as EitherWAL; |
| 20 | + |
| 21 | +use crate::namespace::broadcasters::BroadcasterRegistry; |
| 22 | +use crate::namespace::configurator::{ |
| 23 | + BaseNamespaceConfig, NamespaceConfigurators, PrimaryConfig, PrimaryConfigurator, |
| 24 | +}; |
| 25 | +use crate::namespace::meta_store::{MetaStore, MetaStoreHandle}; |
| 26 | +use crate::namespace::NamespaceStore; |
| 27 | + |
| 28 | +/// The process for migrating from bottomless to libsql wal is simple: |
| 29 | +/// 1) iteratate over all namespaces, and make sure that they are up to date with bottomless by |
| 30 | +/// loading them |
| 31 | +/// 2) with a dummy registry, in a temp directory, with no storage, and no checkpointer, inject all the pages from the |
| 32 | +/// original db into a new temp db |
| 33 | +/// 3) when all namespace have been successfully migrated, make the dbs and wals folders permanent |
| 34 | +pub async fn bottomless_migrate( |
| 35 | + meta_store: MetaStore, |
| 36 | + base_config: BaseNamespaceConfig, |
| 37 | + primary_config: PrimaryConfig, |
| 38 | +) -> anyhow::Result<()> { |
| 39 | + let base_dbs_dir = base_config.base_path.join("dbs"); |
| 40 | + let base_dbs_dir_tmp = base_config.base_path.join("_dbs"); |
| 41 | + // the previous migration failed. The _dbs is still present, but the wals is not. In this case |
| 42 | + // we delete the current dbs if it exists and replace it with _dbs, and attempt migration again |
| 43 | + if base_dbs_dir_tmp.try_exists()? { |
| 44 | + tokio::fs::remove_dir_all(&base_dbs_dir).await?; |
| 45 | + tokio::fs::rename(&base_dbs_dir_tmp, &base_dbs_dir).await?; |
| 46 | + } |
| 47 | + |
| 48 | + tracing::info!("attempting bottomless migration to libsql-wal"); |
| 49 | + |
| 50 | + let tmp = TempDir::new()?; |
| 51 | + |
| 52 | + tokio::fs::create_dir_all(tmp.path().join("dbs")).await?; |
| 53 | + tokio::fs::create_dir_all(tmp.path().join("wals")).await?; |
| 54 | + |
| 55 | + let configs_stream = meta_store.namespaces(); |
| 56 | + tokio::pin!(configs_stream); |
| 57 | + |
| 58 | + let (sender, mut rcv) = tokio::sync::mpsc::channel(1); |
| 59 | + |
| 60 | + // we are not checkpointing anything, be we want to drain the receiver |
| 61 | + tokio::spawn(async move { |
| 62 | + loop { |
| 63 | + match rcv.recv().await { |
| 64 | + Some(libsql_wal::checkpointer::CheckpointMessage::Shutdown) | None => break, |
| 65 | + Some(_) => (), |
| 66 | + } |
| 67 | + } |
| 68 | + }); |
| 69 | + |
| 70 | + let tmp_registry = Arc::new(WalRegistry::new( |
| 71 | + tmp.path().join("wals"), |
| 72 | + NoStorage.into(), |
| 73 | + sender, |
| 74 | + )?); |
| 75 | + |
| 76 | + let mut configurators = NamespaceConfigurators::default(); |
| 77 | + |
| 78 | + let make_wal_manager = Arc::new(|| EitherWAL::A(Sqlite3WalManager::default())); |
| 79 | + let primary_configurator = |
| 80 | + PrimaryConfigurator::new(base_config.clone(), primary_config, make_wal_manager); |
| 81 | + configurators.with_primary(primary_configurator); |
| 82 | + |
| 83 | + let dummy_store = NamespaceStore::new( |
| 84 | + false, |
| 85 | + false, |
| 86 | + 1000, |
| 87 | + meta_store.clone(), |
| 88 | + NamespaceConfigurators::default(), |
| 89 | + crate::database::DatabaseKind::Primary, |
| 90 | + ) |
| 91 | + .await?; |
| 92 | + |
| 93 | + while let Some(config) = configs_stream.next().await { |
| 94 | + migrate_one( |
| 95 | + &configurators, |
| 96 | + config, |
| 97 | + dummy_store.clone(), |
| 98 | + tmp.path(), |
| 99 | + tmp_registry.clone(), |
| 100 | + &base_config.base_path, |
| 101 | + ) |
| 102 | + .await?; |
| 103 | + } |
| 104 | + |
| 105 | + tmp_registry.shutdown().await?; |
| 106 | + |
| 107 | + // unix prevents atomically renaming directories with mv, so we first rename dbs to _dbs, then |
| 108 | + // move the new dbs and wals, then remove old dbs. |
| 109 | + // when we perform a check form migration, whe verify if _dbs exists. If it exists, and wals |
| 110 | + // doesn't exist, then we restore it, otherwise, we delete it. |
| 111 | + tokio::fs::rename(&base_dbs_dir, &base_dbs_dir_tmp).await?; |
| 112 | + tokio::fs::rename(tmp.path().join("dbs"), base_dbs_dir).await?; |
| 113 | + tokio::fs::rename(tmp.path().join("wals"), base_config.base_path.join("wals")).await?; |
| 114 | + tokio::fs::remove_dir_all(base_config.base_path.join("_dbs")).await?; |
| 115 | + |
| 116 | + Ok(()) |
| 117 | +} |
| 118 | + |
| 119 | +/// this may not be the most efficient method to perform a migration, but it has the advantage of |
| 120 | +/// being atomic. when all namespaces are migrated, be rename the dbs and wals folders from the tmp |
| 121 | +/// directory, in that order. If we don't find a wals folder in the db directory, we'll just |
| 122 | +/// atttempt migrating again, because: |
| 123 | +/// - either the migration didn't happen |
| 124 | +/// - a crash happened before we could swap the directories |
| 125 | +#[tracing::instrument(skip_all, fields(namespace = config.namespace().as_str()))] |
| 126 | +async fn migrate_one( |
| 127 | + configurators: &NamespaceConfigurators, |
| 128 | + config: MetaStoreHandle, |
| 129 | + dummy_store: NamespaceStore, |
| 130 | + tmp: &Path, |
| 131 | + tmp_registry: Arc<WalRegistry<StdIO, NoStorage>>, |
| 132 | + base_path: &Path, |
| 133 | +) -> anyhow::Result<()> { |
| 134 | + let broadcasters = BroadcasterRegistry::default(); |
| 135 | + // TODO: check if we already have a backup for this db from storage |
| 136 | + tracing::info!("started namespace migration"); |
| 137 | + // we load the namespace ensuring it's restored to the latest version |
| 138 | + configurators |
| 139 | + .configure_primary()? |
| 140 | + .setup( |
| 141 | + config.clone(), |
| 142 | + crate::namespace::RestoreOption::Latest, |
| 143 | + config.namespace(), |
| 144 | + // don't care about reset |
| 145 | + Box::new(|_| ()), |
| 146 | + // don't care about attach |
| 147 | + Arc::new(|_| Ok(PathBuf::new().into())), |
| 148 | + dummy_store.clone(), |
| 149 | + broadcasters.handle(config.namespace().clone()), |
| 150 | + ) |
| 151 | + .await?; |
| 152 | + |
| 153 | + let db_dir = tmp.join("dbs").join(config.namespace().as_str()); |
| 154 | + tokio::fs::create_dir_all(&db_dir).await?; |
| 155 | + let db_path = db_dir.join("data"); |
| 156 | + let registry = tmp_registry.clone(); |
| 157 | + let namespace = config.namespace().clone(); |
| 158 | + let shared = tokio::task::spawn_blocking({ |
| 159 | + let registry = registry.clone(); |
| 160 | + move || registry.open(&db_path, &namespace.into()) |
| 161 | + }) |
| 162 | + .await |
| 163 | + .unwrap()?; |
| 164 | + |
| 165 | + let mut tx = shared.begin_read(0).into(); |
| 166 | + shared.upgrade(&mut tx).unwrap(); |
| 167 | + let guard = tx |
| 168 | + .into_write() |
| 169 | + .unwrap_or_else(|_| panic!("should be a write txn")) |
| 170 | + .into_lock_owned(); |
| 171 | + let mut injector = Injector::new(shared.clone(), guard, 10)?; |
| 172 | + let orig_db_path = base_path |
| 173 | + .join("dbs") |
| 174 | + .join(config.namespace().as_str()) |
| 175 | + .join("data"); |
| 176 | + let mut orig_db_file = tokio::fs::File::open(orig_db_path).await?; |
| 177 | + let mut db_size = usize::MAX; |
| 178 | + let mut current = 0; |
| 179 | + while current < db_size { |
| 180 | + let mut frame: Box<Frame> = Frame::new_box_zeroed(); |
| 181 | + orig_db_file.read_exact(frame.data_mut()).await?; |
| 182 | + if current == 0 { |
| 183 | + let header: Sqlite3DbHeader = Sqlite3DbHeader::read_from_prefix(frame.data()).unwrap(); |
| 184 | + db_size = header.db_size.get() as usize; |
| 185 | + } |
| 186 | + let size_after = if current == db_size - 1 { |
| 187 | + db_size as u32 |
| 188 | + } else { |
| 189 | + 0 |
| 190 | + }; |
| 191 | + *frame.header_mut() = FrameHeader { |
| 192 | + page_no: (current as u32 + 1).into(), |
| 193 | + size_after: size_after.into(), |
| 194 | + frame_no: (current as u64 + 1).into(), |
| 195 | + }; |
| 196 | + injector.insert_frame(frame).await?; |
| 197 | + current += 1; |
| 198 | + } |
| 199 | + |
| 200 | + drop(injector); |
| 201 | + |
| 202 | + tokio::task::spawn_blocking(move || shared.seal_current()) |
| 203 | + .await |
| 204 | + .unwrap()?; |
| 205 | + |
| 206 | + tracing::info!("sucessfull migration"); |
| 207 | + |
| 208 | + Ok(()) |
| 209 | +} |
0 commit comments