Skip to content

Commit 8a1ba79

Browse files
authored
Merge pull request #1727 from tursodatabase/throttle-segment-creation
Swap segment strategy
2 parents e98c6ce + ecb06b1 commit 8a1ba79

6 files changed

Lines changed: 137 additions & 9 deletions

File tree

libsql-wal/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod io;
66
pub mod registry;
77
pub mod replication;
88
pub mod segment;
9+
mod segment_swap_strategy;
910
pub mod shared_wal;
1011
pub mod storage;
1112
pub mod transaction;

libsql-wal/src/registry.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::num::NonZeroU64;
33
use std::path::{Path, PathBuf};
44
use std::sync::atomic::{AtomicBool, Ordering};
55
use std::sync::Arc;
6-
use std::time::Instant;
6+
use std::time::{Duration, Instant};
77

88
use dashmap::DashMap;
99
use libsql_sys::ffi::Sqlite3DbHeader;
@@ -25,6 +25,9 @@ use crate::replication::storage::{ReplicateFromStorage as _, StorageReplicator};
2525
use crate::segment::list::SegmentList;
2626
use crate::segment::Segment;
2727
use crate::segment::{current::CurrentSegment, sealed::SealedSegment};
28+
use crate::segment_swap_strategy::duration::DurationSwapStrategy;
29+
use crate::segment_swap_strategy::frame_count::FrameCountSwapStrategy;
30+
use crate::segment_swap_strategy::SegmentSwapStrategy;
2831
use crate::shared_wal::{SharedWal, SwapLog};
2932
use crate::storage::{OnStoreCallback, Storage};
3033
use crate::transaction::TxGuard;
@@ -337,6 +340,17 @@ where
337340

338341
let (new_frame_notifier, _) = tokio::sync::watch::channel(next_frame_no.get() - 1);
339342

343+
// FIXME: make swap strategy configurable
344+
// This strategy will perform a swap if either the wal is bigger than 20k frames, or older
345+
// than 10 minutes, or if the frame count is greater than a 1000 and the wal was last
346+
// swapped more than 30 secs ago
347+
let swap_strategy = Box::new(
348+
DurationSwapStrategy::new(Duration::from_secs(5 * 60))
349+
.or(FrameCountSwapStrategy::new(20_000))
350+
.or(FrameCountSwapStrategy::new(1000)
351+
.and(DurationSwapStrategy::new(Duration::from_secs(30)))),
352+
);
353+
340354
let shared = Arc::new(SharedWal {
341355
current,
342356
wal_lock: Default::default(),
@@ -352,8 +366,8 @@ where
352366
)),
353367
shutdown: false.into(),
354368
checkpoint_notifier: self.checkpoint_notifier.clone(),
355-
max_segment_size: 1000.into(),
356369
io: self.io.clone(),
370+
swap_strategy,
357371
});
358372

