Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev IME #37

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added tests/xs_ime/__init__.py
Empty file.
165 changes: 165 additions & 0 deletions tests/xs_ime/test_xs_ime_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
from typing import List, Tuple

import pytest
from tfprotocol_client.extensions.xs_ime import XSIme
from tfprotocol_client.misc.timeout_func import TimeLimitExpired, timelimit
from tfprotocol_client.models.status_info import StatusInfo
from tfprotocol_client.models.status_server_code import StatusServerCode
import os
# pylint: disable=unused-import
from .xs_ime import (xsime_instance,XSIme)
from ..tfprotocol_commands.tfprotocol import tfprotocol_instance

@pytest.mark.ime
@pytest.mark.run(order=96)
def test_xsime_start_close_commands(

):
"""Test for open and close commands."""
tfproto:XSIme = xsime_instance()
resps: List[StatusInfo] = []
# OPEN command
try:
timelimit(
6,
lambda *_, **__: tfproto.start_command(
response_handler=resps.append
),
)
except TimeLimitExpired:
raise AssertionError(
'Timeout expired: open_command timeout'
)
index=0
assert resps[index].status == StatusServerCode.OK, resps[index]


try:
timelimit(
6,
lambda *_, **__: tfproto.close_command(),
)
assert True, "close_command"

except TimeLimitExpired:
raise AssertionError(
'Timeout expired: close_command timeout'
)
tfproto.disconnect()



@pytest.mark.ime
@pytest.mark.run(order=97)
@pytest.mark.parametrize("origin_user,destination_user"
,[("user1","user2"),("user2","user1"),])
def test_xsime_send_messages_commands(
origin_user,destination_user,

tfprotocol_instance:tfprotocol_instance

):

tfproto=xsime_instance()

TIME_SEC_MAX=6
tfproto:XSIme = tfproto
resps: List[StatusInfo] = []
# OPEN command
try:
timelimit(
TIME_SEC_MAX,
lambda *_, **__: tfproto.start_command(
response_handler=resps.append
),
)
except TimeLimitExpired:
raise AssertionError(
'Timeout expired: open_command timeout'
)
index=0
assert resps[index].status == StatusServerCode.OK, resps[index]


NAME_CHAT="chattest"
CHAT_FOLDER="/"+NAME_CHAT
USER_1=origin_user
USER_2=destination_user
CHAT_FOLDER_USER_1="/"+NAME_CHAT+"/"+USER_1
CHAT_FOLDER_USER_2="/"+NAME_CHAT+"/"+USER_2

tfprotocol_instance.mkdir_command(CHAT_FOLDER)
tfprotocol_instance.mkdir_command(CHAT_FOLDER_USER_1)
tfprotocol_instance.mkdir_command(CHAT_FOLDER_USER_2)



try:
timelimit(
10,
lambda *_, **__: tfproto.setup_command(
unique_user=True,auto_delete=False,timestamp=0
),
)
assert True, "setup_command"

except TimeLimitExpired:
raise AssertionError(
'Timeout expired: setup_command timeout'
)

try:
timelimit(
TIME_SEC_MAX,
lambda *_, **__: tfproto.set_path_command(path=CHAT_FOLDER),
)
assert True, "set_path_command"

except TimeLimitExpired:
raise AssertionError(
'Timeout expired: set_path_command timeout'
)

try:
timelimit(
TIME_SEC_MAX,
lambda *_, **__: tfproto.set_username_path_command(username_path=USER_1),
)
assert True, "set_username_path_command"

except TimeLimitExpired:
raise AssertionError(
'Timeout expired: set_username_path_command timeout'
)

try:
timelimit(
TIME_SEC_MAX,
lambda *_, **__: tfproto.send_message_command(message=f"mensaje de ${USER_1} a ${USER_2}",dest=USER_2),
)
assert True, "send_message_command"

except TimeLimitExpired:
raise AssertionError(
'Timeout expired: send_message_command timeout'
)


try:
timelimit(
TIME_SEC_MAX,
lambda *_, **__: tfproto.close_command(),
)
assert True, "close_command"

except TimeLimitExpired:
raise AssertionError(
'Timeout expired: close_command timeout'
)

tfproto.disconnect()


#


43 changes: 43 additions & 0 deletions tests/xs_ime/xs_ime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
from collections import namedtuple
from typing import Tuple

import pytest
from dotenv import load_dotenv
from tfprotocol_client.extensions.xs_ime import XSIme

load_dotenv()



#@pytest.fixture(scope='module')
def xsime_instance()->XSIme:
PROTO_VERSION = os.environ.get('PROTO_VERSION', '2.4.1')
PROTO_PUBLIC_KEY = os.environ.get('PROTO_PUBLIC_KEY',"""-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3B7iLfTJ/Lkfgz9/Mq6n
3tBWNaP/818mcEyA5phFv1wyBk9hkroXGX0/J/unxRGF1ax3etNEbN1RASTcZNKT
zeEcwC0yf5Mn+6hmZDOIiDqr1pSAPyNm1soiY3V27/bUhcX0rnCql5Mb7QlfkbK6
o4CddL8pOG99tlSFaTYoXWgUhK5DXF1xptlz0DHv9STuNkukuy+LmAHTJYks1B/w
BC7CZyTtg4VPbHJ11VYXWI6dLTmOuoaTDrJGc/mGOK86zPOchgnAeYekwtBWlk/N
59VtaAWcfgzYnDET45qidcQfF+TIXxjf386igq8rlWTI0+Rh0e/GlYEpRp2YJPMC
AwIDAQAB
-----END PUBLIC KEY-----""")
PROTO_CLIENT_HASH = os.environ.get('PROTO_CLIENT_HASH', 'testhash')
PROTO_SERVER_ADDRESS = os.environ.get(
'PROTO_SERVER_ADDRESS', 'tfproto.expresscuba.com'
)
PROTO_SERVER_PORT = int(os.environ.get('PROTO_SERVER_PORT', 10345))

tfproto = XSIme(
PROTO_VERSION,
PROTO_PUBLIC_KEY,
PROTO_CLIENT_HASH,
PROTO_SERVER_ADDRESS,
PROTO_SERVER_PORT,
)
tfproto.connect()
return tfproto
# yield tfproto


# tfproto.disconnect()
Loading
Loading