Skip to content

Commit e98c6ce

Browse files
authored
Merge pull request #1728 from tursodatabase/libsql-server-toolkit
libsql server toolkit
2 parents 9893b67 + cdf8dc1 commit e98c6ce

6 files changed

Lines changed: 230 additions & 532 deletions

File tree

libsql-server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ mod stats;
115115
#[cfg(test)]
116116
mod test;
117117
mod utils;
118+
pub mod wal_toolkit;
118119

119120
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
120121
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;

libsql-server/src/main.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use bytesize::ByteSize;
99
use clap::Parser;
1010
use hyper::client::HttpConnector;
1111
use libsql_server::auth::{parse_http_basic_auth_arg, parse_jwt_keys, user_auth_strategies, Auth};
12+
use libsql_server::wal_toolkit::{S3Args, WalToolkit};
1213
use tokio::sync::Notify;
1314
use tokio::time::Duration;
1415
use tracing_subscriber::util::SubscriberInitExt;
@@ -317,6 +318,14 @@ enum UtilsSubcommands {
317318
#[clap(long)]
318319
auth: Option<String>,
319320
},
321+
WalToolkit {
322+
#[arg(long, short, default_value = ".compactor")]
323+
path: PathBuf,
324+
#[clap(flatten)]
325+
s3_args: S3Args,
326+
#[clap(subcommand)]
327+
command: WalToolkit,
328+
},
320329
}
321330

322331
impl Cli {
@@ -736,6 +745,13 @@ async fn main() -> Result<()> {
736745
client.run_namespace(ns).await?;
737746
}
738747
}
748+
UtilsSubcommands::WalToolkit {
749+
command,
750+
path,
751+
s3_args,
752+
} => {
753+
command.run(path, s3_args).await?;
754+
}
739755
}
740756

741757
return Ok(());

