@@ -4,10 +4,8 @@ use std::io::{BufWriter, ErrorKind, Write};
44use std:: mem:: size_of;
55use std:: ops:: Deref ;
66use std:: path:: { Path , PathBuf } ;
7- use std:: sync:: {
8- atomic:: { AtomicU64 , Ordering } ,
9- Arc ,
10- } ;
7+ use std:: sync:: Arc ;
8+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
119
1210use chrono:: prelude:: { DateTime , Utc } ;
1311use fst:: { Map , MapBuilder , Streamer } ;
@@ -212,7 +210,7 @@ where
212210}
213211
214212impl < F : FileExt > SealedSegment < F > {
215- pub fn open ( file : Arc < F > , path : PathBuf , read_locks : Arc < AtomicU64 > ) -> Result < Option < Self > > {
213+ pub fn open ( file : Arc < F > , path : PathBuf , read_locks : Arc < AtomicU64 > , now : DateTime < Utc > ) -> Result < Option < Self > > {
216214 let mut header: SegmentHeader = SegmentHeader :: new_zeroed ( ) ;
217215 file. read_exact_at ( header. as_bytes_mut ( ) , 0 ) ?;
218216
@@ -230,7 +228,7 @@ impl<F: FileExt> SealedSegment<F> {
230228 // recover the index, and seal the segment.
231229 if !header. flags ( ) . contains ( SegmentFlags :: SEALED ) {
232230 assert_eq ! ( header. index_offset. get( ) , 0 , "{header:?}" ) ;
233- return Self :: recover ( file, path, header) . map ( Some ) ;
231+ return Self :: recover ( file, path, header, now ) . map ( Some ) ;
234232 }
235233
236234 let mut slice = vec ! [ 0 ; index_len as usize ] ;
@@ -248,7 +246,7 @@ impl<F: FileExt> SealedSegment<F> {
248246 } ) )
249247 }
250248
251- fn recover ( file : Arc < F > , path : PathBuf , mut header : SegmentHeader ) -> Result < Self > {
249+ fn recover ( file : Arc < F > , path : PathBuf , mut header : SegmentHeader , now : DateTime < Utc > ) -> Result < Self > {
252250 assert ! ( !header. is_empty( ) ) ;
253251 assert_eq ! ( header. index_size. get( ) , 0 ) ;
254252 assert_eq ! ( header. index_offset. get( ) , 0 ) ;
@@ -335,6 +333,7 @@ impl<F: FileExt> SealedSegment<F> {
335333 header. index_size = index_size. into ( ) ;
336334 header. last_commited_frame_no = last_committed. into ( ) ;
337335 header. size_after = size_after. into ( ) ;
336+ header. sealed_at_timestamp = ( now. timestamp_millis ( ) as u64 ) . into ( ) ;
338337 let flags = header. flags ( ) ;
339338 header. set_flags ( flags | SegmentFlags :: SEALED ) ;
340339 header. recompute_checksum ( ) ;
0 commit comments