write.hpp 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP
  11. #include <boost/beast/websocket/detail/mask.hpp>
  12. #include <boost/beast/core/async_base.hpp>
  13. #include <boost/beast/core/bind_handler.hpp>
  14. #include <boost/beast/core/buffer_traits.hpp>
  15. #include <boost/beast/core/buffers_cat.hpp>
  16. #include <boost/beast/core/buffers_prefix.hpp>
  17. #include <boost/beast/core/buffers_range.hpp>
  18. #include <boost/beast/core/buffers_suffix.hpp>
  19. #include <boost/beast/core/flat_static_buffer.hpp>
  20. #include <boost/beast/core/stream_traits.hpp>
  21. #include <boost/beast/core/detail/bind_continuation.hpp>
  22. #include <boost/beast/core/detail/clamp.hpp>
  23. #include <boost/beast/core/detail/config.hpp>
  24. #include <boost/beast/websocket/detail/frame.hpp>
  25. #include <boost/beast/websocket/impl/stream_impl.hpp>
  26. #include <boost/asio/coroutine.hpp>
  27. #include <boost/assert.hpp>
  28. #include <boost/config.hpp>
  29. #include <boost/throw_exception.hpp>
  30. #include <algorithm>
  31. #include <memory>
  32. namespace boost {
  33. namespace beast {
  34. namespace websocket {
  35. template<class NextLayer, bool deflateSupported>
  36. template<class Handler, class Buffers>
  37. class stream<NextLayer, deflateSupported>::write_some_op
  38. : public beast::async_base<
  39. Handler, beast::executor_type<stream>>
  40. , public asio::coroutine
  41. {
  42. enum
  43. {
  44. do_nomask_nofrag,
  45. do_nomask_frag,
  46. do_mask_nofrag,
  47. do_mask_frag,
  48. do_deflate
  49. };
  50. boost::weak_ptr<impl_type> wp_;
  51. buffers_suffix<Buffers> cb_;
  52. detail::frame_header fh_;
  53. detail::prepared_key key_;
  54. std::size_t bytes_transferred_ = 0;
  55. std::size_t remain_;
  56. std::size_t in_;
  57. int how_;
  58. bool fin_;
  59. bool more_ = false; // for ubsan
  60. bool cont_ = false;
  61. public:
  62. static constexpr int id = 2; // for soft_mutex
  63. template<class Handler_>
  64. write_some_op(
  65. Handler_&& h,
  66. boost::shared_ptr<impl_type> const& sp,
  67. bool fin,
  68. Buffers const& bs)
  69. : beast::async_base<Handler,
  70. beast::executor_type<stream>>(
  71. std::forward<Handler_>(h),
  72. sp->stream().get_executor())
  73. , wp_(sp)
  74. , cb_(bs)
  75. , fin_(fin)
  76. {
  77. auto& impl = *sp;
  78. // Set up the outgoing frame header
  79. if(! impl.wr_cont)
  80. {
  81. impl.begin_msg(beast::buffer_bytes(bs));
  82. fh_.rsv1 = impl.wr_compress;
  83. }
  84. else
  85. {
  86. fh_.rsv1 = false;
  87. }
  88. fh_.rsv2 = false;
  89. fh_.rsv3 = false;
  90. fh_.op = impl.wr_cont ?
  91. detail::opcode::cont : impl.wr_opcode;
  92. fh_.mask =
  93. impl.role == role_type::client;
  94. // Choose a write algorithm
  95. if(impl.wr_compress)
  96. {
  97. how_ = do_deflate;
  98. }
  99. else if(! fh_.mask)
  100. {
  101. if(! impl.wr_frag)
  102. {
  103. how_ = do_nomask_nofrag;
  104. }
  105. else
  106. {
  107. BOOST_ASSERT(impl.wr_buf_size != 0);
  108. remain_ = beast::buffer_bytes(cb_);
  109. if(remain_ > impl.wr_buf_size)
  110. how_ = do_nomask_frag;
  111. else
  112. how_ = do_nomask_nofrag;
  113. }
  114. }
  115. else
  116. {
  117. if(! impl.wr_frag)
  118. {
  119. how_ = do_mask_nofrag;
  120. }
  121. else
  122. {
  123. BOOST_ASSERT(impl.wr_buf_size != 0);
  124. remain_ = beast::buffer_bytes(cb_);
  125. if(remain_ > impl.wr_buf_size)
  126. how_ = do_mask_frag;
  127. else
  128. how_ = do_mask_nofrag;
  129. }
  130. }
  131. (*this)({}, 0, false);
  132. }
  133. void operator()(
  134. error_code ec = {},
  135. std::size_t bytes_transferred = 0,
  136. bool cont = true);
  137. };
  138. template<class NextLayer, bool deflateSupported>
  139. template<class Handler, class Buffers>
  140. void
  141. stream<NextLayer, deflateSupported>::
  142. write_some_op<Handler, Buffers>::
  143. operator()(
  144. error_code ec,
  145. std::size_t bytes_transferred,
  146. bool cont)
  147. {
  148. using beast::detail::clamp;
  149. std::size_t n;
  150. net::mutable_buffer b;
  151. auto sp = wp_.lock();
  152. if(! sp)
  153. {
  154. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  155. bytes_transferred_ = 0;
  156. return this->complete(cont, ec, bytes_transferred_);
  157. }
  158. auto& impl = *sp;
  159. BOOST_ASIO_CORO_REENTER(*this)
  160. {
  161. // Acquire the write lock
  162. if(! impl.wr_block.try_lock(this))
  163. {
  164. do_suspend:
  165. BOOST_ASIO_CORO_YIELD
  166. {
  167. BOOST_ASIO_HANDLER_LOCATION((
  168. __FILE__, __LINE__,
  169. fin_ ?
  170. "websocket::async_write" :
  171. "websocket::async_write_some"
  172. ));
  173. this->set_allowed_cancellation(net::cancellation_type::all);
  174. impl.op_wr.emplace(std::move(*this),
  175. net::cancellation_type::all);
  176. }
  177. if (ec)
  178. return this->complete(cont, ec, bytes_transferred_);
  179. this->set_allowed_cancellation(net::cancellation_type::terminal);
  180. impl.wr_block.lock(this);
  181. BOOST_ASIO_CORO_YIELD
  182. {
  183. BOOST_ASIO_HANDLER_LOCATION((
  184. __FILE__, __LINE__,
  185. fin_ ?
  186. "websocket::async_write" :
  187. "websocket::async_write_some"
  188. ));
  189. net::post(sp->stream().get_executor(), std::move(*this));
  190. }
  191. BOOST_ASSERT(impl.wr_block.is_locked(this));
  192. }
  193. if(impl.check_stop_now(ec))
  194. goto upcall;
  195. //------------------------------------------------------------------
  196. if(how_ == do_nomask_nofrag)
  197. {
  198. // send a single frame
  199. fh_.fin = fin_;
  200. fh_.len = beast::buffer_bytes(cb_);
  201. impl.wr_fb.clear();
  202. detail::write<flat_static_buffer_base>(
  203. impl.wr_fb, fh_);
  204. impl.wr_cont = ! fin_;
  205. BOOST_ASIO_CORO_YIELD
  206. {
  207. BOOST_ASIO_HANDLER_LOCATION((
  208. __FILE__, __LINE__,
  209. fin_ ?
  210. "websocket::async_write" :
  211. "websocket::async_write_some"
  212. ));
  213. net::async_write(impl.stream(),
  214. buffers_cat(
  215. net::const_buffer(impl.wr_fb.data()),
  216. net::const_buffer(0, 0),
  217. cb_,
  218. buffers_prefix(0, cb_)
  219. ),
  220. beast::detail::bind_continuation(std::move(*this)));
  221. }
  222. bytes_transferred_ += clamp(fh_.len);
  223. if(impl.check_stop_now(ec))
  224. goto upcall;
  225. goto upcall;
  226. }
  227. //------------------------------------------------------------------
  228. if(how_ == do_nomask_frag)
  229. {
  230. // send multiple frames
  231. for(;;)
  232. {
  233. n = clamp(remain_, impl.wr_buf_size);
  234. fh_.len = n;
  235. remain_ -= n;
  236. fh_.fin = fin_ ? remain_ == 0 : false;
  237. impl.wr_fb.clear();
  238. detail::write<flat_static_buffer_base>(
  239. impl.wr_fb, fh_);
  240. impl.wr_cont = ! fin_;
  241. // Send frame
  242. BOOST_ASIO_CORO_YIELD
  243. {
  244. BOOST_ASIO_HANDLER_LOCATION((
  245. __FILE__, __LINE__,
  246. fin_ ?
  247. "websocket::async_write" :
  248. "websocket::async_write_some"
  249. ));
  250. buffers_suffix<Buffers> empty_cb(cb_);
  251. empty_cb.consume(~std::size_t(0));
  252. net::async_write(impl.stream(),
  253. buffers_cat(
  254. net::const_buffer(impl.wr_fb.data()),
  255. net::const_buffer(0, 0),
  256. empty_cb,
  257. buffers_prefix(clamp(fh_.len), cb_)
  258. ),
  259. beast::detail::bind_continuation(std::move(*this)));
  260. }
  261. n = clamp(fh_.len); // restore `n` on yield
  262. bytes_transferred_ += n;
  263. if(impl.check_stop_now(ec))
  264. goto upcall;
  265. if(remain_ == 0)
  266. break;
  267. cb_.consume(n);
  268. fh_.op = detail::opcode::cont;
  269. // Give up the write lock in between each frame
  270. // so that outgoing control frames might be sent.
  271. impl.wr_block.unlock(this);
  272. if( impl.op_close.maybe_invoke()
  273. || impl.op_idle_ping.maybe_invoke()
  274. || impl.op_rd.maybe_invoke()
  275. || impl.op_ping.maybe_invoke())
  276. {
  277. BOOST_ASSERT(impl.wr_block.is_locked());
  278. goto do_suspend;
  279. }
  280. impl.wr_block.lock(this);
  281. }
  282. goto upcall;
  283. }
  284. //------------------------------------------------------------------
  285. if(how_ == do_mask_nofrag)
  286. {
  287. // send a single frame using multiple writes
  288. remain_ = beast::buffer_bytes(cb_);
  289. fh_.fin = fin_;
  290. fh_.len = remain_;
  291. fh_.key = impl.create_mask();
  292. detail::prepare_key(key_, fh_.key);
  293. impl.wr_fb.clear();
  294. detail::write<flat_static_buffer_base>(
  295. impl.wr_fb, fh_);
  296. n = clamp(remain_, impl.wr_buf_size);
  297. net::buffer_copy(net::buffer(
  298. impl.wr_buf.get(), n), cb_);
  299. detail::mask_inplace(net::buffer(
  300. impl.wr_buf.get(), n), key_);
  301. remain_ -= n;
  302. impl.wr_cont = ! fin_;
  303. // write frame header and some payload
  304. BOOST_ASIO_CORO_YIELD
  305. {
  306. BOOST_ASIO_HANDLER_LOCATION((
  307. __FILE__, __LINE__,
  308. fin_ ?
  309. "websocket::async_write" :
  310. "websocket::async_write_some"
  311. ));
  312. buffers_suffix<Buffers> empty_cb(cb_);
  313. empty_cb.consume(~std::size_t(0));
  314. net::async_write(impl.stream(),
  315. buffers_cat(
  316. net::const_buffer(impl.wr_fb.data()),
  317. net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
  318. empty_cb,
  319. buffers_prefix(0, empty_cb)
  320. ),
  321. beast::detail::bind_continuation(std::move(*this)));
  322. }
  323. // VFALCO What about consuming the buffer on error?
  324. bytes_transferred_ +=
  325. bytes_transferred - impl.wr_fb.size();
  326. if(impl.check_stop_now(ec))
  327. goto upcall;
  328. while(remain_ > 0)
  329. {
  330. cb_.consume(impl.wr_buf_size);
  331. n = clamp(remain_, impl.wr_buf_size);
  332. net::buffer_copy(net::buffer(
  333. impl.wr_buf.get(), n), cb_);
  334. detail::mask_inplace(net::buffer(
  335. impl.wr_buf.get(), n), key_);
  336. remain_ -= n;
  337. // write more payload
  338. BOOST_ASIO_CORO_YIELD
  339. {
  340. BOOST_ASIO_HANDLER_LOCATION((
  341. __FILE__, __LINE__,
  342. fin_ ?
  343. "websocket::async_write" :
  344. "websocket::async_write_some"
  345. ));
  346. buffers_suffix<Buffers> empty_cb(cb_);
  347. empty_cb.consume(~std::size_t(0));
  348. net::async_write(impl.stream(),
  349. buffers_cat(
  350. net::const_buffer(0, 0),
  351. net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
  352. empty_cb,
  353. buffers_prefix(0, empty_cb)
  354. ),
  355. beast::detail::bind_continuation(std::move(*this)));
  356. }
  357. bytes_transferred_ += bytes_transferred;
  358. if(impl.check_stop_now(ec))
  359. goto upcall;
  360. }
  361. goto upcall;
  362. }
  363. //------------------------------------------------------------------
  364. if(how_ == do_mask_frag)
  365. {
  366. // send multiple frames
  367. for(;;)
  368. {
  369. n = clamp(remain_, impl.wr_buf_size);
  370. remain_ -= n;
  371. fh_.len = n;
  372. fh_.key = impl.create_mask();
  373. fh_.fin = fin_ ? remain_ == 0 : false;
  374. detail::prepare_key(key_, fh_.key);
  375. net::buffer_copy(net::buffer(
  376. impl.wr_buf.get(), n), cb_);
  377. detail::mask_inplace(net::buffer(
  378. impl.wr_buf.get(), n), key_);
  379. impl.wr_fb.clear();
  380. detail::write<flat_static_buffer_base>(
  381. impl.wr_fb, fh_);
  382. impl.wr_cont = ! fin_;
  383. // Send frame
  384. BOOST_ASIO_CORO_YIELD
  385. {
  386. BOOST_ASIO_HANDLER_LOCATION((
  387. __FILE__, __LINE__,
  388. fin_ ?
  389. "websocket::async_write" :
  390. "websocket::async_write_some"
  391. ));
  392. buffers_suffix<Buffers> empty_cb(cb_);
  393. empty_cb.consume(~std::size_t(0));
  394. net::async_write(impl.stream(),
  395. buffers_cat(
  396. net::const_buffer(impl.wr_fb.data()),
  397. net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
  398. empty_cb,
  399. buffers_prefix(0, empty_cb)
  400. ),
  401. beast::detail::bind_continuation(std::move(*this)));
  402. }
  403. n = bytes_transferred - impl.wr_fb.size();
  404. bytes_transferred_ += n;
  405. if(impl.check_stop_now(ec))
  406. goto upcall;
  407. if(remain_ == 0)
  408. break;
  409. cb_.consume(n);
  410. fh_.op = detail::opcode::cont;
  411. // Give up the write lock in between each frame
  412. // so that outgoing control frames might be sent.
  413. impl.wr_block.unlock(this);
  414. if( impl.op_close.maybe_invoke()
  415. || impl.op_idle_ping.maybe_invoke()
  416. || impl.op_rd.maybe_invoke()
  417. || impl.op_ping.maybe_invoke())
  418. {
  419. BOOST_ASSERT(impl.wr_block.is_locked());
  420. goto do_suspend;
  421. }
  422. impl.wr_block.lock(this);
  423. }
  424. goto upcall;
  425. }
  426. //------------------------------------------------------------------
  427. if(how_ == do_deflate)
  428. {
  429. // send compressed frames
  430. for(;;)
  431. {
  432. b = net::buffer(impl.wr_buf.get(),
  433. impl.wr_buf_size);
  434. more_ = impl.deflate(b, cb_, fin_, in_, ec);
  435. if(impl.check_stop_now(ec))
  436. goto upcall;
  437. n = beast::buffer_bytes(b);
  438. if(n == 0)
  439. {
  440. // The input was consumed, but there is
  441. // no output due to compression latency.
  442. BOOST_ASSERT(! fin_);
  443. BOOST_ASSERT(beast::buffer_bytes(cb_) == 0);
  444. goto upcall;
  445. }
  446. if(fh_.mask)
  447. {
  448. fh_.key = impl.create_mask();
  449. detail::prepared_key key;
  450. detail::prepare_key(key, fh_.key);
  451. detail::mask_inplace(b, key);
  452. }
  453. fh_.fin = ! more_;
  454. fh_.len = n;
  455. impl.wr_fb.clear();
  456. detail::write<
  457. flat_static_buffer_base>(impl.wr_fb, fh_);
  458. impl.wr_cont = ! fin_;
  459. // Send frame
  460. BOOST_ASIO_CORO_YIELD
  461. {
  462. BOOST_ASIO_HANDLER_LOCATION((
  463. __FILE__, __LINE__,
  464. fin_ ?
  465. "websocket::async_write" :
  466. "websocket::async_write_some"
  467. ));
  468. buffers_suffix<Buffers> empty_cb(cb_);
  469. empty_cb.consume(~std::size_t(0));
  470. net::async_write(impl.stream(),
  471. buffers_cat(
  472. net::const_buffer(impl.wr_fb.data()),
  473. net::const_buffer(b),
  474. empty_cb,
  475. buffers_prefix(0, empty_cb)
  476. ),
  477. beast::detail::bind_continuation(std::move(*this)));
  478. }
  479. bytes_transferred_ += in_;
  480. if(impl.check_stop_now(ec))
  481. goto upcall;
  482. if(more_)
  483. {
  484. fh_.op = detail::opcode::cont;
  485. fh_.rsv1 = false;
  486. // Give up the write lock in between each frame
  487. // so that outgoing control frames might be sent.
  488. impl.wr_block.unlock(this);
  489. if( impl.op_close.maybe_invoke()
  490. || impl.op_idle_ping.maybe_invoke()
  491. || impl.op_rd.maybe_invoke()
  492. || impl.op_ping.maybe_invoke())
  493. {
  494. BOOST_ASSERT(impl.wr_block.is_locked());
  495. goto do_suspend;
  496. }
  497. impl.wr_block.lock(this);
  498. }
  499. else
  500. {
  501. if(fh_.fin)
  502. impl.do_context_takeover_write(impl.role);
  503. goto upcall;
  504. }
  505. }
  506. }
  507. //--------------------------------------------------------------------------
  508. upcall:
  509. impl.wr_block.unlock(this);
  510. impl.op_close.maybe_invoke()
  511. || impl.op_idle_ping.maybe_invoke()
  512. || impl.op_rd.maybe_invoke()
  513. || impl.op_ping.maybe_invoke();
  514. this->complete(cont, ec, bytes_transferred_);
  515. }
  516. }
  517. template<class NextLayer, bool deflateSupported>
  518. struct stream<NextLayer, deflateSupported>::
  519. run_write_some_op
  520. {
  521. template<
  522. class WriteHandler,
  523. class ConstBufferSequence>
  524. void
  525. operator()(
  526. WriteHandler&& h,
  527. boost::shared_ptr<impl_type> const& sp,
  528. bool fin,
  529. ConstBufferSequence const& b)
  530. {
  531. // If you get an error on the following line it means
  532. // that your handler does not meet the documented type
  533. // requirements for the handler.
  534. static_assert(
  535. beast::detail::is_invocable<WriteHandler,
  536. void(error_code, std::size_t)>::value,
  537. "WriteHandler type requirements not met");
  538. write_some_op<
  539. typename std::decay<WriteHandler>::type,
  540. ConstBufferSequence>(
  541. std::forward<WriteHandler>(h),
  542. sp,
  543. fin,
  544. b);
  545. }
  546. };
  547. //------------------------------------------------------------------------------
  548. template<class NextLayer, bool deflateSupported>
  549. template<class ConstBufferSequence>
  550. std::size_t
  551. stream<NextLayer, deflateSupported>::
  552. write_some(bool fin, ConstBufferSequence const& buffers)
  553. {
  554. static_assert(is_sync_stream<next_layer_type>::value,
  555. "SyncStream type requirements not met");
  556. static_assert(net::is_const_buffer_sequence<
  557. ConstBufferSequence>::value,
  558. "ConstBufferSequence type requirements not met");
  559. error_code ec;
  560. auto const bytes_transferred =
  561. write_some(fin, buffers, ec);
  562. if(ec)
  563. BOOST_THROW_EXCEPTION(system_error{ec});
  564. return bytes_transferred;
  565. }
  566. template<class NextLayer, bool deflateSupported>
  567. template<class ConstBufferSequence>
  568. std::size_t
  569. stream<NextLayer, deflateSupported>::
  570. write_some(bool fin,
  571. ConstBufferSequence const& buffers, error_code& ec)
  572. {
  573. static_assert(is_sync_stream<next_layer_type>::value,
  574. "SyncStream type requirements not met");
  575. static_assert(net::is_const_buffer_sequence<
  576. ConstBufferSequence>::value,
  577. "ConstBufferSequence type requirements not met");
  578. using beast::detail::clamp;
  579. auto& impl = *impl_;
  580. std::size_t bytes_transferred = 0;
  581. ec = {};
  582. if(impl.check_stop_now(ec))
  583. return bytes_transferred;
  584. detail::frame_header fh;
  585. if(! impl.wr_cont)
  586. {
  587. impl.begin_msg(beast::buffer_bytes(buffers));
  588. fh.rsv1 = impl.wr_compress;
  589. }
  590. else
  591. {
  592. fh.rsv1 = false;
  593. }
  594. fh.rsv2 = false;
  595. fh.rsv3 = false;
  596. fh.op = impl.wr_cont ?
  597. detail::opcode::cont : impl.wr_opcode;
  598. fh.mask = impl.role == role_type::client;
  599. auto remain = beast::buffer_bytes(buffers);
  600. if(impl.wr_compress)
  601. {
  602. buffers_suffix<
  603. ConstBufferSequence> cb(buffers);
  604. for(;;)
  605. {
  606. auto b = net::buffer(
  607. impl.wr_buf.get(), impl.wr_buf_size);
  608. auto const more = impl.deflate(
  609. b, cb, fin, bytes_transferred, ec);
  610. if(impl.check_stop_now(ec))
  611. return bytes_transferred;
  612. auto const n = beast::buffer_bytes(b);
  613. if(n == 0)
  614. {
  615. // The input was consumed, but there
  616. // is no output due to compression
  617. // latency.
  618. BOOST_ASSERT(! fin);
  619. BOOST_ASSERT(beast::buffer_bytes(cb) == 0);
  620. fh.fin = false;
  621. break;
  622. }
  623. if(fh.mask)
  624. {
  625. fh.key = this->impl_->create_mask();
  626. detail::prepared_key key;
  627. detail::prepare_key(key, fh.key);
  628. detail::mask_inplace(b, key);
  629. }
  630. fh.fin = ! more;
  631. fh.len = n;
  632. detail::fh_buffer fh_buf;
  633. detail::write<
  634. flat_static_buffer_base>(fh_buf, fh);
  635. impl.wr_cont = ! fin;
  636. net::write(impl.stream(),
  637. buffers_cat(fh_buf.data(), b), ec);
  638. if(impl.check_stop_now(ec))
  639. return bytes_transferred;
  640. if(! more)
  641. break;
  642. fh.op = detail::opcode::cont;
  643. fh.rsv1 = false;
  644. }
  645. if(fh.fin)
  646. impl.do_context_takeover_write(impl.role);
  647. }
  648. else if(! fh.mask)
  649. {
  650. if(! impl.wr_frag)
  651. {
  652. // no mask, no autofrag
  653. fh.fin = fin;
  654. fh.len = remain;
  655. detail::fh_buffer fh_buf;
  656. detail::write<
  657. flat_static_buffer_base>(fh_buf, fh);
  658. impl.wr_cont = ! fin;
  659. net::write(impl.stream(),
  660. buffers_cat(fh_buf.data(), buffers), ec);
  661. if(impl.check_stop_now(ec))
  662. return bytes_transferred;
  663. bytes_transferred += remain;
  664. }
  665. else
  666. {
  667. // no mask, autofrag
  668. BOOST_ASSERT(impl.wr_buf_size != 0);
  669. buffers_suffix<
  670. ConstBufferSequence> cb{buffers};
  671. for(;;)
  672. {
  673. auto const n = clamp(remain, impl.wr_buf_size);
  674. remain -= n;
  675. fh.len = n;
  676. fh.fin = fin ? remain == 0 : false;
  677. detail::fh_buffer fh_buf;
  678. detail::write<
  679. flat_static_buffer_base>(fh_buf, fh);
  680. impl.wr_cont = ! fin;
  681. net::write(impl.stream(),
  682. beast::buffers_cat(fh_buf.data(),
  683. beast::buffers_prefix(n, cb)), ec);
  684. bytes_transferred += n;
  685. if(impl.check_stop_now(ec))
  686. return bytes_transferred;
  687. if(remain == 0)
  688. break;
  689. fh.op = detail::opcode::cont;
  690. cb.consume(n);
  691. }
  692. }
  693. }
  694. else if(! impl.wr_frag)
  695. {
  696. // mask, no autofrag
  697. fh.fin = fin;
  698. fh.len = remain;
  699. fh.key = this->impl_->create_mask();
  700. detail::prepared_key key;
  701. detail::prepare_key(key, fh.key);
  702. detail::fh_buffer fh_buf;
  703. detail::write<
  704. flat_static_buffer_base>(fh_buf, fh);
  705. buffers_suffix<
  706. ConstBufferSequence> cb{buffers};
  707. {
  708. auto const n =
  709. clamp(remain, impl.wr_buf_size);
  710. auto const b =
  711. net::buffer(impl.wr_buf.get(), n);
  712. net::buffer_copy(b, cb);
  713. cb.consume(n);
  714. remain -= n;
  715. detail::mask_inplace(b, key);
  716. impl.wr_cont = ! fin;
  717. net::write(impl.stream(),
  718. buffers_cat(fh_buf.data(), b), ec);
  719. bytes_transferred += n;
  720. if(impl.check_stop_now(ec))
  721. return bytes_transferred;
  722. }
  723. while(remain > 0)
  724. {
  725. auto const n =
  726. clamp(remain, impl.wr_buf_size);
  727. auto const b =
  728. net::buffer(impl.wr_buf.get(), n);
  729. net::buffer_copy(b, cb);
  730. cb.consume(n);
  731. remain -= n;
  732. detail::mask_inplace(b, key);
  733. net::write(impl.stream(), b, ec);
  734. bytes_transferred += n;
  735. if(impl.check_stop_now(ec))
  736. return bytes_transferred;
  737. }
  738. }
  739. else
  740. {
  741. // mask, autofrag
  742. BOOST_ASSERT(impl.wr_buf_size != 0);
  743. buffers_suffix<
  744. ConstBufferSequence> cb(buffers);
  745. for(;;)
  746. {
  747. fh.key = this->impl_->create_mask();
  748. detail::prepared_key key;
  749. detail::prepare_key(key, fh.key);
  750. auto const n =
  751. clamp(remain, impl.wr_buf_size);
  752. auto const b =
  753. net::buffer(impl.wr_buf.get(), n);
  754. net::buffer_copy(b, cb);
  755. detail::mask_inplace(b, key);
  756. fh.len = n;
  757. remain -= n;
  758. fh.fin = fin ? remain == 0 : false;
  759. impl.wr_cont = ! fh.fin;
  760. detail::fh_buffer fh_buf;
  761. detail::write<
  762. flat_static_buffer_base>(fh_buf, fh);
  763. net::write(impl.stream(),
  764. buffers_cat(fh_buf.data(), b), ec);
  765. bytes_transferred += n;
  766. if(impl.check_stop_now(ec))
  767. return bytes_transferred;
  768. if(remain == 0)
  769. break;
  770. fh.op = detail::opcode::cont;
  771. cb.consume(n);
  772. }
  773. }
  774. return bytes_transferred;
  775. }
  776. template<class NextLayer, bool deflateSupported>
  777. template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
  778. BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
  779. stream<NextLayer, deflateSupported>::
  780. async_write_some(bool fin,
  781. ConstBufferSequence const& bs, WriteHandler&& handler)
  782. {
  783. static_assert(is_async_stream<next_layer_type>::value,
  784. "AsyncStream type requirements not met");
  785. static_assert(net::is_const_buffer_sequence<
  786. ConstBufferSequence>::value,
  787. "ConstBufferSequence type requirements not met");
  788. return net::async_initiate<
  789. WriteHandler,
  790. void(error_code, std::size_t)>(
  791. run_write_some_op{},
  792. handler,
  793. impl_,
  794. fin,
  795. bs);
  796. }
  797. //------------------------------------------------------------------------------
  798. template<class NextLayer, bool deflateSupported>
  799. template<class ConstBufferSequence>
  800. std::size_t
  801. stream<NextLayer, deflateSupported>::
  802. write(ConstBufferSequence const& buffers)
  803. {
  804. static_assert(is_sync_stream<next_layer_type>::value,
  805. "SyncStream type requirements not met");
  806. static_assert(net::is_const_buffer_sequence<
  807. ConstBufferSequence>::value,
  808. "ConstBufferSequence type requirements not met");
  809. error_code ec;
  810. auto const bytes_transferred = write(buffers, ec);
  811. if(ec)
  812. BOOST_THROW_EXCEPTION(system_error{ec});
  813. return bytes_transferred;
  814. }
  815. template<class NextLayer, bool deflateSupported>
  816. template<class ConstBufferSequence>
  817. std::size_t
  818. stream<NextLayer, deflateSupported>::
  819. write(ConstBufferSequence const& buffers, error_code& ec)
  820. {
  821. static_assert(is_sync_stream<next_layer_type>::value,
  822. "SyncStream type requirements not met");
  823. static_assert(net::is_const_buffer_sequence<
  824. ConstBufferSequence>::value,
  825. "ConstBufferSequence type requirements not met");
  826. return write_some(true, buffers, ec);
  827. }
  828. template<class NextLayer, bool deflateSupported>
  829. template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
  830. BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
  831. stream<NextLayer, deflateSupported>::
  832. async_write(
  833. ConstBufferSequence const& bs, WriteHandler&& handler)
  834. {
  835. static_assert(is_async_stream<next_layer_type>::value,
  836. "AsyncStream type requirements not met");
  837. static_assert(net::is_const_buffer_sequence<
  838. ConstBufferSequence>::value,
  839. "ConstBufferSequence type requirements not met");
  840. return net::async_initiate<
  841. WriteHandler,
  842. void(error_code, std::size_t)>(
  843. run_write_some_op{},
  844. handler,
  845. impl_,
  846. true,
  847. bs);
  848. }
  849. } // websocket
  850. } // beast
  851. } // boost
  852. #endif