From 1c483f82570239c49830d54ba8345ffdb2f59b71 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Fri, 9 Aug 2024 15:20:31 -0700 Subject: [PATCH] Use an abstract lock layer to allow distributed lock between multiple Gitea instances --- custom/conf/app.example.ini | 6 ++ go.mod | 3 + go.sum | 7 ++ modules/globallock/lock.go | 128 ++++++++++++++++++++++++++++ modules/globallock/lock_test.go | 26 ++++++ modules/setting/gloabl_lock.go | 37 ++++++++ modules/setting/global_lock_test.go | 35 ++++++++ modules/setting/setting.go | 1 + modules/sync/exclusive_pool.go | 69 --------------- modules/sync/status_pool.go | 57 ------------- modules/sync/status_pool_test.go | 31 ------- services/cron/cron.go | 4 - services/cron/tasks.go | 12 ++- services/pull/check.go | 14 ++- services/pull/merge.go | 25 +++++- services/pull/pull.go | 19 +++-- services/pull/update.go | 13 ++- services/repository/transfer.go | 37 +++++--- services/wiki/wiki.go | 31 +++++-- 19 files changed, 361 insertions(+), 194 deletions(-) create mode 100644 modules/globallock/lock.go create mode 100644 modules/globallock/lock_test.go create mode 100644 modules/setting/gloabl_lock.go create mode 100644 modules/setting/global_lock_test.go delete mode 100644 modules/sync/exclusive_pool.go delete mode 100644 modules/sync/status_pool.go delete mode 100644 modules/sync/status_pool_test.go diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index adec5aff36..0f70a1a3d0 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -2713,3 +2713,9 @@ LEVEL = Info ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; storage type ;STORAGE_TYPE = local + +;[global_lock] +;; Lock service type, could be memory or redis +;SERVICE_TYPE = memory +;; Ignored for the "memory" type. For "redis" use something like `redis://127.0.0.1:6379/0` +;SERVICE_CONN_STR = diff --git a/go.mod b/go.mod index f5c189893f..1d05a1cec6 100644 --- a/go.mod +++ b/go.mod @@ -201,6 +201,7 @@ require ( github.com/go-openapi/strfmt v0.23.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/go-openapi/validate v0.24.0 // indirect + github.com/go-redsync/redsync/v4 v4.13.0 // indirect github.com/go-webauthn/x v0.1.9 // indirect github.com/goccy/go-json v0.10.3 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect @@ -218,7 +219,9 @@ require ( github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/securecookie v1.1.2 // indirect github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-retryablehttp v0.7.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.16 // indirect diff --git a/go.sum b/go.sum index f1780fada7..0882d6be22 100644 --- a/go.sum +++ b/go.sum @@ -342,6 +342,8 @@ github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ github.com/go-openapi/validate v0.24.0 h1:LdfDKwNbpB6Vn40xhTdNZAnfLECL81w+VX3BumrGD58= github.com/go-openapi/validate v0.24.0/go.mod h1:iyeX1sEufmv3nPbBdX3ieNviWnOZaJ1+zquzJEf2BAQ= github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA= +github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= @@ -449,10 +451,15 @@ github.com/h2non/gock v1.2.0 h1:K6ol8rfrRkUOefooBC8elXoaNGYkpp7y2qcxGG6BzUE= github.com/h2non/gock v1.2.0/go.mod h1:tNhoxHYW2W42cYkYb1WqzdbYIieALC99kpYr7rH/BQk= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= diff --git a/modules/globallock/lock.go b/modules/globallock/lock.go new file mode 100644 index 0000000000..52a097e544 --- /dev/null +++ b/modules/globallock/lock.go @@ -0,0 +1,128 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package globallock + +import ( + "context" + "sync" + "time" + + "code.gitea.io/gitea/modules/nosql" + "code.gitea.io/gitea/modules/setting" + + redsync "github.com/go-redsync/redsync/v4" + goredis "github.com/go-redsync/redsync/v4/redis/goredis/v9" +) + +type Locker interface { + Lock() error + TryLock() (bool, error) + Unlock() (bool, error) +} + +type LockService interface { + GetLock(name string) Locker +} + +type memoryLock struct { + mutex sync.Mutex +} + +func (r *memoryLock) Lock() error { + r.mutex.Lock() + return nil +} + +func (r *memoryLock) TryLock() (bool, error) { + return r.mutex.TryLock(), nil +} + +func (r *memoryLock) Unlock() (bool, error) { + r.mutex.Unlock() + return true, nil +} + +var _ Locker = &memoryLock{} + +type memoryLockService struct { + syncMap sync.Map +} + +var _ LockService = &memoryLockService{} + +func newMemoryLockService() *memoryLockService { + return &memoryLockService{ + syncMap: sync.Map{}, + } +} + +func (l *memoryLockService) GetLock(name string) Locker { + v, _ := l.syncMap.LoadOrStore(name, &memoryLock{}) + return v.(*memoryLock) +} + +type redisLockService struct { + rs *redsync.Redsync +} + +var _ LockService = &redisLockService{} + +func newRedisLockService(connection string) *redisLockService { + client := nosql.GetManager().GetRedisClient(connection) + + pool := goredis.NewPool(client) + + // Create an instance of redisync to be used to obtain a mutual exclusion + // lock. + rs := redsync.New(pool) + + return &redisLockService{ + rs: rs, + } +} + +type redisLock struct { + mutex *redsync.Mutex +} + +func (r *redisLockService) GetLock(name string) Locker { + return &redisLock{mutex: r.rs.NewMutex(name)} +} + +func (r *redisLock) Lock() error { + return r.mutex.Lock() +} + +func (r *redisLock) TryLock() (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + if err := r.mutex.LockContext(ctx); err != nil { + return false, err + } + return true, nil +} + +func (r *redisLock) Unlock() (bool, error) { + return r.mutex.Unlock() +} + +var ( + syncOnce sync.Once + lockService LockService +) + +func getLockService() LockService { + syncOnce.Do(func() { + if setting.GlobalLock.ServiceType == "redis" { + lockService = newRedisLockService(setting.GlobalLock.ServiceConnStr) + } else { + lockService = newMemoryLockService() + } + }) + return lockService +} + +func GetLock(name string) Locker { + return getLockService().GetLock(name) +} diff --git a/modules/globallock/lock_test.go b/modules/globallock/lock_test.go new file mode 100644 index 0000000000..0052057e46 --- /dev/null +++ b/modules/globallock/lock_test.go @@ -0,0 +1,26 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package globallock + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_Lock(t *testing.T) { + locker := GetLock("test") + assert.NoError(t, locker.Lock()) + locker.Unlock() + + locked1, err1 := locker.TryLock() + assert.NoError(t, err1) + assert.True(t, locked1) + + locked2, err2 := locker.TryLock() + assert.NoError(t, err2) + assert.False(t, locked2) + + locker.Unlock() +} diff --git a/modules/setting/gloabl_lock.go b/modules/setting/gloabl_lock.go new file mode 100644 index 0000000000..a7802a9df1 --- /dev/null +++ b/modules/setting/gloabl_lock.go @@ -0,0 +1,37 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package setting + +import ( + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/nosql" +) + +// GlobalLock represents configuration of global lock +var GlobalLock = struct { + ServiceType string + ServiceConnStr string +}{ + ServiceType: "memory", +} + +func loadGlobalLockFrom(rootCfg ConfigProvider) { + sec := rootCfg.Section("global_lock") + GlobalLock.ServiceType = sec.Key("SERVICE_TYPE").MustString("memory") + switch GlobalLock.ServiceType { + case "memory": + case "redis": + connStr := sec.Key("SERVICE_CONN_STR").String() + if connStr == "" { + log.Fatal("SERVICE_CONN_STR is empty for redis") + } + u := nosql.ToRedisURI(connStr) + if u == nil { + log.Fatal("SERVICE_CONN_STR %s is not a valid redis connection string", connStr) + } + GlobalLock.ServiceConnStr = connStr + default: + log.Fatal("Unknown sync lock service type: %s", GlobalLock.ServiceType) + } +} diff --git a/modules/setting/global_lock_test.go b/modules/setting/global_lock_test.go new file mode 100644 index 0000000000..5eeb275523 --- /dev/null +++ b/modules/setting/global_lock_test.go @@ -0,0 +1,35 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package setting + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLoadGlobalLockConfig(t *testing.T) { + t.Run("DefaultGlobalLockConfig", func(t *testing.T) { + iniStr := `` + cfg, err := NewConfigProviderFromData(iniStr) + assert.NoError(t, err) + + loadGlobalLockFrom(cfg) + assert.EqualValues(t, "memory", GlobalLock.ServiceType) + }) + + t.Run("RedisGlobalLockConfig", func(t *testing.T) { + iniStr := ` +[global_lock] +SERVICE_TYPE = redis +SERVICE_CONN_STR = addrs=127.0.0.1:6379 db=0 +` + cfg, err := NewConfigProviderFromData(iniStr) + assert.NoError(t, err) + + loadGlobalLockFrom(cfg) + assert.EqualValues(t, "redis", GlobalLock.ServiceType) + assert.EqualValues(t, "addrs=127.0.0.1:6379 db=0", GlobalLock.ServiceConnStr) + }) +} diff --git a/modules/setting/setting.go b/modules/setting/setting.go index b4f913cdae..c93d199b1b 100644 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -147,6 +147,7 @@ func loadCommonSettingsFrom(cfg ConfigProvider) error { loadGitFrom(cfg) loadMirrorFrom(cfg) loadMarkupFrom(cfg) + loadGlobalLockFrom(cfg) loadOtherFrom(cfg) return nil } diff --git a/modules/sync/exclusive_pool.go b/modules/sync/exclusive_pool.go deleted file mode 100644 index fbfc1f2292..0000000000 --- a/modules/sync/exclusive_pool.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2016 The Gogs Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package sync - -import ( - "sync" -) - -// ExclusivePool is a pool of non-identical instances -// that only one instance with same identity is in the pool at a time. -// In other words, only instances with different identities can be in -// the pool the same time. If another instance with same identity tries -// to get into the pool, it hangs until previous instance left the pool. -// -// This pool is particularly useful for performing tasks on same resource -// on the file system in different goroutines. -type ExclusivePool struct { - lock sync.Mutex - - // pool maintains locks for each instance in the pool. - pool map[string]*sync.Mutex - - // count maintains the number of times an instance with same identity checks in - // to the pool, and should be reduced to 0 (removed from map) by checking out - // with same number of times. - // The purpose of count is to delete lock when count down to 0 and recycle memory - // from map object. - count map[string]int -} - -// NewExclusivePool initializes and returns a new ExclusivePool object. -func NewExclusivePool() *ExclusivePool { - return &ExclusivePool{ - pool: make(map[string]*sync.Mutex), - count: make(map[string]int), - } -} - -// CheckIn checks in an instance to the pool and hangs while instance -// with same identity is using the lock. -func (p *ExclusivePool) CheckIn(identity string) { - p.lock.Lock() - - lock, has := p.pool[identity] - if !has { - lock = &sync.Mutex{} - p.pool[identity] = lock - } - p.count[identity]++ - - p.lock.Unlock() - lock.Lock() -} - -// CheckOut checks out an instance from the pool and releases the lock -// to let other instances with same identity to grab the lock. -func (p *ExclusivePool) CheckOut(identity string) { - p.lock.Lock() - defer p.lock.Unlock() - - p.pool[identity].Unlock() - if p.count[identity] == 1 { - delete(p.pool, identity) - delete(p.count, identity) - } else { - p.count[identity]-- - } -} diff --git a/modules/sync/status_pool.go b/modules/sync/status_pool.go deleted file mode 100644 index 6f075d54b7..0000000000 --- a/modules/sync/status_pool.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2016 The Gogs Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package sync - -import ( - "sync" - - "code.gitea.io/gitea/modules/container" -) - -// StatusTable is a table maintains true/false values. -// -// This table is particularly useful for un/marking and checking values -// in different goroutines. -type StatusTable struct { - lock sync.RWMutex - pool container.Set[string] -} - -// NewStatusTable initializes and returns a new StatusTable object. -func NewStatusTable() *StatusTable { - return &StatusTable{ - pool: make(container.Set[string]), - } -} - -// StartIfNotRunning sets value of given name to true if not already in pool. -// Returns whether set value was set to true -func (p *StatusTable) StartIfNotRunning(name string) bool { - p.lock.Lock() - added := p.pool.Add(name) - p.lock.Unlock() - return added -} - -// Start sets value of given name to true in the pool. -func (p *StatusTable) Start(name string) { - p.lock.Lock() - p.pool.Add(name) - p.lock.Unlock() -} - -// Stop sets value of given name to false in the pool. -func (p *StatusTable) Stop(name string) { - p.lock.Lock() - p.pool.Remove(name) - p.lock.Unlock() -} - -// IsRunning checks if value of given name is set to true in the pool. -func (p *StatusTable) IsRunning(name string) bool { - p.lock.RLock() - exists := p.pool.Contains(name) - p.lock.RUnlock() - return exists -} diff --git a/modules/sync/status_pool_test.go b/modules/sync/status_pool_test.go deleted file mode 100644 index e2e48862f5..0000000000 --- a/modules/sync/status_pool_test.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2017 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package sync - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_StatusTable(t *testing.T) { - table := NewStatusTable() - - assert.False(t, table.IsRunning("xyz")) - - table.Start("xyz") - assert.True(t, table.IsRunning("xyz")) - - assert.False(t, table.StartIfNotRunning("xyz")) - assert.True(t, table.IsRunning("xyz")) - - table.Stop("xyz") - assert.False(t, table.IsRunning("xyz")) - - assert.True(t, table.StartIfNotRunning("xyz")) - assert.True(t, table.IsRunning("xyz")) - - table.Stop("xyz") - assert.False(t, table.IsRunning("xyz")) -} diff --git a/services/cron/cron.go b/services/cron/cron.go index 3c5737e371..1d803ed96d 100644 --- a/services/cron/cron.go +++ b/services/cron/cron.go @@ -11,7 +11,6 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/process" - "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/modules/translation" "github.com/go-co-op/gocron" @@ -19,9 +18,6 @@ import ( var scheduler = gocron.NewScheduler(time.Local) -// Prevent duplicate running tasks. -var taskStatusTable = sync.NewStatusTable() - // NewContext begins cron tasks // Each cron task is run within the shutdown context as a running server // AtShutdown the cron server is stopped diff --git a/services/cron/tasks.go b/services/cron/tasks.go index f8a7444c49..48d1fbcd53 100644 --- a/services/cron/tasks.go +++ b/services/cron/tasks.go @@ -14,6 +14,7 @@ import ( "code.gitea.io/gitea/models/db" system_model "code.gitea.io/gitea/models/system" user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/globallock" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" @@ -73,7 +74,12 @@ func (t *Task) Run() { // RunWithUser will run the task incrementing the cron counter at the time with User func (t *Task) RunWithUser(doer *user_model.User, config Config) { - if !taskStatusTable.StartIfNotRunning(t.Name) { + lock := globallock.GetLock(fmt.Sprintf("cron_tasks_%s", t.Name)) + if success, err := lock.TryLock(); err != nil { + log.Error("Unable to lock cron task %s Error: %v", t.Name, err) + return + } else if !success { + // get lock failed, so that there must be another task are running. return } t.lock.Lock() @@ -83,7 +89,9 @@ func (t *Task) RunWithUser(doer *user_model.User, config Config) { t.ExecTimes++ t.lock.Unlock() defer func() { - taskStatusTable.Stop(t.Name) + if _, err := lock.Unlock(); err != nil { + log.Error("Unable to unlock cron task %s Error: %v", t.Name, err) + } }() graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) { defer func() { diff --git a/services/pull/check.go b/services/pull/check.go index 7d93ff7a8a..eef5137c4c 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -21,6 +21,7 @@ import ( user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/gitrepo" + "code.gitea.io/gitea/modules/globallock" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" @@ -334,8 +335,17 @@ func handler(items ...string) []string { } func testPR(id int64) { - pullWorkingPool.CheckIn(fmt.Sprint(id)) - defer pullWorkingPool.CheckOut(fmt.Sprint(id)) + lock := globallock.GetLock(getPullWorkingLockKey(id)) + if err := lock.Lock(); err != nil { + log.Error("lock.Lock(): %v", err) + return + } + defer func() { + if _, err := lock.Unlock(); err != nil { + log.Error("lock.Unlock: %v", err) + } + }() + ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id)) defer finished() diff --git a/services/pull/merge.go b/services/pull/merge.go index e19292c31c..91743cb4bb 100644 --- a/services/pull/merge.go +++ b/services/pull/merge.go @@ -23,6 +23,7 @@ import ( user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/globallock" "code.gitea.io/gitea/modules/httplib" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/references" @@ -169,8 +170,16 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U return fmt.Errorf("unable to load head repo: %w", err) } - pullWorkingPool.CheckIn(fmt.Sprint(pr.ID)) - defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID)) + lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) + if err := lock.Lock(); err != nil { + log.Error("lock.Lock(): %v", err) + return fmt.Errorf("lock.Lock: %w", err) + } + defer func() { + if _, err := lock.Unlock(); err != nil { + log.Error("lock.Unlock: %v", err) + } + }() prUnit, err := pr.BaseRepo.GetUnit(ctx, unit.TypePullRequests) if err != nil { @@ -483,8 +492,16 @@ func CheckPullBranchProtections(ctx context.Context, pr *issues_model.PullReques // MergedManually mark pr as merged manually func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error { - pullWorkingPool.CheckIn(fmt.Sprint(pr.ID)) - defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID)) + lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) + if err := lock.Lock(); err != nil { + log.Error("lock.Lock(): %v", err) + return fmt.Errorf("lock.Lock: %w", err) + } + defer func() { + if _, err := lock.Unlock(); err != nil { + log.Error("lock.Unlock: %v", err) + } + }() if err := db.WithTx(ctx, func(ctx context.Context) error { if err := pr.LoadBaseRepo(ctx); err != nil { diff --git a/services/pull/pull.go b/services/pull/pull.go index e69c842a2d..cec78bc8f4 100644 --- a/services/pull/pull.go +++ b/services/pull/pull.go @@ -25,20 +25,21 @@ import ( "code.gitea.io/gitea/modules/container" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/gitrepo" + "code.gitea.io/gitea/modules/globallock" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" repo_module "code.gitea.io/gitea/modules/repository" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/modules/util" gitea_context "code.gitea.io/gitea/services/context" issue_service "code.gitea.io/gitea/services/issue" notify_service "code.gitea.io/gitea/services/notify" ) -// TODO: use clustered lock (unique queue? or *abuse* cache) -var pullWorkingPool = sync.NewExclusivePool() +func getPullWorkingLockKey(prID int64) string { + return fmt.Sprintf("pull_working_%d", prID) +} // NewPullRequest creates new pull request with labels for repository. func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *issues_model.Issue, labelIDs []int64, uuids []string, pr *issues_model.PullRequest, assigneeIDs []int64) error { @@ -202,8 +203,16 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *iss // ChangeTargetBranch changes the target branch of this pull request, as the given user. func ChangeTargetBranch(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, targetBranch string) (err error) { - pullWorkingPool.CheckIn(fmt.Sprint(pr.ID)) - defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID)) + lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) + if err := lock.Lock(); err != nil { + log.Error("lock.Lock(): %v", err) + return fmt.Errorf("lock.Lock: %w", err) + } + defer func() { + if _, err := lock.Unlock(); err != nil { + log.Error("lock.Unlock(): %v", err) + } + }() // Current target branch is already the same if pr.BaseBranch == targetBranch { diff --git a/services/pull/update.go b/services/pull/update.go index a7fd81421e..208c309d2a 100644 --- a/services/pull/update.go +++ b/services/pull/update.go @@ -14,6 +14,7 @@ import ( "code.gitea.io/gitea/models/unit" user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/globallock" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/repository" ) @@ -25,8 +26,16 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model. return fmt.Errorf("update of agit flow pull request's head branch is unsupported") } - pullWorkingPool.CheckIn(fmt.Sprint(pr.ID)) - defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID)) + lock := globallock.GetLock(getPullWorkingLockKey(pr.ID)) + if err := lock.Lock(); err != nil { + log.Error("lock.Lock(): %v", err) + return fmt.Errorf("lock.Lock: %w", err) + } + defer func() { + if _, err := lock.Unlock(); err != nil { + log.Error("lock.Unlock: %v", err) + } + }() diffCount, err := GetDiverging(ctx, pr) if err != nil { diff --git a/services/repository/transfer.go b/services/repository/transfer.go index 9e0ff7ae14..000598f567 100644 --- a/services/repository/transfer.go +++ b/services/repository/transfer.go @@ -17,16 +17,16 @@ import ( access_model "code.gitea.io/gitea/models/perm/access" repo_model "code.gitea.io/gitea/models/repo" user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/globallock" "code.gitea.io/gitea/modules/log" repo_module "code.gitea.io/gitea/modules/repository" - "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/modules/util" notify_service "code.gitea.io/gitea/services/notify" ) -// repoWorkingPool represents a working pool to order the parallel changes to the same repository -// TODO: use clustered lock (unique queue? or *abuse* cache) -var repoWorkingPool = sync.NewExclusivePool() +func getRepoWorkingLockKey(repoID int64) string { + return fmt.Sprintf("repo_working_%d", repoID) +} // TransferOwnership transfers all corresponding setting from old user to new one. func TransferOwnership(ctx context.Context, doer, newOwner *user_model.User, repo *repo_model.Repository, teams []*organization.Team) error { @@ -41,12 +41,20 @@ func TransferOwnership(ctx context.Context, doer, newOwner *user_model.User, rep oldOwner := repo.Owner - repoWorkingPool.CheckIn(fmt.Sprint(repo.ID)) + lock := globallock.GetLock(getRepoWorkingLockKey(repo.ID)) + if err := lock.Lock(); err != nil { + log.Error("lock.Lock(): %v", err) + return fmt.Errorf("lock.Lock: %w", err) + } + defer func() { + if _, err := lock.Unlock(); err != nil { + log.Error("lock.Unlock: %v", err) + } + }() + if err := transferOwnership(ctx, doer, newOwner.Name, repo); err != nil { - repoWorkingPool.CheckOut(fmt.Sprint(repo.ID)) return err } - repoWorkingPool.CheckOut(fmt.Sprint(repo.ID)) newRepo, err := repo_model.GetRepositoryByID(ctx, repo.ID) if err != nil { @@ -346,12 +354,21 @@ func ChangeRepositoryName(ctx context.Context, doer *user_model.User, repo *repo // repo so that we can atomically rename the repo path and updates the // local copy's origin accordingly. - repoWorkingPool.CheckIn(fmt.Sprint(repo.ID)) + lock := globallock.GetLock(getRepoWorkingLockKey(repo.ID)) + if err := lock.Lock(); err != nil { + log.Error("lock.Lock(): %v", err) + return fmt.Errorf("lock.Lock: %w", err) + } + + defer func() { + if _, err := lock.Unlock(); err != nil { + log.Error("lock.Unlock: %v", err) + } + }() + if err := changeRepositoryName(ctx, repo, newRepoName); err != nil { - repoWorkingPool.CheckOut(fmt.Sprint(repo.ID)) return err } - repoWorkingPool.CheckOut(fmt.Sprint(repo.ID)) repo.Name = newRepoName notify_service.RenameRepository(ctx, doer, repo, oldRepoName) diff --git a/services/wiki/wiki.go b/services/wiki/wiki.go index fdcc5feefa..da0cb2e066 100644 --- a/services/wiki/wiki.go +++ b/services/wiki/wiki.go @@ -18,19 +18,20 @@ import ( user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/gitrepo" + "code.gitea.io/gitea/modules/globallock" "code.gitea.io/gitea/modules/log" repo_module "code.gitea.io/gitea/modules/repository" - "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/modules/util" asymkey_service "code.gitea.io/gitea/services/asymkey" repo_service "code.gitea.io/gitea/services/repository" ) -// TODO: use clustered lock (unique queue? or *abuse* cache) -var wikiWorkingPool = sync.NewExclusivePool() - const DefaultRemote = "origin" +func getWikiWorkingLockKey(repoID int64) string { + return fmt.Sprintf("wiki_working_%d", repoID) +} + // InitWiki initializes a wiki for repository, // it does nothing when repository already has wiki. func InitWiki(ctx context.Context, repo *repo_model.Repository) error { @@ -89,8 +90,15 @@ func updateWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model if err = validateWebPath(newWikiName); err != nil { return err } - wikiWorkingPool.CheckIn(fmt.Sprint(repo.ID)) - defer wikiWorkingPool.CheckOut(fmt.Sprint(repo.ID)) + lock := globallock.GetLock(getWikiWorkingLockKey(repo.ID)) + if err := lock.Lock(); err != nil { + return err + } + defer func() { + if _, err := lock.Unlock(); err != nil { + log.Error("lock.Unlock: %v", err) + } + }() if err = InitWiki(ctx, repo); err != nil { return fmt.Errorf("InitWiki: %w", err) @@ -250,8 +258,15 @@ func DeleteWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model return err } - wikiWorkingPool.CheckIn(fmt.Sprint(repo.ID)) - defer wikiWorkingPool.CheckOut(fmt.Sprint(repo.ID)) + lock := globallock.GetLock(getWikiWorkingLockKey(repo.ID)) + if err := lock.Lock(); err != nil { + return err + } + defer func() { + if _, err := lock.Unlock(); err != nil { + log.Error("lock.Unlock: %v", err) + } + }() if err = InitWiki(ctx, repo); err != nil { return fmt.Errorf("InitWiki: %w", err)