Skip to content

Commit

Permalink
Improved network version exchange protocol
Browse files Browse the repository at this point in the history
This commit relaxes the check on the client side to allow
connecting to servers with a more recent network protocol in the
future. Clients also send their protocol version to the server.
This information is simply ignored by older servers and does not
break compatibility.

The idea is to create a transition period after which the protocol
can be modified (to exchange more information at connection time,
such as support for stereo streams) without breaking compatibilty
for clients and servers based on deflect >= 0.12.1.
  • Loading branch information
Raphael Dumusc committed Feb 1, 2017
1 parent 2c41044 commit 52b7a2f
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 60 deletions.
30 changes: 21 additions & 9 deletions deflect/ServerWorker.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*********************************************************************/
/* Copyright (c) 2013-2016, EPFL/Blue Brain Project */
/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */
/* Raphael Dumusc <[email protected]> */
/* [email protected] */
/* All rights reserved. */
Expand Down Expand Up @@ -47,7 +47,10 @@

#include <QDataStream>

#define RECEIVE_TIMEOUT_MS 3000
namespace
{
const int RECEIVE_TIMEOUT_MS = 3000;
}

namespace deflect
{
Expand All @@ -56,6 +59,7 @@ ServerWorker::ServerWorker( const int socketDescriptor )
// Ensure that tcpSocket_ parent is *this* so it gets moved to thread
: _tcpSocket( new QTcpSocket( this ))
, _sourceId( socketDescriptor )
, _clientProtocolVersion( NETWORK_PROTOCOL_VERSION )
, _registeredToEvents( false )
{
if( !_tcpSocket->setSocketDescriptor( socketDescriptor ))
Expand Down Expand Up @@ -223,6 +227,9 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader,
return;
}
_streamId = uri;
// The version is only sent by deflect clients since v. 0.13.0
if( !byteArray.isEmpty( ))
_parseClientProtocolVersion( byteArray );
emit addStreamSource( _streamId, _sourceId );
break;

Expand Down Expand Up @@ -263,17 +270,22 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader,
}
}

void ServerWorker::_handlePixelStreamMessage( const QByteArray& byteArray )
void ServerWorker::_parseClientProtocolVersion( const QByteArray& message )
{
const SegmentParameters* parameters =
reinterpret_cast< const SegmentParameters* >( byteArray.data( ));
bool ok = false;
const int version = message.toInt( &ok );
if( ok )
_clientProtocolVersion = version;
}

void ServerWorker::_handlePixelStreamMessage( const QByteArray& message )
{
Segment segment;
segment.parameters = *parameters;

QByteArray imageData =
byteArray.right( byteArray.size() - sizeof( SegmentParameters ));
segment.imageData = imageData;
const auto data = message.data();
segment.parameters = *reinterpret_cast<const SegmentParameters*>( data );
segment.imageData = message.right( message.size() -
sizeof( SegmentParameters ));

emit( receivedSegment( _streamId, _sourceId, segment ));
}
Expand Down
6 changes: 4 additions & 2 deletions deflect/ServerWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private slots:

QString _streamId;
int _sourceId;
int _clientProtocolVersion;

bool _registeredToEvents;
QQueue<Event> _events;
Expand All @@ -105,8 +106,9 @@ private slots:
QByteArray _receiveMessageBody( int size );

void _handleMessage( const MessageHeader& messageHeader,
const QByteArray& byteArray );
void _handlePixelStreamMessage( const QByteArray& byteArray );
const QByteArray& message );
void _parseClientProtocolVersion( const QByteArray& message );
void _handlePixelStreamMessage( const QByteArray& message );

void _sendProtocolVersion();
void _sendBindReply( bool successful );
Expand Down
55 changes: 26 additions & 29 deletions deflect/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const unsigned short Socket::defaultPortNumber = DEFAULT_PORT_NUMBER;
Socket::Socket( const std::string& host, const unsigned short port )
: _host( host )
, _socket( new QTcpSocket( ))
, _remoteProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION )
, _serverProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION )
{
// disable warnings which occur if no QCoreApplication is present during
// _connect(): QObject::connect: Cannot connect (null)::destroyed() to
Expand Down Expand Up @@ -91,6 +91,11 @@ bool Socket::isConnected() const
return _socket->state() == QTcpSocket::ConnectedState;
}

int32_t Socket::getServerProtocolVersion() const
{
return _serverProtocolVersion;
}

int Socket::getFileDescriptor() const
{
return _socket->socketDescriptor();
Expand Down Expand Up @@ -172,11 +177,6 @@ bool Socket::receive( MessageHeader& messageHeader, QByteArray& message )
return true;
}

int32_t Socket::getRemoteProtocolVersion() const
{
return _remoteProtocolVersion;
}

