Skip to content

Commit

Permalink
Initial add for events
Browse files Browse the repository at this point in the history
  • Loading branch information
vassilux committed Oct 2, 2014
1 parent 0f714e9 commit 7ac71fa
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 21 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ Import cdr asterisk datas from mysql backend to mongo


***** Installation
Install mongodb and redis packages in the target system
Install mongodb and redis packages in the target system.

Note : mongodb v 2.6.1 is used. Installation of packages is processing by avor_installation script.
You can check this script for more information

apt-get mongodb redis-server

Expand Down Expand Up @@ -53,6 +56,10 @@ Import cdr asterisk datas from mysql backend to mongo
--config , the json configuration file

--tick , the value on seconds for schedule the import task

****** Application tips

Application can be stopped by a proper way , send kill [pid]. You can find the pid by ps aux|grep vorimport



Expand Down
2 changes: 2 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"dbMySqlName": "asteriskcdrdb",
"dbMySqlFetchRowNumber": "81",
"mongoHost": "127.0.0.1",
"asteriskID": "asterisk1",
"eventsMongoHost": "127.0.0.1",
"dialplanContext": [
{ "name": "app-out", "direction": 1},
{ "name": "incomming", "direction": 2},
Expand Down
39 changes: 39 additions & 0 deletions evstorageworker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
"log"
)

const capacity = 32768

type EventStorageWorker struct {
}

func NewEventStorageWorker() (w *EventStorageWorker) {
return &EventStorageWorker{}
}

func (w *EventStorageWorker) Work(channel chan bson.M) {
config := GetConfig()
if config == nil {
log.Println("config is null ")
}
log.Println("EventsMongoHost : " + config.EventsMongoHost)
for {
event := <-channel
w.Save(config.EventsMongoHost, event)
}
}

func (w *EventStorageWorker) Save(mongoHost string, event bson.M) {
session, err := mgo.Dial(mongoHost)
if err != nil {
panic(err)
}
session.SetMode(mgo.Monotonic, true)
defer session.Close()
var collection = session.DB("notifications").C("events")
collection.Insert(event)
}
162 changes: 162 additions & 0 deletions evwatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package main

import (
"fmt"
"labix.org/v2/mgo/bson"
"log"
"math/big"
)

const (
EV_NULL = 0
EV_START = 1
EV_STOP = 2
EV_MYSQL_ERROR = 3
EV_MYSQL_SUCCESS = 4
EV_MONGO_ERROR = 5
EV_MONGO_SUCCESS = 6
)

type BitSet struct {
bits big.Int
}

func (b *BitSet) Set(bit int) *BitSet {
b.bits.SetBit(&b.bits, int(bit), 1)
return b
}

func (b *BitSet) Clear(bit int) *BitSet {
b.bits.SetBit(&b.bits, bit, 0)
return b
}

func (b *BitSet) HasBit(bit int) bool {
return b.bits.Bit(bit) == 1
}

func (b *BitSet) Flip(bit int) *BitSet {
if !b.HasBit(bit) {
return b.Set(bit)
}
return b.Clear(bit)
}

func (b BitSet) String() string {
return fmt.Sprintf("%b", &b.bits)
}

func EmptyBitSet() *BitSet {
return new(BitSet)
}

type Event struct {
Mask *BitSet // Mask of event
Datas string // Data of event
Name string // Name of event
}

type EventWatcher struct {
event chan *Event // Events are returned on this channel
done chan bool // Channel for sending a "quit message"
//storage chan bson.M
isClosed bool // Set to true when Close() is first called
eventsMask *BitSet
storageWorker *EventStorageWorker
config *Config
}

