Skip to content

Commit bdb9f20

Browse files
authored
Merge pull request #1723 from tursodatabase/storage-compactor
Segment compactor
2 parents 82df117 + 00682fd commit bdb9f20

16 files changed

Lines changed: 1054 additions & 26 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsql-wal/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ tracing-subscriber = { version = "0.3.18", optional = true }
4444
aws-credential-types = { version = "1.2.0", optional = true }
4545
dashmap = "6.0.1"
4646
rand = "0.8.5"
47+
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] }
48+
petgraph = "0.6.5"
49+
anyhow = { version = "1.0.86", optional = true }
4750

4851
[dev-dependencies]
4952
criterion = "0.5.1"
@@ -78,9 +81,15 @@ shell-bin = [
7881
"dep:inquire",
7982
"s3",
8083
"dep:tracing-subscriber",
84+
"dep:anyhow",
8185
]
8286

8387
[[bin]]
8488
name = "shell"
8589
path = "src/bins/shell/main.rs"
8690
required-features = ["shell-bin"]
91+
92+
[[bin]]
93+
name = "compactor"
94+
path = "src/bins/compactor/main.rs"
95+
required-features = ["shell-bin"]
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
use std::path::PathBuf;
2+
3+
use anyhow::Context;
4+
use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
5+
use aws_credential_types::Credentials;
6+
use aws_sdk_s3::config::SharedCredentialsProvider;
7+
use clap::{Parser, ValueEnum};
8+
use libsql_wal::io::StdIO;
9+
use libsql_wal::storage::backend::s3::S3Backend;
10+
use libsql_wal::storage::compaction::strategy::{
11+
identity::IdentityStrategy, log_strategy::LogReductionStrategy, PartitionStrategy,
12+
};
13+
use libsql_wal::storage::compaction::Compactor;
14+
15+
#[derive(Debug, clap::Args)]
16+
struct S3Args {
17+
#[arg(long, requires = "S3Args")]
18+
enable_s3: bool,
19+
#[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
20+
cluster_id: Option<String>,
21+
#[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
22+
s3_url: Option<String>,
23+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
24+
s3_access_key: Option<String>,
25+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
26+
s3_access_key_id: Option<String>,
27+
#[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
28+
s3_bucket: Option<String>,
29+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
30+
s3_region_id: Option<String>,
31+
}
32+
33+
#[derive(Clone, Debug, ValueEnum)]
34+
enum CompactStrategy {
35+
Logarithmic,
36+
CompactAll,
37+
}
38+
39+
#[derive(Debug, clap::Parser)]
40+
struct Command {
41+
#[arg(long, short, default_value = "compactor")]
42+
path: PathBuf,
43+
#[command(flatten)]
44+
s3_args: S3Args,
45+
#[command(subcommand)]
46+
subcommand: Subcommand,
47+
}
48+
49+
#[derive(Debug, clap::Subcommand)]
50+
enum Subcommand {
51+
/// Register namespaces to monitor
52+
Monitor { namespace: String },
53+
/// Analyze segments for a namespace
54+
Analyze {
55+
/// list all segments
56+
#[clap(long)]
57+
list_all: bool,
58+
namespace: String,
59+
},
60+
/// Compact segments into bigger segments
61+
Compact {
62+
// compaction strategy
63+
#[clap(long, short)]
64+
strategy: CompactStrategy,
65+
/// prints the compaction plan, but doesn't perform it.
66+
#[clap(long)]
67+
dry_run: bool,
68+
namespace: String,
69+
},
70+
/// Sync namespace metadata from remote storage
71+
Sync {
72+
/// When performing a full sync, all the segment space is scanned again. By default, only
73+
/// segments with frame_no greater than the last frame_no are retrieved.
74+
#[clap(long)]
75+
full: bool,
76+
/// unless this is specified, all monitored namespaces are synced
77+
namespace: Option<String>,
78+
},
79+
/// Restore namespace
80+
Restore {
81+
#[clap(long)]
82+
verify: bool,
83+
namespace: String,
84+
out: PathBuf,
85+
},
86+
}
87+
88+
#[tokio::main]
89+
async fn main() -> anyhow::Result<()> {
90+
let cmd: Command = Command::parse();
91+
92+
let backend = setup_storage(&cmd.s3_args).await?;
93+
tokio::fs::create_dir_all(&cmd.path).await?;
94+
let mut compactor = Compactor::new(backend.into(), &cmd.path)?;
95+
match cmd.subcommand {
96+
Subcommand::Monitor { namespace } => {
97+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
98+
compactor.monitor(&namespace).await?;
99+
println!("monitoring {namespace}");
100+
}
101+
Subcommand::Analyze {
102+
namespace,
103+
list_all,
104+
} => {
105+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
106+
let analysis = compactor.analyze(&namespace)?;
107+
println!("stats for {namespace}:");
108+
println!("- segment count: {}", analysis.segment_count());
109+
println!("- last frame_no: {}", analysis.last_frame_no());
110+
let set = analysis.shortest_restore_path();
111+
println!("- shortest restore path len: {}", set.len());
112+
if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
113+
println!(
114+
"- oldest segment: {}-{} ({})",
115+
first.key.start_frame_no, first.key.end_frame_no, first.created_at
116+
);
117+
println!(
118+
"- most recent segment: {}-{} ({})",
119+
last.key.start_frame_no, last.key.end_frame_no, last.created_at
120+
);
121+
}
122+
123+
if list_all {
124+
println!("segments:");
125+
compactor.list_all(&namespace, |info| {
126+
println!(
127+
"- {}-{} ({})",
128+
info.key.start_frame_no, info.key.end_frame_no, info.created_at
129+
);
130+
})?;
131+
}
132+
}
133+
Subcommand::Compact {
134+
strategy,
135+
dry_run,
136+
namespace,
137+
} => {
138+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
139+
let analysis = compactor.analyze(&namespace)?;
140+
let strat: Box<dyn PartitionStrategy> = match strategy {
141+
CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
142+
CompactStrategy::CompactAll => Box::new(IdentityStrategy),
143+
};
144+
let set = analysis.shortest_restore_path();
145+
let partition = strat.partition(&set);
146+
147+
println!("initial shortest restore path len: {}", set.len());
148+
println!("compacting into {} segments", partition.len());
149+
for set in partition.iter() {
150+
println!("- {:?}", set.range().unwrap());
151+
}
152+
if dry_run {
153+
println!("dry run: stopping");
154+
} else {
155+
println!("performing compaction");
156+
let part_len = partition.len();
157+
for (idx, set) in partition.into_iter().enumerate() {
158+
let Some((start, end)) = set.range() else {
159+
continue;
160+
};
161+
println!("compacting {start}-{end} ({}/{})", idx + 1, part_len);
162+
// TODO: we can compact concurrently
163+
compactor.compact(set).await?;
164+
}
165+
}
166+
}
167+
Subcommand::Sync { full, namespace } => match namespace {
168+
Some(_ns) => {
169+
todo!()
170+
}
171+
None if full => {
172+
compactor.sync_full().await?;
173+
println!("all monitored namespace fully up to date.");
174+
}
175+
_ => todo!(),
176+
},
177+
Subcommand::Restore {
178+
namespace,
179+
out,
180+
verify,
181+
} => {
182+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
183+
let analysis = compactor.analyze(&namespace)?;
184+
let set = analysis.shortest_restore_path();
185+
compactor.restore(set, &out).await?;
186+
if verify {
187+
let conn = libsql_sys::rusqlite::Connection::open(&out)?;
188+
conn.pragma_query(None, "integrity_check", |r| {
189+
println!("{r:?}");
190+
Ok(())
191+
})?;
192+
}
193+
}
194+
}
195+
196+
Ok(())
197+
}
198+
199+
async fn setup_storage(opt: &S3Args) -> anyhow::Result<S3Backend<StdIO>> {
200+
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
201+
202+
let mut builder = config.into_builder();
203+
builder.set_endpoint_url(opt.s3_url.clone());
204+
builder.set_retry_config(RetryConfig::standard().with_max_attempts(10).into());
205+
builder.set_region(Region::new(
206+
opt.s3_region_id.clone().expect("expected aws region"),
207+
));
208+
let cred = Credentials::new(
209+
opt.s3_access_key_id.as_ref().unwrap(),
210+
opt.s3_access_key.as_ref().unwrap(),
211+
None,
212+
None,
213+
"Static",
214+
);
215+
builder.set_credentials_provider(Some(SharedCredentialsProvider::new(cred)));
216+
let config = builder.build();
217+
let backend = S3Backend::from_sdk_config(
218+
config,
219+
opt.s3_bucket.clone().context("missing bucket id")?,
220+
opt.cluster_id.clone().context("missing cluster id")?,
221+
)
222+
.await?;
223+
224+
Ok(backend)
225+
}

