Skip to content

Commit

Permalink
fix: fix issue with same doc type in spawned processes (#6062)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM authored Sep 29, 2023
1 parent 67c83c2 commit 31e0c9a
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ jobs:
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker/test_sagemaker.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/docker
echo "flag it as jina for codeoverage"
echo "codecov_flag=jina" >> $GITHUB_OUTPUT
timeout-minutes: 45
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ jobs:
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker/test_sagemaker.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/docker
echo "flag it as jina for codeoverage"
echo "codecov_flag=jina" >> $GITHUB_OUTPUT
timeout-minutes: 45
Expand Down
3 changes: 2 additions & 1 deletion jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ async def post(body: input_model, response: Response):
else:
docs = DocList[input_doc_list_model]([data])
if body.header is None:
req_id = docs[0].id
if hasattr(docs[0], 'id'):
req_id = docs[0].id

try:
async for resp in streamer.stream_docs(
Expand Down
11 changes: 10 additions & 1 deletion jina/serve/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ async def _get_endpoints_input_output_models(
connection_pool, retry_forever=True, is_cancel=is_cancel
)
self.logger.debug(f'Got all endpoints from TopologyGraph {endpoints}')

if endpoints is not None:
for endp in endpoints:
for origin_node in topology_graph.origin_nodes:
Expand All @@ -102,6 +101,16 @@ async def _get_endpoints_input_output_models(
and len(leaf_input_output_model) > 0
):
_endpoints_models_map[endp] = leaf_input_output_model[0]
cached_models = {}
for k, v in _endpoints_models_map.items():
if v['input'].__name__ not in cached_models:
cached_models[v['input'].__name__] = v['input']
else:
v['input'] = cached_models[v['input'].__name__]
if v['output'].__name__ not in cached_models:
cached_models[v['output'].__name__] = v['output']
else:
v['output'] = cached_models[v['output'].__name__]
return _endpoints_models_map

async def stream_doc(
Expand Down
Empty file.
7 changes: 7 additions & 0 deletions tests/integration/docarray_v2/docker/executor1/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM jinaai/jina:test-pip

COPY . /executor_root/

WORKDIR /executor_root

ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]
Empty file.
5 changes: 5 additions & 0 deletions tests/integration/docarray_v2/docker/executor1/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
jtype: Encoder
metas:
name: EncoderPrivate
py_modules:
- executor.py
23 changes: 23 additions & 0 deletions tests/integration/docarray_v2/docker/executor1/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Optional
from docarray import DocList, BaseDoc
from docarray.typing import NdArray
from jina import Executor, requests
import numpy as np

class MyDoc(BaseDoc):
    """Document schema exchanged with the Encoder executor."""

    # Raw input text to be encoded.
    text: str
    # Dense vector produced by the Encoder; None until the doc has been encoded.
    embedding: Optional[NdArray] = None


class Encoder(Executor):
    """Jina Executor that attaches a random 128-dim embedding to each MyDoc.

    Serves as a stand-in for a real text encoder in the docker integration
    test; the embedding values carry no semantic meaning.
    """

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

    @requests
    def encode(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        """Populate ``doc.embedding`` in place and return the mutated docs.

        The original body returned ``None`` despite the ``DocList[MyDoc]``
        return annotation, relying on jina's implicit "None means reuse the
        input docs" convention; returning ``docs`` makes the contract explicit
        without changing observable behavior.
        """
        for doc in docs:
            doc.embedding = np.random.random(128)
        return docs
7 changes: 7 additions & 0 deletions tests/integration/docarray_v2/docker/executor2/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM jinaai/jina:test-pip

COPY . /executor_root/

WORKDIR /executor_root

ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]
Empty file.
5 changes: 5 additions & 0 deletions tests/integration/docarray_v2/docker/executor2/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
jtype: Indexer
metas:
name: IndexerPrivate
py_modules:
- executor.py
39 changes: 39 additions & 0 deletions tests/integration/docarray_v2/docker/executor2/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Optional, List
from docarray import DocList, BaseDoc
from docarray.typing import NdArray
from docarray.index import InMemoryExactNNIndex
from jina import Executor, requests


class MyDoc(BaseDoc):
    """Document schema accepted by the Indexer executor.

    Mirrors the Encoder's ``MyDoc`` so the two dockerized executors agree on
    the wire schema.
    """

    # Original input text of the document.
    text: str
    # Embedding used for similarity search; expected to be set by the Encoder.
    embedding: Optional[NdArray] = None


class MyDocWithMatches(MyDoc):
    """Search result: a query doc enriched with its nearest neighbours.

    NOTE: mutable class-level defaults are safe here — pydantic copies
    defaults per instance, unlike plain Python attributes.
    """

    # Nearest-neighbour documents returned by the /search endpoint.
    matches: DocList[MyDoc] = []
    # Similarity scores, aligned index-for-index with ``matches``.
    scores: List[float] = []


class Indexer(Executor):
    """Executor wrapping an in-memory exact nearest-neighbour index of MyDoc."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Brute-force vector index kept entirely in process memory.
        self._indexer = InMemoryExactNNIndex[MyDoc]()

    @requests(on='/index')
    def index(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        """Add the incoming documents to the index and echo them back."""
        self._indexer.index(docs)
        return docs

    @requests(on='/search')
    def search(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDocWithMatches]:
        """Find nearest neighbours for every query document.

        Each returned doc is a copy of the query enriched with the matched
        documents and their similarity scores.
        """
        found = self._indexer.find_batched(docs, search_field='embedding')
        results = DocList[MyDocWithMatches]()
        for query, query_matches, query_scores in zip(
            docs, found.documents, found.scores
        ):
            enriched = MyDocWithMatches(**query.dict())
            enriched.matches = query_matches
            enriched.scores = query_scores.tolist()
            results.append(enriched)
        return results
61 changes: 61 additions & 0 deletions tests/integration/docarray_v2/docker/test_with_docker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import time

import pytest
import requests as general_requests

from jina import Flow

cur_dir = os.path.dirname(os.path.abspath(__file__))


@pytest.fixture
def executor_images_built():
    """Build both executor docker images for the test, prune containers after.

    Yields once the 'encoder-executor' and 'indexer-executor' images exist
    locally; on teardown waits briefly so the Flow's containers can stop,
    then prunes stopped containers.
    """
    import docker

    builder = docker.from_env()
    for subdir, image_tag in (
        ('executor1', 'encoder-executor'),
        ('executor2', 'indexer-executor'),
    ):
        builder.images.build(path=os.path.join(cur_dir, subdir), tag=image_tag)
    builder.close()

    yield

    # Give the containers a moment to exit before pruning them.
    time.sleep(2)
    cleaner = docker.from_env()
    cleaner.containers.prune()


@pytest.mark.parametrize('protocol', ['http', 'grpc'])
def test_flow_with_docker(executor_images_built, protocol):
    """End-to-end check of a Flow whose executors run as docker containers.

    Indexes three sentences through the dockerized Encoder + Indexer and
    verifies that searching with two of them returns matches drawn from the
    indexed corpus.
    """
    from typing import List, Optional

    from docarray import BaseDoc, DocList
    from docarray.typing import NdArray

    # Local mirrors of the schemas declared inside the docker images; the fix
    # under test is that identical doc types from separate processes coexist.
    class MyDoc(BaseDoc):
        text: str
        embedding: Optional[NdArray] = None

    class MyDocWithMatches(MyDoc):
        matches: DocList[MyDoc] = []
        scores: List[float] = []

    flow = (
        Flow(protocol=protocol)
        .add(uses='docker://encoder-executor')
        .add(uses='docker://indexer-executor')
    )

    with flow:
        if protocol == 'http':
            # The OpenAPI schema must be generated and parse without errors.
            schema_resp = general_requests.get(
                f'http://localhost:{flow.port}/openapi.json'
            )
            schema_resp.json()

        corpus = [
            'This framework generates embeddings for each input sentence',
            'Sentences are passed as a list of string.',
            'The quick brown fox jumps over the lazy dog.',
        ]

        index_docs = DocList[MyDoc]([MyDoc(text=sentence) for sentence in corpus])
        flow.post(on='/index', inputs=index_docs)

        queries = index_docs[0:2]
        responses = flow.post(
            on='/search', inputs=queries, return_type=DocList[MyDocWithMatches]
        )

        assert len(responses) == len(queries)
        for res in responses:
            assert res.text in corpus
            assert len(res.matches) == len(corpus)
            for match in res.matches:
                assert match.text in corpus

0 comments on commit 31e0c9a

Please sign in to comment.