From 52b7a2f60203d875392f2168b7977430033d563d Mon Sep 17 00:00:00 2001 From: Raphael Dumusc Date: Mon, 30 Jan 2017 18:51:58 +0100 Subject: [PATCH] Improved network version exchange protocol This commit relaxes the check on the client side to allow connecting to servers with a more recent network protocol in the future. Clients also send their protocol version to the server. This information is simply ignored by older servers and does not break compatibility. The idea is to create a transition period after which the protocol can be modified (to exchange more information at connection time, such as support for stereo streams) without breaking compatibilty for clients and servers based on deflect >= 0.12.1. --- deflect/ServerWorker.cpp | 30 ++++++++++++++------- deflect/ServerWorker.h | 6 +++-- deflect/Socket.cpp | 55 ++++++++++++++++++--------------------- deflect/Socket.h | 23 ++++++++-------- deflect/StreamPrivate.cpp | 6 +++-- doc/Changelog.md | 4 ++- tests/cpp/SocketTests.cpp | 5 ++-- tests/mock/MockServer.h | 4 +-- 8 files changed, 73 insertions(+), 60 deletions(-) diff --git a/deflect/ServerWorker.cpp b/deflect/ServerWorker.cpp index e15acb9..fd759d4 100644 --- a/deflect/ServerWorker.cpp +++ b/deflect/ServerWorker.cpp @@ -1,5 +1,5 @@ /*********************************************************************/ -/* Copyright (c) 2013-2016, EPFL/Blue Brain Project */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ /* Raphael Dumusc */ /* Daniel.Nachbaur@epfl.ch */ /* All rights reserved. */ @@ -47,7 +47,10 @@ #include -#define RECEIVE_TIMEOUT_MS 3000 +namespace +{ +const int RECEIVE_TIMEOUT_MS = 3000; +} namespace deflect { @@ -56,6 +59,7 @@ ServerWorker::ServerWorker( const int socketDescriptor ) // Ensure that tcpSocket_ parent is *this* so it gets moved to thread : _tcpSocket( new QTcpSocket( this )) , _sourceId( socketDescriptor ) + , _clientProtocolVersion( NETWORK_PROTOCOL_VERSION ) , _registeredToEvents( false ) { if( !_tcpSocket->setSocketDescriptor( socketDescriptor )) @@ -223,6 +227,9 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader, return; } _streamId = uri; + // The version is only sent by deflect clients since v. 0.13.0 + if( !byteArray.isEmpty( )) + _parseClientProtocolVersion( byteArray ); emit addStreamSource( _streamId, _sourceId ); break; @@ -263,17 +270,22 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader, } } -void ServerWorker::_handlePixelStreamMessage( const QByteArray& byteArray ) +void ServerWorker::_parseClientProtocolVersion( const QByteArray& message ) { - const SegmentParameters* parameters = - reinterpret_cast< const SegmentParameters* >( byteArray.data( )); + bool ok = false; + const int version = message.toInt( &ok ); + if( ok ) + _clientProtocolVersion = version; +} +void ServerWorker::_handlePixelStreamMessage( const QByteArray& message ) +{ Segment segment; - segment.parameters = *parameters; - QByteArray imageData = - byteArray.right( byteArray.size() - sizeof( SegmentParameters )); - segment.imageData = imageData; + const auto data = message.data(); + segment.parameters = *reinterpret_cast( data ); + segment.imageData = message.right( message.size() - + sizeof( SegmentParameters )); emit( receivedSegment( _streamId, _sourceId, segment )); } diff --git a/deflect/ServerWorker.h b/deflect/ServerWorker.h index 7cc0b03..4717cbe 100644 --- a/deflect/ServerWorker.h +++ b/deflect/ServerWorker.h @@ -96,6 +96,7 @@ private slots: QString _streamId; int _sourceId; + int _clientProtocolVersion; bool _registeredToEvents; QQueue _events; @@ -105,8 +106,9 @@ private slots: QByteArray _receiveMessageBody( int size ); void _handleMessage( const MessageHeader& messageHeader, - const QByteArray& byteArray ); - void _handlePixelStreamMessage( const QByteArray& byteArray ); + const QByteArray& message ); + void _parseClientProtocolVersion( const QByteArray& message ); + void _handlePixelStreamMessage( const QByteArray& message ); void _sendProtocolVersion(); void _sendBindReply( bool successful ); diff --git a/deflect/Socket.cpp b/deflect/Socket.cpp index 2ea6d75..360bf15 100644 --- a/deflect/Socket.cpp +++ b/deflect/Socket.cpp @@ -59,7 +59,7 @@ const unsigned short Socket::defaultPortNumber = DEFAULT_PORT_NUMBER; Socket::Socket( const std::string& host, const unsigned short port ) : _host( host ) , _socket( new QTcpSocket( )) - , _remoteProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION ) + , _serverProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION ) { // disable warnings which occur if no QCoreApplication is present during // _connect(): QObject::connect: Cannot connect (null)::destroyed() to @@ -91,6 +91,11 @@ bool Socket::isConnected() const return _socket->state() == QTcpSocket::ConnectedState; } +int32_t Socket::getServerProtocolVersion() const +{ + return _serverProtocolVersion; +} + int Socket::getFileDescriptor() const { return _socket->socketDescriptor(); @@ -172,11 +177,6 @@ bool Socket::receive( MessageHeader& messageHeader, QByteArray& message ) return true; } -int32_t Socket::getRemoteProtocolVersion() const -{ - return _remoteProtocolVersion; -} - bool Socket::_receiveHeader( MessageHeader& messageHeader ) { while( _socket->bytesAvailable() < qint64(MessageHeader::serializedSize) ) @@ -193,45 +193,42 @@ bool Socket::_receiveHeader( MessageHeader& messageHeader ) bool Socket::_connect( const std::string& host, const unsigned short port ) { - // make sure we're disconnected - _socket->disconnectFromHost(); - - // open connection _socket->connectToHost( host.c_str(), port ); - if( !_socket->waitForConnected( RECEIVE_TIMEOUT_MS )) { - std::cerr << "could not connect to host " << host << ":" << port + std::cerr << "could not connect to " << host << ":" << port << std::endl; return false; } - // handshake - if( _checkProtocolVersion( )) - return true; + if( !_receiveProtocolVersion( )) + { + std::cerr << "server protocol version was not received" << std::endl; + _socket->disconnectFromHost(); + return false; + } + + if( _serverProtocolVersion < NETWORK_PROTOCOL_VERSION ) + { + std::cerr << "server uses unsupported protocol: " + << _serverProtocolVersion << " < " + << NETWORK_PROTOCOL_VERSION << std::endl; + _socket->disconnectFromHost(); + return false; + } - std::cerr << "Protocol version check failed for host: " << host << ":" - << port << std::endl; - _socket->disconnectFromHost(); - return false; + return true; } -bool Socket::_checkProtocolVersion() +bool Socket::_receiveProtocolVersion() { while( _socket->bytesAvailable() < qint64(sizeof(int32_t)) ) { if( !_socket->waitForReadyRead( RECEIVE_TIMEOUT_MS )) return false; } - - _socket->read((char *)&_remoteProtocolVersion, sizeof(int32_t)); - - if( _remoteProtocolVersion == NETWORK_PROTOCOL_VERSION ) - return true; - - std::cerr << "unsupported protocol version " << _remoteProtocolVersion - << " != " << NETWORK_PROTOCOL_VERSION << std::endl; - return false; + _socket->read((char*)&_serverProtocolVersion, sizeof(int32_t)); + return true; } } diff --git a/deflect/Socket.h b/deflect/Socket.h index 228f7b9..36533f3 100644 --- a/deflect/Socket.h +++ b/deflect/Socket.h @@ -85,11 +85,8 @@ class Socket : public QObject /** Is the Socket connected */ DEFLECT_API bool isConnected() const; - /** - * Is there a pending message - * @param messageSize Minimum size of the message - */ - bool hasMessage( const size_t messageSize = 0 ) const; + /** @return the protocol version of the server. */ + int32_t getServerProtocolVersion() const; /** * Get the FileDescriptor for the Socket (for use by poll()) @@ -97,6 +94,12 @@ class Socket : public QObject */ int getFileDescriptor() const; + /** + * Is there a pending message + * @param messageSize Minimum size of the message + */ + bool hasMessage( const size_t messageSize = 0 ) const; + /** * Send a message. * @param messageHeader The message header @@ -113,9 +116,6 @@ class Socket : public QObject */ bool receive( MessageHeader& messageHeader, QByteArray& message ); - /** Get the protocol version of the remote host */ - int32_t getRemoteProtocolVersion() const; - signals: /** Signal that the socket has been disconnected. */ void disconnected(); @@ -123,13 +123,12 @@ class Socket : public QObject private: const std::string _host; QTcpSocket* _socket; - int32_t _remoteProtocolVersion; mutable QMutex _socketMutex; - - bool _connect( const std::string &host, const unsigned short port ); - bool _checkProtocolVersion(); + int32_t _serverProtocolVersion; bool _receiveHeader( MessageHeader& messageHeader ); + bool _connect( const std::string &host, const unsigned short port ); + bool _receiveProtocolVersion(); }; } diff --git a/deflect/StreamPrivate.cpp b/deflect/StreamPrivate.cpp index 2530274..49b1113 100644 --- a/deflect/StreamPrivate.cpp +++ b/deflect/StreamPrivate.cpp @@ -41,6 +41,7 @@ #include "StreamPrivate.h" +#include "NetworkProtocol.h" #include "Segment.h" #include "SegmentParameters.h" #include "SizeHints.h" @@ -119,8 +120,9 @@ StreamPrivate::~StreamPrivate() void StreamPrivate::sendOpen() { - const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, 0, id ); - socket.send( mh, QByteArray( )); + const auto message = QByteArray::number( NETWORK_PROTOCOL_VERSION ); + const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, message.size(), id ); + socket.send( mh, message ); } void StreamPrivate::sendClose() diff --git a/doc/Changelog.md b/doc/Changelog.md index dbcb807..c8b830b 100644 --- a/doc/Changelog.md +++ b/doc/Changelog.md @@ -3,9 +3,11 @@ Changelog {#Changelog} ## Deflect 0.12 -### 0.12.1 (git master) +### 0.12.1 (01-02-2017) * [147](https://github.com/BlueBrain/Deflect/pull/147): + Improved handling of network protocol updates. Future updates should be + possible without breaking any client/server based on this release. Deflect server: better reporting of JPEG decompression errors. * [146](https://github.com/BlueBrain/Deflect/pull/146): Unified the command line options and help message of applications. diff --git a/tests/cpp/SocketTests.cpp b/tests/cpp/SocketTests.cpp index 575c01b..318e8c3 100644 --- a/tests/cpp/SocketTests.cpp +++ b/tests/cpp/SocketTests.cpp @@ -44,6 +44,7 @@ namespace ut = boost::unit_test; #include "MinimalGlobalQtApp.h" #include "MockServer.h" +#include #include #include @@ -53,14 +54,14 @@ BOOST_GLOBAL_FIXTURE( MinimalGlobalQtApp ); void testSocketConnect( const int32_t versionOffset ) { QThread thread; - MockServer* server = new MockServer( NETWORK_PROTOCOL_VERSION + versionOffset ); + auto server = new MockServer( NETWORK_PROTOCOL_VERSION + versionOffset ); server->moveToThread( &thread ); server->connect( &thread, &QThread::finished, server, &QObject::deleteLater ); thread.start(); deflect::Socket socket( "localhost", server->serverPort( )); - BOOST_CHECK( socket.isConnected() == (versionOffset == 0)); + BOOST_CHECK( socket.isConnected() == (versionOffset >= 0)); thread.quit(); thread.wait(); diff --git a/tests/mock/MockServer.h b/tests/mock/MockServer.h index b63bb21..8dd6f68 100644 --- a/tests/mock/MockServer.h +++ b/tests/mock/MockServer.h @@ -46,7 +46,6 @@ typedef __int32 int32_t; #include #include -#include #include @@ -55,8 +54,7 @@ class MockServer : public QTcpServer Q_OBJECT public: - DEFLECT_API explicit MockServer( int32_t protocolVersion = - NETWORK_PROTOCOL_VERSION ); + DEFLECT_API explicit MockServer( int32_t protocolVersion ); DEFLECT_API virtual ~MockServer(); protected: