Skip to content

Commit b418211

Browse files
committed
perf: improve concurrency in commit fetching logic
1 parent 8e5f7f2 commit b418211

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

pkg/container/container.go

Lines changed: 35 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ import (
66
"log"
77
"os"
88
"strings"
9+
"sync"
910

1011
"github.com/thanhhaudev/github-stats/pkg/clock"
1112
"github.com/thanhhaudev/github-stats/pkg/github"
@@ -166,6 +167,8 @@ func (d *DataContainer) InitCommits(ctx context.Context) error {
166167
errChan := make(chan error, repoCount)
167168
commitChan := make(chan []github.Commit, repoCount)
168169
seenOIDs := make(map[string]bool)
170+
var mu sync.Mutex
171+
169172
mask := func(input string) string {
170173
length := len(input)
171174
if length <= 2 {
@@ -184,13 +187,17 @@ func (d *DataContainer) InitCommits(ctx context.Context) error {
184187
if repoCount == 1 {
185188
return "repository"
186189
}
187-
188190
return "repositories"
189191
}())
190192
}
191193

194+
var wg sync.WaitGroup
195+
semaphore := make(chan struct{}, 5) // Limit to 5 concurrent goroutines
196+
192197
for _, repo := range d.Data.Repositories {
198+
wg.Add(1)
193199
go func(repo github.Repository) {
200+
defer wg.Done()
194201
if fetchAllBranches {
195202
if !hiddenRepoInfo {
196203
d.Logger.Println("Fetching commits from all branches of repository:", mask(repo.Name))
@@ -202,17 +209,30 @@ func (d *DataContainer) InitCommits(ctx context.Context) error {
202209
return
203210
}
204211

212+
var branchWg sync.WaitGroup
205213
var allCommits []github.Commit
206214
for _, branch := range branches {
207-
commits, err := d.ClientManager.GetCommits(ctx, repo.Owner.Login, repo.Name, d.Data.Viewer.ID, fmt.Sprintf("refs/heads/%s", branch.Name), commitPerQuery)
208-
if err != nil {
209-
errChan <- err
210-
return
211-
}
212-
213-
allCommits = append(allCommits, commits...)
215+
branchWg.Add(1)
216+
semaphore <- struct{}{} // Acquire a slot
217+
go func(branch github.Branch) {
218+
defer branchWg.Done()
219+
defer func() { <-semaphore }() // Release the slot
220+
commits, err := d.ClientManager.GetCommits(ctx, repo.Owner.Login, repo.Name, d.Data.Viewer.ID, fmt.Sprintf("refs/heads/%s", branch.Name), commitPerQuery)
221+
if err != nil {
222+
errChan <- err
223+
return
224+
}
225+
226+
mu.Lock()
227+
allCommits = append(allCommits, commits...)
228+
if !hiddenRepoInfo {
229+
log.Printf("Fetched %d commits from branch %s of repository %s", len(commits), mask(branch.Name), mask(repo.Name))
230+
}
231+
mu.Unlock()
232+
}(branch)
214233
}
215234

235+
branchWg.Wait()
216236
commitChan <- allCommits
217237
} else {
218238
if !hiddenRepoInfo {
@@ -238,14 +258,18 @@ func (d *DataContainer) InitCommits(ctx context.Context) error {
238258
}(repo)
239259
}
240260

241-
for i := 0; i < len(d.Data.Repositories); i++ {
261+
go func() {
262+
wg.Wait()
263+
close(commitChan)
264+
close(errChan)
265+
}()
266+
267+
for i := 0; i < repoCount; i++ {
242268
if err := <-errChan; err != nil {
243269
return err
244270
}
245271
}
246272

247-
close(commitChan) // Close the channel to signal that all commits have been fetched
248-
249273
// Deduplicate commits
250274
for commits := range commitChan {
251275
for _, commit := range commits {
@@ -258,7 +282,6 @@ func (d *DataContainer) InitCommits(ctx context.Context) error {
258282
}
259283

260284
d.Logger.Println("Fetched commits successfully")
261-
262285
return nil
263286
}
264287

0 commit comments

Comments (0)