Skip to content

Commit

Permalink
Fix caller dst and python script
Browse files Browse the repository at this point in the history
  • Loading branch information
vassilux committed Sep 17, 2014
1 parent 882e9db commit b2dd884
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 87 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ _testmain.go
*.exe
log
bin
*.pyc
5 changes: 3 additions & 2 deletions config.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
{
"dbMySqlHost": "192.168.3.20:3306",
"dbMySqlHost": "127.0.0.1:3306",
"dbMySqlUser": "root",
"dbMySqlPassword": "lepanos",
"dbMySqlName": "asteriskcdrdb",
"dbMySqlFetchRowNumber": "81",
"mongoHost": "127.0.0.1",
"dialplanContext": [
{ "name": "app-out", "direction": 1},
{ "name": "incomming", "direction": 2},
{ "name": "incomming", "direction": 2},
{ "name": "outgoing", "direction": 1},
{ "name": "DLPN_DialPlan1", "direction": 2},
{ "name": "app-daynight-toggle", "direction": 3},
{ "name": "test-dnd", "direction": 3},
Expand Down
33 changes: 16 additions & 17 deletions logger.xml
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
<seelog type="adaptive" mininterval="200000000" maxinterval="1000000000" critmsgcount="5" minlevel="trace">
<outputs formatid="msg">
<filter levels="trace">
<!--<file path="./log/vorimport_traces.log"/>-->
<console />
</filter>
<filter levels="debug">
<!--<file path="./log/vorimport_debug.log"/> -->
<console />
</filter>
<seelog type="adaptive" mininterval="2000000" maxinterval="100000000" critmsgcount="500" minlevel="info">
<exceptions>
<exception filepattern="test*" minlevel="error"/>
</exceptions>
<outputs formatid="all">
<file path="./log/all.log"/>
<filter levels="info">
<!--<file path="./log/vorimport_info.log"/>-->
<console />
<console formatid="fmtinfo"/>
<file path="./log/infos.log"/>
</filter>
<filter levels="error,critical" formatid="fmterror">
<console/>
<file path="./log/errors.log"/>
</filter>
<filter levels="error">
<!--<file path="./log/vorimport_error.log"/>-->
<console />
</filter>
</outputs>
<formats>
<format id="msg" format="%Time: %Msg%n"/>
<format id="fmtinfo" format="[%Level] [%Time] %Msg%n"/>
<format id="fmterror" format="[%LEVEL] [%Time] [%FuncShort @ %File.%Line] %Msg%n"/>
<format id="all" format="[%Level] [%Time] [@ %File.%Line] %Msg%n"/>
<format id="criticalemail" format="Critical error on our server!\n %Time %Date %RelFile %Func %Msg \nSent by Seelog"/>
</formats>
</seelog>
48 changes: 33 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ type Config struct {
}

var (
config *Config
configLock = new(sync.RWMutex)
timeZoneOffset int64
config *Config
configLock = new(sync.RWMutex)
timeZoneOffset int64
isImportProcessing bool
)

const (
Expand Down Expand Up @@ -103,6 +104,19 @@ func loadConfig(fail bool) {
configLock.Unlock()
}

func loadLogger() {
logger, err := log.LoggerFromConfigAsFile("logger.xml")

if err != nil {
log.Error("Can not load the logger configuration file, Please check if the file logger.xml exists on current directory", err)
os.Exit(1)
} else {
log.ReplaceLogger(logger)
logger.Flush()
}

}

func GetConfig() *Config {
configLock.RLock()
defer configLock.RUnlock()
Expand All @@ -112,20 +126,15 @@ func GetConfig() *Config {
func init() {
//called on the start by go
loadConfig(true)
logger, err := log.LoggerFromConfigAsFile("logger.xml")
loadLogger()

if err != nil {
fmt.Printf("Can not load the logger configuration file, Please check if the file logger.xml exists on current directory", err)
}

log.ReplaceLogger(logger)
s := make(chan os.Signal, 1)
signal.Notify(s, syscall.SIGUSR2)
go func() {
for {
<-s
loadConfig(false)
log.Error("Configuration reloading")
log.Info("Configuration reloading")
}
}()
}
Expand Down Expand Up @@ -200,7 +209,7 @@ func importJob() {
var outgoingCount = 0
for _, cdr := range cdrs {
var datetime = cdr.Calldate.Format(time.RFC3339)
log.Debugf("Get raw cdr for the date [%s], the clid [%s] and the context [%s]", datetime, cdr.ClidNumber, cdr.Dcontext)
log.Tracef("Get raw cdr for the date [%s], the clid [%s] and the context [%s]", datetime, cdr.ClidNumber, cdr.Dcontext)
var cel m.Cel
cel, err = getMySqlCel(db, cdr.Uniqueid)
var inoutstatus, err = getInOutStatus(cdr)
Expand Down Expand Up @@ -258,7 +267,8 @@ func importJob() {
if extent != "" {
cdr.Dst = extent
} else {
cdr.Dst = getPeerFromChannel(cdr.Dstchannel)
//must be checked cause by testing
cdr.Dst = cdr.Dst //getPeerFromChannel(cdr.Dstchannel)
}

} else {
Expand All @@ -271,7 +281,7 @@ func importJob() {
importedStatus = -1
}
//
log.Infof("Import executed for unique id [%s] with code : [%d], try process the mysql updating.\n",
log.Debugf("Import executed for unique id [%s] with code : [%d], try process the mysql updating.\n",
cdr.Uniqueid, importedStatus)
err = udpateMySqlCdrImportStatus(db, cdr.Uniqueid, importedStatus)
if err != nil {
Expand All @@ -290,7 +300,7 @@ func importJob() {
if outgoingCount > 0 {
syncPublish(spec, channel, "cdroutgoing")
}

//
}

func cleanup() {
Expand All @@ -299,8 +309,11 @@ func cleanup() {

func main() {
//

config = GetConfig()
//
isImportProcessing = false
//
now := time.Now()
_, timeZoneOffset := now.Zone()
log.Infof("Startring and using the timezone offset used : %d.", timeZoneOffset)
Expand All @@ -320,7 +333,12 @@ func main() {
for {
select {
case <-ticker.C:
importJob()
if isImportProcessing == false {
isImportProcessing = true
importJob()
isImportProcessing = false
}

case <-quit:
ticker.Stop()
return
Expand Down
10 changes: 5 additions & 5 deletions mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func processMonthlyAnalytics(session *mgo.Session, cdr m.RawCall) (err error) {
}
info, err = collection.Find(selector).Apply(change, &doc)
if info != nil {
log.Debugf("Monthly analytics wew record inserted : %s.", doc.Id)
log.Tracef("Monthly analytics wew record inserted : %s.", doc.Id)
} else {
log.Errorf("Monthly analytics can't be updated, get the error : [%v] for the document : %s", err, doc.Id)
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func processDailyAnalytics(session *mgo.Session, cdr m.RawCall) (err error) {
} else {
return errors.New("[mongo] Can't detect the call context")
}
//var t = time.Unix(cdr.calldate, 0)
//
var id = fmt.Sprintf("%04d%02d%02d-%s-%d", cdr.Calldate.Year(), cdr.Calldate.Month(),
cdr.Calldate.Day(), dst, cdr.Disposition)
var metaDate = time.Date(cdr.Calldate.Year(), cdr.Calldate.Month(), cdr.Calldate.Day(), 1, 0, 0, 0, time.UTC)
Expand All @@ -115,7 +115,7 @@ func processDailyAnalytics(session *mgo.Session, cdr m.RawCall) (err error) {
metaDoc := m.MetaData{Dst: dst, Dt: metaDate, Disposition: cdr.Disposition}
doc := m.DailyCall{Id: id, Meta: metaDoc, AnswereWaitTime: cdr.AnswerWaitTime, CallDaily: 0,
DurationDaily: 0}
//err = collection.Insert(doc)
//
var selector = bson.M{"_id": id, "metadata": metaDoc}
var hourlyInc = fmt.Sprintf("call_hourly.%d", cdr.Calldate.Hour())
var durationHourlyInc = fmt.Sprintf("duration_hourly.%d", cdr.Calldate.Hour())
Expand Down Expand Up @@ -152,7 +152,7 @@ func processDailyAnalytics(session *mgo.Session, cdr m.RawCall) (err error) {

//
func processDidImport(session *mgo.Session, cdr m.RawCall) (err error) {
log.Debugf("Import by did : %s\n", cdr.Dnid)
log.Tracef("Import by did : %s\n", cdr.Dnid)
err = processDidDailyAnalytics(session, cdr)
if err != nil {
return nil
Expand Down Expand Up @@ -268,7 +268,7 @@ func processDidDailyAnalytics(session *mgo.Session, cdr m.RawCall) (err error) {
}

func importCdrToMongo(session *mgo.Session, cdr m.RawCall) (err error) {
log.Debugf("Start analyze data for mongo database.")
log.Tracef("Start analyze data for mongo database.")
createMongoCdr(session, cdr)
err = processDailyAnalytics(session, cdr)
if err != nil {
Expand Down
52 changes: 30 additions & 22 deletions mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,31 @@ import (
m "vorimport/models"
)

/*type CallDetail struct {
eventType string
eventTime time.Time
cidNum string
cidDnid string
exten string
uniqueId string
linkedId string
peer string
}*/

//'CHAN_START'
/**
* Used for split clid into two string caller name and caller number
*/
func bracket(r rune) bool {
return r == '<' || r == '>'
}

/**
*
*/
func getMysqlCdr(db mysql.Conn) (results []m.RawCall, err error) {
log.Debugf("Enter into getMysqlCdr")
log.Tracef("Enter into getMysqlCdr")
myQuery := "SELECT UNIX_TIMESTAMP(calldate) as calldate, clid, src, dst, channel, dcontext, disposition,billsec,duration,uniqueid,dstchannel, dnid, recordfile from asteriskcdrdb.cdr WHERE import = 0 LIMIT 0, " + config.DbMySqlFetchRowNumber
//
log.Debugf("Executing request [%s]\r\n", myQuery)
log.Tracef("Executing request [%s]\r\n", myQuery)
rows, res, err := db.Query(myQuery)
//
if err != nil {
fmt.Printf("Executing request failed with error [%s]\r\n", err)
log.Debugf("Executing request [%s] and get error [%s] \r\n", myQuery, err)
return nil, err
}
//
fmt.Printf("Request executed and get [%d] rows\r\n", len(rows))
log.Debugf("Request executed and get [%d] rows\r\n", len(rows))
log.Tracef("Request executed and get [%d] rows\r\n", len(rows))
//prepare results array
results = make([]m.RawCall, len(rows))
fmt.Printf("Create results for [%d] rows\r\n", len(rows))
i := 0
for _, row := range rows {
//
Expand All @@ -64,11 +55,30 @@ func getMysqlCdr(db mysql.Conn) (results []m.RawCall, err error) {
recordfile := res.Map("recordfile")
dstchannel := res.Map("dstchannel")
//
raw_clid := strings.FieldsFunc(row.Str(clid), bracket)
caller_name := ""
caller_number := ""

if len(raw_clid) == 2 {
caller_name = raw_clid[0]
caller_number = raw_clid[1]
} else if len(raw_clid) == 1 {
caller_number = raw_clid[0]
}

/*if len(raw_clid) == 2 {
caller_name := raw_clid[0]
caller_number := raw_clid[1]
}else len(raw_clid) == 1 {
caller_number := raw_clid[0]
}*/
//
c = m.RawCall{Id: bson.NewObjectId(),
Calldate: time.Unix(row.Int64(calldate)+int64(timeZoneOffset), 0),
MetadataDt: time.Unix(time.Now().Unix()+int64(timeZoneOffset), 0),
ClidName: row.Str(clid),
ClidNumber: row.Str(clid),
ClidName: caller_name,
ClidNumber: caller_number,
Src: row.Str(src),
Channel: row.Str(channel),
Dcontext: row.Str(dcontext),
Expand All @@ -88,7 +98,6 @@ func getMysqlCdr(db mysql.Conn) (results []m.RawCall, err error) {
i++

}
fmt.Printf("Return [%d] results .\r\n", len(results))
return results, nil
}

Expand Down Expand Up @@ -158,7 +167,6 @@ func getMySqlCallDetails(db mysql.Conn, uniqueid string) (results []m.CallDetail
if len(rows) == 0 {
return nil, nil
}
fmt.Printf("getMySqlCallDetails Request executed and get [%d] rows\r\n", len(rows))
//prepare results array
results = make([]m.CallDetail, len(rows))
log.Debugf("getMySqlCallDetails create results for [%d] rows\r\n", len(rows))
Expand Down
33 changes: 16 additions & 17 deletions samples/logger.xml
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
<seelog type="adaptive" mininterval="200000000" maxinterval="1000000000" critmsgcount="5" minlevel="trace">
<outputs formatid="msg">
<filter levels="trace">
<!--<file path="./log/vorimport_traces.log"/>-->
<console />
</filter>
<filter levels="debug">
<!--<file path="./log/vorimport_debug.log"/> -->
<console />
</filter>
<seelog type="adaptive" mininterval="2000000" maxinterval="100000000" critmsgcount="500" minlevel="info">
<exceptions>
<exception filepattern="test*" minlevel="error"/>
</exceptions>
<outputs formatid="all">
<file path="./log/all.log"/>
<filter levels="info">
<!--<file path="./log/vorimport_info.log"/>-->
<console />
<console formatid="fmtinfo"/>
<file path="./log/infos.log"/>
</filter>
<filter levels="error,critical" formatid="fmterror">
<console/>
<file path="./log/errors.log"/>
</filter>
<filter levels="error">
<!--<file path="./log/vorimport_error.log"/>-->
<console />
</filter>
</outputs>
<formats>
<format id="msg" format="%Time: %Msg%n"/>
<format id="fmtinfo" format="[%Level] [%Time] %Msg%n"/>
<format id="fmterror" format="[%LEVEL] [%Time] [%FuncShort @ %File.%Line] %Msg%n"/>
<format id="all" format="[%Level] [%Time] [@ %File.%Line] %Msg%n"/>
<format id="criticalemail" format="Critical error on our server!\n %Time %Date %RelFile %Func %Msg \nSent by Seelog"/>
</formats>
</seelog>
Loading

0 comments on commit b2dd884

Please sign in to comment.