using mutex over channels, and some other minor cleanups

This commit is contained in:
sdomino 2014-12-16 14:30:28 -07:00
parent c924cd01ef
commit b405d6fba9
1 changed files with 62 additions and 67 deletions

View File

@ -2,10 +2,12 @@ package scribble
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"strings" "strings"
"sync"
"github.com/nanobox-core/hatchet" "github.com/nanobox-core/hatchet"
) )
@ -18,16 +20,16 @@ type (
// Driver // Driver
Driver struct { Driver struct {
channels map[string]chan int mutexes map[string]sync.Mutex
dir string dir string
log *hatchet.Logger log hatchet.Logger
} }
// Transaction represents // Transaction represents
Transaction struct { Transaction struct {
Action string Action string
Collection string Collection string
Resource string ResourceID string
Container interface{} Container interface{}
} }
) )
@ -36,9 +38,17 @@ type (
func New(dir string, logger hatchet.Logger) (*Driver, error) { func New(dir string, logger hatchet.Logger) (*Driver, error) {
fmt.Printf("Creating database directory at '%v'...\n", dir) fmt.Printf("Creating database directory at '%v'...\n", dir)
scribble := &Driver{} //
scribble.dir = dir if logger == nil {
scribble.channels = make(map[string]chan int) logger = hatchet.DevNullLogger{}
}
//
scribble := &Driver{
dir: dir,
mutexes: make(map[string]sync.Mutex),
log: logger,
}
// //
if err := mkDir(scribble.dir); err != nil { if err := mkDir(scribble.dir); err != nil {
@ -52,92 +62,74 @@ func New(dir string, logger hatchet.Logger) (*Driver, error) {
// Transact // Transact
func (d *Driver) Transact(trans Transaction) error { func (d *Driver) Transact(trans Transaction) error {
//
done := d.getOrCreateChan(trans.Collection)
fail := make(chan error)
// //
switch trans.Action { switch trans.Action {
case "write": case "write":
go d.write(trans, done, fail) return d.write(trans)
case "read": case "read":
go d.read(trans, done, fail) return d.read(trans)
case "readall": case "readall":
go d.readAll(trans, done, fail) return d.readAll(trans)
case "delete": case "delete":
go d.delete(trans, done, fail) return d.delete(trans)
default: default:
fmt.Println("Unsupported action ", trans.Action) return errors.New(fmt.Sprintf("Unsupported action %+v", trans.Action))
} }
// wait until we're done, or error return nil
select {
case <-done:
return nil
case err := <-fail:
return err
}
} }
// private // private
// write // write
func (d *Driver) write(trans Transaction, done chan<- int, fail chan<- error) { func (d *Driver) write(trans Transaction) error {
mutex := d.getOrCreateMutex(trans.Collection)
mutex.Lock()
// //
dir := d.dir + "/" + trans.Collection dir := d.dir + "/" + trans.Collection
//
if err := mkDir(dir); err != nil {
fail <- err
}
//
file, err := os.Create(dir + "/" + trans.Resource)
if err != nil {
fail <- err
}
defer file.Close()
// //
b, err := json.MarshalIndent(trans.Container, "", "\t") b, err := json.MarshalIndent(trans.Container, "", "\t")
if err != nil { if err != nil {
fail <- err return err
} }
_, err = file.WriteString(string(b)) //
if err != nil { if err := mkDir(dir); err != nil {
fail <- err return err
} }
// release... //
done <- 0 if err := ioutil.WriteFile(trans.ResourceID, b, 0666); err != nil {
return err
}
mutex.Unlock()
return nil
} }
// read // read
func (d *Driver) read(trans Transaction, done chan<- int, fail chan<- error) interface{} { func (d *Driver) read(trans Transaction) error {
dir := d.dir + "/" + trans.Collection dir := d.dir + "/" + trans.Collection
b, err := ioutil.ReadFile(dir + "/" + trans.Resource) b, err := ioutil.ReadFile(dir + "/" + trans.ResourceID)
if err != nil { if err != nil {
fmt.Printf("Unable to read file %v/%v: %v", trans.Collection, trans.Resource, err) return err
os.Exit(1)
} }
if err := json.Unmarshal(b, trans.Container); err != nil { if err := json.Unmarshal(b, trans.Container); err != nil {
fail <- err return err
} }
// release... return nil
done <- 0
return trans.Container
} }
// readAll // readAll
func (d *Driver) readAll(trans Transaction, done chan<- int, fail chan<- error) { func (d *Driver) readAll(trans Transaction) error {
dir := d.dir + "/" + trans.Collection dir := d.dir + "/" + trans.Collection
@ -153,7 +145,7 @@ func (d *Driver) readAll(trans Transaction, done chan<- int, fail chan<- error)
for _, file := range files { for _, file := range files {
b, err := ioutil.ReadFile(dir + "/" + file.Name()) b, err := ioutil.ReadFile(dir + "/" + file.Name())
if err != nil { if err != nil {
fail <- err return err
} }
f = append(f, string(b)) f = append(f, string(b))
@ -161,38 +153,41 @@ func (d *Driver) readAll(trans Transaction, done chan<- int, fail chan<- error)
// //
if err := json.Unmarshal([]byte("["+strings.Join(f, ",")+"]"), trans.Container); err != nil { if err := json.Unmarshal([]byte("["+strings.Join(f, ",")+"]"), trans.Container); err != nil {
fail <- err return err
} }
// release... return nil
done <- 0
} }
// delete // delete
func (d *Driver) delete(trans Transaction, done chan<- int, fail chan<- error) { func (d *Driver) delete(trans Transaction) error {
mutex := d.getOrCreateMutex(trans.Collection)
mutex.Lock()
dir := d.dir + "/" + trans.Collection dir := d.dir + "/" + trans.Collection
err := os.Remove(dir + "/" + trans.Resource) err := os.Remove(dir + "/" + trans.ResourceID)
if err != nil { if err != nil {
fail <- err return err
} }
// release... mutex.Unlock()
done <- 0
return nil
} }
// helpers // helpers
// getChan // getOrCreateMutex
func (d *Driver) getOrCreateChan(channel string) chan int { func (d *Driver) getOrCreateMutex(collection string) sync.Mutex {
c, ok := d.channels[channel] c, ok := d.mutexes[collection]
// if the chan doesn't exist make it // if the mutex doesn't exist make it
if !ok { if !ok {
d.channels[channel] = make(chan int) d.mutexes[collection] = sync.Mutex{}
return d.channels[channel] return d.mutexes[collection]
} }
return c return c