@@ -207,7 +207,7 @@ impl<B> Compactor<B> {
207207 out_index : Option < & ' a mut MapBuilder < Vec < u8 > > > ,
208208 mut progress : impl FnMut ( u32 , u32 ) + ' a ,
209209 ) -> (
210- impl Stream < Item = Result < ( CompactedFrameHeader , Bytes ) > > + ' a ,
210+ impl Stream < Item = crate :: storage :: Result < ( CompactedFrameHeader , Bytes ) > > + ' a ,
211211 CompactedSegmentHeader ,
212212 )
213213 where
@@ -363,7 +363,7 @@ impl<B> Compactor<B> {
363363 let mut builder = MapBuilder :: new ( Vec :: new ( ) ) . unwrap ( ) ;
364364
365365 let ( sender, mut receiver) = tokio:: sync:: mpsc:: channel :: < crate :: storage:: Result < Bytes > > ( 1 ) ;
366- let handle: JoinHandle < Result < ( ) > > = match out_path {
366+ let mut handle: JoinHandle < Result < ( ) > > = match out_path {
367367 Some ( path) => {
368368 let path = path. join ( & format ! ( "{new_key}.seg" ) ) ;
369369 let mut data_file = tokio:: fs:: File :: create ( path) . await ?;
@@ -397,37 +397,56 @@ impl<B> Compactor<B> {
397397 }
398398 } ;
399399
400- let ( stream, segment_header) = self
401- . dedup_stream ( set. clone ( ) , Some ( & mut builder) , progress)
402- . await ;
400+ let send_fut = async {
401+ let ( stream, segment_header) = self
402+ . dedup_stream ( set. clone ( ) , Some ( & mut builder) , progress)
403+ . await ;
403404
404- sender
405- . send ( Ok ( Bytes :: copy_from_slice ( segment_header. as_bytes ( ) ) ) )
406- . await
407- . unwrap ( ) ;
405+ if sender
406+ . send ( Ok ( Bytes :: copy_from_slice ( segment_header. as_bytes ( ) ) ) )
407+ . await
408+ . is_err ( )
409+ {
410+ return ;
411+ }
408412
409- {
410- tokio:: pin!( stream) ;
411- loop {
412- match stream. next ( ) . await {
413- Some ( Ok ( ( frame_header, frame_data) ) ) => {
414- sender
415- . send ( Ok ( Bytes :: copy_from_slice ( frame_header. as_bytes ( ) ) ) )
416- . await
417- . unwrap ( ) ;
418- sender. send ( Ok ( frame_data) ) . await . unwrap ( ) ;
419- }
420- Some ( Err ( _e) ) => {
421- panic ! ( )
422- // sender.send(Err(e.into())).await.unwrap();
413+ {
414+ tokio:: pin!( stream) ;
415+ loop {
416+ match stream. next ( ) . await {
417+ Some ( Ok ( ( frame_header, frame_data) ) ) => {
418+ if sender
419+ . send ( Ok ( Bytes :: copy_from_slice ( frame_header. as_bytes ( ) ) ) )
420+ . await
421+ . is_err ( )
422+ {
423+ return ;
424+ }
425+ if sender. send ( Ok ( frame_data) ) . await . is_err ( ) {
426+ return ;
427+ }
428+ }
429+ Some ( Err ( e) ) => {
430+ sender. send ( Err ( e. into ( ) ) ) . await . unwrap ( ) ;
431+ return ;
432+ }
433+ None => break ,
423434 }
424- None => break ,
425435 }
436+
437+ drop ( sender) ;
426438 }
427- drop ( sender) ;
428- }
439+ } ;
440+
441+ tokio:: select! {
442+ res = & mut handle => {
443+ res. unwrap( ) ?;
444+ } ,
445+ _ = send_fut => {
446+ handle. await . unwrap( ) ?;
429447
430- handle. await . unwrap ( ) ?;
448+ }
449+ }
431450
432451 let index = builder. into_inner ( ) . unwrap ( ) ;
433452 match out_path {
0 commit comments