libsql-wal/src/bins/shell/main.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,21 @@ struct Cli {
4040
"s3_bucket",
4141
"cluster_id",
4242
])]
43+
4344
struct S3Args {
4445
#[arg(long, requires = "S3Args")]
4546
enable_s3: bool,
46-
#[arg(long)]
47+
#[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
4748
cluster_id: Option<String>,
48-
#[arg(long)]
49+
#[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
4950
s3_url: Option<String>,
50-
#[arg(long)]
51+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
5152
s3_access_key: Option<String>,
52-
#[arg(long)]
53+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
5354
s3_access_key_id: Option<String>,
54-
#[arg(long)]
55+
#[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
5556
s3_bucket: Option<String>,
56-
#[arg(long)]
57+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
5758
s3_region_id: Option<String>,
5859
}
5960

libsql-wal/src/io/buf.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ impl<T> ZeroCopyBoxIoBuf<T> {
132132
}
133133
}
134134

135+
pub fn new_uninit(inner: Box<T>) -> Self {
136+
Self { init: 0, inner }
137+
}
138+
135139
fn is_init(&self) -> bool {
136140
self.init == size_of::<T>()
137141
}

libsql-wal/src/replication/storage.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use roaring::RoaringBitmap;
77
use tokio_stream::Stream;
88
use zerocopy::FromZeroes;
99

