diff options
| -rw-r--r-- | commits.go | 30 | ||||
| -rw-r--r-- | scorsh.go | 62 | ||||
| -rw-r--r-- | spooler.go | 29 | ||||
| -rw-r--r-- | types.go | 95 | ||||
| -rw-r--r-- | workers.go | 13 | 
5 files changed, 145 insertions, 84 deletions
@@ -4,6 +4,7 @@ import (  	"fmt"  	"github.com/KatolaZ/git2go"  	"golang.org/x/crypto/openpgp" +	"log"  	"os"  	"strings"  	//	"log" @@ -23,15 +24,15 @@ func CommitToString(commit *git.Commit) string {  }  // FIXME: RETURN THE ENTITY PROVIDED BY THE CHECK, OR nil -func check_signature(commit *git.Commit, keys []*openpgp.KeyRing) (signature, signed string, err error) { +func check_signature(commit *git.Commit, keys *map[string]openpgp.KeyRing) (signature, signed string, err error) {  	signature, signed, err = commit.ExtractSignature()  	if err == nil { -		for _, keyring := range keys { +		for _, keyring := range *keys {  			_, err_sig := -				openpgp.CheckArmoredDetachedSignature(*keyring, strings.NewReader(signed), +				openpgp.CheckArmoredDetachedSignature(keyring, strings.NewReader(signed),  					strings.NewReader(signature))  			if err_sig == nil { @@ -45,6 +46,14 @@ func check_signature(commit *git.Commit, keys []*openpgp.KeyRing) (signature, si  	return "", "", err  } +func find_scorsh_message(commit *git.Commit) (string, error) { + +	msg := commit.RawMessage() +	debug.log("[find_scorsg_msg] found message:\n %s\n", msg) + +	return msg, nil +} +  // traverse all the commits between two references, looking for scorsh  // commands  // fixme: we don't have just one keyring here.... @@ -91,12 +100,17 @@ func walk_commits(msg SCORSHmsg, w *SCORSHworker) error {  			// check if it can be verified by any of the keyrings associated  			// with the scorsh-tag -			//signature, signed, err := check_signature(commit, &keyring) +			// check if the commit contains a scorsh command + +			_, err = find_scorsh_message(commit) + +			//signature, signed, err := check_signature(commit, &w.Keys)  			//_, _, err := check_signature(commit, w.keys) -			//if err != nil { -			//	log.Printf("%s\n", SCORSHerr(SCORSH_ERR_SIGNATURE)) -			// -			//} +			if err != nil { +				log.Printf("[worker: %s] %s\n", w.Name, SCORSHerr(SCORSH_ERR_SIGNATURE)) +			} else { + +			}  			cur_commit = commit.Parent(0)  		} else {  			fmt.Printf("Commit %x not found!\n", cur_commit.Id()) @@ -62,32 +62,52 @@ func Master(master *SCORSHmaster) {  	// master main loop:  	var matching_workers []*SCORSHworker -	var push_msg SCORSHmsg  	matching_workers = make([]*SCORSHworker, len(master.Workers))  	log.Println("[master] Master started ") +	debug.log("[master] StatusChan: %s\n", master.StatusChan)  	for { +		debug.log("[master] Receive loop...\n")  		select { -		// - receive stuff from the spooler -		case push_msg = <-master.Spooler: - +		case push_msg := <-master.Spooler: +			// here we manage the stuff we receive from the spooler  			debug.log("[master] received message: %s\n", push_msg) -  			// - lookup the repos map for matching workers  			matching_workers = FindMatchingWorkers(master, &push_msg) -			debug.log("[master] matching workers: %s\n", matching_workers) - -			// add the message to PendingMsg -			//... -			// - dispatch the message to all the matching workers -			for _, w := range matching_workers { -				// increase the counter associated to the message -				w.MsgChan <- push_msg +			debug.log("[master] matching workers: \n%s\n", matching_workers) + +			// add the message to WorkingMsg, if it's not a duplicate! +			if _, ok := master.WorkingMsg[push_msg.Id]; ok { +				log.Printf("[master] detected duplicate message %s \n", push_msg.Id) +			} else { +				master.WorkingMsg[push_msg.Id] = 0 +				// - dispatch the message to all the matching workers +				for _, w := range matching_workers { +					debug.log("[master] sending msg to worker: %s\n", w.Name) +					// send the message to the worker +					w.MsgChan <- push_msg +					// increase the counter associated to the message +					master.WorkingMsg[push_msg.Id] += 1 +					debug.log("[master] now WorkingMsg[%s] is: %d\n", push_msg.Id, master.WorkingMsg[push_msg.Id]) +				} +			} +		case done_msg := <-master.StatusChan: +			// Here we manage a status message from a worker +			debug.log("[master] received message from StatusChan: %s\n", done_msg) +			if _, ok := master.WorkingMsg[done_msg.Id]; ok && master.WorkingMsg[done_msg.Id] > 0 { +				master.WorkingMsg[done_msg.Id] -= 1 +				if master.WorkingMsg[done_msg.Id] == 0 { +					delete(master.WorkingMsg, done_msg.Id) +					master.Spooler <- done_msg +				} +			} else { +				log.Printf("[master] received completion event for non-existing message name: %s\n", done_msg.Id)  			}  		}  	} +	debug.log("[master] Exiting the for loop, for some mysterious reason...\n")  }  func InitMaster() *SCORSHmaster { @@ -96,9 +116,12 @@ func InitMaster() *SCORSHmaster {  	master.Repos = make(map[string][]*SCORSHworker)  	master.WorkingMsg = make(map[string]int) -	// This is the mutex-channel on which we receive acks from workers -	master.StatusChan = make(chan SCORSHmsg, 1) -	master.Spooler = make(chan SCORSHmsg, 1) +	// This is the channel on which we receive acks from workers +	master.StatusChan = make(chan SCORSHmsg) +	// This is the channel on which we exchange messages with the spooler +	master.Spooler = make(chan SCORSHmsg) + +	debug.log("[InitMaster] StatusChan: %s\n", master.StatusChan)  	err_workers := StartWorkers(master)  	if err_workers != nil { @@ -111,17 +134,18 @@ func InitMaster() *SCORSHmaster {  		log.Fatal("Error starting spooler: ", err_spooler)  	}  	return master -  }  func main() { +	var done chan int +  	flag.Parse()  	master := InitMaster()  	go Master(master) -	<-master.StatusChan - +	// wait indefinitely -- we should implement signal handling... +	<-done  } @@ -6,6 +6,7 @@ import (  	"github.com/go-yaml/yaml"  	"io/ioutil"  	"log" +	"os"  	//	"time"  ) @@ -32,28 +33,40 @@ func parse_request(fname string, msg *SCORSHmsg) error {  	return nil  } -func spooler(watcher *fsnotify.Watcher, worker chan SCORSHmsg) { +func spooler(watcher *fsnotify.Watcher, master chan SCORSHmsg) {  	log.Println("Spooler started correctly") -	var msg *SCORSHmsg -	msg = new(SCORSHmsg) -  	for {  		select {  		case event := <-watcher.Events: +			// Here we manage genuine events from fsnotify. We catch the +			// "Write" event, which should happen only when the file is +			// created  			if event.Op == fsnotify.Write { -				//time.Sleep(1000 * time.Millisecond) +				var msg SCORSHmsg  				debug.log("[spooler] new file %s detected\n", event.Name) -				err := parse_request(event.Name, msg) +				err := parse_request(event.Name, &msg)  				if err != nil {  					log.Printf("Invalid packet received. [%s]\n", err)  				}  				debug.log("[spooler] read message: %s\n", msg) -				worker <- *msg +				msg.Path = event.Name +				master <- msg  			}  		case err := <-watcher.Errors: -			log.Println("error:", err) +			// here we manage event errors +			log.Println("[spooler] error: ", err) +		case msg := <-master: +			// Here we receive messages from the master about files to be +			// removed +			log.Printf("[spooler] received deletion request for: %s\n", msg.Path) +			err := os.Remove(msg.Path) +			if err != nil { +				log.Printf("[spooler] error removing file: %s\n", err) +			} else { +				log.Printf("[spooler] file %s successfully removed\n", msg.Path) +			}  		}  	}  } @@ -2,6 +2,7 @@ package main  import (  	"bytes" +	"fmt"  	"golang.org/x/crypto/openpgp"  ) @@ -16,11 +17,12 @@ const (  // the SCORSHmsg type represents messages received from the spool and  // sent to workers  type SCORSHmsg struct { -	Name    string `yaml:"m_id"` +	Id      string `yaml:"m_id"`  	Repo    string `yaml:"m_repo"`  	Branch  string `yaml:"m_branch"`  	Old_rev string `yaml:"m_oldrev"`  	New_rev string `yaml:"m_newrev"` +	Path    string  }  type SCORSHcmd struct { @@ -28,7 +30,7 @@ type SCORSHcmd struct {  	Hash string `yaml:"c_hash"`  } -type SCORSHtag struct { +type SCORSHtag_cfg struct {  	Name     string      `yaml:"t_name"`  	Keyrings []string    `yaml:"t_keyrings"`  	Commands []SCORSHcmd `yaml:"t_commands"` @@ -36,13 +38,13 @@ type SCORSHtag struct {  // Configuration of a worker  type SCORSHworker_cfg struct { -	Name     string      `yaml:"w_name"` -	Repos    []string    `yaml:"w_repos"` -	Folder   string      `yaml:"w_folder"` -	Logfile  string      `yaml:"w_logfile"` -	Tagfile  string      `yaml:"w_tagfile"` -	Keyrings []string    `yaml:"w_keyrings"` -	Tags     []SCORSHtag `yaml:"w_tags"` +	Name     string          `yaml:"w_name"` +	Repos    []string        `yaml:"w_repos"` +	Folder   string          `yaml:"w_folder"` +	Logfile  string          `yaml:"w_logfile"` +	Tagfile  string          `yaml:"w_tagfile"` +	Keyrings []string        `yaml:"w_keyrings"` +	Tags     []SCORSHtag_cfg `yaml:"w_tags"`  }  // State of a worker @@ -82,39 +84,30 @@ type SCORSHmaster struct {  	SCORSHmaster_state  } +// client commands + +type SCORSHtag struct { +	Tag  string   `yaml:"s_tag"` +	Args []string `yaml:"s_args"` +} + +type SCORSHclient_msg struct { +	Tags []SCORSHtag `yaml:"scorsh"` +} + +//////////////////////// +  func (cfg *SCORSHmaster) String() string {  	var buff bytes.Buffer -	buff.WriteString("spooldir: ") -	buff.WriteString(cfg.Spooldir) -	buff.WriteString("\nlogfile: ") -	buff.WriteString(cfg.Logfile) -	buff.WriteString("\nlogprefix: ") -	buff.WriteString(cfg.LogPrefix) -	buff.WriteString("\nWorkers: \n") +	fmt.Fprintf(&buff, "spooldir: %s\n", cfg.Spooldir) +	fmt.Fprintf(&buff, "logfile: %s\n", cfg.Logfile) +	fmt.Fprintf(&buff, "logprefix: %s\n", cfg.LogPrefix) +	fmt.Fprintf(&buff, "Workers: \n")  	for _, w := range cfg.Workers { -		buff.WriteString("---\n  name: ") -		buff.WriteString(w.Name) -		buff.WriteString("\n  repos: ") -		for _, r := range w.Repos { -			buff.WriteString("\n    ") -			buff.WriteString(r) -		} -		buff.WriteString("\n  folder: ") -		buff.WriteString(w.Folder) -		buff.WriteString("\n  logfile: ") -		buff.WriteString(w.Logfile) -		buff.WriteString("\n  tagfile: ") -		buff.WriteString(w.Tagfile) -		buff.WriteString("\n  keyrings: ") -		for _, k := range w.Keyrings { -			buff.WriteString("\n    ") -			buff.WriteString(k) -		} -		buff.WriteString("\n...\n") - +		fmt.Fprintf(&buff, "%s", &w)  	}  	return buff.String() @@ -123,16 +116,26 @@ func (cfg *SCORSHmaster) String() string {  func (msg *SCORSHmsg) String() string {  	var buff bytes.Buffer -	buff.WriteString("\nName: ") -	buff.WriteString(msg.Name) -	buff.WriteString("\nRepo: ") -	buff.WriteString(msg.Repo) -	buff.WriteString("\nBranch: ") -	buff.WriteString(msg.Branch) -	buff.WriteString("\nOld_rev: ") -	buff.WriteString(msg.Old_rev) -	buff.WriteString("\nNew_rev: ") -	buff.WriteString(msg.New_rev) +	fmt.Fprintf(&buff, "Id: %s\n", msg.Id) +	fmt.Fprintf(&buff, "Repo: %s\n", msg.Repo) +	fmt.Fprintf(&buff, "Branch: %s\n", msg.Branch) +	fmt.Fprintf(&buff, "Old_Rev: %s\n", msg.Old_rev) +	fmt.Fprintf(&buff, "New_rev: %s\n", msg.New_rev) +	fmt.Fprintf(&buff, "Path: %s\n", msg.Path) +  	return buff.String()  } + +func (w *SCORSHworker) String() string { + +	var buff bytes.Buffer +	fmt.Fprintf(&buff, "Name: %s\n", w.Name) +	fmt.Fprintf(&buff, "Repos: %s\n", w.Repos) +	fmt.Fprintf(&buff, "Folder: %s\n", w.Folder) +	fmt.Fprintf(&buff, "Logfile: %s\n", w.Logfile) +	fmt.Fprintf(&buff, "Tagfile: %s\n", w.Tagfile) +	fmt.Fprintf(&buff, "Keyrings: %s\n", w.Keyrings) + +	return buff.String() +} @@ -9,6 +9,7 @@ import (  	"os"  	"regexp"  	"strings" +	"time"  )  func (worker *SCORSHworker) Matches(repo, branch string) bool { @@ -80,20 +81,26 @@ func Worker(w *SCORSHworker) {  	var msg SCORSHmsg  	log.Printf("[worker: %s] Started\n", w.Name) +	debug.log("[worker: %s] MsgChan: %s\n", w.Name, w.MsgChan) +	// notify that we have been started!  	w.StatusChan <- msg  	// This is the main worker loop  	for {  		select {  		case msg = <-w.MsgChan: -			debug.log("[worker: %s] received message %s\n", w.Name, msg.Name) +			debug.log("[worker: %s] received message %s\n", w.Name, msg.Id)  			// process message  			// err := walk_commits(msg, w)  			// if err != nil {  			// 	log.Printf("[worker: %s] error in walk_commits: %s", err)  			// } -			log.Printf("[worker: %s] Received message: ", w.Name, msg) +			debug.log("[worker: %s] Received message: %s", w.Name, msg) +			debug.log("[worker: %s] StatusChan: %s\n", w.Name, w.StatusChan) +			time.Sleep(1000 * time.Millisecond) +			w.StatusChan <- msg +			debug.log("[worker: %s] Sent message back: %s", w.Name, msg)  		}  	}  } @@ -113,7 +120,7 @@ func StartWorkers(master *SCORSHmaster) error {  		worker := &(master.Workers[w])  		// Set the Status and Msg channels  		worker.StatusChan = master.StatusChan -		worker.MsgChan = make(chan SCORSHmsg) +		worker.MsgChan = make(chan SCORSHmsg, 10)  		// Load worker keyrings  		err := worker.LoadKeyrings()  		if err != nil {  | 
