From 2d1b84c71b884c7a67ad9004b047e2784b24c2b1 Mon Sep 17 00:00:00 2001 From: aozhiwei Date: Fri, 19 Jul 2024 13:05:33 +0800 Subject: [PATCH] 1 --- server/light_backtask/task/taskmgr.go | 3 + server/light_backtask/task/webhook.go | 265 ++++++++++++++++++++++++++ 2 files changed, 268 insertions(+) create mode 100644 server/light_backtask/task/webhook.go diff --git a/server/light_backtask/task/taskmgr.go b/server/light_backtask/task/taskmgr.go index 3266e902..629746a4 100644 --- a/server/light_backtask/task/taskmgr.go +++ b/server/light_backtask/task/taskmgr.go @@ -4,6 +4,7 @@ import ( ) type taskMgr struct { + webHook repairNft sysMail repairOrder @@ -11,6 +12,7 @@ type taskMgr struct { } func (this *taskMgr) Init() { + this.webHook.init() this.repairNft.init() this.sysMail.init() this.repairOrder.init() @@ -22,4 +24,5 @@ func (this *taskMgr) UnInit() { this.repairOrder.unInit() this.sysMail.unInit() this.repairNft.unInit() + this.webHook.unInit() } diff --git a/server/light_backtask/task/webhook.go b/server/light_backtask/task/webhook.go new file mode 100644 index 00000000..1ed06110 --- /dev/null +++ b/server/light_backtask/task/webhook.go @@ -0,0 +1,265 @@ +package task + +import ( + "q5" + "f5" + "mt" + "jccommon" + "main/constant" + "main/service" + "time" + "strings" + "math/big" +) + +type webHook struct { + +} + +func (this* webHook) init() { + go this.loadWebHookEvent(jccommon.ORDER_UPDATE_EVENT, this.orderUpdatedCb) + go this.loadWebHookEvent(jccommon.ACTIVITY_SALE_EVENT, this.activitySaleCb) +} + +func (this* webHook) unInit() { + +} + +func (this* webHook) loadWebHookEvent(eventName string, cb func(ds *f5.DataSet) bool) { + time.Sleep(time.Second * 3) + lastOutTick := q5.GetTickCount() + var lastSyncIdx = this.getLastIdx(eventName) + for true { + if lastSyncIdx < 0 { + lastSyncIdx = this.getLastIdx(eventName) + if lastSyncIdx >= 0 { + continue + } + } else { + var newLastSyncIdx int64 + f5.GetGoStyleDb().RawQuery( + constant.BCEVENT_DB, + "SELECT * FROM t_webhook_event WHERE idx > ? AND event_name = ? LIMIT 1000", + []string{ + q5.ToString(lastSyncIdx), + eventName, + }, + func (err error, ds *f5.DataSet) { + if err == nil { + for ds.Next() { + idx := q5.ToInt64(ds.GetByName("idx")) + if cb(ds) { + if idx > newLastSyncIdx { + newLastSyncIdx = idx + } + } else { + break + } + } + } + }) + if newLastSyncIdx > lastSyncIdx { + lastSyncIdx = newLastSyncIdx + this.saveLastIdx(eventName, lastSyncIdx) + } + } + if q5.GetTickCount() - lastOutTick > 1000 * 15 { + lastOutTick = q5.GetTickCount() + f5.GetSysLog().Info("webhook %s last_idx:%d", eventName, lastSyncIdx) + } + time.Sleep(time.Second * 3) + } +} + +func (this* webHook) getLastIdx(eventName string) int64 { + var lastSyncIdx int64 = -1 + f5.GetGoStyleDb().OrmSelectOne( + constant.BCEVENT_DB, + "t_webhook_process_last_idx", + [][]string{ + {"event_name", eventName}, + }, + func (err error, ds *f5.DataSet) { + if err != nil { + return + } + if ds.Next() { + lastSyncIdx = q5.ToInt64(ds.GetByName("last_idx")) + } else { + lastSyncIdx = 0 + } + }) + return lastSyncIdx +} + +func (this* webHook) saveLastIdx(eventName string, lastIdx int64) bool { + var result bool = false + nowTime := f5.GetApp().GetRealSeconds() + f5.GetGoStyleDb().Upsert( + constant.BCEVENT_DB, + "t_webhook_process_last_idx", + [][]string{ + {"event_name", eventName}, + }, + [][]string{ + {"last_idx", q5.ToString(lastIdx)}, + }, + [][]string{ + {"event_name", eventName}, + {"last_idx", q5.ToString(lastIdx)}, + {"createtime", q5.ToString(nowTime)}, + {"modifytime", q5.ToString(nowTime)}, + }, + func (err error, lastInsertId int64, rowsAffected int64) { + result = err == nil + }) + return result +} + +func (this* webHook) orderUpdatedCb(ds *f5.DataSet) bool { + f5.GetSysLog().Info("orderUpdate event_id:%s start", ds.GetByName("event_id")) + nowTime := f5.GetApp().GetRealSeconds() + rawData := ds.GetByName("raw_data") + p := new(jccommon.OrderUpdatedEvent) + err := q5.DecodeJson(rawData, &p) + netId := mt.Table.Config.GetById(0).GetNetId() + if err != nil { + f5.GetSysLog().Warning("orderUpdate event_id:%s event_data parse error :%s", + ds.GetByName("event_id"), err) + return false + } + var resultErr error + var itemId int32 + var heroQuality int32 + service.GetItemIdHeroQuality(netId, p.Data.Sell[0].ContractAddress, p.Data.Sell[0].TokenId, + &itemId, &heroQuality) + startTime := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.StartAt)) + endTime := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.EndAt)) + createdAt := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.CreatedAt)) + updatedAt := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.UpdatedAt)) + srcPriceAmount := p.Data.Buy[0].Amount + { + bnSrcPriceAmount, ok := new(big.Int).SetString(srcPriceAmount, 10) + if ok { + for _, val := range p.Data.Fees { + bnAmount, ok := new(big.Int).SetString(val.Amount, 10) + if ok { + bnSrcPriceAmount.Add(bnSrcPriceAmount, bnAmount) + } + } + srcPriceAmount = bnSrcPriceAmount.String() + } + } + srcPriceContractAddress := p.Data.Buy[0].ContractAddress + srcPriceItemType := p.Data.Buy[0].ItemType + srcPriceExchangeRate, price := service.BcCurrency.ExchangeUSD( + srcPriceAmount, srcPriceItemType, srcPriceContractAddress) + commonFields := [][]string{ + {"order_id", p.Data.Id}, + {"chain", p.Chain}, + {"status", p.Data.Status.Name}, + {"seller_address", strings.ToLower(p.Data.AccountAddress)}, + {"item_id", q5.ToString(itemId)}, + {"hero_quality", q5.ToString(heroQuality)}, + {"start_time", q5.ToString(startTime)}, + {"end_time", q5.ToString(endTime)}, + {"created_at", q5.ToString(createdAt)}, + {"updated_at", q5.ToString(updatedAt)}, + {"event_id", p.EventId}, + {"event_data", rawData}, + {"modifytime", q5.ToString(nowTime)}, + {"src_price_amount", srcPriceAmount}, + {"src_price_contract_address", srcPriceContractAddress}, + {"src_price_item_type", srcPriceItemType}, + {"price", price}, + {"price_len", q5.ToString(len(price))}, + {"src_price_exchange_rate", q5.ToString(srcPriceExchangeRate)}, + } + updateFields := commonFields[0:] + insertFields := commonFields[0:] + q5.AppendSlice(&insertFields, []string{"net_id", q5.ToString(netId)}) + q5.AppendSlice(&insertFields, []string{"token_id", p.Data.Sell[0].TokenId}) + q5.AppendSlice(&insertFields, []string{"contract_address", + strings.ToLower(p.Data.Sell[0].ContractAddress)}) + q5.AppendSlice(&insertFields, []string{"createtime", q5.ToString(nowTime)}) + f5.GetGoStyleDb().UpsertEx( + constant.BCNFT_DB, + "t_order", + [][]string{ + {"net_id", q5.ToString(netId)}, + {"contract_address", strings.ToLower(p.Data.Sell[0].ContractAddress)}, + {"token_id", p.Data.Sell[0].TokenId}, + }, + updateFields, + insertFields, + func (err error, lastInsertId int64, rowsAffected int64) { + resultErr = err + if err != nil { + return + } + }, + func (ds *f5.DataSet) bool { + return p.EventId > ds.GetByName("event_id") + }) + f5.GetSysLog().Info("orderUpdate event_id:%s end", ds.GetByName("event_id")) + return resultErr == nil +} + +func (this* webHook) activitySaleCb(ds *f5.DataSet) bool { + f5.GetSysLog().Info("activitySale event_id:%s start", ds.GetByName("event_id")) + nowTime := f5.GetApp().GetRealSeconds() + rawData := ds.GetByName("raw_data") + p := new(jccommon.ActivitySaleEvent) + err := q5.DecodeJson(rawData, &p) + netId := mt.Table.Config.GetById(0).GetNetId() + if err != nil { + f5.GetSysLog().Warning("activitySale event_id:%s event_data parse error :%s", + ds.GetByName("event_id"), err) + return false + } + var resultErr error + var itemId int32 + var heroQuality int32 + service.GetItemIdHeroQuality(netId, + p.Data.Details.Asset[0].ContractAddress, + p.Data.Details.Asset[0].TokenId, + &itemId, &heroQuality) + indexedAt := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.IndexedAt)) + updateFields := [][]string{ + } + insertFields := [][]string{ + {"order_id", p.Data.Details.OrderId}, + {"chain", p.Chain}, + {"net_id", q5.ToString(netId)}, + {"token_id", p.Data.Details.Asset[0].TokenId}, + {"contract_address", strings.ToLower(p.Data.Details.Asset[0].ContractAddress)}, + {"item_id", q5.ToString(itemId)}, + {"hero_quality", q5.ToString(heroQuality)}, + {"buyer", p.Data.Details.From}, + {"seller", p.Data.Details.To}, + {"indexed_at", q5.ToString(indexedAt)}, + {"event_id", p.EventId}, + {"event_data", rawData}, + {"createtime", q5.ToString(nowTime)}, + {"modifytime", q5.ToString(nowTime)}, + } + f5.GetGoStyleDb().UpsertEx( + constant.BCNFT_DB, + "t_sale", + [][]string{ + {"order_id", p.Data.Details.OrderId}, + }, + updateFields, + insertFields, + func (err error, lastInsertId int64, rowsAffected int64) { + resultErr = err + if err != nil { + return + } + }, + func (ds *f5.DataSet) bool { + return p.EventId > ds.GetByName("event_id") + }) + f5.GetSysLog().Info("activitySale event_id:%s end", ds.GetByName("event_id")) + return resultErr == nil +}