10+
use crate::io::buf::ZeroCopyBoxIoBuf;
1011
use crate::segment::Frame;
1112
use crate::storage::Storage;
1213

@@ -61,8 +62,9 @@ where
6162
},
6263
};
6364

64-
let (frame, ret) = segment.read_frame(Frame::new_box_zeroed(), offset as u32).await;
65+
let (frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()), offset as u32).await;
6566
ret?;
67+
let frame = frame.into_inner();
6668
debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0");
6769
if frame.header().frame_no() >= until {
6870
yield frame;

libsql-wal/src/segment/compacted.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ use std::mem::size_of;
44
use zerocopy::little_endian::{U128 as lu128, U16 as lu16, U32 as lu32, U64 as lu64};
55
use zerocopy::{AsBytes, FromBytes, FromZeroes};
66

7-
use crate::io::buf::{ZeroCopyBoxIoBuf, ZeroCopyBuf};
7+
use crate::io::buf::{IoBufMut, ZeroCopyBuf};
88
use crate::io::FileExt;
9+
use crate::segment::FrameHeader;
910
use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};
1011

1112
use super::{Frame, Result};
@@ -82,14 +83,33 @@ impl<F: FileExt> CompactedSegment<F> {
8283
Ok(Self { file, header })
8384
}
8485

85-
pub(crate) async fn read_frame(
86+
pub(crate) fn from_parts(file: F, header: CompactedSegmentDataHeader) -> Self {
87+
Self { header, file }
88+
}
89+
90+
pub(crate) async fn read_frame<B: IoBufMut + Send + 'static>(
8691
&self,
87-
frame: Box<Frame>,
92+
buf: B,
8893
offset: u32,
89-
) -> (Box<Frame>, io::Result<()>) {
94+
) -> (B, io::Result<()>) {
95+
assert_eq!(buf.bytes_init(), 0);
96+
assert_eq!(buf.bytes_total(), size_of::<Frame>());
9097
let offset = size_of::<CompactedSegmentDataHeader>() + size_of::<Frame>() * offset as usize;
91-
let buf = ZeroCopyBoxIoBuf::new(frame);
9298
let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await;
93-
(buf.into_inner(), ret)
99+
(buf, ret)
100+
}
101+
102+
pub(crate) async fn read_page<B: IoBufMut + Send + 'static>(
103+
&self,
104+
buf: B,
105+
offset: u32,
106+
) -> (B, io::Result<()>) {
107+
assert_eq!(buf.bytes_init(), 0);
108+
assert_eq!(buf.bytes_total(), LIBSQL_PAGE_SIZE as usize);
109+
let offset = size_of::<CompactedSegmentDataHeader>()
110+
+ size_of::<Frame>() * offset as usize
111+
+ size_of::<FrameHeader>();
112+
let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await;
113+
(buf, ret)
94114
}
95115
}

0 commit comments

Comments
 (0)