// Copyright 2013 Hui Chen
// Copyright 2016 ego authors
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.

/*
Package riot is the riot engine.
*/
package riot

import (
	"fmt"
	"log"
	"os"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
	// "reflect"

	"sync/atomic"

	"github.com/go-ego/riot/core"
	"github.com/go-ego/riot/store"
	"github.com/go-ego/riot/types"
	"github.com/go-ego/riot/utils"

	"github.com/go-ego/gse"
	"github.com/go-ego/murmur"
	"github.com/shirou/gopsutil/mem"
)

const (
	// Version is the riot version
	Version string = "v0.10.0.425, Danube River!"

	// NumNanosecondsInAMillisecond is the number of nanoseconds in a millisecond
	NumNanosecondsInAMillisecond = 1000000
	// StoreFilePrefix is the prefix of the persistent store files
	StoreFilePrefix = "riot"

	// DefaultPath is the default db path
	DefaultPath = "./riot-index"
)

// GetVersion returns the riot version
func GetVersion() string {
	return Version
}

// Engine is the riot engine
type Engine struct {
	loc sync.RWMutex

	// Counters recording how many documents have been indexed, removed, etc.
	numDocsIndexed       uint64
	numDocsRemoved       uint64
	numDocsForceUpdated  uint64
	numIndexingReqs      uint64
	numRemovingReqs      uint64
	numForceUpdatingReqs uint64
	numTokenIndexAdded   uint64
	numDocsStored        uint64

	// Initialization options
	initOptions types.EngineOpts
	initialized bool

	indexers   []core.Indexer
	rankers    []core.Ranker
	segmenter  gse.Segmenter
	loaded     bool
	stopTokens StopTokens
	dbs        []store.Store

	// Channels used for building the index
	segmenterChan         chan segmenterReq
	indexerAddDocChans    []chan indexerAddDocReq
	indexerRemoveDocChans []chan indexerRemoveDocReq
	rankerAddDocChans     []chan rankerAddDocReq

	// Channels used for ranking
	indexerLookupChans   []chan indexerLookupReq
	rankerRankChans      []chan rankerRankReq
	rankerRemoveDocChans []chan rankerRemoveDocReq

	// Channels used by the persistent store
	storeIndexDocChans []chan storeIndexDocReq
	storeInitChan      chan bool
}

// Indexer initializes the indexer channels
func (engine *Engine) Indexer(options types.EngineOpts) {
	engine.indexerAddDocChans = make(
		[]chan indexerAddDocReq, options.NumShards)

	engine.indexerRemoveDocChans = make(
		[]chan indexerRemoveDocReq, options.NumShards)

	engine.indexerLookupChans = make(
		[]chan indexerLookupReq, options.NumShards)

	for shard := 0; shard < options.NumShards; shard++ {
		engine.indexerAddDocChans[shard] = make(
			chan indexerAddDocReq, options.IndexerBufLen)

		engine.indexerRemoveDocChans[shard] = make(
			chan indexerRemoveDocReq, options.IndexerBufLen)

		engine.indexerLookupChans[shard] = make(
			chan indexerLookupReq, options.IndexerBufLen)
	}
}

// Ranker initializes the ranker channels
func (engine *Engine) Ranker(options types.EngineOpts) {
	engine.rankerAddDocChans = make(
		[]chan rankerAddDocReq, options.NumShards)

	engine.rankerRankChans = make(
		[]chan rankerRankReq, options.NumShards)

	engine.rankerRemoveDocChans = make(
		[]chan rankerRemoveDocReq, options.NumShards)

	for shard := 0; shard < options.NumShards; shard++ {
		engine.rankerAddDocChans[shard] = make(
			chan rankerAddDocReq, options.RankerBufLen)

		engine.rankerRankChans[shard] = make(
			chan rankerRankReq, options.RankerBufLen)

		engine.rankerRemoveDocChans[shard] = make(
			chan rankerRemoveDocReq, options.RankerBufLen)
	}
}

// InitStore initializes the persistent store channels
func (engine *Engine) InitStore() {
	engine.storeIndexDocChans = make(
		[]chan storeIndexDocReq, engine.initOptions.StoreShards)

	for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
		engine.storeIndexDocChans[shard] = make(
			chan storeIndexDocReq)
	}
	engine.storeInitChan = make(
		chan bool, engine.initOptions.StoreShards)
}

// CheckMem checks the memory usage; when usage reaches 99.99%
// the persistent store is enabled
func (engine *Engine) CheckMem() {
	// Todo test
	if !engine.initOptions.UseStore {
		log.Println("Check virtualMemory...")

		vmem, _ := mem.VirtualMemory()
		log.Printf("Total: %v, Free: %v, UsedPercent: %f%%\n",
			vmem.Total, vmem.Free, vmem.UsedPercent)

		useMem := fmt.Sprintf("%.2f", vmem.UsedPercent)
		if useMem == "99.99" {
			engine.initOptions.UseStore = true
			engine.initOptions.StoreFolder = DefaultPath
			// os.MkdirAll(DefaultPath, 0777)
		}
	}
}

// Store starts the persistent store workers
func (engine *Engine) Store() {
	// if engine.initOptions.UseStore {
	err := os.MkdirAll(engine.initOptions.StoreFolder, 0700)
	if err != nil {
		log.Fatalf("Can not create directory: %s ; %v",
			engine.initOptions.StoreFolder, err)
	}

	// Open or create the databases
	engine.dbs = make([]store.Store, engine.initOptions.StoreShards)
	for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
		dbPath := engine.initOptions.StoreFolder + "/" +
			StoreFilePrefix + "." + strconv.Itoa(shard)

		db, err := store.OpenStore(dbPath, engine.initOptions.StoreEngine)
		if db == nil || err != nil {
			log.Fatal("Unable to open database ", dbPath, ": ", err)
		}
		engine.dbs[shard] = db
	}

	// Restore from the databases
	for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
		go engine.storeInitWorker(shard)
	}

	// Wait until the restore finishes
	for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
		<-engine.storeInitChan
	}

	for {
		runtime.Gosched()

		engine.loc.RLock()
		numDoced := engine.numIndexingReqs == engine.numDocsIndexed
		engine.loc.RUnlock()

		if numDoced {
			break
		}
	}

	// Close and reopen the databases
	for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
		engine.dbs[shard].Close()
		dbPath := engine.initOptions.StoreFolder + "/" +
			StoreFilePrefix + "." + strconv.Itoa(shard)

		db, err := store.OpenStore(dbPath, engine.initOptions.StoreEngine)
		if db == nil || err != nil {
			log.Fatal("Unable to open database ", dbPath, ": ", err)
		}
		engine.dbs[shard] = db
	}

	for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
		go engine.storeIndexDocWorker(shard)
	}
	// }
}

// WithGse uses a user-defined segmenter.
// If a non-nil segmenter is given and its dictionary is loaded,
// `opt.GseDict` will be ignored.
func (engine *Engine) WithGse(segmenter gse.Segmenter) *Engine {
	if engine.initialized {
		log.Fatal(`Do not re-initialize the engine,
			WithGse should be called before initializing the engine.`)
	}

	engine.segmenter = segmenter
	engine.loaded = true
	return engine
}

// Init initializes the engine
func (engine *Engine) Init(options types.EngineOpts) {
	// Set the number of threads to the number of CPUs
	// runtime.GOMAXPROCS(runtime.NumCPU())
	// runtime.GOMAXPROCS(128)

	// Initialize the options
	if engine.initialized {
		log.Fatal("Do not re-initialize the engine.")
	}

	if options.GseDict == "" && !options.NotUseGse && !engine.loaded {
		log.Printf("Dictionary file path is empty, load the default dictionary file.")
		options.GseDict = "zh"
	}

	if options.UseStore && options.StoreFolder == "" {
		log.Printf("Store file path is empty, use default folder path.")
		options.StoreFolder = DefaultPath
		// os.MkdirAll(DefaultPath, 0777)
	}

	options.Init()
	engine.initOptions = options
	engine.initialized = true

	if !options.NotUseGse {
		if !engine.loaded {
			// Load the segmenter dictionary
			engine.segmenter.LoadDict(options.GseDict)
			engine.loaded = true
		}

		// Initialize the stop tokens
		engine.stopTokens.Init(options.StopTokenFile)
	}

	// Initialize the indexers and rankers
	for shard := 0; shard < options.NumShards; shard++ {
		engine.indexers = append(engine.indexers, core.Indexer{})
		engine.indexers[shard].Init(*options.IndexerOpts)

		engine.rankers = append(engine.rankers, core.Ranker{})
		engine.rankers[shard].Init(options.IDOnly)
	}

	// Initialize the segmenter channel
	engine.segmenterChan = make(
		chan segmenterReq, options.NumGseThreads)

	// Initialize the indexer channels
	engine.Indexer(options)

	// Initialize the ranker channels
	engine.Ranker(options)

	// engine.CheckMem(engine.initOptions.UseStore)
	engine.CheckMem()

	// Initialize the persistent store channels
	if engine.initOptions.UseStore {
		engine.InitStore()
	}

	// Start the segmenter workers
	for iThread := 0; iThread < options.NumGseThreads; iThread++ {
		go engine.segmenterWorker()
	}

	// Start the indexer and ranker workers
	for shard := 0; shard < options.NumShards; shard++ {
		go engine.indexerAddDocWorker(shard)
		go engine.indexerRemoveDocWorker(shard)
		go engine.rankerAddDocWorker(shard)
		go engine.rankerRemoveDocWorker(shard)

		for i := 0; i < options.NumIndexerThreadsPerShard; i++ {
			go engine.indexerLookupWorker(shard)
		}
		for i := 0; i < options.NumRankerThreadsPerShard; i++ {
			go engine.rankerRankWorker(shard)
		}
	}

	// Start the persistent store worker goroutines
	if engine.initOptions.UseStore {
		engine.Store()
	}

	atomic.AddUint64(&engine.numDocsStored, engine.numIndexingReqs)
}
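
// A minimal initialization sketch from a client package (illustrative only;
// the option values below are assumptions, not required defaults):
//
//	var searcher riot.Engine
//	searcher.Init(types.EngineOpts{
//		NumShards: 2,
//		GseDict:   "zh",
//	})
//	defer searcher.Close()
//
//	searcher.Index(1, types.DocData{Content: "some text to index"})
//	searcher.Flush()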

// IndexDoc adds a document to the index.
//
// Input parameters:
//	docId       identifies the document and must be unique; docId == 0 marks an
//	            invalid document (used to force-flush the index), [1, +oo) marks
//	            valid documents
//	data        see the DocIndexData comments
//	forceUpdate whether to force-flush the cache; if true the document is added
//	            to the index as soon as possible, otherwise it waits until the
//	            cache is full and is then added in one batch
//
// Notes:
//	1. This function is thread-safe; call it concurrently to speed up indexing.
//	2. The call is asynchronous: when it returns, the document may not have been
//	   added to the index yet, so an immediate Search may not find it.
//	   To force-flush the index, call FlushIndex.
func (engine *Engine) IndexDoc(docId uint64, data types.DocData,
	forceUpdate ...bool) {
	engine.Index(docId, data, forceUpdate...)
}

// Index adds a document to the index
func (engine *Engine) Index(docId uint64, data types.DocData,
	forceUpdate ...bool) {

	var force bool
	if len(forceUpdate) > 0 {
		force = forceUpdate[0]
	}

	// if engine.HasDoc(docId) {
	// 	engine.RemoveDoc(docId)
	// }

	// data.Tokens
	engine.internalIndexDoc(docId, data, force)

	hash := murmur.Sum32(fmt.Sprintf("%d", docId)) %
		uint32(engine.initOptions.StoreShards)

	if engine.initOptions.UseStore && docId != 0 {
		engine.storeIndexDocChans[hash] <- storeIndexDocReq{
			docId: docId, data: data}
	}
}
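
// Indexing sketch (illustrative, continuing the searcher from the
// initialization sketch above): asynchronous additions, one of them forced,
// followed by a flush so the documents become searchable:
//
//	searcher.Index(2, types.DocData{Content: "another doc"})
//	searcher.Index(3, types.DocData{Content: "a third doc"}, true) // force update
//	searcher.FlushIndex()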

func (engine *Engine) internalIndexDoc(docId uint64, data types.DocData,
	forceUpdate bool) {

	if !engine.initialized {
		log.Fatal("The engine must be initialized first.")
	}

	if docId != 0 {
		atomic.AddUint64(&engine.numIndexingReqs, 1)
	}
	if forceUpdate {
		atomic.AddUint64(&engine.numForceUpdatingReqs, 1)
	}

	hash := murmur.Sum32(fmt.Sprintf("%d%s", docId, data.Content))
	engine.segmenterChan <- segmenterReq{
		docId: docId, hash: hash, data: data, forceUpdate: forceUpdate}
}

// RemoveDoc removes a document from the index.
//
// Input parameters:
//	docId       identifies the document and must be unique; docId == 0 marks an
//	            invalid document (used to force-flush the index), [1, +oo) marks
//	            valid documents
//	forceUpdate whether to force-flush the cache; if true the removal is applied
//	            as soon as possible, otherwise it waits until the cache is full
//	            and removals are then applied in one batch
//
// Notes:
//	1. This function is thread-safe; call it concurrently to speed up indexing.
//	2. The call is asynchronous: when it returns, the document may not have been
//	   removed from the index yet, so an immediate Search may still find it.
//	   To force-flush the index, call FlushIndex.
func (engine *Engine) RemoveDoc(docId uint64, forceUpdate ...bool) {
	var force bool
	if len(forceUpdate) > 0 {
		force = forceUpdate[0]
	}

	if !engine.initialized {
		log.Fatal("The engine must be initialized first.")
	}

	if docId != 0 {
		atomic.AddUint64(&engine.numRemovingReqs, 1)
	}
	if force {
		atomic.AddUint64(&engine.numForceUpdatingReqs, 1)
	}
	for shard := 0; shard < engine.initOptions.NumShards; shard++ {
		engine.indexerRemoveDocChans[shard] <- indexerRemoveDocReq{
			docId: docId, forceUpdate: force}

		if docId == 0 {
			continue
		}
		engine.rankerRemoveDocChans[shard] <- rankerRemoveDocReq{docId: docId}
	}

	if engine.initOptions.UseStore && docId != 0 {
		// Remove from the database
		hash := murmur.Sum32(fmt.Sprintf("%d", docId)) %
			uint32(engine.initOptions.StoreShards)

		go engine.storeRemoveDocWorker(docId, hash)
	}
}
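
// Removal sketch (illustrative): remove a document and flush so the removal
// is reflected in subsequent searches:
//
//	searcher.RemoveDoc(3, true)
//	searcher.FlushIndex()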

// // Get the segmentation result of the text
// func (engine *Engine) Tokens(text []byte) (tokens []string) {
// 	querySegments := engine.segmenter.Segment(text)
// 	for _, s := range querySegments {
// 		token := s.Token().Text()
// 		if !engine.stopTokens.IsStopToken(token) {
// 			tokens = append(tokens, token)
// 		}
// 	}
// 	return tokens
// }

// Segment returns the word segmentation result of the text,
// only segmenting and filtering out stop tokens
func (engine *Engine) Segment(content string) (keywords []string) {
	segments := engine.segmenter.ModeSegment([]byte(content),
		engine.initOptions.GseMode)

	for _, segment := range segments {
		token := segment.Token().Text()
		if !engine.stopTokens.IsStopToken(token) {
			keywords = append(keywords, token)
		}
	}

	return
}
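
// Segmentation sketch (illustrative; assumes a dictionary has been loaded):
//
//	keywords := searcher.Segment("the text to segment")
//	fmt.Println(keywords)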

// Tokens returns the tokens for a search request
func (engine *Engine) Tokens(request types.SearchReq) (tokens []string) {
	// Collect the keywords
	// tokens := []string{}
	if request.Text != "" {
		request.Text = strings.ToLower(request.Text)
		if engine.initOptions.NotUseGse {
			tokens = strings.Split(request.Text, " ")
		} else {
			// querySegments := engine.segmenter.Segment([]byte(request.Text))
			// tokens = engine.Tokens([]byte(request.Text))
			tokens = engine.Segment(request.Text)
		}

		// Append the request tokens
		for _, t := range request.Tokens {
			tokens = append(tokens, t)
		}

		return
	}

	for _, t := range request.Tokens {
		tokens = append(tokens, t)
	}
	return
}

// RankId ranks docs by types.ScoredIDs
func (engine *Engine) RankId(request types.SearchReq, RankOpts types.RankOpts,
	tokens []string, rankerReturnChan chan rankerReturnReq) (
	output types.SearchResp) {
	// Read the ranker outputs from the channel
	numDocs := 0
	var rankOutput types.ScoredIDs
	// var rankOutput interface{}

	//**********/ begin
	timeout := request.Timeout
	isTimeout := false
	if timeout <= 0 {
		// No timeout
		for shard := 0; shard < engine.initOptions.NumShards; shard++ {
			rankerOutput := <-rankerReturnChan
			if !request.CountDocsOnly {
				if rankerOutput.docs != nil {
					for _, doc := range rankerOutput.docs.(types.ScoredIDs) {
						rankOutput = append(rankOutput, doc)
					}
				}
			}
			numDocs += rankerOutput.numDocs
		}
	} else {
		// With timeout
		deadline := time.Now().Add(time.Nanosecond *
			time.Duration(NumNanosecondsInAMillisecond*request.Timeout))

		for shard := 0; shard < engine.initOptions.NumShards; shard++ {
			select {
			case rankerOutput := <-rankerReturnChan:
				if !request.CountDocsOnly {
					if rankerOutput.docs != nil {
						for _, doc := range rankerOutput.docs.(types.ScoredIDs) {
							rankOutput = append(rankOutput, doc)
						}
					}
				}
				numDocs += rankerOutput.numDocs
			case <-time.After(deadline.Sub(time.Now())):
				isTimeout = true
				break
			}
		}
	}

	// Re-sort
	if !request.CountDocsOnly && !request.Orderless {
		if RankOpts.ReverseOrder {
			sort.Sort(sort.Reverse(rankOutput))
		} else {
			sort.Sort(rankOutput)
		}
	}

	// Prepare the output
	output.Tokens = tokens
	// output.Docs is only filled when CountDocsOnly is false
	if !request.CountDocsOnly {
		if request.Orderless {
			// No need to truncate by OutputOffset in orderless mode
			output.Docs = rankOutput
		} else {
			var start, end int
			if RankOpts.MaxOutputs == 0 {
				start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput))
				end = len(rankOutput)
			} else {
				start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput))
				end = utils.MinInt(start+RankOpts.MaxOutputs, len(rankOutput))
			}
			output.Docs = rankOutput[start:end]
		}
	}

	output.NumDocs = numDocs
	output.Timeout = isTimeout

	return
}

// Ranks ranks docs by types.ScoredDocs
func (engine *Engine) Ranks(request types.SearchReq, RankOpts types.RankOpts,
	tokens []string, rankerReturnChan chan rankerReturnReq) (
	output types.SearchResp) {
	// Read the ranker outputs from the channel
	numDocs := 0
	rankOutput := types.ScoredDocs{}

	//**********/ begin
	timeout := request.Timeout
	isTimeout := false
	if timeout <= 0 {
		// No timeout
		for shard := 0; shard < engine.initOptions.NumShards; shard++ {
			rankerOutput := <-rankerReturnChan
			if !request.CountDocsOnly {
				if rankerOutput.docs != nil {
					for _, doc := range rankerOutput.docs.(types.ScoredDocs) {
						rankOutput = append(rankOutput, doc)
					}
				}
			}
			numDocs += rankerOutput.numDocs
		}
	} else {
		// With timeout
		deadline := time.Now().Add(time.Nanosecond *
			time.Duration(NumNanosecondsInAMillisecond*request.Timeout))

		for shard := 0; shard < engine.initOptions.NumShards; shard++ {
			select {
			case rankerOutput := <-rankerReturnChan:
				if !request.CountDocsOnly {
					if rankerOutput.docs != nil {
						for _, doc := range rankerOutput.docs.(types.ScoredDocs) {
							rankOutput = append(rankOutput, doc)
						}
					}
				}
				numDocs += rankerOutput.numDocs
			case <-time.After(deadline.Sub(time.Now())):
				isTimeout = true
				break
			}
		}
	}

	// Re-sort
	if !request.CountDocsOnly && !request.Orderless {
		if RankOpts.ReverseOrder {
			sort.Sort(sort.Reverse(rankOutput))
		} else {
			sort.Sort(rankOutput)
		}
	}

	// Prepare the output
	output.Tokens = tokens
	// output.Docs is only filled when CountDocsOnly is false
	if !request.CountDocsOnly {
		if request.Orderless {
			// No need to truncate by OutputOffset in orderless mode
			output.Docs = rankOutput
		} else {
			var start, end int
			if RankOpts.MaxOutputs == 0 {
				start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput))
				end = len(rankOutput)
			} else {
				start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput))
				end = utils.MinInt(start+RankOpts.MaxOutputs, len(rankOutput))
			}
			output.Docs = rankOutput[start:end]
		}
	}

	output.NumDocs = numDocs
	output.Timeout = isTimeout

	return
}

// Search finds the documents that satisfy the search criteria.
// This function is thread-safe.
func (engine *Engine) Search(request types.SearchReq) (output types.SearchResp) {
	if !engine.initialized {
		log.Fatal("The engine must be initialized first.")
	}

	tokens := engine.Tokens(request)

	var RankOpts types.RankOpts
	if request.RankOpts == nil {
		RankOpts = *engine.initOptions.DefaultRankOpts
	} else {
		RankOpts = *request.RankOpts
	}
	if RankOpts.ScoringCriteria == nil {
		RankOpts.ScoringCriteria = engine.initOptions.DefaultRankOpts.ScoringCriteria
	}

	// Create the channel for the ranker results
	rankerReturnChan := make(
		chan rankerReturnReq, engine.initOptions.NumShards)

	// Build the lookup request
	lookupRequest := indexerLookupReq{
		countDocsOnly:    request.CountDocsOnly,
		tokens:           tokens,
		labels:           request.Labels,
		docIds:           request.DocIds,
		options:          RankOpts,
		rankerReturnChan: rankerReturnChan,
		orderless:        request.Orderless,
		logic:            request.Logic,
	}

	// Send the lookup request to the indexers
	for shard := 0; shard < engine.initOptions.NumShards; shard++ {
		engine.indexerLookupChans[shard] <- lookupRequest
	}

	if engine.initOptions.IDOnly {
		output = engine.RankId(request, RankOpts, tokens, rankerReturnChan)
		return
	}

	output = engine.Ranks(request, RankOpts, tokens, rankerReturnChan)
	return
}
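
// Search sketch (illustrative): query by text and read the results; when
// IDOnly is not set, output.Docs holds types.ScoredDocs, otherwise
// types.ScoredIDs:
//
//	res := searcher.Search(types.SearchReq{Text: "some query"})
//	fmt.Println("num docs:", res.NumDocs, "tokens:", res.Tokens)
//	if docs, ok := res.Docs.(types.ScoredDocs); ok {
//		fmt.Println("returned:", len(docs))
//	}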

// Flush blocks until all index additions are processed
func (engine *Engine) Flush() {
	for {
		runtime.Gosched()

		engine.loc.RLock()
		inxd := engine.numIndexingReqs == engine.numDocsIndexed
		rmd := engine.numRemovingReqs*uint64(engine.initOptions.NumShards) ==
			engine.numDocsRemoved
		stored := !engine.initOptions.UseStore || engine.numIndexingReqs ==
			engine.numDocsStored
		engine.loc.RUnlock()

		if inxd && rmd && stored {
			// Ensure all requests in the channels have been processed
			break
		}
	}

	// Force an update, guaranteeing it is the last request
	engine.IndexDoc(0, types.DocData{}, true)
	for {
		runtime.Gosched()

		engine.loc.RLock()
		forced := engine.numForceUpdatingReqs*uint64(engine.initOptions.NumShards) ==
			engine.numDocsForceUpdated
		engine.loc.RUnlock()

		if forced {
			return
		}
	}
}

// FlushIndex blocks until all index additions are processed
func (engine *Engine) FlushIndex() {
	engine.Flush()
}
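
// Typical pattern (illustrative): index documents concurrently, flush once,
// then search:
//
//	var wg sync.WaitGroup
//	for i := uint64(1); i <= 100; i++ {
//		wg.Add(1)
//		go func(id uint64) {
//			defer wg.Done()
//			searcher.Index(id, types.DocData{Content: "doc content"})
//		}(i)
//	}
//	wg.Wait()
//	searcher.FlushIndex()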

// Close flushes and closes the engine
func (engine *Engine) Close() {
	engine.Flush()
	if engine.initOptions.UseStore {
		for _, db := range engine.dbs {
			db.Close()
		}
	}
}

// getShard returns the shard that a hash is assigned to
func (engine *Engine) getShard(hash uint32) int {
	return int(hash - hash/uint32(engine.initOptions.NumShards)*
		uint32(engine.initOptions.NumShards))
}