//Create a event watcher and set the mask of events to all
func NewEventWatcher(config *Config) (ew *EventWatcher) {
ew = new(EventWatcher)
ew.event = make(chan *Event)
ew.done = make(chan bool, 1)

ew.eventsMask = EmptyBitSet()
ew.eventsMask.Set(EV_START)
ew.eventsMask.Set(EV_STOP)
ew.eventsMask.Set(EV_MYSQL_ERROR)
ew.eventsMask.Set(EV_MYSQL_SUCCESS)
ew.eventsMask.Set(EV_MONGO_ERROR)
ew.eventsMask.Set(EV_MONGO_SUCCESS)

ew.storageWorker = NewEventStorageWorker()
//comment cause Save methode used from EventStorageWorker
//ew.storage = make(chan bson.M)
//go ew.storageWorker.Work(ew.storage)
return ew
}

func (eventWatcher *EventWatcher) publishEvent(event bson.M) {
log.Println("publishEvent : ", event)
eventWatcher.storageWorker.Save(config.EventsMongoHost, event)
}

//Handler dispatch events and flip the event type
//Flip is used to send one time the same type of notification
func (eventWatcher *EventWatcher) processEvent(event *Event) {
//log.Println("processEvent : ", event)
if event.Mask.HasBit(EV_START) {
var pushEvent = bson.M{"type": EV_START, "data": event.Datas}
eventWatcher.publishEvent(pushEvent)
}

if event.Mask.HasBit(EV_STOP) {
var pushEvent = bson.M{"type": EV_STOP, "data": event.Datas}
eventWatcher.publishEvent(pushEvent)
eventWatcher.done <- true
}

//Follow code can/has be refactored
//mysql parts
if event.Mask.HasBit(EV_MYSQL_ERROR) {
if eventWatcher.eventsMask.HasBit(EV_MYSQL_ERROR) {
var pushEvent = bson.M{"type": EV_MYSQL_ERROR, "data": event.Datas}
eventWatcher.publishEvent(pushEvent)
eventWatcher.eventsMask.Clear(EV_MYSQL_ERROR)
eventWatcher.eventsMask.Set(EV_MYSQL_SUCCESS)
}

}

if event.Mask.HasBit(EV_MYSQL_SUCCESS) {
if eventWatcher.eventsMask.HasBit(EV_MYSQL_SUCCESS) {
var pushEvent = bson.M{"type": EV_MYSQL_SUCCESS, "data": event.Datas}
eventWatcher.publishEvent(pushEvent)
eventWatcher.eventsMask.Clear(EV_MYSQL_SUCCESS)
eventWatcher.eventsMask.Set(EV_MYSQL_ERROR)
}

}

//mongo parts

if event.Mask.HasBit(EV_MONGO_ERROR) {
if eventWatcher.eventsMask.HasBit(EV_MONGO_ERROR) {
var pushEvent = bson.M{"type": EV_MONGO_ERROR, "data": event.Datas}
eventWatcher.publishEvent(pushEvent)
eventWatcher.eventsMask.Clear(EV_MONGO_ERROR)
eventWatcher.eventsMask.Set(EV_MONGO_SUCCESS)
}

}

if event.Mask.HasBit(EV_MONGO_SUCCESS) {
if eventWatcher.eventsMask.HasBit(EV_MONGO_SUCCESS) {
var pushEvent = bson.M{"type": EV_MONGO_SUCCESS, "data": event.Datas}
eventWatcher.publishEvent(pushEvent)
eventWatcher.eventsMask.Clear(EV_MONGO_SUCCESS)
eventWatcher.eventsMask.Set(EV_MONGO_ERROR)
}

}
}

func (eventWatcher *EventWatcher) run() {
for {
select {
case c := <-eventWatcher.event:
eventWatcher.processEvent(c)
}
}
}
82 changes: 62 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
DbMySqlName string
DbMySqlFetchRowNumber string
MongoHost string
EventsMongoHost string
AsteriskID string
DialplanContext []Context
}

Expand All @@ -40,6 +42,7 @@ var (
isImportProcessing bool
configFile = flag.String("config", "config.json", "Configuration file path")
importTick = flag.Int("tick", 10, "Importing tick cycle")
eventWatcher *EventWatcher
)

