Skip to content

Commit d8a4e40

Browse files
authored
Merge pull request #1683 from tursodatabase/disable-foreign-key-in-migrations
disable foreign keys in schema migration
2 parents 7e1accd + 1e809f2 commit d8a4e40

9 files changed

Lines changed: 79 additions & 77 deletions

File tree

libsql-server/src/database/schema.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,15 @@ impl<C: crate::connection::Connection> crate::connection::Connection for SchemaC
5151
} else {
5252
check_program_auth(&ctx, &migration, &self.config.get()).await?;
5353
let connection = self.connection.clone();
54-
validate_migration(&mut migration)?;
54+
let disable_foreign_key = validate_migration(&mut migration)?;
5555
let migration = Arc::new(migration);
5656
let builder = tokio::task::spawn_blocking({
5757
let migration = migration.clone();
5858
move || {
5959
let res = connection.with_raw(|conn| -> crate::Result<_> {
60+
if disable_foreign_key {
61+
conn.execute("PRAGMA foreign_keys=off", ())?;
62+
}
6063
let mut txn = conn
6164
.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
6265
.map_err(|_| {
@@ -73,6 +76,9 @@ impl<C: crate::connection::Connection> crate::connection::Connection for SchemaC
7376
&QueryBuilderConfig::default(),
7477
);
7578
txn.rollback().unwrap();
79+
if disable_foreign_key {
80+
conn.execute("PRAGMA foreign_keys=on", ())?;
81+
}
7682
Ok(ret?)
7783
});
7884

libsql-server/src/query_analysis.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,14 @@ impl Statement {
406406
StmtKind::Read | StmtKind::TxnEnd | StmtKind::TxnBegin
407407
)
408408
}
409+
410+
pub(crate) fn is_pragma(&self) -> bool {
411+
// adding a flag to the program would break the serialization, so we do that instead
412+
match self.stmt.split_whitespace().next() {
413+
Some(s) => s.trim().eq_ignore_ascii_case("pragma"),
414+
None => false,
415+
}
416+
}
409417
}
410418

411419
/// Given a an initial state and an array of queries, attempts to predict what the final state will

libsql-server/src/schema/db.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::namespace::NamespaceName;
99
use crate::schema::status::{MigrationJobProgress, MigrationJobSummary};
1010

1111
use super::status::MigrationProgress;
12+
use super::validate_migration;
1213
use super::{
1314
status::{MigrationJob, MigrationTask},
1415
Error, MigrationDetails, MigrationJobStatus, MigrationSummary, MigrationTaskStatus,
@@ -328,15 +329,17 @@ pub(super) fn get_next_pending_migration_job(
328329
|row| {
329330
let job_id = row.get::<_, i64>(0)?;
330331
let status = MigrationJobStatus::from_int(row.get::<_, u64>(1)?);
331-
let migration = serde_json::from_str(row.get_ref(2)?.as_str()?).unwrap();
332+
let mut migration = serde_json::from_str(row.get_ref(2)?.as_str()?).unwrap();
332333
let schema = NamespaceName::from_string(row.get::<_, String>(3)?).unwrap();
334+
let disable_foreign_key = validate_migration(&mut migration).unwrap();
333335
Ok(MigrationJob {
334336
schema,
335337
job_id,
336338
status,
337-
migration,
338339
progress: Default::default(),
339340
task_error: None,
341+
disable_foreign_key,
342+
migration: migration.into(),
340343
})
341344
},
342345
)

libsql-server/src/schema/mod.rs

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,37 +47,47 @@ pub use scheduler::Scheduler;
4747
pub use status::{MigrationDetails, MigrationJobStatus, MigrationSummary, MigrationTaskStatus};
4848

4949
use crate::connection::program::Program;
50-
use crate::query::{Params, Query};
51-
use crate::query_analysis::{Statement, StmtKind};
50+
use crate::query_analysis::StmtKind;
5251

53-
pub fn validate_migration(migration: &mut Program) -> Result<(), Error> {
54-
if !migration.steps.is_empty()
55-
&& matches!(migration.steps[0].query.stmt.kind, StmtKind::TxnBegin)
56-
{
57-
if !matches!(
58-
migration.steps.last().map(|s| &s.query.stmt.kind),
59-
Some(&StmtKind::TxnEnd)
60-
) {
61-
return Err(Error::MigrationContainsTransactionStatements);
52+
// validate program is valid for migration, and return whether foreign keys should be disabled
53+
pub fn validate_migration(migration: &mut Program) -> Result<bool, Error> {
54+
let mut steps = migration.steps_mut().unwrap().iter_mut().peekable();
55+
let mut explicit_tx = false;
56+
let mut disable_foreign_key = false;
57+
// skip pragmas prologue
58+
while steps.next_if(|s| s.query.stmt.is_pragma()).is_some() {
59+
disable_foreign_key = true;
60+
}
61+
62+
// first step can be a BEGIN
63+
if let Some(step) = steps.next() {
64+
if matches!(step.query.stmt.kind, StmtKind::TxnBegin) {
65+
// neutralize step
66+
step.query.stmt.stmt = r#"SELECT 'neutralized txn begin'"#.into();
67+
explicit_tx = true;
6268
}
63-
migration.steps_mut().unwrap()[0].query = Query {
64-
stmt: Statement::parse("PRAGMA max_page_count")
65-
.next()
66-
.unwrap()
67-
.unwrap(),
68-
params: Params::empty(),
69-
want_rows: false,
70-
};
71-
while let Some(step) = migration.steps.last() {
72-
if !matches!(step.query.stmt.kind, StmtKind::TxnEnd) {
73-
break;
69+
}
70+
71+
// skip all steps that are not tx items
72+
while steps.next_if(|s| !s.query.stmt.kind.is_txn()).is_some() {}
73+
74+
// last stmt can be a tx commit
75+
while let Some(step) = steps.next_if(|s| s.query.stmt.kind.is_txn()) {
76+
if matches!(step.query.stmt.kind, StmtKind::TxnEnd) {
77+
if !explicit_tx {
78+
// transaction is closed but was never opened
79+
return Err(Error::MigrationContainsTransactionStatements);
7480
}
75-
migration.steps_mut().unwrap().pop();
81+
// neutralize step
82+
step.query.stmt.stmt = r#"SELECT 'neutralized txn component'"#.into();
7683
}
7784
}
78-
if migration.steps().iter().any(|s| s.query.stmt.kind.is_txn()) {
79-
Err(Error::MigrationContainsTransactionStatements)
80-
} else {
81-
Ok(())
85+
86+
// validate pragma epilogue
87+
if steps.by_ref().any(|s| !s.query.stmt.is_pragma()) {
88+
// only accept pragmas after tx end
89+
return Err(Error::MigrationContainsTransactionStatements);
8290
}
91+
92+
Ok(disable_foreign_key)
8393
}

libsql-server/src/schema/scheduler.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ impl Scheduler {
272272
job.job_id(),
273273
self.namespace_store.clone(),
274274
self.migration_db.clone(),
275+
job.disable_foreign_key,
275276
));
276277
// do not enqueue anything until the schema migration is complete
277278
self.has_work = false;
@@ -374,6 +375,7 @@ impl Scheduler {
374375
job.migration.clone(),
375376
task,
376377
block_writes,
378+
job.disable_foreign_key,
377379
));
378380
} else {
379381
// there is still a job, but the queue is empty, it means that we are waiting for the
@@ -434,6 +436,7 @@ async fn try_step_task(
434436
migration: Arc<Program>,
435437
mut task: MigrationTask,
436438
block_writes: Arc<AtomicBool>,
439+
disable_foreign_key: bool,
437440
) -> WorkResult {
438441
let old_status = *task.status();
439442
let error = match try_step_task_inner(
@@ -443,6 +446,7 @@ async fn try_step_task(
443446
migration,
444447
&task,
445448
block_writes,
449+
disable_foreign_key,
446450
)
447451
.await
448452
{
@@ -485,6 +489,7 @@ async fn try_step_task_inner(
485489
migration: Arc<Program>,
486490
task: &MigrationTask,
487491
block_writes: Arc<AtomicBool>,
492+
disable_foreign_key: bool,
488493
) -> Result<(MigrationTaskStatus, Option<String>), Error> {
489494
let status = *task.status();
490495
let mut db_connection = connection_maker
@@ -508,6 +513,9 @@ async fn try_step_task_inner(
508513
let job_id = task.job_id();
509514
let (status, error) = tokio::task::spawn_blocking(move || -> Result<_, Error> {
510515
db_connection.with_raw(move |conn| {
516+
if disable_foreign_key {
517+
conn.execute("PRAGMA foreign_keys=off", ())?;
518+
}
511519
let mut txn = conn.transaction()?;
512520

513521
match status {
@@ -526,6 +534,10 @@ async fn try_step_task_inner(
526534
let (new_status, error) = step_task(&mut txn, job_id)?;
527535
txn.commit()?;
528536

537+
if disable_foreign_key {
538+
conn.execute("PRAGMA foreign_keys=off", ())?;
539+
}
540+
529541
if new_status.is_finished() {
530542
block_writes.store(false, std::sync::atomic::Ordering::SeqCst);
531543
}
@@ -737,6 +749,7 @@ async fn step_job_run_success(
737749
job_id: i64,
738750
namespace_store: NamespaceStore,
739751
migration_db: Arc<Mutex<MetaStoreConnection>>,
752+
disable_foreign_key: bool,
740753
) -> WorkResult {
741754
try_step_job(MigrationJobStatus::WaitingRun, async move {
742755
// TODO: check that all tasks actually reported success before migration
@@ -757,6 +770,9 @@ async fn step_job_run_success(
757770
.map_err(|e| Error::FailedToConnect(schema.clone(), e.into()))?;
758771
tokio::task::spawn_blocking(move || -> Result<(), Error> {
759772
connection.with_raw(|conn| -> Result<(), Error> {
773+
if disable_foreign_key {
774+
conn.execute("PRAGMA foreign_keys=off", ())?;
775+
}
760776
let mut txn = conn.transaction()?;
761777
let schema_version =
762778
txn.query_row("PRAGMA schema_version", (), |row| row.get::<_, i64>(0))?;
@@ -774,6 +790,9 @@ async fn step_job_run_success(
774790
txn.pragma_update(None, "schema_version", job_id)?;
775791
// update schema version to job_id?
776792
txn.commit()?;
793+
if disable_foreign_key {
794+
conn.execute("PRAGMA foreign_keys=on", ())?;
795+
}
777796
}
778797

779798
Ok(())

libsql-server/src/schema/snapshots/libsql_server__schema__db__test__pending_job-3.snap

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ MigrationJob {
3535
0,
3636
],
3737
task_error: None,
38+
disable_foreign_key: false,
3839
}

libsql-server/src/schema/snapshots/libsql_server__schema__db__test__pending_job.snap

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ MigrationJob {
3535
0,
3636
],
3737
task_error: None,
38+
disable_foreign_key: false,
3839
}

libsql-server/src/schema/status.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub struct MigrationJob {
4545
pub(super) progress: MigrationProgress,
4646
/// error info for the task that failed the job
4747
pub(super) task_error: Option<(i64, String, NamespaceName)>,
48+
pub(super) disable_foreign_key: bool,
4849
}
4950

5051
impl MigrationJob {

libsql-server/tests/namespaces/shared_schema.rs

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -216,53 +216,6 @@ fn no_job_created_when_migration_job_is_invalid() {
216216
sim.run().unwrap();
217217
}
218218

219-
#[test]
220-
fn migration_contains_txn_statements() {
221-
let mut sim = Builder::new()
222-
.simulation_duration(Duration::from_secs(100000))
223-
.build();
224-
let tmp = tempdir().unwrap();
225-
make_primary(&mut sim, tmp.path().to_path_buf());
226-
227-
sim.client("client", async {
228-
let client = Client::new();
229-
client
230-
.post(
231-
"http://primary:9090/v1/namespaces/schema/create",
232-
json!({"shared_schema": true }),
233-
)
234-
.await
235-
.unwrap();
236-
237-
let schema_db = Database::open_remote_with_connector(
238-
"http://schema.primary:8080",
239-
String::new(),
240-
TurmoilConnector,
241-
)
242-
.unwrap();
243-
let schema_conn = schema_db.connect().unwrap();
244-
schema_conn
245-
.execute_batch("begin; create table test1 (c);commit")
246-
.await
247-
.unwrap();
248-
assert_debug_snapshot!(schema_conn
249-
.execute_batch("begin; create table test (c)")
250-
.await
251-
.unwrap_err());
252-
253-
let resp = client
254-
.get("http://schema.primary:8080/v1/jobs/2")
255-
.await
256-
.unwrap();
257-
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
258-
assert_debug_snapshot!(resp.json_value().await.unwrap());
259-
260-
Ok(())
261-
});
262-
263-
sim.run().unwrap();
264-
}
265-
266219
#[test]
267220
fn dry_run_failure() {
268221
let mut sim = Builder::new()

0 commit comments

Comments
 (0)