Skip to content

Commit d1d8bfc

Browse files
committed
fmt
1 parent e6c0bd4 commit d1d8bfc

11 files changed

Lines changed: 74 additions & 76 deletions

File tree

libsql-server/src/bottomless_migrate.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use std::sync::Arc;
44
use libsql_sys::ffi::Sqlite3DbHeader;
55
use libsql_sys::wal::Sqlite3WalManager;
66
use libsql_wal::io::StdIO;
7+
use libsql_wal::registry::WalRegistry;
78
use libsql_wal::replication::injector::Injector;
89
use libsql_wal::segment::{Frame, FrameHeader};
910
use libsql_wal::storage::NoStorage;
10-
use libsql_wal::registry::WalRegistry;
1111
use tempfile::TempDir;
1212
use tokio::io::AsyncReadExt;
1313
use tokio_stream::StreamExt;
14-
use zerocopy::{FromZeroes, FromBytes};
14+
use zerocopy::{FromBytes, FromZeroes};
1515

1616
#[cfg(not(feature = "durable-wal"))]
1717
use libsql_sys::wal::either::Either as EitherWAL;
@@ -35,8 +35,7 @@ pub async fn bottomless_migrate(
3535
meta_store: MetaStore,
3636
base_config: BaseNamespaceConfig,
3737
primary_config: PrimaryConfig,
38-
) -> anyhow::Result<()>
39-
{
38+
) -> anyhow::Result<()> {
4039
let base_dbs_dir = base_config.base_path.join("dbs");
4140
let base_dbs_dir_tmp = base_config.base_path.join("_dbs");
4241
// the previous migration failed. The _dbs is still present, but the wals is not. In this case
@@ -62,15 +61,17 @@ pub async fn bottomless_migrate(
6261
tokio::spawn(async move {
6362
loop {
6463
match rcv.recv().await {
65-
Some(libsql_wal::checkpointer::CheckpointMessage::Shutdown) | None => {
66-
break
67-
}
64+
Some(libsql_wal::checkpointer::CheckpointMessage::Shutdown) | None => break,
6865
Some(_) => (),
6966
}
7067
}
7168
});
7269

73-
let tmp_registry = Arc::new(WalRegistry::new(tmp.path().join("wals"), NoStorage.into(), sender)?);
70+
let tmp_registry = Arc::new(WalRegistry::new(
71+
tmp.path().join("wals"),
72+
NoStorage.into(),
73+
sender,
74+
)?);
7475

7576
let mut configurators = NamespaceConfigurators::default();
7677

@@ -129,8 +130,7 @@ async fn migrate_one(
129130
tmp: &Path,
130131
tmp_registry: Arc<WalRegistry<StdIO, NoStorage>>,
131132
base_path: &Path,
132-
) -> anyhow::Result<()>
133-
{
133+
) -> anyhow::Result<()> {
134134
let broadcasters = BroadcasterRegistry::default();
135135
// TODO: check if we already have a backup for this db from storage
136136
tracing::info!("started namespace migration");
@@ -150,18 +150,17 @@ async fn migrate_one(
150150
)
151151
.await?;
152152

153-
let db_dir = tmp
154-
.join("dbs")
155-
.join(config.namespace().as_str());
153+
let db_dir = tmp.join("dbs").join(config.namespace().as_str());
156154
tokio::fs::create_dir_all(&db_dir).await?;
157155
let db_path = db_dir.join("data");
158156
let registry = tmp_registry.clone();
159157
let namespace = config.namespace().clone();
160158
let shared = tokio::task::spawn_blocking({
161159
let registry = registry.clone();
162-
move || registry.open(&db_path, &namespace.into()) })
163-
.await
164-
.unwrap()?;
160+
move || registry.open(&db_path, &namespace.into())
161+
})
162+
.await
163+
.unwrap()?;
165164

166165
let mut tx = shared.begin_read(0).into();
167166
shared.upgrade(&mut tx).unwrap();
@@ -200,9 +199,9 @@ async fn migrate_one(
200199

201200
drop(injector);
202201

203-
tokio::task::spawn_blocking(move || {
204-
shared.seal_current()
205-
}).await.unwrap()?;
202+
tokio::task::spawn_blocking(move || shared.seal_current())
203+
.await
204+
.unwrap()?;
206205

207206
tracing::info!("sucessfull migration");
208207

libsql-server/src/lib.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ pub mod version;
9393

9494
pub use hrana::proto as hrana_proto;
9595

96+
mod bottomless_migrate;
9697
mod database;
9798
mod error;
9899
mod h2c;
@@ -112,7 +113,6 @@ mod stats;
112113
#[cfg(test)]
113114
mod test;
114115
mod utils;
115-
mod bottomless_migrate;
116116

117117
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
118118
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;
@@ -582,7 +582,6 @@ where
582582
)
583583
.await?;
584584

585-
586585
let namespace_store: NamespaceStore = NamespaceStore::new(
587586
db_kind.is_replica(),
588587
self.db_config.snapshot_at_shutdown,
@@ -868,7 +867,8 @@ where
868867
Either::A(storage)
869868
} else {
870869
Either::B(NoStorage)
871-
}.into();
870+
}
871+
.into();
872872