const (
Expand Down Expand Up @@ -126,19 +129,8 @@ func GetConfig() *Config {
}

func init() {
//called on the start by go
//loadLogger()
//loadConfig(true)
//

s := make(chan os.Signal, 1)
signal.Notify(s, syscall.SIGUSR2)
go func() {
for {
<-s
loadConfig(false)
log.Info("Configuration reloading")
}
}()
}

func getInOutStatus(cdr RawCall) (status int, err error) {
Expand Down Expand Up @@ -175,25 +167,51 @@ func syncPublish(spec *redis.ConnectionSpec, channel string, messageType string)
client.Quit()
}

func sendEventNotification(flag int, name, datas string) {
ev := &Event{
Mask: new(BitSet),
Datas: datas,
Name: name,
}
ev.Mask.Set(flag)
eventWatcher.event <- ev
}

func sendMySqlEventNotification(flag int) {
datas := fmt.Sprintf("MySql server : %s change state", config.DbMySqlHost)
name := fmt.Sprintf("MySql state : %d", flag)
sendEventNotification(flag, name, datas)
}

func sendMongoEventNotification(flag int) {
datas := fmt.Sprintf("Mongo server : %s change state", config.DbMySqlHost)
name := fmt.Sprintf("Mongo state : %d", flag)
sendEventNotification(flag, name, datas)
}

func importJob() {
//
db := mysql.New("tcp", "", config.DbMySqlHost, config.DbMySqlUser, config.DbMySqlPassword, config.DbMySqlName)
log.Debugf("Connecting to the database %s %s %s %s.", config.DbMySqlHost, config.DbMySqlUser, config.DbMySqlPassword, config.DbMySqlName)
//
err := db.Connect()
if err != nil {
sendMySqlEventNotification(EV_MYSQL_ERROR)
log.Criticalf("Can't connect to the mysql database error : %s.", err)
return
}
sendMySqlEventNotification(EV_MYSQL_SUCCESS)
log.Debug("Connected to the mysql database with success.")
//
session, err := mgo.Dial(config.MongoHost)
if err != nil {
log.Debugf("Can't connect to the mongo database error : %s.", err)
sendMongoEventNotification(EV_MONGO_ERROR)
return
}
session.SetMode(mgo.Monotonic, true)
defer session.Close()
sendMongoEventNotification(EV_MONGO_SUCCESS)
log.Debug("Connected to the mongo database with success.")
//
cdrs, err := getMysqlCdr(db)
Expand Down Expand Up @@ -304,35 +322,59 @@ func importJob() {
}

func cleanup() {
name := fmt.Sprintf("vorimport state : %d", EV_STOP)
sendEventNotification(EV_STOP, name, "vorimport stopped")
//wait for the eventWatcher
select {
case <-eventWatcher.done:
log.Info("Event watcher stopped.")
}
log.Info("Execute the application cleanup")
log.Flush()
}

func main() {
flag.Parse()

loadLogger()
loadConfig(true)

config = GetConfig()
//dummy flag for indicate that the import is processing
isImportProcessing = false
//
now := time.Now()
_, timeZoneOffset := now.Zone()
log.Infof("Startring and using the timezone offset used : %d.", timeZoneOffset)
//

eventWatcher = NewEventWatcher(config)
go eventWatcher.run()

//something wrong I cannot trup SIGUSR1 :-)
/*s := make(chan os.Signal, 1)
signal.Notify(s, syscall.SIGUSR1)*/

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)

go func() {
<-c
cleanup()
os.Exit(1)
/*<-s
loadConfig(false)
log.Info("Configuration reloading")*/
}()
//
log.Infof("Starting for %s", config.AsteriskID)
//dummy flag for indicate that the import is processing
isImportProcessing = false
//
now := time.Now()
_, timeZoneOffset := now.Zone()
log.Infof("Startring and using the timezone offset used : %d.", timeZoneOffset)
//

//
duration := time.Duration(*importTick) * time.Second
ticker := time.NewTicker(duration)
quit := make(chan struct{})
name := fmt.Sprintf("vorimport state : %d", EV_START)
sendEventNotification(EV_START, name, "vorimport started")
go func() {
for {
select {
Expand Down
Loading

0 comments on commit 7ac71fa

Please sign in to comment.