Skip to content

Commit cdf8dc1

Browse files
committed
move compactor logic to sqld wal-toolkit command
1 parent 206a5fa commit cdf8dc1

3 files changed

Lines changed: 229 additions & 0 deletions

File tree

libsql-server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ mod stats;
115115
#[cfg(test)]
116116
mod test;
117117
mod utils;
118+
pub mod wal_toolkit;
118119

119120
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
120121
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;

libsql-server/src/main.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use bytesize::ByteSize;
88
use clap::Parser;
99
use hyper::client::HttpConnector;
1010
use libsql_server::auth::{parse_http_basic_auth_arg, parse_jwt_keys, user_auth_strategies, Auth};
11+
use libsql_server::wal_toolkit::{S3Args, WalToolkit};
1112
use tokio::sync::Notify;
1213
use tokio::time::Duration;
1314
use tracing_subscriber::prelude::*;
@@ -316,6 +317,14 @@ enum UtilsSubcommands {
316317
#[clap(long)]
317318
auth: Option<String>,
318319
},
320+
WalToolkit {
321+
#[arg(long, short, default_value = ".compactor")]
322+
path: PathBuf,
323+
#[clap(flatten)]
324+
s3_args: S3Args,
325+
#[clap(subcommand)]
326+
command: WalToolkit,
327+
},
319328
}
320329

321330
impl Cli {
@@ -731,6 +740,13 @@ async fn main() -> Result<()> {
731740
client.run_namespace(ns).await?;
732741
}
733742
}
743+
UtilsSubcommands::WalToolkit {
744+
command,
745+
path,
746+
s3_args,
747+
} => {
748+
command.run(path, s3_args).await?;
749+
}
734750
}
735751

736752
return Ok(());

