diff --git a/app.go b/app.go index a8135ce..696cc63 100644 --- a/app.go +++ b/app.go @@ -1,12 +1,13 @@ package f5 import ( - "flag" + "fmt" + "os" + "q5" + "strings" "sync" "sync/atomic" "time" - "os" - "q5" ) type App interface { @@ -22,7 +23,7 @@ type App interface { GetNowSeconds() int64 GetNowMillis() int64 GetNowNano() int64 - RegisterMainThreadCb(func ()) + RegisterMainThreadCb(func()) } type UserApp interface { @@ -33,27 +34,27 @@ type UserApp interface { } type app struct { - zoneId int32 - nodeId int32 - instanceId int32 - terminated bool - flags map[int32]int32 - servicing bool - contextHash map[int32]q5.Args - loopCond *sync.Cond - imTopNode *IMMsgNode - imBotNode *IMMsgNode - imWorkNode *IMMsgNode - imMsgMutex sync.RWMutex + zoneId int32 + nodeId int32 + instanceId int32 + terminated bool + flags map[int32]int32 + servicing bool + contextHash map[int32]q5.Args + loopCond *sync.Cond + imTopNode *IMMsgNode + imBotNode *IMMsgNode + imWorkNode *IMMsgNode + imMsgMutex sync.RWMutex chGoLoopTimerExit chan int - chGoLoopWait chan int64 - nowTime time.Time - nowUnixNano int64 - imMsgHandlers [1024]func(q5.Args) - maxRunDelay int64 - maxScheduleTime int64 - uuid q5.Uuid - userApp UserApp + chGoLoopWait chan int64 + nowTime time.Time + nowUnixNano int64 + imMsgHandlers [1024]func(q5.Args) + maxRunDelay int64 + maxScheduleTime int64 + uuid q5.Uuid + userApp UserApp } func (this *app) init(userApp UserApp) { @@ -75,21 +76,18 @@ func (this *app) init(userApp UserApp) { _httpCliMgr = new(HttpCliMgr) _httpCliMgr.init() { - var tmpNodeId, tmpInstanceId int - flag.IntVar(&tmpNodeId, "n", 0, "node id") - flag.IntVar(&tmpInstanceId, "i", 0, "instance id") - flag.Parse() + tmpNodeId, tmpInstanceId := parseArgs() this.nodeId = int32(tmpNodeId) this.instanceId = int32(tmpInstanceId) } - this.uuid.SetMachineId((this.nodeId - 1) * MAX_NODE_ID + this.instanceId) + this.uuid.SetMachineId((this.nodeId-1)*MAX_NODE_ID + this.instanceId) this.loopCond = sync.NewCond(new(sync.Mutex)) this.chGoLoopTimerExit = make(chan int) this.chGoLoopWait = make(chan int64) _app.RegisterIMMsgHandle( IM_EXEC_MAIN_THREAD_CB, - func (args q5.Args) { - cb := args[0].(func ()) + func(args q5.Args) { + cb := args[0].(func()) cb() }) this.outputRuningLog() @@ -125,7 +123,7 @@ func (this *app) run() { this.schedule() endTick := q5.GetTickCount() - if this.maxRunDelay < endTick - beginTick { + if this.maxRunDelay < endTick-beginTick { this.maxRunDelay = endTick - beginTick } } @@ -178,10 +176,10 @@ func (this *app) RegisterIMMsgHandle(msgId uint16, handle func(q5.Args)) { this.imMsgHandlers[msgId] = handle } -func (this *app) RegisterMainThreadCb(cb func ()) { +func (this *app) RegisterMainThreadCb(cb func()) { _app.AddIMMsg(IM_EXEC_MAIN_THREAD_CB, - []interface{} { - cb, + []interface{}{ + cb, }) } @@ -245,8 +243,8 @@ func (this *app) dispatchIMMsg() { } func (this *app) installTimer() { - GetTimer().SetInterval(1000 * 60, - func (ev int32, params *q5.Args) { + GetTimer().SetInterval(1000*60, + func(ev int32, params *q5.Args) { if ev == q5.TIMER_EXEC_EVENT { GetSysLog().Info("max_run_delay:%d max_schedule_time:%d", _app.maxRunDelay, @@ -256,3 +254,31 @@ func (this *app) installTimer() { } }) } + +func parseArgs() (int, int) { + args := os.Args[1:] + if len(args) <= 0 { + return 0, 0 + } + var nodeId, instanceId int + for i := 0; i < len(args); i++ { + arg := args[i] + if strings.HasPrefix(arg, "-n") { + if len(arg) > 2 { + fmt.Sscanf(arg[2:], "%d", &nodeId) + } else if i+1 < len(args) { + fmt.Sscanf(args[i+1], "%d", &nodeId) + i++ + } + } else if strings.HasPrefix(arg, "-i") { + if len(arg) > 2 { + fmt.Sscanf(arg[2:], "%d", &instanceId) + } else if i+1 < len(args) { + fmt.Sscanf(args[i+1], "%d", &instanceId) + i++ + } + } + } + + return nodeId, instanceId +} diff --git a/constant_unix.go b/constant_unix.go new file mode 100644 index 0000000..7de33c5 --- /dev/null +++ b/constant_unix.go @@ -0,0 +1,8 @@ +//go:build unix +// +build unix + +package f5 + +const ( + SYS_LOG_ROOT = "/data/logs/%s/logs/" +) diff --git a/constant_windows.go b/constant_windows.go new file mode 100644 index 0000000..a7a538f --- /dev/null +++ b/constant_windows.go @@ -0,0 +1,8 @@ +//go:build windows +// +build windows + +package f5 + +const ( + SYS_LOG_ROOT = "d:/linux_root/data/logs/%s/logs/" +) diff --git a/dataset.go b/dataset.go index fd7890f..f1773d5 100644 --- a/dataset.go +++ b/dataset.go @@ -5,16 +5,16 @@ import ( ) type DataSet struct { - rows *sql.Rows + rows *sql.Rows columns []string - values []interface{} + values []interface{} } func (this *DataSet) Next() bool { ret := this.rows.Next() this.GetColumns() this.values = []interface{}{} - for i := 0 ; i < len(this.columns); i++ { + for i := 0; i < len(this.columns); i++ { str := "" this.values = append(this.values, &str) } @@ -52,7 +52,7 @@ func (this *DataSet) GetByName(name string) *string { func (this *DataSet) GetByIndex(index int32) *string { this.GetColumns() - return this.values[index].(*string); + return this.values[index].(*string) } func NewDataSet(rows *sql.Rows) *DataSet { diff --git a/dbpool.go b/dbpool.go index ebe8dee..6453f13 100644 --- a/dbpool.go +++ b/dbpool.go @@ -1,13 +1,13 @@ package f5 import ( - "q5" - "time" + "errors" "fmt" "math" + "q5" "strings" "sync" - "errors" + "time" ) type DBStyle int32 @@ -18,14 +18,14 @@ const ( ) type dataSource struct { - name string - conn *q5.Mysql + name string + conn *q5.Mysql entry q5.ListHead } type dbPool struct { - style DBStyle - lock sync.Mutex + style DBStyle + lock sync.Mutex dataSourceHash map[string]*q5.ListHead } @@ -130,13 +130,35 @@ func (this *dbPool) Insert( this.internalExec(dataSource, sql, params, cb) } +func (this *dbPool) Replace( + dataSource string, + tblName string, + fieldsKv [][]string, + cb ExecResultCb) { + params := []string{} + sql := "REPLACE INTO `" + tblName + "` " + this.joinInsertFields(fieldsKv, ¶ms) + this.internalExec(dataSource, sql, params, cb) +} + func (this *dbPool) Upsert( dataSource string, tblName string, - whereKv map[string]string, - updateKv map[string]string, - insertKv map[string]string, + whereKv [][]string, + updateKv [][]string, + insertKv [][]string, cb ExecResultCb) { + this.OrmSelectOne(dataSource, tblName, whereKv, + func(err error, ds *DataSet) { + if err != nil { + cb(err, 0, 0) + return + } + if ds.Next() { + this.Update(dataSource, tblName, updateKv, whereKv, cb) + } else { + this.Insert(dataSource, tblName, insertKv, cb) + } + }) } func (this *dbPool) PageQuery( @@ -160,23 +182,25 @@ func (this *dbPool) PageQuery( } this.internalQueryOne( dataSource, - fmt.Sprintf("SELECT COUNT(*) FROM (%s)", finalySql), + fmt.Sprintf("SELECT COUNT(*) FROM (%s) as t", finalySql), params, - func (err error, row *[]*string) { + func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } - pagination.Total = q5.ToInt32(*(*row)[0]) - pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(*(*row)[0]) / - float64(pagination.PerPage))) + if rows != nil && rows.Next() { + pagination.Total = q5.ToInt32(*rows.GetByIndex(0)) + pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(*rows.GetByIndex(0)) / + float64(pagination.PerPage))) + } start := pagination.PerPage * (pagination.CurrentPage - 1) limit := pagination.PerPage this.internalQuery( dataSource, fmt.Sprintf("%s LIMIT %d, %d", finalySql, start, limit), params, - func (err error, rows *DataSet) { + func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return @@ -192,14 +216,15 @@ func (this *dbPool) borrowConn(name string) *dataSource { for tryCount < 5 { { this.lock.Lock() - defer this.lock.Unlock() if head, ok := this.dataSourceHash[name]; ok { if !head.Empty() { next := head.Next() next.Del() + this.lock.Unlock() return next.GetData().(*dataSource) } } + this.lock.Unlock() } time.Sleep(time.Second * 1) tryCount++ @@ -238,7 +263,7 @@ func (this *dbPool) joinUpdateFields(fieldsKv [][]string, params *[]string) stri sql := "" for index, items := range fieldsKv { suffix := "" - if index + 1 < len(fieldsKv) { + if index+1 < len(fieldsKv) { suffix = "," } if items[0][0] == '!' { @@ -255,7 +280,7 @@ func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) stri sql := " (" for index, items := range fieldsKv { suffix := "" - if index + 1 < len(fieldsKv) { + if index+1 < len(fieldsKv) { suffix = "," } sql += "`" + items[0] + "`" + suffix @@ -264,14 +289,14 @@ func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) stri sql += " VALUES(" for index, items := range fieldsKv { suffix := "" - if index + 1 < len(fieldsKv) { + if index+1 < len(fieldsKv) { suffix = "," } sql += "?" + suffix *params = append(*params, items[1]) } sql += ")" - return "" + return sql } func (this *dbPool) internalExec(dataSource string, sql string, params []string, @@ -298,7 +323,7 @@ func (this *dbPool) internalExec(dataSource string, sql string, params []string, cb(err, lastInsertId, rowsAffected) } else { _app.RegisterMainThreadCb( - func () { + func() { cb(err, lastInsertId, rowsAffected) }) } @@ -318,7 +343,7 @@ func (this *dbPool) internalQuery(dataSource string, sql string, params []string cb(err, NewDataSet(rows)) } else { _app.RegisterMainThreadCb( - func () { + func() { cb(err, NewDataSet(rows)) }) } @@ -334,20 +359,17 @@ func (this *dbPool) internalQueryOne(dataSource string, sql string, params []str rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) this.returnConn(ds) - values := &[]*string{} + var dataSet *DataSet if err == nil { - dataSet := NewDataSet(rows) - if dataSet.Next() { - values = dataSet.GetValues() - } + dataSet = NewDataSet(rows) } GetSysLog().Info("xxxxxxxxxxxxxxxxxxx") if this.style == GO_STYLE_DB { - cb(err, values) + cb(err, dataSet) } else { _app.RegisterMainThreadCb( - func () { - cb(err, values) + func() { + cb(err, dataSet) }) } } diff --git a/syslog.go b/syslog.go index eb7fd73..e166699 100644 --- a/syslog.go +++ b/syslog.go @@ -1,37 +1,38 @@ package f5 import ( - "os" - "sync" - "time" "fmt" + "os" "q5" + "sync" + "time" ) const ( - LOG_DEBUG = 0 - LOG_INFO = iota - LOG_NOTICE = iota - LOG_WARNING = iota - LOG_ERROR = iota - LOG_ALERT = iota + LOG_DEBUG = 0 + LOG_INFO = iota + LOG_NOTICE = iota + LOG_WARNING = iota + LOG_ERROR = iota + LOG_ALERT = iota LOG_EMERGENCY = iota ) -const SYS_LOG_ROOT = "/data/logs/%s/logs/" -const SYS_LOG_FILENAME = "log_%d_%s.log" +const ( + SYS_LOG_FILENAME = "log_%d_%s.log" +) type LogMsgNode struct { category int32 - logMsg string - next *LogMsgNode + logMsg string + next *LogMsgNode } type SysLog_ struct { - logLevel int32 - topNode *LogMsgNode - botNode *LogMsgNode - msgMutex sync.Mutex + logLevel int32 + topNode *LogMsgNode + botNode *LogMsgNode + msgMutex sync.Mutex chGoSaveExit chan int } @@ -131,7 +132,7 @@ func (this *SysLog_) goSaveToFile() { logDir := fmt.Sprintf(SYS_LOG_ROOT, GetApp().GetPkgName()) fileName := fmt.Sprintf(TGLOG_FILENAME, os.Getpid(), time.Now().Format("20060102")) q5.ForceCreateDir(logDir) - if f, err := os.OpenFile(logDir + fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666); err == nil { + if f, err := os.OpenFile(logDir+fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666); err == nil { for workNode != nil { f.Write([]byte(workNode.logMsg)) workNode = workNode.next