read.hpp 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
  11. #include <boost/beast/core/buffer_traits.hpp>
  12. #include <boost/beast/websocket/teardown.hpp>
  13. #include <boost/beast/websocket/detail/mask.hpp>
  14. #include <boost/beast/websocket/impl/stream_impl.hpp>
  15. #include <boost/beast/core/async_base.hpp>
  16. #include <boost/beast/core/bind_handler.hpp>
  17. #include <boost/beast/core/buffers_prefix.hpp>
  18. #include <boost/beast/core/buffers_suffix.hpp>
  19. #include <boost/beast/core/flat_static_buffer.hpp>
  20. #include <boost/beast/core/read_size.hpp>
  21. #include <boost/beast/core/stream_traits.hpp>
  22. #include <boost/beast/core/detail/bind_continuation.hpp>
  23. #include <boost/beast/core/detail/buffer.hpp>
  24. #include <boost/beast/core/detail/clamp.hpp>
  25. #include <boost/beast/core/detail/config.hpp>
  26. #include <boost/asio/coroutine.hpp>
  27. #include <boost/asio/post.hpp>
  28. #include <boost/assert.hpp>
  29. #include <boost/config.hpp>
  30. #include <boost/optional.hpp>
  31. #include <boost/throw_exception.hpp>
  32. #include <algorithm>
  33. #include <limits>
  34. #include <memory>
  35. namespace boost {
  36. namespace beast {
  37. namespace websocket {
  38. /* Read some message data into a buffer sequence.
  39. Also reads and handles control frames.
  40. */
  // Composed asynchronous operation that reads some message payload into a
  // caller-supplied MutableBufferSequence. While doing so it also parses
  // frame headers and services control frames (ping, pong, close) found in
  // the input. It is written as an asio stackless coroutine: operator() is
  // re-entered after every intermediate asynchronous step.
  41. template<class NextLayer, bool deflateSupported>
  42. template<class Handler, class MutableBufferSequence>
  43. class stream<NextLayer, deflateSupported>::read_some_op
  44. : public beast::async_base<
  45. Handler, beast::executor_type<stream>>
  46. , public asio::coroutine
  47. {
  // Weak reference to the stream implementation; locked at the top of
  // every call to operator().
  48. boost::weak_ptr<impl_type> wp_;
  // The caller's buffer sequence as originally passed in. The deflate path
  // re-reads its first bytes_written_ bytes for UTF-8 validation.
  49. MutableBufferSequence bs_;
  // Suffix view over the same sequence; the deflate path consumes from it
  // as inflated output is produced.
  50. buffers_suffix<MutableBufferSequence> cb_;
  // Running count of payload bytes stored in the caller's buffers; this is
  // the size passed to the completion handler.
  51. std::size_t bytes_written_ = 0;
  // Error to report once the close/teardown sequence finishes without an
  // error of its own.
  52. error_code result_;
  // Close code placed in the close frame this operation sends, if any.
  53. close_code code_;
  // Set after the deflate path has issued its socket read, so that path
  // reads from the stream at most once per operation.
  54. bool did_read_ = false;
  55. public:
  56. static constexpr int id = 1; // for soft_mutex
  // Stores the handler and buffers, then starts the operation immediately
  // by invoking operator() with cont == false.
  57. template<class Handler_>
  58. read_some_op(
  59. Handler_&& h,
  60. boost::shared_ptr<impl_type> const& sp,
  61. MutableBufferSequence const& bs)
  62. : async_base<
  63. Handler, beast::executor_type<stream>>(
  64. std::forward<Handler_>(h),
  65. sp->stream().get_executor())
  66. , wp_(sp)
  67. , bs_(bs)
  68. , cb_(bs)
  69. , code_(close_code::none)
  70. {
  71. (*this)({}, 0, false);
  72. }
  // Coroutine body.
  //   ec                - result of the step that resumed us
  //   bytes_transferred - byte count reported by that step
  //   cont              - false only for the initial call made from the
  //                       constructor; every resumption uses the default
  //                       of true. It is forwarded to complete().
  73. void operator()(
  74. error_code ec = {},
  75. std::size_t bytes_transferred = 0,
  76. bool cont = true)
  77. {
  78. using beast::detail::clamp;
  // If the implementation object no longer exists, finish right away with
  // operation_aborted and a byte count of zero.
  79. auto sp = wp_.lock();
  80. if(! sp)
  81. {
  82. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  83. bytes_written_ = 0;
  84. return this->complete(cont, ec, bytes_written_);
  85. }
  86. auto& impl = *sp;
  87. BOOST_ASIO_CORO_REENTER(*this)
  88. {
  89. impl.update_timer(this->get_executor());
  90. acquire_read_lock:
  91. // Acquire the read lock
  92. if(! impl.rd_block.try_lock(this))
  93. {
  94. do_suspend:
  // Park this operation in impl.op_r_rd and yield; while parked, all
  // cancellation types are allowed.
  // NOTE(review): presumably resumed by a maybe_invoke() on op_r_rd
  // elsewhere in the stream - confirm.
  95. BOOST_ASIO_CORO_YIELD
  96. {
  97. BOOST_ASIO_HANDLER_LOCATION((
  98. __FILE__, __LINE__,
  99. "websocket::async_read_some"));
  100. this->set_allowed_cancellation(net::cancellation_type::all);
  101. impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
  102. }
  // Resumed with an error: complete without touching any lock.
  103. if (ec)
  104. return this->complete(cont, ec, bytes_written_);
  105. this->set_allowed_cancellation(net::cancellation_type::terminal);
  106. impl.rd_block.lock(this);
  // Post ourselves to the stream's executor and continue from there.
  107. BOOST_ASIO_CORO_YIELD
  108. {
  109. BOOST_ASIO_HANDLER_LOCATION((
  110. __FILE__, __LINE__,
  111. "websocket::async_read_some"));
  112. net::post(sp->stream().get_executor(), std::move(*this));
  113. }
  114. BOOST_ASSERT(impl.rd_block.is_locked(this));
  115. BOOST_ASSERT(!ec);
  116. if(impl.check_stop_now(ec))
  117. {
  118. // Issue 2264 - There is no guarantee that the next
  119. // error will be operation_aborted.
  120. // The error could be a result of the peer resetting the
  121. // connection
  122. // BOOST_ASSERT(ec == net::error::operation_aborted);
  123. goto upcall;
  124. }
  125. // VFALCO Should never get here
  126. // The only way to get read blocked is if
  127. // a `close_op` wrote a close frame
  128. BOOST_ASSERT(impl.wr_close);
  129. BOOST_ASSERT(impl.status_ != status::open);
  130. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  131. goto upcall;
  132. }
  133. else
  134. {
  135. // Make sure the stream is not closed
  136. if( impl.status_ == status::closed ||
  137. impl.status_ == status::failed)
  138. {
  139. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  140. goto upcall;
  141. }
  142. }
  143. // if status_ == status::closing, we want to suspend
  144. // the read operation until the close completes,
  145. // then finish the read with operation_aborted.
  146. loop:
  147. BOOST_ASSERT(impl.rd_block.is_locked(this));
  148. // See if we need to read a frame header. This
  149. // condition is structured to give the decompressor
  150. // a chance to emit the final empty deflate block
  151. //
  152. if(impl.rd_remain == 0 &&
  153. (! impl.rd_fh.fin || impl.rd_done))
  154. {
  155. // Read frame header
  // Keep reading from the stream into rd_buf until parse_fh() returns
  // true. If parse_fh() sets result_, the connection is failed with a
  // close code chosen from that error.
  156. while(! impl.parse_fh(
  157. impl.rd_fh, impl.rd_buf, result_))
  158. {
  159. if(result_)
  160. {
  161. // _Fail the WebSocket Connection_
  162. if(result_ == error::message_too_big)
  163. code_ = close_code::too_big;
  164. else
  165. code_ = close_code::protocol_error;
  166. goto close;
  167. }
  168. BOOST_ASSERT(impl.rd_block.is_locked(this));
  169. BOOST_ASIO_CORO_YIELD
  170. {
  171. BOOST_ASIO_HANDLER_LOCATION((
  172. __FILE__, __LINE__,
  173. "websocket::async_read_some"));
  174. impl.stream().async_read_some(
  175. impl.rd_buf.prepare(read_size(
  176. impl.rd_buf, impl.rd_buf.max_size())),
  177. std::move(*this));
  178. }
  179. BOOST_ASSERT(impl.rd_block.is_locked(this));
  180. impl.rd_buf.commit(bytes_transferred);
  181. if(impl.check_stop_now(ec))
  182. goto upcall;
  183. impl.reset_idle();
  184. // Allow a close operation
  185. // to acquire the read block
  186. impl.rd_block.unlock(this);
  187. if( impl.op_r_close.maybe_invoke())
  188. {
  189. // Suspend
  190. BOOST_ASSERT(impl.rd_block.is_locked());
  191. goto do_suspend;
  192. }
  193. // Acquire read block
  194. impl.rd_block.lock(this);
  195. }
  196. // Immediately apply the mask to the portion
  197. // of the buffer holding payload data.
  198. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  199. detail::mask_inplace(buffers_prefix(
  200. clamp(impl.rd_fh.len),
  201. impl.rd_buf.data()),
  202. impl.rd_key);
  203. if(detail::is_control(impl.rd_fh.op))
  204. {
  205. // Clear this otherwise the next
  206. // frame will be considered final.
  207. impl.rd_fh.fin = false;
  208. // Handle ping frame
  209. if(impl.rd_fh.op == detail::opcode::ping)
  210. {
  // When a control callback is installed and this is still the initial
  // call from the constructor (cont == false), post to the executor
  // first; the callback below is then reached only on a resumption.
  211. if(impl.ctrl_cb)
  212. {
  213. if(! cont)
  214. {
  215. BOOST_ASIO_CORO_YIELD
  216. {
  217. BOOST_ASIO_HANDLER_LOCATION((
  218. __FILE__, __LINE__,
  219. "websocket::async_read_some"));
  220. net::post(sp->stream().get_executor(), std::move(*this));
  221. }
  222. BOOST_ASSERT(cont);
  223. // VFALCO call check_stop_now() here?
  224. }
  225. }
  226. {
  227. auto const b = buffers_prefix(
  228. clamp(impl.rd_fh.len),
  229. impl.rd_buf.data());
  230. auto const len = buffer_bytes(b);
  231. BOOST_ASSERT(len == impl.rd_fh.len);
  232. ping_data payload;
  233. detail::read_ping(payload, b);
  234. impl.rd_buf.consume(len);
  235. // Ignore ping when closing
  236. if(impl.status_ == status::closing)
  237. goto loop;
  238. if(impl.ctrl_cb)
  239. impl.ctrl_cb(
  240. frame_type::ping, to_string_view(payload));
  // Serialize the pong reply, carrying the ping's payload, into rd_fb.
  241. impl.rd_fb.clear();
  242. impl.template write_ping<
  243. flat_static_buffer_base>(impl.rd_fb,
  244. detail::opcode::pong, payload);
  245. }
  246. // Allow a close operation
  247. // to acquire the read block
  248. impl.rd_block.unlock(this);
  249. impl.op_r_close.maybe_invoke();
  250. // Acquire the write lock
  251. if(! impl.wr_block.try_lock(this))
  252. {
  // Write lock is held elsewhere: park in impl.op_rd until resumed.
  253. BOOST_ASIO_CORO_YIELD
  254. {
  255. BOOST_ASIO_HANDLER_LOCATION((
  256. __FILE__, __LINE__,
  257. "websocket::async_read_some"));
  258. impl.op_rd.emplace(std::move(*this));
  259. }
  260. if (ec)
  261. return this->complete(cont, ec, bytes_written_);
  262. impl.wr_block.lock(this);
  263. BOOST_ASIO_CORO_YIELD
  264. {
  265. BOOST_ASIO_HANDLER_LOCATION((
  266. __FILE__, __LINE__,
  267. "websocket::async_read_some"));
  268. net::post(sp->stream().get_executor(), std::move(*this));
  269. }
  270. BOOST_ASSERT(impl.wr_block.is_locked(this));
  271. if(impl.check_stop_now(ec))
  272. goto upcall;
  273. }
  274. // Send pong
  275. BOOST_ASSERT(impl.wr_block.is_locked(this));
  276. BOOST_ASIO_CORO_YIELD
  277. {
  278. BOOST_ASIO_HANDLER_LOCATION((
  279. __FILE__, __LINE__,
  280. "websocket::async_read_some"));
  281. net::async_write(
  282. impl.stream(), net::const_buffer(impl.rd_fb.data()),
  283. beast::detail::bind_continuation(std::move(*this)));
  284. }
  285. BOOST_ASSERT(impl.wr_block.is_locked(this));
  286. if(impl.check_stop_now(ec))
  287. goto upcall;
  // Release the write lock, then try the pending-operation slots in this
  // order, stopping at the first maybe_invoke() that returns true.
  288. impl.wr_block.unlock(this);
  289. impl.op_close.maybe_invoke()
  290. || impl.op_idle_ping.maybe_invoke()
  291. || impl.op_ping.maybe_invoke()
  292. || impl.op_wr.maybe_invoke();
  // The read lock was released before sending the pong; take it again.
  293. goto acquire_read_lock;
  294. }
  295. // Handle pong frame
  296. if(impl.rd_fh.op == detail::opcode::pong)
  297. {
  298. // Ignore pong when closing
  299. if(! impl.wr_close && impl.ctrl_cb)
  300. {
  301. if(! cont)
  302. {
  303. BOOST_ASIO_CORO_YIELD
  304. {
  305. BOOST_ASIO_HANDLER_LOCATION((
  306. __FILE__, __LINE__,
  307. "websocket::async_read_some"));
  308. net::post(sp->stream().get_executor(), std::move(*this));
  309. }
  310. BOOST_ASSERT(cont);
  311. }
  312. }
  313. auto const cb = buffers_prefix(clamp(
  314. impl.rd_fh.len), impl.rd_buf.data());
  315. auto const len = buffer_bytes(cb);
  316. BOOST_ASSERT(len == impl.rd_fh.len);
  317. ping_data payload;
  318. detail::read_ping(payload, cb);
  319. impl.rd_buf.consume(len);
  320. // Ignore pong when closing
  321. if(! impl.wr_close && impl.ctrl_cb)
  322. impl.ctrl_cb(frame_type::pong, to_string_view(payload));
  323. goto loop;
  324. }
  325. // Handle close frame
  326. BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
  327. {
  328. if(impl.ctrl_cb)
  329. {
  330. if(! cont)
  331. {
  332. BOOST_ASIO_CORO_YIELD
  333. {
  334. BOOST_ASIO_HANDLER_LOCATION((
  335. __FILE__, __LINE__,
  336. "websocket::async_read_some"));
  337. net::post(sp->stream().get_executor(), std::move(*this));
  338. }
  339. BOOST_ASSERT(cont);
  340. }
  341. }
  342. auto const cb = buffers_prefix(clamp(
  343. impl.rd_fh.len), impl.rd_buf.data());
  344. auto const len = buffer_bytes(cb);
  345. BOOST_ASSERT(len == impl.rd_fh.len);
  // Record that a close frame was received, then parse its payload. A
  // parse failure fails the connection with protocol_error.
  346. BOOST_ASSERT(! impl.rd_close);
  347. impl.rd_close = true;
  348. close_reason cr;
  349. detail::read_close(cr, cb, result_);
  350. if(result_)
  351. {
  352. // _Fail the WebSocket Connection_
  353. code_ = close_code::protocol_error;
  354. goto close;
  355. }
  356. impl.cr = cr;
  357. impl.rd_buf.consume(len);
  358. if(impl.ctrl_cb)
  359. impl.ctrl_cb(frame_type::close,
  360. to_string_view(impl.cr.reason));
  361. // See if we are already closing
  362. if(impl.status_ == status::closing)
  363. {
  364. // _Close the WebSocket Connection_
  365. BOOST_ASSERT(impl.wr_close);
  366. code_ = close_code::none;
  367. result_ = error::closed;
  368. goto close;
  369. }
  370. // _Start the WebSocket Closing Handshake_
  // Reply with the peer's close code, or close_code::normal when the
  // peer's frame carried none.
  371. code_ = cr.code == close_code::none ?
  372. close_code::normal :
  373. static_cast<close_code>(cr.code);
  374. result_ = error::closed;
  375. goto close;
  376. }
  377. }
  378. if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
  379. {
  380. // Empty non-final frame
  381. goto loop;
  382. }
  383. impl.rd_done = false;
  384. }
  // Payload handling: the first branch covers messages that are not
  // deflated, the else branch inflates compressed payload.
  385. if(! impl.rd_deflated())
  386. {
  387. if(impl.rd_remain > 0)
  388. {
  // With nothing buffered, and rd_buf able to hold more than the amount
  // that could be delivered on this pass, read into rd_buf first.
  389. if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
  390. (std::min)(clamp(impl.rd_remain),
  391. buffer_bytes(cb_)))
  392. {
  393. // Fill the read buffer first, otherwise we
  394. // get fewer bytes at the cost of one I/O.
  395. BOOST_ASIO_CORO_YIELD
  396. {
  397. BOOST_ASIO_HANDLER_LOCATION((
  398. __FILE__, __LINE__,
  399. "websocket::async_read_some"));
  400. impl.stream().async_read_some(
  401. impl.rd_buf.prepare(read_size(
  402. impl.rd_buf, impl.rd_buf.max_size())),
  403. std::move(*this));
  404. }
  405. impl.rd_buf.commit(bytes_transferred);
  406. if(impl.check_stop_now(ec))
  407. goto upcall;
  408. impl.reset_idle();
  409. if(impl.rd_fh.mask)
  410. detail::mask_inplace(buffers_prefix(clamp(
  411. impl.rd_remain), impl.rd_buf.data()),
  412. impl.rd_key);
  413. }
  414. if(impl.rd_buf.size() > 0)
  415. {
  416. // Copy from the read buffer.
  417. // The mask was already applied.
  418. bytes_transferred = net::buffer_copy(cb_,
  419. impl.rd_buf.data(), clamp(impl.rd_remain));
  420. auto const mb = buffers_prefix(
  421. bytes_transferred, cb_);
  422. impl.rd_remain -= bytes_transferred;
  // Text messages are UTF-8 validated incrementally; finish() is
  // only consulted once the final frame's payload is exhausted.
  423. if(impl.rd_op == detail::opcode::text)
  424. {
  425. if(! impl.rd_utf8.write(mb) ||
  426. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  427. ! impl.rd_utf8.finish()))
  428. {
  429. // _Fail the WebSocket Connection_
  430. code_ = close_code::bad_payload;
  431. result_ = error::bad_frame_payload;
  432. goto close;
  433. }
  434. }
  435. bytes_written_ += bytes_transferred;
  436. impl.rd_size += bytes_transferred;
  437. impl.rd_buf.consume(bytes_transferred);
  438. }
  439. else
  440. {
  441. // Read into caller's buffer
  442. BOOST_ASSERT(impl.rd_remain > 0);
  443. BOOST_ASSERT(buffer_bytes(cb_) > 0);
  444. BOOST_ASSERT(buffer_bytes(buffers_prefix(
  445. clamp(impl.rd_remain), cb_)) > 0);
  446. BOOST_ASIO_CORO_YIELD
  447. {
  448. BOOST_ASIO_HANDLER_LOCATION((
  449. __FILE__, __LINE__,
  450. "websocket::async_read_some"));
  451. impl.stream().async_read_some(buffers_prefix(
  452. clamp(impl.rd_remain), cb_), std::move(*this));
  453. }
  454. if(impl.check_stop_now(ec))
  455. goto upcall;
  456. impl.reset_idle();
  457. BOOST_ASSERT(bytes_transferred > 0);
  458. auto const mb = buffers_prefix(
  459. bytes_transferred, cb_);
  460. impl.rd_remain -= bytes_transferred;
  // These bytes came straight from the stream, so unmask them in the
  // caller's buffer before validating.
  461. if(impl.rd_fh.mask)
  462. detail::mask_inplace(mb, impl.rd_key);
  463. if(impl.rd_op == detail::opcode::text)
  464. {
  465. if(! impl.rd_utf8.write(mb) ||
  466. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  467. ! impl.rd_utf8.finish()))
  468. {
  469. // _Fail the WebSocket Connection_
  470. code_ = close_code::bad_payload;
  471. result_ = error::bad_frame_payload;
  472. goto close;
  473. }
  474. }
  475. bytes_written_ += bytes_transferred;
  476. impl.rd_size += bytes_transferred;
  477. }
  478. }
  // The message is done once the final frame has no payload remaining.
  479. BOOST_ASSERT( ! impl.rd_done );
  480. if( impl.rd_remain == 0 && impl.rd_fh.fin )
  481. impl.rd_done = true;
  482. }
  483. else
  484. {
  485. // Read compressed message frame payload:
  486. // inflate even if rd_fh_.len == 0, otherwise we
  487. // never emit the end-of-stream deflate block.
  // Loop while the caller's buffers still have room for output.
  488. while(buffer_bytes(cb_) > 0)
  489. {
  490. if( impl.rd_remain > 0 &&
  491. impl.rd_buf.size() == 0 &&
  492. ! did_read_)
  493. {
  494. // read new
  495. BOOST_ASIO_CORO_YIELD
  496. {
  497. BOOST_ASIO_HANDLER_LOCATION((
  498. __FILE__, __LINE__,
  499. "websocket::async_read_some"));
  500. impl.stream().async_read_some(
  501. impl.rd_buf.prepare(read_size(
  502. impl.rd_buf, impl.rd_buf.max_size())),
  503. std::move(*this));
  504. }
  505. if(impl.check_stop_now(ec))
  506. goto upcall;
  507. impl.reset_idle();
  508. BOOST_ASSERT(bytes_transferred > 0);
  509. impl.rd_buf.commit(bytes_transferred);
  510. if(impl.rd_fh.mask)
  511. detail::mask_inplace(
  512. buffers_prefix(clamp(impl.rd_remain),
  513. impl.rd_buf.data()), impl.rd_key);
  514. did_read_ = true;
  515. }
  // Output goes to the first buffer remaining in cb_.
  516. zlib::z_params zs;
  517. {
  518. auto const out = buffers_front(cb_);
  519. zs.next_out = out.data();
  520. zs.avail_out = out.size();
  521. BOOST_ASSERT(zs.avail_out > 0);
  522. }
  523. // boolean to track the end of the message.
  524. bool fin = false;
  525. if(impl.rd_remain > 0)
  526. {
  527. if(impl.rd_buf.size() > 0)
  528. {
  529. // use what's there
  530. auto const in = buffers_prefix(
  531. clamp(impl.rd_remain), buffers_front(
  532. impl.rd_buf.data()));
  533. zs.avail_in = in.size();
  534. zs.next_in = in.data();
  535. }
  536. else
  537. {
  538. break;
  539. }
  540. }
  541. else if(impl.rd_fh.fin)
  542. {
  543. // append the empty block codes
  544. static std::uint8_t constexpr
  545. empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
  546. zs.next_in = empty_block;
  547. zs.avail_in = sizeof(empty_block);
  548. fin = true;
  549. }
  550. else
  551. {
  552. break;
  553. }
  554. impl.inflate(zs, zlib::Flush::sync, ec);
  555. if(impl.check_stop_now(ec))
  556. goto upcall;
  // The trailing empty block produced no output: the message is complete.
  557. if(fin && zs.total_out == 0) {
  558. impl.do_context_takeover_read(impl.role);
  559. impl.rd_done = true;
  560. break;
  561. }
  // Enforce the message size limit (when rd_msg_max is non-zero)
  // against the inflated size.
  562. if(impl.rd_msg_max && beast::detail::sum_exceeds(
  563. impl.rd_size, zs.total_out, impl.rd_msg_max))
  564. {
  565. // _Fail the WebSocket Connection_
  566. code_ = close_code::too_big;
  567. result_ = error::message_too_big;
  568. goto close;
  569. }
  570. cb_.consume(zs.total_out);
  571. impl.rd_size += zs.total_out;
  // Input is only consumed from rd_buf when it came from rd_buf,
  // i.e. not when the static empty block was fed in.
  572. if (! fin) {
  573. impl.rd_remain -= zs.total_in;
  574. impl.rd_buf.consume(zs.total_in);
  575. }
  576. bytes_written_ += zs.total_out;
  577. }
  // Validate everything this operation wrote, using the original
  // (unconsumed) buffer sequence bs_.
  578. if(impl.rd_op == detail::opcode::text)
  579. {
  580. // check utf8
  581. if(! impl.rd_utf8.write(
  582. buffers_prefix(bytes_written_, bs_)) || (
  583. impl.rd_done && ! impl.rd_utf8.finish()))
  584. {
  585. // _Fail the WebSocket Connection_
  586. code_ = close_code::bad_payload;
  587. result_ = error::bad_frame_payload;
  588. goto close;
  589. }
  590. }
  591. }
  592. goto upcall;
  // Close path: reached with code_ (close code to send) and result_ (error
  // to report) already set by the branch that jumped here.
  593. close:
  594. // Acquire the write lock
  595. if(! impl.wr_block.try_lock(this))
  596. {
  597. BOOST_ASIO_CORO_YIELD
  598. {
  599. BOOST_ASIO_HANDLER_LOCATION((
  600. __FILE__, __LINE__,
  601. "websocket::async_read_some"));
  602. impl.op_rd.emplace(std::move(*this));
  603. }
  604. if (ec)
  605. return this->complete(cont, ec, bytes_written_);
  606. impl.wr_block.lock(this);
  607. BOOST_ASIO_CORO_YIELD
  608. {
  609. BOOST_ASIO_HANDLER_LOCATION((
  610. __FILE__, __LINE__,
  611. "websocket::async_read_some"));
  612. net::post(sp->stream().get_executor(), std::move(*this));
  613. }
  614. BOOST_ASSERT(impl.wr_block.is_locked(this));
  615. if(impl.check_stop_now(ec))
  616. goto upcall;
  617. }
  618. impl.change_status(status::closing);
  // Send a close frame only if one has not already been written.
  619. if(! impl.wr_close)
  620. {
  621. impl.wr_close = true;
  622. // Serialize close frame
  623. impl.rd_fb.clear();
  624. impl.template write_close<
  625. flat_static_buffer_base>(
  626. impl.rd_fb, code_);
  627. // Send close frame
  628. BOOST_ASSERT(impl.wr_block.is_locked(this));
  629. BOOST_ASIO_CORO_YIELD
  630. {
  631. BOOST_ASIO_HANDLER_LOCATION((
  632. __FILE__, __LINE__,
  633. "websocket::async_read_some"));
  634. net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
  635. beast::detail::bind_continuation(std::move(*this)));
  636. }
  637. BOOST_ASSERT(impl.wr_block.is_locked(this));
  638. if(impl.check_stop_now(ec))
  639. goto upcall;
  640. }
  641. // Teardown
  642. using beast::websocket::async_teardown;
  643. BOOST_ASSERT(impl.wr_block.is_locked(this));
  644. BOOST_ASIO_CORO_YIELD
  645. {
  646. BOOST_ASIO_HANDLER_LOCATION((
  647. __FILE__, __LINE__,
  648. "websocket::async_read_some"));
  649. async_teardown(impl.role, impl.stream(),
  650. beast::detail::bind_continuation(std::move(*this)));
  651. }
  652. BOOST_ASSERT(impl.wr_block.is_locked(this));
  // An eof result from teardown is treated as success.
  653. if(ec == net::error::eof)
  654. {
  655. // Rationale:
  656. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  657. ec = {};
  658. }
  // If teardown itself succeeded, report the error recorded in result_.
  659. if(! ec)
  660. {
  661. BOOST_BEAST_ASSIGN_EC(ec, result_);
  662. }
  // Any error other than error::closed marks the stream failed;
  // otherwise it is marked closed.
  663. if(ec && ec != error::closed)
  664. impl.change_status(status::failed);
  665. else
  666. impl.change_status(status::closed);
  667. impl.close();
  // Common exit: release whichever locks this operation holds, give the
  // pending-operation slots a chance to run, then invoke the handler.
  668. upcall:
  669. impl.rd_block.try_unlock(this);
  670. impl.op_r_close.maybe_invoke();
  671. if(impl.wr_block.try_unlock(this))
  672. impl.op_close.maybe_invoke()
  673. || impl.op_idle_ping.maybe_invoke()
  674. || impl.op_ping.maybe_invoke()
  675. || impl.op_wr.maybe_invoke();
  676. this->complete(cont, ec, bytes_written_);
  677. }
  678. }
  679. };
  680. //------------------------------------------------------------------------------
  681. template<class NextLayer, bool deflateSupported>
  682. template<class Handler, class DynamicBuffer>
  683. class stream<NextLayer, deflateSupported>::read_op
  684. : public beast::async_base<
  685. Handler, beast::executor_type<stream>>
  686. , public asio::coroutine
  687. {
  688. boost::weak_ptr<impl_type> wp_;
  689. DynamicBuffer& b_;
  690. std::size_t limit_;
  691. std::size_t bytes_written_ = 0;
  692. bool some_;
  693. public:
  694. template<class Handler_>
  695. read_op(
  696. Handler_&& h,
  697. boost::shared_ptr<impl_type> const& sp,
  698. DynamicBuffer& b,
  699. std::size_t limit,
  700. bool some)
  701. : async_base<Handler,
  702. beast::executor_type<stream>>(
  703. std::forward<Handler_>(h),
  704. sp->stream().get_executor())
  705. , wp_(sp)
  706. , b_(b)
  707. , limit_(limit ? limit : (
  708. std::numeric_limits<std::size_t>::max)())
  709. , some_(some)
  710. {
  711. (*this)({}, 0, false);
  712. }
  713. void operator()(
  714. error_code ec = {},
  715. std::size_t bytes_transferred = 0,
  716. bool cont = true)
  717. {
  718. using beast::detail::clamp;
  719. auto sp = wp_.lock();
  720. if(! sp)
  721. {
  722. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  723. bytes_written_ = 0;
  724. return this->complete(cont, ec, bytes_written_);
  725. }
  726. auto& impl = *sp;
  727. using mutable_buffers_type = typename
  728. DynamicBuffer::mutable_buffers_type;
  729. BOOST_ASIO_CORO_REENTER(*this)
  730. {
  731. do
  732. {
  733. // VFALCO TODO use boost::beast::bind_continuation
  734. BOOST_ASIO_CORO_YIELD
  735. {
  736. auto mb = beast::detail::dynamic_buffer_prepare(b_,
  737. clamp(impl.read_size_hint_db(b_), limit_),
  738. ec, error::buffer_overflow);
  739. if(impl.check_stop_now(ec))
  740. goto upcall;
  741. BOOST_ASIO_HANDLER_LOCATION((
  742. __FILE__, __LINE__,
  743. "websocket::async_read"));
  744. read_some_op<read_op, mutable_buffers_type>(
  745. std::move(*this), sp, *mb);
  746. }
  747. b_.commit(bytes_transferred);
  748. bytes_written_ += bytes_transferred;
  749. if(ec)
  750. goto upcall;
  751. }
  752. while(! some_ && ! impl.rd_done);
  753. upcall:
  754. this->complete(cont, ec, bytes_written_);
  755. }
  756. }
  757. };
  758. template<class NextLayer, bool deflateSupported>
  759. struct stream<NextLayer, deflateSupported>::
  760. run_read_some_op
  761. {
  762. template<
  763. class ReadHandler,
  764. class MutableBufferSequence>
  765. void
  766. operator()(
  767. ReadHandler&& h,
  768. boost::shared_ptr<impl_type> const& sp,
  769. MutableBufferSequence const& b)
  770. {
  771. // If you get an error on the following line it means
  772. // that your handler does not meet the documented type
  773. // requirements for the handler.
  774. static_assert(
  775. beast::detail::is_invocable<ReadHandler,
  776. void(error_code, std::size_t)>::value,
  777. "ReadHandler type requirements not met");
  778. read_some_op<
  779. typename std::decay<ReadHandler>::type,
  780. MutableBufferSequence>(
  781. std::forward<ReadHandler>(h),
  782. sp,
  783. b);
  784. }
  785. };
  786. template<class NextLayer, bool deflateSupported>
  787. struct stream<NextLayer, deflateSupported>::
  788. run_read_op
  789. {
  790. template<
  791. class ReadHandler,
  792. class DynamicBuffer>
  793. void
  794. operator()(
  795. ReadHandler&& h,
  796. boost::shared_ptr<impl_type> const& sp,
  797. DynamicBuffer* b,
  798. std::size_t limit,
  799. bool some)
  800. {
  801. // If you get an error on the following line it means
  802. // that your handler does not meet the documented type
  803. // requirements for the handler.
  804. static_assert(
  805. beast::detail::is_invocable<ReadHandler,
  806. void(error_code, std::size_t)>::value,
  807. "ReadHandler type requirements not met");
  808. read_op<
  809. typename std::decay<ReadHandler>::type,
  810. DynamicBuffer>(
  811. std::forward<ReadHandler>(h),
  812. sp,
  813. *b,
  814. limit,
  815. some);
  816. }
  817. };
  818. //------------------------------------------------------------------------------
  819. template<class NextLayer, bool deflateSupported>
  820. template<class DynamicBuffer>
  821. std::size_t
  822. stream<NextLayer, deflateSupported>::
  823. read(DynamicBuffer& buffer)
  824. {
  825. static_assert(is_sync_stream<next_layer_type>::value,
  826. "SyncStream type requirements not met");
  827. static_assert(
  828. net::is_dynamic_buffer<DynamicBuffer>::value,
  829. "DynamicBuffer type requirements not met");
  830. error_code ec;
  831. auto const bytes_written = read(buffer, ec);
  832. if(ec)
  833. BOOST_THROW_EXCEPTION(system_error{ec});
  834. return bytes_written;
  835. }
  836. template<class NextLayer, bool deflateSupported>
  837. template<class DynamicBuffer>
  838. std::size_t
  839. stream<NextLayer, deflateSupported>::
  840. read(DynamicBuffer& buffer, error_code& ec)
  841. {
  842. static_assert(is_sync_stream<next_layer_type>::value,
  843. "SyncStream type requirements not met");
  844. static_assert(
  845. net::is_dynamic_buffer<DynamicBuffer>::value,
  846. "DynamicBuffer type requirements not met");
  847. std::size_t bytes_written = 0;
  848. do
  849. {
  850. bytes_written += read_some(buffer, 0, ec);
  851. if(ec)
  852. return bytes_written;
  853. }
  854. while(! is_message_done());
  855. return bytes_written;
  856. }
  857. template<class NextLayer, bool deflateSupported>
  858. template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  859. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  860. stream<NextLayer, deflateSupported>::
  861. async_read(DynamicBuffer& buffer, ReadHandler&& handler)
  862. {
  863. static_assert(is_async_stream<next_layer_type>::value,
  864. "AsyncStream type requirements not met");
  865. static_assert(
  866. net::is_dynamic_buffer<DynamicBuffer>::value,
  867. "DynamicBuffer type requirements not met");
  868. return net::async_initiate<
  869. ReadHandler,
  870. void(error_code, std::size_t)>(
  871. run_read_op{},
  872. handler,
  873. impl_,
  874. &buffer,
  875. 0,
  876. false);
  877. }
  878. //------------------------------------------------------------------------------
  879. template<class NextLayer, bool deflateSupported>
  880. template<class DynamicBuffer>
  881. std::size_t
  882. stream<NextLayer, deflateSupported>::
  883. read_some(
  884. DynamicBuffer& buffer,
  885. std::size_t limit)
  886. {
  887. static_assert(is_sync_stream<next_layer_type>::value,
  888. "SyncStream type requirements not met");
  889. static_assert(
  890. net::is_dynamic_buffer<DynamicBuffer>::value,
  891. "DynamicBuffer type requirements not met");
  892. error_code ec;
  893. auto const bytes_written =
  894. read_some(buffer, limit, ec);
  895. if(ec)
  896. BOOST_THROW_EXCEPTION(system_error{ec});
  897. return bytes_written;
  898. }
  899. template<class NextLayer, bool deflateSupported>
  900. template<class DynamicBuffer>
  901. std::size_t
  902. stream<NextLayer, deflateSupported>::
  903. read_some(
  904. DynamicBuffer& buffer,
  905. std::size_t limit,
  906. error_code& ec)
  907. {
  908. static_assert(is_sync_stream<next_layer_type>::value,
  909. "SyncStream type requirements not met");
  910. static_assert(
  911. net::is_dynamic_buffer<DynamicBuffer>::value,
  912. "DynamicBuffer type requirements not met");
  913. using beast::detail::clamp;
  914. if(! limit)
  915. limit = (std::numeric_limits<std::size_t>::max)();
  916. auto const size =
  917. clamp(read_size_hint(buffer), limit);
  918. BOOST_ASSERT(size > 0);
  919. auto mb = beast::detail::dynamic_buffer_prepare(
  920. buffer, size, ec, error::buffer_overflow);
  921. if(impl_->check_stop_now(ec))
  922. return 0;
  923. auto const bytes_written = read_some(*mb, ec);
  924. buffer.commit(bytes_written);
  925. return bytes_written;
  926. }
  927. template<class NextLayer, bool deflateSupported>
  928. template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  929. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  930. stream<NextLayer, deflateSupported>::
  931. async_read_some(
  932. DynamicBuffer& buffer,
  933. std::size_t limit,
  934. ReadHandler&& handler)
  935. {
  936. static_assert(is_async_stream<next_layer_type>::value,
  937. "AsyncStream type requirements not met");
  938. static_assert(
  939. net::is_dynamic_buffer<DynamicBuffer>::value,
  940. "DynamicBuffer type requirements not met");
  941. return net::async_initiate<
  942. ReadHandler,
  943. void(error_code, std::size_t)>(
  944. run_read_op{},
  945. handler,
  946. impl_,
  947. &buffer,
  948. limit,
  949. true);
  950. }
  951. //------------------------------------------------------------------------------
  952. template<class NextLayer, bool deflateSupported>
  953. template<class MutableBufferSequence>
  954. std::size_t
  955. stream<NextLayer, deflateSupported>::
  956. read_some(
  957. MutableBufferSequence const& buffers)
  958. {
  959. static_assert(is_sync_stream<next_layer_type>::value,
  960. "SyncStream type requirements not met");
  961. static_assert(net::is_mutable_buffer_sequence<
  962. MutableBufferSequence>::value,
  963. "MutableBufferSequence type requirements not met");
  964. error_code ec;
  965. auto const bytes_written = read_some(buffers, ec);
  966. if(ec)
  967. BOOST_THROW_EXCEPTION(system_error{ec});
  968. return bytes_written;
  969. }
  // Synchronously read some message payload into `buffers`, reporting
  // failure through `ec`.
  //
  // Clears `ec`, then, when no payload of the current frame is pending,
  // parses the next frame header out of impl.rd_buf, reading from the
  // next layer as needed. Control frames are handled in place:
  //   - ping:  impl.ctrl_cb is invoked (if set) and a pong is written
  //            with net::write, unless impl.wr_close is set;
  //   - pong:  impl.ctrl_cb is invoked (if set);
  //   - close: impl.cr is stored, impl.ctrl_cb is invoked (if set) and
  //            the call ends through do_fail with error::closed.
  // Data payload is then delivered either by copying from impl.rd_buf /
  // reading straight into `buffers` (when impl.rd_deflated() is false),
  // or by inflating it (when true). For text messages the delivered
  // bytes are passed through impl.rd_utf8.
  //
  // Returns the number of bytes placed in `buffers`. Early exits return
  // the count accumulated up to that point.
  //
  // NOTE(review): the "NNN." prefix on every line of this block looks
  // like line-number residue from a text extraction; it is not valid C++
  // and is left untouched here.
  970. template<class NextLayer, bool deflateSupported>
  971. template<class MutableBufferSequence>
  972. std::size_t
  973. stream<NextLayer, deflateSupported>::
  974. read_some(
  975. MutableBufferSequence const& buffers,
  976. error_code& ec)
  977. {
  978. static_assert(is_sync_stream<next_layer_type>::value,
  979. "SyncStream type requirements not met");
  980. static_assert(net::is_mutable_buffer_sequence<
  981. MutableBufferSequence>::value,
  982. "MutableBufferSequence type requirements not met");
  983. using beast::detail::clamp;
  984. auto& impl = *impl_;
  985. close_code code{};
  986. std::size_t bytes_written = 0;
  987. ec = {};
  988. // Make sure the stream is open
  989. if(impl.check_stop_now(ec))
  990. return bytes_written;
  // Re-entry point used after a control frame or an empty non-final
  // frame has been consumed without producing payload.
  991. loop:
  992. // See if we need to read a frame header. This
  993. // condition is structured to give the decompressor
  994. // a chance to emit the final empty deflate block
  995. //
  996. if(impl.rd_remain == 0 && (
  997. ! impl.rd_fh.fin || impl.rd_done))
  998. {
  999. // Read frame header
  // `result` receives header-parse errors; `ec` is used for the
  // stream reads inside the loop.
  1000. error_code result;
  1001. while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
  1002. {
  1003. if(result)
  1004. {
  1005. // _Fail the WebSocket Connection_
  1006. if(result == error::message_too_big)
  1007. code = close_code::too_big;
  1008. else
  1009. code = close_code::protocol_error;
  1010. do_fail(code, result, ec);
  1011. return bytes_written;
  1012. }
  // parse_fh returned false without an error: append more bytes
  // from the next layer to rd_buf and try again.
  1013. auto const bytes_transferred =
  1014. impl.stream().read_some(
  1015. impl.rd_buf.prepare(read_size(
  1016. impl.rd_buf, impl.rd_buf.max_size())),
  1017. ec);
  1018. impl.rd_buf.commit(bytes_transferred);
  1019. if(impl.check_stop_now(ec))
  1020. return bytes_written;
  1021. }
  1022. // Immediately apply the mask to the portion
  1023. // of the buffer holding payload data.
  1024. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  1025. detail::mask_inplace(buffers_prefix(
  1026. clamp(impl.rd_fh.len), impl.rd_buf.data()),
  1027. impl.rd_key);
  1028. if(detail::is_control(impl.rd_fh.op))
  1029. {
  1030. // Get control frame payload
  1031. auto const b = buffers_prefix(
  1032. clamp(impl.rd_fh.len), impl.rd_buf.data());
  1033. auto const len = buffer_bytes(b);
  // The assertion expects the whole control payload to already be
  // present in rd_buf.
  1034. BOOST_ASSERT(len == impl.rd_fh.len);
  1035. // Clear this otherwise the next
  1036. // frame will be considered final.
  1037. impl.rd_fh.fin = false;
  1038. // Handle ping frame
  1039. if(impl.rd_fh.op == detail::opcode::ping)
  1040. {
  1041. ping_data payload;
  1042. detail::read_ping(payload, b);
  1043. impl.rd_buf.consume(len);
  1044. if(impl.wr_close)
  1045. {
  1046. // Ignore ping when closing
  1047. goto loop;
  1048. }
  1049. if(impl.ctrl_cb)
  1050. impl.ctrl_cb(frame_type::ping, to_string_view(payload));
  // Reply with a pong carrying the ping's payload, written
  // synchronously to the next layer.
  1051. detail::frame_buffer fb;
  1052. impl.template write_ping<flat_static_buffer_base>(fb,
  1053. detail::opcode::pong, payload);
  1054. net::write(impl.stream(), fb.data(), ec);
  1055. if(impl.check_stop_now(ec))
  1056. return bytes_written;
  1057. goto loop;
  1058. }
  1059. // Handle pong frame
  1060. if(impl.rd_fh.op == detail::opcode::pong)
  1061. {
  1062. ping_data payload;
  1063. detail::read_ping(payload, b);
  1064. impl.rd_buf.consume(len);
  1065. if(impl.ctrl_cb)
  1066. impl.ctrl_cb(frame_type::pong, to_string_view(payload));
  1067. goto loop;
  1068. }
  1069. // Handle close frame
  1070. BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
  1071. {
  1072. BOOST_ASSERT(! impl.rd_close);
  1073. impl.rd_close = true;
  1074. close_reason cr;
  1075. detail::read_close(cr, b, result);
  1076. if(result)
  1077. {
  1078. // _Fail the WebSocket Connection_
  1079. do_fail(close_code::protocol_error,
  1080. result, ec);
  1081. return bytes_written;
  1082. }
  1083. impl.cr = cr;
  1084. impl.rd_buf.consume(len);
  1085. if(impl.ctrl_cb)
  1086. impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
  1087. BOOST_ASSERT(! impl.wr_close);
  1088. // _Start the WebSocket Closing Handshake_
  // A close frame without a code is answered as close_code::normal;
  // otherwise the peer's code is passed to do_fail.
  1089. do_fail(
  1090. cr.code == close_code::none ?
  1091. close_code::normal :
  1092. static_cast<close_code>(cr.code),
  1093. error::closed, ec);
  1094. return bytes_written;
  1095. }
  1096. }
  1097. if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
  1098. {
  1099. // Empty non-final frame
  1100. goto loop;
  1101. }
  // Reached only for non-control frames that are not empty
  // non-final frames.
  1102. impl.rd_done = false;
  1103. }
  1104. else
  1105. {
  // No header was needed on this pass.
  1106. ec = {};
  1107. }
  1108. if(! impl.rd_deflated())
  1109. {
  1110. if(impl.rd_remain > 0)
  1111. {
  1112. if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
  1113. (std::min)(clamp(impl.rd_remain),
  1114. buffer_bytes(buffers)))
  1115. {
  1116. // Fill the read buffer first, otherwise we
  1117. // get fewer bytes at the cost of one I/O.
  1118. impl.rd_buf.commit(impl.stream().read_some(
  1119. impl.rd_buf.prepare(read_size(impl.rd_buf,
  1120. impl.rd_buf.max_size())), ec));
  1121. if(impl.check_stop_now(ec))
  1122. return bytes_written;
  // Unmask, in place, the part of rd_buf that belongs to the
  // current frame.
  1123. if(impl.rd_fh.mask)
  1124. detail::mask_inplace(
  1125. buffers_prefix(clamp(impl.rd_remain),
  1126. impl.rd_buf.data()), impl.rd_key);
  1127. }
  1128. if(impl.rd_buf.size() > 0)
  1129. {
  1130. // Copy from the read buffer.
  1131. // The mask was already applied.
  1132. auto const bytes_transferred = net::buffer_copy(
  1133. buffers, impl.rd_buf.data(),
  1134. clamp(impl.rd_remain));
  1135. auto const mb = buffers_prefix(
  1136. bytes_transferred, buffers);
  1137. impl.rd_remain -= bytes_transferred;
  1138. if(impl.rd_op == detail::opcode::text)
  1139. {
  // finish() is only consulted once the final frame's payload
  // has been fully consumed.
  1140. if(! impl.rd_utf8.write(mb) ||
  1141. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  1142. ! impl.rd_utf8.finish()))
  1143. {
  1144. // _Fail the WebSocket Connection_
  1145. do_fail(close_code::bad_payload,
  1146. error::bad_frame_payload, ec);
  1147. return bytes_written;
  1148. }
  1149. }
  1150. bytes_written += bytes_transferred;
  1151. impl.rd_size += bytes_transferred;
  1152. impl.rd_buf.consume(bytes_transferred);
  1153. }
  1154. else
  1155. {
  1156. // Read into caller's buffer
  1157. BOOST_ASSERT(impl.rd_remain > 0);
  1158. BOOST_ASSERT(buffer_bytes(buffers) > 0);
  1159. BOOST_ASSERT(buffer_bytes(buffers_prefix(
  1160. clamp(impl.rd_remain), buffers)) > 0);
  1161. auto const bytes_transferred =
  1162. impl.stream().read_some(buffers_prefix(
  1163. clamp(impl.rd_remain), buffers), ec);
  1164. // VFALCO What if some bytes were written?
  1165. if(impl.check_stop_now(ec))
  1166. return bytes_written;
  1167. BOOST_ASSERT(bytes_transferred > 0);
  1168. auto const mb = buffers_prefix(
  1169. bytes_transferred, buffers);
  1170. impl.rd_remain -= bytes_transferred;
  // Bytes read straight from the stream are still masked, so
  // unmask them in the caller's buffer.
  1171. if(impl.rd_fh.mask)
  1172. detail::mask_inplace(mb, impl.rd_key);
  1173. if(impl.rd_op == detail::opcode::text)
  1174. {
  1175. if(! impl.rd_utf8.write(mb) ||
  1176. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  1177. ! impl.rd_utf8.finish()))
  1178. {
  1179. // _Fail the WebSocket Connection_
  1180. do_fail(close_code::bad_payload,
  1181. error::bad_frame_payload, ec);
  1182. return bytes_written;
  1183. }
  1184. }
  1185. bytes_written += bytes_transferred;
  1186. impl.rd_size += bytes_transferred;
  1187. }
  1188. }
  // Mark the message complete once the final frame has no payload
  // left to deliver.
  1189. BOOST_ASSERT( ! impl.rd_done );
  1190. if( impl.rd_remain == 0 && impl.rd_fh.fin )
  1191. impl.rd_done = true;
  1192. }
  1193. else
  1194. {
  1195. // Read compressed message frame payload:
  1196. // inflate even if rd_fh_.len == 0, otherwise we
  1197. // never emit the end-of-stream deflate block.
  1198. //
  // did_read limits this call to at most one read from the next
  // layer; cb tracks the part of `buffers` not yet filled.
  1199. bool did_read = false;
  1200. buffers_suffix<MutableBufferSequence> cb(buffers);
  1201. while(buffer_bytes(cb) > 0)
  1202. {
  1203. zlib::z_params zs;
  1204. {
  // Inflate into the first buffer of the remaining output
  // sequence.
  1205. auto const out = beast::buffers_front(cb);
  1206. zs.next_out = out.data();
  1207. zs.avail_out = out.size();
  1208. BOOST_ASSERT(zs.avail_out > 0);
  1209. }
  1210. // boolean to track the end of the message.
  1211. bool fin = false;
  1212. if(impl.rd_remain > 0)
  1213. {
  1214. if(impl.rd_buf.size() > 0)
  1215. {
  1216. // use what's there
  1217. auto const in = buffers_prefix(
  1218. clamp(impl.rd_remain), beast::buffers_front(
  1219. impl.rd_buf.data()));
  1220. zs.avail_in = in.size();
  1221. zs.next_in = in.data();
  1222. }
  1223. else if(! did_read)
  1224. {
  1225. // read new
  1226. auto const bytes_transferred =
  1227. impl.stream().read_some(
  1228. impl.rd_buf.prepare(read_size(
  1229. impl.rd_buf, impl.rd_buf.max_size())),
  1230. ec);
  1231. if(impl.check_stop_now(ec))
  1232. return bytes_written;
  1233. BOOST_ASSERT(bytes_transferred > 0);
  1234. impl.rd_buf.commit(bytes_transferred);
  1235. if(impl.rd_fh.mask)
  1236. detail::mask_inplace(
  1237. buffers_prefix(clamp(impl.rd_remain),
  1238. impl.rd_buf.data()), impl.rd_key);
  1239. auto const in = buffers_prefix(
  1240. clamp(impl.rd_remain), buffers_front(
  1241. impl.rd_buf.data()));
  1242. zs.avail_in = in.size();
  1243. zs.next_in = in.data();
  1244. did_read = true;
  1245. }
  1246. else
  1247. {
  // rd_buf is empty and this call has already read once.
  1248. break;
  1249. }
  1250. }
  1251. else if(impl.rd_fh.fin)
  1252. {
  1253. // append the empty block codes
  1254. static std::uint8_t constexpr
  1255. empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
  1256. zs.next_in = empty_block;
  1257. zs.avail_in = sizeof(empty_block);
  1258. fin = true;
  1259. }
  1260. else
  1261. {
  // Frame payload is exhausted and this is not the final frame.
  1262. break;
  1263. }
  1264. impl.inflate(zs, zlib::Flush::sync, ec);
  1265. if(impl.check_stop_now(ec))
  1266. return bytes_written;
  // The trailing empty block produced no further output, so the
  // message is complete.
  1267. if (fin && zs.total_out == 0) {
  1268. impl.do_context_takeover_read(impl.role);
  1269. impl.rd_done = true;
  1270. break;
  1271. }
  // Enforce the message size limit (when rd_msg_max is non-zero)
  // against the inflated size.
  1272. if(impl.rd_msg_max && beast::detail::sum_exceeds(
  1273. impl.rd_size, zs.total_out, impl.rd_msg_max))
  1274. {
  1275. do_fail(close_code::too_big,
  1276. error::message_too_big, ec);
  1277. return bytes_written;
  1278. }
  1279. cb.consume(zs.total_out);
  1280. impl.rd_size += zs.total_out;
  // When `fin` is set the input was the local empty_block, not
  // rd_buf, so nothing is consumed from the frame.
  1281. if (! fin) {
  1282. impl.rd_remain -= zs.total_in;
  1283. impl.rd_buf.consume(zs.total_in);
  1284. }
  1285. bytes_written += zs.total_out;
  1286. }
  1287. if(impl.rd_op == detail::opcode::text)
  1288. {
  1289. // check utf8
  // Validates, in one pass, everything this call inflated into
  // `buffers`.
  1290. if(! impl.rd_utf8.write(beast::buffers_prefix(
  1291. bytes_written, buffers)) || (
  1292. impl.rd_done && ! impl.rd_utf8.finish()))
  1293. {
  1294. // _Fail the WebSocket Connection_
  1295. do_fail(close_code::bad_payload,
  1296. error::bad_frame_payload, ec);
  1297. return bytes_written;
  1298. }
  1299. }
  1300. }
  1301. return bytes_written;
  1302. }
  1303. template<class NextLayer, bool deflateSupported>
  1304. template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  1305. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  1306. stream<NextLayer, deflateSupported>::
  1307. async_read_some(
  1308. MutableBufferSequence const& buffers,
  1309. ReadHandler&& handler)
  1310. {
  1311. static_assert(is_async_stream<next_layer_type>::value,
  1312. "AsyncStream type requirements not met");
  1313. static_assert(net::is_mutable_buffer_sequence<
  1314. MutableBufferSequence>::value,
  1315. "MutableBufferSequence type requirements not met");
  1316. return net::async_initiate<
  1317. ReadHandler,
  1318. void(error_code, std::size_t)>(
  1319. run_read_some_op{},
  1320. handler,
  1321. impl_,
  1322. buffers);
  1323. }
  1324. } // websocket
  1325. } // beast
  1326. } // boost
  1327. #endif