Skip to content

Commit 2014773

Browse files
authored
Merge pull request #1721 from Shopify/jn-fix-replicated-frames-synced
Only add to last_frames_synced if frames did change.
2 parents bdb9f20 + 379fe7a commit 2014773

2 files changed

Lines changed: 138 additions & 6 deletions

File tree

  • libsql-server/tests/embedded_replica
  • libsql/src/replication

libsql-server/tests/embedded_replica/mod.rs

Lines changed: 120 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1359,7 +1359,7 @@ fn replicated_return() {
13591359

13601360
let rep = db.sync().await.unwrap();
13611361
assert_eq!(rep.frame_no(), Some(4));
1362-
assert_eq!(rep.frames_synced(), 3);
1362+
assert_eq!(rep.frames_synced(), 5);
13631363

13641364
let mut row = conn.query("select count(*) from user", ()).await.unwrap();
13651365
let count = row.next().await.unwrap().unwrap().get::<u64>(0).unwrap();
@@ -1518,3 +1518,122 @@ fn replicate_auth() {
15181518

15191519
sim.run().unwrap();
15201520
}
1521+
1522+
#[test]
1523+
fn replicated_synced_frames_zero_when_no_data_synced() {
1524+
let tmp_embedded = tempdir().unwrap();
1525+
let tmp_embedded_path = tmp_embedded.path().to_owned();
1526+
1527+
let mut sim = Builder::new()
1528+
.simulation_duration(Duration::from_secs(1000))
1529+
.build();
1530+
let tmp = tempdir().unwrap();
1531+
1532+
let notify = Arc::new(Notify::new());
1533+
let notify_clone = notify.clone();
1534+
1535+
init_tracing();
1536+
sim.host("primary", move || {
1537+
let notify = notify_clone.clone();
1538+
let path = tmp.path().to_path_buf();
1539+
async move {
1540+
let make_server = || async {
1541+
TestServer {
1542+
path: path.clone().into(),
1543+
user_api_config: UserApiConfig {
1544+
..Default::default()
1545+
},
1546+
admin_api_config: Some(AdminApiConfig {
1547+
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
1548+
connector: TurmoilConnector,
1549+
disable_metrics: true,
1550+
auth_key: None,
1551+
}),
1552+
rpc_server_config: Some(RpcServerConfig {
1553+
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
1554+
tls_config: None,
1555+
}),
1556+
..Default::default()
1557+
}
1558+
};
1559+
1560+
let server = make_server().await;
1561+
let shutdown = server.shutdown.clone();
1562+
1563+
let fut = async move { server.start_sim(8080).await };
1564+
1565+
tokio::pin!(fut);
1566+
1567+
loop {
1568+
tokio::select! {
1569+
res = &mut fut => {
1570+
res.unwrap();
1571+
break
1572+
}
1573+
_ = notify.notified() => {
1574+
shutdown.notify_waiters();
1575+
},
1576+
}
1577+
}
1578+
1579+
drop(fut);
1580+
1581+
tokio::fs::File::create(path.join("dbs").join("default").join(".sentinel"))
1582+
.await
1583+
.unwrap();
1584+
1585+
notify.notify_waiters();
1586+
let server = make_server().await;
1587+
server.start_sim(8080).await.unwrap();
1588+
1589+
Ok(())
1590+
}
1591+
});
1592+
1593+
sim.client("client", async move {
1594+
let path = tmp_embedded_path.join("embedded");
1595+
let db = Database::open_with_remote_sync_connector(
1596+
path.to_str().unwrap(),
1597+
"http://primary:8080",
1598+
"",
1599+
TurmoilConnector,
1600+
false,
1601+
None,
1602+
)
1603+
.await?;
1604+
1605+
let rep = db.sync().await.unwrap();
1606+
assert_eq!(rep.frame_no(), None);
1607+
assert_eq!(rep.frames_synced(), 0);
1608+
1609+
let conn = db.connect()?;
1610+
1611+
conn.execute("CREATE TABLE user (id INTEGER)", ())
1612+
.await
1613+
.unwrap();
1614+
1615+
let rep = db.sync().await.unwrap();
1616+
assert_eq!(rep.frame_no(), Some(1));
1617+
assert_eq!(rep.frames_synced(), 2);
1618+
1619+
conn.execute("INSERT into user(id) values (randomblob(4096));", ())
1620+
.await
1621+
.unwrap();
1622+
1623+
let rep = db.sync().await.unwrap();
1624+
assert_eq!(rep.frame_no(), Some(4));
1625+
assert_eq!(rep.frames_synced(), 3);
1626+
1627+
let rep = db.sync().await.unwrap();
1628+
assert_eq!(rep.frame_no(), Some(4));
1629+
assert_eq!(rep.frames_synced(), 0);
1630+
1631+
let rep = db.sync().await.unwrap();
1632+
assert_eq!(rep.frame_no(), Some(4));
1633+
assert_eq!(rep.frames_synced(), 0);
1634+
1635+
Ok(())
1636+
});
1637+
1638+
sim.run().unwrap();
1639+
}

libsql/src/replication/mod.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,26 @@ impl EmbeddedReplicator {
256256
}
257257
}
258258

259-
let last_frames_synced = self.last_frames_synced.fetch_add(
260-
replicator.frames_synced(),
261-
std::sync::atomic::Ordering::Relaxed,
262-
);
259+
let current_frames_synced = replicator.frames_synced();
260+
261+
let mut last_frames_synced = self
262+
.last_frames_synced
263+
.load(std::sync::atomic::Ordering::Relaxed);
264+
265+
while current_frames_synced > last_frames_synced {
266+
match self.last_frames_synced.compare_exchange(
267+
last_frames_synced,
268+
current_frames_synced,
269+
std::sync::atomic::Ordering::Relaxed,
270+
std::sync::atomic::Ordering::Relaxed,
271+
) {
272+
Ok(_) => break,
273+
Err(current_value) => last_frames_synced = current_value,
274+
}
275+
}
263276

264277
let frames_synced =
265-
((replicator.frames_synced() as i64 - last_frames_synced as i64).abs()) as usize;
278+
((current_frames_synced as i64 - last_frames_synced as i64).abs()) as usize;
266279

267280
let replicated = Replicated {
268281
frame_no: replicator.client_mut().committed_frame_no(),

0 commit comments

Comments
 (0)