diff --git a/projects/app/src/pages/api/core/dataset/training/rebuildEmbedding.ts b/projects/app/src/pages/api/core/dataset/training/rebuildEmbedding.ts index f33cea69c92..3e59e087b0a 100644 --- a/projects/app/src/pages/api/core/dataset/training/rebuildEmbedding.ts +++ b/projects/app/src/pages/api/core/dataset/training/rebuildEmbedding.ts @@ -81,50 +81,61 @@ async function handler( }); // get 10 init dataset.data - const arr = new Array(10).fill(0); + const max = global.systemEnv?.vectorMaxProcess || 10; + const arr = new Array(max * 2).fill(0); + for await (const _ of arr) { - await mongoSessionRun(async (session) => { - const data = await MongoDatasetData.findOneAndUpdate( - { - teamId, - datasetId, - rebuilding: true - }, - { - $unset: { - rebuilding: null + try { + const hasNext = await mongoSessionRun(async (session) => { + // get next dataset.data + const data = await MongoDatasetData.findOneAndUpdate( + { + rebuilding: true, + teamId, + datasetId + }, + { + $unset: { + rebuilding: null + }, + updateTime: new Date() }, - updateTime: new Date() - }, - { - session - } - ).select({ - _id: 1, - collectionId: 1 - }); - - if (data) { - await MongoDatasetTraining.create( - [ - { - teamId, - tmbId, - datasetId, - collectionId: data.collectionId, - billId, - mode: TrainingModeEnum.chunk, - model: vectorModel, - q: '1', - dataId: data._id - } - ], { session } - ); + ).select({ + _id: 1, + collectionId: 1 + }); + + if (data) { + await MongoDatasetTraining.create( + [ + { + teamId, + tmbId, + datasetId, + collectionId: data.collectionId, + billId, + mode: TrainingModeEnum.chunk, + model: vectorModel, + q: '1', + dataId: data._id + } + ], + { + session + } + ); + } + + return !!data; + }); + + if (!hasNext) { + break; } - }); + } catch (error) {} } return {}; diff --git a/projects/app/src/service/events/generateVector.ts b/projects/app/src/service/events/generateVector.ts index 87ba7994ca5..e72c51e872c 100644 --- a/projects/app/src/service/events/generateVector.ts +++ b/projects/app/src/service/events/generateVector.ts @@ -158,27 +158,69 @@ const rebuildData = async ({ const deleteVectorIdList = mongoData.indexes.map((index) => index.dataId); - const { tokens } = await mongoSessionRun(async (session) => { - // update vector, update dataset.data rebuilding status, delete data from training - const updateResult = await Promise.all( - mongoData.indexes.map(async (index, i) => { - const result = await insertDatasetDataVector({ - query: index.text, - model: getVectorModel(trainingData.model), - teamId: mongoData.teamId, - datasetId: mongoData.datasetId, - collectionId: mongoData.collectionId - }); - mongoData.indexes[i].dataId = result.insertId; - return result; - }) - ); + // Find next rebuilding data to insert training queue + await mongoSessionRun(async (session) => { + // get new mongoData insert to training + const newRebuildingData = await MongoDatasetData.findOneAndUpdate( + { + teamId: mongoData.teamId, + datasetId: mongoData.datasetId, + rebuilding: true + }, + { + $unset: { + rebuilding: null + }, + updateTime: new Date() + }, + { session } + ).select({ + _id: 1, + collectionId: 1 + }); - // Ensure that the training data is deleted after the Mongo update is successful + if (newRebuildingData) { + await MongoDatasetTraining.create( + [ + { + teamId: mongoData.teamId, + tmbId: trainingData.tmbId, + datasetId: mongoData.datasetId, + collectionId: newRebuildingData.collectionId, + billId: trainingData.billId, + mode: TrainingModeEnum.chunk, + model: trainingData.model, + q: '1', + dataId: newRebuildingData._id + } + ], + { session } + ); + } + }); + + // update vector, update dataset_data rebuilding status, delete data from training + // 1. Insert new vector to dataset_data + const updateResult = await Promise.all( + mongoData.indexes.map(async (index, i) => { + const result = await insertDatasetDataVector({ + query: index.text, + model: getVectorModel(trainingData.model), + teamId: mongoData.teamId, + datasetId: mongoData.datasetId, + collectionId: mongoData.collectionId + }); + mongoData.indexes[i].dataId = result.insertId; + return result; + }) + ); + const { tokens } = await mongoSessionRun(async (session) => { + // 2. Ensure that the training data is deleted after the Mongo update is successful await mongoData.save({ session }); + // 3. Delete the training data await trainingData.deleteOne({ session }); - // delete old vector + // 4. Delete old vector await deleteDatasetDataVector({ teamId: mongoData.teamId, idList: deleteVectorIdList @@ -189,59 +231,6 @@ const rebuildData = async ({ }; }); - // find next data insert to training queue - const arr = new Array(5).fill(0); - - for await (const _ of arr) { - try { - const hasNextData = await mongoSessionRun(async (session) => { - // get new mongoData insert to training - const newRebuildingData = await MongoDatasetData.findOneAndUpdate( - { - teamId: mongoData.teamId, - datasetId: mongoData.datasetId, - rebuilding: true - }, - { - $unset: { - rebuilding: null - }, - updateTime: new Date() - }, - { session } - ).select({ - _id: 1, - collectionId: 1 - }); - - if (newRebuildingData) { - await MongoDatasetTraining.create( - [ - { - teamId: mongoData.teamId, - tmbId: trainingData.tmbId, - datasetId: mongoData.datasetId, - collectionId: newRebuildingData.collectionId, - billId: trainingData.billId, - mode: TrainingModeEnum.chunk, - model: trainingData.model, - q: '1', - dataId: newRebuildingData._id - } - ], - { session } - ); - } - - return !!newRebuildingData; - }); - - if (!hasNextData) { - break; - } - } catch (error) {} - } - return { tokens }; };