diff options
| -rw-r--r-- | config.go | 23 | ||||
| -rw-r--r-- | scorsh.cfg | 12 | ||||
| -rw-r--r-- | scorsh.go | 31 | ||||
| -rw-r--r-- | spooler.go | 2 | ||||
| -rw-r--r-- | types.go | 31 | ||||
| -rw-r--r-- | worker_config.cfg | 28 | ||||
| -rw-r--r-- | workers.go | 128 | 
7 files changed, 206 insertions, 49 deletions
| @@ -20,9 +20,11 @@ func ReadGlobalConfig(fname string) *SCORSHmaster {  		log.Fatal("Error while reading file: ", err)  	} +	  	var cfg *SCORSHmaster -	cfg = new(SCORSHmaster) +	cfg = new(SCORSHmaster) +	  	// Unmarshal the YAML configuration file into a SCORSHcfg structure  	err = yaml.Unmarshal(data, cfg)  	if err != nil { @@ -30,27 +32,29 @@ func ReadGlobalConfig(fname string) *SCORSHmaster {  	}  	fmt.Printf("%s", cfg) - +	  	// If the user has not set a spooldir, crash loudly  	if cfg.Spooldir == "" {  		log.Fatal("No spooldir defined in ", fname, ". Exiting\n")  	}  	// Check if the user has set a custom logprefix -	if cfg.LogPrefix != "" { -		log.SetPrefix(cfg.LogPrefix) -	}  	// Check if the user wants to redirect the logs to a file  	if cfg.Logfile != "" { -		f, err := os.Open(cfg.Logfile) +		log.Printf("Opening log file: %s\n", cfg.Logfile) +		f, err := os.OpenFile(cfg.Logfile, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)  		if err != nil {  			log.SetOutput(io.Writer(f))  		} else { -			log.Printf("Error opening logfile: \n", err) +			log.Fatal("Error opening logfile: ", cfg.Logfile, err)  		}  	} +	if cfg.LogPrefix != "" { +		log.SetPrefix(cfg.LogPrefix) +	} +  	// If we got so far, then there is some sort of config in cfg  	log.Printf("Successfully read config from %s\n", fname) @@ -58,7 +62,10 @@ func ReadGlobalConfig(fname string) *SCORSHmaster {  } -func (cfg *SCORSHmaster_cfg) String() string { + + + +func (cfg *SCORSHmaster) String() string {  	var buff bytes.Buffer @@ -1,12 +1,6 @@ -# -# This is a typical scorsh configuration. We declare here the list of -# workers, with the corresponding repo/branches regular expressions -# and the associated folder -# -  --- -s_spooldir: "/var/spool/scorsh" -s_logfile: "/var/log/scorsh/scorsh.log" +s_spooldir: "./spool" +s_logfile: "./scorsh.log"  s_logprefix: "[scorsh]"  s_workers: @@ -50,4 +44,4 @@ s_workers:                     ],        }    ] -...
\ No newline at end of file +... @@ -61,27 +61,48 @@ func Master(master *SCORSHmaster) {  		case push_msg = <- master.Spooler:  			// - lookup the repos map for matching workers  			matching_workers = FindMatchingWorkers(master, &push_msg) +			// add the message to PendingMsg +			//...  			// - dispatch the message to all the matching workers  			for _, w := range matching_workers { -				w.Chan <- push_msg +				// increase the counter associated to the message  +				w.MsgChan <- push_msg  			}  		}  	}  } -func main() { - -	flag.Parse() +func InitMaster() *SCORSHmaster {  	master := ReadGlobalConfig(*conf_file) - +	 +	master.Repos = make(map[string][]*SCORSHworker) +	master.WorkingMsg = make(map[string]int) +	// This is the mutex-channel on which we receive acks from workers +	master.StatusChan = make(chan SCORSHmsg, 1) +	  	err_workers := StartWorkers(master)  	if err_workers != nil {  		log.Fatal("Error starting workers: ", err_workers) +	} else { +		log.Println("Workers started correctly")  	}  	err_spooler := StartSpooler(master)  	if err_spooler != nil {  		log.Fatal("Error starting spooler: ", err_spooler) +	} else { +		log.Println("Spooler started correctly")  	} +	return master +	 +} + + +func main() { + +	flag.Parse() + +	master := InitMaster() +	  	go Master(master)  } @@ -16,6 +16,8 @@ func parse_request(fname string) (SCORSHmsg, error) {  		log.Printf("Unable to open file: %s\n", fname)  		return ret, SCORSHerr(SCORSH_ERR_NO_FILE)  	} + +	// FIXME: Fill in the ret structure  	return ret, nil @@ -15,22 +15,22 @@ const (  // the SCORSHmsg type represents messages received from the spool and  // sent to workers  type SCORSHmsg struct { +	name    string  	repo    string  	branch  string  	old_rev string  	new_rev string  } -  type SCORSHcmd struct { -	URL  string -	hash string +	URL  string `yaml:"c_url"` +	Hash string `yaml:"c_hash"`  }  type SCORSHtag struct { -	TagName  string -	Keyrings []string -	Commands []SCORSHcmd +	Name     string      `yaml:"t_name"` +	Keyrings []string    `yaml:"t_keyrings"` +	Commands []SCORSHcmd `yaml:"t_commands"`  }  // Configuration of a worker @@ -45,16 +45,17 @@ type SCORSHworker_cfg struct {  // State of a worker  type SCORSHworker_state struct { -	Tags map[string]SCORSHtag -	Keys map[string]openpgp.KeyRing -	Chan chan SCORSHmsg +	Tags       []SCORSHtag `yaml:"w_tags"` +	Keys       map[string]openpgp.KeyRing +	MsgChan    chan SCORSHmsg +	StatusChan chan SCORSHmsg  }  // The type SCORSHworker represents the configuration and state of a  // worker  type SCORSHworker struct { -	SCORSHworker_cfg -	SCORSHworker_state +	SCORSHworker_cfg   `yaml:",inline"` +	SCORSHworker_state `yaml:",inline"`  }  // Configuration of the master @@ -67,13 +68,15 @@ type SCORSHmaster_cfg struct {  // State of the master  type SCORSHmaster_state struct { -	Spooler chan SCORSHmsg -	Repos   map[string][]*SCORSHworker +	Spooler    chan SCORSHmsg +	StatusChan chan SCORSHmsg +	Repos      map[string][]*SCORSHworker +	WorkingMsg map[string]int  }  // The type SCORSHmaster represents the configuration and state of the  // master  type SCORSHmaster struct { -	SCORSHmaster_cfg +	SCORSHmaster_cfg `yaml:",inline"`  	SCORSHmaster_state  } diff --git a/worker_config.cfg b/worker_config.cfg index 5173b6f..a156ac8 100644 --- a/worker_config.cfg +++ b/worker_config.cfg @@ -9,11 +9,11 @@  ---  w_tags: -  [      {        t_name: "BUILD", -      t_keyrings: ["build_keyring.asc", "general_keyring.asc"], -      t_commands: [ +      { +       t_keyrings: ["build_keyring.asc", "general_keyring.asc"], +       t_commands: [                      {                       c_url: "file:///home/user/bin/script.sh $1 $2",                       c_hash: "12da324fb76s924acbce" @@ -21,17 +21,19 @@ w_tags:                      {                       c_url: "http://my.server.net/call.pl?branch=$1"                      } -                  ] -    }, +                   ] +      }      {        t_name: "PUBLISH", -      t_keyrings: ["web_developers.asc"], -      t_commands: [ -                   { -                    c_url: "file:///usr/local/bin/publish.py $repo $branch", -                    c_hash: "3234567898765432345678" -                   } -                  ] +      { +       t_keyrings: ["web_developers.asc"], +       t_commands: [ +                    { +                     c_url: "file:///usr/local/bin/publish.py $repo $branch", +                     c_hash: "3234567898765432345678" +                    } +                   ] +      }      } -  ] +   }  ...
\ No newline at end of file diff --git a/workers.go b/workers.go new file mode 100644 index 0000000..d5462c1 --- /dev/null +++ b/workers.go @@ -0,0 +1,128 @@ +package main + +import ( +	"fmt" +	"github.com/go-yaml/yaml" +	"golang.org/x/crypto/openpgp" +	"io/ioutil" +	"log" +	"os" +	"regexp" +	"strings" +) + +func (worker *SCORSHworker) Matches(repo, branch string) bool { +	 +	for _, r := range worker.Repos { +		parts := strings.SplitN(r, ":", 2) +		repo_pattern := parts[0] +		branch_pattern := parts[1] +		repo_match, _ := regexp.MatchString(repo_pattern, repo) +		branch_match, _ := regexp.MatchString(branch_pattern, branch) +		if repo_match && branch_match { +			return true +		} +	} +	return false +} + +func (w *SCORSHworker) LoadKeyrings() error { + +	w.Keys = make(map[string]openpgp.KeyRing, len(w.Keyrings)) + +	// Open the keyring files +	for _, keyring := range w.Keyrings { +		f, err_file := os.Open(keyring) + +		if err_file != nil { +			log.Printf("[worker] cannot open keyring:", err_file) +			f.Close() +			return fmt.Errorf("Unable to open keyring: ", err_file) +		} + +		// load the keyring +		kr, err_key := openpgp.ReadArmoredKeyRing(f) + +		if err_key != nil { +			log.Printf("[worker] cannot load keyring: ", err_key) +			f.Close() +			return fmt.Errorf("Unable to load keyring: ", err_key) +		} +		w.Keys[keyring] = kr +		f.Close() +	} +	return nil +} + +// Still to be implemented +func (w *SCORSHworker) LoadTags() error { +	 +	w_tags, err := ioutil.ReadFile(w.Tagfile) +	if err != nil{ +		log.Printf("[worker:%s] Cannot read worker config: ", w.Name, err) +		return err +	} +	 +	err = yaml.Unmarshal(w_tags, w.Tags) + +	if err != nil { +		log.Printf("[worker:%s] Error while reading tags: ", w.Name, err) +		return err +	} + +	 +	return nil +} + +// FIXME--- STILL UNDER HEAVY WORK... +func SCORSHWorker(w *SCORSHworker) { + + +	// This is the main worker loop +	for { +		select { +		case msg := <-w.MsgChan: +			// process message +			err := walk_commits(msg, w) +			if err != nil { +				log.Printf("[worker: %s] error in walk_commits: %s", err) +			} +		} +	} +} + +// StartWorkers starts all the workers specified in a given +// configuration and fills in the SCORSHmaster struct +func StartWorkers(master *SCORSHmaster) error { + +	num_workers := len(master.Workers) +	 +	// We should now start each worker + +	for w:=1; w<num_workers; w++ { +		 +		worker := & (master.Workers[w]) +		// Set the Status and Msg channels +		worker.StatusChan = master.StatusChan +		worker.MsgChan = make(chan SCORSHmsg) +		// Load worker keyrings +		err := worker.LoadKeyrings() +		if err != nil { +			log.Printf("[worker: %s] Unable to load keyrings (Exiting): %s\n", worker.Name, err) +			close(worker.MsgChan) +			return err +		} +		// Load worker tags from worker.Tagfile +		err = worker.LoadTags() +		if err != nil { +			log.Printf("[worker: %s] Unable to load tags (Exiting): %s\n", worker.Name, err) +			close(worker.MsgChan) +			return err +		} +		// Add the repos definitions to the map master.Repos +		for _, repo_name := range worker.Repos { +			master.Repos[repo_name] = append(master.Repos[repo_name], worker) +		} +	} +	return nil +} | 
