diff --git a/modules/globallock/lock.go b/modules/globallock/lock.go index 52a097e544..a106f3c8d8 100644 --- a/modules/globallock/lock.go +++ b/modules/globallock/lock.go @@ -16,13 +16,14 @@ import ( ) type Locker interface { - Lock() error - TryLock() (bool, error) - Unlock() (bool, error) + Lock() error // lock the resource and block until it is unlocked by the holder + TryLock() (bool, error) // try to lock the resource and return immediately, first return value indicates if the lock was successful + Unlock() (bool, error) // only lock with no error and TryLock returned true with no error can be unlocked } type LockService interface { - GetLock(name string) Locker + GetLocker(name string) Locker // create or get a locker by name, RemoveLocker should be called after the locker is no longer needed + RemoveLocker(name string) // remove a locker by name from the pool. This should be invoked affect locker is no longer needed, i.e. a pull request merged or closed } type memoryLock struct { @@ -57,11 +58,15 @@ func newMemoryLockService() *memoryLockService { } } -func (l *memoryLockService) GetLock(name string) Locker { +func (l *memoryLockService) GetLocker(name string) Locker { v, _ := l.syncMap.LoadOrStore(name, &memoryLock{}) return v.(*memoryLock) } +func (l *memoryLockService) RemoveLocker(name string) { + l.syncMap.Delete(name) +} + type redisLockService struct { rs *redsync.Redsync } @@ -86,10 +91,14 @@ type redisLock struct { mutex *redsync.Mutex } -func (r *redisLockService) GetLock(name string) Locker { +func (r *redisLockService) GetLocker(name string) Locker { return &redisLock{mutex: r.rs.NewMutex(name)} } +func (r *redisLockService) RemoveLocker(name string) { + // Do nothing +} + func (r *redisLock) Lock() error { return r.mutex.Lock() } @@ -123,6 +132,6 @@ func getLockService() LockService { return lockService } -func GetLock(name string) Locker { - return getLockService().GetLock(name) +func GetLocker(name string) Locker { + return getLockService().GetLocker(name) } diff --git a/modules/globallock/lock_test.go b/modules/globallock/lock_test.go index 0052057e46..99b26d08aa 100644 --- a/modules/globallock/lock_test.go +++ b/modules/globallock/lock_test.go @@ -10,17 +10,56 @@ import ( ) func Test_Lock(t *testing.T) { - locker := GetLock("test") - assert.NoError(t, locker.Lock()) - locker.Unlock() + locker1 := GetLocker("test2") + assert.NoError(t, locker1.Lock()) + unlocked, err := locker1.Unlock() + assert.NoError(t, err) + assert.True(t, unlocked) - locked1, err1 := locker.TryLock() + locker2 := GetLocker("test2") + assert.NoError(t, locker2.Lock()) + + locked1, err1 := locker2.TryLock() assert.NoError(t, err1) - assert.True(t, locked1) + assert.False(t, locked1) - locked2, err2 := locker.TryLock() + locker2.Unlock() + + locked2, err2 := locker2.TryLock() assert.NoError(t, err2) - assert.False(t, locked2) + assert.True(t, locked2) - locker.Unlock() + locker2.Unlock() +} + +func Test_Lock_Redis(t *testing.T) { + if os.Getenv("CI") == "" { + t.Skip("Skip test for local development") + } + + lockService = newRedisLockService("redis://redis") + + redisPool := + locker1 := GetLocker("test1") + assert.NoError(t, locker1.Lock()) + unlocked, err := locker1.Unlock() + assert.NoError(t, err) + assert.True(t, unlocked) + + locker2 := GetLocker("test1") + assert.NoError(t, locker2.Lock()) + + locked1, err1 := locker2.TryLock() + assert.NoError(t, err1) + assert.False(t, locked1) + + locker2.Unlock() + + locked2, err2 := locker2.TryLock() + assert.NoError(t, err2) + assert.True(t, locked2) + + locker2.Unlock() + + redisPool.Close() } diff --git a/services/pull/check.go b/services/pull/check.go index eef5137c4c..efc504ec07 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -335,13 +335,13 @@ func handler(items ...string) []string { } func testPR(id int64) { - lock := globallock.GetLock(getPullWorkingLockKey(id)) - if err := lock.Lock(); err != nil { + locker := globallock.GetLocker(getPullWorkingLockKey(id)) + if err := locker.Lock(); err != nil { log.Error("lock.Lock(): %v", err) return } defer func() { - if _, err := lock.Unlock(); err != nil { + if _, err := locker.Unlock(); err != nil { log.Error("lock.Unlock: %v", err) } }() diff --git a/services/pull/merge.go b/services/pull/merge.go index 91743cb4bb..629f94f025 100644 --- a/services/pull/merge.go +++ b/services/pull/merge.go @@ -170,13 +170,13 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U return fmt.Errorf("unable to load head repo: %w", err) } - lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) - if err := lock.Lock(); err != nil { + locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID)) + if err := locker.Lock(); err != nil { log.Error("lock.Lock(): %v", err) return fmt.Errorf("lock.Lock: %w", err) } defer func() { - if _, err := lock.Unlock(); err != nil { + if _, err := locker.Unlock(); err != nil { log.Error("lock.Unlock: %v", err) } }() @@ -492,13 +492,13 @@ func CheckPullBranchProtections(ctx context.Context, pr *issues_model.PullReques // MergedManually mark pr as merged manually func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error { - lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) - if err := lock.Lock(); err != nil { + locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID)) + if err := locker.Lock(); err != nil { log.Error("lock.Lock(): %v", err) return fmt.Errorf("lock.Lock: %w", err) } defer func() { - if _, err := lock.Unlock(); err != nil { + if _, err := locker.Unlock(); err != nil { log.Error("lock.Unlock: %v", err) } }() diff --git a/services/pull/pull.go b/services/pull/pull.go index cec78bc8f4..9df8285af2 100644 --- a/services/pull/pull.go +++ b/services/pull/pull.go @@ -203,13 +203,13 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *iss // ChangeTargetBranch changes the target branch of this pull request, as the given user. func ChangeTargetBranch(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, targetBranch string) (err error) { - lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) - if err := lock.Lock(); err != nil { + locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID)) + if err := locker.Lock(); err != nil { log.Error("lock.Lock(): %v", err) return fmt.Errorf("lock.Lock: %w", err) } defer func() { - if _, err := lock.Unlock(); err != nil { + if _, err := locker.Unlock(); err != nil { log.Error("lock.Unlock(): %v", err) } }() diff --git a/services/pull/update.go b/services/pull/update.go index 208c309d2a..d1c6bd65d7 100644 --- a/services/pull/update.go +++ b/services/pull/update.go @@ -26,13 +26,13 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model. return fmt.Errorf("update of agit flow pull request's head branch is unsupported") } - lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) - if err := lock.Lock(); err != nil { + locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID)) + if err := locker.Lock(); err != nil { log.Error("lock.Lock(): %v", err) return fmt.Errorf("lock.Lock: %w", err) } defer func() { - if _, err := lock.Unlock(); err != nil { + if _, err := locker.Unlock(); err != nil { log.Error("lock.Unlock: %v", err) } }() diff --git a/services/repository/transfer.go b/services/repository/transfer.go index e331304694..c1b94fb545 100644 --- a/services/repository/transfer.go +++ b/services/repository/transfer.go @@ -42,13 +42,13 @@ func TransferOwnership(ctx context.Context, doer, newOwner *user_model.User, rep oldOwner := repo.Owner - lock := globallock.GetLock(getRepoWorkingLockKey(repo.ID)) - if err := lock.Lock(); err != nil { + locker := globallock.GetLocker(getRepoWorkingLockKey(repo.ID)) + if err := locker.Lock(); err != nil { log.Error("lock.Lock(): %v", err) return fmt.Errorf("lock.Lock: %w", err) } defer func() { - if _, err := lock.Unlock(); err != nil { + if _, err := locker.Unlock(); err != nil { log.Error("lock.Unlock: %v", err) } }() @@ -371,14 +371,14 @@ func ChangeRepositoryName(ctx context.Context, doer *user_model.User, repo *repo // repo so that we can atomically rename the repo path and updates the // local copy's origin accordingly. - lock := globallock.GetLock(getRepoWorkingLockKey(repo.ID)) - if err := lock.Lock(); err != nil { + locker := globallock.GetLocker(getRepoWorkingLockKey(repo.ID)) + if err := locker.Lock(); err != nil { log.Error("lock.Lock(): %v", err) return fmt.Errorf("lock.Lock: %w", err) } defer func() { - if _, err := lock.Unlock(); err != nil { + if _, err := locker.Unlock(); err != nil { log.Error("lock.Unlock: %v", err) } }() diff --git a/services/wiki/wiki.go b/services/wiki/wiki.go index da0cb2e066..59f896658a 100644 --- a/services/wiki/wiki.go +++ b/services/wiki/wiki.go @@ -90,12 +90,12 @@ func updateWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model if err = validateWebPath(newWikiName); err != nil { return err } - lock := globallock.GetLock(getWikiWorkingLockKey(repo.ID)) - if err := lock.Lock(); err != nil { + locker := globallock.GetLocker(getWikiWorkingLockKey(repo.ID)) + if err := locker.Lock(); err != nil { return err } defer func() { - if _, err := lock.Unlock(); err != nil { + if _, err := locker.Unlock(); err != nil { log.Error("lock.Unlock: %v", err) } }() @@ -258,12 +258,12 @@ func DeleteWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model return err } - lock := globallock.GetLock(getWikiWorkingLockKey(repo.ID)) - if err := lock.Lock(); err != nil { + locker := globallock.GetLocker(getWikiWorkingLockKey(repo.ID)) + if err := locker.Lock(); err != nil { return err } defer func() { - if _, err := lock.Unlock(); err != nil { + if _, err := locker.Unlock(); err != nil { log.Error("lock.Unlock: %v", err) } }()