libsql-server/src/wal_toolkit.rs

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
use std::path::{Path, PathBuf};
2+
3+
use anyhow::Context as _;
4+
use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
5+
use aws_sdk_s3::config::{Credentials, SharedCredentialsProvider};
6+
use libsql_wal::io::StdIO;
7+
use libsql_wal::storage::backend::s3::S3Backend;
8+
use libsql_wal::storage::compaction::strategy::identity::IdentityStrategy;
9+
use libsql_wal::storage::compaction::strategy::log_strategy::LogReductionStrategy;
10+
use libsql_wal::storage::compaction::strategy::PartitionStrategy;
11+
use libsql_wal::storage::compaction::Compactor;
12+
13+
#[derive(Clone, Debug, clap::ValueEnum)]
14+
pub enum CompactStrategy {
15+
Logarithmic,
16+
CompactAll,
17+
}
18+
19+
#[derive(Debug, clap::Subcommand)]
20+
pub enum WalToolkit {
21+
/// Register namespaces to monitor
22+
Monitor { namespace: String },
23+
/// Analyze segments for a namespaces
24+
Analyze {
25+
/// list all segments
26+
#[clap(long)]
27+
list_all: bool,
28+
namespace: String,
29+
},
30+
/// Compact segments into bigger segments
31+
Compact {
32+
/// compaction strategy
33+
#[clap(long, short)]
34+
strategy: CompactStrategy,
35+
/// prints the compaction plan, but doesn't perform it.
36+
#[clap(long)]
37+
dry_run: bool,
38+
namespace: String,
39+
},
40+
/// Sync namespace metadata from remote storage
41+
Sync {
42+
/// When performing a full sync, all the segment space is scanned again. By default, only
43+
/// segments with frame_no greated that the last frame_no are retrieved.
44+
#[clap(long)]
45+
full: bool,
46+
/// unless this is specified, all monitored namespaces are synced
47+
namespace: Option<String>,
48+
},
49+
/// Restore namespace
50+
Restore {
51+
#[clap(long)]
52+
verify: bool,
53+
namespace: String,
54+
out: PathBuf,
55+
},
56+
}
57+
58+
impl WalToolkit {
59+
pub async fn run(&self, compact_path: &Path, s3_args: &S3Args) -> anyhow::Result<()> {
60+
let backend = setup_storage(s3_args).await?;
61+
tokio::fs::create_dir_all(compact_path).await?;
62+
let mut compactor = Compactor::new(backend.into(), compact_path)?;
63+
match self {
64+
Self::Monitor { namespace } => {
65+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
66+
compactor.monitor(&namespace).await?;
67+
println!("monitoring {namespace}");
68+
}
69+
Self::Analyze {
70+
namespace,
71+
list_all,
72+
} => {
73+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
74+
let analysis = compactor.analyze(&namespace)?;
75+
println!("stats for {namespace}:");
76+
println!("- segment count: {}", analysis.segment_count());
77+
println!("- last frame_no: {}", analysis.last_frame_no());
78+
let set = analysis.shortest_restore_path();
79+
println!("- shortest restore path len: {}", set.len());
80+
if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
81+
println!(
82+
"- oldest segment: {}-{} ({})",
83+
first.key.start_frame_no, first.key.end_frame_no, first.created_at
84+
);
85+
println!(
86+
"- most recent segment: {}-{} ({})",
87+
last.key.start_frame_no, last.key.end_frame_no, last.created_at
88+
);
89+
}
90+
91+
if *list_all {
92+
println!("segments:");
93+
compactor.list_all(&namespace, |info| {
94+
println!(
95+
"- {}-{} ({})",
96+
info.key.start_frame_no, info.key.end_frame_no, info.created_at
97+
);
98+
})?;
99+
}
100+
}
101+
Self::Compact {
102+
strategy,
103+
dry_run,
104+
namespace,
105+
} => {
106+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
107+
let analysis = compactor.analyze(&namespace)?;
108+
let strat: Box<dyn PartitionStrategy> = match strategy {
109+
CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
110+
CompactStrategy::CompactAll => Box::new(IdentityStrategy),
111+
};
112+
let set = analysis.shortest_restore_path();
113+
let partition = strat.partition(&set);
114+
115+
println!("initial shortest restore path len: {}", set.len());
116+
println!("compacting into {} segments", partition.len());
117+
for set in partition.iter() {
118+
println!("- {:?}", set.range().unwrap());
119+
}
120+
if *dry_run {
121+
println!("dry run: stopping");
122+
} else {
123+
println!("performing compaction");
124+
let part_len = partition.len();
125+
for (idx, set) in partition.into_iter().enumerate() {
126+
let Some((start, end)) = set.range() else {
127+
continue;
128+
};
129+
println!("compacting {start}-{end} ({}/{})", idx + 1, part_len);
130+
// TODO: we can compact in conccurently
131+
compactor.compact(set).await?;
132+
}
133+
}
134+
}
135+
Self::Sync { full, namespace } => match namespace {
136+
Some(_ns) => {
137+
todo!()
138+
}
139+
None if *full => {
140+
compactor.sync_full().await?;
141+
println!("all monitored namespace fully up to date.");
142+
}
143+
_ => todo!(),
144+
},
145+
Self::Restore {
146+
namespace,
147+
out,
148+
verify,
149+
} => {
150+
let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
151+
let analysis = compactor.analyze(&namespace)?;
152+
let set = analysis.shortest_restore_path();
153+
compactor.restore(set, &out).await?;
154+
if *verify {
155+
let conn = libsql_sys::rusqlite::Connection::open(&out)?;
156+
conn.pragma_query(None, "integrity_check", |r| {
157+
println!("{r:?}");
158+
Ok(())
159+
})?;
160+
}
161+
}
162+
}
163+
164+
Ok(())
165+
}
166+
}
167+
168+
async fn setup_storage(opt: &S3Args) -> anyhow::Result<S3Backend<StdIO>> {
169+
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
170+
171+
let mut builder = config.into_builder();
172+
builder.set_endpoint_url(opt.s3_url.clone());
173+
builder.set_retry_config(RetryConfig::standard().with_max_attempts(10).into());
174+
builder.set_region(Region::new(
175+
opt.s3_region_id.clone().expect("expected aws region"),
176+
));
177+
let cred = Credentials::new(
178+
opt.s3_access_key_id.as_ref().unwrap(),
179+
opt.s3_access_key.as_ref().unwrap(),
180+
None,
181+
None,
182+
"Static",
183+
);
184+
builder.set_credentials_provider(Some(SharedCredentialsProvider::new(cred)));
185+
let config = builder.build();
186+
let backend = S3Backend::from_sdk_config(
187+
config,
188+
opt.s3_bucket.clone().context("missing bucket id")?,
189+
opt.cluster_id.clone().context("missing cluster id")?,
190+
)
191+
.await?;
192+
193+
Ok(backend)
194+
}
195+
196+
#[derive(Debug, clap::Args)]
197+
pub struct S3Args {
198+
#[arg(long, requires = "S3Args")]
199+
enable_s3: bool,
200+
#[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
201+
cluster_id: Option<String>,
202+
#[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
203+
s3_url: Option<String>,
204+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
205+
s3_access_key: Option<String>,
206+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
207+
s3_access_key_id: Option<String>,
208+
#[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
209+
s3_bucket: Option<String>,
210+
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
211+
s3_region_id: Option<String>,
212+
}

libsql-wal/Cargo.toml

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,28 +68,11 @@ name = "benchmarks"
6868
harness = false
6969

7070
[features]
71-
default = ["s3", "shell-bin"]
71+
default = ["s3"]
7272
s3 = [
7373
"dep:hyper",
7474
"dep:aws-smithy-runtime",
7575
"dep:aws-sdk-s3",
7676
"dep:aws-config",
7777
"dep:aws-credential-types",
7878
]
79-
shell-bin = [
80-
"dep:clap",
81-
"dep:inquire",
82-
"s3",
83-
"dep:tracing-subscriber",
84-
"dep:anyhow",
85-
]
86-
87-
[[bin]]
88-
name = "shell"
89-
path = "src/bins/shell/main.rs"
90-
required-features = ["shell-bin"]
91-
92-
[[bin]]
93-
name = "compactor"
94-
path = "src/bins/compactor/main.rs"
95-
required-features = ["shell-bin"]

0 commit comments

Comments
 (0)