Skip to content

Commit 8a60155

Browse files
committed
replay: don't make ReplayCommittedState again and again
1 parent fd77985 commit 8a60155

2 files changed

Lines changed: 45 additions & 37 deletions

File tree

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
},
2525
};
2626
use anyhow::anyhow;
27-
use core::{cell::RefCell, ops::RangeBounds};
27+
use core::ops::RangeBounds;
2828
use parking_lot::{Mutex, RwLock};
2929
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
3030
use spacetimedb_durability::TxOffset;
@@ -170,13 +170,9 @@ impl Locking {
170170
/// The provided closure will be called for each transaction found in the
171171
/// history, the parameter is the transaction's offset. The closure is called
172172
/// _before_ the transaction is applied to the database state.
173-
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<F> {
174-
Replay {
175-
database_identity: self.database_identity,
176-
committed_state: self.committed_state.clone(),
177-
progress: RefCell::new(progress),
178-
error_behavior,
179-
}
173+
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<'_, F> {
174+
let committed_state = self.committed_state.write();
175+
Replay::new(self.database_identity, committed_state, progress, error_behavior)
180176
}
181177

182178
/// Construct a new [`Locking`] datastore containing the state stored in `snapshot`.

crates/datastore/src/locking_tx_datastore/replay.rs

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use crate::system_tables::{
77
is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID,
88
};
99
use anyhow::{anyhow, Context};
10+
use core::cell::RefMut;
1011
use core::ops::{Deref, DerefMut, RangeBounds};
11-
use parking_lot::{RwLock, RwLockReadGuard};
12+
use parking_lot::RwLockWriteGuard;
1213
use prometheus::core::{AtomicF64, GenericGauge};
1314
use prometheus::IntGauge;
1415
use spacetimedb_commitlog::payload::txdata;
@@ -104,21 +105,32 @@ pub enum ReplayError {
104105

105106
/// A [`spacetimedb_commitlog::Decoder`] suitable for replaying a transaction
106107
/// history into the database state.
107-
pub struct Replay<F> {
108-
pub(super) database_identity: Identity,
109-
pub(super) committed_state: Arc<RwLock<CommittedState>>,
110-
pub(super) progress: RefCell<F>,
111-
pub(super) error_behavior: ErrorBehavior,
108+
pub struct Replay<'a, F> {
109+
database_identity: Identity,
110+
committed_state: RefCell<ReplayCommittedState<'a>>,
111+
progress: RefCell<F>,
112+
error_behavior: ErrorBehavior,
112113
}
113114

114-
impl<F> Replay<F> {
115-
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T {
116-
let mut committed_state = self.committed_state.write();
117-
let state = &mut *committed_state;
118-
let committed_state = ReplayCommittedState::new(state);
115+
impl<'a, F> Replay<'a, F> {
116+
pub fn new(
117+
database_identity: Identity,
118+
committed_state: RwLockWriteGuard<'a, CommittedState>,
119+
progress: F,
120+
error_behavior: ErrorBehavior,
121+
) -> Self {
122+
Self {
123+
database_identity,
124+
committed_state: RefCell::new(ReplayCommittedState::new(committed_state)),
125+
progress: RefCell::new(progress),
126+
error_behavior,
127+
}
128+
}
129+
130+
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, '_, F>) -> T) -> T {
119131
let mut visitor = ReplayVisitor {
120132
database_identity: &self.database_identity,
121-
committed_state,
133+
committed_state: &mut self.committed_state.borrow_mut(),
122134
progress: &mut *self.progress.borrow_mut(),
123135
dropped_table_names: IntMap::default(),
124136
error_behavior: self.error_behavior,
@@ -127,16 +139,16 @@ impl<F> Replay<F> {
127139
}
128140

129141
pub fn next_tx_offset(&self) -> u64 {
130-
self.committed_state.read_arc().next_tx_offset
142+
self.committed_state.borrow().next_tx_offset
131143
}
132144

133145
// NOTE: This is not unused.
134-
pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> {
135-
self.committed_state.read()
146+
pub fn committed_state(&self) -> RefMut<'_, ReplayCommittedState<'a>> {
147+
self.committed_state.borrow_mut()
136148
}
137149
}
138150

139-
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
151+
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<'_, F> {
140152
type Record = txdata::Txdata<ProductValue>;
141153
type Error = txdata::DecoderError<ReplayError>;
142154

@@ -217,9 +229,9 @@ pub enum ErrorBehavior {
217229
Warn,
218230
}
219231

220-
struct ReplayVisitor<'a, F> {
232+
struct ReplayVisitor<'a, 'cs, F> {
221233
database_identity: &'a Identity,
222-
committed_state: ReplayCommittedState<'a>,
234+
committed_state: &'a mut ReplayCommittedState<'cs>,
223235
progress: &'a mut F,
224236
// Since deletes are handled before truncation / drop, sometimes the schema
225237
// info is gone. We save the name on the first delete of that table so metrics
@@ -228,7 +240,7 @@ struct ReplayVisitor<'a, F> {
228240
error_behavior: ErrorBehavior,
229241
}
230242

231-
impl<F> ReplayVisitor<'_, F> {
243+
impl<F> ReplayVisitor<'_, '_, F> {
232244
/// Process `err` according to `self.error_behavior`,
233245
/// either warning about it or returning it.
234246
///
@@ -244,7 +256,7 @@ impl<F> ReplayVisitor<'_, F> {
244256
}
245257
}
246258

247-
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> {
259+
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, '_, F> {
248260
type Error = ReplayError;
249261
// NOTE: Technically, this could be `()` if and when we can extract the
250262
// row data without going through `ProductValue` (PV).
@@ -392,9 +404,9 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
392404
}
393405

394406
/// A `CommittedState` under construction during replay.
395-
struct ReplayCommittedState<'cs> {
396-
/// The committed state being contructed.
397-
state: &'cs mut CommittedState,
407+
pub struct ReplayCommittedState<'cs> {
408+
/// The committed state being constructed.
409+
state: RwLockWriteGuard<'cs, CommittedState>,
398410

399411
/// Whether the table was dropped within the current transaction during replay.
400412
///
@@ -446,18 +458,18 @@ impl Deref for ReplayCommittedState<'_> {
446458
type Target = CommittedState;
447459

448460
fn deref(&self) -> &Self::Target {
449-
self.state
461+
&self.state
450462
}
451463
}
452464

453465
impl DerefMut for ReplayCommittedState<'_> {
454466
fn deref_mut(&mut self) -> &mut Self::Target {
455-
&mut *self.state
467+
&mut self.state
456468
}
457469
}
458470

459471
impl<'cs> ReplayCommittedState<'cs> {
460-
fn new(state: &'cs mut CommittedState) -> Self {
472+
fn new(state: RwLockWriteGuard<'cs, CommittedState>) -> Self {
461473
Self {
462474
state,
463475
replay_table_dropped: <_>::default(),
@@ -590,7 +602,7 @@ impl<'cs> ReplayCommittedState<'cs> {
590602
let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&st_column_row.elements[1]))
591603
.expect("second field in `st_column` should decode to a `ColId`");
592604

593-
let outdated_st_column_rows = iter_st_column_for_table(self.state, &target_table_id.into())?
605+
let outdated_st_column_rows = iter_st_column_for_table(self, &target_table_id.into())?
594606
.filter_map(|row_ref| {
595607
StColumnRow::try_from(row_ref)
596608
.map(|c| (c.col_pos == target_col_id && row_ref.pointer() != row_ptr).then(|| row_ref.pointer()))
@@ -620,7 +632,7 @@ impl<'cs> ReplayCommittedState<'cs> {
620632
// and not the other one, as it is being replaced.
621633
// `Self::ignore_previous_version_of_column` has marked the old version as ignored,
622634
// so filter only the non-ignored columns.
623-
let mut columns = iter_st_column_for_table(self.state, &table_id.into())?
635+
let mut columns = iter_st_column_for_table(self, &table_id.into())?
624636
.filter(|row_ref| !self.replay_columns_to_ignore.contains(&row_ref.pointer()))
625637
.map(|row_ref| {
626638
let row = StColumnRow::try_from(row_ref)?;
@@ -844,7 +856,7 @@ mod tests {
844856
// Directly call replay_insert on committed state.
845857
let row = u32_str_u32(1, "Carol", 40);
846858
{
847-
let state = &mut *datastore.committed_state.write();
859+
let state = datastore.committed_state.write();
848860
let mut committed_state = ReplayCommittedState::new(state);
849861
committed_state.replay_insert(table_id, &schema, &row)?;
850862
}

0 commit comments

Comments
 (0)