12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329 |
- /****************************************************************************
- Copyright (c) 2010-2012 cocos2d-x.org
- Copyright (c) 2013-2017 Chukong Technologies Inc.
- http://www.cocos2d-x.org
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
- all copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- THE SOFTWARE.
- "[WebSocket module] is based in part on the work of the libwebsockets project
- (http://libwebsockets.org)"
- ****************************************************************************/
- #include "network/WebSocket.h"
- #include "network/Uri.h"
- #include "base/CCDirector.h"
- #include "base/CCScheduler.h"
- #include "base/CCEventDispatcher.h"
- #include "base/CCEventListenerCustom.h"
- #include "platform/CCFileUtils.h"
- #include <thread>
- #include <mutex>
- #include <queue>
- #include <list>
- #include <signal.h>
- #include <errno.h>
- #include "libwebsockets.h"
- #define NS_NETWORK_BEGIN namespace cocos2d { namespace network {
- #define NS_NETWORK_END }}
- #define WS_RX_BUFFER_SIZE (65536)
- #define WS_RESERVE_RECEIVE_BUFFER_SIZE (4096)
- #define LOG_TAG "WebSocket.cpp"
- #if (CC_TARGET_PLATFORM == CC_PLATFORM_WIN32)
- // log, CCLOG aren't threadsafe, since we uses sub threads for parsing pcm data, threadsafe log output
- // is needed. Define the following macros (ALOGV, ALOGD, ALOGI, ALOGW, ALOGE) for threadsafe log output.
- //FIXME: Move _winLog, winLog to a separated file
- static void _winLog(const char *format, va_list args)
- {
- static const int MAX_LOG_LENGTH = 16 * 1024;
- int bufferSize = MAX_LOG_LENGTH;
- char* buf = nullptr;
- do
- {
- buf = new (std::nothrow) char[bufferSize];
- if (buf == nullptr)
- return; // not enough memory
- int ret = vsnprintf(buf, bufferSize - 3, format, args);
- if (ret < 0)
- {
- bufferSize *= 2;
- delete[] buf;
- }
- else
- break;
- } while (true);
- strcat(buf, "\n");
- int pos = 0;
- int len = strlen(buf);
- char tempBuf[MAX_LOG_LENGTH + 1] = { 0 };
- WCHAR wszBuf[MAX_LOG_LENGTH + 1] = { 0 };
- do
- {
- std::copy(buf + pos, buf + pos + MAX_LOG_LENGTH, tempBuf);
- tempBuf[MAX_LOG_LENGTH] = 0;
- MultiByteToWideChar(CP_UTF8, 0, tempBuf, -1, wszBuf, sizeof(wszBuf));
- OutputDebugStringW(wszBuf);
- pos += MAX_LOG_LENGTH;
- } while (pos < len);
- delete[] buf;
- }
- static void wsLog(const char * format, ...)
- {
- va_list args;
- va_start(args, format);
- _winLog(format, args);
- va_end(args);
- }
- #else
- #define wsLog printf
- #endif
- #define QUOTEME_(x) #x
- #define QUOTEME(x) QUOTEME_(x)
- // Since CCLOG isn't thread safe, we uses LOGD for multi-thread logging.
- #ifdef ANDROID
- #if COCOS2D_DEBUG > 0
- #define LOGD(...) __android_log_print(ANDROID_LOG_DEBUG, LOG_TAG,__VA_ARGS__)
- #else
- #define LOGD(...)
- #endif
- #define LOGE(...) __android_log_print(ANDROID_LOG_ERROR, LOG_TAG,__VA_ARGS__)
- #else
- #if COCOS2D_DEBUG > 0
- #define LOGD(fmt, ...) wsLog("D/" LOG_TAG " (" QUOTEME(__LINE__) "): " fmt "", ##__VA_ARGS__)
- #else
- #define LOGD(fmt, ...)
- #endif
- #define LOGE(fmt, ...) wsLog("E/" LOG_TAG " (" QUOTEME(__LINE__) "): " fmt "", ##__VA_ARGS__)
- #endif
/**
 * Log sink with the (level, line) signature expected by lws_set_log_level.
 * Prefixes the line with the name of its log level and forwards it to LOGD.
 * The body is compiled out unless COCOS2D_DEBUG > 0.
 *
 * @param level A single LLL_* bit identifying the severity of the line.
 * @param line  The message text.
 */
