|
| 1 | +import LLMFactory from '@/app/add-repositories/llm-factory' |
| 2 | +import { Repository, RepositoryData } from '@/db/models/repository' |
| 3 | +import LLMConfig from '@/llm/llm-config' |
| 4 | +import { InsertRepoService } from '@/service/insert-db' |
| 5 | +import { checkRateLimit } from '@/github/ratelimit' |
| 6 | +import axios from 'axios' |
| 7 | + |
| 8 | +interface InsertItem { |
| 9 | + owner: string |
| 10 | + repo: string |
| 11 | +} |
| 12 | + |
| 13 | +export default class InsertQueue { |
| 14 | + private _queue: InsertItem[] = [] |
| 15 | + private _isProcessing: boolean = false |
| 16 | + private _repoService: InsertRepoService |
| 17 | + private _processed: InsertItem[] = [] |
| 18 | + private static _instance?: InsertQueue |
| 19 | + private _rateLimitBeforeStop: number = 500 |
| 20 | + private _maxQueueSize: number = 10 |
| 21 | + private _repository: Repository |
| 22 | + |
| 23 | + private constructor() { |
| 24 | + this._queue = [] |
| 25 | + this._isProcessing = false |
| 26 | + this._processed = [] |
| 27 | + const llmConfig = new LLMConfig(1, 0.95, 40, 8192) |
| 28 | + this._repoService = new InsertRepoService( |
| 29 | + LLMFactory.createProvider(llmConfig) |
| 30 | + ) |
| 31 | + this._repository = new Repository() |
| 32 | + } |
| 33 | + |
| 34 | + public static getInstance() { |
| 35 | + if (!this._instance) { |
| 36 | + this._instance = new InsertQueue() |
| 37 | + } |
| 38 | + return this._instance |
| 39 | + } |
| 40 | + |
| 41 | + /** |
| 42 | + * Adds a new item to the queue |
| 43 | + */ |
| 44 | + public add(item: InsertItem): boolean { |
| 45 | + // Check if item is already in the queue |
| 46 | + if (this._queue.find((i) => i.owner === item.owner && i.repo === item.repo)) { |
| 47 | + return false |
| 48 | + } |
| 49 | + |
| 50 | + // Check if item is in the database |
| 51 | + const isIncludedInRepo = this._repository.select(item.owner, item.repo) |
| 52 | + if(isIncludedInRepo == null) { |
| 53 | + return false |
| 54 | + } |
| 55 | + |
| 56 | + // Check if the queue is full |
| 57 | + if (this._queue.length >= this._maxQueueSize) { |
| 58 | + return false |
| 59 | + } |
| 60 | + |
| 61 | + // Check if the repository exists |
| 62 | + axios.get(`https://api.github.com/repos/${item.owner}/${item.repo}`) |
| 63 | + .then((res) => { |
| 64 | + if (res.status === 404) { |
| 65 | + return false |
| 66 | + } |
| 67 | + }) |
| 68 | + |
| 69 | + this._queue.push(item) |
| 70 | + |
| 71 | + if (!this._isProcessing) { |
| 72 | + this._processQueue() |
| 73 | + } |
| 74 | + return true |
| 75 | + } |
| 76 | + |
| 77 | + /** |
| 78 | + * Returns the finished queue |
| 79 | + */ |
| 80 | + get processed(): InsertItem[] { |
| 81 | + return this._processed |
| 82 | + } |
| 83 | + |
| 84 | + /** |
| 85 | + * Clears the processed queue |
| 86 | + */ |
| 87 | + clearProcessed(): void { |
| 88 | + this._processed = [] |
| 89 | + } |
| 90 | + |
| 91 | + /** |
| 92 | + * Processes the queue, if rate limit is not reached |
| 93 | + * if rate limit is reached, it will wait for the reset time |
| 94 | + */ |
| 95 | + private async _processQueue(): Promise<void> { |
| 96 | + this._isProcessing = true |
| 97 | + while (this._queue.length > 0) { |
| 98 | + const rateLimit = await checkRateLimit() |
| 99 | + if (rateLimit?.remaining! < this._rateLimitBeforeStop) { |
| 100 | + console.log('Rate limit reached. Stopping queue processing.') |
| 101 | + const waitTime = rateLimit!.reset - Date.now() |
| 102 | + console.log(`Waiting for ${waitTime}ms`) |
| 103 | + await new Promise((resolve) => setTimeout(resolve, waitTime + 1000)) |
| 104 | + continue |
| 105 | + } |
| 106 | + const item = this._queue.shift() |
| 107 | + if (item) { |
| 108 | + await this._processItem(item) |
| 109 | + } |
| 110 | + } |
| 111 | + this._isProcessing = false |
| 112 | + } |
| 113 | + |
| 114 | + /** |
| 115 | + * Processes an item from the queue |
| 116 | + */ |
| 117 | + private async _processItem( |
| 118 | + item: InsertItem |
| 119 | + ): Promise<RepositoryData | null> { |
| 120 | + console.log(`Processing item: ${item}`) |
| 121 | + let result: RepositoryData | null = null |
| 122 | + try { |
| 123 | + result = await this._repoService.insertRepository( |
| 124 | + item.owner, |
| 125 | + item.repo |
| 126 | + ) |
| 127 | + } catch (error) { |
| 128 | + console.error(`Failed to process item: ${item}`, error) |
| 129 | + } |
| 130 | + |
| 131 | + return result |
| 132 | + } |
| 133 | + |
| 134 | + /** |
| 135 | + * Returns the current queue |
| 136 | + */ |
| 137 | + get queue(): InsertItem[] { |
| 138 | + return this._queue |
| 139 | + } |
| 140 | +} |
0 commit comments