CCDownloader-curl.cpp 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880
  1. /****************************************************************************
  2. Copyright (c) 2015-2016 Chukong Technologies Inc.
  3. Copyright (c) 2017-2018 Xiamen Yaji Software Co., Ltd.
  4. http://www.cocos2d-x.org
  5. Permission is hereby granted, free of charge, to any person obtaining a copy
  6. of this software and associated documentation files (the "Software"), to deal
  7. in the Software without restriction, including without limitation the rights
  8. to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. copies of the Software, and to permit persons to whom the Software is
  10. furnished to do so, subject to the following conditions:
  11. The above copyright notice and this permission notice shall be included in
  12. all copies or substantial portions of the Software.
  13. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. THE SOFTWARE.
  20. ****************************************************************************/
  21. #include "network/CCDownloader-curl.h"
  22. #include <set>
  23. #include <curl/curl.h>
  24. #include <deque>
  25. #include "base/CCScheduler.h"
  26. #include "platform/CCFileUtils.h"
  27. #include "platform/CCApplication.h"
  28. #include "network/CCDownloader.h"
  29. // **NOTE**
  30. // In the file:
  31. // member function with suffix "Proc" designed called in DownloaderCURL::_threadProc
  32. // member function without suffix designed called in main thread
  33. #ifndef CC_CURL_POLL_TIMEOUT_MS
  34. #define CC_CURL_POLL_TIMEOUT_MS 50
  35. #endif
  36. namespace cocos2d { namespace network {
  37. using namespace std;
  38. ////////////////////////////////////////////////////////////////////////////////
  39. // Implementation DownloadTaskCURL
  40. class DownloadTaskCURL : public IDownloadTask
  41. {
  42. static int _sSerialId;
  43. // if more than one task write to one file, cause file broken
  44. // so use a set to check this situation
  45. static set<string> _sStoragePathSet;
  46. public:
  47. int serialId;
  48. DownloadTaskCURL()
  49. : serialId(_sSerialId++)
  50. , _fp(nullptr)
  51. {
  52. _initInternal();
  53. DLLOG("Construct DownloadTaskCURL %p", this);
  54. }
  55. virtual ~DownloadTaskCURL()
  56. {
  57. // if task destroyed unnormally, we should release WritenFileName stored in set.
  58. // Normally, this action should done when task finished.
  59. if (_tempFileName.length() && _sStoragePathSet.end() != _sStoragePathSet.find(_tempFileName))
  60. {
  61. DownloadTaskCURL::_sStoragePathSet.erase(_tempFileName);
  62. }
  63. if (_fp)
  64. {
  65. fclose(_fp);
  66. _fp = nullptr;
  67. }
  68. DLLOG("Destruct DownloadTaskCURL %p", this);
  69. }
  70. bool init(const string& filename, const string& tempSuffix)
  71. {
  72. if (0 == filename.length())
  73. {
  74. // data task
  75. _buf.reserve(CURL_MAX_WRITE_SIZE);
  76. return true;
  77. }
  78. // file task
  79. _fileName = filename;
  80. _tempFileName = filename;
  81. _tempFileName.append(tempSuffix);
  82. if (_sStoragePathSet.end() != _sStoragePathSet.find(_tempFileName))
  83. {
  84. // there is another task uses this storage path
  85. _errCode = DownloadTask::ERROR_FILE_OP_FAILED;
  86. _errCodeInternal = 0;
  87. _errDescription = "More than one download file task write to same file:";
  88. _errDescription.append(_tempFileName);
  89. return false;
  90. }
  91. _sStoragePathSet.insert(_tempFileName);
  92. // open temp file handle for write
  93. bool ret = false;
  94. do
  95. {
  96. string dir;
  97. size_t found = _tempFileName.find_last_of("/\\");
  98. if (found == string::npos)
  99. {
  100. _errCode = DownloadTask::ERROR_INVALID_PARAMS;
  101. _errCodeInternal = 0;
  102. _errDescription = "Can't find dirname in storagePath.";
  103. break;
  104. }
  105. // ensure directory is exist
  106. auto util = FileUtils::getInstance();
  107. dir = _tempFileName.substr(0, found+1);
  108. if (false == util->isDirectoryExist(dir))
  109. {
  110. if (false == util->createDirectory(dir))
  111. {
  112. _errCode = DownloadTask::ERROR_FILE_OP_FAILED;
  113. _errCodeInternal = 0;
  114. _errDescription = "Can't create dir:";
  115. _errDescription.append(dir);
  116. break;
  117. }
  118. }
  119. // open file
  120. _fp = fopen(util->getSuitableFOpen(_tempFileName).c_str(), "ab");
  121. if (nullptr == _fp)
  122. {
  123. _errCode = DownloadTask::ERROR_FILE_OP_FAILED;
  124. _errCodeInternal = 0;
  125. _errDescription = "Can't open file:";
  126. _errDescription.append(_tempFileName);
  127. }
  128. ret = true;
  129. } while (0);
  130. return ret;
  131. }
  132. void initProc()
  133. {
  134. lock_guard<mutex> lock(_mutex);
  135. _initInternal();
  136. }
  137. void setErrorProc(int code, int codeInternal, const char *desc)
  138. {
  139. lock_guard<mutex> lock(_mutex);
  140. _errCode = code;
  141. _errCodeInternal = codeInternal;
  142. _errDescription = desc;
  143. }
  144. size_t writeDataProc(unsigned char *buffer, size_t size, size_t count)
  145. {
  146. lock_guard<mutex> lock(_mutex);
  147. size_t ret = 0;
  148. if (_fp)
  149. {
  150. ret = fwrite(buffer, size, count, _fp);
  151. }
  152. else
  153. {
  154. ret = size * count;
  155. auto cap = _buf.capacity();
  156. auto bufSize = _buf.size();
  157. if (cap < bufSize + ret)
  158. {
  159. _buf.reserve(bufSize * 2);
  160. }
  161. _buf.insert(_buf.end() , buffer, buffer + ret);
  162. }
  163. if (ret)
  164. {
  165. _bytesReceived += ret;
  166. _totalBytesReceived += ret;
  167. }
  168. return ret;
  169. }
  170. private:
  171. friend class DownloaderCURL;
  172. // for lock object instance
  173. mutex _mutex;
  174. // header info
  175. bool _acceptRanges;
  176. bool _headerAchieved;
  177. int64_t _totalBytesExpected;
  178. string _header; // temp buffer for receive header string, only used in thread proc
  179. // progress
  180. int64_t _bytesReceived;
  181. int64_t _totalBytesReceived;
  182. // error
  183. int _errCode;
  184. int _errCodeInternal;
  185. string _errDescription;
  186. // for saving data
  187. string _fileName;
  188. string _tempFileName;
  189. vector<unsigned char> _buf;
  190. FILE* _fp;
  191. void _initInternal()
  192. {
  193. _acceptRanges = (false);
  194. _headerAchieved = (false);
  195. _bytesReceived = (0);
  196. _totalBytesReceived = (0);
  197. _totalBytesExpected = (0);
  198. _errCode = (DownloadTask::ERROR_NO_ERROR);
  199. _errCodeInternal = (CURLE_OK);
  200. _header.resize(0);
  201. _header.reserve(384); // pre alloc header string buffer
  202. }
  203. };
  204. int DownloadTaskCURL::_sSerialId;
  205. set<string> DownloadTaskCURL::_sStoragePathSet;
  206. typedef pair< shared_ptr<const DownloadTask>, DownloadTaskCURL *> TaskWrapper;
  207. ////////////////////////////////////////////////////////////////////////////////
  208. // Implementation DownloaderCURL::Impl
  209. // This class shared by DownloaderCURL and work thread.
  210. class DownloaderCURL::Impl : public enable_shared_from_this<DownloaderCURL::Impl>
  211. {
  212. public:
  213. DownloaderHints hints;
  214. Impl()
  215. // : _thread(nullptr)
  216. {
  217. DLLOG("Construct DownloaderCURL::Impl %p", this);
  218. }
  219. ~Impl()
  220. {
  221. DLLOG("Destruct DownloaderCURL::Impl %p %d", this, _thread.joinable());
  222. }
  223. void addTask(std::shared_ptr<const DownloadTask> task, DownloadTaskCURL* coTask)
  224. {
  225. if (DownloadTask::ERROR_NO_ERROR == coTask->_errCode)
  226. {
  227. lock_guard<mutex> lock(_requestMutex);
  228. _requestQueue.push_back(make_pair(task, coTask));
  229. }
  230. else
  231. {
  232. lock_guard<mutex> lock(_finishedMutex);
  233. _finishedQueue.push_back(make_pair(task, coTask));
  234. }
  235. }
  236. void run()
  237. {
  238. lock_guard<mutex> lock(_threadMutex);
  239. if (false == _thread.joinable())
  240. {
  241. thread newThread(&DownloaderCURL::Impl::_threadProc, this);
  242. _thread.swap(newThread);
  243. }
  244. }
  245. void stop()
  246. {
  247. lock_guard<mutex> lock(_threadMutex);
  248. if (_thread.joinable())
  249. {
  250. _thread.detach();
  251. }
  252. }
  253. bool stoped()
  254. {
  255. lock_guard<mutex> lock(_threadMutex);
  256. return false == _thread.joinable() ? true : false;
  257. }
  258. void getProcessTasks(vector<TaskWrapper>& outList)
  259. {
  260. lock_guard<mutex> lock(_processMutex);
  261. outList.reserve(_processSet.size());
  262. outList.insert(outList.end(), _processSet.begin(), _processSet.end());
  263. }
  264. void getFinishedTasks(vector<TaskWrapper>& outList)
  265. {
  266. lock_guard<mutex> lock(_finishedMutex);
  267. outList.reserve(_finishedQueue.size());
  268. outList.insert(outList.end(), _finishedQueue.begin(), _finishedQueue.end());
  269. _finishedQueue.clear();
  270. }
  271. private:
  272. static size_t _outputHeaderCallbackProc(void *buffer, size_t size, size_t count, void *userdata)
  273. {
  274. int strLen = int(size * count);
  275. DLLOG(" _outputHeaderCallbackProc: %.*s", strLen, buffer);
  276. DownloadTaskCURL& coTask = *((DownloadTaskCURL*)(userdata));
  277. coTask._header.append((const char *)buffer, strLen);
  278. return strLen;
  279. }
  280. static size_t _outputDataCallbackProc(void *buffer, size_t size, size_t count, void *userdata)
  281. {
  282. // DLLOG(" _outputDataCallbackProc: size(%ld), count(%ld)", size, count);
  283. DownloadTaskCURL *coTask = (DownloadTaskCURL*)userdata;
  284. // If your callback function returns CURL_WRITEFUNC_PAUSE it will cause this transfer to become paused.
  285. return coTask->writeDataProc((unsigned char *)buffer, size, count);
  286. }
  287. // this function designed call in work thread
  288. // the curl handle destroyed in _threadProc
  289. // handle inited for get header
  290. void _initCurlHandleProc(CURL *handle, TaskWrapper& wrapper, bool forContent = false)
  291. {
  292. const DownloadTask& task = *wrapper.first;
  293. const DownloadTaskCURL* coTask = wrapper.second;
  294. // set url
  295. curl_easy_setopt(handle, CURLOPT_URL, task.requestURL.c_str());
  296. // set write func
  297. if (forContent)
  298. {
  299. curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, DownloaderCURL::Impl::_outputDataCallbackProc);
  300. }
  301. else
  302. {
  303. curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, DownloaderCURL::Impl::_outputHeaderCallbackProc);
  304. }
  305. curl_easy_setopt(handle, CURLOPT_WRITEDATA, coTask);
  306. curl_easy_setopt(handle, CURLOPT_NOPROGRESS, true);
  307. // curl_easy_setopt(handle, CURLOPT_XFERINFOFUNCTION, DownloaderCURL::Impl::_progressCallbackProc);
  308. // curl_easy_setopt(handle, CURLOPT_XFERINFODATA, coTask);
  309. curl_easy_setopt(handle, CURLOPT_FAILONERROR, true);
  310. curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);
  311. if (forContent)
  312. {
  313. /** if server acceptRanges and local has part of file, we continue to download **/
  314. if (coTask->_acceptRanges && coTask->_totalBytesReceived > 0)
  315. {
  316. curl_easy_setopt(handle, CURLOPT_RESUME_FROM_LARGE,(curl_off_t)coTask->_totalBytesReceived);
  317. }
  318. }
  319. else
  320. {
  321. // get header options
  322. curl_easy_setopt(handle, CURLOPT_HEADER, 1);
  323. curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
  324. }
  325. // if (!sProxy.empty())
  326. // {
  327. // curl_easy_setopt(curl, CURLOPT_PROXY, sProxy.c_str());
  328. // }
  329. if (hints.timeoutInSeconds)
  330. {
  331. curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, hints.timeoutInSeconds);
  332. }
  333. static const long LOW_SPEED_LIMIT = 1;
  334. static const long LOW_SPEED_TIME = 10;
  335. curl_easy_setopt(handle, CURLOPT_LOW_SPEED_LIMIT, LOW_SPEED_LIMIT);
  336. curl_easy_setopt(handle, CURLOPT_LOW_SPEED_TIME, LOW_SPEED_TIME);
  337. curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, false);
  338. curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, false);
  339. static const int MAX_REDIRS = 5;
  340. if (MAX_REDIRS)
  341. {
  342. curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, true);
  343. curl_easy_setopt(handle, CURLOPT_MAXREDIRS, MAX_REDIRS);
  344. }
  345. }
  346. // get header info, if success set handle to content download state
  347. bool _getHeaderInfoProc(CURL *handle, TaskWrapper& wrapper)
  348. {
  349. DownloadTaskCURL& coTask = *wrapper.second;
  350. CURLcode rc = CURLE_OK;
  351. do
  352. {
  353. long httpResponseCode = 0;
  354. rc = curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &httpResponseCode);
  355. if (CURLE_OK != rc)
  356. {
  357. break;
  358. }
  359. if (200 != httpResponseCode)
  360. {
  361. char buf[256] = {0};
  362. sprintf(buf
  363. , "When request url(%s) header info, return unexcept http response code(%ld)"
  364. , wrapper.first->requestURL.c_str()
  365. , httpResponseCode);
  366. coTask.setErrorProc(DownloadTask::ERROR_IMPL_INTERNAL, CURLE_OK, buf);
  367. }
  368. // curl_easy_getinfo(handle, CURLINFO_EFFECTIVE_URL, &effectiveUrl);
  369. // curl_easy_getinfo(handle, CURLINFO_CONTENT_TYPE, &contentType);
  370. double contentLen = 0;
  371. rc = curl_easy_getinfo(handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &contentLen);
  372. if (CURLE_OK != rc)
  373. {
  374. break;
  375. }
  376. bool acceptRanges = (string::npos != coTask._header.find("Accept-Ranges")) ? true : false;
  377. // get current file size
  378. int64_t fileSize = 0;
  379. if (acceptRanges && coTask._tempFileName.length())
  380. {
  381. fileSize = FileUtils::getInstance()->getFileSize(coTask._tempFileName);
  382. }
  383. // set header info to coTask
  384. lock_guard<mutex> lock(coTask._mutex);
  385. coTask._totalBytesExpected = (int64_t)contentLen;
  386. coTask._acceptRanges = acceptRanges;
  387. if (acceptRanges && fileSize > 0)
  388. {
  389. coTask._totalBytesReceived = fileSize;
  390. }
  391. coTask._headerAchieved = true;
  392. } while (0);
  393. if (CURLE_OK != rc)
  394. {
  395. coTask.setErrorProc(DownloadTask::ERROR_IMPL_INTERNAL, rc, curl_easy_strerror(rc));
  396. }
  397. return coTask._headerAchieved;
  398. }
  399. void _threadProc()
  400. {
  401. DLLOG("++++DownloaderCURL::Impl::_threadProc begin %p", this);
  402. // the holder prevent DownloaderCURL::Impl class instance be destruct in main thread
  403. auto holder = this->shared_from_this();
  404. auto thisThreadId = this_thread::get_id();
  405. uint32_t countOfMaxProcessingTasks = this->hints.countOfMaxProcessingTasks;
  406. // init curl content
  407. CURLM* curlmHandle = curl_multi_init();
  408. unordered_map<CURL*, TaskWrapper> coTaskMap;
  409. int runningHandles = 0;
  410. CURLMcode mcode = CURLM_OK;
  411. int rc = 0; // select return code
  412. do
  413. {
  414. // check the thread should exit or not
  415. {
  416. lock_guard<mutex> lock(_threadMutex);
  417. // if the Impl stoped, this->_thread.reset will be called, thus _thread.get_id() not equal with thisThreadId
  418. if (thisThreadId != this->_thread.get_id())
  419. {
  420. break;
  421. }
  422. }
  423. if (runningHandles)
  424. {
  425. // get timeout setting from multi-handle
  426. long timeoutMS = -1;
  427. curl_multi_timeout(curlmHandle, &timeoutMS);
  428. if(timeoutMS < 0)
  429. {
  430. timeoutMS = 1000;
  431. }
  432. /* get file descriptors from the transfers */
  433. fd_set fdread;
  434. fd_set fdwrite;
  435. fd_set fdexcep;
  436. int maxfd = -1;
  437. FD_ZERO(&fdread);
  438. FD_ZERO(&fdwrite);
  439. FD_ZERO(&fdexcep);
  440. mcode = curl_multi_fdset(curlmHandle, &fdread, &fdwrite, &fdexcep, &maxfd);
  441. if (CURLM_OK != mcode)
  442. {
  443. break;
  444. }
  445. // do wait action
  446. if(maxfd == -1)
  447. {
  448. this_thread::sleep_for(chrono::milliseconds(CC_CURL_POLL_TIMEOUT_MS));
  449. rc = 0;
  450. }
  451. else
  452. {
  453. struct timeval timeout;
  454. timeout.tv_sec = timeoutMS / 1000;
  455. timeout.tv_usec = (timeoutMS % 1000) * 1000;
  456. rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
  457. }
  458. if (rc < 0)
  459. {
  460. DLLOG(" _threadProc: select return unexpect code: %d", rc);
  461. }
  462. }
  463. if (coTaskMap.size())
  464. {
  465. mcode = CURLM_CALL_MULTI_PERFORM;
  466. while(CURLM_CALL_MULTI_PERFORM == mcode)
  467. {
  468. mcode = curl_multi_perform(curlmHandle, &runningHandles);
  469. }
  470. if (CURLM_OK != mcode)
  471. {
  472. break;
  473. }
  474. struct CURLMsg *m;
  475. do {
  476. int msgq = 0;
  477. m = curl_multi_info_read(curlmHandle, &msgq);
  478. if(m && (m->msg == CURLMSG_DONE))
  479. {
  480. CURL *curlHandle = m->easy_handle;
  481. CURLcode errCode = m->data.result;
  482. TaskWrapper wrapper = coTaskMap[curlHandle];
  483. // remove from multi-handle
  484. curl_multi_remove_handle(curlmHandle, curlHandle);
  485. bool reinited = false;
  486. do
  487. {
  488. if (CURLE_OK != errCode)
  489. {
  490. wrapper.second->setErrorProc(DownloadTask::ERROR_IMPL_INTERNAL, errCode, curl_easy_strerror(errCode));
  491. break;
  492. }
  493. // if the task is content download task, cleanup the handle
  494. if (wrapper.second->_headerAchieved)
  495. {
  496. break;
  497. }
  498. // the task is get header task
  499. // first, we get info from response
  500. if (false == _getHeaderInfoProc(curlHandle, wrapper))
  501. {
  502. // the error info has been set in _getHeaderInfoProc
  503. break;
  504. }
  505. // after get header info success
  506. // wrapper.second->_totalBytesReceived inited by local file size
  507. // if the local file size equal with the content size from header, the file has downloaded finish
  508. if (wrapper.second->_totalBytesReceived &&
  509. wrapper.second->_totalBytesReceived == wrapper.second->_totalBytesExpected)
  510. {
  511. // the file has download complete
  512. // break to move this task to finish queue
  513. break;
  514. }
  515. // reinit curl handle for download content
  516. curl_easy_reset(curlHandle);
  517. _initCurlHandleProc(curlHandle, wrapper, true);
  518. mcode = curl_multi_add_handle(curlmHandle, curlHandle);
  519. if (CURLM_OK != mcode)
  520. {
  521. wrapper.second->setErrorProc(DownloadTask::ERROR_IMPL_INTERNAL, mcode, curl_multi_strerror(mcode));
  522. break;
  523. }
  524. reinited = true;
  525. } while (0);
  526. if (reinited)
  527. {
  528. continue;
  529. }
  530. curl_easy_cleanup(curlHandle);
  531. DLLOG(" _threadProc task clean cur handle :%p with errCode:%d", curlHandle, errCode);
  532. // remove from coTaskMap
  533. coTaskMap.erase(curlHandle);
  534. // remove from _processSet
  535. {
  536. lock_guard<mutex> lock(_processMutex);
  537. if (_processSet.end() != _processSet.find(wrapper)) {
  538. _processSet.erase(wrapper);
  539. }
  540. }
  541. // add to finishedQueue
  542. {
  543. lock_guard<mutex> lock(_finishedMutex);
  544. _finishedQueue.push_back(wrapper);
  545. }
  546. }
  547. } while(m);
  548. }
  549. // process tasks in _requestList
  550. auto size = coTaskMap.size();
  551. while (0 == countOfMaxProcessingTasks || size < countOfMaxProcessingTasks)
  552. {
  553. // get task wrapper from request queue
  554. TaskWrapper wrapper;
  555. {
  556. lock_guard<mutex> lock(_requestMutex);
  557. if (_requestQueue.size())
  558. {
  559. wrapper = _requestQueue.front();
  560. _requestQueue.pop_front();
  561. }
  562. }
  563. // if request queue is empty, the wrapper.first is nullptr
  564. if (! wrapper.first)
  565. {
  566. break;
  567. }
  568. wrapper.second->initProc();
  569. // create curl handle from task and add into curl multi handle
  570. CURL* curlHandle = curl_easy_init();
  571. if (nullptr == curlHandle)
  572. {
  573. wrapper.second->setErrorProc(DownloadTask::ERROR_IMPL_INTERNAL, 0, "Alloc curl handle failed.");
  574. lock_guard<mutex> lock(_finishedMutex);
  575. _finishedQueue.push_back(wrapper);
  576. continue;
  577. }
  578. // init curl handle for get header info
  579. _initCurlHandleProc(curlHandle, wrapper);
  580. // add curl handle to process list
  581. mcode = curl_multi_add_handle(curlmHandle, curlHandle);
  582. if (CURLM_OK != mcode)
  583. {
  584. wrapper.second->setErrorProc(DownloadTask::ERROR_IMPL_INTERNAL, mcode, curl_multi_strerror(mcode));
  585. lock_guard<mutex> lock(_finishedMutex);
  586. _finishedQueue.push_back(wrapper);
  587. continue;
  588. }
  589. DLLOG(" _threadProc task create curl handle:%p", curlHandle);
  590. coTaskMap[curlHandle] = wrapper;
  591. lock_guard<mutex> lock(_processMutex);
  592. _processSet.insert(wrapper);
  593. }
  594. } while (coTaskMap.size());
  595. curl_multi_cleanup(curlmHandle);
  596. this->stop();
  597. DLLOG("----DownloaderCURL::Impl::_threadProc end");
  598. }
  599. thread _thread;
  600. deque<TaskWrapper> _requestQueue;
  601. set<TaskWrapper> _processSet;
  602. deque<TaskWrapper> _finishedQueue;
  603. mutex _threadMutex;
  604. mutex _requestMutex;
  605. mutex _processMutex;
  606. mutex _finishedMutex;
  607. };
  608. ////////////////////////////////////////////////////////////////////////////////
  609. // Implementation DownloaderCURL
  610. DownloaderCURL::DownloaderCURL(const DownloaderHints& hints)
  611. : _impl(std::make_shared<Impl>())
  612. , _currTask(nullptr)
  613. {
  614. DLLOG("Construct DownloaderCURL %p", this);
  615. _impl->hints = hints;
  616. _scheduler = Application::getInstance()->getScheduler();
  617. _transferDataToBuffer = [this](void *buf, int64_t len)->int64_t
  618. {
  619. DownloadTaskCURL& coTask = *_currTask;
  620. int64_t dataLen = coTask._buf.size();
  621. if (len < dataLen)
  622. {
  623. return 0;
  624. }
  625. memcpy(buf, coTask._buf.data(), dataLen);
  626. coTask._buf.resize(0);
  627. return dataLen;
  628. };
  629. char key[128];
  630. sprintf(key, "DownloaderCURL(%p)", this);
  631. _schedulerKey = key;
  632. if(auto sche = _scheduler.lock())
  633. {
  634. sche->schedule(bind(&DownloaderCURL::_onSchedule, this, placeholders::_1),
  635. this,
  636. 0.1f,
  637. true,
  638. _schedulerKey);
  639. }
  640. }
  641. DownloaderCURL::~DownloaderCURL()
  642. {
  643. if(auto sche = _scheduler.lock())
  644. {
  645. sche->unschedule(_schedulerKey, this);
  646. }
  647. _impl->stop();
  648. DLLOG("Destruct DownloaderCURL %p", this);
  649. }
  650. IDownloadTask *DownloaderCURL::createCoTask(std::shared_ptr<const DownloadTask>& task)
  651. {
  652. DownloadTaskCURL *coTask = new (std::nothrow) DownloadTaskCURL;
  653. coTask->init(task->storagePath, _impl->hints.tempFileNameSuffix);
  654. DLLOG(" DownloaderCURL: createTask: Id(%d)", coTask->serialId);
  655. _impl->addTask(task, coTask);
  656. _impl->run();
  657. if(auto sche = _scheduler.lock())
  658. {
  659. sche->resumeTarget(this);
  660. }
  661. return coTask;
  662. }
  663. void DownloaderCURL::abort(const std::unique_ptr<IDownloadTask>& task) {
  664. // REFINE
  665. // https://github.com/cocos-creator/cocos2d-x-lite/pull/1291
  666. DLLOG("%s isn't implemented!\n", __FUNCTION__);
  667. }
  668. void DownloaderCURL::_onSchedule(float)
  669. {
  670. vector<TaskWrapper> tasks;
  671. // update processing tasks
  672. _impl->getProcessTasks(tasks);
  673. for (auto& wrapper : tasks)
  674. {
  675. const DownloadTask& task = *wrapper.first;
  676. DownloadTaskCURL& coTask = *wrapper.second;
  677. lock_guard<mutex> lock(coTask._mutex);
  678. if (coTask._bytesReceived)
  679. {
  680. _currTask = &coTask;
  681. onTaskProgress(task,
  682. coTask._bytesReceived,
  683. coTask._totalBytesReceived,
  684. coTask._totalBytesExpected,
  685. _transferDataToBuffer);
  686. _currTask = nullptr;
  687. coTask._bytesReceived = 0;
  688. }
  689. }
  690. tasks.resize(0);
  691. // update finished tasks
  692. _impl->getFinishedTasks(tasks);
  693. if (_impl->stoped())
  694. {
  695. if (auto sche = _scheduler.lock())
  696. {
  697. sche->pauseTarget(this);
  698. }
  699. }
  700. for (auto& wrapper : tasks)
  701. {
  702. const DownloadTask& task = *wrapper.first;
  703. DownloadTaskCURL& coTask = *wrapper.second;
  704. // if there is bytesReceived, call progress update first
  705. if (coTask._bytesReceived)
  706. {
  707. _currTask = &coTask;
  708. onTaskProgress(task,
  709. coTask._bytesReceived,
  710. coTask._totalBytesReceived,
  711. coTask._totalBytesExpected,
  712. _transferDataToBuffer);
  713. coTask._bytesReceived = 0;
  714. _currTask = nullptr;
  715. }
  716. // if file task, close file handle and rename file if needed
  717. if (coTask._fp)
  718. {
  719. fclose(coTask._fp);
  720. coTask._fp = nullptr;
  721. do
  722. {
  723. if (0 == coTask._fileName.length())
  724. {
  725. break;
  726. }
  727. auto util = FileUtils::getInstance();
  728. // if file already exist, remove it
  729. if (util->isFileExist(coTask._fileName))
  730. {
  731. if (false == util->removeFile(coTask._fileName))
  732. {
  733. coTask._errCode = DownloadTask::ERROR_FILE_OP_FAILED;
  734. coTask._errCodeInternal = 0;
  735. coTask._errDescription = "Can't remove old file: ";
  736. coTask._errDescription.append(coTask._fileName);
  737. break;
  738. }
  739. }
  740. // rename file
  741. if (util->renameFile(coTask._tempFileName, coTask._fileName))
  742. {
  743. // success, remove storage from set
  744. DownloadTaskCURL::_sStoragePathSet.erase(coTask._tempFileName);
  745. break;
  746. }
  747. // failed
  748. coTask._errCode = DownloadTask::ERROR_FILE_OP_FAILED;
  749. coTask._errCodeInternal = 0;
  750. coTask._errDescription = "Can't renamefile from: ";
  751. coTask._errDescription.append(coTask._tempFileName);
  752. coTask._errDescription.append(" to: ");
  753. coTask._errDescription.append(coTask._fileName);
  754. } while (0);
  755. }
  756. // needn't lock coTask here, because tasks has removed form _impl
  757. onTaskFinish(task, coTask._errCode, coTask._errCodeInternal, coTask._errDescription, coTask._buf);
  758. DLLOG(" DownloaderCURL: finish Task: Id(%d)", coTask.serialId);
  759. }
  760. }
  761. }} // namespace cocos2d::network