static void printWebSocketLog(int level, const char *line)
{
#if COCOS2D_DEBUG > 0
    static const char * const log_level_names[] = {
        "ERR",
        "WARN",
        "NOTICE",
        "INFO",
        "DEBUG",
        "PARSER",
        "HEADER",
        "EXTENSION",
        "CLIENT",
        "LATENCY",
    };
    static const int nameCount = static_cast<int>(sizeof(log_level_names) / sizeof(log_level_names[0]));

    char buf[30] = {0};
    // LLL_COUNT comes from the libwebsockets headers and may be larger than the
    // table above, so the index is bounded by the table size as well.
    for (int n = 0; n < LLL_COUNT && n < nameCount; n++) {
        if (level != (1 << n))
            continue;
        sprintf(buf, "%s: ", log_level_names[n]);
        break;
    }
    LOGD("%s%s\n", buf, line);
#endif // #if COCOS2D_DEBUG > 0
}
- NS_NETWORK_BEGIN
// Kinds of messages queued for the websocket sub thread (stored in WsMessage::what).
enum WS_MSG {
    // Send WsMessage::data as a text message.
    WS_MSG_TO_SUBTRHEAD_SENDING_STRING = 0,
    // Send WsMessage::data as a binary message.
    WS_MSG_TO_SUBTRHEAD_SENDING_BINARY = 1,
    // Open the connection of the WebSocket stored in WsMessage::user.
    WS_MSG_TO_SUBTHREAD_CREATE_CONNECTION = 2
};
- static std::vector<WebSocket*>* __websocketInstances = nullptr;
- static std::mutex __instanceMutex;
- static struct lws_context* __wsContext = nullptr;
- static WsThreadHelper* __wsHelper = nullptr;
#if (CC_TARGET_PLATFORM == CC_PLATFORM_ANDROID)
/**
 * Returns the part of filePath that follows the last '/' or '\\'.
 * When the path contains no separator the whole path is returned.
 */
static std::string getFileNameForPath(const std::string& filePath)
{
    const std::string::size_type separatorPos = filePath.find_last_of("\\/");
    if (separatorPos == std::string::npos)
    {
        return filePath;
    }
    return filePath.substr(separatorPos + 1);
}
#endif
- static struct lws_protocols __defaultProtocols[2];
- static lws_context_creation_info convertToContextCreationInfo(const struct lws_protocols* protocols, bool peerServerCert)
- {
- lws_context_creation_info info;
- memset(&info, 0, sizeof(info));
- /*
- * create the websocket context. This tracks open connections and
- * knows how to route any traffic and which protocol version to use,
- * and if each connection is client or server side.
- *
- * For this client-only demo, we tell it to not listen on any port.
- */
- info.port = CONTEXT_PORT_NO_LISTEN;
- info.protocols = protocols;
- // FIXME: Disable 'permessage-deflate' extension temporarily because of issues:
- // https://github.com/cocos2d/cocos2d-x/issues/16045, https://github.com/cocos2d/cocos2d-x/issues/15767
- // libwebsockets issue: https://github.com/warmcat/libwebsockets/issues/593
- // Currently, we couldn't find out the exact reason.
- // libwebsockets official said it's probably an issue of user code
- // since 'libwebsockets' passed AutoBahn stressed Test.
- // info.extensions = exts;
- info.gid = -1;
- info.uid = -1;
- if (peerServerCert)
- {
- info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
- }
- else
- {
- info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | LWS_SERVER_OPTION_PEER_CERT_NOT_REQUIRED;
- }
- info.user = nullptr;
- return info;
- }
/**
 * A message placed in the queue consumed by the websocket thread.
 * Every instance receives the next value of a running counter as its id.
 */
class WsMessage
{
public:
    WsMessage() {}

    unsigned int id = ++__id;   // running number, starts at 1
    unsigned int what = 0;      // message type
    void* data = nullptr;       // payload; its meaning depends on `what`
    void* user = nullptr;       // the sender stores its WebSocket here

private:
    static unsigned int __id;
};

