refactor the names and add remove locker

This commit is contained in:
Lunny Xiao 2024-08-19 11:17:07 -07:00
parent 1748ba82ca
commit 0c3c1a324c
No known key found for this signature in database
GPG Key ID: C3B7C91B632F738A
8 changed files with 91 additions and 43 deletions

View File

@ -16,13 +16,14 @@ import (
) )
type Locker interface { type Locker interface {
Lock() error Lock() error // lock the resource and block until it is unlocked by the holder
TryLock() (bool, error) TryLock() (bool, error) // try to lock the resource and return immediately, first return value indicates if the lock was successful
Unlock() (bool, error) Unlock() (bool, error) // only lock with no error and TryLock returned true with no error can be unlocked
} }
type LockService interface { type LockService interface {
GetLock(name string) Locker GetLocker(name string) Locker // create or get a locker by name, RemoveLocker should be called after the locker is no longer needed
RemoveLocker(name string) // remove a locker by name from the pool. This should be invoked affect locker is no longer needed, i.e. a pull request merged or closed
} }
type memoryLock struct { type memoryLock struct {
@ -57,11 +58,15 @@ func newMemoryLockService() *memoryLockService {
} }
} }
func (l *memoryLockService) GetLock(name string) Locker { func (l *memoryLockService) GetLocker(name string) Locker {
v, _ := l.syncMap.LoadOrStore(name, &memoryLock{}) v, _ := l.syncMap.LoadOrStore(name, &memoryLock{})
return v.(*memoryLock) return v.(*memoryLock)
} }
func (l *memoryLockService) RemoveLocker(name string) {
l.syncMap.Delete(name)
}
type redisLockService struct { type redisLockService struct {
rs *redsync.Redsync rs *redsync.Redsync
} }
@ -86,10 +91,14 @@ type redisLock struct {
mutex *redsync.Mutex mutex *redsync.Mutex
} }
func (r *redisLockService) GetLock(name string) Locker { func (r *redisLockService) GetLocker(name string) Locker {
return &redisLock{mutex: r.rs.NewMutex(name)} return &redisLock{mutex: r.rs.NewMutex(name)}
} }
func (r *redisLockService) RemoveLocker(name string) {
// Do nothing
}
func (r *redisLock) Lock() error { func (r *redisLock) Lock() error {
return r.mutex.Lock() return r.mutex.Lock()
} }
@ -123,6 +132,6 @@ func getLockService() LockService {
return lockService return lockService
} }
func GetLock(name string) Locker { func GetLocker(name string) Locker {
return getLockService().GetLock(name) return getLockService().GetLocker(name)
} }

View File

@ -10,17 +10,56 @@ import (
) )
func Test_Lock(t *testing.T) { func Test_Lock(t *testing.T) {
locker := GetLock("test") locker1 := GetLocker("test2")
assert.NoError(t, locker.Lock()) assert.NoError(t, locker1.Lock())
locker.Unlock() unlocked, err := locker1.Unlock()
assert.NoError(t, err)
assert.True(t, unlocked)
locked1, err1 := locker.TryLock() locker2 := GetLocker("test2")
assert.NoError(t, locker2.Lock())
locked1, err1 := locker2.TryLock()
assert.NoError(t, err1) assert.NoError(t, err1)
assert.True(t, locked1) assert.False(t, locked1)
locked2, err2 := locker.TryLock() locker2.Unlock()
locked2, err2 := locker2.TryLock()
assert.NoError(t, err2) assert.NoError(t, err2)
assert.False(t, locked2) assert.True(t, locked2)
locker.Unlock() locker2.Unlock()
}
func Test_Lock_Redis(t *testing.T) {
if os.Getenv("CI") == "" {
t.Skip("Skip test for local development")
}
lockService = newRedisLockService("redis://redis")
redisPool :=
locker1 := GetLocker("test1")
assert.NoError(t, locker1.Lock())
unlocked, err := locker1.Unlock()
assert.NoError(t, err)
assert.True(t, unlocked)
locker2 := GetLocker("test1")
assert.NoError(t, locker2.Lock())
locked1, err1 := locker2.TryLock()
assert.NoError(t, err1)
assert.False(t, locked1)
locker2.Unlock()
locked2, err2 := locker2.TryLock()
assert.NoError(t, err2)
assert.True(t, locked2)
locker2.Unlock()
redisPool.Close()
} }

View File

@ -335,13 +335,13 @@ func handler(items ...string) []string {
} }
func testPR(id int64) { func testPR(id int64) {
lock := globallock.GetLock(getPullWorkingLockKey(id)) locker := globallock.GetLocker(getPullWorkingLockKey(id))
if err := lock.Lock(); err != nil { if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err) log.Error("lock.Lock(): %v", err)
return return
} }
defer func() { defer func() {
if _, err := lock.Unlock(); err != nil { if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err) log.Error("lock.Unlock: %v", err)
} }
}() }()