873873
let primary_config = PrimaryConfig {
874874
max_log_size: self.db_config.max_log_size,
@@ -879,18 +879,14 @@ where
879879
};
880880

881881
// perform migration before creating the actual registry creation
882-
let did_migrate = self.maybe_migrate_bottomless(
883-
meta_store.clone(),
884-
&base_config,
885-
&primary_config,
886-
).await?;
887-
882+
let did_migrate = self
883+
.maybe_migrate_bottomless(meta_store.clone(), &base_config, &primary_config)
884+
.await?;
888885

889886
if self.rpc_server_config.is_some() && matches!(*storage, Either::B(_)) {
890887
anyhow::bail!("replication without bottomless not supported yet");
891888
}
892889

893-
894890
let registry = Arc::new(WalRegistry::new(wal_path, storage, sender)?);
895891
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 8);
896892
task_manager.spawn_with_shutdown_notify(|_| async move {
@@ -908,13 +904,12 @@ where
908904
let registry = registry.clone();
909905
let namespace = conf.namespace().clone();
910906
let path = dbs_path.join(namespace.as_str()).join("data");
911-
tokio::task::spawn_blocking(move || {
912-
registry.open(&path, &namespace.into())
913-
}).await.unwrap()?;
907+
tokio::task::spawn_blocking(move || registry.open(&path, &namespace.into()))
908+
.await
909+
.unwrap()?;
914910
}
915911
}
916912

917-
918913
let namespace_resolver = Arc::new(|path: &Path| {
919914
NamespaceName::from_string(
920915
path.parent()
@@ -1164,12 +1159,12 @@ where
11641159
meta_store: MetaStore,
11651160
base_config: &BaseNamespaceConfig,
11661161
primary_config: &PrimaryConfig,
1167-
) -> anyhow::Result<bool> {
1162+
) -> anyhow::Result<bool> {
11681163
let is_previous_migration_successful = self.check_previous_migration_success()?;
11691164
let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal));
11701165
let is_bottomless_enabled = self.db_config.bottomless_replication.is_some();
11711166
let is_primary = self.rpc_client_config.is_none();
1172-
let should_attempt_migration = self.migrate_bottomless
1167+
let should_attempt_migration = self.migrate_bottomless
11731168
&& is_primary
11741169
&& is_bottomless_enabled
11751170
&& !is_previous_migration_successful
@@ -1195,7 +1190,7 @@ where
11951190
fn check_previous_migration_success(&self) -> anyhow::Result<bool> {
11961191
let wals_path = self.path.join("wals");
11971192
if !wals_path.try_exists()? {
1198-
return Ok(false)
1193+
return Ok(false);
11991194
}
12001195

12011196
let dir = std::fs::read_dir(&wals_path)?;

libsql-server/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,7 @@ async fn build_server(config: &Cli) -> anyhow::Result<Server> {
670670
use_custom_wal: config.use_custom_wal,
671671
storage_server_address: config.storage_server_address.clone(),
672672
connector: Some(https),
673-
migrate_bottomless: config.migrate_bottomless
673+
migrate_bottomless: config.migrate_bottomless,
674674
})
675675
}
676676

