Skip to content

Commit

Permalink
Merge pull request #24 from rdumusc/serverbugs
Browse files Browse the repository at this point in the history
Important Server bug fixes + some cleanups
  • Loading branch information
Raphael Dumusc committed Jun 26, 2015
2 parents 12909a5 + 759da48 commit c2d3ecb
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 107 deletions.
2 changes: 1 addition & 1 deletion deflect/FrameDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void FrameDispatcher::removeSource( const QString uri,

void FrameDispatcher::processSegment( const QString uri,
const size_t sourceIndex,
Segment segment )
deflect::Segment segment )
{
if( _impl->streamBuffers.count( uri ))
_impl->streamBuffers[uri].insert( segment, sourceIndex );
Expand Down
2 changes: 1 addition & 1 deletion deflect/FrameDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public slots:
* @param segment The segment to process
*/
DEFLECT_API void processSegment( QString uri, size_t sourceIndex,
Segment segment );
deflect::Segment segment );

/**
* The given source has finished sending segments for the current frame.
Expand Down
22 changes: 12 additions & 10 deletions deflect/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,10 @@ void Server::incomingConnection( const qintptr socketHandle )

worker->moveToThread( workerThread );

connect( workerThread, SIGNAL( started( )), worker, SLOT( initialize( )));
connect( worker, SIGNAL( finished( )), workerThread, SLOT( quit( )));
connect( workerThread, SIGNAL( started( )),
worker, SLOT( initConnection( )));
connect( worker, SIGNAL( connectionClosed( )),
workerThread, SLOT( quit( )));

// Make sure the thread will be deleted
connect( workerThread, SIGNAL( finished( )), worker, SLOT( deleteLater( )));
Expand All @@ -132,27 +134,27 @@ void Server::incomingConnection( const qintptr socketHandle )
this, SIGNAL( registerToEvents( QString, bool,
deflect::EventReceiver* )));
connect( this, SIGNAL( _pixelStreamerClosed( QString )),
worker, SLOT( _pixelStreamerClosed( QString )));
worker, SLOT( closeConnection( QString )));
connect( this, SIGNAL( _eventRegistrationReply( QString, bool )),
worker, SLOT( _eventRegistrationReply( QString, bool )));
worker, SLOT( replyToEventRegistration( QString, bool )));

// Commands
connect( worker, SIGNAL( receivedCommand( QString, QString )),
&_impl->commandHandler, SLOT( process( QString, QString )));

// PixelStreamDispatcher
connect( worker, SIGNAL( receivedAddPixelStreamSource( QString, size_t )),
connect( worker, SIGNAL( addStreamSource( QString, size_t )),
&_impl->pixelStreamDispatcher, SLOT(addSource( QString, size_t )));
connect( worker, SIGNAL( receivedPixelStreamSegement( QString, size_t,
Segment )),
connect( worker,
SIGNAL( receivedSegement( QString, size_t, deflect::Segment )),
&_impl->pixelStreamDispatcher,
SLOT( processSegment( QString, size_t, Segment )));
SLOT( processSegment( QString, size_t, deflect::Segment )));
connect( worker,
SIGNAL( receivedPixelStreamFinishFrame( QString, size_t )),
SIGNAL( receivedFrameFinished( QString, size_t )),
&_impl->pixelStreamDispatcher,
SLOT( processFrameFinished( QString, size_t )));
connect( worker,
SIGNAL( receivedRemovePixelStreamSource( QString, size_t )),
SIGNAL( removeStreamSource( QString, size_t )),
&_impl->pixelStreamDispatcher,
SLOT( removeSource( QString, size_t )));

Expand Down
161 changes: 81 additions & 80 deletions deflect/ServerWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,26 @@ namespace deflect
{

ServerWorker::ServerWorker( const int socketDescriptor )
: _socketDescriptor( socketDescriptor )
// Ensure that tcpSocket_ parent is *this* so it gets moved to thread
, _tcpSocket( new QTcpSocket( this ))
: _tcpSocket( new QTcpSocket( this ))
, _sourceId( socketDescriptor )
, _registeredToEvents( false )
{
if( !_tcpSocket->setSocketDescriptor( _socketDescriptor ))
if( !_tcpSocket->setSocketDescriptor( socketDescriptor ))
{
std::cerr << "could not set socket descriptor: "
<< _tcpSocket->errorString().toStdString() << std::endl;
emit( finished( ));
emit( connectionClosed( ));
return;
}

connect( _tcpSocket, SIGNAL( disconnected( )), this, SIGNAL( finished( )));
connect( _tcpSocket, SIGNAL( disconnected( )),
this, SIGNAL( connectionClosed( )));

connect( _tcpSocket, SIGNAL( readyRead( )),
this, SLOT( _process( )), Qt::QueuedConnection );
this, SLOT( _processMessages( )), Qt::QueuedConnection );
connect( this, SIGNAL( _dataAvailable( )),
this, SLOT( _process( )), Qt::QueuedConnection );
this, SLOT( _processMessages( )), Qt::QueuedConnection );
}

ServerWorker::~ServerWorker()
Expand All @@ -75,49 +77,75 @@ ServerWorker::~ServerWorker()
// We still want to remove this source so that the stream does not get stuck
// if other senders are still active / resp. the window gets closed if no
// more senders contribute to it.
if( !_pixelStreamUri.isEmpty( ))
emit receivedRemovePixelStreamSource( _pixelStreamUri,
_socketDescriptor );
if( !_streamUri.isEmpty( ))
emit removeStreamSource( _streamUri, _sourceId );

if( _tcpSocket->state() == QAbstractSocket::ConnectedState )
_sendQuit();

delete _tcpSocket;
}

void ServerWorker::_initialize()
void ServerWorker::processEvent( const Event evt )
{
_events.enqueue( evt );
emit _dataAvailable();
}

void ServerWorker::initConnection()
{
_sendProtocolVersion();
}

void ServerWorker::_process()
void ServerWorker::closeConnection( const QString uri )
{
if( uri != _streamUri )
return;

Event closeEvent;
closeEvent.type = Event::EVT_CLOSE;
_send( closeEvent );

emit( connectionClosed( ));
}

void ServerWorker::replyToEventRegistration( const QString uri,
const bool success )
{
if( uri != _streamUri )
return;

_registeredToEvents = success;
_sendBindReply( _registeredToEvents );
}

void ServerWorker::_processMessages()
{
if( _tcpSocket->bytesAvailable() >= qint64( MessageHeader::serializedSize ))
_socketReceiveMessage();
const qint64 headerSize( MessageHeader::serializedSize );

// send events if needed
if( _tcpSocket->bytesAvailable() >= headerSize )
_receiveMessage();

// Send all events
foreach( const Event& evt, _events )
{
_send( evt );
}
_events.clear();

// flush the socket
_tcpSocket->flush();

// Finish reading messages from the socket if connection closed
if( _tcpSocket->state() != QAbstractSocket::ConnectedState )
{
while( _tcpSocket->bytesAvailable() >= qint64(MessageHeader::serializedSize) )
_socketReceiveMessage();
while( _tcpSocket->bytesAvailable() >= headerSize )
_receiveMessage();

emit( finished( ));
emit( connectionClosed( ));
}
else if( _tcpSocket->bytesAvailable() >= qint64(MessageHeader::serializedSize) )
else if( _tcpSocket->bytesAvailable() >= headerSize )
emit _dataAvailable();
}

void ServerWorker::_socketReceiveMessage()
void ServerWorker::_receiveMessage()
{
const MessageHeader mh = _receiveMessageHeader();
const QByteArray messageByteArray = _receiveMessageBody( mh.size );
Expand All @@ -136,7 +164,6 @@ MessageHeader ServerWorker::_receiveMessageHeader()

QByteArray ServerWorker::_receiveMessageBody( const int size )
{
// next, read the actual message
QByteArray messageByteArray;

if( size > 0 )
Expand All @@ -147,7 +174,7 @@ QByteArray ServerWorker::_receiveMessageBody( const int size )
{
if( !_tcpSocket->waitForReadyRead( RECEIVE_TIMEOUT_MS ))
{
emit finished();
emit connectionClosed();
return QByteArray();
}

Expand All @@ -159,50 +186,53 @@ QByteArray ServerWorker::_receiveMessageBody( const int size )
return messageByteArray;
}

void ServerWorker::processEvent( const Event evt )
{
_events.enqueue( evt );
emit _dataAvailable();
}

void ServerWorker::_handleMessage( const MessageHeader& messageHeader,
const QByteArray& byteArray )
const QByteArray& byteArray )
{
const QString uri( messageHeader.uri );
if( uri.isEmpty( ))
{
std::cerr << "Warning: rejecting streamer with empty uri"
<< std::endl;
closeConnection( _streamUri );
return;
}
if( uri != _streamUri &&
messageHeader.type != MESSAGE_TYPE_PIXELSTREAM_OPEN )
{
std::cerr << "Warning: ingnoring message with incorrect stream uri: '"
<< messageHeader.uri << "', expected: '"
<< _streamUri.toStdString() << "'" << std::endl;
return;
}

switch( messageHeader.type )
{
case MESSAGE_TYPE_QUIT:
if ( _pixelStreamUri == uri )
{
emit receivedRemovePixelStreamSource( uri, _socketDescriptor );
_pixelStreamUri = QString();
}
emit removeStreamSource( _streamUri, _sourceId );
_streamUri = QString();
break;

case MESSAGE_TYPE_PIXELSTREAM_OPEN:
if( _pixelStreamUri.isEmpty( ))
if( !_streamUri.isEmpty( ))
{
_pixelStreamUri = uri;
emit receivedAddPixelStreamSource( uri, _socketDescriptor );
std::cerr << "Warning: PixelStream already opened!" << std::endl;
return;
}
else
std::cerr << "Error: PixelStream already opened!" << std::endl;
_streamUri = uri;
emit addStreamSource( _streamUri, _sourceId );
break;

case MESSAGE_TYPE_PIXELSTREAM_FINISH_FRAME:
if( _pixelStreamUri == uri )
{
emit receivedPixelStreamFinishFrame( uri, _socketDescriptor );
}
emit receivedFrameFinished( _streamUri, _sourceId );
break;

case MESSAGE_TYPE_PIXELSTREAM:
_handlePixelStreamMessage( uri, byteArray );
_handlePixelStreamMessage( byteArray );
break;

case MESSAGE_TYPE_COMMAND:
emit receivedCommand( QString( byteArray.data( )), uri );
emit receivedCommand( QString( byteArray.data( )), _streamUri );
break;

case MESSAGE_TYPE_BIND_EVENTS:
Expand All @@ -213,57 +243,28 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader,
{
const bool exclusive =
(messageHeader.type == MESSAGE_TYPE_BIND_EVENTS_EX);
emit registerToEvents( _pixelStreamUri, exclusive, this );
emit registerToEvents( _streamUri, exclusive, this );
}
break;

default:
break;
}

}

void ServerWorker::_handlePixelStreamMessage( const QString& uri,
const QByteArray& byteArray )
void ServerWorker::_handlePixelStreamMessage( const QByteArray& byteArray )
{
const SegmentParameters* parameters =
reinterpret_cast< const SegmentParameters* >( byteArray.data( ));

Segment segment;
segment.parameters = *parameters;

// read image data
QByteArray imageData =
byteArray.right( byteArray.size() - sizeof( SegmentParameters ));
segment.imageData = imageData;

if( _pixelStreamUri == uri )
emit( receivedPixelStreamSegement( uri, _socketDescriptor, segment ));
else
std::cerr << "received PixelStreamSegement from incorrect uri: "
<< uri.toStdString() << std::endl;
}

void ServerWorker::pixelStreamerClosed( const QString uri )
{
if( uri != _pixelStreamUri )
return;

Event closeEvent;
closeEvent.type = Event::EVT_CLOSE;
_send( closeEvent );

emit( finished( ));
}

void ServerWorker::eventRegistrationReply( const QString uri,
const bool success )
{
if( uri != _pixelStreamUri )
return;

_registeredToEvents = success;
_sendBindReply( _registeredToEvents );
emit( receivedSegement( _streamUri, _sourceId, segment ));
}

void ServerWorker::_sendProtocolVersion()
Expand Down
Loading

0 comments on commit c2d3ecb

Please sign in to comment.