@@ -10,13 +10,13 @@ use zerocopy::FromZeroes;
1010use crate :: io:: buf:: ZeroCopyBoxIoBuf ;
1111use crate :: segment:: Frame ;
1212use crate :: storage:: backend:: FindSegmentReq ;
13- use crate :: storage:: Storage ;
13+ use crate :: storage:: { Storage } ;
1414
1515use super :: Result ;
1616
1717pub trait ReplicateFromStorage : Sync + Send + ' static {
18- fn stream < ' a > (
19- & ' a self ,
18+ fn stream < ' a , ' b > (
19+ & ' b self ,
2020 seen : & ' a mut RoaringBitmap ,
2121 current : u64 ,
2222 until : u64 ,
@@ -38,16 +38,18 @@ impl<S> ReplicateFromStorage for StorageReplicator<S>
3838where
3939 S : Storage ,
4040{
41- fn stream < ' a > (
42- & ' a self ,
41+ fn stream < ' a , ' b > (
42+ & ' b self ,
4343 seen : & ' a mut roaring:: RoaringBitmap ,
4444 mut current : u64 ,
4545 until : u64 ,
4646 ) -> Pin < Box < dyn Stream < Item = Result < Box < Frame > > > + Send + ' a > > {
47+ let storage = self . storage . clone ( ) ;
48+ let namespace = self . namespace . clone ( ) ;
4749 Box :: pin ( async_stream:: try_stream! {
4850 loop {
49- let key = self . storage. find_segment( & self . namespace, FindSegmentReq :: EndFrameNoLessThan ( current) , None ) . await ?;
50- let index = self . storage. fetch_segment_index( & self . namespace, & key, None ) . await ?;
51+ let key = storage. find_segment( & namespace, FindSegmentReq :: EndFrameNoLessThan ( current) , None ) . await ?;
52+ let index = storage. fetch_segment_index( & namespace, & key, None ) . await ?;
5153 let mut pages = index. into_stream( ) ;
5254 let mut maybe_seg = None ;
5355 while let Some ( ( page, offset) ) = pages. next( ) {
5860 let segment = match maybe_seg {
5961 Some ( ref seg) => seg,
6062 None => {
61- maybe_seg = Some ( self . storage. fetch_segment_data( & self . namespace, & key, None ) . await ?) ;
63+ maybe_seg = Some ( storage. fetch_segment_data( & namespace, & key, None ) . await ?) ;
6264 maybe_seg. as_ref( ) . unwrap( )
6365 } ,
6466 } ;
0 commit comments