359373
self.opened
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use std::time::{Duration, Instant};
2+
3+
use parking_lot::Mutex;
4+
5+
use super::SegmentSwapStrategy;
6+
7+
/// A wal swap strategy that swaps the current wal if it's older that some duration
8+
pub struct DurationSwapStrategy {
9+
swap_after: Duration,
10+
last_swapped_at: Mutex<Instant>,
11+
}
12+
13+
impl DurationSwapStrategy {
14+
pub fn new(swap_after: Duration) -> Self {
15+
Self {
16+
swap_after,
17+
last_swapped_at: Mutex::new(Instant::now()),
18+
}
19+
}
20+
}
21+
22+
impl SegmentSwapStrategy for DurationSwapStrategy {
23+
#[inline(always)]
24+
fn should_swap(&self, _frames_in_wal: usize) -> bool {
25+
let last_swapped_at = self.last_swapped_at.lock();
26+
last_swapped_at.elapsed() >= self.swap_after
27+
}
28+
29+
#[inline(always)]
30+
fn swapped(&self) {
31+
*self.last_swapped_at.lock() = Instant::now();
32+
}
33+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use super::SegmentSwapStrategy;
2+
3+
/// A swap strategy that swaps if the count of frames in the wal exceed some threshold
4+
pub struct FrameCountSwapStrategy {
5+
max_frames_in_wal: usize,
6+
}
7+
8+
impl FrameCountSwapStrategy {
9+
pub fn new(max_frames_in_wal: usize) -> Self {
10+
Self { max_frames_in_wal }
11+
}
12+
}
13+
14+
impl SegmentSwapStrategy for FrameCountSwapStrategy {
15+
#[inline(always)]
16+
fn should_swap(&self, frames_in_wal: usize) -> bool {
17+
frames_in_wal >= self.max_frames_in_wal
18+
}
19+
20+
#[inline(always)]
21+
fn swapped(&self) {}
22+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
pub(crate) mod duration;
2+
pub(crate) mod frame_count;
3+
4+
pub(crate) trait SegmentSwapStrategy: Sync + Send + 'static {
5+
fn should_swap(&self, frames_in_wal: usize) -> bool;
6+
fn swapped(&self);
7+
8+
fn and<O: SegmentSwapStrategy>(self, other: O) -> And<Self, O>
9+
where
10+
Self: Sized,
11+
{
12+
And(self, other)
13+
}
14+
15+
fn or<O: SegmentSwapStrategy>(self, other: O) -> Or<Self, O>
16+
where
17+
Self: Sized,
18+
{
19+
Or(self, other)
20+
}
21+
}
22+
23+
pub struct And<A, B>(A, B);
24+
25+
impl<A, B> SegmentSwapStrategy for And<A, B>
26+
where
27+
A: SegmentSwapStrategy,
28+
B: SegmentSwapStrategy,
29+
{
30+
#[inline(always)]
31+
fn should_swap(&self, frames_in_wal: usize) -> bool {
32+
self.0.should_swap(frames_in_wal) && self.1.should_swap(frames_in_wal)
33+
}
34+
35+
#[inline(always)]
36+
fn swapped(&self) {
37+
self.0.swapped();
38+
self.1.swapped();
39+
}
40+
}
41+
42+
pub struct Or<A, B>(A, B);
43+
44+
impl<A, B> SegmentSwapStrategy for Or<A, B>
45+
where
46+
A: SegmentSwapStrategy,
47+
B: SegmentSwapStrategy,
48+
{
49+
#[inline(always)]
50+
fn should_swap(&self, frames_in_wal: usize) -> bool {
51+
self.0.should_swap(frames_in_wal) || self.1.should_swap(frames_in_wal)
52+
}
53+
54+
#[inline(always)]
55+
fn swapped(&self) {
56+
self.0.swapped();
57+
self.1.swapped();
58+
}
59+
}

libsql-wal/src/shared_wal.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::BTreeMap;
2-
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
2+
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
use std::sync::Arc;
44
use std::time::Instant;
55

@@ -16,6 +16,7 @@ use crate::io::file::FileExt;
1616
use crate::io::Io;
1717
use crate::replication::storage::ReplicateFromStorage;
1818
use crate::segment::current::CurrentSegment;
19+
use crate::segment_swap_strategy::SegmentSwapStrategy;
1920
use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction};
2021
use libsql_sys::name::NamespaceName;
2122

@@ -46,15 +47,14 @@ pub struct SharedWal<IO: Io> {
4647
pub(crate) registry: Arc<dyn SwapLog<IO>>,
4748
#[allow(dead_code)] // used by replication
4849
pub(crate) checkpointed_frame_no: AtomicU64,
49-
/// max frame_no acknoledged by the durable storage
50+
/// max frame_no acknowledged by the durable storage
5051
pub(crate) durable_frame_no: Arc<Mutex<u64>>,
5152
pub(crate) new_frame_notifier: tokio::sync::watch::Sender<u64>,
5253
pub(crate) stored_segments: Box<dyn ReplicateFromStorage>,
5354
pub(crate) shutdown: AtomicBool,
5455
pub(crate) checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
55-
/// maximum size the segment is allowed to grow
56-
pub(crate) max_segment_size: AtomicUsize,
5756
pub(crate) io: Arc<IO>,
57+
pub(crate) swap_strategy: Box<dyn SegmentSwapStrategy>,
5858
}
5959

6060
impl<IO: Io> SharedWal<IO> {
@@ -274,10 +274,9 @@ impl<IO: Io> SharedWal<IO> {
274274
self.new_frame_notifier.send_replace(last_committed);
275275
}
276276

277-
if tx.is_commited()
278-
&& current.count_committed() > self.max_segment_size.load(Ordering::Relaxed)
279-
{
277+
if tx.is_commited() && self.swap_strategy.should_swap(current.count_committed()) {
280278
self.swap_current(&tx)?;
279+
self.swap_strategy.swapped();
281280
}
282281

283282
Ok(())

0 commit comments

Comments
 (0)