Skip to content

Commit c1cf1f4

Browse files
authored
Merge pull request #1732 from tursodatabase/libsql-pitr-alt
libsql-wal PITR
2 parents ae3bb0f + 0c22fa8 commit c1cf1f4

24 files changed

Lines changed: 404 additions & 387 deletions

File tree

libsql-server/src/http/admin/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use axum::extract::{FromRef, Path, State};
44
use axum::middleware::Next;
55
use axum::routing::delete;
66
use axum::Json;
7-
use chrono::NaiveDateTime;
7+
use chrono::{DateTime, Utc};
88
use futures::{SinkExt, StreamExt, TryStreamExt};
99
use hyper::{Body, Request, StatusCode};
1010
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
@@ -427,7 +427,7 @@ async fn handle_create_namespace<C: Connector>(
427427

428428
#[derive(Debug, Deserialize)]
429429
struct ForkNamespaceReq {
430-
timestamp: NaiveDateTime,
430+
timestamp: DateTime<Utc>,
431431
}
432432

433433
async fn handle_fork_namespace<C>(

libsql-server/src/namespace/configurator/libsql_primary.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ use std::pin::Pin;
33
use std::sync::atomic::{AtomicBool, Ordering};
44
use std::sync::Arc;
55

6+
use chrono::{DateTime, Utc};
67
use futures::prelude::Future;
78
use libsql_sys::name::NamespaceResolver;
9+
use libsql_sys::wal::either::Either;
810
use libsql_wal::io::StdIO;
911
use libsql_wal::registry::WalRegistry;
12+
use libsql_wal::storage::backend::Backend;
1013
use libsql_wal::wal::LibsqlWalManager;
1114
use tokio::task::JoinSet;
1215

@@ -263,13 +266,38 @@ impl ConfigureNamespace for LibsqlPrimaryConfigurator {
263266

264267
fn fork<'a>(
265268
&'a self,
266-
_from_ns: &'a Namespace,
269+
from_ns: &'a Namespace,
267270
_from_config: MetaStoreHandle,
268271
_to_ns: NamespaceName,
269272
_to_config: MetaStoreHandle,
270-
_timestamp: Option<chrono::prelude::NaiveDateTime>,
273+
timestamp: Option<DateTime<Utc>>,
271274
_store: NamespaceStore,
272275
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
273-
unimplemented!()
276+
Box::pin(async move {
277+
match self.registry.storage() {
278+
Either::A(s) => {
279+
match timestamp {
280+
Some(ts) => {
281+
let ns: libsql_sys::name::NamespaceName = from_ns.name().clone().into();
282+
let _key = s
283+
.backend()
284+
.find_segment(
285+
&s.backend().default_config(),
286+
&ns,
287+
libsql_wal::storage::backend::FindSegmentReq::Timestamp(ts),
288+
)
289+
.await
290+
.unwrap();
291+
todo!()
292+
}
293+
// find the most recent frame_no
294+
None => todo!("fork from most recent"),
295+
};
296+
}
297+
Either::B(_) => {
298+
todo!("cannot fork without storage");
299+
}
300+
}
301+
})
274302
}
275303
}

libsql-server/src/namespace/configurator/libsql_replica.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::future::Future;
22
use std::pin::Pin;
33
use std::sync::Arc;
44

5+
use chrono::{DateTime, Utc};
56
use hyper::Uri;
67
use libsql_replication::injector::LibsqlInjector;
78
use libsql_replication::replicator::Replicator;
@@ -265,7 +266,7 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator {
265266
_from_config: MetaStoreHandle,
266267
_to_ns: NamespaceName,
267268
_to_config: MetaStoreHandle,
268-
_timestamp: Option<chrono::prelude::NaiveDateTime>,
269+
_timestamp: Option<DateTime<Utc>>,
269270
_store: NamespaceStore,
270271
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
271272
Box::pin(std::future::ready(Err(crate::Error::Fork(

libsql-server/src/namespace/configurator/libsql_schema.rs

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::path::Path;
22
use std::sync::Arc;
33

4+
use chrono::{DateTime, Utc};
45
use futures::prelude::Future;
56
use libsql_sys::name::NamespaceResolver;
67
use libsql_wal::io::StdIO;
@@ -159,22 +160,13 @@ impl ConfigureNamespace for LibsqlSchemaConfigurator {
159160

160161
fn fork<'a>(
161162
&'a self,
162-
from_ns: &'a Namespace,
163-
from_config: MetaStoreHandle,
164-
to_ns: NamespaceName,
165-
to_config: MetaStoreHandle,
166-
timestamp: Option<chrono::prelude::NaiveDateTime>,
167-
store: NamespaceStore,
163+
_from_ns: &'a Namespace,
164+
_from_config: MetaStoreHandle,
165+
_to_ns: NamespaceName,
166+
_to_config: MetaStoreHandle,
167+
_timestamp: Option<DateTime<Utc>>,
168+
_store: NamespaceStore,
168169
) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
169-
Box::pin(super::fork::fork(
170-
from_ns,
171-
from_config,
172-
to_ns,
173-
to_config,
174-
timestamp,
175-
store,
176-
&self.primary_config,
177-
self.base.base_path.clone(),
178-
))
170+
todo!()
179171
}
180172
}

libsql-server/src/namespace/configurator/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::pin::Pin;
33
use std::sync::Arc;
44
use std::time::Duration;
55

6-
use chrono::NaiveDateTime;
6+
use chrono::{DateTime, Utc};
77
use futures::Future;
88
use libsql_sys::EncryptionConfig;
99
use tokio::sync::Semaphore;
@@ -139,7 +139,7 @@ pub trait ConfigureNamespace {
139139
from_config: MetaStoreHandle,
140140
to_ns: NamespaceName,
141141
to_config: MetaStoreHandle,
142-
timestamp: Option<NaiveDateTime>,
142+
timestamp: Option<DateTime<Utc>>,
143143
store: NamespaceStore,
144144
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>>;
145145
}

libsql-server/src/namespace/configurator/primary.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::pin::Pin;
33
use std::sync::atomic::{AtomicBool, Ordering};
44
use std::sync::Arc;
55

6+
use chrono::{DateTime, Utc};
67
use futures::prelude::Future;
78
use libsql_sys::EncryptionConfig;
89
use tokio::task::JoinSet;
@@ -184,15 +185,15 @@ impl ConfigureNamespace for PrimaryConfigurator {
184185
from_config: MetaStoreHandle,
185186
to_ns: NamespaceName,
186187
to_config: MetaStoreHandle,
187-
timestamp: Option<chrono::prelude::NaiveDateTime>,
188+
timestamp: Option<DateTime<Utc>>,
188189
store: NamespaceStore,
189190
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
190191
Box::pin(super::fork::fork(
191192
from_ns,
192193
from_config,
193194
to_ns,
194195
to_config,
195-
timestamp,
196+
timestamp.map(|d| d.naive_utc()),
196197
store,
197198
&self.primary_config,
198199
self.base.base_path.clone(),

libsql-server/src/namespace/configurator/replica.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::pin::Pin;
22
use std::sync::atomic::AtomicBool;
33
use std::sync::Arc;
44

5+
use chrono::{DateTime, Utc};
56
use futures::Future;
67
use hyper::Uri;
78
use libsql_replication::rpc::replication::log_offset::WalFlavor;
@@ -255,7 +256,7 @@ impl ConfigureNamespace for ReplicaConfigurator {
255256
_from_config: MetaStoreHandle,
256257
_to_ns: NamespaceName,
257258
_to_config: MetaStoreHandle,
258-
_timestamp: Option<chrono::prelude::NaiveDateTime>,
259+
_timestamp: Option<DateTime<Utc>>,
259260
_store: NamespaceStore,
260261
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
261262
Box::pin(std::future::ready(Err(crate::Error::Fork(

libsql-server/src/namespace/configurator/schema.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::{atomic::AtomicBool, Arc};
22

3+
use chrono::{DateTime, Utc};
34
use futures::prelude::Future;
45
use tokio::task::JoinSet;
56

@@ -120,15 +121,15 @@ impl ConfigureNamespace for SchemaConfigurator {
120121
from_config: MetaStoreHandle,
121122
to_ns: NamespaceName,
122123
to_config: MetaStoreHandle,
123-
timestamp: Option<chrono::prelude::NaiveDateTime>,
124+
timestamp: Option<DateTime<Utc>>,
124125
store: NamespaceStore,
125126
) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
126127
Box::pin(super::fork::fork(
127128
from_ns,
128129
from_config,
129130
to_ns,
130131
to_config,
131-
timestamp,
132+
timestamp.map(|ts| ts.naive_utc()),
132133
store,
133134
&self.primary_config,
134135
self.base.base_path.clone(),

libsql-server/src/namespace/store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
22
use std::sync::Arc;
33

44
use async_lock::RwLock;
5-
use chrono::NaiveDateTime;
5+
use chrono::{DateTime, Utc};
66
use futures::TryFutureExt;
77
use moka::future::Cache;
88
use once_cell::sync::OnceCell;
@@ -219,7 +219,7 @@ impl NamespaceStore {
219219
from: NamespaceName,
220220
to: NamespaceName,
221221
to_config: DatabaseConfig,
222-
timestamp: Option<NaiveDateTime>,
222+
timestamp: Option<DateTime<Utc>>,
223223
) -> crate::Result<()> {
224224
if self.inner.has_shutdown.load(Ordering::Relaxed) {
225225
return Err(Error::NamespaceStoreShutdown);

libsql-server/src/wal_toolkit.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
33
use anyhow::Context as _;
44
use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
55
use aws_sdk_s3::config::{Credentials, SharedCredentialsProvider};
6+
use chrono::DateTime;
67
use libsql_wal::io::StdIO;
78
use libsql_wal::storage::backend::s3::S3Backend;
89
use libsql_wal::storage::compaction::strategy::identity::IdentityStrategy;
@@ -80,11 +81,15 @@ impl WalToolkit {
8081
if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
8182
println!(
8283
"- oldest segment: {}-{} ({})",
83-
first.key.start_frame_no, first.key.end_frame_no, first.created_at
84+
first.key.start_frame_no,
85+
first.key.end_frame_no,
86+
DateTime::from_timestamp_millis(first.key.timestamp as _).unwrap()
8487
);
8588
println!(
8689
"- most recent segment: {}-{} ({})",
87-
last.key.start_frame_no, last.key.end_frame_no, last.created_at
90+
last.key.start_frame_no,
91+
last.key.end_frame_no,
92+
DateTime::from_timestamp_millis(last.key.timestamp as _).unwrap()
8893
);
8994
}
9095

@@ -93,7 +98,9 @@ impl WalToolkit {
9398
compactor.list_all(&namespace, |info| {
9499
println!(
95100
"- {}-{} ({})",
96-
info.key.start_frame_no, info.key.end_frame_no, info.created_at
101+
info.key.start_frame_no,
102+
info.key.end_frame_no,
103+
DateTime::from_timestamp_millis(info.key.timestamp as _).unwrap()
97104
);
98105
})?;
99106
}

0 commit comments

Comments
 (0)