From 1a8c40f038fb9ca886b7adaf7f19df0bb78beddb Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Sun, 20 Aug 2023 18:41:07 +0800 Subject: [PATCH 1/9] 1 --- dbpool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbpool.go b/dbpool.go index e989032..74ceb62 100644 --- a/dbpool.go +++ b/dbpool.go @@ -99,7 +99,6 @@ func (this *dbPool) SelectOne( func (this *dbPool) OrmSelectOne( dataSource string, tblName string, - fields []string, whereKv [][]string, cb QueryOneCb) { params := []string{} @@ -193,14 +192,15 @@ func (this *dbPool) borrowConn(name string) *dataSource { for tryCount < 5 { { this.lock.Lock() - defer this.lock.Unlock() if head, ok := this.dataSourceHash[name]; ok { if !head.Empty() { next := head.Next() next.Del() + this.lock.Unlock() return next.GetData().(*dataSource) } } + this.lock.Unlock() } time.Sleep(time.Second * 1) tryCount++ From 7fa1685792fb6cefc39732d7cad7d1fb7e768d87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B7=E5=8B=87?= Date: Mon, 21 Aug 2023 14:52:02 +0800 Subject: [PATCH 2/9] Add --- dbpool.go | 49 ++++++++++++++++++++++++------------------------- types.go | 30 ++++++++++++++++-------------- 2 files changed, 40 insertions(+), 39 deletions(-) diff --git a/dbpool.go b/dbpool.go index 74ceb62..1414b75 100644 --- a/dbpool.go +++ b/dbpool.go @@ -1,13 +1,13 @@ package f5 import ( - "q5" - "time" + "errors" "fmt" "math" + "q5" "strings" "sync" - "errors" + "time" ) type DBStyle int32 @@ -18,14 +18,14 @@ const ( ) type dataSource struct { - name string - conn *q5.Mysql + name string + conn *q5.Mysql entry q5.ListHead } type dbPool struct { - style DBStyle - lock sync.Mutex + style DBStyle + lock sync.Mutex dataSourceHash map[string]*q5.ListHead } @@ -162,21 +162,23 @@ func (this *dbPool) PageQuery( dataSource, fmt.Sprintf("SELECT COUNT(*) FROM (%s)", finalySql), params, - func (err error, row *[]*string) { + func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return } - pagination.Total = q5.ToInt32(*(*row)[0]) - pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(*(*row)[0]) / - float64(pagination.PerPage))) + if rows != nil && rows.Next() { + pagination.Total = q5.ToInt32(*rows.GetByIndex(0)) + pagination.TotalPages = int32(math.Ceil(q5.ToFloat64(*rows.GetByIndex(0)) / + float64(pagination.PerPage))) + } start := pagination.PerPage * (pagination.CurrentPage - 1) limit := pagination.PerPage this.internalQuery( dataSource, fmt.Sprintf("%s LIMIT %d, %d", finalySql, start, limit), params, - func (err error, rows *DataSet) { + func(err error, rows *DataSet) { if err != nil { cb(err, &pagination) return @@ -239,7 +241,7 @@ func (this *dbPool) joinUpdateFields(fieldsKv [][]string, params *[]string) stri sql := "" for index, items := range fieldsKv { suffix := "" - if index + 1 < len(fieldsKv) { + if index+1 < len(fieldsKv) { suffix = "," } if items[0][0] == '!' { @@ -256,7 +258,7 @@ func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) stri sql := " (" for index, items := range fieldsKv { suffix := "" - if index + 1 < len(fieldsKv) { + if index+1 < len(fieldsKv) { suffix = "," } sql += "`" + items[0] + "`" + suffix @@ -265,7 +267,7 @@ func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) stri sql += " VALUES(" for index, items := range fieldsKv { suffix := "" - if index + 1 < len(fieldsKv) { + if index+1 < len(fieldsKv) { suffix = "," } sql += "?" + suffix @@ -299,7 +301,7 @@ func (this *dbPool) internalExec(dataSource string, sql string, params []string, cb(err, lastInsertId, rowsAffected) } else { _app.RegisterMainThreadCb( - func () { + func() { cb(err, lastInsertId, rowsAffected) }) } @@ -319,7 +321,7 @@ func (this *dbPool) internalQuery(dataSource string, sql string, params []string cb(err, NewDataSet(rows)) } else { _app.RegisterMainThreadCb( - func () { + func() { cb(err, NewDataSet(rows)) }) } @@ -335,19 +337,16 @@ func (this *dbPool) internalQueryOne(dataSource string, sql string, params []str rows, err := ds.conn.Query(sql, q5.ToInterfaces(params)...) this.returnConn(ds) - values := &[]*string{} + var dataSet *DataSet if err == nil { - dataSet := NewDataSet(rows) - if dataSet.Next() { - values = dataSet.GetValues() - } + dataSet = NewDataSet(rows) } if this.style == GO_STYLE_DB { - cb(err, values) + cb(err, dataSet) } else { _app.RegisterMainThreadCb( - func () { - cb(err, values) + func() { + cb(err, dataSet) }) } } diff --git a/types.go b/types.go index 43e4b59..0008b71 100644 --- a/types.go +++ b/types.go @@ -7,36 +7,38 @@ import ( const ( WSPROXYPACKHEAD_C_SIZE = 20 WSPROXYPACKHEAD_S_SIZE = 16 - NET_MSG_MAGIC_CODE = uint16('K') + uint16('S') << 8 + NET_MSG_MAGIC_CODE = uint16('K') + uint16('S')<<8 ) type MsgNode struct { - } type IMMsgNode struct { - msgId uint16 + msgId uint16 params q5.Args - next *IMMsgNode + next *IMMsgNode } type Pagination struct { - Total int32 - PerPage int32 + Total int32 + PerPage int32 CurrentPage int32 - TotalPages int32 - Rows *DataSet + TotalPages int32 + Rows *DataSet } type HandlerFunc func(*Context) -type QueryResultCb func (error, *DataSet); -type QueryOneCb func (error, *[]*string); -type PageQueryCb func (error, *Pagination); -type ExecResultCb func (error, int64, int64); +type QueryResultCb func(error, *DataSet) + +type QueryOneCb func(error, *DataSet) + +type PageQueryCb func(error, *Pagination) + +type ExecResultCb func(error, int64, int64) type middleware struct { middlewareType int32 - handlerFunc HandlerFunc - entry q5.ListHead + handlerFunc HandlerFunc + entry q5.ListHead } From 1055774a44c216387425d5e1025e89dc052fb30a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B7=E5=8B=87?= Date: Mon, 21 Aug 2023 16:42:02 +0800 Subject: [PATCH 3/9] code format --- dataset.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dataset.go b/dataset.go index fd7890f..f1773d5 100644 --- a/dataset.go +++ b/dataset.go @@ -5,16 +5,16 @@ import ( ) type DataSet struct { - rows *sql.Rows + rows *sql.Rows columns []string - values []interface{} + values []interface{} } func (this *DataSet) Next() bool { ret := this.rows.Next() this.GetColumns() this.values = []interface{}{} - for i := 0 ; i < len(this.columns); i++ { + for i := 0; i < len(this.columns); i++ { str := "" this.values = append(this.values, &str) } @@ -52,7 +52,7 @@ func (this *DataSet) GetByName(name string) *string { func (this *DataSet) GetByIndex(index int32) *string { this.GetColumns() - return this.values[index].(*string); + return this.values[index].(*string) } func NewDataSet(rows *sql.Rows) *DataSet { From f5fd0f9309b257f55e0a5588d5f018546a1b8bd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B7=E5=8B=87?= Date: Tue, 22 Aug 2023 18:03:05 +0800 Subject: [PATCH 4/9] Add replace --- dbpool.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dbpool.go b/dbpool.go index 1414b75..b18c4fc 100644 --- a/dbpool.go +++ b/dbpool.go @@ -130,6 +130,16 @@ func (this *dbPool) Insert( this.internalExec(dataSource, sql, params, cb) } +func (this *dbPool) Replace( + dataSource string, + tblName string, + fieldsKv [][]string, + cb ExecResultCb) { + params := []string{} + sql := "REPLACE INTO `" + tblName + "` " + this.joinInsertFields(fieldsKv, ¶ms) + this.internalExec(dataSource, sql, params, cb) +} + func (this *dbPool) Upsert( dataSource string, tblName string, @@ -274,7 +284,7 @@ func (this *dbPool) joinInsertFields(fieldsKv [][]string, params *[]string) stri *params = append(*params, items[1]) } sql += ")" - return "" + return sql } func (this *dbPool) internalExec(dataSource string, sql string, params []string, From 0391e1a0064557b26b8164c4908af6b6f7f3c684 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B7=E5=8B=87?= Date: Thu, 24 Aug 2023 11:22:15 +0800 Subject: [PATCH 5/9] =?UTF-8?q?Add=20Upsert=20=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dbpool.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/dbpool.go b/dbpool.go index b18c4fc..fc2ab87 100644 --- a/dbpool.go +++ b/dbpool.go @@ -143,10 +143,22 @@ func (this *dbPool) Replace( func (this *dbPool) Upsert( dataSource string, tblName string, - whereKv map[string]string, - updateKv map[string]string, - insertKv map[string]string, + whereKv [][]string, + updateKv [][]string, + insertKv [][]string, cb ExecResultCb) { + this.OrmSelectOne(dataSource, tblName, whereKv, + func(err error, ds *DataSet) { + if err != nil { + cb(err, 0, 0) + return + } + if ds.Next() { + this.Update(dataSource, tblName, updateKv, whereKv, cb) + } else { + this.Insert(dataSource, tblName, insertKv, cb) + } + }) } func (this *dbPool) PageQuery( From 0463bb48fc3be6eae3ca1a2d136d0303f2e41114 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B7=E5=8B=87?= Date: Fri, 25 Aug 2023 15:03:37 +0800 Subject: [PATCH 6/9] internalQueryOne select count... add tmp table flag --- app.go | 64 +++++++++++++++++++++++++++---------------------------- dbpool.go | 2 +- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/app.go b/app.go index a8135ce..d2cbbea 100644 --- a/app.go +++ b/app.go @@ -2,11 +2,11 @@ package f5 import ( "flag" + "os" + "q5" "sync" "sync/atomic" "time" - "os" - "q5" ) type App interface { @@ -22,7 +22,7 @@ type App interface { GetNowSeconds() int64 GetNowMillis() int64 GetNowNano() int64 - RegisterMainThreadCb(func ()) + RegisterMainThreadCb(func()) } type UserApp interface { @@ -33,27 +33,27 @@ type UserApp interface { } type app struct { - zoneId int32 - nodeId int32 - instanceId int32 - terminated bool - flags map[int32]int32 - servicing bool - contextHash map[int32]q5.Args - loopCond *sync.Cond - imTopNode *IMMsgNode - imBotNode *IMMsgNode - imWorkNode *IMMsgNode - imMsgMutex sync.RWMutex + zoneId int32 + nodeId int32 + instanceId int32 + terminated bool + flags map[int32]int32 + servicing bool + contextHash map[int32]q5.Args + loopCond *sync.Cond + imTopNode *IMMsgNode + imBotNode *IMMsgNode + imWorkNode *IMMsgNode + imMsgMutex sync.RWMutex chGoLoopTimerExit chan int - chGoLoopWait chan int64 - nowTime time.Time - nowUnixNano int64 - imMsgHandlers [1024]func(q5.Args) - maxRunDelay int64 - maxScheduleTime int64 - uuid q5.Uuid - userApp UserApp + chGoLoopWait chan int64 + nowTime time.Time + nowUnixNano int64 + imMsgHandlers [1024]func(q5.Args) + maxRunDelay int64 + maxScheduleTime int64 + uuid q5.Uuid + userApp UserApp } func (this *app) init(userApp UserApp) { @@ -82,14 +82,14 @@ func (this *app) init(userApp UserApp) { this.nodeId = int32(tmpNodeId) this.instanceId = int32(tmpInstanceId) } - this.uuid.SetMachineId((this.nodeId - 1) * MAX_NODE_ID + this.instanceId) + this.uuid.SetMachineId((this.nodeId-1)*MAX_NODE_ID + this.instanceId) this.loopCond = sync.NewCond(new(sync.Mutex)) this.chGoLoopTimerExit = make(chan int) this.chGoLoopWait = make(chan int64) _app.RegisterIMMsgHandle( IM_EXEC_MAIN_THREAD_CB, - func (args q5.Args) { - cb := args[0].(func ()) + func(args q5.Args) { + cb := args[0].(func()) cb() }) this.outputRuningLog() @@ -125,7 +125,7 @@ func (this *app) run() { this.schedule() endTick := q5.GetTickCount() - if this.maxRunDelay < endTick - beginTick { + if this.maxRunDelay < endTick-beginTick { this.maxRunDelay = endTick - beginTick } } @@ -178,10 +178,10 @@ func (this *app) RegisterIMMsgHandle(msgId uint16, handle func(q5.Args)) { this.imMsgHandlers[msgId] = handle } -func (this *app) RegisterMainThreadCb(cb func ()) { +func (this *app) RegisterMainThreadCb(cb func()) { _app.AddIMMsg(IM_EXEC_MAIN_THREAD_CB, - []interface{} { - cb, + []interface{}{ + cb, }) } @@ -245,8 +245,8 @@ func (this *app) dispatchIMMsg() { } func (this *app) installTimer() { - GetTimer().SetInterval(1000 * 60, - func (ev int32, params *q5.Args) { + GetTimer().SetInterval(1000*60, + func(ev int32, params *q5.Args) { if ev == q5.TIMER_EXEC_EVENT { GetSysLog().Info("max_run_delay:%d max_schedule_time:%d", _app.maxRunDelay, diff --git a/dbpool.go b/dbpool.go index fc2ab87..34907fd 100644 --- a/dbpool.go +++ b/dbpool.go @@ -182,7 +182,7 @@ func (this *dbPool) PageQuery( } this.internalQueryOne( dataSource, - fmt.Sprintf("SELECT COUNT(*) FROM (%s)", finalySql), + fmt.Sprintf("SELECT COUNT(*) FROM (%s) as t", finalySql), params, func(err error, rows *DataSet) { if err != nil { From 28e2d3ea05614a70fd8bc3d853b8a3ece3ca03d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B7=E5=8B=87?= Date: Mon, 28 Aug 2023 13:12:45 +0800 Subject: [PATCH 7/9] =?UTF-8?q?syslog=E6=97=A5=E5=BF=97=E8=B7=AF=E5=BE=84?= =?UTF-8?q?=20=E6=B7=BB=E5=8A=A0windows=E7=9B=AE=E5=BD=95=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- syslog.go | 51 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/syslog.go b/syslog.go index eb7fd73..baa5f77 100644 --- a/syslog.go +++ b/syslog.go @@ -1,37 +1,41 @@ package f5 import ( - "os" - "sync" - "time" "fmt" + "os" "q5" + "runtime" + "sync" + "time" ) const ( - LOG_DEBUG = 0 - LOG_INFO = iota - LOG_NOTICE = iota - LOG_WARNING = iota - LOG_ERROR = iota - LOG_ALERT = iota + LOG_DEBUG = 0 + LOG_INFO = iota + LOG_NOTICE = iota + LOG_WARNING = iota + LOG_ERROR = iota + LOG_ALERT = iota LOG_EMERGENCY = iota ) -const SYS_LOG_ROOT = "/data/logs/%s/logs/" -const SYS_LOG_FILENAME = "log_%d_%s.log" +const ( + SYS_LOG_ROOT_LINUX = "/data/logs/%s/logs/" + SYS_LOG_ROOT_WINDOWS = "d:/linux_root/data/logs/%s/logs/" + SYS_LOG_FILENAME = "log_%d_%s.log" +) type LogMsgNode struct { category int32 - logMsg string - next *LogMsgNode + logMsg string + next *LogMsgNode } type SysLog_ struct { - logLevel int32 - topNode *LogMsgNode - botNode *LogMsgNode - msgMutex sync.Mutex + logLevel int32 + topNode *LogMsgNode + botNode *LogMsgNode + msgMutex sync.Mutex chGoSaveExit chan int } @@ -128,10 +132,10 @@ func (this *SysLog_) goSaveToFile() { this.botNode = nil this.msgMutex.Unlock() if workNode != nil { - logDir := fmt.Sprintf(SYS_LOG_ROOT, GetApp().GetPkgName()) + logDir := this.GetLogDir() fileName := fmt.Sprintf(TGLOG_FILENAME, os.Getpid(), time.Now().Format("20060102")) q5.ForceCreateDir(logDir) - if f, err := os.OpenFile(logDir + fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666); err == nil { + if f, err := os.OpenFile(logDir+fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666); err == nil { for workNode != nil { f.Write([]byte(workNode.logMsg)) workNode = workNode.next @@ -143,3 +147,12 @@ func (this *SysLog_) goSaveToFile() { } } } + +func (this *SysLog_) GetLogDir() string { + pkgName := GetApp().GetPkgName() + var logDir string = fmt.Sprintf(SYS_LOG_ROOT_LINUX, pkgName) + if runtime.GOOS == "windows" { + logDir = fmt.Sprintf(SYS_LOG_ROOT_WINDOWS, pkgName) + } + return logDir +} From 3584ff5e7ebdd4186fe26abed7c4ad79b845246d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B7=E5=8B=87?= Date: Mon, 28 Aug 2023 13:58:31 +0800 Subject: [PATCH 8/9] =?UTF-8?q?syslog=E6=97=A5=E5=BF=97=E8=B7=AF=E5=BE=84?= =?UTF-8?q?=20=E6=B7=BB=E5=8A=A0windows=E7=9B=AE=E5=BD=95=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- constant_unix.go | 8 ++++++++ constant_windows.go | 8 ++++++++ syslog.go | 16 ++-------------- 3 files changed, 18 insertions(+), 14 deletions(-) create mode 100644 constant_unix.go create mode 100644 constant_windows.go diff --git a/constant_unix.go b/constant_unix.go new file mode 100644 index 0000000..7de33c5 --- /dev/null +++ b/constant_unix.go @@ -0,0 +1,8 @@ +//go:build unix +// +build unix + +package f5 + +const ( + SYS_LOG_ROOT = "/data/logs/%s/logs/" +) diff --git a/constant_windows.go b/constant_windows.go new file mode 100644 index 0000000..a7a538f --- /dev/null +++ b/constant_windows.go @@ -0,0 +1,8 @@ +//go:build windows +// +build windows + +package f5 + +const ( + SYS_LOG_ROOT = "d:/linux_root/data/logs/%s/logs/" +) diff --git a/syslog.go b/syslog.go index baa5f77..e166699 100644 --- a/syslog.go +++ b/syslog.go @@ -4,7 +4,6 @@ import ( "fmt" "os" "q5" - "runtime" "sync" "time" ) @@ -20,9 +19,7 @@ const ( ) const ( - SYS_LOG_ROOT_LINUX = "/data/logs/%s/logs/" - SYS_LOG_ROOT_WINDOWS = "d:/linux_root/data/logs/%s/logs/" - SYS_LOG_FILENAME = "log_%d_%s.log" + SYS_LOG_FILENAME = "log_%d_%s.log" ) type LogMsgNode struct { @@ -132,7 +129,7 @@ func (this *SysLog_) goSaveToFile() { this.botNode = nil this.msgMutex.Unlock() if workNode != nil { - logDir := this.GetLogDir() + logDir := fmt.Sprintf(SYS_LOG_ROOT, GetApp().GetPkgName()) fileName := fmt.Sprintf(TGLOG_FILENAME, os.Getpid(), time.Now().Format("20060102")) q5.ForceCreateDir(logDir) if f, err := os.OpenFile(logDir+fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666); err == nil { @@ -147,12 +144,3 @@ func (this *SysLog_) goSaveToFile() { } } } - -func (this *SysLog_) GetLogDir() string { - pkgName := GetApp().GetPkgName() - var logDir string = fmt.Sprintf(SYS_LOG_ROOT_LINUX, pkgName) - if runtime.GOOS == "windows" { - logDir = fmt.Sprintf(SYS_LOG_ROOT_WINDOWS, pkgName) - } - return logDir -} From 0ff73b5f57d40fcae373baa16f015b28067e52f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B7=E5=8B=87?= Date: Mon, 28 Aug 2023 14:58:49 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E4=BF=AE=E6=94=B9flag=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E5=8F=82=E6=95=B0=20"-n=201=20-i=201"=20to=20"-n1=20-i1"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.go | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/app.go b/app.go index d2cbbea..696cc63 100644 --- a/app.go +++ b/app.go @@ -1,9 +1,10 @@ package f5 import ( - "flag" + "fmt" "os" "q5" + "strings" "sync" "sync/atomic" "time" @@ -75,10 +76,7 @@ func (this *app) init(userApp UserApp) { _httpCliMgr = new(HttpCliMgr) _httpCliMgr.init() { - var tmpNodeId, tmpInstanceId int - flag.IntVar(&tmpNodeId, "n", 0, "node id") - flag.IntVar(&tmpInstanceId, "i", 0, "instance id") - flag.Parse() + tmpNodeId, tmpInstanceId := parseArgs() this.nodeId = int32(tmpNodeId) this.instanceId = int32(tmpInstanceId) } @@ -256,3 +254,31 @@ func (this *app) installTimer() { } }) } + +func parseArgs() (int, int) { + args := os.Args[1:] + if len(args) <= 0 { + return 0, 0 + } + var nodeId, instanceId int + for i := 0; i < len(args); i++ { + arg := args[i] + if strings.HasPrefix(arg, "-n") { + if len(arg) > 2 { + fmt.Sscanf(arg[2:], "%d", &nodeId) + } else if i+1 < len(args) { + fmt.Sscanf(args[i+1], "%d", &nodeId) + i++ + } + } else if strings.HasPrefix(arg, "-i") { + if len(arg) > 2 { + fmt.Sscanf(arg[2:], "%d", &instanceId) + } else if i+1 < len(args) { + fmt.Sscanf(args[i+1], "%d", &instanceId) + i++ + } + } + } + + return nodeId, instanceId +}