libsql-server/src/wal_toolkit.rs

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
use std::path::{Path, PathBuf};
2+
3+
use anyhow::Context as _;
4+
use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
5+
use aws_sdk_s3::config::{Credentials, SharedCredentialsProvider};
6+
use libsql_wal::io::StdIO;
7+
use libsql_wal::storage::backend::s3::S3Backend;
8+
use libsql_wal::storage::compaction::strategy::identity::IdentityStrategy;
9+
use libsql_wal::storage::compaction::strategy::log_strategy::LogReductionStrategy;
10+
use libsql_wal::storage::compaction::strategy::PartitionStrategy;
11+
use libsql_wal::storage::compaction::Compactor;
12+
13+
#[derive(Clone, Debug, clap::ValueEnum)]
14+
pub enum CompactStrategy {
15+
Logarithmic,
16+
CompactAll,
17+
}
18+
19+
#[derive(Debug, clap::Subcommand)]
20+
pub enum WalToolkit {
21+
/// Register namespaces to monitor
22+
Monitor { namespace: String },
23+
/// Analyze segments for a namespaces
24+
Analyze {
25+
/// list all segments
26+
#[clap(long)]
27+
list_all: bool,
28+
namespace: String,
29+
},
30+
/// Compact segments into bigger segments
31+
Compact {
32+
/// compaction strategy
33+
#[clap(long, short)]
34+
strategy: CompactStrategy,
35+
/// prints the compaction plan, but doesn't perform it.
36+
#[clap(long)]
37+
dry_run: bool,
38+
namespace: String,
39+
},
40+
/// Sync namespace metadata from remote storage
41+
Sync {
42+
/// When performing a full sync, all the segment space is scanned again. By default, only
43+
/// segments with frame_no greated that the last frame_no are retrieved.
44+
#[clap(long)]
45+
full: bool,
46+
/// unless this is specified, all monitored namespaces are synced
47+
namespace: Option<String>,
48+
},
49+
/// Restore namespace
50+
Restore {
51+
#[clap(long)]
52+
verify: bool,
53+
namespace: String,
54+
out: PathBuf,
55+
},
56+
}
57+
58+
impl WalToolkit {
59+
pub async fn run(&self, compact_path: &Path, s3_args: &S3Args) -> anyhow::Result<()> {
60+
let backend = setup_storage(s3_args).await?;
61+
tokio::fs::create_dir_all(compact_path).await?;
62+
let mut compactor = Compactor::new(backend.into(), compact_path)?;
63+
match self {
64+
Self::Monitor { namespace } => {
65+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
66+
compactor.monitor(&namespace).await?;
67+
println!("monitoring {namespace}");
68+
}
69+
Self::Analyze {
70+
namespace,
71+
list_all,
72+
} => {
73+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
74+
let analysis = compactor.analyze(&namespace)?;
75+
println!("stats for {namespace}:");
76+
println!("- segment count: {}", analysis.segment_count());
77+
println!("- last frame_no: {}", analysis.last_frame_no());
78+
let set = analysis.shortest_restore_path();
79+
println!("- shortest restore path len: {}", set.len());
80+
if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
81+
println!(
82+
"- oldest segment: {}-{} ({})",
83+
first.key.start_frame_no, first.key.end_frame_no, first.created_at
84+
);
85+
println!(
86+
"- most recent segment: {}-{} ({})",
87+
last.key.start_frame_no, last.key.end_frame_no, last.created_at
88+
);
89+
}
90+
91+
if *list_all {
92+
println!("segments:");
93+
compactor.list_all(&namespace, |info| {
94+
println!(
95+
"- {}-{} ({})",
96+
info.key.start_frame_no, info.key.end_frame_no, info.created_at
97+
);
98+
})?;
99+
}
100+
}
101+
Self::Compact {
102+
strategy,
103+
dry_run,
104+
namespace,
105+
} => {
106+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
107+
let analysis = compactor.analyze(&namespace)?;
108+
let strat: Box<dyn PartitionStrategy> = match strategy {
109+
CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
110+
CompactStrategy::CompactAll => Box::new(IdentityStrategy),
111+
};
112+
let set = analysis.shortest_restore_path();
113+
let partition = strat.partition(&set);
114+
115+
println!("initial shortest restore path len: {}", set.len());
116+
println!("compacting into {} segments", partition.len());
117+
for set in partition.iter() {
118+
println!("- {:?}", set.range().unwrap());
119+
}
120+
if *dry_run {
121+
println!("dry run: stopping");
122+
} else {
123+
println!("performing compaction");
124+
let part_len = partition.len();
125+
for (idx, set) in partition.into_iter().enumerate() {
126+
let Some((start, end)) = set.range() else {
127+
continue;
128+
};
129+
println!("compacting {start}-{end} ({}/{})", idx + 1, part_len);
130+
// TODO: we can compact in conccurently
131+
compactor.compact(set).await?;
132+
}
133+
}
134+
}
135+
Self::Sync { full, namespace } => match namespace {
136+
Some(_ns) => {
137+
todo!()
138+
}
139+
None if *full => {
140+
compactor.sync_full().await?;
141+
println!("all monitored namespace fully up to date.");
142+
}
143+
_ => todo!(),
144+
},
145+
Self::Restore {
146+
namespace,
147+
out,
148+
verify,
149+
} => {
150+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
151+
let analysis = compactor.analyze(&namespace)?;
152+
let set = analysis.shortest_restore_path();
153+
compactor.restore(set, &out).await?;
154+
if *verify {
155+
let conn = libsql_sys::rusqlite::Connection::open(&out)?;
156+
conn.pragma_query(None, "integrity_check", |r| {
157+
println!("{r:?}");
158+
Ok(())
159+
})?;
160+
}
161+
}
162+
}
163+
164+
Ok(())
165+
}
166+
}
167+
168+
async fn setup_storage(opt: &S3Args) -> anyhow::Result<S3Backend<StdIO>> {
169+
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
170+
171+
let mut builder = config.into_builder();
172+
builder.set_endpoint_url(opt.s3_url.clone());
173+
builder.set_retry_config(RetryConfig::standard().with_max_attempts(10).into());
174+
builder.set_region(Region::new(
175+
opt.s3_region_id.clone().expect("expected aws region"),
176+
));
177+
let cred = Credentials::new(
178+
opt.s3_access_key_id.as_ref().unwrap(),
179+
opt.s3_access_key.as_ref().unwrap(),
180+
None,
181+
None,
182+
"Static",
183+
);
184+
builder.set_credentials_provider(Some(SharedCredentialsProvider::new(cred)));
185+
let config = builder.build();
186+
let backend = S3Backend::from_sdk_config(
187+
config,
188+
opt.s3_bucket.clone().context("missing bucket id")?,
189+
opt.cluster_id.clone().context("missing cluster id")?,
190+
)
191+
.await?;
192+
193+
Ok(backend)
194+
}
195+
196+
#[derive(Debug, clap::Args)]
197+
pub struct S3Args {
198+
#[arg(long, requires = "S3Args")]
199+
enable_s3: bool,
200+
#[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
201+
cluster_id: Option<String>,
202+
#[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
203+
s3_url: Option<String>,
204+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
205+
s3_access_key: Option<String>,
206+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
207+
s3_access_key_id: Option<String>,
208+
#[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
209+
s3_bucket: Option<String>,
210+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
211+
s3_region_id: Option<String>,
212+
}

0 commit comments

Comments
 (0)