unsigned int WsMessage::__id = 0;
- /**
- * @brief Websocket thread helper, it's used for sending message between UI thread and websocket thread.
- */
- class WsThreadHelper
- {
- public:
- WsThreadHelper();
- ~WsThreadHelper();
- // Creates a new thread
- bool createWebSocketThread();
- // Quits websocket thread.
- void quitWebSocketThread();
- // Sends message to Cocos thread. It's needed to be invoked in Websocket thread.
- void sendMessageToCocosThread(const std::function<void()>& cb);
- // Sends message to Websocket thread. It's needs to be invoked in Cocos thread.
- void sendMessageToWebSocketThread(WsMessage *msg);
- // Waits the sub-thread (websocket thread) to exit,
- void joinWebSocketThread();
- void onSubThreadStarted();
- void onSubThreadLoop();
- void onSubThreadEnded();
- protected:
- void wsThreadEntryFunc();
- public:
- std::list<WsMessage*>* _subThreadWsMessageQueue;
- std::mutex _subThreadWsMessageQueueMutex;
- std::thread* _subThreadInstance;
- private:
- bool _needQuit;
- };
- // Wrapper for converting websocket callback from static function to member function of WebSocket class.
- class WebSocketCallbackWrapper {
- public:
- static int onSocketCallback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
- {
- // Gets the user data from context. We know that it's a 'WebSocket' instance.
- if (wsi == nullptr) {
- return 0;
- }
- int ret = 0;
- WebSocket* ws = (WebSocket*)lws_wsi_user(wsi);
- if (ws != nullptr && __websocketInstances != nullptr)
- {
- if (std::find(__websocketInstances->begin(), __websocketInstances->end(), ws) != __websocketInstances->end())
- {
- ret = ws->onSocketCallback(wsi, reason, in, len);
- }
- }
- else
- {
- // LOGD("ws instance is nullptr.\n");
- }
- return ret;
- }
- };
- // Implementation of WsThreadHelper
- WsThreadHelper::WsThreadHelper()
- : _subThreadInstance(nullptr)
- , _needQuit(false)
- {
- _subThreadWsMessageQueue = new (std::nothrow) std::list<WsMessage*>();
- }
- WsThreadHelper::~WsThreadHelper()
- {
- joinWebSocketThread();
- CC_SAFE_DELETE(_subThreadInstance);
- delete _subThreadWsMessageQueue;
- }
- bool WsThreadHelper::createWebSocketThread()
- {
- // Creates websocket thread
- _subThreadInstance = new (std::nothrow) std::thread(&WsThreadHelper::wsThreadEntryFunc, this);
- return true;
- }
- void WsThreadHelper::quitWebSocketThread()
- {
- _needQuit = true;
- }
- void WsThreadHelper::onSubThreadLoop()
- {
- if (__wsContext)
- {
- // _readyStateMutex.unlock();
- __wsHelper->_subThreadWsMessageQueueMutex.lock();
- bool isEmpty = __wsHelper->_subThreadWsMessageQueue->empty();
- if (!isEmpty)
- {
- auto iter = __wsHelper->_subThreadWsMessageQueue->begin();
- for (; iter != __wsHelper->_subThreadWsMessageQueue->end(); )
- {
- auto msg = (*iter);
- auto ws = (WebSocket*)msg->user;
- // TODO: ws may be a invalid pointer
- if (msg->what == WS_MSG_TO_SUBTHREAD_CREATE_CONNECTION)
- {
- ws->onClientOpenConnectionRequest();
- delete *iter;
- iter = __wsHelper->_subThreadWsMessageQueue->erase(iter);
- }
- else
- {
- ++iter;
- }
- }
- }
- __wsHelper->_subThreadWsMessageQueueMutex.unlock();
- // The second parameter passed to 'lws_service' means the timeout in milliseconds while polling websocket events.
- // The lower value the better, otherwise, it may trigger high CPU usage.
- // We set 2ms in 'lws_service' then sleep 3ms to make lower CPU cost.
- // Since messages are received in websocket thread and user code is in cocos thread, we need to post event to
- // cocos thread and trigger user callbacks by 'Scheduler::performFunctionInCocosThread'. If game's fps is set
- // to 60 (16.66ms), the latency will be (2ms + 3ms + 16.66ms + internet delay) > 21ms
- lws_service(__wsContext, 2);
- std::this_thread::sleep_for(std::chrono::milliseconds(3));
- }
- }
- void WsThreadHelper::onSubThreadStarted()
- {
- int log_level = LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO/* | LLL_DEBUG | LLL_PARSER | LLL_HEADER*/ | LLL_EXT | LLL_CLIENT | LLL_LATENCY;
- lws_set_log_level(log_level, printWebSocketLog);
- memset(__defaultProtocols, 0, sizeof(2 * sizeof(struct lws_protocols)));
- __defaultProtocols[0].name = "";
- __defaultProtocols[0].callback = WebSocketCallbackWrapper::onSocketCallback;
- __defaultProtocols[0].rx_buffer_size = WS_RX_BUFFER_SIZE;
- __defaultProtocols[0].id = std::numeric_limits<uint32_t>::max();
- lws_context_creation_info creationInfo = convertToContextCreationInfo(__defaultProtocols, true);
- __wsContext = lws_create_context(&creationInfo);
- }
- void WsThreadHelper::onSubThreadEnded()
- {
- if (__wsContext != nullptr)
- {
- lws_context_destroy(__wsContext);
- }
- }
- void WsThreadHelper::wsThreadEntryFunc()
- {
- LOGD("WebSocket thread start, helper instance: %p\n", this);
- onSubThreadStarted();
- while (!_needQuit)
- {
- onSubThreadLoop();
- }
- onSubThreadEnded();
- LOGD("WebSocket thread exit, helper instance: %p\n", this);
- }
- void WsThreadHelper::sendMessageToCocosThread(const std::function<void()>& cb)
- {
- Director::getInstance()->getScheduler()->performFunctionInCocosThread(cb);
- }
- void WsThreadHelper::sendMessageToWebSocketThread(WsMessage *msg)
- {
- std::lock_guard<std::mutex> lk(_subThreadWsMessageQueueMutex);
- _subThreadWsMessageQueue->push_back(msg);
- }
- void WsThreadHelper::joinWebSocketThread()
- {
- if (_subThreadInstance->joinable())
- {
- _subThreadInstance->join();
- }
- }
- // Define a WebSocket frame
- class WebSocketFrame
- {
- public:
- WebSocketFrame()
- : _payload(nullptr)
- , _payloadLength(0)
- , _frameLength(0)
- {
- }
- bool init(unsigned char* buf, ssize_t len)
- {
- if (buf == nullptr && len > 0)
- return false;
- if (!_data.empty())
- {
- LOGD("WebSocketFrame was initialized, should not init it again!\n");
- return false;
- }
- _data.reserve(LWS_PRE + len);
- _data.resize(LWS_PRE, 0x00);
- if (len > 0)
- {
- _data.insert(_data.end(), buf, buf + len);
- }
- _payload = _data.data() + LWS_PRE;
- _payloadLength = len;
- _frameLength = len;
- return true;
- }
- void update(ssize_t issued)
- {
- _payloadLength -= issued;
- _payload += issued;
- }
- unsigned char* getPayload() const { return _payload; }
- ssize_t getPayloadLength() const { return _payloadLength; }
- ssize_t getFrameLength() const { return _frameLength; }
- private:
- unsigned char* _payload;
- ssize_t _payloadLength;
- ssize_t _frameLength;
- std::vector<unsigned char> _data;
- };
- //
- void WebSocket::closeAllConnections()
- {
- if (__websocketInstances != nullptr)
- {
- ssize_t count = __websocketInstances->size();
- for (ssize_t i = count-1; i >=0 ; i--)
- {
- WebSocket* instance = __websocketInstances->at(i);
- instance->close();
- }
- std::lock_guard<std::mutex> lk(__instanceMutex);
- __websocketInstances->clear();
- delete __websocketInstances;
- __websocketInstances = nullptr;
- }
- }
- WebSocket::WebSocket()
- : _readyState(State::CONNECTING)
- , _wsInstance(nullptr)
- , _lwsProtocols(nullptr)
- , _isDestroyed(std::make_shared<std::atomic<bool>>(false))
- , _delegate(nullptr)
- , _closeState(CloseState::NONE)
- {
- // reserve data buffer to avoid allocate memory frequently
- _receivedData.reserve(WS_RESERVE_RECEIVE_BUFFER_SIZE);
- if (__websocketInstances == nullptr)
- {
- __websocketInstances = new (std::nothrow) std::vector<WebSocket*>();
- }
- __websocketInstances->push_back(this);
-
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- _resetDirectorListener = Director::getInstance()->getEventDispatcher()->addCustomEventListener(Director::EVENT_RESET, [this, isDestroyed](EventCustom*){
- if (*isDestroyed)
- return;
- close();
- });
- }
- WebSocket::~WebSocket()
- {
- LOGD("In the destructor of WebSocket (%p)\n", this);
- std::lock_guard<std::mutex> lk(__instanceMutex);
- if (__websocketInstances != nullptr)
- {
- auto iter = std::find(__websocketInstances->begin(), __websocketInstances->end(), this);
- if (iter != __websocketInstances->end())
- {
- __websocketInstances->erase(iter);
- }
- else
- {
- LOGD("ERROR: WebSocket instance (%p) wasn't added to the container which saves websocket instances!\n", this);
- }
- }
- if (__websocketInstances == nullptr || __websocketInstances->empty())
- {
- __wsHelper->quitWebSocketThread();
- LOGD("before join ws thread\n");
- __wsHelper->joinWebSocketThread();
- LOGD("after join ws thread\n");
- CC_SAFE_DELETE(__wsHelper);
- }
- Director::getInstance()->getEventDispatcher()->removeEventListener(_resetDirectorListener);
-
- *_isDestroyed = true;
- }
- bool WebSocket::init(const Delegate& delegate,
- const std::string& url,
- const std::vector<std::string>* protocols/* = nullptr*/,
- const std::string& caFilePath/* = ""*/)
- {
- _delegate = const_cast<Delegate*>(&delegate);
- _url = url;
- _caFilePath = caFilePath;
- if (_url.empty())
- return false;
- if (protocols != nullptr && !protocols->empty())
- {
- size_t size = protocols->size();
- _lwsProtocols = (struct lws_protocols*)malloc((size + 1) * sizeof(struct lws_protocols));
- memset(_lwsProtocols, 0, (size + 1) * sizeof(struct lws_protocols));
- static uint32_t __wsId = 0;
- for (size_t i = 0; i < size; ++i)
- {
- _lwsProtocols[i].callback = WebSocketCallbackWrapper::onSocketCallback;
- size_t nameLen = protocols->at(i).length();
- char* name = (char*)malloc(nameLen + 1);
- name[nameLen] = '\0';
- strcpy(name, protocols->at(i).c_str());
- _lwsProtocols[i].name = name;
- _lwsProtocols[i].id = ++__wsId;
- _lwsProtocols[i].rx_buffer_size = WS_RX_BUFFER_SIZE;
- _lwsProtocols[i].per_session_data_size = 0;
- _lwsProtocols[i].user = nullptr;
- _clientSupportedProtocols += name;
- if (i < (size - 1))
- {
- _clientSupportedProtocols += ",";
- }
- }
- }
- bool isWebSocketThreadCreated = true;
- if (__wsHelper == nullptr)
- {
- __wsHelper = new (std::nothrow) WsThreadHelper();
- isWebSocketThreadCreated = false;
- }
- WsMessage* msg = new (std::nothrow) WsMessage();
- msg->what = WS_MSG_TO_SUBTHREAD_CREATE_CONNECTION;
- msg->user = this;
- __wsHelper->sendMessageToWebSocketThread(msg);
- // fixed https://github.com/cocos2d/cocos2d-x/issues/17433
- // createWebSocketThread has to be after message WS_MSG_TO_SUBTHREAD_CREATE_CONNECTION was sent.
- // And websocket thread should only be created once.
- if (!isWebSocketThreadCreated)
- {
- __wsHelper->createWebSocketThread();
- }
- return true;
- }
- void WebSocket::send(const std::string& message)
- {
- if (_readyState == State::OPEN)
- {
- // In main thread
- Data* data = new (std::nothrow) Data();
- data->bytes = (char*)malloc(message.length() + 1);
- // Make sure the last byte is '\0'
- data->bytes[message.length()] = '\0';
- strcpy(data->bytes, message.c_str());
- data->len = static_cast<ssize_t>(message.length());
- WsMessage* msg = new (std::nothrow) WsMessage();
- msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_STRING;
- msg->data = data;
- msg->user = this;
- __wsHelper->sendMessageToWebSocketThread(msg);
- }
- else
- {
- LOGD("Couldn't send message since websocket wasn't opened!\n");
- }
- }
- void WebSocket::send(const unsigned char* binaryMsg, unsigned int len)
- {
- if (_readyState == State::OPEN)
- {
- // In main thread
- Data* data = new (std::nothrow) Data();
- if (len == 0)
- {
- // If data length is zero, allocate 1 byte for safe.
- data->bytes = (char*)malloc(1);
- data->bytes[0] = '\0';
- }
- else
- {
- data->bytes = (char*)malloc(len);
- memcpy((void*)data->bytes, (void*)binaryMsg, len);
- }
- data->len = len;
- WsMessage* msg = new (std::nothrow) WsMessage();
- msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_BINARY;
- msg->data = data;
- msg->user = this;
- __wsHelper->sendMessageToWebSocketThread(msg);
- }
- else
- {
- LOGD("Couldn't send message since websocket wasn't opened!\n");
- }
- }
- void WebSocket::close()
- {
- if (_closeState != CloseState::NONE)
- {
- LOGD("close was invoked, don't invoke it again!\n");
- return;
- }
- _closeState = CloseState::SYNC_CLOSING;
- LOGD("close: WebSocket (%p) is closing...\n", this);
- {
- _readyStateMutex.lock();
- if (_readyState == State::CLOSED)
- {
- // If readState is closed, it means that onConnectionClosed was invoked in websocket thread,
- // but the callback of performInCocosThread has not been triggered. We need to invoke
- // onClose to release the websocket instance.
- _readyStateMutex.unlock();
- _delegate->onClose(this);
- return;
- }
- _readyState = State::CLOSING;
- _readyStateMutex.unlock();
- }
- {
- std::unique_lock<std::mutex> lkClose(_closeMutex);
- _closeCondition.wait(lkClose);
- _closeState = CloseState::SYNC_CLOSED;
- }
- // Wait 5 milliseconds for onConnectionClosed to exit!
- std::this_thread::sleep_for(std::chrono::milliseconds(5));
- _delegate->onClose(this);
- }
- void WebSocket::closeAsync()
- {
- if (_closeState != CloseState::NONE)
- {
- LOGD("close was invoked, don't invoke it again!\n");
- return;
- }
- _closeState = CloseState::ASYNC_CLOSING;
- LOGD("closeAsync: WebSocket (%p) is closing...\n", this);
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- if (_readyState == State::CLOSED || _readyState == State::CLOSING)
- {
- LOGD("closeAsync: WebSocket (%p) was closed, no need to close it again!\n", this);
- return;
- }
- _readyState = State::CLOSING;
- }
- WebSocket::State WebSocket::getReadyState()
- {
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- return _readyState;
- }
- struct lws_vhost* WebSocket::createVhost(struct lws_protocols* protocols, int& sslConnection)
- {
- auto fileUtils = FileUtils::getInstance();
- bool isCAFileExist = fileUtils->isFileExist(_caFilePath);
- if (isCAFileExist)
- {
- _caFilePath = fileUtils->fullPathForFilename(_caFilePath);
- }
- lws_context_creation_info info = convertToContextCreationInfo(protocols, isCAFileExist);
- if (sslConnection != 0)
- {
- if (isCAFileExist)
- {
- #if (CC_TARGET_PLATFORM == CC_PLATFORM_ANDROID)
- // if ca file is in the apk, try to extract it to writable path
- std::string writablePath = fileUtils->getWritablePath();
- std::string caFileName = getFileNameForPath(_caFilePath);
- std::string newCaFilePath = writablePath + caFileName;
- if (fileUtils->isFileExist(newCaFilePath))
- {
- LOGD("CA file (%s) in writable path exists!", newCaFilePath.c_str());
- _caFilePath = newCaFilePath;
- info.ssl_ca_filepath = _caFilePath.c_str();
- }
- else
- {
- if (fileUtils->isFileExist(_caFilePath))
- {
- std::string fullPath = fileUtils->fullPathForFilename(_caFilePath);
- LOGD("Found CA file: %s", fullPath.c_str());
- if (fullPath[0] != '/')
- {
- LOGD("CA file is in APK");
- auto caData = fileUtils->getDataFromFile(fullPath);
- if (!caData.isNull())
- {
- FILE* fp = fopen(newCaFilePath.c_str(), "wb");
- if (fp != nullptr)
- {
- LOGD("New CA file path: %s", newCaFilePath.c_str());
- fwrite(caData.getBytes(), caData.getSize(), 1, fp);
- fclose(fp);
- _caFilePath = newCaFilePath;
- info.ssl_ca_filepath = _caFilePath.c_str();
- }
- else
- {
- CCASSERT(false, "Open new CA file failed");
- }
- }
- else
- {
- CCASSERT(false, "CA file is empty!");
- }
- }
- else
- {
- LOGD("CA file isn't in APK!");
- _caFilePath = fullPath;
- info.ssl_ca_filepath = _caFilePath.c_str();
- }
- }
- else
- {
- CCASSERT(false, "CA file doesn't exist!");
- }
- }
- #else
- info.ssl_ca_filepath = _caFilePath.c_str();
- #endif
- }
- else
- {
- LOGD("WARNING: CA Root file isn't set. SSL connection will not peer server certificate\n");
- sslConnection = sslConnection | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
- }
- }
- lws_vhost* vhost = lws_create_vhost(__wsContext, &info);
- return vhost;
- }
- void WebSocket::onClientOpenConnectionRequest()
- {
- if (nullptr != __wsContext)
- {
- static const struct lws_extension exts[] = {
- {
- "permessage-deflate",
- lws_extension_callback_pm_deflate,
- // client_no_context_takeover extension is not supported in the current version, it will cause connection fail
- // It may be a bug of lib websocket build
- // "permessage-deflate; client_no_context_takeover; client_max_window_bits"
- "permessage-deflate; client_max_window_bits"
- },
- {
- "deflate-frame",
- lws_extension_callback_pm_deflate,
- "deflate_frame"
- },
- { nullptr, nullptr, nullptr /* terminator */ }
- };
- _readyStateMutex.lock();
- _readyState = State::CONNECTING;
- _readyStateMutex.unlock();
- Uri uri = Uri::parse(_url);
- LOGD("scheme: %s, host: %s, port: %d, path: %s\n", uri.getScheme().c_str(), uri.getHostName().c_str(), static_cast<int>(uri.getPort()), uri.getPathEtc().c_str());
- int sslConnection = 0;
- if (uri.isSecure())
- sslConnection = LCCSCF_USE_SSL;
- struct lws_vhost* vhost = nullptr;
- if (_lwsProtocols != nullptr)
- {
- vhost = createVhost(_lwsProtocols, sslConnection);
- }
- else
- {
- vhost = createVhost(__defaultProtocols, sslConnection);
- }
- int port = static_cast<int>(uri.getPort());
- if (port == 0)
- port = uri.isSecure() ? 443 : 80;
- const std::string& hostName = uri.getHostName();
- std::string path = uri.getPathEtc();
- const std::string& authority = uri.getAuthority();
- if (path.empty())
- path = "/";
- struct lws_client_connect_info connectInfo;
- memset(&connectInfo, 0, sizeof(connectInfo));
- connectInfo.context = __wsContext;
- connectInfo.address = hostName.c_str();
- connectInfo.port = port;
- connectInfo.ssl_connection = sslConnection;
- connectInfo.path = path.c_str();
- connectInfo.host = hostName.c_str();
- connectInfo.origin = authority.c_str();
- connectInfo.protocol = _clientSupportedProtocols.empty() ? nullptr : _clientSupportedProtocols.c_str();
- connectInfo.ietf_version_or_minus_one = -1;
- connectInfo.userdata = this;
- connectInfo.client_exts = exts;
- connectInfo.vhost = vhost;
- _wsInstance = lws_client_connect_via_info(&connectInfo);
- if (nullptr == _wsInstance)
- {
- onConnectionError();
- return;
- }
- }
- else
- {
- LOGE("Create websocket context failed!");
- }
- }
- int WebSocket::onClientWritable()
- {
- // LOGD("onClientWritable ... \n");
- {
- std::lock_guard<std::mutex> readMutex(_readyStateMutex);
- if (_readyState == State::CLOSING)
- {
- LOGD("Closing websocket (%p) connection.\n", this);
- return -1;
- }
- }
- do
- {
- std::lock_guard<std::mutex> lk(__wsHelper->_subThreadWsMessageQueueMutex);
- if (__wsHelper->_subThreadWsMessageQueue->empty())
- {
- break;
- }
- std::list<WsMessage*>::iterator iter = __wsHelper->_subThreadWsMessageQueue->begin();
- while (iter != __wsHelper->_subThreadWsMessageQueue->end())
- {
- WsMessage* msg = *iter;
- if (msg->user == this)
- {
- break;
- }
- else
- {
- ++iter;
- }
- }
- ssize_t bytesWrite = 0;
- if (iter != __wsHelper->_subThreadWsMessageQueue->end())
- {
- WsMessage* subThreadMsg = *iter;
- Data* data = (Data*)subThreadMsg->data;
- const ssize_t c_bufferSize = WS_RX_BUFFER_SIZE;
- const ssize_t remaining = data->len - data->issued;
- const ssize_t n = std::min(remaining, c_bufferSize);
- WebSocketFrame* frame = nullptr;
- if (data->ext)
- {
- frame = (WebSocketFrame*)data->ext;
- }
- else
- {
- frame = new (std::nothrow) WebSocketFrame();
- bool success = frame && frame->init((unsigned char*)(data->bytes + data->issued), n);
- if (success)
- {
- data->ext = frame;
- }
- else
- { // If frame initialization failed, delete the frame and drop the sending data
- // These codes should never be called.
- LOGD("WebSocketFrame initialization failed, drop the sending data, msg(%d)\n", (int)subThreadMsg->id);
- delete frame;
- CC_SAFE_FREE(data->bytes);
- CC_SAFE_DELETE(data);
- __wsHelper->_subThreadWsMessageQueue->erase(iter);
- CC_SAFE_DELETE(subThreadMsg);
- break;
- }
- }
- int writeProtocol;
- if (data->issued == 0)
- {
- if (WS_MSG_TO_SUBTRHEAD_SENDING_STRING == subThreadMsg->what)
- {
- writeProtocol = LWS_WRITE_TEXT;
- }
- else
- {
- writeProtocol = LWS_WRITE_BINARY;
- }
- // If we have more than 1 fragment
- if (data->len > c_bufferSize)
- writeProtocol |= LWS_WRITE_NO_FIN;
- } else {
- // we are in the middle of fragments
- writeProtocol = LWS_WRITE_CONTINUATION;
- // and if not in the last fragment
- if (remaining != n)
- writeProtocol |= LWS_WRITE_NO_FIN;
- }
- bytesWrite = lws_write(_wsInstance, frame->getPayload(), frame->getPayloadLength(), (lws_write_protocol)writeProtocol);
- // Handle the result of lws_write
- // Buffer overrun?
- if (bytesWrite < 0)
- {
- LOGD("ERROR: msg(%u), lws_write return: %d, but it should be %d, drop this message.\n", subThreadMsg->id, (int)bytesWrite, (int)n);
- // socket error, we need to close the socket connection
- CC_SAFE_FREE(data->bytes);
- delete ((WebSocketFrame*)data->ext);
- data->ext = nullptr;
- CC_SAFE_DELETE(data);
- __wsHelper->_subThreadWsMessageQueue->erase(iter);
- CC_SAFE_DELETE(subThreadMsg);
- closeAsync();
- }
- else if (bytesWrite < frame->getPayloadLength())
- {
- frame->update(bytesWrite);
- LOGD("frame wasn't sent completely, bytesWrite: %d, remain: %d\n", (int)bytesWrite, (int)frame->getPayloadLength());
- }
- // Do we have another fragments to send?
- else if (remaining > frame->getFrameLength() && bytesWrite == frame->getPayloadLength())
- {
- // A frame was totally sent, plus data->issued to send next frame
- LOGD("msg(%u) append: %d + %d = %d\n", subThreadMsg->id, (int)data->issued, (int)frame->getFrameLength(), (int)(data->issued + frame->getFrameLength()));
- data->issued += frame->getFrameLength();
- delete ((WebSocketFrame*)data->ext);
- data->ext = nullptr;
- }
- // Safely done!
- else
- {
- LOGD("Safely done, msg(%d)!\n", subThreadMsg->id);
- if (remaining == frame->getFrameLength())
- {
- LOGD("msg(%u) append: %d + %d = %d\n", subThreadMsg->id, (int)data->issued, (int)frame->getFrameLength(), (int)(data->issued + frame->getFrameLength()));
- LOGD("msg(%u) was totally sent!\n", subThreadMsg->id);
- }
- else
- {
- LOGD("ERROR: msg(%u), remaining(%d) < bytesWrite(%d)\n", subThreadMsg->id, (int)remaining, (int)frame->getFrameLength());
- LOGD("Drop the msg(%u)\n", subThreadMsg->id);
- closeAsync();
- }
- CC_SAFE_FREE(data->bytes);
- delete ((WebSocketFrame*)data->ext);
- data->ext = nullptr;
- CC_SAFE_DELETE(data);
- __wsHelper->_subThreadWsMessageQueue->erase(iter);
- CC_SAFE_DELETE(subThreadMsg);
- LOGD("-----------------------------------------------------------\n");
- }
- }
- } while(false);
- if (_wsInstance != nullptr)
- {
- lws_callback_on_writable(_wsInstance);
- }
- return 0;
- }
- int WebSocket::onClientReceivedData(void* in, ssize_t len)
- {
- // In websocket thread
- static int packageIndex = 0;
- packageIndex++;
- if (in != nullptr && len > 0)
- {
- LOGD("Receiving data:index:%d, len=%d\n", packageIndex, (int)len);
- unsigned char* inData = (unsigned char*)in;
- _receivedData.insert(_receivedData.end(), inData, inData + len);
- }
- else
- {
- LOGD("Empty message received, index=%d!\n", packageIndex);
- }
- // If no more data pending, send it to the client thread
- size_t remainingSize = lws_remaining_packet_payload(_wsInstance);
- int isFinalFragment = lws_is_final_fragment(_wsInstance);
- // LOGD("remainingSize: %d, isFinalFragment: %d\n", (int)remainingSize, isFinalFragment);
- if (remainingSize == 0 && isFinalFragment)
- {
- std::vector<char>* frameData = new (std::nothrow) std::vector<char>(std::move(_receivedData));
- // reset capacity of received data buffer
- _receivedData.reserve(WS_RESERVE_RECEIVE_BUFFER_SIZE);
- ssize_t frameSize = frameData->size();
- bool isBinary = (lws_frame_is_binary(_wsInstance) != 0);
- if (!isBinary)
- {
- frameData->push_back('\0');
- }
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- __wsHelper->sendMessageToCocosThread([this, frameData, frameSize, isBinary, isDestroyed](){
- // In UI thread
- LOGD("Notify data len %d to Cocos thread.\n", (int)frameSize);
- Data data;
- data.isBinary = isBinary;
- data.bytes = (char*)frameData->data();
- data.len = frameSize;
- if (*isDestroyed)
- {
- LOGD("WebSocket instance was destroyed!\n");
- }
- else
- {
- _delegate->onMessage(this, data);
- }
- delete frameData;
- });
- }
- return 0;
- }
- int WebSocket::onConnectionOpened()
- {
- const lws_protocols* lwsSelectedProtocol = lws_get_protocol(_wsInstance);
- _selectedProtocol = lwsSelectedProtocol->name;
- LOGD("onConnectionOpened...: %p, client protocols: %s, server selected protocol: %s\n", this, _clientSupportedProtocols.c_str(), _selectedProtocol.c_str());
- /*
- * start the ball rolling,
- * LWS_CALLBACK_CLIENT_WRITEABLE will come next service
- */
- lws_callback_on_writable(_wsInstance);
- {
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- if (_readyState == State::CLOSING || _readyState == State::CLOSED)
- {
- return 0;
- }
- _readyState = State::OPEN;
- }
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- __wsHelper->sendMessageToCocosThread([this, isDestroyed](){
- if (*isDestroyed)
- {
- LOGD("WebSocket instance was destroyed!\n");
- }
- else
- {
- _delegate->onOpen(this);
- }
- });
- return 0;
- }
- int WebSocket::onConnectionError()
- {
- {
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- LOGD("WebSocket (%p) onConnectionError, state: %d ...\n", this, (int)_readyState);
- if (_readyState == State::CLOSED)
- {
- return 0;
- }
- _readyState = State::CLOSING;
- }
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- __wsHelper->sendMessageToCocosThread([this, isDestroyed](){
- if (*isDestroyed)
- {
- LOGD("WebSocket instance was destroyed!\n");
- }
- else
- {
- _delegate->onError(this, ErrorCode::CONNECTION_FAILURE);
- }
- });
- onConnectionClosed();
- return 0;
- }
- int WebSocket::onConnectionClosed()
- {
- {
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- LOGD("WebSocket (%p) onConnectionClosed, state: %d ...\n", this, (int)_readyState);
- if (_readyState == State::CLOSED)
- {
- return 0;
- }
- if (_readyState == State::CLOSING)
- {
- if (_closeState == CloseState::SYNC_CLOSING)
- {
- LOGD("onConnectionClosed, WebSocket (%p) is closing by client synchronously.\n", this);
- for(;;)
- {
- std::lock_guard<std::mutex> lkClose(_closeMutex);
- _closeCondition.notify_one();
- if (_closeState == CloseState::SYNC_CLOSED)
- {
- break;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- return 0;
- }
- else if (_closeState == CloseState::ASYNC_CLOSING)
- {
- LOGD("onConnectionClosed, WebSocket (%p) is closing by client asynchronously.\n", this);
- }
- else
- {
- LOGD("onConnectionClosed, WebSocket (%p) is closing by server.\n", this);
- }
- }
- else
- {
- LOGD("onConnectionClosed, WebSocket (%p) is closing by server.\n", this);
- }
- _readyState = State::CLOSED;
- }
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- __wsHelper->sendMessageToCocosThread([this, isDestroyed](){
- if (*isDestroyed)
- {
- LOGD("WebSocket instance (%p) was destroyed!\n", this);
- }
- else
- {
- _delegate->onClose(this);
- }
- });
- LOGD("WebSocket (%p) onConnectionClosed DONE!\n", this);
- return 0;
- }
- int WebSocket::onSocketCallback(struct lws *wsi,
- int reason,
- void *in, ssize_t len)
- {
- //LOGD("socket callback for %d reason\n", reason);
- int ret = 0;
- switch (reason)
- {
- case LWS_CALLBACK_CLIENT_ESTABLISHED:
- ret = onConnectionOpened();
- break;
- case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
- ret = onConnectionError();
- break;
- case LWS_CALLBACK_WSI_DESTROY:
- ret = onConnectionClosed();
- break;
- case LWS_CALLBACK_CLIENT_RECEIVE:
- ret = onClientReceivedData(in, len);
- break;
- case LWS_CALLBACK_CLIENT_WRITEABLE:
- ret = onClientWritable();
- break;
- case LWS_CALLBACK_CHANGE_MODE_POLL_FD:
- case LWS_CALLBACK_LOCK_POLL:
- case LWS_CALLBACK_UNLOCK_POLL:
- break;
- case LWS_CALLBACK_PROTOCOL_INIT:
- LOGD("protocol init...");
- break;
- case LWS_CALLBACK_PROTOCOL_DESTROY:
- LOGD("protocol destroy...");
- break;
- default:
- LOGD("WebSocket (%p) Unhandled websocket event: %d\n", this, reason);
- break;
- }
- return ret;
- }
- NS_NETWORK_END
|