From b405d6fba95b239182c03a018a3257b2515f1a12 Mon Sep 17 00:00:00 2001 From: sdomino Date: Tue, 16 Dec 2014 14:30:28 -0700 Subject: [PATCH] using mutex over channels, and some other minor cleanups --- scribble.go | 129 +++++++++++++++++++++++++--------------------------- 1 file changed, 62 insertions(+), 67 deletions(-) diff --git a/scribble.go b/scribble.go index 11df0c8..99840b7 100644 --- a/scribble.go +++ b/scribble.go @@ -2,10 +2,12 @@ package scribble import ( "encoding/json" + "errors" "fmt" "io/ioutil" "os" "strings" + "sync" "github.com/nanobox-core/hatchet" ) @@ -18,16 +20,16 @@ type ( // Driver Driver struct { - channels map[string]chan int + mutexes map[string]sync.Mutex dir string - log *hatchet.Logger + log hatchet.Logger } // Transaction represents Transaction struct { Action string Collection string - Resource string + ResourceID string Container interface{} } ) @@ -36,9 +38,17 @@ type ( func New(dir string, logger hatchet.Logger) (*Driver, error) { fmt.Printf("Creating database directory at '%v'...\n", dir) - scribble := &Driver{} - scribble.dir = dir - scribble.channels = make(map[string]chan int) + // + if logger == nil { + logger = hatchet.DevNullLogger{} + } + + // + scribble := &Driver{ + dir: dir, + mutexes: make(map[string]sync.Mutex), + log: logger, + } // if err := mkDir(scribble.dir); err != nil { @@ -52,92 +62,74 @@ func New(dir string, logger hatchet.Logger) (*Driver, error) { // Transact func (d *Driver) Transact(trans Transaction) error { - // - done := d.getOrCreateChan(trans.Collection) - fail := make(chan error) - // switch trans.Action { case "write": - go d.write(trans, done, fail) + return d.write(trans) case "read": - go d.read(trans, done, fail) + return d.read(trans) case "readall": - go d.readAll(trans, done, fail) + return d.readAll(trans) case "delete": - go d.delete(trans, done, fail) + return d.delete(trans) default: - fmt.Println("Unsupported action ", trans.Action) + return errors.New(fmt.Sprintf("Unsupported action %+v", trans.Action)) } - // wait until we're done, or error - select { - case <-done: - return nil - case err := <-fail: - return err - } + return nil } // private // write -func (d *Driver) write(trans Transaction, done chan<- int, fail chan<- error) { +func (d *Driver) write(trans Transaction) error { + + mutex := d.getOrCreateMutex(trans.Collection) + mutex.Lock() // dir := d.dir + "/" + trans.Collection - // - if err := mkDir(dir); err != nil { - fail <- err - } - - // - file, err := os.Create(dir + "/" + trans.Resource) - if err != nil { - fail <- err - } - - defer file.Close() - // b, err := json.MarshalIndent(trans.Container, "", "\t") if err != nil { - fail <- err + return err } - _, err = file.WriteString(string(b)) - if err != nil { - fail <- err + // + if err := mkDir(dir); err != nil { + return err } - // release... - done <- 0 + // + if err := ioutil.WriteFile(trans.ResourceID, b, 0666); err != nil { + return err + } + + mutex.Unlock() + + return nil } // read -func (d *Driver) read(trans Transaction, done chan<- int, fail chan<- error) interface{} { +func (d *Driver) read(trans Transaction) error { dir := d.dir + "/" + trans.Collection - b, err := ioutil.ReadFile(dir + "/" + trans.Resource) + b, err := ioutil.ReadFile(dir + "/" + trans.ResourceID) if err != nil { - fmt.Printf("Unable to read file %v/%v: %v", trans.Collection, trans.Resource, err) - os.Exit(1) + return err } if err := json.Unmarshal(b, trans.Container); err != nil { - fail <- err + return err } - // release... - done <- 0 - - return trans.Container + return nil } // readAll -func (d *Driver) readAll(trans Transaction, done chan<- int, fail chan<- error) { +func (d *Driver) readAll(trans Transaction) error { dir := d.dir + "/" + trans.Collection @@ -153,7 +145,7 @@ func (d *Driver) readAll(trans Transaction, done chan<- int, fail chan<- error) for _, file := range files { b, err := ioutil.ReadFile(dir + "/" + file.Name()) if err != nil { - fail <- err + return err } f = append(f, string(b)) @@ -161,38 +153,41 @@ func (d *Driver) readAll(trans Transaction, done chan<- int, fail chan<- error) // if err := json.Unmarshal([]byte("["+strings.Join(f, ",")+"]"), trans.Container); err != nil { - fail <- err + return err } - // release... - done <- 0 + return nil } // delete -func (d *Driver) delete(trans Transaction, done chan<- int, fail chan<- error) { +func (d *Driver) delete(trans Transaction) error { + + mutex := d.getOrCreateMutex(trans.Collection) + mutex.Lock() dir := d.dir + "/" + trans.Collection - err := os.Remove(dir + "/" + trans.Resource) + err := os.Remove(dir + "/" + trans.ResourceID) if err != nil { - fail <- err + return err } - // release... - done <- 0 + mutex.Unlock() + + return nil } // helpers -// getChan -func (d *Driver) getOrCreateChan(channel string) chan int { +// getOrCreateMutex +func (d *Driver) getOrCreateMutex(collection string) sync.Mutex { - c, ok := d.channels[channel] + c, ok := d.mutexes[collection] - // if the chan doesn't exist make it + // if the mutex doesn't exist make it if !ok { - d.channels[channel] = make(chan int) - return d.channels[channel] + d.mutexes[collection] = sync.Mutex{} + return d.mutexes[collection] } return c