Skip to content

Commit 64c4d22

Browse files
authored
Merge pull request #1716 from tursodatabase/admin-back-channel
Admin API db shell
2 parents 2014773 + 85d677b commit 64c4d22

9 files changed

Lines changed: 751 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsql-server/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ tar = "0.4.41"
9595
aws-config = "1"
9696
aws-sdk-s3 = "1"
9797
aws-smithy-runtime = "1.6.2"
98+
dialoguer = { version = "0.11.0", features = ["history"] }
9899

99100
[dev-dependencies]
100101
arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] }
@@ -112,6 +113,8 @@ metrics-util = "0.15"
112113
s3s = "0.8.1"
113114
s3s-fs = "0.8.1"
114115
ring = { version = "0.17.8", features = ["std"] }
116+
tonic-build = "0.11"
117+
prost-build = "0.12"
115118

116119
[build-dependencies]
117120
vergen = { version = "8", features = ["build", "git", "gitcl"] }
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
syntax = "proto3";
2+
3+
package admin_shell;
4+
5+
message Query {
6+
string query = 1;
7+
}
8+
9+
message Value {
10+
oneof value {
11+
Null null = 1;
12+
double real = 2;
13+
int64 integer = 3;
14+
string text = 4;
15+
bytes blob = 5;
16+
}
17+
}
18+
19+
message Null {}
20+
21+
message Row {
22+
repeated Value values = 1;
23+
}
24+
25+
message Rows {
26+
repeated Row rows = 1;
27+
}
28+
29+
message Error {
30+
string Error = 1;
31+
}
32+
33+
message Response {
34+
oneof resp {
35+
Rows rows = 1;
36+
Error error = 2;
37+
}
38+
}
39+
40+
service AdminShellService {
41+
rpc Shell(stream Query) returns (stream Response) {}
42+
}