View File

@ -170,13 +170,13 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
return fmt.Errorf("unable to load head repo: %w", err) return fmt.Errorf("unable to load head repo: %w", err)
} }
lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID))
if err := lock.Lock(); err != nil { if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err) log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err) return fmt.Errorf("lock.Lock: %w", err)
} }
defer func() { defer func() {
if _, err := lock.Unlock(); err != nil { if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err) log.Error("lock.Unlock: %v", err)
} }
}() }()
@ -492,13 +492,13 @@ func CheckPullBranchProtections(ctx context.Context, pr *issues_model.PullReques
// MergedManually mark pr as merged manually // MergedManually mark pr as merged manually
func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error { func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error {
lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID))
if err := lock.Lock(); err != nil { if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err) log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err) return fmt.Errorf("lock.Lock: %w", err)
} }
defer func() { defer func() {
if _, err := lock.Unlock(); err != nil { if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err) log.Error("lock.Unlock: %v", err)
} }
}() }()

View File

@ -203,13 +203,13 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *iss
// ChangeTargetBranch changes the target branch of this pull request, as the given user. // ChangeTargetBranch changes the target branch of this pull request, as the given user.
func ChangeTargetBranch(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, targetBranch string) (err error) { func ChangeTargetBranch(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, targetBranch string) (err error) {
lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID))
if err := lock.Lock(); err != nil { if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err) log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err) return fmt.Errorf("lock.Lock: %w", err)
} }
defer func() { defer func() {
if _, err := lock.Unlock(); err != nil { if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock(): %v", err) log.Error("lock.Unlock(): %v", err)
} }
}() }()

View File

@ -26,13 +26,13 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
return fmt.Errorf("update of agit flow pull request's head branch is unsupported") return fmt.Errorf("update of agit flow pull request's head branch is unsupported")
} }
lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID))
if err := lock.Lock(); err != nil { if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err) log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err) return fmt.Errorf("lock.Lock: %w", err)
} }
defer func() { defer func() {
if _, err := lock.Unlock(); err != nil { if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err) log.Error("lock.Unlock: %v", err)
} }
}() }()

View File

@ -42,13 +42,13 @@ func TransferOwnership(ctx context.Context, doer, newOwner *user_model.User, rep
oldOwner := repo.Owner oldOwner := repo.Owner
lock := globallock.GetLock(getRepoWorkingLockKey(repo.ID)) locker := globallock.GetLocker(getRepoWorkingLockKey(repo.ID))
if err := lock.Lock(); err != nil { if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err) log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err) return fmt.Errorf("lock.Lock: %w", err)
} }
defer func() { defer func() {
if _, err := lock.Unlock(); err != nil { if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err) log.Error("lock.Unlock: %v", err)
} }
}() }()
@ -371,14 +371,14 @@ func ChangeRepositoryName(ctx context.Context, doer *user_model.User, repo *repo
// repo so that we can atomically rename the repo path and updates the // repo so that we can atomically rename the repo path and updates the
// local copy's origin accordingly. // local copy's origin accordingly.
lock := globallock.GetLock(getRepoWorkingLockKey(repo.ID)) locker := globallock.GetLocker(getRepoWorkingLockKey(repo.ID))
if err := lock.Lock(); err != nil { if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err) log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err) return fmt.Errorf("lock.Lock: %w", err)
} }
defer func() { defer func() {
if _, err := lock.Unlock(); err != nil { if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err) log.Error("lock.Unlock: %v", err)
} }
}() }()

View File

@ -90,12 +90,12 @@ func updateWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model
if err = validateWebPath(newWikiName); err != nil { if err = validateWebPath(newWikiName); err != nil {
return err return err
} }
lock := globallock.GetLock(getWikiWorkingLockKey(repo.ID)) locker := globallock.GetLocker(getWikiWorkingLockKey(repo.ID))
if err := lock.Lock(); err != nil { if err := locker.Lock(); err != nil {
return err return err
} }
defer func() { defer func() {
if _, err := lock.Unlock(); err != nil { if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err) log.Error("lock.Unlock: %v", err)
} }
}() }()
@ -258,12 +258,12 @@ func DeleteWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model
return err return err
} }
lock := globallock.GetLock(getWikiWorkingLockKey(repo.ID)) locker := globallock.GetLocker(getWikiWorkingLockKey(repo.ID))
if err := lock.Lock(); err != nil { if err := locker.Lock(); err != nil {
return err return err
} }
defer func() { defer func() {
if _, err := lock.Unlock(); err != nil { if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err) log.Error("lock.Unlock: %v", err)
} }
}() }()