Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion lib/model/Model.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { clone } = require('./modelClone');
const { bindKnex } = require('./modelBindKnex');
const { validate } = require('./modelValidate');
const { isMsSql } = require('../utils/knexUtils');
const { waitForRunAfterTransaction } = require('../transaction');
const { visitModels } = require('./modelVisitor');
const { hasId, getSetId } = require('./modelId');
const { map: promiseMap } = require('../utils/promiseUtils');
Expand All @@ -13,6 +14,7 @@ const { defineNonEnumerableProperty } = require('./modelUtils');
const { parseRelationsIntoModelInstances } = require('./modelParseRelations');
const { fetchTableMetadata, tableMetadata } = require('./modelTableMetadata');
const { asArray, isFunction, isString, asSingle } = require('../utils/objectUtils');
const promiseUtils = require('../utils/promiseUtils');
const { setJson, setFast, setRelated, appendRelated, setDatabaseJson } = require('./modelSet');
const {
getJsonAttributes,
Expand Down Expand Up @@ -526,7 +528,22 @@ class Model {
knexOrTrx = null;
}

return (knexOrTrx || this.knex()).transaction(cb);
return (knexOrTrx || this.knex()).transaction((trx) => {
trx.runAfterTransactionMethods = [];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extending trx object to save runAfterTransactionMethods and afterTransationMethodsBatch helpers.

trx.afterTransactionMethodsBatch = trx.afterTransactionMethodsBatch || 10;

return promiseUtils
.try(() => {
return cb(trx);
})
.then((result) =>
waitForRunAfterTransaction(
0,
trx.runAfterTransactionMethods,
trx.afterTransactionMethodsBatch,
).then(() => result),
);
});
}

static startTransaction(knexOrTrx) {
Expand Down
66 changes: 63 additions & 3 deletions lib/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ function transaction() {
}

return knex.transaction((trx) => {
trx.runAfterTransactionMethods = [];
trx.afterTransactionMethodsBatch = trx.afterTransactionMethodsBatch || 10;
let args = new Array(modelClasses.length + 1);

for (let i = 0; i < modelClasses.length; ++i) {
Expand All @@ -54,9 +56,17 @@ function transaction() {

args[args.length - 1] = trx;

return promiseUtils.try(() => {
return callback.apply(trx, args);
});
return promiseUtils
.try(() => {
return callback.apply(trx, args);
})
.then((result) =>
waitForRunAfterTransaction(
0,
trx.runAfterTransactionMethods,
trx.afterTransactionMethodsBatch,
).then(() => result),
);
});
}
}
Expand Down Expand Up @@ -91,6 +101,56 @@ function isModelClass(maybeModel) {
return isFunction(maybeModel) && maybeModel.isObjectionModelClass;
}

async function waitForRunAfterTransaction(
initialIndex = 0,
runAfterTransactionMethods,
afterTransactionMethodsBatch,
failed = false,
) {
if (runAfterTransactionMethods.length > 0) {
const promises = [];

for (let index = initialIndex; index < runAfterTransactionMethods.length; index++) {
promises.push(
promiseUtils.try(() => {
return runAfterTransactionMethods[index]();
}),
);
if (index === runAfterTransactionMethods.length - 1) {
return Promise.all(promises)
.then(() => {
if (failed) {
return Promise.reject(new Error('Run After transaction failed.'));
}
})
.catch(() => {
return Promise.reject(new Error('Run After transaction failed.'));
});
}
if ((index + 1) % afterTransactionMethodsBatch === 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as there could be many methods to be executed after transaction, better to do this operation in batches. In order to prevent thousands of promises in parallel that consume more resources that are available

return Promise.all(promises)
.then(() => {
return waitForRunAfterTransaction(
index + 1,
runAfterTransactionMethods,
afterTransactionMethodsBatch,
false,
);
})
.catch(() => {
return waitForRunAfterTransaction(
index + 1,
runAfterTransactionMethods,
afterTransactionMethodsBatch,
true,
);
});
}
}
}
}

module.exports = {
transaction,
waitForRunAfterTransaction,
};
61 changes: 61 additions & 0 deletions tests/integration/transactions.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,34 @@ module.exports = (session) => {
expect(rows[0].model2_prop1).to.equal('test 3');
});

it('should resolve runAfterTransaction methods', async () => {
let runAfterTransactionMethodsExecuted = false;
const result = await Model1.transaction(async (trx) => {
trx.runAfterTransactionMethods.push(async () => {
return new Promise((resolve) =>
setTimeout(() => {
runAfterTransactionMethodsExecuted = true;
resolve();
}, 2000),
);
});
await Model1.query(trx).insert({ model1Prop1: 'test 1' });
await Model1.query(trx).insert({ model1Prop1: 'test 2' });
return Model2.query(trx).insert({ model2Prop1: 'test 3' });
});

expect(runAfterTransactionMethodsExecuted).to.equal(true);
expect(result.model2Prop1).to.equal('test 3');
let rows = await session.knex('Model1');

expect(rows).to.have.length(2);
expect(_.map(rows, 'model1Prop1').sort()).to.eql(['test 1', 'test 2']);
rows = await session.knex('model2');

expect(rows).to.have.length(1);
expect(rows[0].model2_prop1).to.equal('test 3');
});

it('should commit transaction if no errors occur (Model.transaction with two args)', async () => {
const result = await Model1.transaction(Model1.knex(), async (trx) => {
await Model1.query(trx).insert({ model1Prop1: 'test 1' });
Expand Down Expand Up @@ -244,6 +272,39 @@ module.exports = (session) => {
.catch(done);
});

it('should not resolve runAfterTransaction methods when rollback happens', async () => {
let runAfterTransactionMethodsExecuted = false;
try {
await Model1.transaction(async (trx) => {
trx.runAfterTransactionMethods.push(async () => {
return new Promise((resolve) =>
setTimeout(() => {
runAfterTransactionMethodsExecuted = true;
resolve();
}, 2000),
);
});
await Model1.query(trx).insert({ model1Prop1: 'test 1' });
await Model1.query(trx).insert({ model1Prop1: 'test 2' });
await Model2.query(trx).insert({ model2Prop1: 'test 3' });

throw new Error('whoops');
});

throw new Error('should not get here');
} catch (err) {
expect(runAfterTransactionMethodsExecuted).to.equal(false);

expect(err.message).to.equal('whoops');

let rows = await session.knex('Model1');
expect(rows).to.have.length(0);

rows = await session.knex('model2');
expect(rows).to.have.length(0);
}
});

it('should rollback if an error occurs (Model.transaction)', async () => {
try {
await Model1.transaction(async (trx) => {
Expand Down
11 changes: 7 additions & 4 deletions typings/objection/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1426,15 +1426,18 @@ declare namespace Objection {
interface PrototypeType<T> extends Function {
prototype: T;
}

interface ConstructorFunctionType<T = any> extends PrototypeType<T> {
new (...args: any[]): T;
}

// for internal use on generic static this deduction, copied from https://github.com/microsoft/TypeScript/issues/5863#issuecomment-1483978415
type ConstructorType<T = unknown, Static extends Record<string, any> = PrototypeType<T>> = (ConstructorFunctionType<T> | PrototypeType<T>) & {
type ConstructorType<T = unknown, Static extends Record<string, any> = PrototypeType<T>> = (
| ConstructorFunctionType<T>
| PrototypeType<T>
) & {
[Key in keyof Static]: Static[Key];
};
};

export interface ModelConstructor<M extends Model> extends Constructor<M> {}

Expand Down