diff --git a/quesma/elasticsearch/client.go b/quesma/elasticsearch/client.go index 4568aab4a..d7cbed66e 100644 --- a/quesma/elasticsearch/client.go +++ b/quesma/elasticsearch/client.go @@ -44,6 +44,18 @@ func (es *SimpleClient) RequestWithHeaders(ctx context.Context, method, endpoint return es.doRequest(ctx, method, endpoint, body, headers) } +func (es *SimpleClient) DoRequestCheckResponseStatusOK(ctx context.Context, method, endpoint string, body []byte) (resp *http.Response, err error) { + resp, err = es.doRequest(ctx, method, endpoint, body, nil) + if err != nil { + return + } + + if resp.StatusCode != http.StatusOK { + return resp, fmt.Errorf("response code from Elastic is not 200 OK, but %s", resp.Status) + } + return resp, nil +} + func (es *SimpleClient) Authenticate(ctx context.Context, authHeader string) bool { var authEndpoint string // This is really suboptimal, and we should find a better way to set this systematically (config perhaps?) diff --git a/quesma/persistence/elastic.go b/quesma/persistence/elastic.go index ba9e959c9..aab614284 100644 --- a/quesma/persistence/elastic.go +++ b/quesma/persistence/elastic.go @@ -142,7 +142,7 @@ func (p *ElasticJSONDatabase) List() ([]string, error) { var ids []string // Unmarshal the JSON response var result map[string]interface{} - if err := json.Unmarshal(jsonAsBytes, &result); err != nil { + if err = json.Unmarshal(jsonAsBytes, &result); err != nil { log.Fatalf("Error parsing the response JSON: %s", err) } diff --git a/quesma/persistence/elastic_with_eviction.go b/quesma/persistence/elastic_with_eviction.go new file mode 100644 index 000000000..6a35a513b --- /dev/null +++ b/quesma/persistence/elastic_with_eviction.go @@ -0,0 +1,297 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package persistence + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/k0kubun/pp" + "io" + "math" + "net/http" + "quesma/quesma/config" + "quesma/quesma/types" + "time" +) + +// so far I serialize entire struct and keep only 1 string in ES +type ElasticDatabaseWithEviction struct { + ctx context.Context + *ElasticJSONDatabase // maybe remove and copy fields here + // EvictorInterface TODO: rethink how eviction should work, maybe remove + sizeInBytesLimit int64 +} + +func NewElasticDatabaseWithEviction(cfg config.ElasticsearchConfiguration, indexName string, sizeInBytesLimit int64) *ElasticDatabaseWithEviction { + return &ElasticDatabaseWithEviction{ + ctx: context.Background(), + ElasticJSONDatabase: NewElasticJSONDatabase(cfg, indexName), + // EvictorInterface: &Evictor{}, + sizeInBytesLimit: sizeInBytesLimit, + } +} + +const printDebugElasticDB = false // TODO: remove this + all occurances after final version of the storage + +func (db *ElasticDatabaseWithEviction) Put(document *JSONWithSize) error { + dbSize, err := db.SizeInBytes() + if err != nil { + return err + } + if printDebugElasticDB { + fmt.Println("kk dbg Put() dbSize:", dbSize) + } + bytesNeeded := dbSize + document.SizeInBytesTotal + if bytesNeeded > db.SizeInBytesLimit() { + return errors.New("elastic database: is full, cannot put document") + /* + TODO: restore after eviction readded + logger.Info().Msgf("elastic database: is full, need %d bytes more. 
Evicting documents", bytesNeeded-db.SizeInBytesLimit()) + allDocs, err := db.getAll() + if err != nil { + return err + } + bytesEvicted := db.Evict(allDocs, bytesNeeded-db.SizeInBytesLimit()) + logger.Info().Msgf("elastic database: evicted %d bytes", bytesEvicted) + bytesNeeded -= bytesEvicted + + */ + } + + elasticsearchURL := fmt.Sprintf("%s/_update/%s", db.indexName, document.id) + if printDebugElasticDB { + fmt.Println("kk dbg Put() elasticsearchURL:", elasticsearchURL) + } + + updateContent := types.JSON{} + updateContent["doc"] = document.JSON + updateContent["doc_as_upsert"] = true + + jsonData, err := json.Marshal(updateContent) + if err != nil { + return err + } + + resp, err := db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodPost, elasticsearchURL, jsonData) + if printDebugElasticDB { + fmt.Println("kk dbg Put() resp:", resp, "err:", err) + } + if err != nil && (resp == nil || resp.StatusCode != http.StatusCreated) { + return err + } + return nil +} + +// Get TODO: probably change return type to some more useful +func (db *ElasticDatabaseWithEviction) Get(id string) ([]byte, error) { + elasticsearchURL := fmt.Sprintf("%s/_source/%s", db.indexName, id) + resp, err := db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodGet, elasticsearchURL, nil) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + return io.ReadAll(resp.Body) +} + +func (db *ElasticDatabaseWithEviction) Delete(id string) error { + elasticsearchURL := fmt.Sprintf("%s/_doc/%s", db.indexName, id) + resp, err := db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodDelete, elasticsearchURL, nil) + if err != nil && (resp == nil || resp.StatusCode != http.StatusCreated) { + return err + } + return nil +} + +func (db *ElasticDatabaseWithEviction) DeleteOld(deleteOlderThan time.Duration) (err error) { + if deleteOlderThan < 1*time.Second { + deleteOlderThan = 1 * time.Second + } + + rangeStr := fmt.Sprintf("now-%dm", int(math.Floor(deleteOlderThan.Minutes()))) + if deleteOlderThan < 5*time.Minute { + rangeStr = fmt.Sprintf("now-%ds", int(math.Floor(deleteOlderThan.Seconds()))) + } + + elasticsearchURL := fmt.Sprintf("%s/_delete_by_query", db.indexName) + query := fmt.Sprintf(`{ + "query": { + "range": { + "added": { + "lte": "%s" + } + } + } + }`, rangeStr) + + if printDebugElasticDB { + fmt.Println(query) + } + + var resp *http.Response + resp, err = db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodPost, elasticsearchURL, []byte(query)) + if printDebugElasticDB { + fmt.Println("kk dbg DocCount() resp:", resp, "err:", err, "elastic url:", elasticsearchURL) + } + return err +} + +func (db *ElasticDatabaseWithEviction) DocCount() (docCount int, err error) { + elasticsearchURL := fmt.Sprintf("%s/_search", db.indexName) + query := `{ + "_source": false, + "size": 0, + "track_total_hits": true + }` + + var resp *http.Response + resp, err = db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodGet, elasticsearchURL, []byte(query)) + if printDebugElasticDB { + fmt.Println("kk dbg DocCount() resp:", resp, "err:", err, "elastic url:", elasticsearchURL) + } + if err != nil { + if resp != nil && (resp.StatusCode == http.StatusNoContent || resp.StatusCode == http.StatusNotFound) { + return 0, nil + } + return -1, err + } + + var jsonAsBytes []byte + jsonAsBytes, err = io.ReadAll(resp.Body) + if err != nil { + return + } + + // Unmarshal the JSON response + var result 
map[string]interface{} + if err = json.Unmarshal(jsonAsBytes, &result); err != nil { + return + } + + if printDebugElasticDB { + fmt.Println("kk dbg DocCount() result:", result) + } + + return int(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)), nil // TODO: add some checks... to prevent panic +} + +func (db *ElasticDatabaseWithEviction) SizeInBytes() (sizeInBytes int64, err error) { + elasticsearchURL := fmt.Sprintf("%s/_search", db.indexName) + // TODO change query to aggregation + query := `{ + "_source": ["sizeInBytes"], + "size": 10000, + "track_total_hits": true + }` + + var resp *http.Response + resp, err = db.httpClient.DoRequestCheckResponseStatusOK(context.Background(), http.MethodGet, elasticsearchURL, []byte(query)) + if printDebugElasticDB { + fmt.Println("kk dbg SizeInBytes() err:", err, "\nresp:", resp) + } + if err != nil { + if resp != nil && resp.StatusCode == 404 { + return 0, nil + } + return + } + defer resp.Body.Close() // add everywhere + + var jsonAsBytes []byte + jsonAsBytes, err = io.ReadAll(resp.Body) + if err != nil { + return + } + + if printDebugElasticDB { + fmt.Println("kk dbg SizeInBytes() resp.StatusCode:", resp.StatusCode) + } + + // Unmarshal the JSON response + var result map[string]interface{} + if err = json.Unmarshal(jsonAsBytes, &result); err != nil { + return + } + + sizes := make([]int64, 0) + for _, hit := range result["hits"].(map[string]interface{})["hits"].([]interface{}) { + if printDebugElasticDB { + pp.Println("hit:", hit) + } + b := sizeInBytes + sizeInBytes += int64(hit.(map[string]interface{})["_source"].(map[string]interface{})["sizeInBytes"].(float64)) // TODO: add checks + sizes = append(sizes, sizeInBytes-b) + } + if printDebugElasticDB { + fmt.Println("kk dbg SizeInBytes() sizes in storage:", sizes) + } + return sizeInBytes, nil +} + +func (db *ElasticDatabaseWithEviction) SizeInBytesLimit() int64 { + return db.sizeInBytesLimit +} + +/* TODO: restore after eviction readded, or remove +func (db *ElasticDatabaseWithEviction) getAll() (documents []*JSONWithSize, err error) { + _ = fmt.Sprintf("%s/_search", db.indexName) + _ = `{ + "_source": { + "excludes": "data" + }, + "size": 10000, + "track_total_hits": true + }` + + db.httpClient. 
+ + resp, err := db.httpClient.Request(context.Background(), "GET", elasticsearchURL, []byte(query)) + if err != nil { + return + } + defer resp.Body.Close() + + jsonAsBytes, err := io.ReadAll(resp.Body) + if err != nil { + return + } + + fmt.Println("kk dbg getAll() resp.StatusCode:", resp.StatusCode) + + switch resp.StatusCode { + case http.StatusOK: + break + default: + logger.WarnWithCtx(db.ctx).Msgf("failed to get from elastic: %s, response status code: %v", string(jsonAsBytes), resp.StatusCode) + return + } + + // Unmarshal the JSON response + var result map[string]interface{} + if err = json.Unmarshal(jsonAsBytes, &result); err != nil { + logger.WarnWithCtx(db.ctx).Msgf("Error parsing the response JSON: %s", err) + return + } + + fmt.Println("kk dbg getAll() documents:") + for _, hit := range result["hits"].(map[string]interface{})["hits"].([]interface{}) { + doc := &document{ + Id: hit.(map[string]interface{})["_id"].(string), + Index: hit.(map[string]interface{})["_index"].(string), + SizeInBytes: int64(hit.(map[string]interface{})["_source"].(map[string]interface{})["sizeInBytes"].(float64)), // TODO: add checks + //Timestamp: hit.(map[string]interface{})["_source"].(map[string]interface{})["timestamp"].(time.Time), // TODO: add checks + MarkedAsDeleted: hit.(map[string]interface{})["_source"].(map[string]interface{})["markedAsDeleted"].(bool), // TODO: add checks + } + fmt.Println(doc) + documents = append(documents, doc) + } + return documents, nil +} +*/ + +func (db *ElasticDatabaseWithEviction) fullIndexName() string { + now := time.Now().UTC() + return fmt.Sprintf("%s-%d-%d-%d-%d-%d-%d", db.indexName, now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second()) +} diff --git a/quesma/persistence/evictor.go b/quesma/persistence/evictor.go new file mode 100644 index 000000000..fa84e9f12 --- /dev/null +++ b/quesma/persistence/evictor.go @@ -0,0 +1,17 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package persistence + +type EvictorInterface interface { + Evict(documents []*JSONWithSize, sizeNeeded int64) (bytesEvicted int64) +} + +// TODO: Find out how this might work. My old idea doesn't work now, +// as don't remove entire indices, but delete single documents. +// (It turned out consistency was too eventual to rely on it) +// old comment: It's only 1 implementation, which looks well suited for ElasticSearch. It can be implemented differently. +type Evictor struct{} + +func (e *Evictor) Evict(documents []*JSONWithSize, sizeNeeded int64) (bytesEvicted int64) { + panic("implement me (or remove)") +} diff --git a/quesma/persistence/model.go b/quesma/persistence/model.go index b1b16c3b8..9fd2e7be2 100644 --- a/quesma/persistence/model.go +++ b/quesma/persistence/model.go @@ -2,16 +2,45 @@ // SPDX-License-Identifier: Elastic-2.0 package persistence -// JSONDatabase is an interface for a database that stores JSON data. -// Treat it as `etcd` equivalent rather than `MongoDB`. -// The main usage is to store our configuration data, like -// - schema -// - user settings -// - statistics, etc -// -// For each case, we should have a separate database. -type JSONDatabase interface { - List() (keys []string, err error) - Get(key string) (string, bool, error) - Put(key string, data string) error +import ( + "quesma/quesma/types" + "time" +) + +type ( + // JSONDatabase is an interface for a database that stores JSON data. + // Treat it as `etcd` equivalent rather than `MongoDB`. 
+ // The main usage is to store our configuration data, like + // - schema + // - user settings + // - statistics, etc + // + // For each case, we should have a separate database. + JSONDatabase interface { + List() (keys []string, err error) + Get(key string) (string, bool, error) + Put(key string, data string) error + } + DatabaseWithEviction interface { // for sure JSON? maybe not only json? check + Put(doc *JSONWithSize) error + Get(id string) ([]byte, error) + Delete(id string) error + DeleteOld(time.Duration) error + DocCount() (int, error) + SizeInBytes() (int64, error) + SizeInBytesLimit() int64 + } + JSONWithSize struct { + types.JSON + id string + SizeInBytesTotal int64 + } +) + +func NewJSONWithSize(data types.JSON, id string, sizeInBytesTotal int64) *JSONWithSize { + return &JSONWithSize{ + JSON: data, + id: id, + SizeInBytesTotal: sizeInBytesTotal, + } } diff --git a/quesma/persistence/persistence_test.go b/quesma/persistence/persistence_test.go index 1bde8e446..4518c5a17 100644 --- a/quesma/persistence/persistence_test.go +++ b/quesma/persistence/persistence_test.go @@ -4,13 +4,17 @@ package persistence import ( "fmt" + "github.com/stretchr/testify/assert" "net/url" + "quesma/logger" "quesma/quesma/config" "quesma/quesma/types" "testing" "time" ) +const elasticUpdateTime = 2 * time.Second // time to wait for elastic to update + func TestNewElasticPersistence(t *testing.T) { var p JSONDatabase @@ -36,6 +40,7 @@ func TestNewElasticPersistence(t *testing.T) { } p = NewElasticJSONDatabase(cfg, indexName) + fmt.Println("??") } m1 := make(types.JSON) @@ -78,5 +83,165 @@ func TestNewElasticPersistence(t *testing.T) { if d2["foo"] != "bar" { t.Fatal("expected bar") } +} + +func TestJSONDatabaseWithEviction_noEviction(t *testing.T) { + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") + logger.InitSimpleLoggerForTests() + indexName := fmt.Sprintf("quesma_test_%d", time.Now().UnixMilli()) + fmt.Println("indexName:", indexName) + + realUrl, err := url.Parse("http://localhost:9200") + assert.NoError(t, err) + cfgUrl := config.Url(*realUrl) + cfg := config.ElasticsearchConfiguration{Url: &cfgUrl} + + const bigSizeLimit = int64(1_000_000_000) + db := NewElasticDatabaseWithEviction(cfg, indexName, bigSizeLimit) + + // check initial state + assert.Equal(t, bigSizeLimit, db.SizeInBytesLimit()) + + docCount, err := db.DocCount() + assert.NoError(t, err) + assert.Equal(t, 0, docCount) + + sizeInBytes, err := db.SizeInBytes() + assert.NoError(t, err) + assert.Equal(t, int64(0), sizeInBytes) + + // put first documents + docs := []*JSONWithSize{ + doc("doc1", 100), + doc("doc2", 200), + doc("doc3", 300), + doc("doc4", 400), + doc("doc5", 500), + } + for _, d := range docs { + assert.NoError(t, db.Put(d)) + } + + // check state after put (5 documents + "get" OK) + time.Sleep(elasticUpdateTime) + docCount, err = db.DocCount() + assert.NoError(t, err) + assert.Equal(t, 5, docCount) + + val, err := db.Get(docs[0].id) + assert.NoError(t, err) + assert.Contains(t, string(val), `"id":"doc1"`) + assert.Contains(t, string(val), `"sizeInBytes":100`) + + // delete some documents + err = db.Delete(docs[1].id) + assert.NoError(t, err) + err = db.Delete(docs[3].id) + assert.NoError(t, err) + + // doc_count should be 3 and "get" should fail for deleted documents + time.Sleep(elasticUpdateTime) + docCount, err = db.DocCount() + assert.NoError(t, err) + assert.Equal(t, 3, docCount) + val, err = db.Get(docs[1].id) + assert.Error(t, err) + assert.Empty(t, val) 
+ val, err = db.Get(docs[3].id) + assert.Error(t, err) + assert.Empty(t, val) + + assert.Equal(t, bigSizeLimit, db.SizeInBytesLimit()) +} + +func TestJSONDatabaseWithEviction_withEviction(t *testing.T) { + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") + logger.InitSimpleLoggerForTests() + indexName := fmt.Sprintf("quesma_test_%d", time.Now().UnixMilli()) + + realUrl, err := url.Parse("http://localhost:9200") + assert.NoError(t, err) + + cfgUrl := config.Url(*realUrl) + cfg := config.ElasticsearchConfiguration{Url: &cfgUrl} + + const smallSizeLimit = int64(1100) + db := NewElasticDatabaseWithEviction(cfg, indexName, smallSizeLimit) + fmt.Println("indexName:", indexName, "fullIndexName:", db.fullIndexName()) + + // check initial state + assert.Equal(t, smallSizeLimit, db.SizeInBytesLimit()) + + docCount, err := db.DocCount() + assert.NoError(t, err) + assert.Equal(t, 0, docCount) + + sizeInBytes, err := db.SizeInBytes() + assert.NoError(t, err) + assert.Equal(t, int64(0), sizeInBytes) + + // put first documents + docs := []*JSONWithSize{ + doc("doc1", 200), + doc("doc2", 300), + doc("doc3", 400), + doc("doc4", 600), + doc("doc5", 500), + } + for _, d := range docs[:2] { + fmt.Println("put", d.SizeInBytesTotal, db.Put(d)) + } + time.Sleep(elasticUpdateTime) + fmt.Println("put", docs[2].SizeInBytesTotal, db.Put(docs[2])) + time.Sleep(elasticUpdateTime) + + docCount, err = db.DocCount() + assert.NoError(t, err) + assert.Equal(t, 3, docCount) + + // storage should be full => error on put + err = db.Put(docs[3]) + assert.Error(t, err) + + err = db.Delete("doc2") + assert.NoError(t, err) + + time.Sleep(elasticUpdateTime) + + docCount, err = db.DocCount() + assert.NoError(t, err) + assert.Equal(t, 2, docCount) + + err = db.Put(docs[4]) + assert.NoError(t, err) + + time.Sleep(elasticUpdateTime) + + docCount, err = db.DocCount() + assert.NoError(t, err) + assert.Equal(t, 3, docCount) + + val, ok := db.Get(docs[0].id) + fmt.Println(val, ok) + // TODO: deserialize and check content + + err = db.Delete(docs[0].id) + assert.NoError(t, err) + err = db.Delete(docs[3].id) + assert.Error(t, err) + + time.Sleep(elasticUpdateTime) + docCount, err = db.DocCount() + assert.NoError(t, err) + assert.Equal(t, 2, docCount) + + assert.Equal(t, smallSizeLimit, db.SizeInBytesLimit()) +} +func doc(id string, size int64) *JSONWithSize { + json := types.JSON{} + json["id"] = id + json["sizeInBytes"] = size + json["timestamp"] = time.Now() + return NewJSONWithSize(json, id, size) } diff --git a/quesma/quesma/async_search_storage/evictor.go b/quesma/quesma/async_search_storage/evictor.go new file mode 100644 index 000000000..d53ac07a4 --- /dev/null +++ b/quesma/quesma/async_search_storage/evictor.go @@ -0,0 +1,45 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. 
+// SPDX-License-Identifier: Elastic-2.0 +package async_search_storage + +import ( + "context" + "quesma/logger" + "quesma/quesma/recovery" + "time" +) + +type AsyncQueriesEvictor struct { + ctx context.Context + cancel context.CancelFunc + AsyncRequestStorage AsyncRequestResultStorage + AsyncQueriesContexts AsyncQueryContextStorage +} + +func NewAsyncQueriesEvictor(AsyncRequestStorage AsyncRequestResultStorage, AsyncQueriesContexts AsyncQueryContextStorage) *AsyncQueriesEvictor { + ctx, cancel := context.WithCancel(context.Background()) + return &AsyncQueriesEvictor{ctx: ctx, cancel: cancel, AsyncRequestStorage: AsyncRequestStorage, AsyncQueriesContexts: AsyncQueriesContexts} +} + +func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(olderThan time.Duration) { + e.AsyncRequestStorage.evict(olderThan) + e.AsyncQueriesContexts.evict(olderThan) +} + +func (e *AsyncQueriesEvictor) AsyncQueriesGC() { + defer recovery.LogPanic() + for { + select { + case <-e.ctx.Done(): + logger.Debug().Msg("evictor stopped") + return + case <-time.After(gcInterval): + e.tryEvictAsyncRequests(evictionInterval) + } + } +} + +func (e *AsyncQueriesEvictor) Close() { + e.cancel() + logger.Info().Msg("AsyncQueriesEvictor Stopped") +} diff --git a/quesma/quesma/async_search_storage/in_elastic.go b/quesma/quesma/async_search_storage/in_elastic.go deleted file mode 100644 index 2575a1057..000000000 --- a/quesma/quesma/async_search_storage/in_elastic.go +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright Quesma, licensed under the Elastic License 2.0. -// SPDX-License-Identifier: Elastic-2.0 -package async_search_storage - -// TODO :( diff --git a/quesma/quesma/async_search_storage/in_elasticsearch.go b/quesma/quesma/async_search_storage/in_elasticsearch.go new file mode 100644 index 000000000..62d53cb2b --- /dev/null +++ b/quesma/quesma/async_search_storage/in_elasticsearch.go @@ -0,0 +1,130 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. 
+// SPDX-License-Identifier: Elastic-2.0 +package async_search_storage + +import ( + "encoding/json" + "fmt" + "quesma/logger" + "quesma/persistence" + "quesma/quesma/config" + "time" +) + +type AsyncRequestResultStorageInElasticsearch struct { + db *persistence.ElasticDatabaseWithEviction +} + +func NewAsyncRequestResultStorageInElasticsearch(cfg config.ElasticsearchConfiguration) AsyncRequestResultStorage { + /* some test config, maybe you'll find it easier to debug with it + realUrl, err := url.Parse("http://localhost:9201") + if err != nil { + fmt.Println("ERR", err) + } + cfgUrl := config.Url(*realUrl) + cfg := config.ElasticsearchConfiguration{ + Url: &cfgUrl, + User: "", + Password: "", + } + fmt.Println("kk dbg NewAsyncRequestResultStorageInElasticsearch() i:", cfg) + return AsyncRequestResultStorageInElasticsearch{ + db: persistence.NewElasticDatabaseWithEviction(cfg, "quesma_async_storage-"+strconv.Itoa(i), 1_000_000_000), + } + */ + return AsyncRequestResultStorageInElasticsearch{ + db: persistence.NewElasticDatabaseWithEviction(cfg, defaultElasticDbName, defaultElasticDbStorageLimitInBytes), + } +} + +func (s AsyncRequestResultStorageInElasticsearch) Store(id string, result *AsyncRequestResult) { + err := s.db.Put(result.toJSON(id)) + if err != nil { + logger.Warn().Err(err).Msg("failed to store document") + } +} + +func (s AsyncRequestResultStorageInElasticsearch) Load(id string) (*AsyncRequestResult, error) { + resultAsBytes, err := s.db.Get(id) + if err != nil { + return nil, err + } + + result := AsyncRequestResult{} + err = json.Unmarshal(resultAsBytes, &result) + if err != nil { + return nil, err + } + + return &result, nil +} + +func (s AsyncRequestResultStorageInElasticsearch) Delete(id string) { + err := s.db.Delete(id) + if err != nil { + logger.Warn().Err(err).Msg("failed to delete document") + } +} + +func (s AsyncRequestResultStorageInElasticsearch) DeleteOld(t time.Duration) { + err := s.db.DeleteOld(t) + if err != nil { + logger.Warn().Err(err).Msg("failed to delete old documents") + } +} + +// DocCount returns the number of documents in the database, or -1 if the count could not be retrieved. +func (s AsyncRequestResultStorageInElasticsearch) DocCount() int { + cnt, err := s.db.DocCount() + if err != nil { + logger.Warn().Err(err).Msg("failed to get document count") + return -1 + } + return cnt +} + +// StorageSizeInBytes returns the total size of all documents in the database, or -1 if the size could not be retrieved. 
+func (s AsyncRequestResultStorageInElasticsearch) SpaceInUse() int64 { + size, err := s.db.SizeInBytes() + if err != nil { + logger.Warn().Err(err).Msg("failed to get storage size") + return -1 + } + return size +} + +func (s AsyncRequestResultStorageInElasticsearch) SpaceMaxAvailable() int64 { + return s.db.SizeInBytesLimit() +} + +func (s AsyncRequestResultStorageInElasticsearch) evict(evictOlderThan time.Duration) { + err := s.db.DeleteOld(evictOlderThan) + if err != nil { + logger.Warn().Err(err).Msgf("failed to evict documents, err: %v", err) + } +} + +type AsyncQueryContextStorageInElasticsearch struct { + db *persistence.ElasticDatabaseWithEviction +} + +func NewAsyncQueryContextStorageInElasticsearch(cfg config.ElasticsearchConfiguration) AsyncQueryContextStorage { + fmt.Println("kk dbg NewAsyncQueryContextStorageInElasticsearch() i:", cfg) + return AsyncQueryContextStorageInElasticsearch{ + db: persistence.NewElasticDatabaseWithEviction(cfg, "async_search", 1_000_000_000), + } +} + +func (s AsyncQueryContextStorageInElasticsearch) Store(context *AsyncQueryContext) { + err := s.db.Put(context.toJSON()) + if err != nil { + logger.Warn().Err(err).Msg("failed to store document") + } +} + +func (s AsyncQueryContextStorageInElasticsearch) evict(evictOlderThan time.Duration) { + err := s.db.DeleteOld(evictOlderThan) + if err != nil { + logger.Warn().Err(err).Msg("failed to delete old documents") + } +} diff --git a/quesma/quesma/async_search_storage/in_memory.go b/quesma/quesma/async_search_storage/in_memory.go index 129c73f95..0979f8004 100644 --- a/quesma/quesma/async_search_storage/in_memory.go +++ b/quesma/quesma/async_search_storage/in_memory.go @@ -4,6 +4,8 @@ package async_search_storage import ( "context" + "fmt" + "math" "quesma/logger" "quesma/quesma/recovery" "quesma/tracing" @@ -12,88 +14,84 @@ import ( "time" ) -const EvictionInterval = 15 * time.Minute -const GCInterval = 1 * time.Minute - -type AsyncSearchStorageInMemory struct { +type AsyncRequestResultStorageInMemory struct { idToResult *util.SyncMap[string, *AsyncRequestResult] } -func NewAsyncSearchStorageInMemory() AsyncSearchStorageInMemory { - return AsyncSearchStorageInMemory{ +func NewAsyncRequestResultStorageInMemory() AsyncRequestResultStorage { // change result type to AsyncRequestResultStorage interface + return AsyncRequestResultStorageInMemory{ idToResult: util.NewSyncMap[string, *AsyncRequestResult](), } } -func (s AsyncSearchStorageInMemory) Store(id string, result *AsyncRequestResult) { +func (s AsyncRequestResultStorageInMemory) Store(id string, result *AsyncRequestResult) { s.idToResult.Store(id, result) } -func (s AsyncSearchStorageInMemory) Range(f func(key string, value *AsyncRequestResult) bool) { +func (s AsyncRequestResultStorageInMemory) Range(f func(key string, value *AsyncRequestResult) bool) { s.idToResult.Range(f) } -func (s AsyncSearchStorageInMemory) Load(id string) (*AsyncRequestResult, bool) { - return s.idToResult.Load(id) +func (s AsyncRequestResultStorageInMemory) Load(id string) (*AsyncRequestResult, error) { + if val, ok := s.idToResult.Load(id); ok { + return val, nil + } + return nil, fmt.Errorf("key %s not found", id) } -func (s AsyncSearchStorageInMemory) Delete(id string) { +func (s AsyncRequestResultStorageInMemory) Delete(id string) { s.idToResult.Delete(id) } -func (s AsyncSearchStorageInMemory) Size() int { +func (s AsyncRequestResultStorageInMemory) DocCount() int { return s.idToResult.Size() } -type AsyncQueryContextStorageInMemory struct { - idToContext 
*util.SyncMap[string, *AsyncQueryContext] -} - -func NewAsyncQueryContextStorageInMemory() AsyncQueryContextStorageInMemory { - return AsyncQueryContextStorageInMemory{ - idToContext: util.NewSyncMap[string, *AsyncQueryContext](), - } +// in bytes +func (s AsyncRequestResultStorageInMemory) SpaceInUse() int64 { + size := int64(0) + s.Range(func(key string, value *AsyncRequestResult) bool { + size += int64(len(value.ResponseBody)) + return true + }) + return size } -func (s AsyncQueryContextStorageInMemory) Store(id string, context *AsyncQueryContext) { - s.idToContext.Store(id, context) +func (s AsyncRequestResultStorageInMemory) SpaceMaxAvailable() int64 { + return math.MaxInt64 / 16 // some huge number for now, can be changed if we want to limit in-memory storage } -type AsyncQueriesEvictor struct { - ctx context.Context - cancel context.CancelFunc - AsyncRequestStorage AsyncSearchStorageInMemory - AsyncQueriesContexts AsyncQueryContextStorageInMemory +func (s AsyncRequestResultStorageInMemory) evict(evictOlderThan time.Duration) { + var ids []string + s.Range(func(key string, value *AsyncRequestResult) bool { + if time.Since(value.Added) > evictOlderThan { + ids = append(ids, key) + } + return true + }) + for _, id := range ids { + s.Delete(id) + } } -func NewAsyncQueriesEvictor(AsyncRequestStorage AsyncSearchStorageInMemory, AsyncQueriesContexts AsyncQueryContextStorageInMemory) *AsyncQueriesEvictor { - ctx, cancel := context.WithCancel(context.Background()) - return &AsyncQueriesEvictor{ctx: ctx, cancel: cancel, AsyncRequestStorage: AsyncRequestStorage, AsyncQueriesContexts: AsyncQueriesContexts} +type AsyncQueryContextStorageInMemory struct { + idToContext *util.SyncMap[string, *AsyncQueryContext] } -func elapsedTime(t time.Time) time.Duration { - return time.Since(t) +func NewAsyncQueryContextStorageInMemory() AsyncQueryContextStorage { + return AsyncQueryContextStorageInMemory{ + idToContext: util.NewSyncMap[string, *AsyncQueryContext](), + } } -type asyncQueryIdWithTime struct { - id string - time time.Time +func (s AsyncQueryContextStorageInMemory) Store(context *AsyncQueryContext) { + s.idToContext.Store(context.id, context) } -func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time.Duration) { - var ids []asyncQueryIdWithTime - e.AsyncRequestStorage.Range(func(key string, value *AsyncRequestResult) bool { - if timeFun(value.added) > EvictionInterval { - ids = append(ids, asyncQueryIdWithTime{id: key, time: value.added}) - } - return true - }) - for _, id := range ids { - e.AsyncRequestStorage.idToResult.Delete(id.id) - } +func (s AsyncQueryContextStorageInMemory) evict(evictOlderThan time.Duration) { var asyncQueriesContexts []*AsyncQueryContext - e.AsyncQueriesContexts.idToContext.Range(func(key string, value *AsyncQueryContext) bool { - if timeFun(value.added) > EvictionInterval { + s.idToContext.Range(func(key string, value *AsyncQueryContext) bool { + if time.Since(value.added) > evictOlderThan { if value != nil { asyncQueriesContexts = append(asyncQueriesContexts, value) } @@ -102,7 +100,7 @@ func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time }) evictedIds := make([]string, 0) for _, asyncQueryContext := range asyncQueriesContexts { - e.AsyncQueriesContexts.idToContext.Delete(asyncQueryContext.id) + s.idToContext.Delete(asyncQueryContext.id) if asyncQueryContext.cancel != nil { evictedIds = append(evictedIds, asyncQueryContext.id) asyncQueryContext.cancel() @@ -113,22 +111,8 @@ func (e *AsyncQueriesEvictor) 
tryEvictAsyncRequests(timeFun func(time.Time) time } } -func (e *AsyncQueriesEvictor) AsyncQueriesGC() { - defer recovery.LogPanic() - for { - select { - case <-e.ctx.Done(): - logger.Debug().Msg("evictor stopped") - return - case <-time.After(GCInterval): - e.tryEvictAsyncRequests(elapsedTime) - } - } -} - -func (e *AsyncQueriesEvictor) Close() { - e.cancel() - logger.Info().Msg("AsyncQueriesEvictor Stopped") +func elapsedTime(t time.Time) time.Duration { + return time.Since(t) } type AsyncQueryTraceLoggerEvictor struct { @@ -151,7 +135,7 @@ func (e *AsyncQueryTraceLoggerEvictor) Stop() { func (e *AsyncQueryTraceLoggerEvictor) TryFlushHangingAsyncQueryTrace(timeFun func(time.Time) time.Duration) { asyncIds := []string{} e.AsyncQueryTrace.Range(func(key string, value tracing.TraceCtx) bool { - if timeFun(value.Added) > EvictionInterval { + if timeFun(value.Added) > evictionInterval { asyncIds = append(asyncIds, key) logger.Error().Msgf("Async query %s was not finished", key) var formattedLines strings.Builder @@ -170,7 +154,7 @@ func (e *AsyncQueryTraceLoggerEvictor) FlushHangingAsyncQueryTrace(timeFun func( defer recovery.LogPanic() for { select { - case <-time.After(GCInterval): + case <-time.After(gcInterval): e.TryFlushHangingAsyncQueryTrace(timeFun) case <-e.ctx.Done(): logger.Debug().Msg("AsyncQueryTraceLoggerEvictor stopped") diff --git a/quesma/quesma/async_search_storage/in_memory_fallback_elasticsearch.go b/quesma/quesma/async_search_storage/in_memory_fallback_elasticsearch.go new file mode 100644 index 000000000..5cdc15c62 --- /dev/null +++ b/quesma/quesma/async_search_storage/in_memory_fallback_elasticsearch.go @@ -0,0 +1,80 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package async_search_storage + +import ( + "quesma/quesma/config" + "time" +) + +type AsyncRequestResultStorageInMemoryFallbackElastic struct { + inMemory AsyncRequestResultStorageInMemory + inElasticsearch AsyncRequestResultStorageInElasticsearch +} + +func NewAsyncSearchStorageInMemoryFallbackElastic(cfg config.ElasticsearchConfiguration) AsyncRequestResultStorageInMemoryFallbackElastic { + return AsyncRequestResultStorageInMemoryFallbackElastic{ + inMemory: NewAsyncRequestResultStorageInMemory().(AsyncRequestResultStorageInMemory), + inElasticsearch: NewAsyncRequestResultStorageInElasticsearch(cfg).(AsyncRequestResultStorageInElasticsearch), + } +} + +func (s AsyncRequestResultStorageInMemoryFallbackElastic) Store(id string, result *AsyncRequestResult) { + s.inMemory.Store(id, result) + go s.inElasticsearch.Store(id, result) +} + +func (s AsyncRequestResultStorageInMemoryFallbackElastic) Load(id string) (*AsyncRequestResult, error) { + result, err := s.inMemory.Load(id) + if err == nil { + return result, nil + } + return s.inElasticsearch.Load(id) +} + +func (s AsyncRequestResultStorageInMemoryFallbackElastic) Delete(id string) { + s.inMemory.Delete(id) + go s.inElasticsearch.Delete(id) +} + +// DocCount returns inMemory doc count +func (s AsyncRequestResultStorageInMemoryFallbackElastic) DocCount() int { + return s.inMemory.DocCount() +} + +// SpaceInUse returns inMemory size in bytes +func (s AsyncRequestResultStorageInMemoryFallbackElastic) SpaceInUse() int64 { + return s.inMemory.SpaceInUse() +} + +// SpaceMaxAvailable returns inMemory size in bytes limit +func (s AsyncRequestResultStorageInMemoryFallbackElastic) SpaceMaxAvailable() int64 { + return s.inMemory.SpaceMaxAvailable() +} + +func (s AsyncRequestResultStorageInMemoryFallbackElastic) 
evict(olderThan time.Duration) { + s.inMemory.evict(olderThan) + go s.inElasticsearch.DeleteOld(olderThan) +} + +type AsyncQueryContextStorageInMemoryFallbackElasticsearch struct { + inMemory AsyncQueryContextStorageInMemory + inElasticsearch AsyncQueryContextStorageInElasticsearch +} + +func NewAsyncQueryContextStorageInMemoryFallbackElasticsearch(cfg config.ElasticsearchConfiguration) AsyncQueryContextStorage { + return AsyncQueryContextStorageInMemoryFallbackElasticsearch{ + inMemory: NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory), + inElasticsearch: NewAsyncQueryContextStorageInElasticsearch(cfg).(AsyncQueryContextStorageInElasticsearch), + } +} + +func (s AsyncQueryContextStorageInMemoryFallbackElasticsearch) Store(context *AsyncQueryContext) { + s.inMemory.Store(context) + go s.inElasticsearch.Store(context) +} + +func (s AsyncQueryContextStorageInMemoryFallbackElasticsearch) evict(evictOlderThan time.Duration) { + s.inMemory.evict(evictOlderThan) + go s.inElasticsearch.evict(evictOlderThan) +} diff --git a/quesma/quesma/async_search_storage/in_memory_test.go b/quesma/quesma/async_search_storage/in_memory_test.go deleted file mode 100644 index 59b129d2f..000000000 --- a/quesma/quesma/async_search_storage/in_memory_test.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright Quesma, licensed under the Elastic License 2.0. -// SPDX-License-Identifier: Elastic-2.0 -package async_search_storage - -import ( - "github.com/stretchr/testify/assert" - "quesma/util" - "testing" - "time" -) - -func TestAsyncQueriesEvictorTimePassed(t *testing.T) { - queryContextStorage := NewAsyncQueryContextStorageInMemory() - queryContextStorage.idToContext.Store("1", &AsyncQueryContext{}) - evictor := NewAsyncQueriesEvictor(NewAsyncSearchStorageInMemory(), queryContextStorage) - evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{added: time.Now()}) - evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{added: time.Now()}) - evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{added: time.Now()}) - evictor.tryEvictAsyncRequests(func(time.Time) time.Duration { - return 20 * time.Minute - }) - - assert.Equal(t, 0, evictor.AsyncRequestStorage.Size()) -} - -func TestAsyncQueriesEvictorStillAlive(t *testing.T) { - queryContextStorage := NewAsyncQueryContextStorageInMemory() - queryContextStorage.idToContext.Store("1", &AsyncQueryContext{}) - evictor := NewAsyncQueriesEvictor(NewAsyncSearchStorageInMemory(), queryContextStorage) - evictor.AsyncRequestStorage.idToResult = util.NewSyncMap[string, *AsyncRequestResult]() - evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{added: time.Now()}) - evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{added: time.Now()}) - evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{added: time.Now()}) - evictor.tryEvictAsyncRequests(func(time.Time) time.Duration { - return time.Second - }) - - assert.Equal(t, 3, evictor.AsyncRequestStorage.Size()) -} diff --git a/quesma/quesma/async_search_storage/model.go b/quesma/quesma/async_search_storage/model.go index 86ceeef88..e63ad2658 100644 --- a/quesma/quesma/async_search_storage/model.go +++ b/quesma/quesma/async_search_storage/model.go @@ -4,43 +4,53 @@ package async_search_storage import ( "context" + "quesma/persistence" + "quesma/quesma/types" "time" ) -type AsyncRequestResultStorage interface { - Store(id string, result *AsyncRequestResult) - Range(func(key string, value *AsyncRequestResult) bool) // ideally I'd like to get rid of this, but not sure if it's possible - 
Load(id string) (*AsyncRequestResult, bool) - Delete(id string) - Size() int -} +const ( + evictionInterval = 15 * time.Minute + gcInterval = 1 * time.Minute + defaultElasticDbName = "async_search_storage" + defaultElasticDbStorageLimitInBytes = int64(100 * 1024 * 1024 * 1024) // 100GB +) -// TODO: maybe merge those 2? -type AsyncQueryContextStorage interface { - Store(id string, context *AsyncQueryContext) -} +type ( + AsyncRequestResultStorage interface { + Store(id string, result *AsyncRequestResult) + Load(id string) (*AsyncRequestResult, error) + Delete(id string) + DocCount() int + SpaceInUse() int64 + SpaceMaxAvailable() int64 -type AsyncRequestResult struct { - responseBody []byte - added time.Time - isCompressed bool - err error -} + evict(olderThan time.Duration) + } + AsyncQueryContextStorage interface { + Store(context *AsyncQueryContext) + evict(olderThan time.Duration) + } + AsyncRequestResult struct { + ResponseBody []byte `json:"responseBody"` + Added time.Time `json:"added"` + IsCompressed bool `json:"isCompressed"` + Err error `json:"err"` + } +) func NewAsyncRequestResult(responseBody []byte, err error, added time.Time, isCompressed bool) *AsyncRequestResult { - return &AsyncRequestResult{responseBody: responseBody, err: err, added: added, isCompressed: isCompressed} + return &AsyncRequestResult{ResponseBody: responseBody, Err: err, Added: added, IsCompressed: isCompressed} } -func (r *AsyncRequestResult) GetResponseBody() []byte { - return r.responseBody -} - -func (r *AsyncRequestResult) GetErr() error { - return r.err -} - -func (r *AsyncRequestResult) IsCompressed() bool { - return r.isCompressed +func (r *AsyncRequestResult) toJSON(id string) *persistence.JSONWithSize { + const sizeInBytesUpperBoundEstimate = int64(100) // 100 is a rough upper bound estimate of the size of the rest of the fields + json := types.JSON{} + json["id"] = id + json["data"] = string(r.ResponseBody) + json["sizeInBytes"] = int64(len(r.ResponseBody)) + int64(len(id)) + sizeInBytesUpperBoundEstimate + json["added"] = r.Added + return persistence.NewJSONWithSize(json, id, json["sizeInBytes"].(int64)) } type AsyncQueryContext struct { @@ -53,3 +63,12 @@ type AsyncQueryContext struct { func NewAsyncQueryContext(ctx context.Context, cancel context.CancelFunc, id string) *AsyncQueryContext { return &AsyncQueryContext{ctx: ctx, cancel: cancel, added: time.Now(), id: id} } + +func (c *AsyncQueryContext) toJSON() *persistence.JSONWithSize { + const sizeInBytesUpperBoundEstimate = int64(100) + json := types.JSON{} + json["id"] = c.id + json["added"] = c.added + json["sizeInBytes"] = sizeInBytesUpperBoundEstimate + return persistence.NewJSONWithSize(json, c.id, sizeInBytesUpperBoundEstimate) +} diff --git a/quesma/quesma/async_search_storage/storage_test.go b/quesma/quesma/async_search_storage/storage_test.go new file mode 100644 index 000000000..4af98fbc4 --- /dev/null +++ b/quesma/quesma/async_search_storage/storage_test.go @@ -0,0 +1,161 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. 
+// SPDX-License-Identifier: Elastic-2.0 +package async_search_storage + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/k0kubun/pp" + "github.com/stretchr/testify/assert" + "net/url" + "quesma/quesma/config" + "testing" + "time" +) + +func TestAsyncQueriesEvictorTimePassed(t *testing.T) { + storageKinds := []AsyncRequestResultStorage{ + NewAsyncRequestResultStorageInMemory(), + NewAsyncRequestResultStorageInElasticsearch(testConfig()), + NewAsyncSearchStorageInMemoryFallbackElastic(testConfig()), + } + for _, storage := range storageKinds { + t.Run(fmt.Sprintf("storage %T", storage), func(t *testing.T) { + _, inMemory := storage.(AsyncRequestResultStorageInMemory) + if !inMemory { + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") + } + + queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory) + queryContextStorage.idToContext.Store("1", &AsyncQueryContext{}) + evictor := NewAsyncQueriesEvictor(storage, queryContextStorage) + evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now().Add(-2 * time.Second)}) + evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now().Add(-5 * time.Second)}) + evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now().Add(2 * time.Second)}) + + if !inMemory { + time.Sleep(2 * time.Second) + } + evictor.tryEvictAsyncRequests(1 * time.Second) + if !inMemory { + time.Sleep(2 * time.Second) + } + + assert.Equal(t, 1, evictor.AsyncRequestStorage.DocCount()) + }) + } +} + +func TestAsyncQueriesEvictorStillAlive(t *testing.T) { + storageKinds := []AsyncRequestResultStorage{ + NewAsyncRequestResultStorageInMemory(), + NewAsyncRequestResultStorageInElasticsearch(testConfig()), + NewAsyncSearchStorageInMemoryFallbackElastic(testConfig()), + } + for _, storage := range storageKinds { + t.Run(fmt.Sprintf("storage %T", storage), func(t *testing.T) { + _, inMemory := storage.(AsyncRequestResultStorageInMemory) + if !inMemory { + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") + } + + queryContextStorage := NewAsyncQueryContextStorageInMemory().(AsyncQueryContextStorageInMemory) + queryContextStorage.idToContext.Store("1", &AsyncQueryContext{}) + evictor := NewAsyncQueriesEvictor(storage, queryContextStorage) + evictor.AsyncRequestStorage.Store("1", &AsyncRequestResult{Added: time.Now()}) + evictor.AsyncRequestStorage.Store("2", &AsyncRequestResult{Added: time.Now()}) + evictor.AsyncRequestStorage.Store("3", &AsyncRequestResult{Added: time.Now()}) + + if !inMemory { + time.Sleep(2 * time.Second) + } + evictor.tryEvictAsyncRequests(10 * time.Second) + if !inMemory { + time.Sleep(2 * time.Second) + } + + assert.Equal(t, 3, evictor.AsyncRequestStorage.DocCount()) + }) + } +} + +func TestInMemoryFallbackElasticStorage(t *testing.T) { + t.Skip("Test passes locally (20.12.2024), but requires elasticsearch to be running, so skipping for now") + storage := NewAsyncSearchStorageInMemoryFallbackElastic(testConfig()) + storage.Store("1", &AsyncRequestResult{}) + storage.Store("2", &AsyncRequestResult{}) + storage.Store("3", &AsyncRequestResult{}) + + assert.Equal(t, 0, storage.inElasticsearch.DocCount()) // inElasticsearch is async, probably shouldn't be updated yet + assert.Equal(t, 3, storage.inMemory.DocCount()) + time.Sleep(2 * time.Second) + assert.Equal(t, 3, storage.inElasticsearch.DocCount()) + assert.Equal(t, 3, 
storage.DocCount()) + + storage.Delete("1") + storage.Delete("2") + assert.Equal(t, 1, storage.DocCount()) + assert.Equal(t, 1, storage.inMemory.DocCount()) + assert.Equal(t, 3, storage.inElasticsearch.DocCount()) // inElasticsearch is async, probably shouldn't be updated yet + time.Sleep(2 * time.Second) + assert.Equal(t, 1, storage.inElasticsearch.DocCount()) + assert.Equal(t, 1, storage.DocCount()) + + // simulate Quesma, and inMemory storage restart + storage.inMemory = NewAsyncRequestResultStorageInMemory().(AsyncRequestResultStorageInMemory) + assert.Equal(t, 0, storage.DocCount()) + assert.Equal(t, 1, storage.inElasticsearch.DocCount()) + + doc, err := storage.Load("1") + pp.Println(err, doc) + assert.Nil(t, doc) + assert.NotNil(t, err) + + doc, err = storage.Load("3") + pp.Println(err, doc) + assert.NotNil(t, doc) + assert.Nil(t, err) +} + +const qid = "abc" + +func testConfig() config.ElasticsearchConfiguration { + realUrl, err := url.Parse("http://localhost:9201") + if err != nil { + fmt.Println("ERR", err) + } + cfgUrl := config.Url(*realUrl) + return config.ElasticsearchConfiguration{ + Url: &cfgUrl, + User: "", + Password: "", + } +} + +func TestEvictingAsyncQuery_1(t *testing.T) { + t.Skip("TODO: automize this test after evicting from Clickhouse from UI works") + options := clickhouse.Options{Addr: []string{"localhost:9000"}} + db := clickhouse.OpenDB(&options) + defer db.Close() + + ctx := clickhouse.Context(context.Background(), clickhouse.WithQueryID(qid)) + rows, err := db.QueryContext(ctx, "SELECT number FROM (SELECT number FROM numbers(100_000_000_000)) ORDER BY number DESC LIMIT 10") + var i int64 + for rows.Next() { + rows.Scan(&i) + fmt.Println(i) + } + + fmt.Println(rows, "i:", i, err) +} + +func TestEvictingAsyncQuery_2(t *testing.T) { + t.Skip("TODO: automize this test after evicting from Clickhouse from UI works") + options := clickhouse.Options{Addr: []string{"localhost:9000"}} + db := clickhouse.OpenDB(&options) + defer db.Close() + + rows, err := db.Query("KILL QUERY WHERE query_id= 'x'") + fmt.Println(rows, err) +} diff --git a/quesma/quesma/dual_write_proxy.go b/quesma/quesma/dual_write_proxy.go index 5a80e3f80..76cb7ed9b 100644 --- a/quesma/quesma/dual_write_proxy.go +++ b/quesma/quesma/dual_write_proxy.go @@ -143,8 +143,8 @@ func newDualWriteProxy(schemaLoader clickhouse.TableDiscovery, logManager *click logManager: logManager, publicPort: config.PublicTcpPort, asyncQueriesEvictor: async_search_storage.NewAsyncQueriesEvictor( - queryRunner.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory), - queryRunner.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory), + queryRunner.AsyncRequestStorage, + queryRunner.AsyncQueriesContexts, ), queryRunner: queryRunner, } diff --git a/quesma/quesma/dual_write_proxy_v2.go b/quesma/quesma/dual_write_proxy_v2.go index 32594025a..6b9c8e434 100644 --- a/quesma/quesma/dual_write_proxy_v2.go +++ b/quesma/quesma/dual_write_proxy_v2.go @@ -122,8 +122,8 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic logManager: logManager, publicPort: config.PublicTcpPort, asyncQueriesEvictor: async_search_storage.NewAsyncQueriesEvictor( - queryProcessor.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory), - queryProcessor.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory), + queryProcessor.AsyncRequestStorage, + queryProcessor.AsyncQueriesContexts, ), queryRunner: queryProcessor, } diff --git a/quesma/quesma/search.go 
b/quesma/quesma/search.go index 5bfce7364..7f5e2cf86 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -82,8 +82,8 @@ func NewQueryRunner(lm *clickhouse.LogManager, return &QueryRunner{logManager: lm, cfg: cfg, im: im, debugInfoCollector: qmc, executionCtx: ctx, cancel: cancel, - AsyncRequestStorage: async_search_storage.NewAsyncSearchStorageInMemory(), - AsyncQueriesContexts: async_search_storage.NewAsyncQueryContextStorageInMemory(), + AsyncRequestStorage: async_search_storage.NewAsyncSearchStorageInMemoryFallbackElastic(cfg.Elasticsearch), + AsyncQueriesContexts: async_search_storage.NewAsyncQueryContextStorageInMemoryFallbackElasticsearch(cfg.Elasticsearch), transformationPipeline: TransformationPipeline{ transformers: []model.QueryTransformer{ &SchemaCheckPass{ @@ -495,17 +495,8 @@ func (q *QueryRunner) storeAsyncSearch(qmc diag.DebugInfoCollector, id, asyncId return } -func (q *QueryRunner) asyncQueriesCumulatedBodySize() int { - size := 0 - q.AsyncRequestStorage.Range(func(key string, value *async_search_storage.AsyncRequestResult) bool { - size += len(value.GetResponseBody()) - return true - }) - return size -} - func (q *QueryRunner) handleAsyncSearchStatus(_ context.Context, id string) ([]byte, error) { - if _, ok := q.AsyncRequestStorage.Load(id); ok { // there IS a result in storage, so query is completed/no longer running, + if _, err := q.AsyncRequestStorage.Load(id); err != nil { // there IS a result in storage, so query is completed/no longer running, return queryparser.EmptyAsyncSearchStatusResponse(id, false, false, 200) } else { // there is no result so query is might be(*) running return queryparser.EmptyAsyncSearchStatusResponse(id, true, true, 0) // 0 is a placeholder for missing completion status @@ -519,16 +510,16 @@ func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) ( logger.ErrorWithCtx(ctx).Msgf("non quesma async id: %v", id) return queryparser.EmptyAsyncSearchResponse(id, false, 503) } - if result, ok := q.AsyncRequestStorage.Load(id); ok { - if err := result.GetErr(); err != nil { + if result, err := q.AsyncRequestStorage.Load(id); result != nil && err != nil { + if result.Err != nil { q.AsyncRequestStorage.Delete(id) logger.ErrorWithCtx(ctx).Msgf("error processing async query: %v", err) return queryparser.EmptyAsyncSearchResponse(id, false, 503) } q.AsyncRequestStorage.Delete(id) // We use zstd to conserve memory, as we have a lot of async queries - if result.IsCompressed() { - buf, err := util.Decompress(result.GetResponseBody()) + if result.IsCompressed { + buf, err := util.Decompress(result.ResponseBody) if err == nil { // Mark trace end is called only when the async query is fully processed // which means that isPartial is false @@ -541,7 +532,7 @@ func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) ( // Mark trace end is called only when the async query is fully processed // which means that isPartial is false logger.MarkTraceEndWithCtx(ctx).Msgf("Async query id : %s ended successfully", id) - return result.GetResponseBody(), nil + return result.ResponseBody, nil } else { const isPartial = true logger.InfoWithCtx(ctx).Msgf("async query id : %s partial result", id) @@ -558,7 +549,7 @@ func (q *QueryRunner) deleteAsyncSearch(id string) ([]byte, error) { } func (q *QueryRunner) reachedQueriesLimit(ctx context.Context, asyncId string, doneCh chan<- asyncSearchWithError) bool { - if q.AsyncRequestStorage.Size() < asyncQueriesLimit && q.asyncQueriesCumulatedBodySize() < 
asyncQueriesLimitBytes { + if q.AsyncRequestStorage.DocCount() < asyncQueriesLimit && q.AsyncRequestStorage.SpaceInUse() < asyncQueriesLimitBytes { return false } err := errors.New("too many async queries") @@ -568,7 +559,7 @@ func (q *QueryRunner) reachedQueriesLimit(ctx context.Context, asyncId string, d } func (q *QueryRunner) addAsyncQueryContext(ctx context.Context, cancel context.CancelFunc, asyncRequestIdStr string) { - q.AsyncQueriesContexts.Store(asyncRequestIdStr, async_search_storage.NewAsyncQueryContext(ctx, cancel, asyncRequestIdStr)) + q.AsyncQueriesContexts.Store(async_search_storage.NewAsyncQueryContext(ctx, cancel, asyncRequestIdStr)) } // This is a HACK diff --git a/smoke-test/async_query.go b/smoke-test/async_query.go index 172fedf3e..95a42f4dc 100644 --- a/smoke-test/async_query.go +++ b/smoke-test/async_query.go @@ -286,7 +286,8 @@ func waitForAsyncQuery(timeout time.Duration) { } defer resp.Body.Close() if resp.StatusCode != 200 { - fmt.Printf("async query status is %d %s\n", resp.StatusCode, asyncGetQueryUrlPrefix+asyncQuery.Id) + body, _ = io.ReadAll(resp.Body) + fmt.Printf("async query status is %d %s, resp body: %s\n", resp.StatusCode, asyncGetQueryUrlPrefix+asyncQuery.Id, body) panic("async query status is not 200") } }
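
For reference, a minimal usage sketch (not part of the diff itself) of how the new eviction-aware Elasticsearch-backed store introduced in this change fits together. It mirrors the persistence tests above: it assumes a local Elasticsearch at http://localhost:9200 and uses a hypothetical index name; the API calls (NewElasticDatabaseWithEviction, NewJSONWithSize, Put, DocCount, SizeInBytes, SizeInBytesLimit) are the ones added by this patch.

package main

import (
	"fmt"
	"net/url"
	"time"

	"quesma/persistence"
	"quesma/quesma/config"
	"quesma/quesma/types"
)

func main() {
	// Point the client at a local Elasticsearch (as in the tests in this PR).
	esUrl, _ := url.Parse("http://localhost:9200")
	cfgUrl := config.Url(*esUrl)
	cfg := config.ElasticsearchConfiguration{Url: &cfgUrl}

	// 1 GB storage budget; Put returns an error once the budget would be exceeded.
	// "quesma_example_storage" is a hypothetical index name for illustration only.
	db := persistence.NewElasticDatabaseWithEviction(cfg, "quesma_example_storage", 1_000_000_000)

	// Build a size-annotated JSON document, the same way doc() does in the tests.
	doc := types.JSON{}
	doc["id"] = "doc1"
	doc["sizeInBytes"] = int64(100)
	doc["timestamp"] = time.Now()

	if err := db.Put(persistence.NewJSONWithSize(doc, "doc1", 100)); err != nil {
		fmt.Println("put failed:", err)
		return
	}

	// Elasticsearch indexing is eventually consistent, hence the short wait
	// before reading counters back (the tests above sleep for the same reason).
	time.Sleep(2 * time.Second)

	if n, err := db.DocCount(); err == nil {
		fmt.Println("documents stored:", n)
	}
	if size, err := db.SizeInBytes(); err == nil {
		fmt.Println("bytes used:", size, "of", db.SizeInBytesLimit())
	}
}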