Skip to content

Commit 00d6824

Browse files
committed
move apply_history to mod replay
1 parent 91494c9 commit 00d6824

3 files changed

Lines changed: 86 additions & 62 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 14 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
1919
};
20-
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
20+
use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId};
2121
use spacetimedb_datastore::system_tables::{
2222
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
2323
};
@@ -1617,62 +1617,20 @@ impl RelationalDB {
16171617
}
16181618
}
16191619

1620-
fn apply_history<H>(datastore: &Locking, database_identity: Identity, history: H) -> Result<(), DBError>
1621-
where
1622-
H: durability::History<TxData = Txdata>,
1623-
{
1624-
log::info!("[{database_identity}] DATABASE: applying transaction history...");
1625-
1626-
// TODO: Revisit once we actually replay history suffixes, ie. starting
1627-
// from an offset larger than the history's min offset.
1628-
// TODO: We may want to require that a `tokio::runtime::Handle` is
1629-
// always supplied when constructing a `RelationalDB`. This would allow
1630-
// to spawn a timer task here which just prints the progress periodically
1631-
// in case the history is finite but very long.
1632-
let (_, max_tx_offset) = history.tx_range_hint();
1633-
let mut last_logged_percentage = 0;
1634-
let progress = |tx_offset: u64| {
1635-
if let Some(max_tx_offset) = max_tx_offset {
1636-
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
1637-
if percentage > last_logged_percentage && percentage % 10 == 0 {
1638-
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
1639-
last_logged_percentage = percentage;
1640-
}
1641-
// Print _something_ even if we don't know what's still ahead.
1642-
} else if tx_offset.is_multiple_of(10_000) {
1643-
log::info!("[{database_identity}] Loading transaction {tx_offset}");
1644-
}
1620+
fn apply_history(
1621+
datastore: &Locking,
1622+
database_identity: Identity,
1623+
history: impl durability::History<TxData = Txdata>,
1624+
) -> Result<(), DBError> {
1625+
let counters = ApplyHistoryCounters {
1626+
replay_commitlog_time_seconds: WORKER_METRICS
1627+
.replay_commitlog_time_seconds
1628+
.with_label_values(&database_identity),
1629+
replay_commitlog_num_commits: WORKER_METRICS
1630+
.replay_commitlog_num_commits
1631+
.with_label_values(&database_identity),
16451632
};
1646-
1647-
let time_before = std::time::Instant::now();
1648-
1649-
let mut replay = datastore.replay(
1650-
progress,
1651-
// We don't want to instantiate an incorrect state;
1652-
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
1653-
spacetimedb_datastore::locking_tx_datastore::ErrorBehavior::FailFast,
1654-
);
1655-
let start_tx_offset = replay.next_tx_offset();
1656-
history
1657-
.fold_transactions_from(start_tx_offset, &mut replay)
1658-
.map_err(anyhow::Error::from)?;
1659-
1660-
let time_elapsed = time_before.elapsed();
1661-
WORKER_METRICS
1662-
.replay_commitlog_time_seconds
1663-
.with_label_values(&database_identity)
1664-
.set(time_elapsed.as_secs_f64());
1665-
1666-
let end_tx_offset = replay.next_tx_offset();
1667-
WORKER_METRICS
1668-
.replay_commitlog_num_commits
1669-
.with_label_values(&database_identity)
1670-
.set((end_tx_offset - start_tx_offset) as _);
1671-
1672-
log::info!("[{database_identity}] DATABASE: applied transaction history");
1673-
datastore.rebuild_state_after_replay()?;
1674-
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
1675-
1633+
spacetimedb_datastore::locking_tx_datastore::apply_history(datastore, database_identity, history, counters)?;
16761634
Ok(())
16771635
}
16781636

crates/datastore/src/locking_tx_datastore/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub mod state_view;
99
pub use state_view::{IterByColEqTx, IterByColRangeTx};
1010
pub mod delete_table;
1111
mod replay;
12-
pub use replay::{ErrorBehavior, Replay};
12+
pub use replay::{apply_history, ApplyHistoryCounters, ErrorBehavior, Replay};
1313
mod tx;
1414
pub use tx::{NumDistinctValues, TxId};
1515
mod tx_state;

crates/datastore/src/locking_tx_datastore/replay.rs

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
use super::committed_state::CommittedState;
2-
use super::datastore::Result;
2+
use super::datastore::{Locking, Result};
33
use crate::db_metrics::DB_METRICS;
44
use crate::error::{IndexError, TableError};
55
use crate::locking_tx_datastore::datastore::ReplayError;
6-
use crate::locking_tx_datastore::state_view::iter_st_column_for_table;
7-
use crate::locking_tx_datastore::state_view::StateView;
8-
use crate::system_tables::{is_built_in_meta_row, StFields as _};
9-
use crate::system_tables::{StColumnRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID};
6+
use crate::locking_tx_datastore::state_view::{iter_st_column_for_table, StateView};
7+
use crate::system_tables::{
8+
is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID,
9+
};
1010
use anyhow::{anyhow, Context};
1111
use core::ops::{Deref, DerefMut, RangeBounds};
1212
use parking_lot::{RwLock, RwLockReadGuard};
13+
use prometheus::core::{AtomicF64, GenericGauge};
14+
use prometheus::IntGauge;
1315
use spacetimedb_commitlog::payload::txdata;
1416
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
17+
use spacetimedb_durability::History;
18+
use spacetimedb_durability::Txdata;
1519
use spacetimedb_lib::Identity;
1620
use spacetimedb_primitives::{ColId, ColList, TableId};
1721
use spacetimedb_sats::algebraic_value::de::ValueDeserializer;
@@ -24,6 +28,68 @@ use spacetimedb_table::table::{InsertError, RowRef};
2428
use std::cell::RefCell;
2529
use std::sync::Arc;
2630

31+
pub fn apply_history(
32+
datastore: &Locking,
33+
database_identity: Identity,
34+
history: impl History<TxData = Txdata<ProductValue>>,
35+
counters: ApplyHistoryCounters,
36+
) -> Result<()> {
37+
log::info!("[{database_identity}] DATABASE: applying transaction history...");
38+
39+
// TODO: Revisit once we actually replay history suffixes, ie. starting
40+
// from an offset larger than the history's min offset.
41+
// TODO: We may want to require that a `tokio::runtime::Handle` is
42+
// always supplied when constructing a `RelationalDB`. This would allow
43+
// to spawn a timer task here which just prints the progress periodically
44+
// in case the history is finite but very long.
45+
let (_, max_tx_offset) = history.tx_range_hint();
46+
let mut last_logged_percentage = 0;
47+
let progress = |tx_offset: u64| {
48+
if let Some(max_tx_offset) = max_tx_offset {
49+
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
50+
if percentage > last_logged_percentage && percentage % 10 == 0 {
51+
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
52+
last_logged_percentage = percentage;
53+
}
54+
// Print _something_ even if we don't know what's still ahead.
55+
} else if tx_offset.is_multiple_of(10_000) {
56+
log::info!("[{database_identity}] Loading transaction {tx_offset}");
57+
}
58+
};
59+
60+
let time_before = std::time::Instant::now();
61+
62+
let mut replay = datastore.replay(
63+
progress,
64+
// We don't want to instantiate an incorrect state;
65+
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
66+
ErrorBehavior::FailFast,
67+
);
68+
let start_tx_offset = replay.next_tx_offset();
69+
history
70+
.fold_transactions_from(start_tx_offset, &mut replay)
71+
.map_err(anyhow::Error::from)?;
72+
73+
let time_elapsed = time_before.elapsed();
74+
counters.replay_commitlog_time_seconds.set(time_elapsed.as_secs_f64());
75+
76+
let end_tx_offset = replay.next_tx_offset();
77+
counters
78+
.replay_commitlog_num_commits
79+
.set((end_tx_offset - start_tx_offset) as _);
80+
81+
log::info!("[{database_identity}] DATABASE: applied transaction history");
82+
datastore.rebuild_state_after_replay()?;
83+
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
84+
85+
Ok(())
86+
}
87+
88+
pub struct ApplyHistoryCounters {
89+
pub replay_commitlog_time_seconds: GenericGauge<AtomicF64>,
90+
pub replay_commitlog_num_commits: IntGauge,
91+
}
92+
2793
/// A [`spacetimedb_commitlog::Decoder`] suitable for replaying a transaction
2894
/// history into the database state.
2995
pub struct Replay<F> {

0 commit comments

Comments
 (0)