Skip to content

Commit e38cc1e

Browse files
committed
add admin-shell subcommand
1 parent a76b7db commit e38cc1e

2 files changed

Lines changed: 256 additions & 7 deletions

File tree

libsql-server/src/admin_shell.rs

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,233 @@ use std::fmt::Display;
22
use std::pin::Pin;
33

44
use bytes::Bytes;
5+
use dialoguer::BasicHistory;
6+
use rusqlite::types::ValueRef;
7+
use tokio_stream::{Stream, StreamExt as _};
8+
use tonic::metadata::BinaryMetadataValue;
9+
10+
use crate::connection::Connection as _;
11+
use crate::database::Connection;
12+
use crate::namespace::{NamespaceName, NamespaceStore};
13+
14+
use self::rpc::admin_shell_service_server::{AdminShellService, AdminShellServiceServer};
15+
use self::rpc::response::Resp;
16+
use self::rpc::Null;
17+
18+
mod rpc {
19+
#![allow(clippy::all)]
20+
include!("generated/admin_shell.rs");
21+
}
22+
23+
pub(crate) fn make_svc(namespace_store: NamespaceStore) -> AdminShellServiceServer<AdminShell> {
24+
let admin_shell = AdminShell::new(namespace_store);
25+
rpc::admin_shell_service_server::AdminShellServiceServer::new(admin_shell)
26+
}
27+
28+
pub(super) struct AdminShell {
29+
namespace_store: NamespaceStore,
30+
}
31+
32+
impl AdminShell {
33+
fn new(namespace_store: NamespaceStore) -> Self {
34+
Self { namespace_store }
35+
}
36+
37+
async fn with_namespace(
38+
&self,
39+
ns: Bytes,
40+
queries: impl Stream<Item = Result<rpc::Query, tonic::Status>>,
41+
) -> anyhow::Result<impl Stream<Item = Result<rpc::Response, tonic::Status>>> {
42+
let namespace = NamespaceName::from_bytes(ns).unwrap();
43+
let connection_maker = self
44+
.namespace_store
45+
.with(namespace, |ns| ns.db.connection_maker())
46+
.await?;
47+
let connection = connection_maker.create().await?;
48+
Ok(run_shell(connection, queries))
49+
}
50+
}
51+
52+
fn run_shell(
53+
conn: Connection,
54+
queries: impl Stream<Item = Result<rpc::Query, tonic::Status>>,
55+
) -> impl Stream<Item = Result<rpc::Response, tonic::Status>> {
56+
async_stream::stream! {
57+
tokio::pin!(queries);
58+
while let Some(q) = queries.next().await {
59+
let Ok(q) = q else { break };
60+
let res = tokio::task::block_in_place(|| {
61+
conn.with_raw(move |conn| {
62+
run_one(conn, q.query)
63+
})
64+
});
65+
66+
yield res
67+
}
68+
}
69+
}
70+
71+
fn run_one(conn: &mut rusqlite::Connection, q: String) -> Result<rpc::Response, tonic::Status> {
72+
match try_run_one(conn, q) {
73+
Ok(resp) => Ok(resp),
74+
Err(e) => Ok(rpc::Response {
75+
resp: Some(Resp::Error(rpc::Error {
76+
error: e.to_string(),
77+
})),
78+
}),
79+
}
80+
}
81+
82+
fn try_run_one(conn: &mut rusqlite::Connection, q: String) -> anyhow::Result<rpc::Response> {
83+
let mut stmt = conn.prepare(&q)?;
84+
let col_count = stmt.column_count();
85+
let mut rows = stmt.query(())?;
86+
let mut out_rows = Vec::new();
87+
while let Some(row) = rows.next()? {
88+
let mut out_row = Vec::with_capacity(col_count);
89+
for i in 0..col_count {
90+
let rpc_value = match row.get_ref(i).unwrap() {
91+
ValueRef::Null => rpc::value::Value::Null(Null {}),
92+
ValueRef::Integer(i) => rpc::value::Value::Integer(i),
93+
ValueRef::Real(x) => rpc::value::Value::Real(x),
94+
ValueRef::Text(s) => rpc::value::Value::Text(String::from_utf8(s.to_vec())?),
95+
ValueRef::Blob(b) => rpc::value::Value::Blob(b.to_vec()),
96+
};
97+
out_row.push(rpc::Value {
98+
value: Some(rpc_value),
99+
});
100+
}
101+
out_rows.push(rpc::Row { values: out_row });
102+
}
103+
104+
Ok(rpc::Response {
105+
resp: Some(Resp::Rows(rpc::Rows { rows: out_rows })),
106+
})
107+
}
108+
109+
#[async_trait::async_trait]
110+
impl AdminShellService for AdminShell {
111+
type ShellStream = Pin<Box<dyn Stream<Item = Result<rpc::Response, tonic::Status>> + Send>>;
112+
113+
async fn shell(
114+
&self,
115+
request: tonic::Request<tonic::Streaming<rpc::Query>>,
116+
) -> std::result::Result<tonic::Response<Self::ShellStream>, tonic::Status> {
117+
let Some(namespace) = request.metadata().get_bin("x-namespace-bin") else {
118+
return Err(tonic::Status::new(
119+
tonic::Code::InvalidArgument,
120+
"missing namespace",
121+
));
122+
};
123+
let Ok(ns_bytes) = namespace.to_bytes() else {
124+
return Err(tonic::Status::new(
125+
tonic::Code::InvalidArgument,
126+
"bad namespace encoding",
127+
));
128+
};
129+
130+
match self.with_namespace(ns_bytes, request.into_inner()).await {
131+
Ok(s) => Ok(tonic::Response::new(Box::pin(s))),
132+
Err(e) => Err(tonic::Status::new(
133+
tonic::Code::FailedPrecondition,
134+
e.to_string(),
135+
)),
136+
}
137+
}
138+
}
139+
140+
pub struct AdminShellClient {
141+
remote_url: String,
142+
}
143+
144+
impl AdminShellClient {
145+
pub fn new(remote_url: String) -> Self {
146+
Self { remote_url }
147+
}
148+
149+
pub async fn run_namespace(&self, namespace: &str) -> anyhow::Result<()> {
150+
let namespace = NamespaceName::from_string(namespace.to_string())?;
151+
let mut client = rpc::admin_shell_service_client::AdminShellServiceClient::connect(
152+
self.remote_url.clone(),
153+
)
154+
.await?;
155+
let (sender, receiver) = tokio::sync::mpsc::channel(1);
156+
let req_stream = tokio_stream::wrappers::ReceiverStream::new(receiver);
157+
158+
let mut req = tonic::Request::new(req_stream);
159+
req.metadata_mut().insert_bin(
160+
"x-namespace-bin",
161+
BinaryMetadataValue::from_bytes(namespace.as_slice()),
162+
);
163+
let mut resp_stream = client.shell(req).await?.into_inner();
164+
165+
let mut history = BasicHistory::new();
166+
loop {
167+
// this is blocking, but the shell runs in it's own process with no other tasks, so
168+
// that's ok
169+
let prompt = dialoguer::Input::<String>::new()
170+
.with_prompt("> ")
171+
.history_with(&mut history)
172+
.interact_text();
173+
174+
match prompt {
175+
Ok(query) => {
176+
let q = rpc::Query { query };
177+
sender.send(q).await?;
178+
match resp_stream.next().await {
179+
Some(Ok(rpc::Response {
180+
resp: Some(rpc::response::Resp::Rows(rows)),
181+
})) => {
182+
println!("{}", RowsFormatter(rows));
183+
}
184+
Some(Ok(rpc::Response {
185+
resp: Some(rpc::response::Resp::Error(rpc::Error { error })),
186+
})) => {
187+
println!("query error: {error}");
188+
}
189+
Some(Err(e)) => {
190+
println!("rpc error: {}", e.message());
191+
break;
192+
}
193+
_ => break,
194+
}
195+
}
196+
Err(e) => println!("error: {e}"),
197+
}
198+
}
199+
200+
Ok(())
201+
}
202+
}
203+
204+
struct RowsFormatter(rpc::Rows);
205+
206+
impl Display for RowsFormatter {
207+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208+
for row in self.0.rows.iter() {
209+
let mut is_first = true;
210+
for value in row.values.iter() {
211+
if !is_first {
212+
f.write_str(", ")?;
213+
}
214+
is_first = false;
215+
216+
match value.value.as_ref().unwrap() {
217+
rpc::value::Value::Null(_) => f.write_str("null")?,
218+
rpc::value::Value::Real(x) => write!(f, "{x}")?,
219+
rpc::value::Value::Integer(i) => write!(f, "{i}")?,
220+
rpc::value::Value::Text(s) => f.write_str(&s)?,
221+
rpc::value::Value::Blob(b) => {
222+
for x in b {
223+
write!(f, "{x:0x}")?
224+
}
225+
}
226+
}
227+
}
228+
229+
writeln!(f)?;
230+
}
231+
232+
Ok(())
233+
}
234+
}

