关键词搜索

源码搜索 ×
×

tars源码分析之6

发布2022-07-03浏览541次

详情内容

喜欢网络编程的同学,千万不要错过tars中关于socket的部分,来看看客户端操作。

真的是非常经典,看看send/recv吧,写得真是严密:

  1. #include <cerrno>
  2. #include <iostream>
  3. #include "util/tc_clientsocket.h"
  4. #include "util/tc_epoller.h"
  5. #include "util/tc_common.h"
  6. namespace tars
  7. {
  8. TC_Endpoint::TC_Endpoint()
  9. {
  10. _host = "0.0.0.0";
  11. _port = 0;
  12. _timeout = 3000;
  13. _type = TCP;
  14. _grid = 0;
  15. _qos = 0;
  16. _weight = -1;
  17. _weighttype = 0;
  18. _authType = 0;
  19. }
  20. void TC_Endpoint::init(const string& host, int port, int timeout, int type, int grid, int qos, int weight, unsigned int weighttype, int authType)
  21. {
  22. _host = host;
  23. _port = port;
  24. _timeout = timeout;
  25. _type = type;
  26. _grid = grid;
  27. _qos = qos;
  28. if (weighttype == 0)
  29. {
  30. _weight = -1;
  31. _weighttype = 0;
  32. }
  33. else
  34. {
  35. if (weight == -1)
  36. {
  37. weight = 100;
  38. }
  39. _weight = (weight > 100 ? 100 : weight);
  40. _weighttype = weighttype;
  41. }
  42. _authType = authType;
  43. }
  44. void TC_Endpoint::parse(const string &str)
  45. {
  46. _grid = 0;
  47. _qos = 0;
  48. _weight = -1;
  49. _weighttype = 0;
  50. _authType = 0;
  51. const string delim = " \t\n\r";
  52. string::size_type beg;
  53. string::size_type end = 0;
  54. beg = str.find_first_not_of(delim, end);
  55. if(beg == string::npos)
  56. {
  57. throw TC_EndpointParse_Exception("TC_Endpoint::parse error : " + str);
  58. }
  59. end = str.find_first_of(delim, beg);
  60. if(end == string::npos)
  61. {
  62. end = str.length();
  63. }
  64. string desc = str.substr(beg, end - beg);
  65. if(desc == "tcp")
  66. {
  67. _type = TCP;
  68. }
  69. else if (desc == "ssl")
  70. {
  71. _type = SSL;
  72. }
  73. else if(desc == "udp")
  74. {
  75. _type = UDP;
  76. }
  77. else
  78. {
  79. throw TC_EndpointParse_Exception("TC_Endpoint::parse tcp or udp or ssl error : " + str);
  80. }
  81. desc = str.substr(end);
  82. end = 0;
  83. while(true)
  84. {
  85. beg = desc.find_first_not_of(delim, end);
  86. if(beg == string::npos)
  87. {
  88. break;
  89. }
  90. end = desc.find_first_of(delim, beg);
  91. if(end == string::npos)
  92. {
  93. end = desc.length();
  94. }
  95. string option = desc.substr(beg, end - beg);
  96. if(option.length() != 2 || option[0] != '-')
  97. {
  98. throw TC_EndpointParse_Exception("TC_Endpoint::parse error : " + str);
  99. }
  100. string argument;
  101. string::size_type argumentBeg = desc.find_first_not_of(delim, end);
  102. if(argumentBeg != string::npos && desc[argumentBeg] != '-')
  103. {
  104. beg = argumentBeg;
  105. end = desc.find_first_of(delim, beg);
  106. if(end == string::npos)
  107. {
  108. end = desc.length();
  109. }
  110. argument = desc.substr(beg, end - beg);
  111. }
  112. switch(option[1])
  113. {
  114. case 'h':
  115. {
  116. if(argument.empty())
  117. {
  118. throw TC_EndpointParse_Exception("TC_Endpoint::parse -h error : " + str);
  119. }
  120. const_cast<string&>(_host) = argument;
  121. break;
  122. }
  123. case 'p':
  124. {
  125. istringstream p(argument);
  126. if(!(p >> const_cast<int&>(_port)) || !p.eof() || _port < 0 || _port > 65535)
  127. {
  128. throw TC_EndpointParse_Exception("TC_Endpoint::parse -p error : " + str);
  129. }
  130. break;
  131. }
  132. case 't':
  133. {
  134. istringstream t(argument);
  135. if(!(t >> const_cast<int&>(_timeout)) || !t.eof())
  136. {
  137. throw TC_EndpointParse_Exception("TC_Endpoint::parse -t error : " + str);
  138. }
  139. break;
  140. }
  141. case 'g':
  142. {
  143. istringstream t(argument);
  144. if(!(t >> const_cast<int&>(_grid)) || !t.eof())
  145. {
  146. throw TC_EndpointParse_Exception("TC_Endpoint::parse -g error : " + str);
  147. }
  148. break;
  149. }
  150. case 'q':
  151. {
  152. istringstream t(argument);
  153. if(!(t >> const_cast<int&>(_qos)) || !t.eof())
  154. {
  155. throw TC_EndpointParse_Exception("TC_Endpoint::parse -q error : " + str);
  156. }
  157. break;
  158. }
  159. case 'w':
  160. {
  161. istringstream t(argument);
  162. if(!(t >> const_cast<int&>(_weight)) || !t.eof())
  163. {
  164. throw TC_EndpointParse_Exception("TC_Endpoint::parse -w error : " + str);
  165. }
  166. break;
  167. }
  168. case 'v':
  169. {
  170. istringstream t(argument);
  171. if(!(t >> const_cast<unsigned int&>(_weighttype)) || !t.eof())
  172. {
  173. throw TC_EndpointParse_Exception("TC_Endpoint::parse -v error : " + str);
  174. }
  175. break;
  176. }
  177. // auth type
  178. case 'e':
  179. {
  180. istringstream p(argument);
  181. if (!(p >> const_cast<int&>(_authType)) || !p.eof() || _authType < 0 || _authType > 1)
  182. {
  183. throw TC_EndpointParse_Exception("TC_Endpoint::parse -e error : " + str);
  184. }
  185. break;
  186. }
  187. default:
  188. {
  189. ///throw TC_EndpointParse_Exception("TC_Endpoint::parse error : " + str);
  190. }
  191. }
  192. }
  193. if(_weighttype != 0)
  194. {
  195. if(_weight == -1)
  196. {
  197. _weight = 100;
  198. }
  199. _weight = (_weight > 100 ? 100 : _weight);
  200. }
  201. if(_host.empty())
  202. {
  203. throw TC_EndpointParse_Exception("TC_Endpoint::parse error : host must not be empty: " + str);
  204. }
  205. else if(_host == "*")
  206. {
  207. const_cast<string&>(_host) = "0.0.0.0";
  208. }
  209. if (_authType < 0)
  210. _authType = 0;
  211. else if (_authType > 0)
  212. _authType = 1;
  213. }
  214. /*************************************TC_TCPClient**************************************/
  215. #define LEN_MAXRECV 8196
  216. int TC_TCPClient::checkSocket()
  217. {
  218. if(!_socket.isValid())
  219. {
  220. try
  221. {
  222. if(_port == 0)
  223. {
  224. _socket.createSocket(SOCK_STREAM, AF_LOCAL);
  225. }
  226. else
  227. {
  228. _socket.createSocket(SOCK_STREAM, AF_INET);
  229. }
  230. //设置非阻塞模式
  231. _socket.setblock(false);
  232. try
  233. {
  234. if(_port == 0)
  235. {
  236. _socket.connect(_ip.c_str());
  237. }
  238. else
  239. {
  240. _socket.connect(_ip, _port);
  241. }
  242. }
  243. catch(TC_SocketConnect_Exception &ex)
  244. {
  245. if(errno != EINPROGRESS)
  246. {
  247. _socket.close();
  248. return EM_CONNECT;
  249. }
  250. }
  251. if(errno != EINPROGRESS)
  252. {
  253. _socket.close();
  254. return EM_CONNECT;
  255. }
  256. TC_Epoller epoller(false);
  257. epoller.create(1);
  258. epoller.add(_socket.getfd(), 0, EPOLLOUT);
  259. int iRetCode = epoller.wait(_timeout);
  260. if (iRetCode < 0)
  261. {
  262. _socket.close();
  263. return EM_SELECT;
  264. }
  265. else if (iRetCode == 0)
  266. {
  267. _socket.close();
  268. return EM_TIMEOUT;
  269. }
  270. else
  271. {
  272. for(int i = 0; i < iRetCode; ++i)
  273. {
  274. const epoll_event& ev = epoller.get(i);
  275. if (ev.events & EPOLLERR || ev.events & EPOLLHUP)
  276. {
  277. _socket.close();
  278. return EM_CONNECT;
  279. }
  280. else
  281. {
  282. int iVal = 0;
  283. socklen_t iLen = static_cast<socklen_t>(sizeof(int));
  284. if (::getsockopt(_socket.getfd(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&iVal), &iLen) == -1 || iVal)
  285. {
  286. _socket.close();
  287. return EM_CONNECT;
  288. }
  289. }
  290. }
  291. }
  292. //设置为阻塞模式
  293. _socket.setblock(true);
  294. }
  295. catch(TC_Socket_Exception &ex)
  296. {
  297. _socket.close();
  298. return EM_SOCKET;
  299. }
  300. }
  301. return EM_SUCCESS;
  302. }
  303. int TC_TCPClient::send(const char *sSendBuffer, size_t iSendLen)
  304. {
  305. int iRet = checkSocket();
  306. if(iRet < 0)
  307. {
  308. return iRet;
  309. }
  310. iRet = _socket.send(sSendBuffer, iSendLen);
  311. if(iRet < 0)
  312. {
  313. _socket.close();
  314. return EM_SEND;
  315. }
  316. return EM_SUCCESS;
  317. }
  318. int TC_TCPClient::recv(char *sRecvBuffer, size_t &iRecvLen)
  319. {
  320. int iRet = checkSocket();
  321. if(iRet < 0)
  322. {
  323. return iRet;
  324. }
  325. TC_Epoller epoller(false);
  326. epoller.create(1);
  327. epoller.add(_socket.getfd(), 0, EPOLLIN);
  328. int iRetCode = epoller.wait(_timeout);
  329. if (iRetCode < 0)
  330. {
  331. _socket.close();
  332. return EM_SELECT;
  333. }
  334. else if (iRetCode == 0)
  335. {
  336. _socket.close();
  337. return EM_TIMEOUT;
  338. }
  339. epoll_event ev = epoller.get(0);
  340. if(ev.events & EPOLLIN)
  341. {
  342. int iLen = _socket.recv((void*)sRecvBuffer, iRecvLen);
  343. if (iLen < 0)
  344. {
  345. _socket.close();
  346. return EM_RECV;
  347. }
  348. else if (iLen == 0)
  349. {
  350. _socket.close();
  351. return EM_CLOSE;
  352. }
  353. iRecvLen = iLen;
  354. return EM_SUCCESS;
  355. }
  356. else
  357. {
  358. _socket.close();
  359. }
  360. return EM_SELECT;
  361. }
  362. int TC_TCPClient::recvBySep(string &sRecvBuffer, const string &sSep)
  363. {
  364. sRecvBuffer.clear();
  365. int iRet = checkSocket();
  366. if(iRet < 0)
  367. {
  368. return iRet;
  369. }
  370. TC_Epoller epoller(false);
  371. epoller.create(1);
  372. epoller.add(_socket.getfd(), 0, EPOLLIN);
  373. while(true)
  374. {
  375. int iRetCode = epoller.wait(_timeout);
  376. if (iRetCode < 0)
  377. {
  378. _socket.close();
  379. return EM_SELECT;
  380. }
  381. else if (iRetCode == 0)
  382. {
  383. _socket.close();
  384. return EM_TIMEOUT;
  385. }
  386. epoll_event ev = epoller.get(0);
  387. if(ev.events & EPOLLIN)
  388. {
  389. char buffer[LEN_MAXRECV] = "\0";
  390. int len = _socket.recv((void*)&buffer, sizeof(buffer));
  391. if (len < 0)
  392. {
  393. _socket.close();
  394. return EM_RECV;
  395. }
  396. else if (len == 0)
  397. {
  398. _socket.close();
  399. return EM_CLOSE;
  400. }
  401. sRecvBuffer.append(buffer, len);
  402. if(sRecvBuffer.length() >= sSep.length()
  403. && sRecvBuffer.compare(sRecvBuffer.length() - sSep.length(), sSep.length(), sSep) == 0)
  404. {
  405. break;
  406. }
  407. }
  408. }
  409. return EM_SUCCESS;
  410. }
  411. int TC_TCPClient::recvAll(string &sRecvBuffer)
  412. {
  413. sRecvBuffer.clear();
  414. int iRet = checkSocket();
  415. if(iRet < 0)
  416. {
  417. return iRet;
  418. }
  419. TC_Epoller epoller(false);
  420. epoller.create(1);
  421. epoller.add(_socket.getfd(), 0, EPOLLIN);
  422. while(true)
  423. {
  424. int iRetCode = epoller.wait(_timeout);
  425. if (iRetCode < 0)
  426. {
  427. _socket.close();
  428. return EM_SELECT;
  429. }
  430. else if (iRetCode == 0)
  431. {
  432. _socket.close();
  433. return EM_TIMEOUT;
  434. }
  435. epoll_event ev = epoller.get(0);
  436. if(ev.events & EPOLLIN)
  437. {
  438. char sTmpBuffer[LEN_MAXRECV] = "\0";
  439. int len = _socket.recv((void*)sTmpBuffer, LEN_MAXRECV);
  440. if (len < 0)
  441. {
  442. _socket.close();
  443. return EM_RECV;
  444. }
  445. else if (len == 0)
  446. {
  447. _socket.close();
  448. return EM_SUCCESS;
  449. }
  450. sRecvBuffer.append(sTmpBuffer, len);
  451. }
  452. else
  453. {
  454. _socket.close();
  455. return EM_SELECT;
  456. }
  457. }
  458. return EM_SUCCESS;
  459. }
  460. int TC_TCPClient::recvLength(char *sRecvBuffer, size_t iRecvLen)
  461. {
  462. int iRet = checkSocket();
  463. if(iRet < 0)
  464. {
  465. return iRet;
  466. }
  467. size_t iRecvLeft = iRecvLen;
  468. iRecvLen = 0;
  469. TC_Epoller epoller(false);
  470. epoller.create(1);
  471. epoller.add(_socket.getfd(), 0, EPOLLIN);
  472. while(iRecvLeft != 0)
  473. {
  474. int iRetCode = epoller.wait(_timeout);
  475. if (iRetCode < 0)
  476. {
  477. _socket.close();
  478. return EM_SELECT;
  479. }
  480. else if (iRetCode == 0)
  481. {
  482. _socket.close();
  483. return EM_TIMEOUT;
  484. }
  485. epoll_event ev = epoller.get(0);
  486. if(ev.events & EPOLLIN)
  487. {
  488. int len = _socket.recv((void*)(sRecvBuffer + iRecvLen), iRecvLeft);
  489. if (len < 0)
  490. {
  491. _socket.close();
  492. return EM_RECV;
  493. }
  494. else if (len == 0)
  495. {
  496. _socket.close();
  497. return EM_CLOSE;
  498. }
  499. iRecvLeft -= len;
  500. iRecvLen += len;
  501. }
  502. else
  503. {
  504. _socket.close();
  505. return EM_SELECT;
  506. }
  507. }
  508. return EM_SUCCESS;
  509. }
  510. int TC_TCPClient::sendRecv(const char* sSendBuffer, size_t iSendLen, char *sRecvBuffer, size_t &iRecvLen)
  511. {
  512. int iRet = send(sSendBuffer, iSendLen);
  513. if(iRet != EM_SUCCESS)
  514. {
  515. return iRet;
  516. }
  517. return recv(sRecvBuffer, iRecvLen);
  518. }
  519. int TC_TCPClient::sendRecvBySep(const char* sSendBuffer, size_t iSendLen, string &sRecvBuffer, const string &sSep)
  520. {
  521. int iRet = send(sSendBuffer, iSendLen);
  522. if(iRet != EM_SUCCESS)
  523. {
  524. return iRet;
  525. }
  526. return recvBySep(sRecvBuffer, sSep);
  527. }
  528. int TC_TCPClient::sendRecvLine(const char* sSendBuffer, size_t iSendLen, string &sRecvBuffer)
  529. {
  530. return sendRecvBySep(sSendBuffer, iSendLen, sRecvBuffer, "\r\n");
  531. }
  532. int TC_TCPClient::sendRecvAll(const char* sSendBuffer, size_t iSendLen, string &sRecvBuffer)
  533. {
  534. int iRet = send(sSendBuffer, iSendLen);
  535. if(iRet != EM_SUCCESS)
  536. {
  537. return iRet;
  538. }
  539. return recvAll(sRecvBuffer);
  540. }
  541. /*************************************TC_UDPClient**************************************/
  542. int TC_UDPClient::checkSocket()
  543. {
  544. if(!_socket.isValid())
  545. {
  546. try
  547. {
  548. if(_port == 0)
  549. {
  550. _socket.createSocket(SOCK_DGRAM, AF_LOCAL);
  551. }
  552. else
  553. {
  554. _socket.createSocket(SOCK_DGRAM, AF_INET);
  555. }
  556. }
  557. catch(TC_Socket_Exception &ex)
  558. {
  559. _socket.close();
  560. return EM_SOCKET;
  561. }
  562. try
  563. {
  564. if(_port == 0)
  565. {
  566. _socket.connect(_ip.c_str());
  567. if(_port == 0)
  568. {
  569. _socket.bind(_ip.c_str());
  570. }
  571. }
  572. else
  573. {
  574. _socket.connect(_ip, _port);
  575. }
  576. }
  577. catch(TC_SocketConnect_Exception &ex)
  578. {
  579. _socket.close();
  580. return EM_CONNECT;
  581. }
  582. catch(TC_Socket_Exception &ex)
  583. {
  584. _socket.close();
  585. return EM_SOCKET;
  586. }
  587. }
  588. return EM_SUCCESS;
  589. }
  590. int TC_UDPClient::send(const char *sSendBuffer, size_t iSendLen)
  591. {
  592. int iRet = checkSocket();
  593. if(iRet < 0)
  594. {
  595. return iRet;
  596. }
  597. iRet = _socket.send(sSendBuffer, iSendLen);
  598. if(iRet <0 )
  599. {
  600. return EM_SEND;
  601. }
  602. return EM_SUCCESS;
  603. }
  604. int TC_UDPClient::recv(char *sRecvBuffer, size_t &iRecvLen)
  605. {
  606. string sTmpIp;
  607. uint16_t iTmpPort;
  608. return recv(sRecvBuffer, iRecvLen, sTmpIp, iTmpPort);
  609. }
  610. int TC_UDPClient::recv(char *sRecvBuffer, size_t &iRecvLen, string &sRemoteIp, uint16_t &iRemotePort)
  611. {
  612. int iRet = checkSocket();
  613. if(iRet < 0)
  614. {
  615. return iRet;
  616. }
  617. TC_Epoller epoller(false);
  618. epoller.create(1);
  619. epoller.add(_socket.getfd(), 0, EPOLLIN);
  620. int iRetCode = epoller.wait(_timeout);
  621. if (iRetCode < 0)
  622. {
  623. return EM_SELECT;
  624. }
  625. else if (iRetCode == 0)
  626. {
  627. return EM_TIMEOUT;
  628. }
  629. epoll_event ev = epoller.get(0);
  630. if(ev.events & EPOLLIN)
  631. {
  632. iRet = _socket.recvfrom(sRecvBuffer, iRecvLen, sRemoteIp, iRemotePort);
  633. if(iRet <0 )
  634. {
  635. return EM_SEND;
  636. }
  637. iRecvLen = iRet;
  638. return EM_SUCCESS;
  639. }
  640. return EM_SELECT;
  641. }
  642. int TC_UDPClient::sendRecv(const char *sSendBuffer, size_t iSendLen, char *sRecvBuffer, size_t &iRecvLen)
  643. {
  644. int iRet = send(sSendBuffer, iSendLen);
  645. if(iRet != EM_SUCCESS)
  646. {
  647. return iRet;
  648. }
  649. return recv(sRecvBuffer, iRecvLen);
  650. }
  651. int TC_UDPClient::sendRecv(const char *sSendBuffer, size_t iSendLen, char *sRecvBuffer, size_t &iRecvLen, string &sRemoteIp, uint16_t &iRemotePort)
  652. {
  653. int iRet = send(sSendBuffer, iSendLen);
  654. if(iRet != EM_SUCCESS)
  655. {
  656. return iRet;
  657. }
  658. return recv(sRecvBuffer, iRecvLen, sRemoteIp, iRemotePort);
  659. }
  660. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载