bool Socket::_receiveHeader( MessageHeader& messageHeader )
{
while( _socket->bytesAvailable() < qint64(MessageHeader::serializedSize) )
Expand All @@ -193,45 +193,42 @@ bool Socket::_receiveHeader( MessageHeader& messageHeader )

bool Socket::_connect( const std::string& host, const unsigned short port )
{
// make sure we're disconnected
_socket->disconnectFromHost();

// open connection
_socket->connectToHost( host.c_str(), port );

if( !_socket->waitForConnected( RECEIVE_TIMEOUT_MS ))
{
std::cerr << "could not connect to host " << host << ":" << port
std::cerr << "could not connect to " << host << ":" << port
<< std::endl;
return false;
}

// handshake
if( _checkProtocolVersion( ))
return true;
if( !_receiveProtocolVersion( ))
{
std::cerr << "server protocol version was not received" << std::endl;
_socket->disconnectFromHost();
return false;
}

if( _serverProtocolVersion < NETWORK_PROTOCOL_VERSION )
{
std::cerr << "server uses unsupported protocol: "
<< _serverProtocolVersion << " < "
<< NETWORK_PROTOCOL_VERSION << std::endl;
_socket->disconnectFromHost();
return false;
}

std::cerr << "Protocol version check failed for host: " << host << ":"
<< port << std::endl;
_socket->disconnectFromHost();
return false;
return true;
}

bool Socket::_checkProtocolVersion()
bool Socket::_receiveProtocolVersion()
{
while( _socket->bytesAvailable() < qint64(sizeof(int32_t)) )
{
if( !_socket->waitForReadyRead( RECEIVE_TIMEOUT_MS ))
return false;
}

_socket->read((char *)&_remoteProtocolVersion, sizeof(int32_t));

if( _remoteProtocolVersion == NETWORK_PROTOCOL_VERSION )
return true;

std::cerr << "unsupported protocol version " << _remoteProtocolVersion
<< " != " << NETWORK_PROTOCOL_VERSION << std::endl;
return false;
_socket->read((char*)&_serverProtocolVersion, sizeof(int32_t));
return true;
}

}
23 changes: 11 additions & 12 deletions deflect/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,21 @@ class Socket : public QObject
/** Is the Socket connected */
DEFLECT_API bool isConnected() const;

/**
* Is there a pending message
* @param messageSize Minimum size of the message
*/
bool hasMessage( const size_t messageSize = 0 ) const;
/** @return the protocol version of the server. */
int32_t getServerProtocolVersion() const;

/**
* Get the FileDescriptor for the Socket (for use by poll())
* @return The file descriptor if available, otherwise return -1.
*/
int getFileDescriptor() const;

/**
* Is there a pending message
* @param messageSize Minimum size of the message
*/
bool hasMessage( const size_t messageSize = 0 ) const;

/**
* Send a message.
* @param messageHeader The message header
Expand All @@ -113,23 +116,19 @@ class Socket : public QObject
*/
bool receive( MessageHeader& messageHeader, QByteArray& message );

/** Get the protocol version of the remote host */
int32_t getRemoteProtocolVersion() const;

signals:
/** Signal that the socket has been disconnected. */
void disconnected();

private:
const std::string _host;
QTcpSocket* _socket;
int32_t _remoteProtocolVersion;
mutable QMutex _socketMutex;

bool _connect( const std::string &host, const unsigned short port );
bool _checkProtocolVersion();
int32_t _serverProtocolVersion;

bool _receiveHeader( MessageHeader& messageHeader );
bool _connect( const std::string &host, const unsigned short port );
bool _receiveProtocolVersion();
};

}
Expand Down
6 changes: 4 additions & 2 deletions deflect/StreamPrivate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

#include "StreamPrivate.h"

#include "NetworkProtocol.h"
#include "Segment.h"
#include "SegmentParameters.h"
#include "SizeHints.h"
Expand Down Expand Up @@ -119,8 +120,9 @@ StreamPrivate::~StreamPrivate()

void StreamPrivate::sendOpen()
{
const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, 0, id );
socket.send( mh, QByteArray( ));
const auto message = QByteArray::number( NETWORK_PROTOCOL_VERSION );
const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, message.size(), id );
socket.send( mh, message );
}

void StreamPrivate::sendClose()
Expand Down
4 changes: 3 additions & 1 deletion doc/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ Changelog {#Changelog}

## Deflect 0.12

### 0.12.1 (git master)
### 0.12.1 (01-02-2017)

* [147](https://github.com/BlueBrain/Deflect/pull/147):
Improved handling of network protocol updates. Future updates should be
possible without breaking any client/server based on this release.
Deflect server: better reporting of JPEG decompression errors.
* [146](https://github.com/BlueBrain/Deflect/pull/146):
Unified the command line options and help message of applications.
Expand Down
5 changes: 3 additions & 2 deletions tests/cpp/SocketTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace ut = boost::unit_test;
#include "MinimalGlobalQtApp.h"
#include "MockServer.h"

#include <deflect/NetworkProtocol.h>
#include <deflect/Socket.h>

#include <QThread>
Expand All @@ -53,14 +54,14 @@ BOOST_GLOBAL_FIXTURE( MinimalGlobalQtApp );
void testSocketConnect( const int32_t versionOffset )
{
QThread thread;
MockServer* server = new MockServer( NETWORK_PROTOCOL_VERSION + versionOffset );
auto server = new MockServer( NETWORK_PROTOCOL_VERSION + versionOffset );
server->moveToThread( &thread );
server->connect( &thread, &QThread::finished, server, &QObject::deleteLater );
thread.start();

deflect::Socket socket( "localhost", server->serverPort( ));

BOOST_CHECK( socket.isConnected() == (versionOffset == 0));
BOOST_CHECK( socket.isConnected() == (versionOffset >= 0));

thread.quit();
thread.wait();
Expand Down
4 changes: 1 addition & 3 deletions tests/mock/MockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ typedef __int32 int32_t;

#include <deflect/mock/api.h>
#include <deflect/config.h>
#include <deflect/NetworkProtocol.h>

#include <QtNetwork/QTcpServer>

Expand All @@ -55,8 +54,7 @@ class MockServer : public QTcpServer
Q_OBJECT

public:
DEFLECT_API explicit MockServer( int32_t protocolVersion =
NETWORK_PROTOCOL_VERSION );
DEFLECT_API explicit MockServer( int32_t protocolVersion );
DEFLECT_API virtual ~MockServer();

protected:
Expand Down

0 comments on commit 52b7a2f

Please sign in to comment.