From 7ac71fad8013ac5d26eedb5f11d08c93df237eeb Mon Sep 17 00:00:00 2001
From: vassilux
Date: Thu, 2 Oct 2014 17:28:28 +0200
Subject: [PATCH] Initial support for events

---
 README.md          |   9 ++-
 config.json        |   2 +
 evstorageworker.go |  39 ++++++++++
 evwatcher.go       | 162 ++++++++++++++++++++++++++++++++++++++++
 main.go            |  82 ++++++++++++++------
 main_test.go       |  53 ++++++++++++++
 6 files changed, 326 insertions(+), 21 deletions(-)
 create mode 100644 evstorageworker.go
 create mode 100644 evwatcher.go

diff --git a/README.md b/README.md
index 816a36e..4212b2b 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,10 @@ Import cdr asterisk datas from mysql backend to mongo
 *****
 Installation
 
-	Install mongodb and redis packages in the target system
+	Install the mongodb and redis packages on the target system.
+
+	Note: MongoDB v2.6.1 is used. Installation of the packages is handled by the avor_installation script.
+	You can check this script for more information.
 
 	apt-get mongodb redis-server
 
@@ -53,6 +56,10 @@ Import cdr asterisk datas from mysql backend to mongo
 	--config , the json configuration file
 	--tick , the value on seconds for schedule the import task
+
+****** Application tips
+
+	The application can be stopped cleanly by sending it a signal: kill [pid]. You can find the pid with ps aux|grep vorimport.
diff --git a/config.json b/config.json
index a8f686a..2700b83 100644
--- a/config.json
+++ b/config.json
@@ -5,6 +5,8 @@
     "dbMySqlName": "asteriskcdrdb",
     "dbMySqlFetchRowNumber": "81",
     "mongoHost": "127.0.0.1",
+    "asteriskID": "asterisk1",
+    "eventsMongoHost": "127.0.0.1",
     "dialplanContext": [
         { "name": "app-out", "direction": 1},
         { "name": "incomming", "direction": 2},
diff --git a/evstorageworker.go b/evstorageworker.go
new file mode 100644
index 0000000..ae553a6
--- /dev/null
+++ b/evstorageworker.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+	"labix.org/v2/mgo"
+	"labix.org/v2/mgo/bson"
+	"log"
+)
+
+const capacity = 32768
+
+// EventStorageWorker persists events into the "events" collection of the
+// "notifications" database on the events MongoDB host.
+type EventStorageWorker struct {
+}
+
+func NewEventStorageWorker() (w *EventStorageWorker) {
+	return &EventStorageWorker{}
+}
+
+// Work consumes events from the given channel and stores each of them.
+func (w *EventStorageWorker) Work(channel chan bson.M) {
+	config := GetConfig()
+	if config == nil {
+		log.Println("config is nil, event storage worker not started")
+		return
+	}
+	log.Println("EventsMongoHost : " + config.EventsMongoHost)
+	for {
+		event := <-channel
+		w.Save(config.EventsMongoHost, event)
+	}
+}
+
+// Save opens a session to the events MongoDB host and inserts the event.
+func (w *EventStorageWorker) Save(mongoHost string, event bson.M) {
+	session, err := mgo.Dial(mongoHost)
+	if err != nil {
+		log.Println("can't connect to the events mongo database : ", err)
+		return
+	}
+	session.SetMode(mgo.Monotonic, true)
+	defer session.Close()
+	var collection = session.DB("notifications").C("events")
+	if err := collection.Insert(event); err != nil {
+		log.Println("can't insert the event : ", err)
+	}
+}
diff --git a/evwatcher.go b/evwatcher.go
new file mode 100644
index 0000000..13645b7
--- /dev/null
+++ b/evwatcher.go
@@ -0,0 +1,162 @@
+package main
+
+import (
+	"fmt"
+	"labix.org/v2/mgo/bson"
+	"log"
+	"math/big"
+)
+
+const (
+	EV_NULL          = 0
+	EV_START         = 1
+	EV_STOP          = 2
+	EV_MYSQL_ERROR   = 3
+	EV_MYSQL_SUCCESS = 4
+	EV_MONGO_ERROR   = 5
+	EV_MONGO_SUCCESS = 6
+)
+
+// BitSet is a simple bit set backed by a big.Int.
+type BitSet struct {
+	bits big.Int
+}
+
+func (b *BitSet) Set(bit int) *BitSet {
+	b.bits.SetBit(&b.bits, bit, 1)
+	return b
+}
+
+func (b *BitSet) Clear(bit int) *BitSet {
+	b.bits.SetBit(&b.bits, bit, 0)
+	return b
+}
+
+func (b *BitSet) HasBit(bit int) bool {
+	return b.bits.Bit(bit) == 1
+}
+
+func (b *BitSet) Flip(bit int) *BitSet {
+	if !b.HasBit(bit) {
+		return b.Set(bit)
+	}
+	return b.Clear(bit)
+}
+
+func (b BitSet) String() string {
+	return fmt.Sprintf("%b", &b.bits)
+}
+
+func EmptyBitSet() *BitSet {
+	return new(BitSet)
+}
+
+type Event struct {
+	Mask  *BitSet // Mask of event
+	Datas string  // Data of event
+	Name  string  // Name of event
+}
+
+type EventWatcher struct {
+	event chan *Event // Events are returned on this channel
+	done  chan bool   // Channel for sending a "quit message"
+	//storage chan bson.M
+	isClosed      bool // Set to true when Close() is first called
+	eventsMask    *BitSet
+	storageWorker *EventStorageWorker
+	config        *Config
+}
+
+// NewEventWatcher creates an event watcher with every event type enabled in its mask.
+func NewEventWatcher(config *Config) (ew *EventWatcher) {
+	ew = new(EventWatcher)
+	ew.event = make(chan *Event)
+	ew.done = make(chan bool, 1)
+	ew.config = config
+
+	ew.eventsMask = EmptyBitSet()
+	ew.eventsMask.Set(EV_START)
+	ew.eventsMask.Set(EV_STOP)
+	ew.eventsMask.Set(EV_MYSQL_ERROR)
+	ew.eventsMask.Set(EV_MYSQL_SUCCESS)
+	ew.eventsMask.Set(EV_MONGO_ERROR)
+	ew.eventsMask.Set(EV_MONGO_SUCCESS)
+
+	ew.storageWorker = NewEventStorageWorker()
+	// The storage channel and worker goroutine are disabled because Save is
+	// called directly on the EventStorageWorker.
+	//ew.storage = make(chan bson.M)
+	//go ew.storageWorker.Work(ew.storage)
+	return ew
+}
+
+func (eventWatcher *EventWatcher) publishEvent(event bson.M) {
+	log.Println("publishEvent : ", event)
+	eventWatcher.storageWorker.Save(eventWatcher.config.EventsMongoHost, event)
+}
+
+// processEvent dispatches an event and flips the matching bits of the watcher mask,
+// so that the same type of notification is only published once until the state changes.
+func (eventWatcher *EventWatcher) processEvent(event *Event) {
+	//log.Println("processEvent : ", event)
+	if event.Mask.HasBit(EV_START) {
+		var pushEvent = bson.M{"type": EV_START, "data": event.Datas}
+		eventWatcher.publishEvent(pushEvent)
+	}
+
+	if event.Mask.HasBit(EV_STOP) {
+		var pushEvent = bson.M{"type": EV_STOP, "data": event.Datas}
+		eventWatcher.publishEvent(pushEvent)
+		eventWatcher.done <- true
+	}
+
+	// The following blocks could be refactored to remove the duplication.
+	// mysql part
+	if event.Mask.HasBit(EV_MYSQL_ERROR) {
+		if eventWatcher.eventsMask.HasBit(EV_MYSQL_ERROR) {
+			var pushEvent = bson.M{"type": EV_MYSQL_ERROR, "data": event.Datas}
+			eventWatcher.publishEvent(pushEvent)
+			eventWatcher.eventsMask.Clear(EV_MYSQL_ERROR)
+			eventWatcher.eventsMask.Set(EV_MYSQL_SUCCESS)
+		}
+	}
+
+	if event.Mask.HasBit(EV_MYSQL_SUCCESS) {
+		if eventWatcher.eventsMask.HasBit(EV_MYSQL_SUCCESS) {
+			var pushEvent = bson.M{"type": EV_MYSQL_SUCCESS, "data": event.Datas}
+			eventWatcher.publishEvent(pushEvent)
+			eventWatcher.eventsMask.Clear(EV_MYSQL_SUCCESS)
+			eventWatcher.eventsMask.Set(EV_MYSQL_ERROR)
+		}
+	}
+
+	// mongo part
+	if event.Mask.HasBit(EV_MONGO_ERROR) {
+		if eventWatcher.eventsMask.HasBit(EV_MONGO_ERROR) {
+			var pushEvent = bson.M{"type": EV_MONGO_ERROR, "data": event.Datas}
+			eventWatcher.publishEvent(pushEvent)
+			eventWatcher.eventsMask.Clear(EV_MONGO_ERROR)
+			eventWatcher.eventsMask.Set(EV_MONGO_SUCCESS)
+		}
+	}
+
+	if event.Mask.HasBit(EV_MONGO_SUCCESS) {
+		if eventWatcher.eventsMask.HasBit(EV_MONGO_SUCCESS) {
+			var pushEvent = bson.M{"type": EV_MONGO_SUCCESS, "data": event.Datas}
+			eventWatcher.publishEvent(pushEvent)
+			eventWatcher.eventsMask.Clear(EV_MONGO_SUCCESS)
+			eventWatcher.eventsMask.Set(EV_MONGO_ERROR)
+		}
+	}
+}
+
+// run processes incoming events until the event channel is closed.
+func (eventWatcher *EventWatcher) run() {
+	for event := range eventWatcher.event {
+		eventWatcher.processEvent(event)
+	}
+}
diff --git a/main.go b/main.go
index 99f9143..e4d3231 100644
--- a/main.go
+++ b/main.go
@@ -30,6 +30,8 @@ type Config struct {
 	DbMySqlName           string
 	DbMySqlFetchRowNumber string
 	MongoHost             string
+	EventsMongoHost       string
+	AsteriskID            string
 	DialplanContext       []Context
 }
 
@@ -40,6 +42,7 @@ var (
 	isImportProcessing bool
 	configFile         = flag.String("config", "config.json", "Configuration file path")
 	importTick         = flag.Int("tick", 10, "Importing tick cycle")
+	eventWatcher       *EventWatcher
 )
 
 const (
@@ -126,19 +129,8 @@ func GetConfig() *Config {
 }
 
 func init() {
-	//called on the start by go
-	//loadLogger()
-	//loadConfig(true)
+	// Initialization is done in main().
 
-	s := make(chan os.Signal, 1)
-	signal.Notify(s, syscall.SIGUSR2)
-	go func() {
-		for {
-			<-s
-			loadConfig(false)
-			log.Info("Configuration reloading")
-		}
-	}()
 }
 
 func getInOutStatus(cdr RawCall) (status int, err error) {
@@ -175,6 +167,28 @@ func syncPublish(spec *redis.ConnectionSpec, channel string, messageType string)
 	client.Quit()
 }
 
+func sendEventNotification(flag int, name, datas string) {
+	ev := &Event{
+		Mask:  new(BitSet),
+		Datas: datas,
+		Name:  name,
+	}
+	ev.Mask.Set(flag)
+	eventWatcher.event <- ev
+}
+
+func sendMySqlEventNotification(flag int) {
+	datas := fmt.Sprintf("MySql server : %s changed state", config.DbMySqlHost)
+	name := fmt.Sprintf("MySql state : %d", flag)
+	sendEventNotification(flag, name, datas)
+}
+
+func sendMongoEventNotification(flag int) {
+	datas := fmt.Sprintf("Mongo server : %s changed state", config.MongoHost)
+	name := fmt.Sprintf("Mongo state : %d", flag)
+	sendEventNotification(flag, name, datas)
+}
+
 func importJob() {
 	//
 	db := mysql.New("tcp", "", config.DbMySqlHost, config.DbMySqlUser, config.DbMySqlPassword, config.DbMySqlName)
@@ -182,18 +196,22 @@ func importJob() {
 	//
 	err := db.Connect()
 	if err != nil {
+		sendMySqlEventNotification(EV_MYSQL_ERROR)
 		log.Criticalf("Can't connect to the mysql database error : %s.", err)
 		return
 	}
+	sendMySqlEventNotification(EV_MYSQL_SUCCESS)
 	log.Debug("Connected to the mysql database with success.")
 	//
 	session, err := mgo.Dial(config.MongoHost)
 	if err != nil {
 		log.Debugf("Can't connect to the mongo database error : %s.", err)
+		sendMongoEventNotification(EV_MONGO_ERROR)
 		return
 	}
 	session.SetMode(mgo.Monotonic, true)
 	defer session.Close()
+	sendMongoEventNotification(EV_MONGO_SUCCESS)
 	log.Debug("Connected to the mongo database with success.")
 	//
 	cdrs, err := getMysqlCdr(db)
@@ -304,35 +322,59 @@ func importJob() {
 }
 
 func cleanup() {
+	name := fmt.Sprintf("vorimport state : %d", EV_STOP)
+	sendEventNotification(EV_STOP, name, "vorimport stopped")
+	// Wait for the event watcher to acknowledge the stop event.
+	<-eventWatcher.done
+	log.Info("Event watcher stopped.")
 	log.Info("Execute the application cleanup")
+	log.Flush()
 }
 
 func main() {
 	flag.Parse()
-
 	loadLogger()
 	loadConfig(true)
 	config = GetConfig()
-	//dummy flag for indicate that the import is processing
-	isImportProcessing = false
-	//
-	now := time.Now()
-	_, timeZoneOffset := now.Zone()
-	log.Infof("Startring and using the timezone offset used : %d.", timeZoneOffset)
-	//
+
+	eventWatcher = NewEventWatcher(config)
+	go eventWatcher.run()
+
+	// FIXME: trapping SIGUSR1 for configuration reload does not work yet, so it is disabled.
+	/*s := make(chan os.Signal, 1)
+	signal.Notify(s, syscall.SIGUSR1)*/
+
 	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt)
 	signal.Notify(c, syscall.SIGTERM)
+
 	go func() {
 		<-c
 		cleanup()
 		os.Exit(1)
+		/*<-s
+		loadConfig(false)
+		log.Info("Configuration reloading")*/
 	}()
+	//
+	log.Infof("Starting vorimport for %s", config.AsteriskID)
+	// Flag indicating whether an import cycle is currently in progress.
+	isImportProcessing = false
+	//
+	now := time.Now()
+	_, timeZoneOffset := now.Zone()
+	log.Infof("Using the timezone offset : %d.", timeZoneOffset)
+	//
+	//
 	duration := time.Duration(*importTick) * time.Second
 	ticker := time.NewTicker(duration)
 	quit := make(chan struct{})
+	name := fmt.Sprintf("vorimport state : %d", EV_START)
+	sendEventNotification(EV_START, name, "vorimport started")
 	go func() {
 		for {
 			select {
diff --git a/main_test.go b/main_test.go
index 7407c40..de62bd2 100644
--- a/main_test.go
+++ b/main_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"testing"
+	"time"
 )
 
 func Test_DstChannel(t *testing.T) {
@@ -17,3 +18,55 @@ func Test_DstChannel(t *testing.T) {
 	}
 	t.Log("dstChannelTester test passed.")
 }
+
+func Test_EventWatcher_MySql(t *testing.T) {
+	// The watcher publishes every event to MongoDB, so this test expects
+	// config.json to be readable and the events MongoDB instance to be reachable.
+	loadConfig(true)
+	eventWatcher := NewEventWatcher(GetConfig())
+	go eventWatcher.run()
+	ev := &Event{
+		Mask:  new(BitSet),
+		Datas: "EV_MYSQL_ERROR data ev",
+		Name:  "EV_MYSQL_ERROR name",
+	}
+
+	ev.Mask.Set(EV_MYSQL_ERROR)
+
+	eventWatcher.event <- ev
+	//
+	ev1 := &Event{
+		Mask:  new(BitSet),
+		Datas: "EV_MYSQL_ERROR data ev1",
+		Name:  "EV_MYSQL_ERROR name",
+	}
+	ev1.Mask.Set(EV_MYSQL_ERROR)
+	eventWatcher.event <- ev1
+
+	ev2 := &Event{
+		Mask:  new(BitSet),
+		Datas: "EV_MYSQL_SUCCESS data ev2",
+		Name:  "EV_MYSQL_SUCCESS name",
+	}
+
+	ev2.Mask.Set(EV_MYSQL_SUCCESS)
+	eventWatcher.event <- ev2
+
+	ev3 := &Event{
+		Mask:  new(BitSet),
+		Datas: "EV_MYSQL_SUCCESS data ev3",
+		Name:  "EV_MYSQL_SUCCESS name",
+	}
+	ev3.Mask.Set(EV_MYSQL_SUCCESS)
+	eventWatcher.event <- ev3
+
+	ev4 := &Event{
+		Mask:  new(BitSet),
+		Datas: "EV_MYSQL_ERROR data ev4",
+		Name:  "EV_MYSQL_ERROR name",
+	}
+	ev4.Mask.Set(EV_MYSQL_ERROR)
+	eventWatcher.event <- ev4
+	// Give the watcher goroutine time to process the queued events.
+	time.Sleep(1 * time.Second)
+	// After the last error event the error bit must be cleared and the success bit set again.
+	if eventWatcher.eventsMask.HasBit(EV_MYSQL_ERROR) || !eventWatcher.eventsMask.HasBit(EV_MYSQL_SUCCESS) {
+		t.Fail()
+	}
+}
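
Note (not part of the patch): the snippet below is a minimal sketch of one way to read back the
documents that EventStorageWorker.Save writes into the "events" collection of the "notifications"
database. The host value mirrors the eventsMongoHost default from config.json; sorting on "_id"
to get the most recent documents first is an assumption.

	package main

	import (
		"fmt"
		"log"

		"labix.org/v2/mgo"
		"labix.org/v2/mgo/bson"
	)

	func main() {
		// Dial the events MongoDB host (default value taken from config.json).
		session, err := mgo.Dial("127.0.0.1")
		if err != nil {
			log.Fatal(err)
		}
		defer session.Close()

		// Fetch the ten most recent events stored by EventStorageWorker.Save.
		var events []bson.M
		err = session.DB("notifications").C("events").Find(nil).Sort("-_id").Limit(10).All(&events)
		if err != nil {
			log.Fatal(err)
		}
		for _, event := range events {
			fmt.Println(event)
		}
	}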