close.hpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  11. #include <boost/beast/websocket/teardown.hpp>
  12. #include <boost/beast/websocket/detail/mask.hpp>
  13. #include <boost/beast/websocket/impl/stream_impl.hpp>
  14. #include <boost/beast/core/async_base.hpp>
  15. #include <boost/beast/core/flat_static_buffer.hpp>
  16. #include <boost/beast/core/stream_traits.hpp>
  17. #include <boost/beast/core/detail/bind_continuation.hpp>
  18. #include <boost/asio/coroutine.hpp>
  19. #include <boost/asio/post.hpp>
  20. #include <boost/throw_exception.hpp>
  21. #include <memory>
  22. namespace boost {
  23. namespace beast {
  24. namespace websocket {
  25. /* Close the WebSocket Connection
  26. This composed operation sends the close frame if it hasn't already
  27. been sent, then reads and discards frames until receiving a close
  28. frame. Finally it invokes the teardown operation to shut down the
  29. underlying connection.
  30. */
  // Composed operation behind async_close.
  //
  // The close frame is serialized once, in the constructor, into a buffer
  // allocated with allocate_stable. operator() is a stackless coroutine
  // (BOOST_ASIO_CORO_REENTER) that:
  //   1. acquires the write block, suspending in impl.op_close if it is busy;
  //   2. marks the stream as closing and writes the close frame;
  //   3. unless a close frame was already received (impl.rd_close), acquires
  //      the read block and reads/discards frames until a close frame arrives;
  //   4. runs async_teardown, updates the stream status and completes.
  //
  // Only a weak_ptr to the implementation is stored; if it cannot be locked
  // on resumption the handler completes with operation_aborted.
  31. template<class NextLayer, bool deflateSupported>
  32. template<class Handler>
  33. class stream<NextLayer, deflateSupported>::close_op
  34. : public beast::stable_async_base<
  35. Handler, beast::executor_type<stream>>
  36. , public asio::coroutine
  37. {
  // Weak reference to the stream implementation, locked on every resumption.
  38. boost::weak_ptr<impl_type> wp_;
  // Error produced while parsing a frame header or the close payload; it is
  // reported after teardown only if teardown itself did not fail.
  39. error_code ev_;
  // Serialized close frame, obtained from allocate_stable in the constructor.
  40. detail::frame_buffer& fb_;
  41. public:
  42. static constexpr int id = 5; // for soft_mutex
  // Builds the operation and starts it immediately.
  //
  // h  - completion handler, forwarded to stable_async_base.
  // sp - stream implementation; its executor is used for the operation and
  //      only a weak reference to it is retained.
  // cr - close reason serialized into the close frame.
  43. template<class Handler_>
  44. close_op(
  45. Handler_&& h,
  46. boost::shared_ptr<impl_type> const& sp,
  47. close_reason const& cr)
  48. : stable_async_base<Handler,
  49. beast::executor_type<stream>>(
  50. std::forward<Handler_>(h),
  51. sp->stream().get_executor())
  52. , wp_(sp)
  53. , fb_(beast::allocate_stable<
  54. detail::frame_buffer>(*this))
  55. {
  56. // Serialize the close frame
  57. sp->template write_close<
  58. flat_static_buffer_base>(fb_, cr);
  // First entry into the coroutine; cont=false is forwarded to complete()
  // if the operation finishes before any suspension.
  59. (*this)({}, 0, false);
  60. }
  // Coroutine body, re-entered each time an intermediate operation completes.
  //
  // ec                - result of the operation that just completed.
  // bytes_transferred - byte count of the completed read, committed to
  //                     impl.rd_buf in the read loops.
  // cont              - passed through to complete().
  61. void
  62. operator()(
  63. error_code ec = {},
  64. std::size_t bytes_transferred = 0,
  65. bool cont = true)
  66. {
  67. using beast::detail::clamp;
  68. auto sp = wp_.lock();
  69. if(! sp)
  70. {
  // The implementation is gone: report the operation as aborted.
  71. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  72. return this->complete(cont, ec);
  73. }
  74. auto& impl = *sp;
  75. BOOST_ASIO_CORO_REENTER(*this)
  76. {
  77. // Acquire the write lock
  78. if(! impl.wr_block.try_lock(this))
  79. {
  80. BOOST_ASIO_CORO_YIELD
  81. {
  82. BOOST_ASIO_HANDLER_LOCATION((
  83. __FILE__, __LINE__,
  84. "websocket::async_close"));
  // While parked in op_close every cancellation type is accepted.
  85. this->set_allowed_cancellation(net::cancellation_type::all);
  86. impl.op_close.emplace(std::move(*this),
  87. net::cancellation_type::all);
  88. }
  89. // cancel fired before we could do anything.
  90. if (ec == net::error::operation_aborted)
  91. return this->complete(cont, ec);
  // From here on only terminal cancellation is allowed.
  92. this->set_allowed_cancellation(net::cancellation_type::terminal);
  93. impl.wr_block.lock(this);
  // Resume through a post to the stream's executor before continuing.
  94. BOOST_ASIO_CORO_YIELD
  95. {
  96. BOOST_ASIO_HANDLER_LOCATION((
  97. __FILE__, __LINE__,
  98. "websocket::async_close"));
  99. net::post(sp->stream().get_executor(), std::move(*this));
  100. }
  101. BOOST_ASSERT(impl.wr_block.is_locked(this));
  102. }
  103. if(impl.check_stop_now(ec))
  104. goto upcall;
  105. // Can't call close twice
  106. // TODO return a custom error code
  107. BOOST_ASSERT(! impl.wr_close);
  108. // Send close frame
  109. impl.wr_close = true;
  110. impl.change_status(status::closing);
  111. impl.update_timer(this->get_executor());
  112. BOOST_ASIO_CORO_YIELD
  113. {
  114. BOOST_ASIO_HANDLER_LOCATION((
  115. __FILE__, __LINE__,
  116. "websocket::async_close"));
  117. net::async_write(impl.stream(), fb_.data(),
  118. beast::detail::bind_continuation(std::move(*this)));
  119. }
  120. if(impl.check_stop_now(ec))
  121. goto upcall;
  122. if(impl.rd_close)
  123. {
  124. // This happens when the read_op gets a close frame
  125. // at the same time close_op is sending the close frame.
  126. // The read_op will be suspended on the write block.
  127. goto teardown;
  128. }
  129. // Acquire the read lock
  130. if(! impl.rd_block.try_lock(this))
  131. {
  132. BOOST_ASIO_CORO_YIELD
  133. {
  134. BOOST_ASIO_HANDLER_LOCATION((
  135. __FILE__, __LINE__,
  136. "websocket::async_close"));
  137. // terminal only, that's the default
  138. impl.op_r_close.emplace(std::move(*this));
  139. }
  140. if (ec == net::error::operation_aborted)
  141. {
  142. // if a cancellation fires here, we do a dirty shutdown
  // NOTE(review): this early return bypasses `upcall`, so
  // impl.wr_block.unlock(this) is not called on this path and the
  // suspended operations are not invoked - confirm this is intended.
  143. impl.change_status(status::closed);
  144. close_socket(get_lowest_layer(impl.stream()));
  145. return this->complete(cont, ec);
  146. }
  147. impl.rd_block.lock(this);
  148. BOOST_ASIO_CORO_YIELD
  149. {
  150. BOOST_ASIO_HANDLER_LOCATION((
  151. __FILE__, __LINE__,
  152. "websocket::async_close"));
  153. net::post(sp->stream().get_executor(), std::move(*this));
  154. }
  155. BOOST_ASSERT(impl.rd_block.is_locked(this));
  156. if(impl.check_stop_now(ec))
  157. goto upcall;
  158. BOOST_ASSERT(! impl.rd_close);
  159. }
  160. // Read until receiving a close frame
  161. // TODO There should be a timeout on this
  // A non-zero rd_remain means part of a message frame payload is still
  // unread, so resume by discarding that payload first.
  162. if(impl.rd_remain > 0)
  163. goto read_payload;
  164. for(;;)
  165. {
  166. // Read frame header
  167. while(! impl.parse_fh(
  168. impl.rd_fh, impl.rd_buf, ev_))
  169. {
  // A header parse error ends the read loop; ev_ is reported after teardown.
  170. if(ev_)
  171. goto teardown;
  172. BOOST_ASIO_CORO_YIELD
  173. {
  174. BOOST_ASIO_HANDLER_LOCATION((
  175. __FILE__, __LINE__,
  176. "websocket::async_close"));
  177. impl.stream().async_read_some(
  178. impl.rd_buf.prepare(read_size(
  179. impl.rd_buf, impl.rd_buf.max_size())),
  180. beast::detail::bind_continuation(std::move(*this)));
  181. }
  182. impl.rd_buf.commit(bytes_transferred);
  183. if(impl.check_stop_now(ec)) //< this catches cancellation
  184. goto upcall;
  185. }
  186. if(detail::is_control(impl.rd_fh.op))
  187. {
  188. // Discard ping or pong frame
  189. if(impl.rd_fh.op != detail::opcode::close)
  190. {
  191. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  192. continue;
  193. }
  194. // Process close frame
  195. // TODO Should we invoke the control callback?
  196. BOOST_ASSERT(! impl.rd_close);
  197. impl.rd_close = true;
  198. auto const mb = buffers_prefix(
  199. clamp(impl.rd_fh.len),
  200. impl.rd_buf.data());
  // Unmask the payload in place before parsing the close reason into impl.cr.
  201. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  202. detail::mask_inplace(mb, impl.rd_key);
  203. detail::read_close(impl.cr, mb, ev_);
  204. if(ev_)
  205. goto teardown;
  206. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  207. goto teardown;
  208. }
  209. read_payload:
  210. // Discard message frame
  211. while(impl.rd_buf.size() < impl.rd_remain)
  212. {
  // Drop everything buffered so far, then read more.
  213. impl.rd_remain -= impl.rd_buf.size();
  214. impl.rd_buf.consume(impl.rd_buf.size());
  215. BOOST_ASIO_CORO_YIELD
  216. {
  217. BOOST_ASIO_HANDLER_LOCATION((
  218. __FILE__, __LINE__,
  219. "websocket::async_close"));
  220. impl.stream().async_read_some(
  221. impl.rd_buf.prepare(read_size(
  222. impl.rd_buf, impl.rd_buf.max_size())),
  223. beast::detail::bind_continuation(std::move(*this)));
  224. }
  225. impl.rd_buf.commit(bytes_transferred);
  226. if(impl.check_stop_now(ec))
  227. goto upcall;
  228. }
  229. BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
  230. impl.rd_buf.consume(clamp(impl.rd_remain));
  231. impl.rd_remain = 0;
  232. }
  233. teardown:
  234. // Teardown
  235. BOOST_ASSERT(impl.wr_block.is_locked(this));
  236. using beast::websocket::async_teardown;
  237. BOOST_ASIO_CORO_YIELD
  238. {
  239. BOOST_ASIO_HANDLER_LOCATION((
  240. __FILE__, __LINE__,
  241. "websocket::async_close"));
  242. async_teardown(impl.role, impl.stream(),
  243. beast::detail::bind_continuation(std::move(*this)));
  244. }
  245. BOOST_ASSERT(impl.wr_block.is_locked(this));
  // An eof result from teardown is treated as success.
  246. if(ec == net::error::eof)
  247. {
  248. // Rationale:
  249. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  250. ec = {};
  251. }
  // If teardown succeeded, surface any error recorded while parsing frames.
  252. if(! ec)
  253. {
  254. BOOST_BEAST_ASSIGN_EC(ec, ev_);
  255. }
  256. if(ec)
  257. impl.change_status(status::failed);
  258. else
  259. impl.change_status(status::closed);
  260. impl.close();
  261. upcall:
  // Release the write block; release the read block only if this operation
  // holds it, and in that case give op_r_rd a chance to run.
  262. impl.wr_block.unlock(this);
  263. impl.rd_block.try_unlock(this)
  264. && impl.op_r_rd.maybe_invoke();
  // Short-circuit chain: stops at the first maybe_invoke() that returns true.
  265. impl.op_rd.maybe_invoke()
  266. || impl.op_idle_ping.maybe_invoke()
  267. || impl.op_ping.maybe_invoke()
  268. || impl.op_wr.maybe_invoke();
  269. this->complete(cont, ec);
  270. }
  271. }
  272. };
  273. template<class NextLayer, bool deflateSupported>
  274. struct stream<NextLayer, deflateSupported>::
  275. run_close_op
  276. {
  277. template<class CloseHandler>
  278. void
  279. operator()(
  280. CloseHandler&& h,
  281. boost::shared_ptr<impl_type> const& sp,
  282. close_reason const& cr)
  283. {
  284. // If you get an error on the following line it means
  285. // that your handler does not meet the documented type
  286. // requirements for the handler.
  287. static_assert(
  288. beast::detail::is_invocable<CloseHandler,
  289. void(error_code)>::value,
  290. "CloseHandler type requirements not met");
  291. close_op<
  292. typename std::decay<CloseHandler>::type>(
  293. std::forward<CloseHandler>(h),
  294. sp,
  295. cr);
  296. }
  297. };
  298. //------------------------------------------------------------------------------
  299. template<class NextLayer, bool deflateSupported>
  300. void
  301. stream<NextLayer, deflateSupported>::
  302. close(close_reason const& cr)
  303. {
  304. static_assert(is_sync_stream<next_layer_type>::value,
  305. "SyncStream type requirements not met");
  306. error_code ec;
  307. close(cr, ec);
  308. if(ec)
  309. BOOST_THROW_EXCEPTION(system_error{ec});
  310. }
  311. template<class NextLayer, bool deflateSupported>
  312. void
  313. stream<NextLayer, deflateSupported>::
  314. close(close_reason const& cr, error_code& ec)
  315. {
  316. static_assert(is_sync_stream<next_layer_type>::value,
  317. "SyncStream type requirements not met");
  318. using beast::detail::clamp;
  319. auto& impl = *impl_;
  320. ec = {};
  321. if(impl.check_stop_now(ec))
  322. return;
  323. BOOST_ASSERT(! impl.rd_close);
  324. // Can't call close twice
  325. // TODO return a custom error code
  326. BOOST_ASSERT(! impl.wr_close);
  327. // Send close frame
  328. {
  329. impl.wr_close = true;
  330. impl.change_status(status::closing);
  331. detail::frame_buffer fb;
  332. impl.template write_close<flat_static_buffer_base>(fb, cr);
  333. net::write(impl.stream(), fb.data(), ec);
  334. if(impl.check_stop_now(ec))
  335. return;
  336. }
  337. // Read until a receiving a close frame
  338. error_code ev;
  339. if(impl.rd_remain > 0)
  340. goto read_payload;
  341. for(;;)
  342. {
  343. // Read frame header
  344. while(! impl.parse_fh(
  345. impl.rd_fh, impl.rd_buf, ev))
  346. {
  347. if(ev)
  348. {
  349. // Protocol violation
  350. return do_fail(close_code::none, ev, ec);
  351. }
  352. impl.rd_buf.commit(impl.stream().read_some(
  353. impl.rd_buf.prepare(read_size(
  354. impl.rd_buf, impl.rd_buf.max_size())), ec));
  355. if(impl.check_stop_now(ec))
  356. return;
  357. }
  358. if(detail::is_control(impl.rd_fh.op))
  359. {
  360. // Discard ping/pong frame
  361. if(impl.rd_fh.op != detail::opcode::close)
  362. {
  363. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  364. continue;
  365. }
  366. // Handle close frame
  367. // TODO Should we invoke the control callback?
  368. BOOST_ASSERT(! impl.rd_close);
  369. impl.rd_close = true;
  370. auto const mb = buffers_prefix(
  371. clamp(impl.rd_fh.len),
  372. impl.rd_buf.data());
  373. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  374. detail::mask_inplace(mb, impl.rd_key);
  375. detail::read_close(impl.cr, mb, ev);
  376. if(ev)
  377. {
  378. // Protocol violation
  379. return do_fail(close_code::none, ev, ec);
  380. }
  381. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  382. break;
  383. }
  384. read_payload:
  385. // Discard message frame
  386. while(impl.rd_buf.size() < impl.rd_remain)
  387. {
  388. impl.rd_remain -= impl.rd_buf.size();
  389. impl.rd_buf.consume(impl.rd_buf.size());
  390. impl.rd_buf.commit(
  391. impl.stream().read_some(
  392. impl.rd_buf.prepare(
  393. read_size(
  394. impl.rd_buf,
  395. impl.rd_buf.max_size())),
  396. ec));
  397. if(impl.check_stop_now(ec))
  398. return;
  399. }
  400. BOOST_ASSERT(
  401. impl.rd_buf.size() >= impl.rd_remain);
  402. impl.rd_buf.consume(clamp(impl.rd_remain));
  403. impl.rd_remain = 0;
  404. }
  405. // _Close the WebSocket Connection_
  406. do_fail(close_code::none, error::closed, ec);
  407. if(ec == error::closed)
  408. ec = {};
  409. }
  410. template<class NextLayer, bool deflateSupported>
  411. template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
  412. BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
  413. stream<NextLayer, deflateSupported>::
  414. async_close(close_reason const& cr, CloseHandler&& handler)
  415. {
  416. static_assert(is_async_stream<next_layer_type>::value,
  417. "AsyncStream type requirements not met");
  418. return net::async_initiate<
  419. CloseHandler,
  420. void(error_code)>(
  421. run_close_op{},
  422. handler,
  423. impl_,
  424. cr);
  425. }
  426. } // websocket
  427. } // beast
  428. } // boost
  429. #endif