libsql-server/src/admin_shell.rs

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
use std::fmt::Display;
2+
use std::pin::Pin;
3+
use std::str::FromStr;
4+
5+
use bytes::Bytes;
6+
use dialoguer::BasicHistory;
7+
use rusqlite::types::ValueRef;
8+
use tokio_stream::{Stream, StreamExt as _};
9+
use tonic::metadata::{AsciiMetadataValue, BinaryMetadataValue};
10+
11+
use crate::connection::Connection as _;
12+
use crate::database::Connection;
13+
use crate::namespace::{NamespaceName, NamespaceStore};
14+
15+
use self::rpc::admin_shell_service_server::{AdminShellService, AdminShellServiceServer};
16+
use self::rpc::response::Resp;
17+
use self::rpc::Null;
18+
19+
mod rpc {
20+
#![allow(clippy::all)]
21+
include!("generated/admin_shell.rs");
22+
}
23+
24+
pub(crate) fn make_svc(namespace_store: NamespaceStore) -> AdminShellServiceServer<AdminShell> {
25+
let admin_shell = AdminShell::new(namespace_store);
26+
rpc::admin_shell_service_server::AdminShellServiceServer::new(admin_shell)
27+
}
28+
29+
pub(super) struct AdminShell {
30+
namespace_store: NamespaceStore,
31+
}
32+
33+
impl AdminShell {
34+
fn new(namespace_store: NamespaceStore) -> Self {
35+
Self { namespace_store }
36+
}
37+
38+
async fn with_namespace(
39+
&self,
40+
ns: Bytes,
41+
queries: impl Stream<Item = Result<rpc::Query, tonic::Status>>,
42+
) -> anyhow::Result<impl Stream<Item = Result<rpc::Response, tonic::Status>>> {
43+
let namespace = NamespaceName::from_bytes(ns).unwrap();
44+
let connection_maker = self
45+
.namespace_store
46+
.with(namespace, |ns| ns.db.connection_maker())
47+
.await?;
48+
let connection = connection_maker.create().await?;
49+
Ok(run_shell(connection, queries))
50+
}
51+
}
52+
53+
fn run_shell(
54+
conn: Connection,
55+
queries: impl Stream<Item = Result<rpc::Query, tonic::Status>>,
56+
) -> impl Stream<Item = Result<rpc::Response, tonic::Status>> {
57+
async_stream::stream! {
58+
tokio::pin!(queries);
59+
while let Some(q) = queries.next().await {
60+
let Ok(q) = q else { break };
61+
let res = tokio::task::block_in_place(|| {
62+
conn.with_raw(move |conn| {
63+
run_one(conn, q.query)
64+
})
65+
});
66+
67+
yield res
68+
}
69+
}
70+
}
71+
72+
fn run_one(conn: &mut rusqlite::Connection, q: String) -> Result<rpc::Response, tonic::Status> {
73+
match try_run_one(conn, q) {
74+
Ok(resp) => Ok(resp),
75+
Err(e) => Ok(rpc::Response {
76+
resp: Some(Resp::Error(rpc::Error {
77+
error: e.to_string(),
78+
})),
79+
}),
80+
}
81+
}
82+
83+
fn try_run_one(conn: &mut rusqlite::Connection, q: String) -> anyhow::Result<rpc::Response> {
84+
let mut stmt = conn.prepare(&q)?;
85+
let col_count = stmt.column_count();
86+
let mut rows = stmt.query(())?;
87+
let mut out_rows = Vec::new();
88+
while let Some(row) = rows.next()? {
89+
let mut out_row = Vec::with_capacity(col_count);
90+
for i in 0..col_count {
91+
let rpc_value = match row.get_ref(i).unwrap() {
92+
ValueRef::Null => rpc::value::Value::Null(Null {}),
93+
ValueRef::Integer(i) => rpc::value::Value::Integer(i),
94+
ValueRef::Real(x) => rpc::value::Value::Real(x),
95+
ValueRef::Text(s) => rpc::value::Value::Text(String::from_utf8(s.to_vec())?),
96+
ValueRef::Blob(b) => rpc::value::Value::Blob(b.to_vec()),
97+
};
98+
out_row.push(rpc::Value {
99+
value: Some(rpc_value),
100+
});
101+
}
102+
out_rows.push(rpc::Row { values: out_row });
103+
}
104+
105+
Ok(rpc::Response {
106+
resp: Some(Resp::Rows(rpc::Rows { rows: out_rows })),
107+
})
108+
}
109+
110+
#[async_trait::async_trait]
111+
impl AdminShellService for AdminShell {
112+
type ShellStream = Pin<Box<dyn Stream<Item = Result<rpc::Response, tonic::Status>> + Send>>;
113+
114+
async fn shell(
115+
&self,
116+
request: tonic::Request<tonic::Streaming<rpc::Query>>,
117+
) -> std::result::Result<tonic::Response<Self::ShellStream>, tonic::Status> {
118+
let Some(namespace) = request.metadata().get_bin("x-namespace-bin") else {
119+
return Err(tonic::Status::new(
120+
tonic::Code::InvalidArgument,
121+
"missing namespace",
122+
));
123+
};
124+
let Ok(ns_bytes) = namespace.to_bytes() else {
125+
return Err(tonic::Status::new(
126+
tonic::Code::InvalidArgument,
127+
"bad namespace encoding",
128+
));
129+
};
130+
131+
match self.with_namespace(ns_bytes, request.into_inner()).await {
132+
Ok(s) => Ok(tonic::Response::new(Box::pin(s))),
133+
Err(e) => Err(tonic::Status::new(
134+
tonic::Code::FailedPrecondition,
135+
e.to_string(),
136+
)),
137+
}
138+
}
139+
}
140+
141+
pub struct AdminShellClient {
142+
remote_url: String,
143+
auth: Option<String>,
144+
}
145+
146+
impl AdminShellClient {
147+
pub fn new(remote_url: String, auth: Option<String>) -> Self {
148+
Self { remote_url, auth }
149+
}
150+
151+
pub async fn run_namespace(&self, namespace: &str) -> anyhow::Result<()> {
152+
let namespace = NamespaceName::from_string(namespace.to_string())?;
153+
let mut client = rpc::admin_shell_service_client::AdminShellServiceClient::connect(
154+
self.remote_url.clone(),
155+
)
156+
.await?;
157+
let (sender, receiver) = tokio::sync::mpsc::channel(1);
158+
let req_stream = tokio_stream::wrappers::ReceiverStream::new(receiver);
159+
160+
let mut req = tonic::Request::new(req_stream);
161+
req.metadata_mut().insert_bin(
162+
"x-namespace-bin",
163+
BinaryMetadataValue::from_bytes(namespace.as_slice()),
164+
);
165+
166+
if let Some(ref auth) = self.auth {
167+
req.metadata_mut().insert(
168+
"authorization",
169+
AsciiMetadataValue::from_str(&format!("basic {auth}")).unwrap(),
170+
);
171+
}
172+
173+
let mut resp_stream = client.shell(req).await?.into_inner();
174+
175+
let mut history = BasicHistory::new();
176+
loop {
177+
// this is blocking, but the shell runs in it's own process with no other tasks, so
178+
// that's ok
179+
let prompt = dialoguer::Input::<String>::new()
180+
.with_prompt("> ")
181+
.history_with(&mut history)
182+
.interact_text();
183+
184+
match prompt {
185+
Ok(query) => {
186+
let q = rpc::Query { query };
187+
sender.send(q).await?;
188+
match resp_stream.next().await {
189+
Some(Ok(rpc::Response {
190+
resp: Some(rpc::response::Resp::Rows(rows)),
191+
})) => {
192+
println!("{}", RowsFormatter(rows));
193+
}
194+
Some(Ok(rpc::Response {
195+
resp: Some(rpc::response::Resp::Error(rpc::Error { error })),
196+
})) => {
197+
println!("query error: {error}");
198+
}
199+
Some(Err(e)) => {
200+
println!("rpc error: {}", e.message());
201+
break;
202+
}
203+
_ => break,
204+
}
205+
}
206+
Err(e) => println!("error: {e}"),
207+
}
208+
}
209+
210+
Ok(())
211+
}
212+
}
213+
214+
struct RowsFormatter(rpc::Rows);
215+
216+
impl Display for RowsFormatter {
217+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218+
for row in self.0.rows.iter() {
219+
let mut is_first = true;
220+
for value in row.values.iter() {
221+
if !is_first {
222+
f.write_str(", ")?;
223+
}
224+
is_first = false;
225+
226+
match value.value.as_ref().unwrap() {
227+
rpc::value::Value::Null(_) => f.write_str("null")?,
228+
rpc::value::Value::Real(x) => write!(f, "{x}")?,
229+
rpc::value::Value::Integer(i) => write!(f, "{i}")?,
230+
rpc::value::Value::Text(s) => f.write_str(&s)?,
231+
rpc::value::Value::Blob(b) => {
232+
for x in b {
233+
write!(f, "{x:0x}")?
234+
}
235+
}
236+
}
237+
}
238+
239+
writeln!(f)?;
240+
}
241+
242+
Ok(())
243+
}
244+
}

0 commit comments

Comments
 (0)