Add a reaper

This commit is contained in:
yosssi 2014-06-15 01:15:10 +09:00
parent bfe3e283f8
commit cb6c914754
16 changed files with 190 additions and 70 deletions

2
Makefile Normal file
View file

@ -0,0 +1,2 @@
protoc:
protoc --gogo_out=. shared/protobuf/*.proto

View file

@ -1,11 +0,0 @@
package boltstore
import "time"
const (
defaultPath = "/"
defaultDBPath = "./sessions.db"
defaultBucketName = "sessions"
defaultBatchSize = 1000
defaultCheckInterval = 10 * time.Second
)

5
doc.go
View file

@ -1,5 +0,0 @@
/*
Package boltstore provides a session store backend
for gorilla/sessions using Bolt.
*/
package boltstore

5
reaper/doc.go Normal file
View file

@ -0,0 +1,5 @@
/*
Package reaper provides a reaper which removes
expired sessions.
*/
package reaper

27
reaper/options.go Normal file
View file

@ -0,0 +1,27 @@
package reaper
import (
"time"
"github.com/yosssi/boltstore/shared"
)
// Options represents options for the reaper.
type Options struct {
BucketName []byte
BatchSize int
CheckInterval time.Duration
}
// setDefault sets default to the reaper options.
func (o *Options) setDefault() {
if o.BucketName == nil {
o.BucketName = []byte(shared.DefaultBucketName)
}
if o.BatchSize == 0 {
o.BatchSize = shared.DefaultBatchSize
}
if o.CheckInterval == 0 {
o.CheckInterval = shared.DefaultCheckInterval
}
}

83
reaper/reaper.go Normal file
View file

@ -0,0 +1,83 @@
package reaper
import (
"log"
"time"
"github.com/boltdb/bolt"
"github.com/yosssi/boltstore/shared"
)
// Run invokes a reap function as a goroutine.
func Run(db *bolt.DB, options Options) (chan<- struct{}, <-chan struct{}) {
options.setDefault()
quitC, doneC := make(chan struct{}), make(chan struct{})
go reap(db, options, quitC, doneC)
return quitC, doneC
}
// Quit terminats the reap goroutine.
func Quit(quitC chan<- struct{}, doneC <-chan struct{}) {
quitC <- struct{}{}
<-doneC
}
func reap(db *bolt.DB, options Options, quitC <-chan struct{}, doneC chan<- struct{}) {
var prevKey []byte
for {
err := db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(options.BucketName)
if bucket == nil {
return nil
}
c := bucket.Cursor()
var i int
for k, v := c.Seek(prevKey); ; k, v = c.Next() {
// If we hit the end of our sessions then
// exit and start over next time.
if k == nil {
prevKey = nil
return nil
}
i++
session, err := shared.Session(v)
if err != nil {
return err
}
if shared.Expired(session) {
err := db.Update(func(txu *bolt.Tx) error {
return txu.Bucket(options.BucketName).Delete(k)
})
if err != nil {
return err
}
}
if options.BatchSize == i {
copy(prevKey, k)
return nil
}
}
})
if err != nil {
log.Println(err.Error())
}
// Check if a quit signal is sent.
select {
case <-quitC:
doneC <- struct{}{}
return
default:
}
time.Sleep(options.CheckInterval)
}
}

15
shared/consts.go Normal file
View file

@ -0,0 +1,15 @@
package shared
import "time"
// Defaults for store.Options
const (
DefaultPath = "/"
DefaultBucketName = "sessions"
)
// Defaults for reaper.Options
const (
DefaultBatchSize = 10
DefaultCheckInterval = time.Second
)

4
shared/doc.go Normal file
View file

@ -0,0 +1,4 @@
/*
Package shared provides shared types, functions and constants.
*/
package shared

View file

@ -1,12 +1,12 @@
// Code generated by protoc-gen-gogo. // Code generated by protoc-gen-gogo.
// source: session.proto // source: shared/protobuf/session.proto
// DO NOT EDIT! // DO NOT EDIT!
/* /*
Package protobuf is a generated protocol buffer package. Package protobuf is a generated protocol buffer package.
It is generated from these files: It is generated from these files:
session.proto shared/protobuf/session.proto
It has these top-level messages: It has these top-level messages:
Session Session

20
shared/utils.go Normal file
View file

@ -0,0 +1,20 @@
package shared
import (
"time"
"code.google.com/p/gogoprotobuf/proto"
"github.com/yosssi/boltstore/shared/protobuf"
)
// Session converts the byte slice to the session struct value.
func Session(data []byte) (protobuf.Session, error) {
session := protobuf.Session{}
err := proto.Unmarshal(data, &session)
return session, err
}
// Expired checks if the session is expired.
func Expired(session protobuf.Session) bool {
return *session.ExpiresAt > 0 && *session.ExpiresAt <= time.Now().Unix()
}

View file

@ -1,6 +1,9 @@
package boltstore package store
import "github.com/gorilla/sessions" import (
"github.com/gorilla/sessions"
"github.com/yosssi/boltstore/shared"
)
// Config represents a config for a session store. // Config represents a config for a session store.
type Config struct { type Config struct {
@ -11,12 +14,9 @@ type Config struct {
// setDefault sets default to the config. // setDefault sets default to the config.
func (c *Config) setDefault() { func (c *Config) setDefault() {
if c.SessionOptions.Path == "" { if c.SessionOptions.Path == "" {
c.SessionOptions.Path = defaultPath c.SessionOptions.Path = shared.DefaultPath
}
if c.DBOptions.Path == "" {
c.DBOptions.Path = defaultDBPath
} }
if c.DBOptions.BucketName == nil { if c.DBOptions.BucketName == nil {
c.DBOptions.BucketName = []byte(defaultBucketName) c.DBOptions.BucketName = []byte(shared.DefaultBucketName)
} }
} }

5
store/doc.go Normal file
View file

@ -0,0 +1,5 @@
/*
Package store provides a session store backend
for gorilla/sessions using Bolt.
*/
package store

