@@ -101,272 +101,21 @@ pub(crate) struct JobResult<S, C> {
101101#[ cfg( test) ]
102102mod test {
103103 use std:: future:: ready;
104- // use std::fs::File;
105- // use std::io::Write;
106- // use std::mem::size_of;
107104 use std:: str:: FromStr ;
108- // use std::sync::atomic::AtomicBool;
109105 use std:: sync:: Arc ;
110106
111107 use chrono:: prelude:: DateTime ;
112108 use chrono:: Utc ;
113- // use fst::{Map, Streamer};
114- // use libsql_sys::rusqlite::OpenFlags;
115- // use tempfile::{tempdir, tempfile, NamedTempFile};
116109 use uuid:: Uuid ;
117110
118111 use crate :: io:: file:: FileExt ;
119112 use crate :: io:: StdIO ;
120113 use crate :: segment:: compacted:: CompactedSegmentDataHeader ;
121114 use crate :: storage:: { RestoreOptions , SegmentKey } ;
122- // use crate::registry::WalRegistry;
123- // use crate::segment::compacted::CompactedSegmentDataHeader;
124- // use crate::segment::sealed::SealedSegment;
125- // use crate::segment::{Frame, FrameHeader};
126- // use crate::storage::Storage;
127- // use crate::wal::{LibsqlWal, LibsqlWalManager};
128115 use libsql_sys:: name:: NamespaceName ;
129116
130117 use super :: * ;
131118
132- // fn setup_wal<S: Storage>(
133- // path: &Path,
134- // storage: S,
135- // ) -> (LibsqlWalManager<StdIO, S>, Arc<WalRegistry<StdIO, S>>) {
136- // let resolver = |path: &Path| {
137- // NamespaceName::from_string(path.file_name().unwrap().to_str().unwrap().to_string())
138- // };
139- // let registry =
140- // Arc::new(WalRegistry::new(path.join("wals"), storage).unwrap());
141- // (LibsqlWalManager::new(registry.clone()), registry)
142- // }
143- //
144- // fn make_connection(
145- // path: &Path,
146- // wal: LibsqlWalManager<StdIO>,
147- // ) -> libsql_sys::Connection<LibsqlWal<StdIO>> {
148- // libsql_sys::Connection::open(
149- // path.join("db"),
150- // OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,
151- // wal,
152- // 10000,
153- // None,
154- // )
155- // .unwrap()
156- // }
157- //
158- // #[tokio::test]
159- // async fn compact_segment() {
160- // struct SwapHandler;
161- //
162- // impl SegmentSwapHandler<Arc<SealedSegment<File>>> for SwapHandler {
163- // fn handle_segment_swap(
164- // &self,
165- // namespace: NamespaceName,
166- // segment: Arc<SealedSegment<File>>,
167- // ) {
168- // tokio::runtime::Handle::current().block_on(async move {
169- // let out_file = tempfile().unwrap();
170- // let id = Uuid::new_v4();
171- // let index_bytes = segment.compact(&out_file, id).await.unwrap();
172- // let index = Map::new(index_bytes).unwrap();
173- //
174- // // indexes contain the same pages
175- // let mut new_stream = index.stream();
176- // let mut orig_stream = segment.index().stream();
177- // assert_eq!(new_stream.next().unwrap().0, orig_stream.next().unwrap().0);
178- // assert_eq!(new_stream.next().unwrap().0, orig_stream.next().unwrap().0);
179- // assert!(new_stream.next().is_none());
180- // assert!(orig_stream.next().is_none());
181- //
182- // let mut db_file = NamedTempFile::new().unwrap();
183- // let mut stream = index.stream();
184- // while let Some((page_bytes, offset)) = stream.next() {
185- // let page_no = u32::from_be_bytes(page_bytes.try_into().unwrap());
186- // let mut buf = [0u8; 4096];
187- // let offset = size_of::<CompactedSegmentDataHeader>()
188- // + offset as usize * size_of::<Frame>()
189- // + size_of::<FrameHeader>();
190- // out_file.read_exact_at(&mut buf, offset as u64).unwrap();
191- // db_file
192- // .as_file()
193- // .write_all_at(&buf, (page_no as u64 - 1) * 4096)
194- // .unwrap();
195- // }
196- //
197- // db_file.flush().unwrap();
198- // let conn = libsql_sys::rusqlite::Connection::open(db_file.path()).unwrap();
199- // conn.query_row("select count(*) from test", (), |r| Ok(()))
200- // .unwrap();
201- // });
202- // }
203- // }
204- //
205- // let tmp = tempdir().unwrap();
206- // let (wal, registry) = setup_wal(tmp.path(), SwapHandler);
207- // let conn = make_connection(tmp.path(), wal.clone());
208- //
209- // tokio::task::spawn_blocking(move || {
210- // conn.execute("create table test (x)", ()).unwrap();
211- // for i in 0..100usize {
212- // conn.execute("insert into test values (?)", [i]).unwrap();
213- // }
214- //
215- // registry.shutdown().unwrap();
216- // })
217- // .await
218- // .unwrap();
219- // }
220- //
221- // #[tokio::test]
222- // async fn simple_perform_job() {
223- // struct TestIO;
224- //
225- // impl Io for TestIO {
226- // type File = <StdIO as Io>::File;
227- // type TempFile = <StdIO as Io>::TempFile;
228- //
229- // fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
230- // StdIO(()).create_dir_all(path)
231- // }
232- //
233- // fn open(
234- // &self,
235- // create_new: bool,
236- // read: bool,
237- // write: bool,
238- // path: &Path,
239- // ) -> std::io::Result<Self::File> {
240- // StdIO(()).open(create_new, read, write, path)
241- // }
242- //
243- // fn tempfile(&self) -> std::io::Result<Self::TempFile> {
244- // StdIO(()).tempfile()
245- // }
246- //
247- // fn now(&self) -> DateTime<Utc> {
248- // DateTime::UNIX_EPOCH
249- // }
250- //
251- // fn uuid(&self) -> Uuid {
252- // Uuid::from_u128(0)
253- // }
254- //
255- // fn hard_link(&self, _src: &Path, _dst: &Path) -> std::io::Result<()> {
256- // unimplemented!()
257- // }
258- // }
259- //
260- // struct TestStorage {
261- // called: AtomicBool,
262- // }
263- //
264- // impl Drop for TestStorage {
265- // fn drop(&mut self) {
266- // assert!(self.called.load(std::sync::atomic::Ordering::Relaxed));
267- // }
268- // }
269- //
270- // impl Backend for TestStorage {
271- // type Config = ();
272- //
273- // fn store(
274- // &self,
275- // _config: &Self::Config,
276- // meta: SegmentMeta,
277- // segment_data: impl FileExt,
278- // segment_index: Vec<u8>,
279- // ) -> impl std::future::Future<Output = Result<()>> + Send {
280- // async move {
281- // self.called
282- // .store(true, std::sync::atomic::Ordering::Relaxed);
283- //
284- // insta::assert_debug_snapshot!(meta);
285- // insta::assert_debug_snapshot!(crc32fast::hash(&segment_index));
286- // insta::assert_debug_snapshot!(segment_index.len());
287- // let data = async_read_all_to_vec(segment_data).await.unwrap();
288- // insta::assert_debug_snapshot!(data.len());
289- // insta::assert_debug_snapshot!(crc32fast::hash(&data));
290- //
291- // Ok(())
292- // }
293- // }
294- //
295- // async fn fetch_segment(
296- // &self,
297- // _config: &Self::Config,
298- // _namespace: NamespaceName,
299- // _frame_no: u64,
300- // _dest_path: &Path,
301- // ) -> Result<()> {
302- // todo!()
303- // }
304- //
305- // async fn meta(
306- // &self,
307- // _config: &Self::Config,
308- // _namespace: NamespaceName,
309- // ) -> Result<crate::storage::backend::DbMeta> {
310- // todo!();
311- // }
312- //
313- // fn default_config(&self) -> Arc<Self::Config> {
314- // Arc::new(())
315- // }
316- // }
317- //
318- // struct SwapHandler;
319- //
320- // impl SegmentSwapHandler<File> for SwapHandler {
321- // fn handle_segment_swap(
322- // &self,
323- // namespace: NamespaceName,
324- // segment: Arc<SealedSegment<File>>,
325- // ) {
326- // tokio::runtime::Handle::current().block_on(async move {
327- // let job = Job {
328- // request: IndexedRequest {
329- // request: StoreSegmentRequest {
330- // namespace,
331- // segment,
332- // created_at: TestIO.now(),
333- // storage_config_override: None,
334- // },
335- // id: 0,
336- // },
337- // };
338- //
339- // let result = job
340- // .perform(
341- // TestStorage {
342- // called: false.into(),
343- // },
344- // TestIO,
345- // )
346- // .await;
347- //
348- // assert_eq!(result.job.request.id, 0);
349- // assert!(result.result.is_ok());
350- // });
351- // }
352- // }
353- //
354- // let tmp = tempdir().unwrap();
355- // let (wal, registry) = setup_wal(tmp.path(), SwapHandler);
356- // let conn = make_connection(tmp.path(), wal.clone());
357- //
358- // tokio::task::spawn_blocking(move || {
359- // conn.execute("create table test (x)", ()).unwrap();
360- // for i in 0..100usize {
361- // conn.execute("insert into test values (?)", [i]).unwrap();
362- // }
363- //
364- // registry.shutdown().unwrap();
365- // })
366- // .await
367- // .unwrap();
368- // }
369-
370119 #[ tokio:: test]
371120 async fn perform_job ( ) {
372121 #[ derive( Debug ) ]
0 commit comments