diff options
author | KatolaZ <katolaz@freaknet.org> | 2017-07-15 01:26:39 +0100 |
---|---|---|
committer | KatolaZ <katolaz@freaknet.org> | 2017-07-15 01:26:39 +0100 |
commit | 726b399e4747032a3d052339cd62c57ae5b6767c (patch) | |
tree | aac8350e83bb13e9628dd287ca3aae97ecafcb32 /workers.go | |
parent | ed637037b75cb5dfe1b49e776956fa6ab3632b68 (diff) |
pipeline from spool to worker is done -- added examples
Diffstat (limited to 'workers.go')
-rw-r--r-- | workers.go | 46 |
1 files changed, 28 insertions, 18 deletions
@@ -19,6 +19,8 @@ func (worker *SCORSHworker) Matches(repo, branch string) bool { branch_pattern := parts[1] repo_match, _ := regexp.MatchString(repo_pattern, repo) branch_match, _ := regexp.MatchString(branch_pattern, branch) + debug.log("[worker.Matches] repo_match: %s\n", repo_match) + debug.log("[worker.Matches] branch_match: %s\n", branch_match) if repo_match && branch_match { return true } @@ -59,34 +61,40 @@ func (w *SCORSHworker) LoadTags() error { w_tags, err := ioutil.ReadFile(w.Tagfile) if err != nil{ - log.Printf("[worker:%s] Cannot read worker config: ", w.Name, err) - return err + return fmt.Errorf("Cannot read worker config: %s", err) } + - err = yaml.Unmarshal(w_tags, w.Tags) + err = yaml.Unmarshal(w_tags, w) + //err = yaml.Unmarshal(w_tags, tags) if err != nil { - log.Printf("[worker:%s] Error while reading tags: ", w.Name, err) - return err + return fmt.Errorf("Error while reading tags: %s", err) } - return nil } -// FIXME--- STILL UNDER HEAVY WORK... -func SCORSHWorker(w *SCORSHworker) { +// FIXME--- still needs some work... +func Worker(w *SCORSHworker) { + var msg SCORSHmsg + + log.Printf("[worker: %s] Started\n", w.Name) + w.StatusChan <- msg + // This is the main worker loop for { select { - case msg := <-w.MsgChan: + case msg = <-w.MsgChan: + debug.log("[worker: %s] received message %s\n", w.Name, msg.Name) // process message - err := walk_commits(msg, w) - if err != nil { - log.Printf("[worker: %s] error in walk_commits: %s", err) - } + // err := walk_commits(msg, w) + // if err != nil { + // log.Printf("[worker: %s] error in walk_commits: %s", err) + // } + log.Printf("[worker: %s] Received message: ", w.Name, msg) } } } @@ -99,7 +107,9 @@ func StartWorkers(master *SCORSHmaster) error { // We should now start each worker - for w:=1; w<num_workers; w++ { + log.Printf("num_workers: %d\n", num_workers) + + for w:=0; w<num_workers; w++ { worker := & (master.Workers[w]) // Set the Status and Msg channels @@ -108,21 +118,21 @@ func StartWorkers(master *SCORSHmaster) error { // Load worker keyrings err := worker.LoadKeyrings() if err != nil { - log.Printf("[worker: %s] Unable to load keyrings (Exiting): %s\n", worker.Name, err) close(worker.MsgChan) - return err + return fmt.Errorf("[Starting worker: %s] Unable to load keyrings: %s\n", worker.Name, err) } // Load worker tags from worker.Tagfile err = worker.LoadTags() if err != nil { - log.Printf("[worker: %s] Unable to load tags (Exiting): %s\n", worker.Name, err) close(worker.MsgChan) - return err + return fmt.Errorf("[Starting worker: %s] Unable to load tags: %s\n", worker.Name, err) } // Add the repos definitions to the map master.Repos for _, repo_name := range worker.Repos { master.Repos[repo_name] = append(master.Repos[repo_name], worker) } + go Worker(worker) + <- master.StatusChan } return nil } |