diff options
Diffstat (limited to 'scorshd.go')
-rw-r--r-- | scorshd.go | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/scorshd.go b/scorshd.go new file mode 100644 index 0000000..d36b646 --- /dev/null +++ b/scorshd.go @@ -0,0 +1,151 @@ +package main + +import ( + "flag" + "fmt" + "log" +) + +// manage debugging messages + +const debug debugging = true + +type debugging bool + +func (d debugging) log(format string, args ...interface{}) { + if d { + log.Printf(format, args...) + } +} + +/////////// + +var conf_file = flag.String("c", "./scorsh.cfg", "Configuration file for SCORSH") + +func SCORSHerr(err int) error { + + var err_str string + + switch err { + case SCORSH_ERR_NO_FILE: + err_str = "Invalid file name" + case SCORSH_ERR_KEYRING: + err_str = "Invalid keyring" + case SCORSH_ERR_NO_REPO: + err_str = "Invalid repository" + case SCORSH_ERR_NO_COMMIT: + err_str = "Invalid commit ID" + case SCORSH_ERR_SIGNATURE: + err_str = "Invalid signature" + default: + err_str = "Generic Error" + } + return fmt.Errorf("%s", err_str) + +} + +func FindMatchingWorkers(master *SCORSHmaster, msg *SCORSHmsg) []*SCORSHworker { + + var ret []*SCORSHworker + + for idx, w := range master.Workers { + if w.Matches(msg.Repo, msg.Branch) { + debug.log("--- Worker: %s matches %s:%s\n", w.Name, msg.Repo, msg.Branch) + ret = append(ret, &(master.Workers[idx])) + } + } + return ret +} + +func Master(master *SCORSHmaster) { + + // master main loop: + + var matching_workers []*SCORSHworker + + matching_workers = make([]*SCORSHworker, len(master.Workers)) + + log.Println("[master] Master started ") + debug.log("[master] StatusChan: %s\n", master.StatusChan) + + for { + debug.log("[master] Receive loop...\n") + select { + case push_msg := <-master.Spooler: + // here we manage the stuff we receive from the spooler + debug.log("[master] received message: %s\n", push_msg) + // - lookup the repos map for matching workers + matching_workers = FindMatchingWorkers(master, &push_msg) + debug.log("[master] matching workers: \n%s\n", matching_workers) + + // add the message to WorkingMsg, if it's not a duplicate! + if _, ok := master.WorkingMsg[push_msg.Id]; ok { + log.Printf("[master] detected duplicate message %s \n", push_msg.Id) + } else { + master.WorkingMsg[push_msg.Id] = 0 + // - dispatch the message to all the matching workers + for _, w := range matching_workers { + debug.log("[master] sending msg to worker: %s\n", w.Name) + // send the message to the worker + w.MsgChan <- push_msg + // increase the counter associated to the message + master.WorkingMsg[push_msg.Id] += 1 + debug.log("[master] now WorkingMsg[%s] is: %d\n", push_msg.Id, master.WorkingMsg[push_msg.Id]) + } + } + case done_msg := <-master.StatusChan: + // Here we manage a status message from a worker + debug.log("[master] received message from StatusChan: %s\n", done_msg) + if _, ok := master.WorkingMsg[done_msg.Id]; ok && master.WorkingMsg[done_msg.Id] > 0 { + master.WorkingMsg[done_msg.Id] -= 1 + if master.WorkingMsg[done_msg.Id] == 0 { + delete(master.WorkingMsg, done_msg.Id) + master.Spooler <- done_msg + } + } else { + log.Printf("[master] received completion event for non-existing message name: %s\n", done_msg.Id) + } + } + } + debug.log("[master] Exiting the for loop, for some mysterious reason...\n") +} + +func InitMaster() *SCORSHmaster { + + master := ReadGlobalConfig(*conf_file) + + master.Repos = make(map[string][]*SCORSHworker) + master.WorkingMsg = make(map[string]int) + // This is the channel on which we receive acks from workers + master.StatusChan = make(chan SCORSHmsg) + // This is the channel on which we exchange messages with the spooler + master.Spooler = make(chan SCORSHmsg) + + debug.log("[InitMaster] StatusChan: %s\n", master.StatusChan) + + err_workers := StartWorkers(master) + if err_workers != nil { + log.Fatal("Error starting workers: ", err_workers) + } else { + log.Println("Workers started correctly") + } + err_spooler := StartSpooler(master) + if err_spooler != nil { + log.Fatal("Error starting spooler: ", err_spooler) + } + return master +} + +func main() { + + var done chan int + + flag.Parse() + + master := InitMaster() + + go Master(master) + + // wait indefinitely -- we should implement signal handling... + <-done +} |