11use std:: io;
2- use std:: mem:: size_of;
2+ use std:: mem:: { offset_of , size_of} ;
33
4+ use chrono:: { DateTime , Utc } ;
5+ use uuid:: Uuid ;
46use zerocopy:: little_endian:: { U128 as lu128, U16 as lu16, U32 as lu32, U64 as lu64} ;
57use zerocopy:: { AsBytes , FromBytes , FromZeroes } ;
68
79use crate :: io:: buf:: { IoBufMut , ZeroCopyBuf } ;
810use crate :: io:: FileExt ;
9- use crate :: segment:: FrameHeader ;
1011use crate :: { LIBSQL_MAGIC , LIBSQL_PAGE_SIZE , LIBSQL_WAL_VERSION } ;
1112
12- use super :: { Frame , Result } ;
13+ use super :: Result ;
1314
1415#[ derive( Debug , AsBytes , FromZeroes , FromBytes ) ]
1516#[ repr( C ) ]
16- pub struct CompactedSegmentDataHeader {
17+ pub struct CompactedSegmentHeader {
1718 pub ( crate ) magic : lu64 ,
1819 pub ( crate ) version : lu16 ,
19- pub ( crate ) frame_count : lu32 ,
20- pub ( crate ) segment_id : lu128 ,
20+ pub ( crate ) log_id : lu128 ,
2121 pub ( crate ) start_frame_no : lu64 ,
2222 pub ( crate ) end_frame_no : lu64 ,
2323 pub ( crate ) size_after : lu32 ,
2424 /// for now, always 4096
2525 pub ( crate ) page_size : lu16 ,
2626 pub ( crate ) timestamp : lu64 ,
2727}
28- impl CompactedSegmentDataHeader {
28+
29+ bitflags:: bitflags! {
30+ pub struct CompactedFrameFlags : u32 {
31+ /// This flag is set for the last frame in the segment
32+ const LAST = 1 << 0 ;
33+ }
34+ }
35+
36+ #[ derive( Debug , AsBytes , FromZeroes , FromBytes ) ]
37+ #[ repr( C ) ]
38+ pub struct CompactedFrameHeader {
39+ pub flags : lu32 ,
40+ pub page_no : lu32 ,
41+ pub frame_no : lu64 ,
42+ /// running checksum from this frame
43+ /// this is the crc32 of the checksum of the previous frame and all the frame data, including
44+ /// all the fields before checksum in the header. THIS FIELD MUST ALWAYS BE THE last FIELD IN
45+ /// THE STRUCT
46+ pub checksum : lu32 ,
47+ }
48+
49+ impl CompactedFrameHeader {
50+ pub fn flags ( & self ) -> CompactedFrameFlags {
51+ CompactedFrameFlags :: from_bits ( self . flags . get ( ) ) . unwrap ( )
52+ }
53+
54+ pub ( crate ) fn is_last ( & self ) -> bool {
55+ self . flags ( ) . contains ( CompactedFrameFlags :: LAST )
56+ }
57+
58+ pub ( crate ) fn set_last ( & mut self ) {
59+ let mut flags = self . flags ( ) ;
60+ flags. insert ( CompactedFrameFlags :: LAST ) ;
61+ self . flags = flags. bits ( ) . into ( ) ;
62+ }
63+
64+ pub ( crate ) fn reset_flags ( & mut self ) {
65+ self . flags = 0 . into ( ) ;
66+ }
67+
68+ pub ( crate ) fn compute_checksum ( & self , previous : u32 , data : & [ u8 ] ) -> u32 {
69+ assert_eq ! ( data. len( ) , LIBSQL_PAGE_SIZE as usize ) ;
70+ let mut h = crc32fast:: Hasher :: new_with_initial ( previous) ;
71+ h. update ( & self . as_bytes ( ) [ ..offset_of ! ( Self , checksum) ] ) ;
72+ h. update ( data) ;
73+ h. finalize ( )
74+ }
75+
76+ /// updates the checksum with the previous frame checksum and the frame data
77+ pub ( crate ) fn update_checksum ( & mut self , previous : u32 , data : & [ u8 ] ) -> u32 {
78+ let checksum = self . compute_checksum ( previous, data) ;
79+ self . checksum = checksum. into ( ) ;
80+ checksum
81+ }
82+
83+ pub fn checksum ( & self ) -> u32 {
84+ self . checksum . get ( )
85+ }
86+
87+ pub fn page_no ( & self ) -> u32 {
88+ self . page_no . get ( )
89+ }
90+ }
91+
92+ #[ derive( Debug , AsBytes , FromZeroes , FromBytes ) ]
93+ #[ repr( C ) ]
94+ pub struct CompactedFrame {
95+ pub header : CompactedFrameHeader ,
96+ pub data : [ u8 ; LIBSQL_PAGE_SIZE as usize ] ,
97+ }
98+
99+ impl CompactedFrame {
100+ pub fn header ( & self ) -> & CompactedFrameHeader {
101+ & self . header
102+ }
103+
104+ pub ( crate ) fn header_mut ( & mut self ) -> & mut CompactedFrameHeader {
105+ & mut self . header
106+ }
107+ }
108+
109+ impl CompactedSegmentHeader {
110+ pub fn new (
111+ start_frame_no : u64 ,
112+ end_frame_no : u64 ,
113+ size_after : u32 ,
114+ timestamp : DateTime < Utc > ,
115+ log_id : Uuid ,
116+ ) -> Self {
117+ Self {
118+ magic : LIBSQL_MAGIC . into ( ) ,
119+ version : LIBSQL_WAL_VERSION . into ( ) ,
120+ start_frame_no : start_frame_no. into ( ) ,
121+ end_frame_no : end_frame_no. into ( ) ,
122+ size_after : size_after. into ( ) ,
123+ page_size : LIBSQL_PAGE_SIZE . into ( ) ,
124+ timestamp : ( timestamp. timestamp_millis ( ) as u64 ) . into ( ) ,
125+ log_id : log_id. as_u128 ( ) . into ( ) ,
126+ }
127+ }
128+
29129 fn check ( & self ) -> Result < ( ) > {
30130 if self . magic . get ( ) != LIBSQL_MAGIC {
31131 return Err ( super :: Error :: InvalidHeaderMagic ) ;
@@ -47,14 +147,8 @@ impl CompactedSegmentDataHeader {
47147 }
48148}
49149
50- #[ derive( Debug , AsBytes , FromZeroes , FromBytes ) ]
51- #[ repr( C ) ]
52- pub struct CompactedSegmentDataFooter {
53- pub ( crate ) checksum : lu32 ,
54- }
55-
56150pub struct CompactedSegment < F > {
57- header : CompactedSegmentDataHeader ,
151+ header : CompactedSegmentHeader ,
58152 file : F ,
59153}
60154
@@ -69,7 +163,7 @@ impl<F> CompactedSegment<F> {
69163 }
70164 }
71165
72- pub fn header ( & self ) -> & CompactedSegmentDataHeader {
166+ pub fn header ( & self ) -> & CompactedSegmentHeader {
73167 & self . header
74168 }
75169}
@@ -79,23 +173,25 @@ impl<F: FileExt> CompactedSegment<F> {
79173 let buf = ZeroCopyBuf :: new_uninit ( ) ;
80174 let ( buf, ret) = file. read_exact_at_async ( buf, 0 ) . await ;
81175 ret?;
82- let header: CompactedSegmentDataHeader = buf. into_inner ( ) ;
176+ let header: CompactedSegmentHeader = buf. into_inner ( ) ;
83177 header. check ( ) ?;
84178 Ok ( Self { file, header } )
85179 }
86180
87- pub ( crate ) fn from_parts ( file : F , header : CompactedSegmentDataHeader ) -> Self {
181+ pub ( crate ) fn from_parts ( file : F , header : CompactedSegmentHeader ) -> Self {
88182 Self { header, file }
89183 }
90184
185+ /// read a CompactedFrame from the segment
91186 pub ( crate ) async fn read_frame < B : IoBufMut + Send + ' static > (
92187 & self ,
93188 buf : B ,
94189 offset : u32 ,
95190 ) -> ( B , io:: Result < ( ) > ) {
96191 assert_eq ! ( buf. bytes_init( ) , 0 ) ;
97- assert_eq ! ( buf. bytes_total( ) , size_of:: <Frame >( ) ) ;
98- let offset = size_of :: < CompactedSegmentDataHeader > ( ) + size_of :: < Frame > ( ) * offset as usize ;
192+ assert_eq ! ( buf. bytes_total( ) , size_of:: <CompactedFrame >( ) ) ;
193+ let offset =
194+ size_of :: < CompactedSegmentHeader > ( ) + size_of :: < CompactedFrame > ( ) * offset as usize ;
99195 let ( buf, ret) = self . file . read_exact_at_async ( buf, offset as u64 ) . await ;
100196 ( buf, ret)
101197 }
@@ -107,9 +203,9 @@ impl<F: FileExt> CompactedSegment<F> {
107203 ) -> ( B , io:: Result < ( ) > ) {
108204 assert_eq ! ( buf. bytes_init( ) , 0 ) ;
109205 assert_eq ! ( buf. bytes_total( ) , LIBSQL_PAGE_SIZE as usize ) ;
110- let offset = size_of :: < CompactedSegmentDataHeader > ( )
111- + size_of :: < Frame > ( ) * offset as usize
112- + size_of :: < FrameHeader > ( ) ;
206+ let offset = size_of :: < CompactedSegmentHeader > ( )
207+ + size_of :: < CompactedFrame > ( ) * offset as usize
208+ + size_of :: < CompactedFrameHeader > ( ) ;
113209 let ( buf, ret) = self . file . read_exact_at_async ( buf, offset as u64 ) . await ;
114210 ( buf, ret)
115211 }
0 commit comments