add melody pubsub

This commit is contained in:
Anbraten 2024-03-11 15:42:21 +01:00
parent bb4443da28
commit 372faaa587
6 changed files with 134 additions and 4 deletions

View File

@ -5,9 +5,9 @@ tmp_dir = ".air"
cmd = "make --no-print-directory backend"
bin = "gitea"
delay = 1000
include_ext = ["go", "tmpl"]
include_ext = ["go", "tmpl", "css", "js"]
include_file = ["main.go"]
include_dir = ["cmd", "models", "modules", "options", "routers", "services"]
include_dir = ["cmd", "models", "modules", "options", "public", "routers", "services", "templates"]
exclude_dir = ["modules/git/tests", "services/gitdiff/testdata", "modules/avatar/testdata", "models/fixtures", "models/migrations/fixtures", "modules/migration/file_format_testdata", "modules/avatar/identicon/testdata"]
exclude_regex = ["_test.go$", "_gen.go$"]
stop_on_error = true

59
services/pubsub/memory.go Normal file
View File

@ -0,0 +1,59 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package pubsub
import (
"context"
"sync"
)
type Memory struct {
sync.Mutex
topics map[string]map[*Subscriber]struct{}
}
// New creates an in-memory publisher.
func NewMemory() Broker {
return &Memory{
topics: make(map[string]map[*Subscriber]struct{}),
}
}
func (p *Memory) Publish(_ context.Context, message Message) {
p.Lock()
topic, ok := p.topics[message.Topic]
if !ok {
p.Unlock()
return
}
for s := range topic {
go (*s)(message)
}
p.Unlock()
}
func (p *Memory) Subscribe(c context.Context, topic string, subscriber Subscriber) {
// Subscribe
p.Lock()
_, ok := p.topics[topic]
if !ok {
p.topics[topic] = make(map[*Subscriber]struct{})
}
p.topics[topic][&subscriber] = struct{}{}
p.Unlock()
// Wait for context to be done
<-c.Done()
// Unsubscribe
p.Lock()
delete(p.topics[topic], &subscriber)
if len(p.topics[topic]) == 0 {
delete(p.topics, topic)
}
p.Unlock()
}

View File

@ -0,0 +1,47 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package pubsub
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestPubsub(t *testing.T) {
var (
wg sync.WaitGroup
testMessage = Message{
Data: []byte("test"),
Topic: "hello-world",
}
)
ctx, cancel := context.WithCancelCause(
context.Background(),
)
broker := NewMemory()
go func() {
broker.Subscribe(ctx, "hello-world", func(message Message) { assert.Equal(t, testMessage, message); wg.Done() })
}()
go func() {
broker.Subscribe(ctx, "hello-world", func(_ Message) { wg.Done() })
}()
// Wait a bit for the subscriptions to be registered
<-time.After(100 * time.Millisecond)
wg.Add(2)
go func() {
broker.Publish(ctx, testMessage)
}()
wg.Wait()
cancel(nil)
}

20
services/pubsub/types.go Normal file
View File

@ -0,0 +1,20 @@
package pubsub
import "context"
// Message defines a published message.
type Message struct {
// Data is the actual data in the entry.
Data []byte `json:"data"`
// Topic is the topic of the message.
Topic string `json:"topic"`
}
// Subscriber receives published messages.
type Subscriber func(Message)
type Broker interface {
Publish(c context.Context, message Message)
Subscribe(c context.Context, topic string, subscriber Subscriber)
}

View File

@ -7,6 +7,7 @@ import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/templates"
notify_service "code.gitea.io/gitea/services/notify"
"code.gitea.io/gitea/services/pubsub"
"github.com/olahol/melody"
)
@ -17,7 +18,7 @@ type websocketNotifier struct {
}
// NewNotifier create a new webhooksNotifier notifier
func newNotifier(m *melody.Melody) notify_service.Notifier {
func newNotifier(m *melody.Melody, pubsub pubsub.Broker) notify_service.Notifier {
return &websocketNotifier{
m: m,
rnd: templates.HTMLRenderer(),

View File

@ -7,6 +7,7 @@ import (
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/services/context"
notify_service "code.gitea.io/gitea/services/notify"
"code.gitea.io/gitea/services/pubsub"
"github.com/mitchellh/mapstructure"
"github.com/olahol/melody"
@ -28,7 +29,9 @@ func Init() *melody.Melody {
m.HandleConnect(handleConnect)
m.HandleMessage(handleMessage)
m.HandleDisconnect(handleDisconnect)
notify_service.RegisterNotifier(newNotifier(m))
broker := pubsub.NewMemory() // TODO: allow for other pubsub implementations
notify_service.RegisterNotifier(newNotifier(m, broker))
return m
}