Skip to content

Commit 129a4e8

Browse files
committed
replay: don't make ReplayCommittedState again and again
1 parent 353ae76 commit 129a4e8

2 files changed

Lines changed: 45 additions & 36 deletions

File tree

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
},
2525
};
2626
use anyhow::anyhow;
27-
use core::{cell::RefCell, ops::RangeBounds};
27+
use core::ops::RangeBounds;
2828
use parking_lot::{Mutex, RwLock};
2929
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
3030
use spacetimedb_durability::TxOffset;
@@ -170,13 +170,9 @@ impl Locking {
170170
/// The provided closure will be called for each transaction found in the
171171
/// history, the parameter is the transaction's offset. The closure is called
172172
/// _before_ the transaction is applied to the database state.
173-
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<F> {
174-
Replay {
175-
database_identity: self.database_identity,
176-
committed_state: self.committed_state.clone(),
177-
progress: RefCell::new(progress),
178-
error_behavior,
179-
}
173+
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<'_, F> {
174+
let committed_state = self.committed_state.write();
175+
Replay::new(self.database_identity, committed_state, progress, error_behavior)
180176
}
181177

182178
/// Construct a new [`Locking`] datastore containing the state stored in `snapshot`.

crates/datastore/src/locking_tx_datastore/replay.rs

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use crate::system_tables::{
77
is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID,
88
};
99
use anyhow::{anyhow, Context};
10+
use core::cell::RefMut;
1011
use core::ops::{Deref, DerefMut, RangeBounds};
11-
use parking_lot::{RwLock, RwLockReadGuard};
12+
use parking_lot::RwLockWriteGuard;
1213
use prometheus::core::{AtomicF64, GenericGauge};
1314
use prometheus::IntGauge;
1415
use spacetimedb_commitlog::payload::txdata;
@@ -79,6 +80,7 @@ pub fn apply_history(
7980
.set((end_tx_offset - start_tx_offset) as _);
8081

8182
log::info!("[{database_identity}] DATABASE: applied transaction history");
83+
drop(replay); // Necessary to avoid a deadlock.
8284
datastore.rebuild_state_after_replay()?;
8385
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
8486

@@ -104,21 +106,32 @@ pub enum ReplayError {
104106

105107
/// A [`spacetimedb_commitlog::Decoder`] suitable for replaying a transaction
106108
/// history into the database state.
107-
pub struct Replay<F> {
108-
pub(super) database_identity: Identity,
109-
pub(super) committed_state: Arc<RwLock<CommittedState>>,
110-
pub(super) progress: RefCell<F>,
111-
pub(super) error_behavior: ErrorBehavior,
109+
pub struct Replay<'a, F> {
110+
database_identity: Identity,
111+
committed_state: RefCell<ReplayCommittedState<'a>>,
112+
progress: RefCell<F>,
113+
error_behavior: ErrorBehavior,
112114
}
113115

114-
impl<F> Replay<F> {
115-
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T {
116-
let mut committed_state = self.committed_state.write();
117-
let state = &mut *committed_state;
118-
let committed_state = ReplayCommittedState::new(state);
116+
impl<'a, F> Replay<'a, F> {
117+
pub fn new(
118+
database_identity: Identity,
119+
committed_state: RwLockWriteGuard<'a, CommittedState>,
120+
progress: F,
121+
error_behavior: ErrorBehavior,
122+
) -> Self {
123+
Self {
124+
database_identity,
125+
committed_state: RefCell::new(ReplayCommittedState::new(committed_state)),
126+
progress: RefCell::new(progress),
127+
error_behavior,
128+
}
129+
}
130+
131+
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, '_, F>) -> T) -> T {
119132
let mut visitor = ReplayVisitor {
120133
database_identity: &self.database_identity,
121-
committed_state,
134+
committed_state: &mut self.committed_state.borrow_mut(),
122135
progress: &mut *self.progress.borrow_mut(),
123136
dropped_table_names: IntMap::default(),
124137
error_behavior: self.error_behavior,
@@ -127,16 +140,16 @@ impl<F> Replay<F> {
127140
}
128141

129142
pub fn next_tx_offset(&self) -> u64 {
130-
self.committed_state.read_arc().next_tx_offset
143+
self.committed_state.borrow().next_tx_offset
131144
}
132145

133146
// NOTE: This is not unused.
134-
pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> {
135-
self.committed_state.read()
147+
pub fn committed_state(&self) -> RefMut<'_, ReplayCommittedState<'a>> {
148+
self.committed_state.borrow_mut()
136149
}
137150
}
138151

139-
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
152+
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<'_, F> {
140153
type Record = txdata::Txdata<ProductValue>;
141154
type Error = txdata::DecoderError<ReplayError>;
142155

@@ -217,9 +230,9 @@ pub enum ErrorBehavior {
217230
Warn,
218231
}
219232

220-
struct ReplayVisitor<'a, F> {
233+
struct ReplayVisitor<'a, 'cs, F> {
221234
database_identity: &'a Identity,
222-
committed_state: ReplayCommittedState<'a>,
235+
committed_state: &'a mut ReplayCommittedState<'cs>,
223236
progress: &'a mut F,
224237
// Since deletes are handled before truncation / drop, sometimes the schema
225238
// info is gone. We save the name on the first delete of that table so metrics
@@ -228,7 +241,7 @@ struct ReplayVisitor<'a, F> {
228241
error_behavior: ErrorBehavior,
229242
}
230243

231-
impl<F> ReplayVisitor<'_, F> {
244+
impl<F> ReplayVisitor<'_, '_, F> {
232245
/// Process `err` according to `self.error_behavior`,
233246
/// either warning about it or returning it.
234247
///
@@ -244,7 +257,7 @@ impl<F> ReplayVisitor<'_, F> {
244257
}
245258
}
246259

247-
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> {
260+
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, '_, F> {
248261
type Error = ReplayError;
249262
// NOTE: Technically, this could be `()` if and when we can extract the
250263
// row data without going through `ProductValue` (PV).
@@ -392,9 +405,9 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
392405
}
393406

394407
/// A `CommittedState` under construction during replay.
395-
struct ReplayCommittedState<'cs> {
396-
/// The committed state being contructed.
397-
state: &'cs mut CommittedState,
408+
pub struct ReplayCommittedState<'cs> {
409+
/// The committed state being constructed.
410+
state: RwLockWriteGuard<'cs, CommittedState>,
398411

399412
/// Whether the table was dropped within the current transaction during replay.
400413
///
@@ -439,25 +452,25 @@ struct ReplayCommittedState<'cs> {
439452
///
440453
/// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`],
441454
/// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows.
442-
pub(super) replay_table_updated: IntMap<TableId, RowPointer>,
455+
replay_table_updated: IntMap<TableId, RowPointer>,
443456
}
444457

445458
impl Deref for ReplayCommittedState<'_> {
446459
type Target = CommittedState;
447460

448461
fn deref(&self) -> &Self::Target {
449-
self.state
462+
&self.state
450463
}
451464
}
452465

453466
impl DerefMut for ReplayCommittedState<'_> {
454467
fn deref_mut(&mut self) -> &mut Self::Target {
455-
&mut *self.state
468+
&mut self.state
456469
}
457470
}
458471

459472
impl<'cs> ReplayCommittedState<'cs> {
460-
fn new(state: &'cs mut CommittedState) -> Self {
473+
fn new(state: RwLockWriteGuard<'cs, CommittedState>) -> Self {
461474
Self {
462475
state,
463476
replay_table_dropped: <_>::default(),
@@ -844,7 +857,7 @@ mod tests {
844857
// Directly call replay_insert on committed state.
845858
let row = u32_str_u32(1, "Carol", 40);
846859
{
847-
let state = &mut *datastore.committed_state.write();
860+
let state = datastore.committed_state.write();
848861
let mut committed_state = ReplayCommittedState::new(state);
849862
committed_state.replay_insert(table_id, &schema, &row)?;
850863
}

0 commit comments

Comments
 (0)