From 07a41b5ca11a3e94f3049c179c05658fea5edd63 Mon Sep 17 00:00:00 2001 From: vassilux Date: Tue, 14 Oct 2014 15:29:48 +0200 Subject: [PATCH] Change the code/name of events --- callOriginator.go | 33 ++++++----- config.json | 8 ++- evwatcher.go | 138 ++++++++++++++++++++++++++++++++-------------- main.go | 84 ++++++++++++++-------------- mysql.go | 8 ++- 5 files changed, 171 insertions(+), 100 deletions(-) diff --git a/callOriginator.go b/callOriginator.go index 1706b19..95cdaa8 100644 --- a/callOriginator.go +++ b/callOriginator.go @@ -3,26 +3,27 @@ package main import ( gami "code.google.com/p/gami" "fmt" - "log" "net" ) type callOriginator struct { - Addr string - Port int - testCall chan bool - Username string - Password string + Addr string + Port int + testCall chan bool + resultTestCall chan error + Username string + Password string } func NewCallOriginator(addr string, port int, user string, pswd string) *callOriginator { // originator := &callOriginator{ - Addr: addr, - Port: port, - Username: user, - Password: pswd, - testCall: make(chan bool, 1), + Addr: addr, + Port: port, + Username: user, + Password: pswd, + testCall: make(chan bool, 1), + resultTestCall: make(chan error, 1), } go originator.run() @@ -33,7 +34,8 @@ func (originator *callOriginator) processTestCall() { c, err := net.Dial("tcp", fmt.Sprintf("%s:%d", originator.Addr, originator.Port)) if err != nil { - log.Fatal(err) + originator.resultTestCall <- err + return } defer c.Close() @@ -43,7 +45,8 @@ func (originator *callOriginator) processTestCall() { err = g.Login(originator.Username, originator.Password) if err != nil { - log.Fatal(err) + originator.resultTestCall <- err + return } ch := "Local/testcall@app-alive-test" @@ -56,8 +59,10 @@ func (originator *callOriginator) processTestCall() { err = g.Originate(o, nil, &cb) if err != nil { - log.Fatal(err) + originator.resultTestCall <- err + return } + originator.resultTestCall <- nil } diff --git a/config.json b/config.json index 7c0bdd4..036a043 100644 --- a/config.json +++ b/config.json @@ -7,11 +7,15 @@ "mongoHost": "127.0.0.1", "asteriskID": "asterisk1", "eventsMongoHost": "127.0.0.1", - "asteriskAddr": "192.168.3.20", + "asteriskAddr": "127.0.0.1", "asteriskPort" : 5038, "asteriskUser" : "astmanager", "asteriskPassword": "lepanos", - "testCallSchedule": 1, + "testCallSchedule": 20, + "notifications": [ + "f1com", + "smtp" + ], "dialplanContext": [ { "name": "app-out", "direction": 1}, { "name": "incomming", "direction": 2}, diff --git a/evwatcher.go b/evwatcher.go index 13645b7..5a533f4 100644 --- a/evwatcher.go +++ b/evwatcher.go @@ -8,13 +8,17 @@ import ( ) const ( - EV_NULL = 0 - EV_START = 1 - EV_STOP = 2 - EV_MYSQL_ERROR = 3 - EV_MYSQL_SUCCESS = 4 - EV_MONGO_ERROR = 5 - EV_MONGO_SUCCESS = 6 + EV_NULL = 0 + APPSTA = 1 + APPSTO = 2 + MYSQKO = 3 + MYSQOK = 4 + MONGOKO = 5 + MONGOOK = 6 + TCALOK = 7 + TCALKO = 8 + CCALOK = 9 // check call action success + CCALKO = 10 ) type BitSet struct { @@ -53,7 +57,7 @@ func EmptyBitSet() *BitSet { type Event struct { Mask *BitSet // Mask of event Datas string // Data of event - Name string // Name of event + Code string // Code of event } type EventWatcher struct { @@ -69,16 +73,21 @@ type EventWatcher struct { //Create a event watcher and set the mask of events to all func NewEventWatcher(config *Config) (ew *EventWatcher) { ew = new(EventWatcher) + ew.config = config ew.event = make(chan *Event) ew.done = make(chan bool, 1) ew.eventsMask = EmptyBitSet() - ew.eventsMask.Set(EV_START) - ew.eventsMask.Set(EV_STOP) - ew.eventsMask.Set(EV_MYSQL_ERROR) - ew.eventsMask.Set(EV_MYSQL_SUCCESS) - ew.eventsMask.Set(EV_MONGO_ERROR) - ew.eventsMask.Set(EV_MONGO_SUCCESS) + ew.eventsMask.Set(APPSTA) + ew.eventsMask.Set(APPSTO) + ew.eventsMask.Set(MYSQKO) + ew.eventsMask.Set(MYSQOK) + ew.eventsMask.Set(MONGOKO) + ew.eventsMask.Set(MONGOOK) + ew.eventsMask.Set(TCALKO) + ew.eventsMask.Set(TCALOK) + ew.eventsMask.Set(CCALOK) + ew.eventsMask.Set(CCALKO) ew.storageWorker = NewEventStorageWorker() //comment cause Save methode used from EventStorageWorker @@ -88,68 +97,117 @@ func NewEventWatcher(config *Config) (ew *EventWatcher) { } func (eventWatcher *EventWatcher) publishEvent(event bson.M) { - log.Println("publishEvent : ", event) - eventWatcher.storageWorker.Save(config.EventsMongoHost, event) + for _, notification := range eventWatcher.config.Notifications { + event["appid"] = "vorimport" + event["asteriskid"] = eventWatcher.config.AsteriskID + event["transport"] = notification + log.Println("publishEvent : ", event) + eventWatcher.storageWorker.Save(config.EventsMongoHost, event) + } } //Handler dispatch events and flip the event type //Flip is used to send one time the same type of notification func (eventWatcher *EventWatcher) processEvent(event *Event) { //log.Println("processEvent : ", event) - if event.Mask.HasBit(EV_START) { - var pushEvent = bson.M{"type": EV_START, "data": event.Datas} + if event.Mask.HasBit(APPSTA) { + var pushEvent = bson.M{"type": 1, "code": "APPSTA", "data": event.Datas} eventWatcher.publishEvent(pushEvent) } - if event.Mask.HasBit(EV_STOP) { - var pushEvent = bson.M{"type": EV_STOP, "data": event.Datas} + if event.Mask.HasBit(APPSTO) { + var pushEvent = bson.M{"type": 1, "code": "APPSTO", "data": event.Datas} eventWatcher.publishEvent(pushEvent) eventWatcher.done <- true } //Follow code can/has be refactored //mysql parts - if event.Mask.HasBit(EV_MYSQL_ERROR) { - if eventWatcher.eventsMask.HasBit(EV_MYSQL_ERROR) { - var pushEvent = bson.M{"type": EV_MYSQL_ERROR, "data": event.Datas} + if event.Mask.HasBit(MYSQKO) { + if eventWatcher.eventsMask.HasBit(MYSQKO) { + var pushEvent = bson.M{"type": 1, "code": "MYSQKO", "data": event.Datas} eventWatcher.publishEvent(pushEvent) - eventWatcher.eventsMask.Clear(EV_MYSQL_ERROR) - eventWatcher.eventsMask.Set(EV_MYSQL_SUCCESS) + eventWatcher.eventsMask.Clear(MYSQKO) + eventWatcher.eventsMask.Set(MYSQOK) } } - if event.Mask.HasBit(EV_MYSQL_SUCCESS) { - if eventWatcher.eventsMask.HasBit(EV_MYSQL_SUCCESS) { - var pushEvent = bson.M{"type": EV_MYSQL_SUCCESS, "data": event.Datas} + if event.Mask.HasBit(MYSQOK) { + if eventWatcher.eventsMask.HasBit(MYSQOK) { + var pushEvent = bson.M{"type": 1, "code": "MYSQOK", "data": event.Datas} eventWatcher.publishEvent(pushEvent) - eventWatcher.eventsMask.Clear(EV_MYSQL_SUCCESS) - eventWatcher.eventsMask.Set(EV_MYSQL_ERROR) + eventWatcher.eventsMask.Clear(MYSQOK) + eventWatcher.eventsMask.Set(MYSQKO) } } //mongo parts - if event.Mask.HasBit(EV_MONGO_ERROR) { - if eventWatcher.eventsMask.HasBit(EV_MONGO_ERROR) { - var pushEvent = bson.M{"type": EV_MONGO_ERROR, "data": event.Datas} + if event.Mask.HasBit(MONGOKO) { + if eventWatcher.eventsMask.HasBit(MONGOKO) { + var pushEvent = bson.M{"type": 1, "code": "MONGOKO", "data": event.Datas} + eventWatcher.publishEvent(pushEvent) + eventWatcher.eventsMask.Clear(MONGOKO) + eventWatcher.eventsMask.Set(MONGOOK) + } + + } + + if event.Mask.HasBit(MONGOOK) { + if eventWatcher.eventsMask.HasBit(MONGOOK) { + var pushEvent = bson.M{"type": 1, "code": "MONGOOK", "data": event.Datas} + eventWatcher.publishEvent(pushEvent) + eventWatcher.eventsMask.Clear(MONGOOK) + eventWatcher.eventsMask.Set(MONGOKO) + } + + } + + //test call part + if event.Mask.HasBit(TCALKO) { + if eventWatcher.eventsMask.HasBit(TCALKO) { + var pushEvent = bson.M{"type": 1, "code": "TCALKO", "data": event.Datas} eventWatcher.publishEvent(pushEvent) - eventWatcher.eventsMask.Clear(EV_MONGO_ERROR) - eventWatcher.eventsMask.Set(EV_MONGO_SUCCESS) + eventWatcher.eventsMask.Clear(TCALKO) + eventWatcher.eventsMask.Set(TCALOK) } } - if event.Mask.HasBit(EV_MONGO_SUCCESS) { - if eventWatcher.eventsMask.HasBit(EV_MONGO_SUCCESS) { - var pushEvent = bson.M{"type": EV_MONGO_SUCCESS, "data": event.Datas} + if event.Mask.HasBit(TCALOK) { + if eventWatcher.eventsMask.HasBit(TCALOK) { + var pushEvent = bson.M{"type": 1, "code": "TCALOK", "data": event.Datas} eventWatcher.publishEvent(pushEvent) - eventWatcher.eventsMask.Clear(EV_MONGO_SUCCESS) - eventWatcher.eventsMask.Set(EV_MONGO_ERROR) + eventWatcher.eventsMask.Clear(TCALOK) + eventWatcher.eventsMask.Set(TCALKO) } } + + if event.Mask.HasBit(CCALKO) { + if eventWatcher.eventsMask.HasBit(CCALKO) { + var pushEvent = bson.M{"type": CCALKO, "code": "CCALKO", "data": event.Datas} + eventWatcher.publishEvent(pushEvent) + eventWatcher.eventsMask.Clear(CCALKO) + eventWatcher.eventsMask.Set(CCALOK) + } + + } + + if event.Mask.HasBit(CCALOK) { + fmt.Println("Enter CCALOK") + if eventWatcher.eventsMask.HasBit(CCALOK) { + fmt.Println("Enter CCALOK send and change flag") + var pushEvent = bson.M{"type": 1, "code": "CCALOK", "data": event.Datas} + eventWatcher.publishEvent(pushEvent) + eventWatcher.eventsMask.Clear(CCALOK) + eventWatcher.eventsMask.Set(CCALKO) + } + + } + } func (eventWatcher *EventWatcher) run() { diff --git a/main.go b/main.go index 8fc829a..70f90e9 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,7 @@ type Config struct { AsteriskPassword string TestCallSchedule int DialplanContext []Context + Notifications []string } var ( @@ -175,11 +176,10 @@ func syncPublish(spec *redis.ConnectionSpec, channel string, messageType string) client.Quit() } -func sendEventNotification(flag int, name, datas string) { +func sendEventNotification(flag int, datas string) { ev := &Event{ Mask: new(BitSet), Datas: datas, - Name: name, } ev.Mask.Set(flag) eventWatcher.event <- ev @@ -187,14 +187,12 @@ func sendEventNotification(flag int, name, datas string) { func sendMySqlEventNotification(flag int) { datas := fmt.Sprintf("MySql server : %s change state", config.DbMySqlHost) - name := fmt.Sprintf("MySql state : %d", flag) - sendEventNotification(flag, name, datas) + sendEventNotification(flag, datas) } func sendMongoEventNotification(flag int) { datas := fmt.Sprintf("Mongo server : %s change state", config.DbMySqlHost) - name := fmt.Sprintf("Mongo state : %d", flag) - sendEventNotification(flag, name, datas) + sendEventNotification(flag, datas) } func importJob() { @@ -204,22 +202,22 @@ func importJob() { // err := db.Connect() if err != nil { - sendMySqlEventNotification(EV_MYSQL_ERROR) + sendMySqlEventNotification(MYSQKO) log.Criticalf("Can't connect to the mysql database error : %s.", err) return } - sendMySqlEventNotification(EV_MYSQL_SUCCESS) + sendMySqlEventNotification(MYSQOK) log.Debug("Connected to the mysql database with success.") // session, err := mgo.Dial(config.MongoHost) if err != nil { log.Debugf("Can't connect to the mongo database error : %s.", err) - sendMongoEventNotification(EV_MONGO_ERROR) + sendMongoEventNotification(MONGOKO) return } session.SetMode(mgo.Monotonic, true) defer session.Close() - sendMongoEventNotification(EV_MONGO_SUCCESS) + sendMongoEventNotification(MONGOOK) log.Debug("Connected to the mongo database with success.") // cdrs, err := getMysqlCdr(db) @@ -334,8 +332,8 @@ func cleanup() { stopImportJob <- true stopGenerateTestCall <- true // - name := fmt.Sprintf("vorimport state : %d", EV_STOP) - sendEventNotification(EV_STOP, name, "vorimport stopped") + data := fmt.Sprintf("Application stopped : %d", APPSTO) + sendEventNotification(APPSTO, data) //wait for the eventWatcher select { case <-eventWatcher.done: @@ -346,30 +344,50 @@ func cleanup() { } func generateTestCall() { + // testCallOriginator.testCall <- true - time.Sleep(5 * time.Second) + // + select { + case res := <-testCallOriginator.resultTestCall: + if res != nil { + data := fmt.Sprintf("Test call failed : %v for the asterisk server %s.", res, config.AsteriskID) + //log.Errorf("Failed create test call for the asterisk %s:%d : %s.", config.AsteriskAddr, config.AsteriskPort, res) + sendEventNotification(TCALKO, data) + return + } else { + data := fmt.Sprintf("Test call ok : %d", TCALOK) + sendEventNotification(TCALOK, data) + } + } + //little stuoid wait + time.Sleep(3 * time.Second) db := mysql.New("tcp", "", config.DbMySqlHost, config.DbMySqlUser, config.DbMySqlPassword, config.DbMySqlName) log.Debugf("Connecting to the database %s %s %s %s.", config.DbMySqlHost, config.DbMySqlUser, config.DbMySqlPassword, config.DbMySqlName) // err := db.Connect() if err != nil { - sendMySqlEventNotification(EV_MYSQL_ERROR) - log.Criticalf("Can't connect to the mysql database error : %s.", err) + data := fmt.Sprintf("Failed check the generated test call : %v for the asterisk server %s.", err, config.AsteriskID) + sendEventNotification(CCALKO, data) return } - sendMySqlEventNotification(EV_MYSQL_SUCCESS) // - cdrs, err := getMysqlCdr(db) + cdrs, err := getMysqlCdrTestCall(db) if len(cdrs) == 0 { - log.Error("Oups something is wrong") + data := fmt.Sprint("Cannot find the generated test call into asterisk database for the asterisk server %s.", config.AsteriskID) + sendEventNotification(CCALKO, data) + log.Errorf(data) } else { for _, cdr := range cdrs { - err = udpateMySqlCdrImportStatus(db, cdr.Uniqueid, 1) + err = deleteMySqlCdrRecord(db, cdr.Uniqueid) if err != nil { - log.Errorf("Can't update the import status for the call with unique id [%s].", cdr.Uniqueid) + log.Errorf("Can't delete the test call record with unique id [%s] cause get an error %v.", cdr.Uniqueid, err) + cleanup() os.Exit(1) } } + data := fmt.Sprintf("Test call ok : %d for the asterisk server %s.", CCALOK, config.AsteriskID) + sendEventNotification(TCALOK, data) + log.Infof("Asterisk the test call processed with success.") } } @@ -419,30 +437,10 @@ func main() { //ticker := time.NewTicker(duration) stopImportJob = schedule(importJob, duration) - name := fmt.Sprintf("vorimport state : %d", EV_START) - sendEventNotification(EV_START, name, "vorimport started") - - /* //make(chan struct{}) - name := fmt.Sprintf("vorimport state : %d", EV_START) - sendEventNotification(EV_START, name, "vorimport started") - go func() { - for { - select { - case <-ticker.C: - if isImportProcessing == false { - isImportProcessing = true - importJob() - isImportProcessing = false - } - - case <-quit: - ticker.Stop() - return - } - } - }() */ + data := fmt.Sprintf("Application vorimport started : %d", APPSTA) + sendEventNotification(APPSTA, data) - durationTestCall := time.Duration(config.TestCallSchedule) * time.Minute + durationTestCall := time.Duration(config.TestCallSchedule) * time.Second //ticker := time.NewTicker(duration) stopGenerateTestCall = schedule(generateTestCall, durationTestCall) diff --git a/mysql.go b/mysql.go index b823519..a01cbf8 100644 --- a/mysql.go +++ b/mysql.go @@ -8,7 +8,6 @@ import ( "labix.org/v2/mgo/bson" "strings" "time" - //m "vorimport/models" ) /** @@ -289,3 +288,10 @@ func udpateMySqlCdrImportStatus(db mysql.Conn, uniqueid string, status int) (err // return err } + +func deleteMySqlCdrRecord(db mysql.Conn, uniqueid string) (err error) { + var query = fmt.Sprintf("DELETE FROM cdr WHERE uniqueid = '%s'", uniqueid) + _, _, err = db.Query(query) + // + return err +}