libsql-server/src/namespace/meta_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ impl MetaStore {
611611
}
612612

613613
/// returns an iterator over all manespaces config handles
614-
pub(crate) fn namespaces(&self) -> impl Stream<Item = MetaStoreHandle> + '_ {
614+
pub(crate) fn namespaces(&self) -> impl Stream<Item = MetaStoreHandle> + '_ {
615615
async_stream::stream! {
616616
let lock = self.inner.configs.lock().await;
617617
for (ns, sender) in lock.iter() {

libsql-wal/src/bins/shell/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ where
9999
match &cli.subcommand {
100100
Subcommand::Shell { db_path } => {
101101
let (sender, receiver) = tokio::sync::mpsc::channel(64);
102-
let registry = Arc::new(WalRegistry::new(db_path.clone(), storage.into(), sender).unwrap());
102+
let registry =
103+
Arc::new(WalRegistry::new(db_path.clone(), storage.into(), sender).unwrap());
103104
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 64);
104105
join_set.spawn(checkpointer.run());
105106
run_shell(

libsql-wal/src/io/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@ impl Io for StdIO {
6464
path: &Path,
6565
) -> io::Result<Self::File> {
6666
std::fs::OpenOptions::new()
67-
.create_new(create_new)
68-
.create(write)
69-
.read(read)
70-
.write(write)
71-
.open(path)
67+
.create_new(create_new)
68+
.create(write)
69+
.read(read)
70+
.write(write)
71+
.open(path)
7272
}
7373

7474
fn tempfile(&self) -> io::Result<Self::TempFile> {

libsql-wal/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ impl LibsqlFooter {
4141

4242
fn validate(&self) -> error::Result<()> {
4343
if self.magic.get() != LIBSQL_MAGIC {
44-
return Err(error::Error::InvalidFooterMagic)
44+
return Err(error::Error::InvalidFooterMagic);
4545
}
4646

4747
if self.version.get() != LIBSQL_WAL_VERSION {
48-
return Err(error::Error::InvalidFooterVersion)
48+
return Err(error::Error::InvalidFooterVersion);
4949
}
5050

5151
Ok(())

libsql-wal/src/registry.rs

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -284,20 +284,20 @@ where
284284
let footer = self.try_read_footer(&db_file)?;
285285

286286
let log_id = match footer {
287-
Some(footer) if tail.is_empty() => {
288-
footer.log_id()
289-
}
290-
None if tail.is_empty() => {
291-
self.io.uuid()
292-
}
287+
Some(footer) if tail.is_empty() => footer.log_id(),
288+
None if tail.is_empty() => self.io.uuid(),
293289
Some(footer) => {
294-
let log_id = tail.with_head(|h| h.header().log_id.get()).expect("non-empty list should have a head");
290+
let log_id = tail
291+
.with_head(|h| h.header().log_id.get())
292+
.expect("non-empty list should have a head");
295293
let log_id = Uuid::from_u128(log_id);
296294
assert_eq!(log_id, footer.log_id());
297295
log_id
298296
}
299297
None => {
300-
let log_id = tail.with_head(|h| h.header().log_id.get()).expect("non-empty list should have a head");
298+
let log_id = tail
299+
.with_head(|h| h.header().log_id.get())
300+
.expect("non-empty list should have a head");
301301
Uuid::from_u128(log_id)
302302
}
303303
};
@@ -307,15 +307,13 @@ where
307307
let header = segment.header();
308308
(header.size_after(), header.next_frame_no())
309309
})
310-
.unwrap_or_else(|| {
311-
match header {
312-
Some(header) => (
313-
header.db_size.get(),
314-
NonZeroU64::new(header.replication_index.get() + 1)
310+
.unwrap_or_else(|| match header {
311+
Some(header) => (
312+
header.db_size.get(),
313+
NonZeroU64::new(header.replication_index.get() + 1)
315314
.unwrap_or(NonZeroU64::new(1).unwrap()),
316-
),
317-
None => (0, NonZeroU64::new(1).unwrap())
318-
}
315+
),
316+
None => (0, NonZeroU64::new(1).unwrap()),
319317
});
320318

321319
let current_path = path.join(format!("{namespace}:{next_frame_no:020}.seg"));
@@ -327,13 +325,9 @@ where
327325
// if there is a tail, then the latest checkpointed frame_no is one before the the
328326
// start frame_no of the tail. We must read it from the tail, because a partial
329327
// checkpoint may have occured before a crash.
330-
Some(last) => {
331-
(last.start_frame_no() - 1).max(1)
332-
}
328+
Some(last) => (last.start_frame_no() - 1).max(1),
333329
// otherwise, we read the it from the footer.
334-
None => {
335-
footer.map(|f| f.replication_index.get()).unwrap_or(0)
336-
}
330+
None => footer.map(|f| f.replication_index.get()).unwrap_or(0),
337331
};
338332

339333
let current = arc_swap::ArcSwap::new(Arc::new(CurrentSegment::create(
@@ -373,20 +367,19 @@ where
373367
return Ok(shared);
374368
}
375369

376-
fn try_read_footer(&self, db_file: &impl FileExt) -> Result<Option<LibsqlFooter>>{
370+
fn try_read_footer(&self, db_file: &impl FileExt) -> Result<Option<LibsqlFooter>> {
377371
let len = db_file.len()?;
378372
if len as usize % LIBSQL_PAGE_SIZE as usize == size_of::<LibsqlFooter>() {
379373
let mut footer: LibsqlFooter = LibsqlFooter::new_zeroed();
380374
let footer_offset = (len / LIBSQL_PAGE_SIZE as u64) * LIBSQL_PAGE_SIZE as u64;
381375
db_file.read_exact_at(footer.as_bytes_mut(), footer_offset)?;
382376
footer.validate()?;
383377
Ok(Some(footer))
384-
} else {
378+
} else {
385379
Ok(None)
386380
}
387381
}
388382

389-
390383
// On shutdown, we checkpoint all the WALs. This require sealing the current segment, and when
391384
// checkpointing all the segments
392385
pub async fn shutdown(self: Arc<Self>) -> Result<()> {

libsql-wal/src/segment/list.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,17 +267,18 @@ where
267267
}
268268

269269
pub(crate) fn last(&self) -> Option<Seg>
270-
where Seg: Clone,
270+
where
271+
Seg: Clone,
271272
{
272273
let mut current = self.list.head.load().clone();
273274
loop {
274275
match current.as_ref() {
275276
Some(c) => {
276277
if c.next.load().is_none() {
277-
return Some(c.item.clone())
278+
return Some(c.item.clone());
278279
}
279280
current = c.next.load().clone();
280-
},
281+
}
281282
None => return None,
282283
}
283284
}

libsql-wal/src/storage/backend/s3.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,11 @@ struct FolderKey<'a> {
314314

315315
impl fmt::Display for FolderKey<'_> {
316316
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
317-
write!(f, "v2/clusters/{}/namespaces/{}", self.cluster_id, self.namespace)
317+
write!(
318+
f,
319+
"v2/clusters/{}/namespaces/{}",
320+
self.cluster_id, self.namespace
321+
)
318322
}
319323
}
320324

0 commit comments

Comments
 (0)