libsql-server/src/main.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -301,16 +301,18 @@ struct Cli {
301301
default_value = "8"
302302
)]
303303
sync_conccurency: usize,
304+
305+
#[clap(subcommand)]
306+
subcommand: Option<UtilsSubcommands>,
304307
}
305308

306309
#[derive(clap::Subcommand, Debug)]
307310
enum UtilsSubcommands {
308-
Dump {
309-
#[clap(long)]
310-
/// Path at which to write the dump
311-
path: Option<PathBuf>,
311+
AdminShell {
312+
#[clap(long, default_value = "http://127.0.0.1:9090")]
313+
admin_api_url: String,
312314
#[clap(long)]
313-
namespace: String,
315+
namespace: Option<String>,
314316
},
315317
}
316318

@@ -710,6 +712,25 @@ async fn build_server(config: &Cli) -> anyhow::Result<Server> {
710712

711713
#[tokio::main]
712714
async fn main() -> Result<()> {
715+
let args = Cli::parse();
716+
717+
if let Some(ref subcommand) = args.subcommand {
718+
match subcommand {
719+
UtilsSubcommands::AdminShell {
720+
admin_api_url,
721+
namespace,
722+
} => {
723+
let client =
724+
libsql_server::admin_shell::AdminShellClient::new(admin_api_url.clone());
725+
if let Some(ns) = namespace {
726+
client.run_namespace(ns).await?;
727+
}
728+
}
729+
}
730+
731+
return Ok(());
732+
}
733+
713734
if std::env::var("RUST_LOG").is_err() {
714735
std::env::set_var("RUST_LOG", "info");
715736
}
@@ -730,8 +751,6 @@ async fn main() -> Result<()> {
730751
)
731752
.init();
732753

733-
let args = Cli::parse();
734-
735754
args.print_welcome_message();
736755
let server = build_server(&args).await?;
737756
server.start().await?;

0 commit comments

Comments
 (0)