Skip to content

Commit

Permalink
Merge pull request #1404 from traPtitech/fix/ogp-cache-safety
Browse files Browse the repository at this point in the history
OGP/BOTサービスのスレッド安全性を修正
  • Loading branch information
motoki317 authored Mar 24, 2022
2 parents ae1c740 + 0578029 commit 1f7f6ac
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 113 deletions.
4 changes: 0 additions & 4 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,7 @@ func (s *Server) Start(address string) error {
_ = s.Repo.UpdateUser(userID, repository.UpdateUserArgs{LastOnline: optional.TimeFrom(datetime)})
}
}()
s.SS.BOT.Start()
s.SS.StampThrottler.Start()
if err := s.SS.OGP.Start(); err != nil {
return err
}
return s.Router.Start(address)
}

Expand Down
35 changes: 2 additions & 33 deletions repository/gorm/ogp_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package gorm
import (
"crypto/sha1"
"fmt"
"reflect"
"time"

"gorm.io/gorm"
Expand All @@ -20,7 +19,7 @@ func getURLHash(url string) (string, error) {
}

// CreateOgpCache implements OgpRepository interface.
func (repo *Repository) CreateOgpCache(url string, content *model.Ogp) (c *model.OgpCache, err error) {
func (repo *Repository) CreateOgpCache(url string, content *model.Ogp) (*model.OgpCache, error) {
urlHash, err := getURLHash(url)
if err != nil {
return nil, err
Expand All @@ -47,36 +46,6 @@ func (repo *Repository) CreateOgpCache(url string, content *model.Ogp) (c *model
return ogpCache, nil
}

// UpdateOgpCache implements OgpRepository interface.
func (repo *Repository) UpdateOgpCache(url string, content *model.Ogp) error {
urlHash, err := getURLHash(url)
if err != nil {
return err
}

changes := map[string]interface{}{}
return repo.db.Transaction(func(tx *gorm.DB) error {
var c model.OgpCache
if err := tx.First(&c, &model.OgpCache{URL: url, URLHash: urlHash}).Error; err != nil {
return convertError(err)
}

if content == nil {
changes["valid"] = false
changes["content"] = model.Ogp{}
changes["expires_at"] = time.Now().Add(ogp.CacheDuration)
return tx.Model(&c).Updates(changes).Error
}
if !reflect.DeepEqual(c.Content, content) {
changes["valid"] = true
changes["content"] = content
changes["expires_at"] = time.Now().Add(ogp.CacheDuration)
return tx.Model(&c).Updates(changes).Error
}
return nil
})
}

// GetOgpCache implements OgpRepository interface.
func (repo *Repository) GetOgpCache(url string) (c *model.OgpCache, err error) {
urlHash, err := getURLHash(url)
Expand All @@ -99,7 +68,7 @@ func (repo *Repository) DeleteOgpCache(url string) error {
}
result := repo.db.Delete(c)
if result.Error != nil {
return result.Error
return convertError(result.Error)
}
if result.RowsAffected == 0 {
return repository.ErrNotFound
Expand Down
10 changes: 3 additions & 7 deletions repository/ogp_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@ import "github.com/traPtitech/traQ/model"
type OgpCacheRepository interface {
// CreateOgpCache OGPキャッシュを作成します
//
// contentがnilの場合、Validをfalseとしたネガティブキャッシュを作成します。
//
// 成功した場合、作成されたOGPキャッシュとnilを返します。
// DBによるエラーを返すことがあります。
CreateOgpCache(url string, content *model.Ogp) (c *model.OgpCache, err error)

// UpdateOgpCache OGPキャッシュを更新します
//
// 成功した場合、nilを返します。
// 存在しなかった場合、ErrNotFoundを返します。
// DBによるエラーを返すことがあります。
UpdateOgpCache(url string, content *model.Ogp) error

// GetOgpCache 指定したURLのOGPキャッシュを取得します
//
// 成功した場合、取得したOGPキャッシュとnilを返します。
Expand All @@ -26,6 +21,7 @@ type OgpCacheRepository interface {
// DeleteOgpCache 指定したURLのOGPキャッシュを削除します
//
// 成功した場合、nilを返します。
// 存在しなかった場合、ErrNotFoundを返します。
// DBによるエラーを返すことがあります。
DeleteOgpCache(url string) error

Expand Down
2 changes: 0 additions & 2 deletions service/bot/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import "context"

// Service BOTサービス
type Service interface {
// Start BOTサービスを開始します
Start()
// Shutdown BOTサービスをシャットダウンします
Shutdown(ctx context.Context) error
}
52 changes: 31 additions & 21 deletions service/bot/service_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ type serviceImpl struct {
dispatcher event.Dispatcher
hub *hub.Hub

sub hub.Subscription
wg sync.WaitGroup
logPurger *jitterbug.Ticker
started bool
sub hub.Subscription
logPurger *jitterbug.Ticker
serviceDone chan struct{}
hubDone chan struct{}
purgerDone chan struct{}
}

// NewService ボットサービスを生成します
Expand All @@ -42,16 +43,16 @@ func NewService(repo repository.Repository, cm channel.Manager, hub *hub.Hub, s
logger: logger.Named("bot"),
hub: hub,
dispatcher: event.NewDispatcher(logger, repo, s),

serviceDone: make(chan struct{}),
hubDone: make(chan struct{}),
purgerDone: make(chan struct{}),
}
p.start()
return p
}

func (p *serviceImpl) Start() {
if p.started {
return
}
p.started = true

func (p *serviceImpl) start() {
// イベントの発送を開始
events := make([]string, 0, len(eventHandlerSet))
for k := range eventHandlerSet {
Expand All @@ -60,10 +61,12 @@ func (p *serviceImpl) Start() {
p.sub = p.hub.Subscribe(100, events...)

go func() {
defer close(p.hubDone)
var wg sync.WaitGroup
for ev := range p.sub.Receiver {
p.wg.Add(1)
wg.Add(1)
go func(ev hub.Message) {
defer p.wg.Done()
defer wg.Done()
h, ok := eventHandlerSet[ev.Name]
if ok {
err := h(p, time.Now(), ev.Name, ev.Fields)
Expand All @@ -73,32 +76,39 @@ func (p *serviceImpl) Start() {
}
}(ev)
}
wg.Wait()
}()

// BOTイベントログの定期的消去
p.logPurger = jitterbug.New(time.Hour*24, &jitterbug.Uniform{
Min: time.Hour * 23,
})
go func() {
for range p.logPurger.C {
p.wg.Add(1)
if err := p.repo.PurgeBotEventLogs(time.Now().Add(-botEventLogPurgeBefore)); err != nil {
p.logger.Error("an error occurred while puring old bot event logs", zap.Error(err))
defer close(p.purgerDone)
for {
select {
case _, ok := <-p.logPurger.C:
if !ok {
return
}
if err := p.repo.PurgeBotEventLogs(time.Now().Add(-botEventLogPurgeBefore)); err != nil {
p.logger.Error("an error occurred while puring old bot event logs", zap.Error(err))
}
case <-p.serviceDone:
return
}
p.wg.Done()
}
}()

p.logger.Info("bot service started")
}

func (p *serviceImpl) Shutdown(ctx context.Context) error {
if !p.started {
return nil
}
p.hub.Unsubscribe(p.sub)
p.logPurger.Stop()
p.wg.Wait()
close(p.serviceDone)
<-p.hubDone
<-p.purgerDone
return nil
}

Expand Down
2 changes: 0 additions & 2 deletions service/ogp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

// Service OGPサービス
type Service interface {
// Start OGPサービスを開始します
Start() error
// Shutdown OGPサービスを停止します
Shutdown() error

Expand Down
102 changes: 58 additions & 44 deletions service/ogp/service_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ogp
import (
"errors"
"net/url"
"sync"
"time"

"github.com/lthibault/jitterbug/v2"
Expand All @@ -20,28 +19,43 @@ type ServiceImpl struct {
logger *zap.Logger

cachePurger *jitterbug.Ticker
wg sync.WaitGroup
serviceDone chan struct{}
purgerDone chan struct{}
sfGroup singleflight.Group
}

func NewServiceImpl(repo repository.Repository, logger *zap.Logger) (Service, error) {
return &ServiceImpl{
s := &ServiceImpl{
repo: repo,
logger: logger,
}, nil

cachePurger: jitterbug.New(time.Hour*24, &jitterbug.Uniform{
Min: time.Hour * 23,
}),
serviceDone: make(chan struct{}),
purgerDone: make(chan struct{}),
}
if err := s.start(); err != nil {
return nil, err
}
return s, nil
}

func (s *ServiceImpl) Start() error {
s.cachePurger = jitterbug.New(time.Hour*24, &jitterbug.Uniform{
Min: time.Hour * 23,
})
func (s *ServiceImpl) start() error {
go func() {
for range s.cachePurger.C {
s.wg.Add(1)
if err := s.repo.DeleteStaleOgpCache(); err != nil {
s.logger.Error("an error occurred while deleting stale ogp caches", zap.Error(err))
defer close(s.purgerDone)
for {
select {
case _, ok := <-s.cachePurger.C:
if !ok {
return
}
if err := s.repo.DeleteStaleOgpCache(); err != nil {
s.logger.Error("an error occurred while deleting stale ogp caches", zap.Error(err))
}
case <-s.serviceDone:
return
}
s.wg.Done()
}
}()

Expand All @@ -51,7 +65,8 @@ func (s *ServiceImpl) Start() error {

func (s *ServiceImpl) Shutdown() error {
s.cachePurger.Stop()
s.wg.Wait()
close(s.serviceDone)
<-s.purgerDone
return nil
}

Expand All @@ -75,51 +90,50 @@ func (s *ServiceImpl) GetMeta(url *url.URL) (ogp *model.Ogp, expiresIn time.Dura
func (s *ServiceImpl) getMeta(url *url.URL) (ogp *model.Ogp, expiresIn time.Duration, err error) {
cacheURL := url.String()
cache, err := s.repo.GetOgpCache(cacheURL)
if err != nil && err != repository.ErrNotFound {
return nil, 0, err
}

shouldUpdateCache := err == nil &&
time.Now().After(cache.ExpiresAt)
shouldCreateCache := err != nil

if !shouldUpdateCache && !shouldCreateCache && err == nil {
now := time.Now()
isCacheHit := err == nil && now.Before(cache.ExpiresAt)
isCacheExpired := err == nil && !now.Before(cache.ExpiresAt)
if isCacheHit {
if cache.Valid {
// 通常のキャッシュヒット
return &cache.Content, time.Until(cache.ExpiresAt), nil
}
// キャッシュがヒットしたがネガティブキャッシュだった
// ネガティブキャッシュヒット
return nil, time.Until(cache.ExpiresAt), nil
}
if isCacheExpired {
if err := s.repo.DeleteOgpCache(cacheURL); err != nil && err != repository.ErrNotFound {
return nil, 0, err
}
}

// キャッシュが存在しなかったので、リクエストを飛ばす
og, meta, err := parser.ParseMetaForURL(url)
if err == parser.ErrClient || err == parser.ErrParse || err == parser.ErrNetwork || err == parser.ErrContentTypeNotSupported {
// 4xxエラー、パースエラー、名前解決などのネットワークエラーの場合はネガティブキャッシュを作成
if shouldUpdateCache {
updateErr := s.repo.UpdateOgpCache(cacheURL, nil)
if updateErr != nil {
return nil, time.Duration(0), updateErr
}
} else if shouldCreateCache {

if err != nil {
switch err {
case parser.ErrClient, parser.ErrParse, parser.ErrNetwork, parser.ErrContentTypeNotSupported:
// 4xxエラー、パースエラー、名前解決などのネットワークエラーの場合はネガティブキャッシュを作成
_, createErr := s.repo.CreateOgpCache(cacheURL, nil)
if createErr != nil {
return nil, time.Duration(0), createErr
return nil, 0, createErr
}
return nil, CacheDuration, nil
default:
// このパスは5xxエラーなのでクライアント側キャッシュつけない
return nil, 0, nil
}
return nil, CacheDuration, nil
} else if err != nil {
// このパスは5xxエラーなのでクライアント側キャッシュつけない
return nil, time.Duration(0), nil
}

// リクエストが成功した場合はキャッシュを作成
content := parser.MergeDefaultPageMetaAndOpenGraph(og, meta)

if shouldUpdateCache {
err = s.repo.UpdateOgpCache(cacheURL, content)
if err != nil {
return nil, time.Duration(0), err
}
} else if shouldCreateCache {
_, err = s.repo.CreateOgpCache(cacheURL, content)
if err != nil {
return nil, time.Duration(0), err
}
_, err = s.repo.CreateOgpCache(cacheURL, content)
if err != nil {
return nil, 0, err
}

return content, CacheDuration, nil
Expand Down

0 comments on commit 1f7f6ac

Please sign in to comment.