commit 155220a3b2
azw 2023-09-02 19:55:08 +08:00
6 changed files with 155 additions and 90 deletions

app.go

@ -1,12 +1,13 @@
package f5
import (
"flag"
"fmt"
"os"
"q5"
"strings"
"sync"
"sync/atomic"
"time"
"os"
"q5"
)
type App interface {
@ -22,7 +23,7 @@ type App interface {
GetNowSeconds() int64
GetNowMillis() int64
GetNowNano() int64
RegisterMainThreadCb(func ())
RegisterMainThreadCb(func())
}
type UserApp interface {
@ -33,27 +34,27 @@ type UserApp interface {
}
type app struct {
zoneId int32
nodeId int32
instanceId int32
terminated bool
flags map[int32]int32
servicing bool
contextHash map[int32]q5.Args
loopCond *sync.Cond
imTopNode *IMMsgNode
imBotNode *IMMsgNode
imWorkNode *IMMsgNode
imMsgMutex sync.RWMutex
zoneId int32
nodeId int32
instanceId int32
terminated bool
flags map[int32]int32
servicing bool
contextHash map[int32]q5.Args
loopCond *sync.Cond
imTopNode *IMMsgNode
imBotNode *IMMsgNode
imWorkNode *IMMsgNode
imMsgMutex sync.RWMutex
chGoLoopTimerExit chan int
chGoLoopWait chan int64
nowTime time.Time
nowUnixNano int64
imMsgHandlers [1024]func(q5.Args)
maxRunDelay int64
maxScheduleTime int64
uuid q5.Uuid
userApp UserApp
chGoLoopWait chan int64
nowTime time.Time
nowUnixNano int64
imMsgHandlers [1024]func(q5.Args)
maxRunDelay int64
maxScheduleTime int64
uuid q5.Uuid
userApp UserApp
}
func (this *app) init(userApp UserApp) {
@ -75,21 +76,18 @@ func (this *app) init(userApp UserApp) {
_httpCliMgr = new(HttpCliMgr)
_httpCliMgr.init()
{
var tmpNodeId, tmpInstanceId int
flag.IntVar(&tmpNodeId, "n", 0, "node id")
flag.IntVar(&tmpInstanceId, "i", 0, "instance id")
flag.Parse()
tmpNodeId, tmpInstanceId := parseArgs()
this.nodeId = int32(tmpNodeId)
this.instanceId = int32(tmpInstanceId)
}
this.uuid.SetMachineId((this.nodeId - 1) * MAX_NODE_ID + this.instanceId)
this.uuid.SetMachineId((this.nodeId-1)*MAX_NODE_ID + this.instanceId)
this.loopCond = sync.NewCond(new(sync.Mutex))
this.chGoLoopTimerExit = make(chan int)
this.chGoLoopWait = make(chan int64)
_app.RegisterIMMsgHandle(
IM_EXEC_MAIN_THREAD_CB,
func (args q5.Args) {
cb := args[0].(func ())
func(args q5.Args) {
cb := args[0].(func())
cb()
})
this.outputRuningLog()
@ -125,7 +123,7 @@ func (this *app) run() {
this.schedule()
endTick := q5.GetTickCount()
if this.maxRunDelay < endTick - beginTick {
if this.maxRunDelay < endTick-beginTick {
this.maxRunDelay = endTick - beginTick
}
}
@ -178,10 +176,10 @@ func (this *app) RegisterIMMsgHandle(msgId uint16, handle func(q5.Args)) {
this.imMsgHandlers[msgId] = handle
}
func (this *app) RegisterMainThreadCb(cb func ()) {
func (this *app) RegisterMainThreadCb(cb func()) {
_app.AddIMMsg(IM_EXEC_MAIN_THREAD_CB,
[]interface{} {
cb,
[]interface{}{
cb,
})
}
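A hedged usage sketch (not part of this commit; it assumes GetApp() returns the App interface declared above): a worker goroutine hands its result back to the main loop, where AddIMMsg enqueues IM_EXEC_MAIN_THREAD_CB and the handler registered in init() runs the callback.
// inside package f5, illustration only
func sketchAsyncWork() {
	go func() {
		result := 42 // stand-in for some blocking work
		GetApp().RegisterMainThreadCb(func() {
			GetSysLog().Info("work done: %d", result)
		})
	}()
}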
@ -245,8 +243,8 @@ func (this *app) dispatchIMMsg() {
}
func (this *app) installTimer() {
GetTimer().SetInterval(1000 * 60,
func (ev int32, params *q5.Args) {
GetTimer().SetInterval(1000*60,
func(ev int32, params *q5.Args) {
if ev == q5.TIMER_EXEC_EVENT {
GetSysLog().Info("max_run_delay:%d max_schedule_time:%d",
_app.maxRunDelay,
@ -256,3 +254,31 @@ func (this *app) installTimer() {
}
})
}
func parseArgs() (int, int) {
args := os.Args[1:]
if len(args) <= 0 {
return 0, 0
}
var nodeId, instanceId int
for i := 0; i < len(args); i++ {
arg := args[i]
if strings.HasPrefix(arg, "-n") {
if len(arg) > 2 {
fmt.Sscanf(arg[2:], "%d", &nodeId)
} else if i+1 < len(args) {
fmt.Sscanf(args[i+1], "%d", &nodeId)
i++
}
} else if strings.HasPrefix(arg, "-i") {
if len(arg) > 2 {
fmt.Sscanf(arg[2:], "%d", &instanceId)
} else if i+1 < len(args) {
fmt.Sscanf(args[i+1], "%d", &instanceId)
i++
}
}
}
return nodeId, instanceId
}
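A hedged sketch of exercising the new parseArgs from a test inside package f5 (illustration only; both the space-separated and the attached spellings are accepted):
package f5

import (
	"os"
	"testing"
)

func TestParseArgsSketch(t *testing.T) {
	// parseArgs reads os.Args[1:], so fake the command line for the test.
	os.Args = []string{"prog", "-n", "3", "-i7"}
	nodeId, instanceId := parseArgs()
	if nodeId != 3 || instanceId != 7 {
		t.Fatalf("got %d %d, want 3 7", nodeId, instanceId)
	}
}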

constant_unix.go Normal file

@ -0,0 +1,8 @@
//go:build unix
// +build unix
package f5
const (
SYS_LOG_ROOT = "/data/logs/%s/logs/"
)

constant_windows.go Normal file

@ -0,0 +1,8 @@
//go:build windows
// +build windows
package f5
const (
SYS_LOG_ROOT = "d:/linux_root/data/logs/%s/logs/"
)
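Only one of the two files above is compiled into a given binary, selected by the go:build constraint. A small sketch (not part of the commit; sysLogDir is a made-up helper) of how the selected SYS_LOG_ROOT is consumed, mirroring goSaveToFile further down:
package f5

import "fmt"

// sysLogDir expands the per-OS SYS_LOG_ROOT with the package name; unix builds
// compile constant_unix.go and windows builds compile constant_windows.go, so
// no runtime OS check is needed here.
func sysLogDir() string {
	return fmt.Sprintf(SYS_LOG_ROOT, GetApp().GetPkgName())
}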


@ -5,16 +5,16 @@ import (
)
type DataSet struct {
rows *sql.Rows
rows *sql.Rows
columns []string
values []interface{}
values []interface{}
}
func (this *DataSet) Next() bool {
ret := this.rows.Next()
this.GetColumns()
this.values = []interface{}{}
for i := 0 ; i < len(this.columns); i++ {
for i := 0; i < len(this.columns); i++ {
str := ""
this.values = append(this.values, &str)
}
@ -52,7 +52,7 @@ func (this *DataSet) GetByName(name string) *string {
func (this *DataSet) GetByIndex(index int32) *string {
this.GetColumns()
return this.values[index].(*string);
return this.values[index].(*string)
}
func NewDataSet(rows *sql.Rows) *DataSet {
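A hedged sketch of consuming the reworked DataSet (illustration only; dumpFirstColumn is a made-up helper inside package f5):
package f5

import (
	"database/sql"
	"fmt"
)

// Next() scans every column into a *string, so GetByIndex/GetByName return *string.
func dumpFirstColumn(rows *sql.Rows) {
	ds := NewDataSet(rows)
	for ds.Next() {
		fmt.Println(*ds.GetByIndex(0))
	}
}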


@ -1,13 +1,13 @@
package f5
import (
"q5"
"time"
"errors"
"fmt"
"math"
"q5"
"strings"
"sync"
"errors"
"time"
)
type DBStyle int32
@ -18,14 +18,14 @@ const (
)
type dataSource struct {
name string
conn *q5.Mysql
name string
conn *q5.Mysql
entry q5.ListHead
}
type dbPool struct {
style DBStyle
lock sync.Mutex
style DBStyle
lock sync.Mutex
dataSourceHash map[string]*q5.ListHead
}
@ -130,13 +130,35 @@ func (this *dbPool) Insert(
this.internalExec(dataSource, sql, params, cb)
}
func (this *dbPool) Replace(
dataSource string,
tblName string,
fieldsKv [][]string,
cb ExecResultCb) {
params := []string{}
sql := "REPLACE INTO `" + tblName + "` " + this.joinInsertFields(fieldsKv, &params)
this.internalExec(dataSource, sql, params, cb)
}
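A hedged sketch of calling the new Replace (table and column names are made up, and the three-argument callback shape is inferred from the cb calls elsewhere in this file):
// inside package f5, illustration only
func sketchReplace(pool *dbPool) {
	pool.Replace("main", "t_player",
		[][]string{
			{"account_id", "1001"},
			{"name", "alice"},
		},
		func(err error, lastInsertId int64, rowsAffected int64) {
			// generated SQL, roughly: REPLACE INTO `t_player` (`account_id`,`name`) VALUES(?,?)
		})
}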
func (this *dbPool) Upsert(
dataSource string,
tblName string,
whereKv map[string]string,
updateKv map[string]string,
insertKv map[string]string,
whereKv [][]string,
updateKv [][]string,
insertKv [][]string,
cb ExecResultCb) {
this.OrmSelectOne(dataSource, tblName, whereKv,
func(err error, ds *DataSet) {
if err != nil {
cb(err, 0, 0)
return
}
if ds.Next() {
this.Update(dataSource, tblName, updateKv, whereKv, cb)
} else {
this.Insert(dataSource, tblName, insertKv, cb)
}
})
}
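A hedged sketch of the reworked Upsert (names made up, callback shape inferred as above). Note the design choice visible in the diff: Upsert is a read-then-write (OrmSelectOne, then Update or Insert) rather than a single ON DUPLICATE KEY UPDATE, so concurrent callers can still race between the select and the write.
// inside package f5, illustration only
func sketchUpsert(pool *dbPool) {
	pool.Upsert("main", "t_player",
		[][]string{{"account_id", "1001"}},                    // whereKv: identifies the row
		[][]string{{"name", "alice"}},                         // updateKv: applied if the row exists
		[][]string{{"account_id", "1001"}, {"name", "alice"}}, // insertKv: used if it does not
		func(err error, lastInsertId int64, rowsAffected int64) {})
}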
func (this *dbPool) PageQuery(
@ -160,23 +182,25 @@ func (this *dbPool) PageQuery(
}
this.internalQueryOne(
dataSource,
fmt.Sprintf("SELECT COUNT(*) FROM (%s)", finalySql),
fmt.Sprintf("SELECT COUNT(*) FROM (%s) as t", finalySql),
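// note (not in the original diff): MySQL rejects an unaliased derived table
// (ER_DERIVED_MUST_HAVE_ALIAS), so the COUNT(*) subquery needs the "as t" alias.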
params,
func (err error, row *[]*string) {
func(err error, rows *DataSet) {
if err != nil {
cb(err, &pagination)
return
}
pagination.Total = q5.ToInt32(*(*row)[0])
pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(*(*row)[0]) /
float64(pagination.PerPage)))
if rows != nil && rows.Next() {
pagination.Total = q5.ToInt32(*rows.GetByIndex(0))
pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(*rows.GetByIndex(0)) /
float64(pagination.PerPage)))
}
start := pagination.PerPage * (pagination.CurrentPage - 1)
limit := pagination.PerPage
this.internalQuery(
dataSource,
fmt.Sprintf("%s LIMIT %d, %d", finalySql, start, limit),
params,
func (err error, rows *DataSet) {
func(err error, rows *DataSet) {
if err != nil {
cb(err, &pagination)
return
@ -192,14 +216,15 @@ func (this *dbPool) borrowConn(name string) *dataSource {
for tryCount < 5 {
{
this.lock.Lock()
defer this.lock.Unlock()
if head, ok := this.dataSourceHash[name]; ok {
if !head.Empty() {
next := head.Next()
next.Del()
this.lock.Unlock()
return next.GetData().(*dataSource)
}
}
this.lock.Unlock()
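// note (not in the original diff): Unlock is now called explicitly on both the
// hit path (before return) and the miss path above, because a deferred Unlock
// would only run when borrowConn returns, holding the mutex through time.Sleep
// and deadlocking the next iteration's Lock.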
}
time.Sleep(time.Second * 1)
tryCount++
@ -238,7 +263,7 @@ func (this *dbPool) joinUpdateFields(fieldsKv [][]string, params *[]string) stri
sql := ""
for index, items := range fieldsKv {
suffix := ""
if index + 1 < len(fieldsKv) {
if index+1 < len(fieldsKv) {
suffix = ","
}
if items[0][0] == '!' {
@ -255,7 +280,7 @@ func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) stri
sql := " ("
for index, items := range fieldsKv {
suffix := ""
if index + 1 < len(fieldsKv) {
if index+1 < len(fieldsKv) {
suffix = ","
}
sql += "`" + items[0] + "`" + suffix
@ -264,14 +289,14 @@ func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) stri
sql += " VALUES("
for index, items := range fieldsKv {
suffix := ""
if index + 1 < len(fieldsKv) {
if index+1 < len(fieldsKv) {
suffix = ","
}
sql += "?" + suffix
*params = append(*params, items[1])
}
sql += ")"
return ""
return sql
}
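// note (illustrative, not in the original diff): with the fixed return value,
// joinInsertFields([][]string{{"name", "alice"}, {"sex", "1"}}, &params) builds
// roughly " (`name`,`sex`) VALUES(?,?)" and appends "alice", "1" to params.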
func (this *dbPool) internalExec(dataSource string, sql string, params []string,
@ -298,7 +323,7 @@ func (this *dbPool) internalExec(dataSource string, sql string, params []string,
cb(err, lastInsertId, rowsAffected)
} else {
_app.RegisterMainThreadCb(
func () {
func() {
cb(err, lastInsertId, rowsAffected)
})
}
@ -318,7 +343,7 @@ func (this *dbPool) internalQuery(dataSource string, sql string, params []string
cb(err, NewDataSet(rows))
} else {
_app.RegisterMainThreadCb(
func () {
func() {
cb(err, NewDataSet(rows))
})
}
@ -334,20 +359,17 @@ func (this *dbPool) internalQueryOne(dataSource string, sql string, params []str
rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...)
this.returnConn(ds)
values := &[]*string{}
var dataSet *DataSet
if err == nil {
dataSet := NewDataSet(rows)
if dataSet.Next() {
values = dataSet.GetValues()
}
dataSet = NewDataSet(rows)
}
GetSysLog().Info("xxxxxxxxxxxxxxxxxxx")
if this.style == GO_STYLE_DB {
cb(err, values)
cb(err, dataSet)
} else {
_app.RegisterMainThreadCb(
func () {
cb(err, values)
func() {
cb(err, dataSet)
})
}
}
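A hedged sketch of querying a single row through OrmSelectOne with the new *DataSet-based callback (the column name is made up; the signature follows its use in Upsert above):
// inside package f5, illustration only
func sketchSelectOne(pool *dbPool) {
	pool.OrmSelectOne("main", "t_player",
		[][]string{{"account_id", "1001"}},
		func(err error, ds *DataSet) {
			if err != nil || ds == nil || !ds.Next() {
				return // query failed or no matching row
			}
			name := *ds.GetByName("name")
			_ = name
		})
}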


@ -1,37 +1,38 @@
package f5
import (
"os"
"sync"
"time"
"fmt"
"os"
"q5"
"sync"
"time"
)
const (
LOG_DEBUG = 0
LOG_INFO = iota
LOG_NOTICE = iota
LOG_WARNING = iota
LOG_ERROR = iota
LOG_ALERT = iota
LOG_DEBUG = 0
LOG_INFO = iota
LOG_NOTICE = iota
LOG_WARNING = iota
LOG_ERROR = iota
LOG_ALERT = iota
LOG_EMERGENCY = iota
)
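// note (not in the original diff): iota is already 1 on the LOG_INFO line, so
// the levels run LOG_DEBUG=0, LOG_INFO=1, ... LOG_EMERGENCY=6 in declaration order.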
const SYS_LOG_ROOT = "/data/logs/%s/logs/"
const SYS_LOG_FILENAME = "log_%d_%s.log"
const (
SYS_LOG_FILENAME = "log_%d_%s.log"
)
type LogMsgNode struct {
category int32
logMsg string
next *LogMsgNode
logMsg string
next *LogMsgNode
}
type SysLog_ struct {
logLevel int32
topNode *LogMsgNode
botNode *LogMsgNode
msgMutex sync.Mutex
logLevel int32
topNode *LogMsgNode
botNode *LogMsgNode
msgMutex sync.Mutex
chGoSaveExit chan int
}
@ -131,7 +132,7 @@ func (this *SysLog_) goSaveToFile() {
logDir := fmt.Sprintf(SYS_LOG_ROOT, GetApp().GetPkgName())
fileName := fmt.Sprintf(TGLOG_FILENAME, os.Getpid(), time.Now().Format("20060102"))
q5.ForceCreateDir(logDir)
if f, err := os.OpenFile(logDir + fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666); err == nil {
if f, err := os.OpenFile(logDir+fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666); err == nil {
for workNode != nil {
f.Write([]byte(workNode.logMsg))
workNode = workNode.next