diff --git a/.travis.yml b/.travis.yml index 031dee5..82adc91 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,29 +1,39 @@ -language: - - cpp - - python +# This config file for Travis CI utilizes ros-industrial/industrial_ci package. +# For more info for the package, see https://github.com/ros-industrial/industrial_ci/blob/master/README.rst + +services: + - docker +language: generic python: - "2.7" compiler: - gcc +notifications: + email: + on_success: always + on_failure: always + +env: + matrix: + - ROS_DISTRO=kinetic ROS_REPO=ros-shadow-fixed UPSTREAM_WORKSPACE=debian OS_NAME=ubuntu OS_CODE_NAME=xenial + - ROS_DISTRO=melodic ROS_REPO=ros-shadow-fixed UPSTREAM_WORKSPACE=debian OS_NAME=ubuntu OS_CODE_NAME=bionic + - ROS_DISTRO=melodic ROS_REPO=ros-shadow-fixed UPSTREAM_WORKSPACE=debian OS_NAME=debian OS_CODE_NAME=stretch + +matrix: + allow_failures: + - env: ROS_DISTRO=melodic ROS_REPO=ros-shadow-fixed UPSTREAM_WORKSPACE=debian OS_NAME=debian OS_CODE_NAME=stretch + + #- env: ROS_DISTRO="kinetic" PRERELEASE=true PRERELEASE_DOWNSTREAM_DEPTH=0 + #- env: ROS_DISTRO="melodic" PRERELEASE=true PRERELEASE_DOWNSTREAM_DEPTH=0 + branches: only: - master - develop install: - - sudo sh -c 'echo "deb http://packages.ros.org/ros/ubuntu precise main" > /etc/apt/sources.list.d/ros-latest.list' - - wget http://packages.ros.org/ros.key -O - | sudo apt-key add - - - sudo apt-get update -qq - - sudo apt-get install python-catkin-pkg python-rosdep ros-groovy-catkin ros-hydro-catkin -qq - - sudo rosdep init - - rosdep update - - mkdir -p /tmp/ws/src - - ln -s `pwd` /tmp/ws/src/package - - cd /tmp/ws - - rosdep install --from-paths src --ignore-src --rosdistro hydro -y + - git clone --quiet --depth 1 https://github.com/ros-industrial/industrial_ci.git .industrial_ci script: - - source /opt/ros/hydro/setup.bash - - catkin_make - - catkin_make install + - source .industrial_ci/travis.sh diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7e18347..5ace9f7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,116 @@ Changelog for package web_video_server ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.2.2 (2021-07-23) +------------------ +* fix vp9 and h264, support for opencv4 and ffmpeg 4 (`#103 `_) +* add a mention of mjpegcanvasjs in the readme (`#100 `_) +* fix multipart_stream.cpp HttpHeader values in order to solve DOMException(cross origin) CORS issue (`#92 `_) +* Contributors: Gady, okapi1125, randoms + +0.2.1 (2019-06-05) +------------------ +* Restream buffered frames with minimum publish rate (`#88 `_) + * Restream buffered frames with minimum publish rate + * Implement restreaming for ros_compressed_streamer +* Update travis config (`#89 `_) +* Fall back to mjpeg if ros_compressed is unavailable (`#87 `_) +* Contributors: Jihoon Lee, Viktor Kunovski, sfalexrog + +0.2.0 (2019-01-30) +------------------ +* Add "default_stream_type" parameter (`#84 `_) + This allows users to specify default stream type in their .launch files. Using a "ros_compressed" stream type sometimes + results in a much lower resource consumption, and having it set as a default is much nicer for end users. +* Add a workaround for MultipartStream constant busy state (`#83 `_) + * Add a workaround for MultipartStream constant busy state + * Remove C++11 features +* lax rule for topic name (`#77 `_) +* Add PngStreamer (`#74 `_) +* fix SteadyTimer check for backported ROS versions (`#71 `_) + i.e. on current kinetic +* Pkg format 2 (`#68 `_) + * use package format 2 + * add missing dependency on sensor_msgs +* fixed undeclared CODEC_FLAG_GLOBAL_HEADER (`#65 `_) +* Contributors: Andreas Klintberg, Dirk Thomas, Felix Ruess, Kazuto Murase, Viktor Kunovski, sfalexrog + +0.1.0 (2018-07-01) +------------------ +* Avoid queuing of images on slow ethernet connection (`#64 `_) +* use SteadyTimer (if available) for cleaning up inactive streams (`#63 `_) + * use SteadyTimer for cleaning up inactive streams + so that cleanup works correctly even if system time changes + SteadyTimer is available since roscpp 1.13.1 + * possibility to use SteadyTimer on older ROS versions + when SteadyTimer has been backported to those... +* Fix segfault in libav_streamer destructor (resolves `#59 `_) (`#60 `_) +* Fix vp8 in kinetic add vp9 and h264 support (`#52 `_) + * fix vp8 in kinetic + * add h264 and vp9 support +* Add Indigo test matrix in travis configuration (`#50 `_) +* Set image streamer as inactive if topic is not available (`#53 `_) + * Resolves `#38 `_ +* Fix Build for ubuntu 14.04 (`#48 `_) + * fix issue `#47 `_ + * fix double free +* Revert "use SteadyTimer for cleaning up inactive streams (`#45 `_)" (`#51 `_) + This reverts commit ae74f19ada22f288a7c7a99ada7a1b9b6037c7ce. +* use SteadyTimer for cleaning up inactive streams (`#45 `_) + so that cleanup works correctly even if system time changes +* Use trusty instead of xenial. See `travis-ci/travis-ci#7260 `_ (`#49 `_) + * Also see `RobotWebTools/rosbridge_suite#311 `_ +* Contributors: Felix Ruess, James Bailey, Jihoon Lee, randoms, schallerr + +0.0.7 (2017-11-20) +------------------ +* Ffmpeg 3 (`#43 `_) + * Correct use of deprecated parameters + codec_context\_->rc_buffer_aggressivity marked as "currently useless", so removed + codec_context\_->frame_skip_threshold access through new priv_data api + * New names for pixel formats + * AVPicture is deprecated, use AVFrame + * Switch to non-deprecated free functions + * Use new encoding api for newer versions + * codec_context is deprecated, use packet flags +* Update travis configuration to test against kinetic (`#44 `_) +* fixed misuse of remove_if (`#35 `_) +* Merge pull request `#33 `_ from achim-k/patch-1 + web_video_server: fix bool function not returning + This fix is required when compiling the package with `clang`. Otherwise a SIGILL (Illegal instruction) is triggered. +* Contributors: Hans-Joachim Krauch, Jan, Jihoon Lee, russelhowe + +0.0.6 (2017-01-17) +------------------ +* Fixed topic list to display all image topics, fixing Issue `#18 `_. +* Contributors: Eric + +0.0.5 (2016-10-13) +------------------ +* Merge pull request `#23 `_ from iki-wgt/develop + More information when server creation is failed +* Removed empty line +* More detailed exception message + Programm behavior is not changed since the exception is rethrown. +* Contributors: BennyRe, Russell Toris + +0.0.4 (2015-08-18) +------------------ +* Merge pull request #16 from mitchellwills/compressed + Adds support for streaming ROS compressed image topics without the need to reencode them +* Switch to checkout async_web_server_cpp from source +* Upgrade for change in signature of async_web_server_cpp request handler +* Added ros compressed video streamer type + This directly passes the ros compressed frame data to the http socket without reencoding it +* Switched from passing image transport to passing node handle to streamer constructors +* Added default transport parameter for regular image streamers +* Contributors: Mitchell Wills, Russell Toris + +0.0.3 (2015-05-07) +------------------ +* added verbose flag +* Contributors: Russell Toris + 0.0.2 (2015-02-20) ------------------ * Merge pull request #10 from mitchellwills/develop diff --git a/CMakeLists.txt b/CMakeLists.txt index 85ea8eb..dcb702f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ project(web_video_server) ## Find catkin macros and libraries ## if COMPONENTS list like find_package(catkin REQUIRED COMPONENTS xyz) ## is used, also find other catkin packages -find_package(catkin REQUIRED COMPONENTS roscpp roslib cv_bridge image_transport async_web_server_cpp) +find_package(catkin REQUIRED COMPONENTS roscpp roslib cv_bridge image_transport async_web_server_cpp sensor_msgs) find_package(OpenCV REQUIRED) find_package(Boost REQUIRED COMPONENTS thread) @@ -43,7 +43,13 @@ add_executable(${PROJECT_NAME} src/image_streamer.cpp src/libav_streamer.cpp src/vp8_streamer.cpp - src/jpeg_streamers.cpp) + src/h264_streamer.cpp + src/vp9_streamer.cpp + src/multipart_stream.cpp + src/ros_compressed_streamer.cpp + src/jpeg_streamers.cpp + src/png_streamers.cpp +) ## Specify libraries to link a library or executable target against target_link_libraries(${PROJECT_NAME} diff --git a/README.md b/README.md index cbe4568..decba1b 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ This node combines the capabilities of [ros_web_video](https://github.com/RobotW For full documentation, see [the ROS wiki](http://ros.org/wiki/web_video_server). [Doxygen](http://docs.ros.org/indigo/api/web_video_server/html/) files can be found on the ROS wiki. +[mjpegcanvasjs](https://github.com/rctoris/mjpegcanvasjs) can be used to display a MJPEG stream from the ROS web_video_server This project is released as part of the [Robot Web Tools](http://robotwebtools.org/) effort. diff --git a/include/web_video_server/h264_streamer.h b/include/web_video_server/h264_streamer.h new file mode 100644 index 0000000..28c67f2 --- /dev/null +++ b/include/web_video_server/h264_streamer.h @@ -0,0 +1,35 @@ +#ifndef H264_STREAMERS_H_ +#define H264_STREAMERS_H_ + +#include +#include "web_video_server/libav_streamer.h" +#include "async_web_server_cpp/http_request.hpp" +#include "async_web_server_cpp/http_connection.hpp" + +namespace web_video_server +{ + +class H264Streamer : public LibavStreamer +{ +public: + H264Streamer(const async_web_server_cpp::HttpRequest& request, async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); + ~H264Streamer(); +protected: + virtual void initializeEncoder(); + std::string preset_; +}; + +class H264StreamerType : public LibavStreamerType +{ +public: + H264StreamerType(); + virtual boost::shared_ptr create_streamer(const async_web_server_cpp::HttpRequest& request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); +}; + +} + +#endif + diff --git a/include/web_video_server/image_streamer.h b/include/web_video_server/image_streamer.h index cb33ab6..36b0f25 100644 --- a/include/web_video_server/image_streamer.h +++ b/include/web_video_server/image_streamer.h @@ -13,37 +13,60 @@ namespace web_video_server class ImageStreamer { public: - ImageStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it); + ImageStreamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); + virtual void start() = 0; virtual ~ImageStreamer(); - void start(); + bool isInactive() + { + return inactive_; + } - bool isInactive(); + /** + * Restreams the last received image frame if older than max_age. + */ + virtual void restreamFrame(double max_age) = 0; std::string getTopic() { return topic_; } +protected: + async_web_server_cpp::HttpConnectionPtr connection_; + async_web_server_cpp::HttpRequest request_; + ros::NodeHandle nh_; + bool inactive_; + image_transport::Subscriber image_sub_; + std::string topic_; +}; + + +class ImageTransportImageStreamer : public ImageStreamer +{ +public: + ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); + virtual ~ImageTransportImageStreamer(); + virtual void start(); + /** * Restreams the last received image frame if older than max_age. */ virtual void restreamFrame(double max_age); protected: virtual void sendImage(const cv::Mat &, const ros::WallTime &time) = 0; - virtual void initialize(const cv::Mat &); - async_web_server_cpp::HttpConnectionPtr connection_; - async_web_server_cpp::HttpRequest request_; - bool inactive_; image_transport::Subscriber image_sub_; - std::string topic_; int output_width_; int output_height_; bool invert_; + std::string default_transport_; + ros::WallTime last_frame; cv::Mat output_size_image; boost::mutex send_mutex_; @@ -60,7 +83,7 @@ class ImageStreamerType public: virtual boost::shared_ptr create_streamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it) = 0; + ros::NodeHandle& nh) = 0; virtual std::string create_viewer(const async_web_server_cpp::HttpRequest &request) = 0; }; diff --git a/include/web_video_server/jpeg_streamers.h b/include/web_video_server/jpeg_streamers.h index 11486f6..9ddb450 100644 --- a/include/web_video_server/jpeg_streamers.h +++ b/include/web_video_server/jpeg_streamers.h @@ -2,23 +2,26 @@ #define JPEG_STREAMERS_H_ #include +#include #include "web_video_server/image_streamer.h" #include "async_web_server_cpp/http_request.hpp" #include "async_web_server_cpp/http_connection.hpp" +#include "web_video_server/multipart_stream.h" namespace web_video_server { -class MjpegStreamer : public ImageStreamer +class MjpegStreamer : public ImageTransportImageStreamer { public: MjpegStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it); + ros::NodeHandle& nh); ~MjpegStreamer(); protected: virtual void sendImage(const cv::Mat &, const ros::WallTime &time); private: + MultipartStream stream_; int quality_; }; @@ -27,17 +30,15 @@ class MjpegStreamerType : public ImageStreamerType public: boost::shared_ptr create_streamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it); - + ros::NodeHandle& nh); std::string create_viewer(const async_web_server_cpp::HttpRequest &request); }; -class JpegSnapshotStreamer : public ImageStreamer +class JpegSnapshotStreamer : public ImageTransportImageStreamer { public: JpegSnapshotStreamer(const async_web_server_cpp::HttpRequest &request, - async_web_server_cpp::HttpConnectionPtr connection, image_transport::ImageTransport it); - + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh); ~JpegSnapshotStreamer(); protected: virtual void sendImage(const cv::Mat &, const ros::WallTime &time); diff --git a/include/web_video_server/libav_streamer.h b/include/web_video_server/libav_streamer.h index 2fcaf78..03bfabd 100644 --- a/include/web_video_server/libav_streamer.h +++ b/include/web_video_server/libav_streamer.h @@ -15,16 +15,17 @@ extern "C" #include #include #include +#include } namespace web_video_server { -class LibavStreamer : public ImageStreamer +class LibavStreamer : public ImageTransportImageStreamer { public: LibavStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it, const std::string &format_name, const std::string &codec_name, + ros::NodeHandle& nh, const std::string &format_name, const std::string &codec_name, const std::string &content_type); ~LibavStreamer(); @@ -40,10 +41,10 @@ class LibavStreamer : public ImageStreamer AVCodecContext* codec_context_; AVStream* video_stream_; + AVDictionary* opt_; // container format options + private: AVFrame* frame_; - AVPicture* picture_; - AVPicture* tmp_picture_; struct SwsContext* sws_context_; ros::WallTime first_image_timestamp_; boost::mutex encode_mutex_; @@ -55,6 +56,8 @@ class LibavStreamer : public ImageStreamer int qmin_; int qmax_; int gop_; + + uint8_t* io_buffer_; // custom IO buffer }; class LibavStreamerType : public ImageStreamerType @@ -64,7 +67,7 @@ class LibavStreamerType : public ImageStreamerType boost::shared_ptr create_streamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it); + ros::NodeHandle& nh); std::string create_viewer(const async_web_server_cpp::HttpRequest &request); diff --git a/include/web_video_server/multipart_stream.h b/include/web_video_server/multipart_stream.h new file mode 100644 index 0000000..57a6992 --- /dev/null +++ b/include/web_video_server/multipart_stream.h @@ -0,0 +1,42 @@ +#ifndef MULTIPART_STREAM_H_ +#define MULTIPART_STREAM_H_ + +#include +#include + +#include + +namespace web_video_server +{ + +struct PendingFooter { + ros::WallTime timestamp; + boost::weak_ptr contents; +}; + +class MultipartStream { +public: + MultipartStream(async_web_server_cpp::HttpConnectionPtr& connection, + const std::string& boundry="boundarydonotcross", + std::size_t max_queue_size=1); + + void sendInitialHeader(); + void sendPartHeader(const ros::WallTime &time, const std::string& type, size_t payload_size); + void sendPartFooter(const ros::WallTime &time); + void sendPartAndClear(const ros::WallTime &time, const std::string& type, std::vector &data); + void sendPart(const ros::WallTime &time, const std::string& type, const boost::asio::const_buffer &buffer, + async_web_server_cpp::HttpConnection::ResourcePtr resource); + +private: + bool isBusy(); + +private: + const std::size_t max_queue_size_; + async_web_server_cpp::HttpConnectionPtr connection_; + std::string boundry_; + std::queue pending_footers_; +}; + +} + +#endif diff --git a/include/web_video_server/png_streamers.h b/include/web_video_server/png_streamers.h new file mode 100644 index 0000000..e5153b6 --- /dev/null +++ b/include/web_video_server/png_streamers.h @@ -0,0 +1,52 @@ +#ifndef PNG_STREAMERS_H_ +#define PNG_STREAMERS_H_ + +#include +#include +#include "web_video_server/image_streamer.h" +#include "async_web_server_cpp/http_request.hpp" +#include "async_web_server_cpp/http_connection.hpp" +#include "web_video_server/multipart_stream.h" + +namespace web_video_server +{ + +class PngStreamer : public ImageTransportImageStreamer +{ +public: + PngStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); + ~PngStreamer(); +protected: + virtual void sendImage(const cv::Mat &, const ros::WallTime &time); + +private: + MultipartStream stream_; + int quality_; +}; + +class PngStreamerType : public ImageStreamerType +{ +public: + boost::shared_ptr create_streamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); + std::string create_viewer(const async_web_server_cpp::HttpRequest &request); +}; + +class PngSnapshotStreamer : public ImageTransportImageStreamer +{ +public: + PngSnapshotStreamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh); + ~PngSnapshotStreamer(); +protected: + virtual void sendImage(const cv::Mat &, const ros::WallTime &time); + +private: + int quality_; +}; + +} + +#endif diff --git a/include/web_video_server/ros_compressed_streamer.h b/include/web_video_server/ros_compressed_streamer.h new file mode 100644 index 0000000..333010f --- /dev/null +++ b/include/web_video_server/ros_compressed_streamer.h @@ -0,0 +1,46 @@ +#ifndef ROS_COMPRESSED_STREAMERS_H_ +#define ROS_COMPRESSED_STREAMERS_H_ + +#include +#include "web_video_server/image_streamer.h" +#include "async_web_server_cpp/http_request.hpp" +#include "async_web_server_cpp/http_connection.hpp" +#include "web_video_server/multipart_stream.h" + +namespace web_video_server +{ + +class RosCompressedStreamer : public ImageStreamer +{ +public: + RosCompressedStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); + ~RosCompressedStreamer(); + + virtual void start(); + virtual void restreamFrame(double max_age); + +protected: + virtual void sendImage(const sensor_msgs::CompressedImageConstPtr &msg, const ros::WallTime &time); + +private: + void imageCallback(const sensor_msgs::CompressedImageConstPtr &msg); + MultipartStream stream_; + ros::Subscriber image_sub_; + ros::WallTime last_frame; + sensor_msgs::CompressedImageConstPtr last_msg; + boost::mutex send_mutex_; +}; + +class RosCompressedStreamerType : public ImageStreamerType +{ +public: + boost::shared_ptr create_streamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); + std::string create_viewer(const async_web_server_cpp::HttpRequest &request); +}; + +} + +#endif diff --git a/include/web_video_server/vp8_streamer.h b/include/web_video_server/vp8_streamer.h index 940ebe6..f1632ae 100644 --- a/include/web_video_server/vp8_streamer.h +++ b/include/web_video_server/vp8_streamer.h @@ -49,7 +49,7 @@ class Vp8Streamer : public LibavStreamer { public: Vp8Streamer(const async_web_server_cpp::HttpRequest& request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it); + ros::NodeHandle& nh); ~Vp8Streamer(); protected: virtual void initializeEncoder(); @@ -63,7 +63,7 @@ class Vp8StreamerType : public LibavStreamerType Vp8StreamerType(); virtual boost::shared_ptr create_streamer(const async_web_server_cpp::HttpRequest& request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it); + ros::NodeHandle& nh); }; } diff --git a/include/web_video_server/vp9_streamer.h b/include/web_video_server/vp9_streamer.h new file mode 100644 index 0000000..041f466 --- /dev/null +++ b/include/web_video_server/vp9_streamer.h @@ -0,0 +1,33 @@ +#ifndef VP9_STREAMERS_H_ +#define VP9_STREAMERS_H_ + +#include +#include "web_video_server/libav_streamer.h" +#include "async_web_server_cpp/http_request.hpp" +#include "async_web_server_cpp/http_connection.hpp" + +namespace web_video_server +{ + +class Vp9Streamer : public LibavStreamer +{ +public: + Vp9Streamer(const async_web_server_cpp::HttpRequest& request, async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); + ~Vp9Streamer(); +protected: + virtual void initializeEncoder(); +}; + +class Vp9StreamerType : public LibavStreamerType +{ +public: + Vp9StreamerType(); + virtual boost::shared_ptr create_streamer(const async_web_server_cpp::HttpRequest& request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh); +}; + +} + +#endif diff --git a/include/web_video_server/web_video_server.h b/include/web_video_server/web_video_server.h index 7e6ed15..e4e0d73 100644 --- a/include/web_video_server/web_video_server.h +++ b/include/web_video_server/web_video_server.h @@ -2,7 +2,6 @@ #define WEB_VIDEO_SERVER_H_ #include -#include #include #include #include "web_video_server/image_streamer.h" @@ -10,6 +9,8 @@ #include "async_web_server_cpp/http_request.hpp" #include "async_web_server_cpp/http_connection.hpp" +#define ROS_HAS_STEADYTIMER (ROS_VERSION_MINIMUM(1, 13, 1) || ((ROS_VERSION_MINOR == 12) && (ROS_VERSION_PATCH >= 8))) + namespace web_video_server { @@ -49,12 +50,15 @@ class WebVideoServer async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end); private: - void restreamFrames( double max_age ); + void restreamFrames(double max_age); void cleanup_inactive_streams(); ros::NodeHandle nh_; - image_transport::ImageTransport image_transport_; +#if ROS_HAS_STEADYTIMER || defined USE_STEADY_TIMER + ros::SteadyTimer cleanup_timer_; +#else ros::Timer cleanup_timer_; +#endif int ros_threads_; double publish_rate_; int port_; diff --git a/package.xml b/package.xml index 5c1ca3a..74c4fe5 100644 --- a/package.xml +++ b/package.xml @@ -1,7 +1,7 @@ - + web_video_server - 0.0.2 + 0.2.2 HTTP Streaming of ROS Image Topics in Multiple Formats Russell Toris @@ -21,11 +21,13 @@ image_transport async_web_server_cpp ffmpeg + sensor_msgs - roscpp - roslib - cv_bridge - image_transport - async_web_server_cpp - ffmpeg + roscpp + roslib + cv_bridge + image_transport + async_web_server_cpp + ffmpeg + sensor_msgs diff --git a/src/h264_streamer.cpp b/src/h264_streamer.cpp new file mode 100644 index 0000000..5477369 --- /dev/null +++ b/src/h264_streamer.cpp @@ -0,0 +1,49 @@ +#include "web_video_server/h264_streamer.h" + +namespace web_video_server +{ + +H264Streamer::H264Streamer(const async_web_server_cpp::HttpRequest& request, + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) : + LibavStreamer(request, connection, nh, "mp4", "libx264", "video/mp4") +{ + /* possible quality presets: + * ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow, placebo + * no latency improvements observed with ultrafast instead of medium + */ + preset_ = request.get_query_param_value_or_default("preset", "ultrafast"); +} + +H264Streamer::~H264Streamer() +{ +} + +void H264Streamer::initializeEncoder() +{ + av_opt_set(codec_context_->priv_data, "preset", preset_.c_str(), 0); + av_opt_set(codec_context_->priv_data, "tune", "zerolatency", 0); + av_opt_set_int(codec_context_->priv_data, "crf", 20, 0); + av_opt_set_int(codec_context_->priv_data, "bufsize", 100, 0); + av_opt_set_int(codec_context_->priv_data, "keyint", 30, 0); + av_opt_set_int(codec_context_->priv_data, "g", 1, 0); + + // container format options + if (!strcmp(format_context_->oformat->name, "mp4")) { + // set up mp4 for streaming (instead of seekable file output) + av_dict_set(&opt_, "movflags", "+frag_keyframe+empty_moov+faststart", 0); + } +} + +H264StreamerType::H264StreamerType() : + LibavStreamerType("mp4", "libx264", "video/mp4") +{ +} + +boost::shared_ptr H264StreamerType::create_streamer(const async_web_server_cpp::HttpRequest& request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh) +{ + return boost::shared_ptr(new H264Streamer(request, connection, nh)); +} + +} diff --git a/src/image_streamer.cpp b/src/image_streamer.cpp index 99f33b1..60db5de 100644 --- a/src/image_streamer.cpp +++ b/src/image_streamer.cpp @@ -1,33 +1,56 @@ #include "web_video_server/image_streamer.h" #include #include + namespace web_video_server { ImageStreamer::ImageStreamer(const async_web_server_cpp::HttpRequest &request, - async_web_server_cpp::HttpConnectionPtr connection, image_transport::ImageTransport it) : - request_(request), connection_(connection), it_(it), inactive_(false), initialized_(false) + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) : + request_(request), connection_(connection), nh_(nh), inactive_(false) { topic_ = request.get_query_param_value_or_default("topic", ""); +} + +ImageStreamer::~ImageStreamer() +{ +} + +ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) : + ImageStreamer(request, connection, nh), it_(nh), initialized_(false) +{ output_width_ = request.get_query_param_value_or_default("width", -1); output_height_ = request.get_query_param_value_or_default("height", -1); invert_ = request.has_query_param("invert"); + default_transport_ = request.get_query_param_value_or_default("default_transport", "raw"); } -ImageStreamer::~ImageStreamer() +ImageTransportImageStreamer::~ImageTransportImageStreamer() { } -void ImageStreamer::start() +void ImageTransportImageStreamer::start() { - image_sub_ = it_.subscribe(topic_, 1, &ImageStreamer::imageCallback, this); + image_transport::TransportHints hints(default_transport_); + ros::master::V_TopicInfo available_topics; + ros::master::getTopics(available_topics); + inactive_ = true; + for (size_t it = 0; it("quality", 95); - - async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header( - "Server", "web_video_server").header("Cache-Control", - "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header( - "Pragma", "no-cache").header("Content-type", "multipart/x-mixed-replace;boundary=--boundarydonotcross ").header( - "Access-Control-Allow-Origin", "*").write(connection); - connection->write("--boundarydonotcross \r\n"); + stream_.sendInitialHeader(); } MjpegStreamer::~MjpegStreamer() @@ -27,31 +21,24 @@ MjpegStreamer::~MjpegStreamer() void MjpegStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time) { std::vector encode_params; +#if CV_VERSION_MAJOR >= 3 + encode_params.push_back(cv::IMWRITE_JPEG_QUALITY); +#else encode_params.push_back(CV_IMWRITE_JPEG_QUALITY); +#endif encode_params.push_back(quality_); std::vector encoded_buffer; cv::imencode(".jpeg", img, encoded_buffer, encode_params); - char stamp[20]; - sprintf(stamp, "%.06lf", time.toSec()); - boost::shared_ptr > headers( - new std::vector()); - headers->push_back(async_web_server_cpp::HttpHeader("Content-type", "image/jpeg")); - headers->push_back(async_web_server_cpp::HttpHeader("X-Timestamp", stamp)); - headers->push_back( - async_web_server_cpp::HttpHeader("Content-Length", boost::lexical_cast(encoded_buffer.size()))); - headers->push_back(async_web_server_cpp::HttpHeader("Access-Control-Allow-Origin", "*")); - connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers); - connection_->write_and_clear(encoded_buffer); - connection_->write("\r\n--boundarydonotcross \r\n"); + stream_.sendPartAndClear(time, "image/jpeg", encoded_buffer); } boost::shared_ptr MjpegStreamerType::create_streamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it) + ros::NodeHandle& nh) { - return boost::shared_ptr(new MjpegStreamer(request, connection, it)); + return boost::shared_ptr(new MjpegStreamer(request, connection, nh)); } std::string MjpegStreamerType::create_viewer(const async_web_server_cpp::HttpRequest &request) @@ -65,8 +52,8 @@ std::string MjpegStreamerType::create_viewer(const async_web_server_cpp::HttpReq JpegSnapshotStreamer::JpegSnapshotStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it) : - ImageStreamer(request, connection, it) + ros::NodeHandle& nh) : + ImageTransportImageStreamer(request, connection, nh) { quality_ = request.get_query_param_value_or_default("quality", 95); } @@ -77,12 +64,14 @@ JpegSnapshotStreamer::~JpegSnapshotStreamer() boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage. } - - void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time) { std::vector encode_params; +#if CV_VERSION_MAJOR >= 3 + encode_params.push_back(cv::IMWRITE_JPEG_QUALITY); +#else encode_params.push_back(CV_IMWRITE_JPEG_QUALITY); +#endif encode_params.push_back(quality_); std::vector encoded_buffer; @@ -90,13 +79,19 @@ void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::WallTime &ti char stamp[20]; sprintf(stamp, "%.06lf", time.toSec()); - async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header( - "Server", "web_video_server").header("Cache-Control", - "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header( - "X-Timestamp", stamp).header("Pragma", "no-cache").header("Content-type", "image/jpeg").header( - "Access-Control-Allow-Origin", "*").header("Content-Length", - boost::lexical_cast(encoded_buffer.size())).write( - connection_); + async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok) + .header("Connection", "close") + .header("Server", "web_video_server") + .header("Cache-Control", + "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, " + "max-age=0") + .header("X-Timestamp", stamp) + .header("Pragma", "no-cache") + .header("Content-type", "image/jpeg") + .header("Access-Control-Allow-Origin", "*") + .header("Content-Length", + boost::lexical_cast(encoded_buffer.size())) + .write(connection_); connection_->write_and_clear(encoded_buffer); inactive_ = true; } diff --git a/src/libav_streamer.cpp b/src/libav_streamer.cpp index 959c008..0a9dc8d 100644 --- a/src/libav_streamer.cpp +++ b/src/libav_streamer.cpp @@ -1,6 +1,10 @@ #include "web_video_server/libav_streamer.h" #include "async_web_server_cpp/http_reply.hpp" +/*https://stackoverflow.com/questions/46884682/error-in-building-opencv-with-ffmpeg*/ +#define AV_CODEC_FLAG_GLOBAL_HEADER (1 << 22) +#define CODEC_FLAG_GLOBAL_HEADER AV_CODEC_FLAG_GLOBAL_HEADER + namespace web_video_server { @@ -45,25 +49,26 @@ static int ffmpeg_boost_mutex_lock_manager(void **mutex, enum AVLockOp op) } LibavStreamer::LibavStreamer(const async_web_server_cpp::HttpRequest &request, - async_web_server_cpp::HttpConnectionPtr connection, image_transport::ImageTransport it, + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh, const std::string &format_name, const std::string &codec_name, const std::string &content_type) : - ImageStreamer(request, connection, it), output_format_(0), format_context_(0), codec_(0), codec_context_(0), video_stream_( - 0), frame_(0), picture_(0), tmp_picture_(0), sws_context_(0), first_image_timestamp_(0), format_name_( - format_name), codec_name_(codec_name), content_type_(content_type) + ImageTransportImageStreamer(request, connection, nh), output_format_(0), format_context_(0), codec_(0), codec_context_(0), video_stream_( + 0), frame_(0), sws_context_(0), first_image_timestamp_(0), format_name_( + format_name), codec_name_(codec_name), content_type_(content_type), opt_(0), io_buffer_(0) { bitrate_ = request.get_query_param_value_or_default("bitrate", 100000); - qmin_ = request.get_query_param_value_or_default("qmin", 1); - qmax_ = request.get_query_param_value_or_default("qmax", 40); - gop_ = request.get_query_param_value_or_default("gop", 64); - + qmin_ = request.get_query_param_value_or_default("qmin", 10); + qmax_ = request.get_query_param_value_or_default("qmax", 42); + gop_ = request.get_query_param_value_or_default("gop", 25); } void LibavStreamer::SetupAVLibrary() { +#if ( LIBAVCODEC_VERSION_INT < AV_VERSION_INT(58,9,100) ) av_lockmgr_register(&ffmpeg_boost_mutex_lock_manager); av_register_all(); +#endif } LibavStreamer::~LibavStreamer() @@ -71,33 +76,43 @@ LibavStreamer::~LibavStreamer() this->inactive_ = true; boost::mutex::scoped_lock lock(send_mutex_); // protects all structures below when send is still in progress. if (codec_context_) + { + #if ( LIBAVCODEC_VERSION_INT < AV_VERSION_INT(58,9,100) ) avcodec_close(codec_context_); + #else + avcodec_free_context(&codec_context_); + #endif + } if (frame_) { -#if (LIBAVCODEC_VERSION_MAJOR < 54) +#if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(55,28,1) av_free(frame_); frame_ = NULL; #else - avcodec_free_frame(&frame_); + av_frame_free(&frame_); #endif } - if (format_context_) + if (io_buffer_) + delete io_buffer_; + if (format_context_) { + if (format_context_->pb) + av_free(format_context_->pb); avformat_free_context(format_context_); - if (picture_) - { - avpicture_free(picture_); - delete picture_; - picture_ = NULL; - } - if (tmp_picture_) - { - delete tmp_picture_; - tmp_picture_ = NULL; } if (sws_context_) sws_freeContext(sws_context_); } +// output callback for ffmpeg IO context +static int dispatch_output_packet(void* opaque, uint8_t* buffer, int buffer_size) +{ + async_web_server_cpp::HttpConnectionPtr connection = *((async_web_server_cpp::HttpConnectionPtr*) opaque); + std::vector encoded_frame; + encoded_frame.assign(buffer, buffer + buffer_size); + connection->write_and_clear(encoded_frame); + return 0; // TODO: can this fail? +} + void LibavStreamer::initialize(const cv::Mat &img) { // Load format @@ -119,6 +134,23 @@ void LibavStreamer::initialize(const cv::Mat &img) } format_context_->oformat = output_format_; + // Set up custom IO callback. + size_t io_buffer_size = 3 * 1024; // 3M seen elsewhere and adjudged good + io_buffer_ = new unsigned char[io_buffer_size]; + AVIOContext* io_ctx = avio_alloc_context(io_buffer_, io_buffer_size, AVIO_FLAG_WRITE, &connection_, NULL, dispatch_output_packet, NULL); + if (!io_ctx) + { + async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::internal_server_error)(request_, + connection_, + NULL, NULL); + throw std::runtime_error("Error setting up IO context"); + } + io_ctx->seekable = 0; // no seeking, it's a stream + format_context_->pb = io_ctx; + format_context_->max_interleave_delta = 0; + output_format_->flags |= AVFMT_FLAG_CUSTOM_IO; + output_format_->flags |= AVFMT_NOFILE; + // Load codec if (codec_name_.empty()) // use default codec if none specified codec_ = avcodec_find_encoder(output_format_->video_codec); @@ -139,13 +171,17 @@ void LibavStreamer::initialize(const cv::Mat &img) NULL, NULL); throw std::runtime_error("Error creating video stream"); } + #if ( LIBAVCODEC_VERSION_INT < AV_VERSION_INT(58,9,100) ) codec_context_ = video_stream_->codec; + #else + codec_context_ = avcodec_alloc_context3(codec_); + #endif // Set options // calls av_opt_set_defaults internally: avcodec_get_context_defaults3(codec_context_, codec_); - codec_context_->codec_id = output_format_->video_codec; + codec_context_->codec_id = codec_->id; codec_context_->bit_rate = bitrate_; codec_context_->width = output_width_; @@ -158,18 +194,21 @@ void LibavStreamer::initialize(const cv::Mat &img) codec_context_->time_base.num = 1; codec_context_->time_base.den = 1; codec_context_->gop_size = gop_; - codec_context_->pix_fmt = PIX_FMT_YUV420P; + codec_context_->pix_fmt = AV_PIX_FMT_YUV420P; codec_context_->max_b_frames = 1; // Quality settings codec_context_->qmin = qmin_; codec_context_->qmax = qmax_; + #if ( LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(58,9,100) ) + avcodec_parameters_from_context(video_stream_->codecpar, codec_context_); + #endif + initializeEncoder(); - // Some formats want stream headers to be separate - if (format_context_->oformat->flags & AVFMT_GLOBALHEADER) - codec_context_->flags |= CODEC_FLAG_GLOBAL_HEADER; + codec_context_->flags |= AV_CODEC_FLAG_LOW_DELAY; + codec_context_->max_b_frames = 0; // Open Codec if (avcodec_open2(codec_context_, codec_, NULL) < 0) @@ -181,19 +220,17 @@ void LibavStreamer::initialize(const cv::Mat &img) } // Allocate frame buffers +#if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(55,28,1) frame_ = avcodec_alloc_frame(); - tmp_picture_ = new AVPicture; - picture_ = new AVPicture; - int ret = avpicture_alloc(picture_, codec_context_->pix_fmt, output_width_, output_height_); - if (ret < 0) - { - async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::internal_server_error)(request_, - connection_, - NULL, NULL); - throw std::runtime_error("Could not allocate picture frame"); - } - *((AVPicture *)frame_) = *picture_; +#else + frame_ = av_frame_alloc(); +#endif + av_image_alloc(frame_->data, frame_->linesize, output_width_, output_height_, + codec_context_->pix_fmt, 1); + frame_->width = output_width_; + frame_->height = output_height_; + frame_->format = codec_context_->pix_fmt; output_format_->flags |= AVFMT_NOFILE; // Generate header @@ -204,24 +241,6 @@ void LibavStreamer::initialize(const cv::Mat &img) av_dict_set(&format_context_->metadata, "author", "ROS web_video_server", 0); av_dict_set(&format_context_->metadata, "title", topic_.c_str(), 0); - if (avio_open_dyn_buf(&format_context_->pb) >= 0) - { - if (avformat_write_header(format_context_, NULL) < 0) - { - async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::internal_server_error)(request_, - connection_, - NULL, NULL); - throw std::runtime_error("Error openning dynamic buffer"); - } - header_size = avio_close_dyn_buf(format_context_->pb, &header_raw_buffer); - - // copy header buffer to vector - header_buffer.resize(header_size); - memcpy(&header_buffer[0], header_raw_buffer, header_size); - - av_free(header_raw_buffer); - } - // Send response headers async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header( "Server", "web_video_server").header("Cache-Control", @@ -230,7 +249,13 @@ void LibavStreamer::initialize(const cv::Mat &img) "Content-type", content_type_).header("Access-Control-Allow-Origin", "*").write(connection_); // Send video stream header - connection_->write_and_clear(header_buffer); + if (avformat_write_header(format_context_, &opt_) < 0) + { + async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::internal_server_error)(request_, + connection_, + NULL, NULL); + throw std::runtime_error("Error openning dynamic buffer"); + } } void LibavStreamer::initializeEncoder() @@ -245,12 +270,22 @@ void LibavStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time) first_image_timestamp_ = time; } std::vector encoded_frame; -#if (LIBAVUTIL_VERSION_MAJOR < 52) +#if (LIBAVUTIL_VERSION_MAJOR < 53) PixelFormat input_coding_format = PIX_FMT_BGR24; #else - AVPixelFormat input_coding_format = PIX_FMT_BGR24; + AVPixelFormat input_coding_format = AV_PIX_FMT_BGR24; +#endif + +#if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(55,28,1) + AVPicture *raw_frame = new AVPicture; + avpicture_fill(raw_frame, img.data, input_coding_format, output_width_, output_height_); +#else + AVFrame *raw_frame = av_frame_alloc(); + av_image_fill_arrays(raw_frame->data, raw_frame->linesize, + img.data, input_coding_format, output_width_, output_height_, 1); #endif - avpicture_fill(tmp_picture_, img.data, input_coding_format, output_width_, output_height_); + + // Convert from opencv to libav if (!sws_context_) @@ -264,8 +299,16 @@ void LibavStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time) } } - int ret = sws_scale(sws_context_, (const uint8_t * const *)tmp_picture_->data, tmp_picture_->linesize, 0, - output_height_, picture_->data, picture_->linesize); + + int ret = sws_scale(sws_context_, + (const uint8_t * const *)raw_frame->data, raw_frame->linesize, 0, + output_height_, frame_->data, frame_->linesize); + +#if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(55,28,1) + delete raw_frame; +#else + av_frame_free(&raw_frame); +#endif // Encode the frame AVPacket pkt; @@ -277,13 +320,38 @@ void LibavStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time) pkt.data = (uint8_t*)av_malloc(buf_size); pkt.size = avcodec_encode_video(codec_context_, pkt.data, buf_size, frame_); got_packet = pkt.size > 0; -#else +#elif (LIBAVCODEC_VERSION_MAJOR < 57) pkt.data = NULL; // packet data will be allocated by the encoder pkt.size = 0; if (avcodec_encode_video2(codec_context_, &pkt, frame_, &got_packet) < 0) + { + throw std::runtime_error("Error encoding video frame"); + } +#else + ret = avcodec_send_frame(codec_context_, frame_); + if (ret == AVERROR_EOF) + { + ROS_DEBUG_STREAM("avcodec_send_frame() encoder flushed"); + } + else if (ret == AVERROR(EAGAIN)) + { + ROS_DEBUG_STREAM("avcodec_send_frame() need output read out"); + } + if (ret < 0) { throw std::runtime_error("Error encoding video frame"); } + ret = avcodec_receive_packet(codec_context_, &pkt); + got_packet = pkt.size > 0; + if (ret == AVERROR_EOF) + { + ROS_DEBUG_STREAM("avcodec_recieve_packet() encoder flushed"); + } + else if (ret == AVERROR(EAGAIN)) + { + ROS_DEBUG_STREAM("avcodec_recieve_packet() need more input"); + got_packet = 0; + } #endif if (got_packet) @@ -298,34 +366,29 @@ void LibavStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time) pkt.pts = 1; pkt.dts = AV_NOPTS_VALUE; - if (codec_context_->coded_frame->key_frame) + if (pkt.flags&AV_PKT_FLAG_KEY) pkt.flags |= AV_PKT_FLAG_KEY; pkt.stream_index = video_stream_->index; - if (avio_open_dyn_buf(&format_context_->pb) >= 0) + if (av_write_frame(format_context_, &pkt)) { - if (av_write_frame(format_context_, &pkt)) - { - throw std::runtime_error("Error when writing frame"); - } - size = avio_close_dyn_buf(format_context_->pb, &output_buf); - - encoded_frame.resize(size); - memcpy(&encoded_frame[0], output_buf, size); - - av_free(output_buf); + throw std::runtime_error("Error when writing frame"); } } else { encoded_frame.clear(); } -#if (LIBAVCODEC_VERSION_MAJOR < 54) +#if LIBAVCODEC_VERSION_INT < 54 av_free(pkt.data); #endif +#if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(55,28,1) av_free_packet(&pkt); +#else + av_packet_unref(&pkt); +#endif connection_->write_and_clear(encoded_frame); } @@ -338,10 +401,10 @@ LibavStreamerType::LibavStreamerType(const std::string &format_name, const std:: boost::shared_ptr LibavStreamerType::create_streamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it) + ros::NodeHandle& nh) { return boost::shared_ptr( - new LibavStreamer(request, connection, it, format_name_, codec_name_, content_type_)); + new LibavStreamer(request, connection, nh, format_name_, codec_name_, content_type_)); } std::string LibavStreamerType::create_viewer(const async_web_server_cpp::HttpRequest &request) diff --git a/src/multipart_stream.cpp b/src/multipart_stream.cpp new file mode 100644 index 0000000..9cd3493 --- /dev/null +++ b/src/multipart_stream.cpp @@ -0,0 +1,90 @@ +#include "web_video_server/multipart_stream.h" +#include "async_web_server_cpp/http_reply.hpp" + +namespace web_video_server +{ + +MultipartStream::MultipartStream( + async_web_server_cpp::HttpConnectionPtr& connection, + const std::string& boundry, + std::size_t max_queue_size) + : connection_(connection), boundry_(boundry), max_queue_size_(max_queue_size) +{} + +void MultipartStream::sendInitialHeader() { + async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header( + "Server", "web_video_server").header("Cache-Control", + "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header( + "Pragma", "no-cache").header("Content-type", "multipart/x-mixed-replace;boundary="+boundry_).header( + "Access-Control-Allow-Origin", "*").header("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,HEAD,OPTIONS").header( + "Access-Control-Allow-Headers", "Origin, Authorization, Accept, Content-Type").header("Access-Control-Max-Age", "3600").write(connection_); + connection_->write("--"+boundry_+"\r\n"); +} + +void MultipartStream::sendPartHeader(const ros::WallTime &time, const std::string& type, size_t payload_size) { + char stamp[20]; + sprintf(stamp, "%.06lf", time.toSec()); + boost::shared_ptr > headers( + new std::vector()); + headers->push_back(async_web_server_cpp::HttpHeader("Content-type", type)); + headers->push_back(async_web_server_cpp::HttpHeader("X-Timestamp", stamp)); + headers->push_back(async_web_server_cpp::HttpHeader("Access-Control-Allow-Origin", "*")); + headers->push_back(async_web_server_cpp::HttpHeader("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,HEAD,OPTIONS")); + headers->push_back(async_web_server_cpp::HttpHeader("Access-Control-Allow-Headers", "Origin, Authorization, Accept, Content-Type")); + headers->push_back(async_web_server_cpp::HttpHeader("Access-Control-Max-Age", "3600")); + headers->push_back( + async_web_server_cpp::HttpHeader("Content-Length", boost::lexical_cast(payload_size))); + // copied from 3d3bec1d90f310b9c82d6acc3ed5d681ec452078: jpeg: also allow cross-origin in next image frames. + headers->push_back(async_web_server_cpp::HttpHeader("Access-Control-Allow-Origin", "*")); + connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers); +} + +void MultipartStream::sendPartFooter(const ros::WallTime &time) { + boost::shared_ptr str(new std::string("\r\n--"+boundry_+"\r\n")); + PendingFooter pf; + pf.timestamp = time; + pf.contents = str; + connection_->write(boost::asio::buffer(*str), str); + if (max_queue_size_ > 0) pending_footers_.push(pf); +} + +void MultipartStream::sendPartAndClear(const ros::WallTime &time, const std::string& type, + std::vector &data) { + if (!isBusy()) + { + sendPartHeader(time, type, data.size()); + connection_->write_and_clear(data); + sendPartFooter(time); + } +} + +void MultipartStream::sendPart(const ros::WallTime &time, const std::string& type, + const boost::asio::const_buffer &buffer, + async_web_server_cpp::HttpConnection::ResourcePtr resource) { + if (!isBusy()) + { + sendPartHeader(time, type, boost::asio::buffer_size(buffer)); + connection_->write(buffer, resource); + sendPartFooter(time); + } +} + +bool MultipartStream::isBusy() { + const ros::WallTime currentTime = ros::WallTime::now(); + while (!pending_footers_.empty()) + { + if (pending_footers_.front().contents.expired()) { + pending_footers_.pop(); + } else { + ros::WallTime footerTime = pending_footers_.front().timestamp; + if ((currentTime - footerTime).toSec() > 0.5) { + pending_footers_.pop(); + } else { + break; + } + } + } + return !(max_queue_size_ == 0 || pending_footers_.size() < max_queue_size_); +} + +} diff --git a/src/png_streamers.cpp b/src/png_streamers.cpp new file mode 100644 index 0000000..fda27b8 --- /dev/null +++ b/src/png_streamers.cpp @@ -0,0 +1,99 @@ +#include "web_video_server/png_streamers.h" +#include "async_web_server_cpp/http_reply.hpp" + +namespace web_video_server +{ + +PngStreamer::PngStreamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) : + ImageTransportImageStreamer(request, connection, nh), stream_(connection) +{ + quality_ = request.get_query_param_value_or_default("quality", 3); + stream_.sendInitialHeader(); +} + +PngStreamer::~PngStreamer() +{ + this->inactive_ = true; + boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage. +} + +void PngStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time) +{ + std::vector encode_params; +#if CV_VERSION_MAJOR >= 3 + encode_params.push_back(cv::IMWRITE_PNG_COMPRESSION); +#else + encode_params.push_back(CV_IMWRITE_PNG_COMPRESSION); +#endif + encode_params.push_back(quality_); + + std::vector encoded_buffer; + cv::imencode(".png", img, encoded_buffer, encode_params); + + stream_.sendPartAndClear(time, "image/png", encoded_buffer); +} + +boost::shared_ptr PngStreamerType::create_streamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh) +{ + return boost::shared_ptr(new PngStreamer(request, connection, nh)); +} + +std::string PngStreamerType::create_viewer(const async_web_server_cpp::HttpRequest &request) +{ + std::stringstream ss; + ss << ""; + return ss.str(); +} + +PngSnapshotStreamer::PngSnapshotStreamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh) : + ImageTransportImageStreamer(request, connection, nh) +{ + quality_ = request.get_query_param_value_or_default("quality", 3); +} + +PngSnapshotStreamer::~PngSnapshotStreamer() +{ + this->inactive_ = true; + boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage. +} + +void PngSnapshotStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time) +{ + std::vector encode_params; +#if CV_VERSION_MAJOR >= 3 + encode_params.push_back(cv::IMWRITE_PNG_COMPRESSION); +#else + encode_params.push_back(CV_IMWRITE_PNG_COMPRESSION); +#endif + encode_params.push_back(quality_); + + std::vector encoded_buffer; + cv::imencode(".png", img, encoded_buffer, encode_params); + + char stamp[20]; + sprintf(stamp, "%.06lf", time.toSec()); + async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok) + .header("Connection", "close") + .header("Server", "web_video_server") + .header("Cache-Control", + "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, " + "max-age=0") + .header("X-Timestamp", stamp) + .header("Pragma", "no-cache") + .header("Content-type", "image/png") + .header("Access-Control-Allow-Origin", "*") + .header("Content-Length", + boost::lexical_cast(encoded_buffer.size())) + .write(connection_); + connection_->write_and_clear(encoded_buffer); + inactive_ = true; +} + +} diff --git a/src/ros_compressed_streamer.cpp b/src/ros_compressed_streamer.cpp new file mode 100644 index 0000000..fdabe47 --- /dev/null +++ b/src/ros_compressed_streamer.cpp @@ -0,0 +1,99 @@ +#include "web_video_server/ros_compressed_streamer.h" + +namespace web_video_server +{ + +RosCompressedStreamer::RosCompressedStreamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) : + ImageStreamer(request, connection, nh), stream_(connection) +{ + stream_.sendInitialHeader(); +} + +RosCompressedStreamer::~RosCompressedStreamer() +{ + this->inactive_ = true; + boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage. +} + +void RosCompressedStreamer::start() { + std::string compressed_topic = topic_ + "/compressed"; + image_sub_ = nh_.subscribe(compressed_topic, 1, &RosCompressedStreamer::imageCallback, this); +} + +void RosCompressedStreamer::restreamFrame(double max_age) +{ + if (inactive_ || (last_msg == 0)) + return; + + if ( last_frame + ros::WallDuration(max_age) < ros::WallTime::now() ) { + boost::mutex::scoped_lock lock(send_mutex_); + sendImage(last_msg, ros::WallTime::now() ); // don't update last_frame, it may remain an old value. + } +} + +void RosCompressedStreamer::sendImage(const sensor_msgs::CompressedImageConstPtr &msg, + const ros::WallTime &time) { + try { + std::string content_type; + if(msg->format.find("jpeg") != std::string::npos) { + content_type = "image/jpeg"; + } + else if(msg->format.find("png") != std::string::npos) { + content_type = "image/png"; + } + else { + ROS_WARN_STREAM("Unknown ROS compressed image format: " << msg->format); + return; + } + + stream_.sendPart(time, content_type, boost::asio::buffer(msg->data), msg); + } + catch (boost::system::system_error &e) + { + // happens when client disconnects + ROS_DEBUG("system_error exception: %s", e.what()); + inactive_ = true; + return; + } + catch (std::exception &e) + { + ROS_ERROR_THROTTLE(30, "exception: %s", e.what()); + inactive_ = true; + return; + } + catch (...) + { + ROS_ERROR_THROTTLE(30, "exception"); + inactive_ = true; + return; + } +} + + +void RosCompressedStreamer::imageCallback(const sensor_msgs::CompressedImageConstPtr &msg) { + boost::mutex::scoped_lock lock(send_mutex_); // protects last_msg and last_frame + last_msg = msg; + last_frame = ros::WallTime(msg->header.stamp.sec, msg->header.stamp.nsec); + sendImage(last_msg, last_frame); +} + + +boost::shared_ptr RosCompressedStreamerType::create_streamer(const async_web_server_cpp::HttpRequest &request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh) +{ + return boost::shared_ptr(new RosCompressedStreamer(request, connection, nh)); +} + +std::string RosCompressedStreamerType::create_viewer(const async_web_server_cpp::HttpRequest &request) +{ + std::stringstream ss; + ss << ""; + return ss.str(); +} + + +} diff --git a/src/vp8_streamer.cpp b/src/vp8_streamer.cpp index b8f3bc4..067f5ae 100644 --- a/src/vp8_streamer.cpp +++ b/src/vp8_streamer.cpp @@ -40,8 +40,8 @@ namespace web_video_server { Vp8Streamer::Vp8Streamer(const async_web_server_cpp::HttpRequest& request, - async_web_server_cpp::HttpConnectionPtr connection, image_transport::ImageTransport it) : - LibavStreamer(request, connection, it, "webm", "libvpx", "video/webm") + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) : + LibavStreamer(request, connection, nh, "webm", "libvpx", "video/webm") { quality_ = request.get_query_param_value_or_default("quality", "realtime"); } @@ -71,8 +71,7 @@ void Vp8Streamer::initializeEncoder() av_opt_set_int(codec_context_->priv_data, "bufsize", bufsize, 0); av_opt_set_int(codec_context_->priv_data, "buf-initial", bufsize, 0); av_opt_set_int(codec_context_->priv_data, "buf-optimal", bufsize, 0); - codec_context_->rc_buffer_aggressivity = 0.5; - codec_context_->frame_skip_threshold = 2; + av_opt_set_int(codec_context_->priv_data, "skip_threshold", 2, 0); #endif } @@ -83,9 +82,9 @@ Vp8StreamerType::Vp8StreamerType() : boost::shared_ptr Vp8StreamerType::create_streamer(const async_web_server_cpp::HttpRequest& request, async_web_server_cpp::HttpConnectionPtr connection, - image_transport::ImageTransport it) + ros::NodeHandle& nh) { - return boost::shared_ptr(new Vp8Streamer(request, connection, it)); + return boost::shared_ptr(new Vp8Streamer(request, connection, nh)); } } diff --git a/src/vp9_streamer.cpp b/src/vp9_streamer.cpp new file mode 100644 index 0000000..b487092 --- /dev/null +++ b/src/vp9_streamer.cpp @@ -0,0 +1,38 @@ +#include "web_video_server/vp9_streamer.h" + +namespace web_video_server +{ + +Vp9Streamer::Vp9Streamer(const async_web_server_cpp::HttpRequest& request, + async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) : + LibavStreamer(request, connection, nh, "webm", "libvpx-vp9", "video/webm") +{ +} +Vp9Streamer::~Vp9Streamer() +{ +} + +void Vp9Streamer::initializeEncoder() +{ + + // codec options set up to provide somehow reasonable performance in cost of poor quality + // should be updated as soon as VP9 encoding matures + av_opt_set_int(codec_context_->priv_data, "pass", 1, 0); + av_opt_set_int(codec_context_->priv_data, "speed", 8, 0); + av_opt_set_int(codec_context_->priv_data, "cpu-used", 4, 0); // 8 is max + av_opt_set_int(codec_context_->priv_data, "crf", 20, 0); // 0..63 (higher is lower quality) +} + +Vp9StreamerType::Vp9StreamerType() : + LibavStreamerType("webm", "libvpx-vp9", "video/webm") +{ +} + +boost::shared_ptr Vp9StreamerType::create_streamer(const async_web_server_cpp::HttpRequest& request, + async_web_server_cpp::HttpConnectionPtr connection, + ros::NodeHandle& nh) +{ + return boost::shared_ptr(new Vp9Streamer(request, connection, nh)); +} + +} diff --git a/src/web_video_server.cpp b/src/web_video_server.cpp index 200a55c..8bd010e 100644 --- a/src/web_video_server.cpp +++ b/src/web_video_server.cpp @@ -6,19 +6,30 @@ #include #include "web_video_server/web_video_server.h" +#include "web_video_server/ros_compressed_streamer.h" #include "web_video_server/jpeg_streamers.h" +#include "web_video_server/png_streamers.h" #include "web_video_server/vp8_streamer.h" +#include "web_video_server/h264_streamer.h" +#include "web_video_server/vp9_streamer.h" #include "async_web_server_cpp/http_reply.hpp" namespace web_video_server { +static bool __verbose; + +static std::string __default_stream_type; + static bool ros_connection_logger(async_web_server_cpp::HttpServerRequestHandler forward, const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end) { - ROS_INFO_STREAM("Handling Request: " << request.uri); + if (__verbose) + { + ROS_INFO_STREAM("Handling Request: " << request.uri); + } try { forward(request, connection, begin, end); @@ -33,12 +44,17 @@ static bool ros_connection_logger(async_web_server_cpp::HttpServerRequestHandler } WebVideoServer::WebVideoServer(ros::NodeHandle &nh, ros::NodeHandle &private_nh) : - nh_(nh), image_transport_(nh), handler_group_( + nh_(nh), handler_group_( async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found)) { +#if ROS_HAS_STEADYTIMER || defined USE_STEADY_TIMER + cleanup_timer_ = nh.createSteadyTimer(ros::WallDuration(0.5), boost::bind(&WebVideoServer::cleanup_inactive_streams, this)); +#else cleanup_timer_ = nh.createTimer(ros::Duration(0.5), boost::bind(&WebVideoServer::cleanup_inactive_streams, this)); +#endif private_nh.param("port", port_, 8080); + private_nh.param("verbose", __verbose, true); private_nh.param("address", address_, "0.0.0.0"); @@ -48,10 +64,16 @@ WebVideoServer::WebVideoServer(ros::NodeHandle &nh, ros::NodeHandle &private_nh) private_nh.param("ros_threads", ros_threads_, 2); private_nh.param("publish_rate", publish_rate_, -1.0); + private_nh.param("default_stream_type", __default_stream_type, "mjpeg"); + // required single-threaded, once-only init: LibavStreamer::SetupAVLibrary(); stream_types_["mjpeg"] = boost::shared_ptr(new MjpegStreamerType()); + stream_types_["png"] = boost::shared_ptr(new PngStreamerType()); + stream_types_["ros_compressed"] = boost::shared_ptr(new RosCompressedStreamerType()); stream_types_["vp8"] = boost::shared_ptr(new Vp8StreamerType()); + stream_types_["h264"] = boost::shared_ptr(new H264StreamerType()); + stream_types_["vp9"] = boost::shared_ptr(new Vp9StreamerType()); handler_group_.addHandlerForPath("/", boost::bind(&WebVideoServer::handle_list_streams, this, _1, _2, _3, _4)); handler_group_.addHandlerForPath("/stream", boost::bind(&WebVideoServer::handle_stream, this, _1, _2, _3, _4)); @@ -59,10 +81,18 @@ WebVideoServer::WebVideoServer(ros::NodeHandle &nh, ros::NodeHandle &private_nh) boost::bind(&WebVideoServer::handle_stream_viewer, this, _1, _2, _3, _4)); handler_group_.addHandlerForPath("/snapshot", boost::bind(&WebVideoServer::handle_snapshot, this, _1, _2, _3, _4)); - server_.reset( - new async_web_server_cpp::HttpServer(address_, boost::lexical_cast(port_), - boost::bind(ros_connection_logger, handler_group_, _1, _2, _3, _4), - server_threads)); + try + { + server_.reset( + new async_web_server_cpp::HttpServer(address_, boost::lexical_cast(port_), + boost::bind(ros_connection_logger, handler_group_, _1, _2, _3, _4), + server_threads)); + } + catch(boost::exception& e) + { + ROS_ERROR("Exception when creating the web server! %s:%d", address_.c_str(), port_); + throw; + } } WebVideoServer::~WebVideoServer() @@ -88,7 +118,7 @@ void WebVideoServer::spin() } else { ros::waitForShutdown(); } - + server_->stop(); } @@ -104,25 +134,22 @@ void WebVideoServer::restreamFrames( double max_age ) } } - void WebVideoServer::cleanup_inactive_streams() { - boost::mutex::scoped_lock lock(subscriber_mutex_); + boost::mutex::scoped_lock lock(subscriber_mutex_, boost::try_to_lock); if (lock) { typedef std::vector >::iterator itr_type; - itr_type itr = image_subscribers_.begin(); - while ( itr != image_subscribers_.end() ) + itr_type new_end = std::partition(image_subscribers_.begin(), image_subscribers_.end(), + !boost::bind(&ImageStreamer::isInactive, _1)); + if (__verbose) { - if ( (*itr)->isInactive() ) { - ROS_INFO_STREAM("Removing Stream: " << (*itr)->getTopic() << " (streams left: "<< (image_subscribers_.end() - itr ) -1 << ")"); - image_subscribers_.erase( itr ); - itr = image_subscribers_.begin(); - } - else { - ++itr; + for (itr_type itr = new_end; itr < image_subscribers_.end(); ++itr) + { + ROS_INFO_STREAM("Removed Stream: " << (*itr)->getTopic()); } } + image_subscribers_.erase(new_end, image_subscribers_.end()); } } @@ -130,11 +157,32 @@ bool WebVideoServer::handle_stream(const async_web_server_cpp::HttpRequest &requ async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end) { - std::string type = request.get_query_param_value_or_default("type", "mjpeg"); + std::string type = request.get_query_param_value_or_default("type", __default_stream_type); if (stream_types_.find(type) != stream_types_.end()) { - boost::shared_ptr streamer = stream_types_[type]->create_streamer(request, connection, - image_transport_); + std::string topic = request.get_query_param_value_or_default("topic", ""); + // Fallback for topics without corresponding compressed topics + if (type == std::string("ros_compressed")) + { + std::string compressed_topic_name = topic + "/compressed"; + ros::master::V_TopicInfo topics; + ros::master::getTopics(topics); + bool did_find_compressed_topic = false; + for(ros::master::V_TopicInfo::iterator it = topics.begin(); it != topics.end(); ++it) + { + if (it->name == compressed_topic_name) + { + did_find_compressed_topic = true; + break; + } + } + if (!did_find_compressed_topic) + { + ROS_WARN_STREAM("Could not find compressed image topic for " << topic << ", falling back to mjpeg"); + type = "mjpeg"; + } + } + boost::shared_ptr streamer = stream_types_[type]->create_streamer(request, connection, nh_); streamer->start(); boost::mutex::scoped_lock lock(subscriber_mutex_); image_subscribers_.push_back(streamer); @@ -151,7 +199,7 @@ bool WebVideoServer::handle_snapshot(const async_web_server_cpp::HttpRequest &re async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end) { - boost::shared_ptr streamer(new JpegSnapshotStreamer(request, connection, image_transport_)); + boost::shared_ptr streamer(new JpegSnapshotStreamer(request, connection, nh_)); streamer->start(); boost::mutex::scoped_lock lock(subscriber_mutex_); @@ -163,10 +211,32 @@ bool WebVideoServer::handle_stream_viewer(const async_web_server_cpp::HttpReques async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end) { - std::string type = request.get_query_param_value_or_default("type", "mjpeg"); + std::string type = request.get_query_param_value_or_default("type", __default_stream_type); if (stream_types_.find(type) != stream_types_.end()) { std::string topic = request.get_query_param_value_or_default("topic", ""); + // Fallback for topics without corresponding compressed topics + if (type == std::string("ros_compressed")) + { + + std::string compressed_topic_name = topic + "/compressed"; + ros::master::V_TopicInfo topics; + ros::master::getTopics(topics); + bool did_find_compressed_topic = false; + for(ros::master::V_TopicInfo::iterator it = topics.begin(); it != topics.end(); ++it) + { + if (it->name == compressed_topic_name) + { + did_find_compressed_topic = true; + break; + } + } + if (!did_find_compressed_topic) + { + ROS_WARN_STREAM("Could not find compressed image topic for " << topic << ", falling back to mjpeg"); + type = "mjpeg"; + } + } async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header( "Server", "web_video_server").header("Content-type", "text/html;").write(connection); @@ -254,6 +324,23 @@ bool WebVideoServer::handle_list_streams(const async_web_server_cpp::HttpRequest } connection->write(""); } + connection->write(""); + // Add the rest of the image topics that don't have camera_info. + connection->write(""); return true; }