View file

@ -1,7 +1,6 @@
package boltstore package store
// Options represents options for a database. // Options represents options for a database.
type Options struct { type Options struct {
Path string
BucketName []byte BucketName []byte
} }

View file

@ -1,9 +1,9 @@
package boltstore package store
import ( import (
"time" "time"
"github.com/yosssi/boltstore/protobuf" "github.com/yosssi/boltstore/shared/protobuf"
) )
// NewSession creates and returns a session data. // NewSession creates and returns a session data.

View file

@ -1,4 +1,4 @@
package boltstore package store
import ( import (
"bytes" "bytes"
@ -6,13 +6,12 @@ import (
"encoding/gob" "encoding/gob"
"net/http" "net/http"
"strings" "strings"
"time"
"code.google.com/p/gogoprotobuf/proto" "code.google.com/p/gogoprotobuf/proto"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/gorilla/securecookie" "github.com/gorilla/securecookie"
"github.com/gorilla/sessions" "github.com/gorilla/sessions"
"github.com/yosssi/boltstore/protobuf" "github.com/yosssi/boltstore/shared"
) )
// store represents a session store. // store represents a session store.
@ -69,39 +68,12 @@ func (s *store) Save(r *http.Request, w http.ResponseWriter, session *sessions.S
return nil return nil
} }
// Close closes the database.
func (s *store) Close() error {
return s.db.Close()
}
// open Opens a database and sets it to the session store.
func (s *store) open() error {
// Open a database.
db, err := bolt.Open(s.config.DBOptions.Path, 0666)
if err != nil {
return err
}
// Create a bucket if it does not exist.
err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(s.config.DBOptions.BucketName)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
s.db = db
return nil
}
// load loads a session data from the database. // load loads a session data from the database.
// True is returned if there is a session data in the database. // True is returned if there is a session data in the database.
func (s *store) load(session *sessions.Session) (bool, error) { func (s *store) load(session *sessions.Session) (bool, error) {
// exists represents whether a session data exists or not. // exists represents whether a session data exists or not.
var exists bool var exists bool
err := s.db.Update(func(tx *bolt.Tx) error { err := s.db.View(func(tx *bolt.Tx) error {
id := []byte(session.ID) id := []byte(session.ID)
bucket := tx.Bucket(s.config.DBOptions.BucketName) bucket := tx.Bucket(s.config.DBOptions.BucketName)
// Get the session data. // Get the session data.
@ -109,17 +81,16 @@ func (s *store) load(session *sessions.Session) (bool, error) {
if data == nil { if data == nil {
return nil return nil
} }
sessionData := &protobuf.Session{} sessionData, err := shared.Session(data)
// Convert the byte slice to the Session struct value. if err != nil {
if err := proto.Unmarshal(data, sessionData); err != nil {
return err return err
} }
// Check the expiration of the session data. // Check the expiration of the session data.
if *sessionData.ExpiresAt > 0 && *sessionData.ExpiresAt < time.Now().Unix() { if shared.Expired(sessionData) {
if err := bucket.Delete(id); err != nil { err := s.db.Update(func(txu *bolt.Tx) error {
return err return txu.Bucket(s.config.DBOptions.BucketName).Delete(id)
} })
return nil return err
} }
exists = true exists = true
dec := gob.NewDecoder(bytes.NewBuffer(sessionData.Values)) dec := gob.NewDecoder(bytes.NewBuffer(sessionData.Values))
@ -158,13 +129,18 @@ func (s *store) save(session *sessions.Session) error {
} }
// New creates and returns a session store. // New creates and returns a session store.
func New(config Config, keyPairs ...[]byte) (*store, error) { func New(db *bolt.DB, config Config, keyPairs ...[]byte) (*store, error) {
config.setDefault() config.setDefault()
store := &store{ store := &store{
codecs: securecookie.CodecsFromPairs(keyPairs...), codecs: securecookie.CodecsFromPairs(keyPairs...),
config: config, config: config,
db: db,
} }
if err := store.open(); err != nil { err := db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(config.DBOptions.BucketName)
return err
})
if err != nil {
return nil, err return nil, err
} }
return store, nil return store, nil