socklnd_cb.c
  1. /*
  2. * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  3. *
  4. * Copyright (c) 2011, 2012, Intel Corporation.
  5. *
  6. * Author: Zach Brown <zab@zabbo.net>
  7. * Author: Peter J. Braam <braam@clusterfs.com>
  8. * Author: Phil Schwan <phil@clusterfs.com>
  9. * Author: Eric Barton <eric@bartonsoftware.com>
  10. *
  11. * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  12. *
  13. * Portals is free software; you can redistribute it and/or
  14. * modify it under the terms of version 2 of the GNU General Public
  15. * License as published by the Free Software Foundation.
  16. *
  17. * Portals is distributed in the hope that it will be useful,
  18. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  19. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  20. * GNU General Public License for more details.
  21. *
  22. * You should have received a copy of the GNU General Public License
  23. * along with Portals; if not, write to the Free Software
  24. * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  25. */
  26. #include "socklnd.h"
  27. ksock_tx_t *
  28. ksocknal_alloc_tx(int type, int size)
  29. {
  30. ksock_tx_t *tx = NULL;
  31. if (type == KSOCK_MSG_NOOP) {
  32. LASSERT(size == KSOCK_NOOP_TX_SIZE);
  33. /* searching for a noop tx in free list */
  34. spin_lock(&ksocknal_data.ksnd_tx_lock);
  35. if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
  36. tx = list_entry(ksocknal_data.ksnd_idle_noop_txs.next,
  37. ksock_tx_t, tx_list);
  38. LASSERT(tx->tx_desc_size == size);
  39. list_del(&tx->tx_list);
  40. }
  41. spin_unlock(&ksocknal_data.ksnd_tx_lock);
  42. }
  43. if (tx == NULL)
  44. LIBCFS_ALLOC(tx, size);
  45. if (tx == NULL)
  46. return NULL;
  47. atomic_set(&tx->tx_refcount, 1);
  48. tx->tx_zc_aborted = 0;
  49. tx->tx_zc_capable = 0;
  50. tx->tx_zc_checked = 0;
  51. tx->tx_desc_size = size;
  52. atomic_inc(&ksocknal_data.ksnd_nactive_txs);
  53. return tx;
  54. }
  55. ksock_tx_t *
  56. ksocknal_alloc_tx_noop(__u64 cookie, int nonblk)
  57. {
  58. ksock_tx_t *tx;
  59. tx = ksocknal_alloc_tx(KSOCK_MSG_NOOP, KSOCK_NOOP_TX_SIZE);
  60. if (tx == NULL) {
  61. CERROR("Can't allocate noop tx desc\n");
  62. return NULL;
  63. }
  64. tx->tx_conn = NULL;
  65. tx->tx_lnetmsg = NULL;
  66. tx->tx_kiov = NULL;
  67. tx->tx_nkiov = 0;
  68. tx->tx_iov = tx->tx_frags.virt.iov;
  69. tx->tx_niov = 1;
  70. tx->tx_nonblk = nonblk;
  71. socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
  72. tx->tx_msg.ksm_zc_cookies[1] = cookie;
  73. return tx;
  74. }
  75. void
  76. ksocknal_free_tx (ksock_tx_t *tx)
  77. {
  78. atomic_dec(&ksocknal_data.ksnd_nactive_txs);
  79. if (tx->tx_lnetmsg == NULL && tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) {
  80. /* it's a noop tx */
  81. spin_lock(&ksocknal_data.ksnd_tx_lock);
  82. list_add(&tx->tx_list, &ksocknal_data.ksnd_idle_noop_txs);
  83. spin_unlock(&ksocknal_data.ksnd_tx_lock);
  84. } else {
  85. LIBCFS_FREE(tx, tx->tx_desc_size);
  86. }
  87. }
  88. static int
  89. ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
  90. {
  91. struct kvec *iov = tx->tx_iov;
  92. int nob;
  93. int rc;
  94. LASSERT(tx->tx_niov > 0);
  95. /* Never touch tx->tx_iov inside ksocknal_lib_send_iov() */
  96. rc = ksocknal_lib_send_iov(conn, tx);
  97. if (rc <= 0) /* sent nothing? */
  98. return rc;
  99. nob = rc;
  100. LASSERT (nob <= tx->tx_resid);
  101. tx->tx_resid -= nob;
  102. /* "consume" iov */
  103. do {
  104. LASSERT(tx->tx_niov > 0);
  105. if (nob < (int) iov->iov_len) {
  106. iov->iov_base = (void *)((char *)iov->iov_base + nob);
  107. iov->iov_len -= nob;
  108. return rc;
  109. }
  110. nob -= iov->iov_len;
  111. tx->tx_iov = ++iov;
  112. tx->tx_niov--;
  113. } while (nob != 0);
  114. return rc;
  115. }
  116. static int
  117. ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
  118. {
  119. lnet_kiov_t *kiov = tx->tx_kiov;
  120. int nob;
  121. int rc;
  122. LASSERT(tx->tx_niov == 0);
  123. LASSERT(tx->tx_nkiov > 0);
  124. /* Never touch tx->tx_kiov inside ksocknal_lib_send_kiov() */
  125. rc = ksocknal_lib_send_kiov(conn, tx);
  126. if (rc <= 0) /* sent nothing? */
  127. return rc;
  128. nob = rc;
  129. LASSERT (nob <= tx->tx_resid);
  130. tx->tx_resid -= nob;
  131. /* "consume" kiov */
  132. do {
  133. LASSERT(tx->tx_nkiov > 0);
  134. if (nob < (int)kiov->kiov_len) {
  135. kiov->kiov_offset += nob;
  136. kiov->kiov_len -= nob;
  137. return rc;
  138. }
  139. nob -= (int)kiov->kiov_len;
  140. tx->tx_kiov = ++kiov;
  141. tx->tx_nkiov--;
  142. } while (nob != 0);
  143. return rc;
  144. }
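/*
 * Editor's note (not part of the original file): the "consume" loops in
 * ksocknal_send_iov()/ksocknal_send_kiov() above advance the fragment
 * cursor by however many bytes the socket actually accepted, so the next
 * call resumes exactly where the partial send stopped.  A minimal
 * standalone sketch of the same idea, using a hypothetical frag type:
 */
#if 0	/* illustrative sketch only, never compiled */
struct frag { char *base; size_t len; };

/* Advance the (frags, nfrags) cursor past 'sent' bytes of a partial send. */
static void consume_frags(struct frag **frags, unsigned int *nfrags,
			  size_t sent)
{
	struct frag *f = *frags;

	while (sent != 0) {
		if (sent < f->len) {		/* stopped inside this frag */
			f->base += sent;
			f->len  -= sent;
			break;
		}
		sent -= f->len;			/* frag fully sent, drop it */
		f++;
		(*nfrags)--;
	}
	*frags = f;
}
#endif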
  145. static int
  146. ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
  147. {
  148. int rc;
  149. int bufnob;
  150. if (ksocknal_data.ksnd_stall_tx != 0) {
  151. set_current_state(TASK_UNINTERRUPTIBLE);
  152. schedule_timeout(cfs_time_seconds(ksocknal_data.ksnd_stall_tx));
  153. }
  154. LASSERT(tx->tx_resid != 0);
  155. rc = ksocknal_connsock_addref(conn);
  156. if (rc != 0) {
  157. LASSERT (conn->ksnc_closing);
  158. return -ESHUTDOWN;
  159. }
  160. do {
  161. if (ksocknal_data.ksnd_enomem_tx > 0) {
  162. /* testing... */
  163. ksocknal_data.ksnd_enomem_tx--;
  164. rc = -EAGAIN;
  165. } else if (tx->tx_niov != 0) {
  166. rc = ksocknal_send_iov (conn, tx);
  167. } else {
  168. rc = ksocknal_send_kiov (conn, tx);
  169. }
  170. bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
  171. if (rc > 0) /* sent something? */
  172. conn->ksnc_tx_bufnob += rc; /* account it */
  173. if (bufnob < conn->ksnc_tx_bufnob) {
  174. /* allocated send buffer bytes < computed; infer
  175. * something got ACKed */
  176. conn->ksnc_tx_deadline =
  177. cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
  178. conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
  179. conn->ksnc_tx_bufnob = bufnob;
  180. mb();
  181. }
  182. if (rc <= 0) { /* Didn't write anything? */
  183. if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
  184. rc = -EAGAIN;
  185. /* Check if EAGAIN is due to memory pressure */
  186. if (rc == -EAGAIN && ksocknal_lib_memory_pressure(conn))
  187. rc = -ENOMEM;
  188. break;
  189. }
  190. /* socket's wmem_queued now includes 'rc' bytes */
  191. atomic_sub (rc, &conn->ksnc_tx_nob);
  192. rc = 0;
  193. } while (tx->tx_resid != 0);
  194. ksocknal_connsock_decref(conn);
  195. return rc;
  196. }
  197. static int
  198. ksocknal_recv_iov (ksock_conn_t *conn)
  199. {
  200. struct kvec *iov = conn->ksnc_rx_iov;
  201. int nob;
  202. int rc;
  203. LASSERT(conn->ksnc_rx_niov > 0);
  204. /* Never touch conn->ksnc_rx_iov or change connection
  205. * status inside ksocknal_lib_recv_iov */
  206. rc = ksocknal_lib_recv_iov(conn);
  207. if (rc <= 0)
  208. return rc;
  209. /* received something... */
  210. nob = rc;
  211. conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
  212. conn->ksnc_rx_deadline =
  213. cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
  214. mb(); /* order with setting rx_started */
  215. conn->ksnc_rx_started = 1;
  216. conn->ksnc_rx_nob_wanted -= nob;
  217. conn->ksnc_rx_nob_left -= nob;
  218. do {
  219. LASSERT(conn->ksnc_rx_niov > 0);
  220. if (nob < (int)iov->iov_len) {
  221. iov->iov_len -= nob;
  222. iov->iov_base += nob;
  223. return -EAGAIN;
  224. }
  225. nob -= iov->iov_len;
  226. conn->ksnc_rx_iov = ++iov;
  227. conn->ksnc_rx_niov--;
  228. } while (nob != 0);
  229. return rc;
  230. }
  231. static int
  232. ksocknal_recv_kiov (ksock_conn_t *conn)
  233. {
  234. lnet_kiov_t *kiov = conn->ksnc_rx_kiov;
  235. int nob;
  236. int rc;
  237. LASSERT(conn->ksnc_rx_nkiov > 0);
  238. /* Never touch conn->ksnc_rx_kiov or change connection
  239. * status inside ksocknal_lib_recv_iov */
  240. rc = ksocknal_lib_recv_kiov(conn);
  241. if (rc <= 0)
  242. return rc;
  243. /* received something... */
  244. nob = rc;
  245. conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
  246. conn->ksnc_rx_deadline =
  247. cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
  248. mb(); /* order with setting rx_started */
  249. conn->ksnc_rx_started = 1;
  250. conn->ksnc_rx_nob_wanted -= nob;
  251. conn->ksnc_rx_nob_left -= nob;
  252. do {
  253. LASSERT(conn->ksnc_rx_nkiov > 0);
  254. if (nob < (int) kiov->kiov_len) {
  255. kiov->kiov_offset += nob;
  256. kiov->kiov_len -= nob;
  257. return -EAGAIN;
  258. }
  259. nob -= kiov->kiov_len;
  260. conn->ksnc_rx_kiov = ++kiov;
  261. conn->ksnc_rx_nkiov--;
  262. } while (nob != 0);
  263. return 1;
  264. }
  265. static int
  266. ksocknal_receive (ksock_conn_t *conn)
  267. {
  268. /* Return 1 on success, 0 on EOF, < 0 on error.
  269. * Caller checks ksnc_rx_nob_wanted to determine
  270. * progress/completion. */
  271. int rc;
  272. if (ksocknal_data.ksnd_stall_rx != 0) {
  273. set_current_state(TASK_UNINTERRUPTIBLE);
  274. schedule_timeout(cfs_time_seconds(ksocknal_data.ksnd_stall_rx));
  275. }
  276. rc = ksocknal_connsock_addref(conn);
  277. if (rc != 0) {
  278. LASSERT (conn->ksnc_closing);
  279. return -ESHUTDOWN;
  280. }
  281. for (;;) {
  282. if (conn->ksnc_rx_niov != 0)
  283. rc = ksocknal_recv_iov (conn);
  284. else
  285. rc = ksocknal_recv_kiov (conn);
  286. if (rc <= 0) {
  287. /* error/EOF or partial receive */
  288. if (rc == -EAGAIN) {
  289. rc = 1;
  290. } else if (rc == 0 && conn->ksnc_rx_started) {
  291. /* EOF in the middle of a message */
  292. rc = -EPROTO;
  293. }
  294. break;
  295. }
  296. /* Completed a fragment */
  297. if (conn->ksnc_rx_nob_wanted == 0) {
  298. rc = 1;
  299. break;
  300. }
  301. }
  302. ksocknal_connsock_decref(conn);
  303. return rc;
  304. }
  305. void
  306. ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx)
  307. {
  308. lnet_msg_t *lnetmsg = tx->tx_lnetmsg;
  309. int rc = (tx->tx_resid == 0 && !tx->tx_zc_aborted) ? 0 : -EIO;
  310. LASSERT(ni != NULL || tx->tx_conn != NULL);
  311. if (tx->tx_conn != NULL)
  312. ksocknal_conn_decref(tx->tx_conn);
  313. if (ni == NULL && tx->tx_conn != NULL)
  314. ni = tx->tx_conn->ksnc_peer->ksnp_ni;
  315. ksocknal_free_tx (tx);
  316. if (lnetmsg != NULL) /* KSOCK_MSG_NOOP txs go without lnetmsg */
  317. lnet_finalize (ni, lnetmsg, rc);
  318. }
  319. void
  320. ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error)
  321. {
  322. ksock_tx_t *tx;
  323. while (!list_empty (txlist)) {
  324. tx = list_entry(txlist->next, ksock_tx_t, tx_list);
  325. if (error && tx->tx_lnetmsg != NULL) {
  326. CNETERR("Deleting packet type %d len %d %s->%s\n",
  327. le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type),
  328. le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length),
  329. libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),
  330. libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.dest_nid)));
  331. } else if (error) {
  332. CNETERR("Deleting noop packet\n");
  333. }
  334. list_del(&tx->tx_list);
  335. LASSERT(atomic_read(&tx->tx_refcount) == 1);
  336. ksocknal_tx_done(ni, tx);
  337. }
  338. }
  339. static void
  340. ksocknal_check_zc_req(ksock_tx_t *tx)
  341. {
  342. ksock_conn_t *conn = tx->tx_conn;
  343. ksock_peer_t *peer = conn->ksnc_peer;
  344. /* Set tx_msg.ksm_zc_cookies[0] to a unique non-zero cookie and add tx
  345. * to ksnp_zc_req_list if some fragment of this message should be sent
  346. * zero-copy. Our peer will send an ACK containing this cookie when
  347. * she has received this message to tell us we can signal completion.
  348. * tx_msg.ksm_zc_cookies[0] remains non-zero while tx is on
  349. * ksnp_zc_req_list. */
  350. LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
  351. LASSERT(tx->tx_zc_capable);
  352. tx->tx_zc_checked = 1;
  353. if (conn->ksnc_proto == &ksocknal_protocol_v1x ||
  354. !conn->ksnc_zc_capable)
  355. return;
  356. /* assign cookie and queue tx to pending list, it will be released when
  357. * a matching ack is received. See ksocknal_handle_zcack() */
  358. ksocknal_tx_addref(tx);
  359. spin_lock(&peer->ksnp_lock);
  360. /* ZC_REQ is going to be pinned to the peer */
  361. tx->tx_deadline =
  362. cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
  363. LASSERT(tx->tx_msg.ksm_zc_cookies[0] == 0);
  364. tx->tx_msg.ksm_zc_cookies[0] = peer->ksnp_zc_next_cookie++;
  365. if (peer->ksnp_zc_next_cookie == 0)
  366. peer->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
  367. list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list);
  368. spin_unlock(&peer->ksnp_lock);
  369. }
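/*
 * Editor's note (not part of the original file): ksnp_zc_next_cookie above
 * must stay non-zero (a zero cookie means "no ACK expected") and, when the
 * 64-bit counter wraps, must restart past the value reserved for keepalive
 * pings.  A minimal standalone sketch of that allocation rule, with a
 * hypothetical RESERVED_KEEPALIVE constant standing in for
 * SOCKNAL_KEEPALIVE_PING:
 */
#if 0	/* illustrative sketch only, never compiled */
#define RESERVED_KEEPALIVE 1ULL

static __u64 next_zc_cookie(__u64 *counter)
{
	__u64 cookie = (*counter)++;

	/* on wrap, restart just past the reserved keepalive cookie */
	if (*counter == 0)
		*counter = RESERVED_KEEPALIVE + 1;
	return cookie;
}
#endif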
  370. static void
  371. ksocknal_uncheck_zc_req(ksock_tx_t *tx)
  372. {
  373. ksock_peer_t *peer = tx->tx_conn->ksnc_peer;
  374. LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
  375. LASSERT(tx->tx_zc_capable);
  376. tx->tx_zc_checked = 0;
  377. spin_lock(&peer->ksnp_lock);
  378. if (tx->tx_msg.ksm_zc_cookies[0] == 0) {
  379. /* Not waiting for an ACK */
  380. spin_unlock(&peer->ksnp_lock);
  381. return;
  382. }
  383. tx->tx_msg.ksm_zc_cookies[0] = 0;
  384. list_del(&tx->tx_zc_list);
  385. spin_unlock(&peer->ksnp_lock);
  386. ksocknal_tx_decref(tx);
  387. }
  388. static int
  389. ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
  390. {
  391. int rc;
  392. if (tx->tx_zc_capable && !tx->tx_zc_checked)
  393. ksocknal_check_zc_req(tx);
  394. rc = ksocknal_transmit (conn, tx);
  395. CDEBUG(D_NET, "send(%d) %d\n", tx->tx_resid, rc);
  396. if (tx->tx_resid == 0) {
  397. /* Sent everything OK */
  398. LASSERT (rc == 0);
  399. return 0;
  400. }
  401. if (rc == -EAGAIN)
  402. return rc;
  403. if (rc == -ENOMEM) {
  404. static int counter;
  405. counter++; /* exponential backoff: warn only when counter is a power of two */
  406. if ((counter & (-counter)) == counter)
  407. CWARN("%u ENOMEM tx %p\n", counter, conn);
  408. /* Queue on ksnd_enomem_conns for retry after a timeout */
  409. spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
  410. /* enomem list takes over scheduler's ref... */
  411. LASSERT (conn->ksnc_tx_scheduled);
  412. list_add_tail(&conn->ksnc_tx_list,
  413. &ksocknal_data.ksnd_enomem_conns);
  414. if (!cfs_time_aftereq(cfs_time_add(cfs_time_current(),
  415. SOCKNAL_ENOMEM_RETRY),
  416. ksocknal_data.ksnd_reaper_waketime))
  417. wake_up (&ksocknal_data.ksnd_reaper_waitq);
  418. spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
  419. return rc;
  420. }
  421. /* Actual error */
  422. LASSERT(rc < 0);
  423. if (!conn->ksnc_closing) {
  424. switch (rc) {
  425. case -ECONNRESET:
  426. LCONSOLE_WARN("Host %pI4h reset our connection while we were sending data; it may have rebooted.\n",
  427. &conn->ksnc_ipaddr);
  428. break;
  429. default:
  430. LCONSOLE_WARN("There was an unexpected network error while writing to %pI4h: %d.\n",
  431. &conn->ksnc_ipaddr, rc);
  432. break;
  433. }
  434. CDEBUG(D_NET, "[%p] Error %d on write to %s ip %pI4h:%d\n",
  435. conn, rc,
  436. libcfs_id2str(conn->ksnc_peer->ksnp_id),
  437. &conn->ksnc_ipaddr,
  438. conn->ksnc_port);
  439. }
  440. if (tx->tx_zc_checked)
  441. ksocknal_uncheck_zc_req(tx);
  442. /* it's not an error if conn is being closed */
  443. ksocknal_close_conn_and_siblings (conn,
  444. (conn->ksnc_closing) ? 0 : rc);
  445. return rc;
  446. }
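/*
 * Editor's note (not part of the original file): the ENOMEM warning above
 * is throttled with "(counter & (-counter)) == counter", which holds only
 * when a single bit is set, i.e. counter is a power of two, so the log
 * rate decays exponentially as failures repeat.  A tiny standalone sketch:
 */
#if 0	/* illustrative sketch only, never compiled */
/* n & -n isolates the lowest set bit; equality means at most one bit is
 * set, i.e. n is 0 or a power of two.  The counter above is always >= 1. */
static inline int warn_now(unsigned int n)
{
	return (n & (-n)) == n;	/* fires at n = 1, 2, 4, 8, 16, ... */
}
#endif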
  447. static void
  448. ksocknal_launch_connection_locked (ksock_route_t *route)
  449. {
  450. /* called holding write lock on ksnd_global_lock */
  451. LASSERT(!route->ksnr_scheduled);
  452. LASSERT(!route->ksnr_connecting);
  453. LASSERT((ksocknal_route_mask() & ~route->ksnr_connected) != 0);
  454. route->ksnr_scheduled = 1; /* scheduling conn for connd */
  455. ksocknal_route_addref(route); /* extra ref for connd */
  456. spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
  457. list_add_tail(&route->ksnr_connd_list,
  458. &ksocknal_data.ksnd_connd_routes);
  459. wake_up(&ksocknal_data.ksnd_connd_waitq);
  460. spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
  461. }
  462. void
  463. ksocknal_launch_all_connections_locked (ksock_peer_t *peer)
  464. {
  465. ksock_route_t *route;
  466. /* called holding write lock on ksnd_global_lock */
  467. for (;;) {
  468. /* launch any/all connections that need it */
  469. route = ksocknal_find_connectable_route_locked(peer);
  470. if (route == NULL)
  471. return;
  472. ksocknal_launch_connection_locked(route);
  473. }
  474. }
  475. ksock_conn_t *
  476. ksocknal_find_conn_locked(ksock_peer_t *peer, ksock_tx_t *tx, int nonblk)
  477. {
  478. struct list_head *tmp;
  479. ksock_conn_t *conn;
  480. ksock_conn_t *typed = NULL;
  481. ksock_conn_t *fallback = NULL;
  482. int tnob = 0;
  483. int fnob = 0;
  484. list_for_each (tmp, &peer->ksnp_conns) {
  485. ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
  486. int nob = atomic_read(&c->ksnc_tx_nob) +
  487. c->ksnc_sock->sk->sk_wmem_queued;
  488. int rc;
  489. LASSERT(!c->ksnc_closing);
  490. LASSERT(c->ksnc_proto != NULL &&
  491. c->ksnc_proto->pro_match_tx != NULL);
  492. rc = c->ksnc_proto->pro_match_tx(c, tx, nonblk);
  493. switch (rc) {
  494. default:
  495. LBUG();
  496. case SOCKNAL_MATCH_NO: /* protocol rejected the tx */
  497. continue;
  498. case SOCKNAL_MATCH_YES: /* typed connection */
  499. if (typed == NULL || tnob > nob ||
  500. (tnob == nob && *ksocknal_tunables.ksnd_round_robin &&
  501. cfs_time_after(typed->ksnc_tx_last_post, c->ksnc_tx_last_post))) {
  502. typed = c;
  503. tnob = nob;
  504. }
  505. break;
  506. case SOCKNAL_MATCH_MAY: /* fallback connection */
  507. if (fallback == NULL || fnob > nob ||
  508. (fnob == nob && *ksocknal_tunables.ksnd_round_robin &&
  509. cfs_time_after(fallback->ksnc_tx_last_post, c->ksnc_tx_last_post))) {
  510. fallback = c;
  511. fnob = nob;
  512. }
  513. break;
  514. }
  515. }
  516. /* prefer the typed selection */
  517. conn = (typed != NULL) ? typed : fallback;
  518. if (conn != NULL)
  519. conn->ksnc_tx_last_post = cfs_time_current();
  520. return conn;
  521. }
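/*
 * Editor's note (not part of the original file): the selection above
 * favours the matching connection with the fewest bytes already queued
 * (ksnc_tx_nob plus the socket's wmem backlog), so traffic spreads across
 * parallel connections to the same peer.  A standalone sketch of that
 * "least loaded wins" rule over a plain array, with hypothetical types:
 */
#if 0	/* illustrative sketch only, never compiled */
struct fake_conn { int queued_bytes; };

static struct fake_conn *least_loaded(struct fake_conn *conns, int n)
{
	struct fake_conn *best = NULL;
	int i;

	for (i = 0; i < n; i++) {
		if (best == NULL || conns[i].queued_bytes < best->queued_bytes)
			best = &conns[i];
	}
	return best;
}
#endif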
  522. void
  523. ksocknal_tx_prep(ksock_conn_t *conn, ksock_tx_t *tx)
  524. {
  525. conn->ksnc_proto->pro_pack(tx);
  526. atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
  527. ksocknal_conn_addref(conn); /* +1 ref for tx */
  528. tx->tx_conn = conn;
  529. }
  530. void
  531. ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
  532. {
  533. ksock_sched_t *sched = conn->ksnc_scheduler;
  534. ksock_msg_t *msg = &tx->tx_msg;
  535. ksock_tx_t *ztx = NULL;
  536. int bufnob = 0;
  537. /* called holding global lock (read or irq-write) and caller may
  538. * not have dropped this lock between finding conn and calling me,
  539. * so we don't need the {get,put}connsock dance to deref
  540. * ksnc_sock... */
  541. LASSERT(!conn->ksnc_closing);
  542. CDEBUG(D_NET, "Sending to %s ip %pI4h:%d\n",
  543. libcfs_id2str(conn->ksnc_peer->ksnp_id),
  544. &conn->ksnc_ipaddr,
  545. conn->ksnc_port);
  546. ksocknal_tx_prep(conn, tx);
  547. /* Ensure the frags we've been given EXACTLY match the number of
  548. * bytes we want to send. Many TCP/IP stacks disregard any total
  549. * size parameters passed to them and just look at the frags.
  550. *
  551. * We always expect at least 1 mapped fragment containing the
  552. * complete ksocknal message header. */
  553. LASSERT(lnet_iov_nob (tx->tx_niov, tx->tx_iov) +
  554. lnet_kiov_nob(tx->tx_nkiov, tx->tx_kiov) ==
  555. (unsigned int)tx->tx_nob);
  556. LASSERT(tx->tx_niov >= 1);
  557. LASSERT(tx->tx_resid == tx->tx_nob);
  558. CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n",
  559. tx, (tx->tx_lnetmsg != NULL) ? tx->tx_lnetmsg->msg_hdr.type :
  560. KSOCK_MSG_NOOP,
  561. tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
  562. /*
  563. * FIXME: SOCK_WMEM_QUEUED and SOCK_ERROR could block in __DARWIN8__
  564. * but they're used inside spinlocks a lot.
  565. */
  566. bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
  567. spin_lock_bh(&sched->kss_lock);
  568. if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) {
  569. /* First packet starts the timeout */
  570. conn->ksnc_tx_deadline =
  571. cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
  572. if (conn->ksnc_tx_bufnob > 0) /* something got ACKed */
  573. conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
  574. conn->ksnc_tx_bufnob = 0;
  575. mb(); /* order with adding to tx_queue */
  576. }
  577. if (msg->ksm_type == KSOCK_MSG_NOOP) {
  578. /* The packet is a noop ZC ACK; try to piggyback the ack cookie
  579. * on a normal packet so I don't need to send it */
  580. LASSERT(msg->ksm_zc_cookies[1] != 0);
  581. LASSERT(conn->ksnc_proto->pro_queue_tx_zcack != NULL);
  582. if (conn->ksnc_proto->pro_queue_tx_zcack(conn, tx, 0))
  583. ztx = tx; /* ZC ACK piggybacked on ztx; release tx later */
  584. } else {
  585. /* It's a normal packet - can it piggyback a noop zc-ack that
  586. * has been queued already? */
  587. LASSERT(msg->ksm_zc_cookies[1] == 0);
  588. LASSERT(conn->ksnc_proto->pro_queue_tx_msg != NULL);
  589. ztx = conn->ksnc_proto->pro_queue_tx_msg(conn, tx);
  590. /* ztx will be released later */
  591. }
  592. if (ztx != NULL) {
  593. atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob);
  594. list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs);
  595. }
  596. if (conn->ksnc_tx_ready && /* able to send */
  597. !conn->ksnc_tx_scheduled) { /* not scheduled to send */
  598. /* +1 ref for scheduler */
  599. ksocknal_conn_addref(conn);
  600. list_add_tail (&conn->ksnc_tx_list,
  601. &sched->kss_tx_conns);
  602. conn->ksnc_tx_scheduled = 1;
  603. wake_up (&sched->kss_waitq);
  604. }
  605. spin_unlock_bh(&sched->kss_lock);
  606. }
  607. ksock_route_t *
  608. ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
  609. {
  610. unsigned long now = cfs_time_current();
  611. struct list_head *tmp;
  612. ksock_route_t *route;
  613. list_for_each (tmp, &peer->ksnp_routes) {
  614. route = list_entry (tmp, ksock_route_t, ksnr_list);
  615. LASSERT(!route->ksnr_connecting || route->ksnr_scheduled);
  616. if (route->ksnr_scheduled) /* connections being established */
  617. continue;
  618. /* all route types connected ? */
  619. if ((ksocknal_route_mask() & ~route->ksnr_connected) == 0)
  620. continue;
  621. if (!(route->ksnr_retry_interval == 0 || /* first attempt */
  622. cfs_time_aftereq(now, route->ksnr_timeout))) {
  623. CDEBUG(D_NET,
  624. "Too soon to retry route %pI4h (cnted %d, interval %ld, %ld secs later)\n",
  625. &route->ksnr_ipaddr,
  626. route->ksnr_connected,
  627. route->ksnr_retry_interval,
  628. cfs_duration_sec(route->ksnr_timeout - now));
  629. continue;
  630. }
  631. return route;
  632. }
  633. return NULL;
  634. }
  635. ksock_route_t *
  636. ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
  637. {
  638. struct list_head *tmp;
  639. ksock_route_t *route;
  640. list_for_each (tmp, &peer->ksnp_routes) {
  641. route = list_entry (tmp, ksock_route_t, ksnr_list);
  642. LASSERT(!route->ksnr_connecting || route->ksnr_scheduled);
  643. if (route->ksnr_scheduled)
  644. return route;
  645. }
  646. return NULL;
  647. }
  648. int
  649. ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id)
  650. {
  651. ksock_peer_t *peer;
  652. ksock_conn_t *conn;
  653. rwlock_t *g_lock;
  654. int retry;
  655. int rc;
  656. LASSERT(tx->tx_conn == NULL);
  657. g_lock = &ksocknal_data.ksnd_global_lock;
  658. for (retry = 0;; retry = 1) {
  659. read_lock(g_lock);
  660. peer = ksocknal_find_peer_locked(ni, id);
  661. if (peer != NULL) {
  662. if (ksocknal_find_connectable_route_locked(peer) == NULL) {
  663. conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk);
  664. if (conn != NULL) {
  665. /* I've got no routes that need to be
  666. * connecting and I do have an actual
  667. * connection... */
  668. ksocknal_queue_tx_locked (tx, conn);
  669. read_unlock(g_lock);
  670. return 0;
  671. }
  672. }
  673. }
  674. /* I'll need a write lock... */
  675. read_unlock(g_lock);
  676. write_lock_bh(g_lock);
  677. peer = ksocknal_find_peer_locked(ni, id);
  678. if (peer != NULL)
  679. break;
  680. write_unlock_bh(g_lock);
  681. if ((id.pid & LNET_PID_USERFLAG) != 0) {
  682. CERROR("Refusing to create a connection to userspace process %s\n",
  683. libcfs_id2str(id));
  684. return -EHOSTUNREACH;
  685. }
  686. if (retry) {
  687. CERROR("Can't find peer %s\n", libcfs_id2str(id));
  688. return -EHOSTUNREACH;
  689. }
  690. rc = ksocknal_add_peer(ni, id,
  691. LNET_NIDADDR(id.nid),
  692. lnet_acceptor_port());
  693. if (rc != 0) {
  694. CERROR("Can't add peer %s: %d\n",
  695. libcfs_id2str(id), rc);
  696. return rc;
  697. }
  698. }
  699. ksocknal_launch_all_connections_locked(peer);
  700. conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk);
  701. if (conn != NULL) {
  702. /* Connection exists; queue message on it */
  703. ksocknal_queue_tx_locked (tx, conn);
  704. write_unlock_bh(g_lock);
  705. return 0;
  706. }
  707. if (peer->ksnp_accepting > 0 ||
  708. ksocknal_find_connecting_route_locked (peer) != NULL) {
  709. /* the message is going to be pinned to the peer */
  710. tx->tx_deadline =
  711. cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
  712. /* Queue the message until a connection is established */
  713. list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
  714. write_unlock_bh(g_lock);
  715. return 0;
  716. }
  717. write_unlock_bh(g_lock);
  718. /* NB Routes may be ignored if connections to them failed recently */
  719. CNETERR("No usable routes to %s\n", libcfs_id2str(id));
  720. return -EHOSTUNREACH;
  721. }
  722. int
  723. ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
  724. {
  725. int mpflag = 1;
  726. int type = lntmsg->msg_type;
  727. lnet_process_id_t target = lntmsg->msg_target;
  728. unsigned int payload_niov = lntmsg->msg_niov;
  729. struct kvec *payload_iov = lntmsg->msg_iov;
  730. lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
  731. unsigned int payload_offset = lntmsg->msg_offset;
  732. unsigned int payload_nob = lntmsg->msg_len;
  733. ksock_tx_t *tx;
  734. int desc_size;
  735. int rc;
  736. /* NB 'private' is different depending on what we're sending.
  737. * Just ignore it... */
  738. CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
  739. payload_nob, payload_niov, libcfs_id2str(target));
  740. LASSERT(payload_nob == 0 || payload_niov > 0);
  741. LASSERT(payload_niov <= LNET_MAX_IOV);
  742. /* payload is either all vaddrs or all pages */
  743. LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
  744. LASSERT (!in_interrupt ());
  745. if (payload_iov != NULL)
  746. desc_size = offsetof(ksock_tx_t,
  747. tx_frags.virt.iov[1 + payload_niov]);
  748. else
  749. desc_size = offsetof(ksock_tx_t,
  750. tx_frags.paged.kiov[payload_niov]);
  751. if (lntmsg->msg_vmflush)
  752. mpflag = cfs_memory_pressure_get_and_set();
  753. tx = ksocknal_alloc_tx(KSOCK_MSG_LNET, desc_size);
  754. if (tx == NULL) {
  755. CERROR("Can't allocate tx desc type %d size %d\n",
  756. type, desc_size);
  757. if (lntmsg->msg_vmflush)
  758. cfs_memory_pressure_restore(mpflag);
  759. return -ENOMEM;
  760. }
  761. tx->tx_conn = NULL; /* set when assigned a conn */
  762. tx->tx_lnetmsg = lntmsg;
  763. if (payload_iov != NULL) {
  764. tx->tx_kiov = NULL;
  765. tx->tx_nkiov = 0;
  766. tx->tx_iov = tx->tx_frags.virt.iov;
  767. tx->tx_niov = 1 +
  768. lnet_extract_iov(payload_niov, &tx->tx_iov[1],
  769. payload_niov, payload_iov,
  770. payload_offset, payload_nob);
  771. } else {
  772. tx->tx_niov = 1;
  773. tx->tx_iov = &tx->tx_frags.paged.iov;
  774. tx->tx_kiov = tx->tx_frags.paged.kiov;
  775. tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov,
  776. payload_niov, payload_kiov,
  777. payload_offset, payload_nob);
  778. if (payload_nob >= *ksocknal_tunables.ksnd_zc_min_payload)
  779. tx->tx_zc_capable = 1;
  780. }
  781. socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
  782. /* The first fragment will be set later in pro_pack */
  783. rc = ksocknal_launch_packet(ni, tx, target);
  784. if (!mpflag)
  785. cfs_memory_pressure_restore(mpflag);
  786. if (rc == 0)
  787. return 0;
  788. ksocknal_free_tx(tx);
  789. return -EIO;
  790. }
  791. int
  792. ksocknal_thread_start(int (*fn)(void *arg), void *arg, char *name)
  793. {
  794. struct task_struct *task = kthread_run(fn, arg, "%s", name);
  795. if (IS_ERR(task))
  796. return PTR_ERR(task);
  797. write_lock_bh(&ksocknal_data.ksnd_global_lock);
  798. ksocknal_data.ksnd_nthreads++;
  799. write_unlock_bh(&ksocknal_data.ksnd_global_lock);
  800. return 0;
  801. }
  802. void
  803. ksocknal_thread_fini (void)
  804. {
  805. write_lock_bh(&ksocknal_data.ksnd_global_lock);
  806. ksocknal_data.ksnd_nthreads--;
  807. write_unlock_bh(&ksocknal_data.ksnd_global_lock);
  808. }
  809. int
  810. ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
  811. {
  812. static char ksocknal_slop_buffer[4096];
  813. int nob;
  814. unsigned int niov;
  815. int skipped;
  816. LASSERT(conn->ksnc_proto != NULL);
  817. if ((*ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0) {
  818. /* Remind the socket to ack eagerly... */
  819. ksocknal_lib_eager_ack(conn);
  820. }
  821. if (nob_to_skip == 0) { /* right at next packet boundary now */
  822. conn->ksnc_rx_started = 0;
  823. mb(); /* racing with timeout thread */
  824. switch (conn->ksnc_proto->pro_version) {
  825. case KSOCK_PROTO_V2:
  826. case KSOCK_PROTO_V3:
  827. conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER;
  828. conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
  829. conn->ksnc_rx_iov[0].iov_base = &conn->ksnc_msg;
  830. conn->ksnc_rx_nob_wanted = offsetof(ksock_msg_t, ksm_u);
  831. conn->ksnc_rx_nob_left = offsetof(ksock_msg_t, ksm_u);
  832. conn->ksnc_rx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u);
  833. break;
  834. case KSOCK_PROTO_V1:
  835. /* Receiving bare lnet_hdr_t */
  836. conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
  837. conn->ksnc_rx_nob_wanted = sizeof(lnet_hdr_t);
  838. conn->ksnc_rx_nob_left = sizeof(lnet_hdr_t);
  839. conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
  840. conn->ksnc_rx_iov[0].iov_base = &conn->ksnc_msg.ksm_u.lnetmsg;
  841. conn->ksnc_rx_iov[0].iov_len = sizeof (lnet_hdr_t);
  842. break;
  843. default:
  844. LBUG ();
  845. }
  846. conn->ksnc_rx_niov = 1;
  847. conn->ksnc_rx_kiov = NULL;
  848. conn->ksnc_rx_nkiov = 0;
  849. conn->ksnc_rx_csum = ~0;
  850. return 1;
  851. }
  852. /* Set up to skip as much as possible now. If there's more left
  853. * (ran out of iov entries) we'll get called again */
  854. conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
  855. conn->ksnc_rx_nob_left = nob_to_skip;
  856. conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
  857. skipped = 0;
  858. niov = 0;
  859. do {
  860. nob = min_t(int, nob_to_skip, sizeof(ksocknal_slop_buffer));
  861. conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
  862. conn->ksnc_rx_iov[niov].iov_len = nob;
  863. niov++;
  864. skipped += nob;
  865. nob_to_skip -= nob;
  866. } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
  867. niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
  868. conn->ksnc_rx_niov = niov;
  869. conn->ksnc_rx_kiov = NULL;
  870. conn->ksnc_rx_nkiov = 0;
  871. conn->ksnc_rx_nob_wanted = skipped;
  872. return 0;
  873. }
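/*
 * Editor's note (not part of the original file): the slop path above skips
 * unwanted bytes by pointing every rx iov entry at the same static scratch
 * buffer, so received data is simply overwritten in place.  A standalone
 * sketch of building such an iov set for 'nob_to_skip' bytes, using a
 * hypothetical MAX_IOV bound:
 */
#if 0	/* illustrative sketch only, never compiled */
#define MAX_IOV 8

static unsigned int build_skip_iov(struct kvec *iov, char *slop,
				   size_t slop_size, size_t nob_to_skip)
{
	unsigned int niov = 0;

	while (nob_to_skip != 0 && niov < MAX_IOV) {
		size_t nob = nob_to_skip < slop_size ? nob_to_skip : slop_size;

		iov[niov].iov_base = slop;	/* every entry reuses slop */
		iov[niov].iov_len  = nob;
		niov++;
		nob_to_skip -= nob;
	}
	return niov;	/* caller gets called again if bytes remain */
}
#endif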
  874. static int
  875. ksocknal_process_receive (ksock_conn_t *conn)
  876. {
  877. lnet_hdr_t *lhdr;
  878. lnet_process_id_t *id;
  879. int rc;
  880. LASSERT (atomic_read(&conn->ksnc_conn_refcount) > 0);
  881. /* NB: sched lock NOT held */
  882. /* SOCKNAL_RX_LNET_HEADER is here for backward compatibility */
  883. LASSERT(conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER ||
  884. conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD ||
  885. conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER ||
  886. conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
  887. again:
  888. if (conn->ksnc_rx_nob_wanted != 0) {
  889. rc = ksocknal_receive(conn);
  890. if (rc <= 0) {
  891. LASSERT (rc != -EAGAIN);
  892. if (rc == 0)
  893. CDEBUG(D_NET, "[%p] EOF from %s ip %pI4h:%d\n",
  894. conn,
  895. libcfs_id2str(conn->ksnc_peer->ksnp_id),
  896. &conn->ksnc_ipaddr,
  897. conn->ksnc_port);
  898. else if (!conn->ksnc_closing)
  899. CERROR("[%p] Error %d on read from %s ip %pI4h:%d\n",
  900. conn, rc,
  901. libcfs_id2str(conn->ksnc_peer->ksnp_id),
  902. &conn->ksnc_ipaddr,
  903. conn->ksnc_port);
  904. /* it's not an error if conn is being closed */
  905. ksocknal_close_conn_and_siblings (conn,
  906. (conn->ksnc_closing) ? 0 : rc);
  907. return (rc == 0 ? -ESHUTDOWN : rc);
  908. }
  909. if (conn->ksnc_rx_nob_wanted != 0) {
  910. /* short read */
  911. return -EAGAIN;
  912. }
  913. }
  914. switch (conn->ksnc_rx_state) {
  915. case SOCKNAL_RX_KSM_HEADER:
  916. if (conn->ksnc_flip) {
  917. __swab32s(&conn->ksnc_msg.ksm_type);
  918. __swab32s(&conn->ksnc_msg.ksm_csum);
  919. __swab64s(&conn->ksnc_msg.ksm_zc_cookies[0]);
  920. __swab64s(&conn->ksnc_msg.ksm_zc_cookies[1]);
  921. }
  922. if (conn->ksnc_msg.ksm_type != KSOCK_MSG_NOOP &&
  923. conn->ksnc_msg.ksm_type != KSOCK_MSG_LNET) {
  924. CERROR("%s: Unknown message type: %x\n",
  925. libcfs_id2str(conn->ksnc_peer->ksnp_id),
  926. conn->ksnc_msg.ksm_type);
  927. ksocknal_new_packet(conn, 0);
  928. ksocknal_close_conn_and_siblings(conn, -EPROTO);
  929. return -EPROTO;
  930. }
  931. if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP &&
  932. conn->ksnc_msg.ksm_csum != 0 && /* has checksum */
  933. conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
  934. /* NOOP Checksum error */
  935. CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
  936. libcfs_id2str(conn->ksnc_peer->ksnp_id),
  937. conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
  938. ksocknal_new_packet(conn, 0);
  939. ksocknal_close_conn_and_siblings(conn, -EPROTO);
  940. return -EIO;
  941. }
  942. if (conn->ksnc_msg.ksm_zc_cookies[1] != 0) {
  943. __u64 cookie = 0;
  944. LASSERT (conn->ksnc_proto != &ksocknal_protocol_v1x);
  945. if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP)
  946. cookie = conn->ksnc_msg.ksm_zc_cookies[0];
  947. rc = conn->ksnc_proto->pro_handle_zcack(conn, cookie,
  948. conn->ksnc_msg.ksm_zc_cookies[1]);
  949. if (rc != 0) {
  950. CERROR("%s: Unknown ZC-ACK cookie: %llu, %llu\n",
  951. libcfs_id2str(conn->ksnc_peer->ksnp_id),
  952. cookie, conn->ksnc_msg.ksm_zc_cookies[1]);
  953. ksocknal_new_packet(conn, 0);
  954. ksocknal_close_conn_and_siblings(conn, -EPROTO);
  955. return rc;
  956. }
  957. }
  958. if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) {
  959. ksocknal_new_packet (conn, 0);
  960. return 0; /* NOOP is done and just return */
  961. }
  962. conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
  963. conn->ksnc_rx_nob_wanted = sizeof(ksock_lnet_msg_t);
  964. conn->ksnc_rx_nob_left = sizeof(ksock_lnet_msg_t);
  965. conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
  966. conn->ksnc_rx_iov[0].iov_base = &conn->ksnc_msg.ksm_u.lnetmsg;
  967. conn->ksnc_rx_iov[0].iov_len = sizeof(ksock_lnet_msg_t);
  968. conn->ksnc_rx_niov = 1;
  969. conn->ksnc_rx_kiov = NULL;
  970. conn->ksnc_rx_nkiov = 0;
  971. goto again; /* read lnet header now */
  972. case SOCKNAL_RX_LNET_HEADER:
  973. /* unpack message header */
  974. conn->ksnc_proto->pro_unpack(&conn->ksnc_msg);
  975. if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) {
  976. /* Userspace peer */
  977. lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
  978. id = &conn->ksnc_peer->ksnp_id;
  979. /* Substitute process ID assigned at connection time */
  980. lhdr->src_pid = cpu_to_le32(id->pid);
  981. lhdr->src_nid = cpu_to_le64(id->nid);
  982. }
  983. conn->ksnc_rx_state = SOCKNAL_RX_PARSE;
  984. ksocknal_conn_addref(conn); /* ++ref while parsing */
  985. rc = lnet_parse(conn->ksnc_peer->ksnp_ni,
  986. &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr,
  987. conn->ksnc_peer->ksnp_id.nid, conn, 0);
  988. if (rc < 0) {
  989. /* I just received garbage: give up on this conn */
  990. ksocknal_new_packet(conn, 0);
  991. ksocknal_close_conn_and_siblings (conn, rc);
  992. ksocknal_conn_decref(conn);
  993. return -EPROTO;
  994. }
  995. /* I'm racing with ksocknal_recv() */
  996. LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_PARSE ||
  997. conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD);
  998. if (conn->ksnc_rx_state != SOCKNAL_RX_LNET_PAYLOAD)
  999. return 0;
  1000. /* ksocknal_recv() got called */
  1001. goto again;
  1002. case SOCKNAL_RX_LNET_PAYLOAD:
  1003. /* payload all received */
  1004. rc = 0;
  1005. if (conn->ksnc_rx_nob_left == 0 && /* not truncating */
  1006. conn->ksnc_msg.ksm_csum != 0 && /* has checksum */
  1007. conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
  1008. CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
  1009. libcfs_id2str(conn->ksnc_peer->ksnp_id),
  1010. conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
  1011. rc = -EIO;
  1012. }
  1013. if (rc == 0 && conn->ksnc_msg.ksm_zc_cookies[0] != 0) {
  1014. LASSERT(conn->ksnc_proto != &ksocknal_protocol_v1x);
  1015. lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
  1016. id = &conn->ksnc_peer->ksnp_id;
  1017. rc = conn->ksnc_proto->pro_handle_zcreq(conn,
  1018. conn->ksnc_msg.ksm_zc_cookies[0],
  1019. *ksocknal_tunables.ksnd_nonblk_zcack ||
  1020. le64_to_cpu(lhdr->src_nid) != id->nid);
  1021. }
  1022. lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc);
  1023. if (rc != 0) {
  1024. ksocknal_new_packet(conn, 0);
  1025. ksocknal_close_conn_and_siblings (conn, rc);
  1026. return -EPROTO;
  1027. }
  1028. /* Fall through */
  1029. case SOCKNAL_RX_SLOP:
  1030. /* starting new packet? */
  1031. if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
  1032. return 0; /* come back later */
  1033. goto again; /* try to finish reading slop now */
  1034. default:
  1035. break;
  1036. }
  1037. /* Not Reached */
  1038. LBUG();
  1039. return -EINVAL; /* keep gcc happy */
  1040. }
  1041. int
  1042. ksocknal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
  1043. unsigned int niov, struct kvec *iov, lnet_kiov_t *kiov,
  1044. unsigned int offset, unsigned int mlen, unsigned int rlen)
  1045. {
  1046. ksock_conn_t *conn = private;
  1047. ksock_sched_t *sched = conn->ksnc_scheduler;
  1048. LASSERT(mlen <= rlen);
  1049. LASSERT(niov <= LNET_MAX_IOV);
  1050. conn->ksnc_cookie = msg;
  1051. conn->ksnc_rx_nob_wanted = mlen;
  1052. conn->ksnc_rx_nob_left = rlen;
  1053. if (mlen == 0 || iov != NULL) {
  1054. conn->ksnc_rx_nkiov = 0;
  1055. conn->ksnc_rx_kiov = NULL;
  1056. conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
  1057. conn->ksnc_rx_niov =
  1058. lnet_extract_iov(LNET_MAX_IOV, conn->ksnc_rx_iov,
  1059. niov, iov, offset, mlen);
  1060. } else {
  1061. conn->ksnc_rx_niov = 0;
  1062. conn->ksnc_rx_iov = NULL;
  1063. conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
  1064. conn->ksnc_rx_nkiov =
  1065. lnet_extract_kiov(LNET_MAX_IOV, conn->ksnc_rx_kiov,
  1066. niov, kiov, offset, mlen);
  1067. }
  1068. LASSERT(mlen ==
  1069. lnet_iov_nob(conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
  1070. lnet_kiov_nob(conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
  1071. LASSERT(conn->ksnc_rx_scheduled);
  1072. spin_lock_bh(&sched->kss_lock);
  1073. switch (conn->ksnc_rx_state) {
  1074. case SOCKNAL_RX_PARSE_WAIT:
  1075. list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns);
  1076. wake_up (&sched->kss_waitq);
  1077. LASSERT (conn->ksnc_rx_ready);
  1078. break;
  1079. case SOCKNAL_RX_PARSE:
  1080. /* scheduler hasn't noticed I'm parsing yet */
  1081. break;
  1082. }
  1083. conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD;
  1084. spin_unlock_bh(&sched->kss_lock);
  1085. ksocknal_conn_decref(conn);
  1086. return 0;
  1087. }
  1088. static inline int
  1089. ksocknal_sched_cansleep(ksock_sched_t *sched)
  1090. {
  1091. int rc;
  1092. spin_lock_bh(&sched->kss_lock);
  1093. rc = !ksocknal_data.ksnd_shuttingdown &&
  1094. list_empty(&sched->kss_rx_conns) &&
  1095. list_empty(&sched->kss_tx_conns);
  1096. spin_unlock_bh(&sched->kss_lock);
  1097. return rc;
  1098. }
  1099. int ksocknal_scheduler(void *arg)
  1100. {
  1101. struct ksock_sched_info *info;
  1102. ksock_sched_t *sched;
  1103. ksock_conn_t *conn;
  1104. ksock_tx_t *tx;
  1105. int rc;
  1106. int nloops = 0;
  1107. long id = (long)arg;
  1108. info = ksocknal_data.ksnd_sched_info[KSOCK_THREAD_CPT(id)];
  1109. sched = &info->ksi_scheds[KSOCK_THREAD_SID(id)];
  1110. cfs_block_allsigs();
  1111. rc = cfs_cpt_bind(lnet_cpt_table(), info->ksi_cpt);
  1112. if (rc != 0) {
  1113. CERROR("Can't set CPT affinity to %d: %d\n",
  1114. info->ksi_cpt, rc);
  1115. }
  1116. spin_lock_bh(&sched->kss_lock);
  1117. while (!ksocknal_data.ksnd_shuttingdown) {
  1118. int did_something = 0;
  1119. /* Ensure I progress everything semi-fairly */
  1120. if (!list_empty (&sched->kss_rx_conns)) {
  1121. conn = list_entry(sched->kss_rx_conns.next,
  1122. ksock_conn_t, ksnc_rx_list);
  1123. list_del(&conn->ksnc_rx_list);
  1124. LASSERT(conn->ksnc_rx_scheduled);
  1125. LASSERT(conn->ksnc_rx_ready);
  1126. /* clear rx_ready in case receive isn't complete.
  1127. * Do it BEFORE we call process_recv, since
  1128. * data_ready can set it any time after we release
  1129. * kss_lock. */
  1130. conn->ksnc_rx_ready = 0;
  1131. spin_unlock_bh(&sched->kss_lock);
  1132. rc = ksocknal_process_receive(conn);
  1133. spin_lock_bh(&sched->kss_lock);
  1134. /* I'm the only one that can clear this flag */
  1135. LASSERT(conn->ksnc_rx_scheduled);
  1136. /* Did process_receive get everything it wanted? */
  1137. if (rc == 0)
  1138. conn->ksnc_rx_ready = 1;
  1139. if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) {
  1140. /* Conn blocked waiting for ksocknal_recv()
  1141. * I change its state (under lock) to signal
  1142. * it can be rescheduled */
  1143. conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT;
  1144. } else if (conn->ksnc_rx_ready) {
  1145. /* reschedule for rx */
  1146. list_add_tail (&conn->ksnc_rx_list,
  1147. &sched->kss_rx_conns);
  1148. } else {
  1149. conn->ksnc_rx_scheduled = 0;
  1150. /* drop my ref */
  1151. ksocknal_conn_decref(conn);
  1152. }
  1153. did_something = 1;
  1154. }
  1155. if (!list_empty (&sched->kss_tx_conns)) {
  1156. LIST_HEAD(zlist);
  1157. if (!list_empty(&sched->kss_zombie_noop_txs)) {
  1158. list_add(&zlist,
  1159. &sched->kss_zombie_noop_txs);
  1160. list_del_init(&sched->kss_zombie_noop_txs);
  1161. }
  1162. conn = list_entry(sched->kss_tx_conns.next,
  1163. ksock_conn_t, ksnc_tx_list);
  1164. list_del (&conn->ksnc_tx_list);
  1165. LASSERT(conn->ksnc_tx_scheduled);
  1166. LASSERT(conn->ksnc_tx_ready);
  1167. LASSERT(!list_empty(&conn->ksnc_tx_queue));
  1168. tx = list_entry(conn->ksnc_tx_queue.next,
  1169. ksock_tx_t, tx_list);
  1170. if (conn->ksnc_tx_carrier == tx)
  1171. ksocknal_next_tx_carrier(conn);
  1172. /* dequeue now so empty list => more to send */
  1173. list_del(&tx->tx_list);
  1174. /* Clear tx_ready in case send isn't complete. Do
  1175. * it BEFORE we call process_transmit, since
  1176. * write_space can set it any time after we release
  1177. * kss_lock. */
  1178. conn->ksnc_tx_ready = 0;
  1179. spin_unlock_bh(&sched->kss_lock);
  1180. if (!list_empty(&zlist)) {
  1181. /* free zombie noop txs, it's fast because
  1182. * noop txs are just put in freelist */
  1183. ksocknal_txlist_done(NULL, &zlist, 0);
  1184. }
  1185. rc = ksocknal_process_transmit(conn, tx);
  1186. if (rc == -ENOMEM || rc == -EAGAIN) {
  1187. /* Incomplete send: replace tx on HEAD of tx_queue */
  1188. spin_lock_bh(&sched->kss_lock);
  1189. list_add(&tx->tx_list,
  1190. &conn->ksnc_tx_queue);
  1191. } else {
  1192. /* Complete send; tx -ref */
  1193. ksocknal_tx_decref(tx);
  1194. spin_lock_bh(&sched->kss_lock);
  1195. /* assume space for more */
  1196. conn->ksnc_tx_ready = 1;
  1197. }
  1198. if (rc == -ENOMEM) {
  1199. /* Do nothing; after a short timeout, this
  1200. * conn will be reposted on kss_tx_conns. */
  1201. } else if (conn->ksnc_tx_ready &&
  1202. !list_empty(&conn->ksnc_tx_queue)) {
  1203. /* reschedule for tx */
  1204. list_add_tail(&conn->ksnc_tx_list,
  1205. &sched->kss_tx_conns);
  1206. } else {
  1207. conn->ksnc_tx_scheduled = 0;
  1208. /* drop my ref */
  1209. ksocknal_conn_decref(conn);
  1210. }
  1211. did_something = 1;
  1212. }
  1213. if (!did_something || /* nothing to do */
  1214. ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
  1215. spin_unlock_bh(&sched->kss_lock);
  1216. nloops = 0;
  1217. if (!did_something) { /* wait for something to do */
  1218. rc = wait_event_interruptible_exclusive(
  1219. sched->kss_waitq,
  1220. !ksocknal_sched_cansleep(sched));
  1221. LASSERT (rc == 0);
  1222. } else {
  1223. cond_resched();
  1224. }
  1225. spin_lock_bh(&sched->kss_lock);
  1226. }
  1227. }
  1228. spin_unlock_bh(&sched->kss_lock);
  1229. ksocknal_thread_fini();
  1230. return 0;
  1231. }
  1232. /*
  1233. * Add connection to kss_rx_conns of scheduler
  1234. * and wake up the scheduler.
  1235. */
  1236. void ksocknal_read_callback (ksock_conn_t *conn)
  1237. {
  1238. ksock_sched_t *sched;
  1239. sched = conn->ksnc_scheduler;
  1240. spin_lock_bh(&sched->kss_lock);
  1241. conn->ksnc_rx_ready = 1;
  1242. if (!conn->ksnc_rx_scheduled) { /* not being progressed */
  1243. list_add_tail(&conn->ksnc_rx_list,
  1244. &sched->kss_rx_conns);
  1245. conn->ksnc_rx_scheduled = 1;
  1246. /* extra ref for scheduler */
  1247. ksocknal_conn_addref(conn);
  1248. wake_up (&sched->kss_waitq);
  1249. }
  1250. spin_unlock_bh(&sched->kss_lock);
  1251. }
  1252. /*
  1253. * Add connection to kss_tx_conns of scheduler
  1254. * and wake up the scheduler.
  1255. */
  1256. void ksocknal_write_callback (ksock_conn_t *conn)
  1257. {
  1258. ksock_sched_t *sched;
  1259. sched = conn->ksnc_scheduler;
  1260. spin_lock_bh(&sched->kss_lock);
  1261. conn->ksnc_tx_ready = 1;
  1262. if (!conn->ksnc_tx_scheduled && /* not being progressed */
  1263. !list_empty(&conn->ksnc_tx_queue)) { /* packets to send */
  1264. list_add_tail (&conn->ksnc_tx_list,
  1265. &sched->kss_tx_conns);
  1266. conn->ksnc_tx_scheduled = 1;
  1267. /* extra ref for scheduler */
  1268. ksocknal_conn_addref(conn);
  1269. wake_up (&sched->kss_waitq);
  1270. }
  1271. spin_unlock_bh(&sched->kss_lock);
  1272. }
static ksock_proto_t *
ksocknal_parse_proto_version(ksock_hello_msg_t *hello)
{
	__u32 version = 0;

	if (hello->kshm_magic == LNET_PROTO_MAGIC)
		version = hello->kshm_version;
	else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
		version = __swab32(hello->kshm_version);

	if (version != 0) {
#if SOCKNAL_VERSION_DEBUG
		if (*ksocknal_tunables.ksnd_protocol == 1)
			return NULL;

		if (*ksocknal_tunables.ksnd_protocol == 2 &&
		    version == KSOCK_PROTO_V3)
			return NULL;
#endif
		if (version == KSOCK_PROTO_V2)
			return &ksocknal_protocol_v2x;

		if (version == KSOCK_PROTO_V3)
			return &ksocknal_protocol_v3x;

		return NULL;
	}

	if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
		lnet_magicversion_t *hmv = (lnet_magicversion_t *)hello;

		CLASSERT(sizeof(lnet_magicversion_t) ==
			 offsetof(ksock_hello_msg_t, kshm_src_nid));

		if (hmv->version_major == cpu_to_le16(KSOCK_PROTO_V1_MAJOR) &&
		    hmv->version_minor == cpu_to_le16(KSOCK_PROTO_V1_MINOR))
			return &ksocknal_protocol_v1x;
	}

	return NULL;
}
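/*
 * Fill in the source NID/PID/incarnation and connection type of @hello,
 * then hand it to the connection's protocol handler to put on the wire.
 */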
int
ksocknal_send_hello(lnet_ni_t *ni, ksock_conn_t *conn,
		    lnet_nid_t peer_nid, ksock_hello_msg_t *hello)
{
	/* CAVEAT EMPTOR: this byte flips 'ipaddrs' */
	ksock_net_t *net = (ksock_net_t *)ni->ni_data;

	LASSERT(hello->kshm_nips <= LNET_MAX_INTERFACES);

	/* rely on the caller to hold a ref on the socket so it won't
	 * disappear */
	LASSERT(conn->ksnc_proto != NULL);

	hello->kshm_src_nid = ni->ni_nid;
	hello->kshm_dst_nid = peer_nid;
	hello->kshm_src_pid = the_lnet.ln_pid;
	hello->kshm_src_incarnation = net->ksnn_incarnation;
	hello->kshm_ctype = conn->ksnc_type;

	return conn->ksnc_proto->pro_send_hello(conn, hello);
}
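/*
 * Return the connection type the peer is expected to use for our @type:
 * ANY and CONTROL map to themselves, BULK_IN and BULK_OUT swap, and
 * anything else is invalid (SOCKLND_CONN_NONE).
 */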
static int
ksocknal_invert_type(int type)
{
	switch (type) {
	case SOCKLND_CONN_ANY:
	case SOCKLND_CONN_CONTROL:
		return type;
	case SOCKLND_CONN_BULK_IN:
		return SOCKLND_CONN_BULK_OUT;
	case SOCKLND_CONN_BULK_OUT:
		return SOCKLND_CONN_BULK_IN;
	default:
		return SOCKLND_CONN_NONE;
	}
}
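/*
 * Read and validate the peer's HELLO.  On a passive connection (no
 * protocol negotiated yet) this also deduces the peer's process id and
 * the connection type from the message; on an active connection it
 * cross-checks them against what we expected.
 */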
int
ksocknal_recv_hello(lnet_ni_t *ni, ksock_conn_t *conn,
		    ksock_hello_msg_t *hello, lnet_process_id_t *peerid,
		    __u64 *incarnation)
{
	/* Return < 0	     fatal error
	 *	  0	     success
	 *	  EALREADY   lost connection race
	 *	  EPROTO     protocol version mismatch
	 */
	struct socket *sock = conn->ksnc_sock;
	int active = (conn->ksnc_proto != NULL);
	int timeout;
	int proto_match;
	int rc;
	ksock_proto_t *proto;
	lnet_process_id_t recv_id;

	/* socket type set on active connections - not set on passive */
	LASSERT(!active == !(conn->ksnc_type != SOCKLND_CONN_NONE));

	timeout = active ? *ksocknal_tunables.ksnd_timeout :
			   lnet_acceptor_timeout();

	rc = lnet_sock_read(sock, &hello->kshm_magic,
			    sizeof(hello->kshm_magic), timeout);
	if (rc != 0) {
		CERROR("Error %d reading HELLO from %pI4h\n",
		       rc, &conn->ksnc_ipaddr);
		LASSERT(rc < 0);
		return rc;
	}

	if (hello->kshm_magic != LNET_PROTO_MAGIC &&
	    hello->kshm_magic != __swab32(LNET_PROTO_MAGIC) &&
	    hello->kshm_magic != le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
		/* Unexpected magic! */
		CERROR("Bad magic(1) %#08x (%#08x expected) from %pI4h\n",
		       __cpu_to_le32(hello->kshm_magic),
		       LNET_PROTO_TCP_MAGIC,
		       &conn->ksnc_ipaddr);
		return -EPROTO;
	}

	rc = lnet_sock_read(sock, &hello->kshm_version,
			    sizeof(hello->kshm_version), timeout);
	if (rc != 0) {
		CERROR("Error %d reading HELLO from %pI4h\n",
		       rc, &conn->ksnc_ipaddr);
		LASSERT(rc < 0);
		return rc;
	}

	proto = ksocknal_parse_proto_version(hello);
	if (proto == NULL) {
		if (!active) {
			/* unknown protocol from peer, tell peer my protocol */
			conn->ksnc_proto = &ksocknal_protocol_v3x;
#if SOCKNAL_VERSION_DEBUG
			if (*ksocknal_tunables.ksnd_protocol == 2)
				conn->ksnc_proto = &ksocknal_protocol_v2x;
			else if (*ksocknal_tunables.ksnd_protocol == 1)
				conn->ksnc_proto = &ksocknal_protocol_v1x;
#endif
			hello->kshm_nips = 0;
			ksocknal_send_hello(ni, conn, ni->ni_nid, hello);
		}

		CERROR("Unknown protocol version (%d.x expected) from %pI4h\n",
		       conn->ksnc_proto->pro_version,
		       &conn->ksnc_ipaddr);

		return -EPROTO;
	}

	proto_match = (conn->ksnc_proto == proto);
	conn->ksnc_proto = proto;

	/* receive the rest of hello message anyway */
	rc = conn->ksnc_proto->pro_recv_hello(conn, hello, timeout);
	if (rc != 0) {
		CERROR("Error %d reading or checking hello from %pI4h\n",
		       rc, &conn->ksnc_ipaddr);
		LASSERT(rc < 0);
		return rc;
	}

	*incarnation = hello->kshm_src_incarnation;

	if (hello->kshm_src_nid == LNET_NID_ANY) {
		CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY from %pI4h\n",
		       &conn->ksnc_ipaddr);
		return -EPROTO;
	}

	if (!active &&
	    conn->ksnc_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
		/* Userspace NAL assigns peer process ID from socket */
		recv_id.pid = conn->ksnc_port | LNET_PID_USERFLAG;
		recv_id.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
					 conn->ksnc_ipaddr);
	} else {
		recv_id.nid = hello->kshm_src_nid;
		recv_id.pid = hello->kshm_src_pid;
	}

	if (!active) {
		*peerid = recv_id;

		/* peer determines type */
		conn->ksnc_type = ksocknal_invert_type(hello->kshm_ctype);
		if (conn->ksnc_type == SOCKLND_CONN_NONE) {
			CERROR("Unexpected type %d from %s ip %pI4h\n",
			       hello->kshm_ctype, libcfs_id2str(*peerid),
			       &conn->ksnc_ipaddr);
			return -EPROTO;
		}

		return 0;
	}

	if (peerid->pid != recv_id.pid ||
	    peerid->nid != recv_id.nid) {
		LCONSOLE_ERROR_MSG(0x130, "Connected successfully to %s on host %pI4h, but they claimed they were %s; please check your Lustre configuration.\n",
				   libcfs_id2str(*peerid),
				   &conn->ksnc_ipaddr,
				   libcfs_id2str(recv_id));
		return -EPROTO;
	}

	if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
		/* Possible protocol mismatch or I lost the connection race */
		return proto_match ? EALREADY : EPROTO;
	}

	if (ksocknal_invert_type(hello->kshm_ctype) != conn->ksnc_type) {
		CERROR("Mismatched types: me %d, %s ip %pI4h %d\n",
		       conn->ksnc_type, libcfs_id2str(*peerid),
		       &conn->ksnc_ipaddr,
		       hello->kshm_ctype);
		return -EPROTO;
	}

	return 0;
}
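/*
 * Try to establish every connection type still wanted on @route.  Returns
 * non-zero if the route should be retried later (e.g. the peer is
 * connecting to us, or we lost a connection race); on hard failure the
 * route's retry interval is backed off, the peer is marked failed, and any
 * blocked packets still queued on the peer may be completed with an error.
 */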
static int
ksocknal_connect(ksock_route_t *route)
{
	LIST_HEAD(zombies);
	ksock_peer_t *peer = route->ksnr_peer;
	int type;
	int wanted;
	struct socket *sock;
	unsigned long deadline;
	int retry_later = 0;
	int rc = 0;

	deadline = cfs_time_add(cfs_time_current(),
				cfs_time_seconds(*ksocknal_tunables.ksnd_timeout));

	write_lock_bh(&ksocknal_data.ksnd_global_lock);

	LASSERT(route->ksnr_scheduled);
	LASSERT(!route->ksnr_connecting);

	route->ksnr_connecting = 1;

	for (;;) {
		wanted = ksocknal_route_mask() & ~route->ksnr_connected;

		/* stop connecting if peer/route got closed under me, or
		 * route got connected while queued */
		if (peer->ksnp_closing || route->ksnr_deleted ||
		    wanted == 0) {
			retry_later = 0;
			break;
		}

		/* reschedule if peer is connecting to me */
		if (peer->ksnp_accepting > 0) {
			CDEBUG(D_NET,
			       "peer %s(%d) already connecting to me, retry later.\n",
			       libcfs_nid2str(peer->ksnp_id.nid),
			       peer->ksnp_accepting);
			retry_later = 1;
		}

		if (retry_later) /* needs reschedule */
			break;

		if ((wanted & (1 << SOCKLND_CONN_ANY)) != 0) {
			type = SOCKLND_CONN_ANY;
		} else if ((wanted & (1 << SOCKLND_CONN_CONTROL)) != 0) {
			type = SOCKLND_CONN_CONTROL;
		} else if ((wanted & (1 << SOCKLND_CONN_BULK_IN)) != 0) {
			type = SOCKLND_CONN_BULK_IN;
		} else {
			LASSERT((wanted & (1 << SOCKLND_CONN_BULK_OUT)) != 0);
			type = SOCKLND_CONN_BULK_OUT;
		}

		write_unlock_bh(&ksocknal_data.ksnd_global_lock);

		if (cfs_time_aftereq(cfs_time_current(), deadline)) {
			rc = -ETIMEDOUT;
			lnet_connect_console_error(rc, peer->ksnp_id.nid,
						   route->ksnr_ipaddr,
						   route->ksnr_port);
			goto failed;
		}

		rc = lnet_connect(&sock, peer->ksnp_id.nid,
				  route->ksnr_myipaddr,
				  route->ksnr_ipaddr, route->ksnr_port);
		if (rc != 0)
			goto failed;

		rc = ksocknal_create_conn(peer->ksnp_ni, route, sock, type);
		if (rc < 0) {
			lnet_connect_console_error(rc, peer->ksnp_id.nid,
						   route->ksnr_ipaddr,
						   route->ksnr_port);
			goto failed;
		}

		/* A +ve RC means I have to retry because I lost the connection
		 * race or I have to renegotiate protocol version */
		retry_later = (rc != 0);
		if (retry_later)
			CDEBUG(D_NET, "peer %s: conn race, retry later.\n",
			       libcfs_nid2str(peer->ksnp_id.nid));

		write_lock_bh(&ksocknal_data.ksnd_global_lock);
	}

	route->ksnr_scheduled = 0;
	route->ksnr_connecting = 0;

	if (retry_later) {
		/* re-queue for attention; this frees me up to handle
		 * the peer's incoming connection request */
		if (rc == EALREADY ||
		    (rc == 0 && peer->ksnp_accepting > 0)) {
			/* We want to introduce a delay before the next
			 * connection attempt if we lost the connection race,
			 * but the race is usually resolved quickly, so
			 * min_reconnectms should be a good heuristic */
			route->ksnr_retry_interval =
				cfs_time_seconds(*ksocknal_tunables.ksnd_min_reconnectms) / 1000;
			route->ksnr_timeout = cfs_time_add(cfs_time_current(),
							   route->ksnr_retry_interval);
		}

		ksocknal_launch_connection_locked(route);
	}

	write_unlock_bh(&ksocknal_data.ksnd_global_lock);
	return retry_later;

 failed:
	write_lock_bh(&ksocknal_data.ksnd_global_lock);

	route->ksnr_scheduled = 0;
	route->ksnr_connecting = 0;

	/* This is a retry rather than a new connection */
	route->ksnr_retry_interval *= 2;
	route->ksnr_retry_interval =
		max(route->ksnr_retry_interval,
		    cfs_time_seconds(*ksocknal_tunables.ksnd_min_reconnectms) / 1000);
	route->ksnr_retry_interval =
		min(route->ksnr_retry_interval,
		    cfs_time_seconds(*ksocknal_tunables.ksnd_max_reconnectms) / 1000);

	LASSERT(route->ksnr_retry_interval != 0);
	route->ksnr_timeout = cfs_time_add(cfs_time_current(),
					   route->ksnr_retry_interval);

	if (!list_empty(&peer->ksnp_tx_queue) &&
	    peer->ksnp_accepting == 0 &&
	    ksocknal_find_connecting_route_locked(peer) == NULL) {
		ksock_conn_t *conn;

		/* ksnp_tx_queue is queued on a conn on successful
		 * connection for V1.x and V2.x */
		if (!list_empty(&peer->ksnp_conns)) {
			conn = list_entry(peer->ksnp_conns.next,
					  ksock_conn_t, ksnc_list);
			LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
		}

		/* take all the blocked packets while I've got the lock and
		 * complete below... */
		list_splice_init(&peer->ksnp_tx_queue, &zombies);
	}

#if 0		/* irrelevant with only eager routes */
	if (!route->ksnr_deleted) {
		/* make this route least-favourite for re-selection */
		list_del(&route->ksnr_list);
		list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
	}
#endif
	write_unlock_bh(&ksocknal_data.ksnd_global_lock);

	ksocknal_peer_failed(peer);
	ksocknal_txlist_done(peer->ksnp_ni, &zombies, 1);
	return 0;
}
/*
 * Check whether we need to create more connds.
 * It will try to create a new thread if necessary; @timeout can be
 * updated if thread creation failed, so the caller won't keep retrying
 * while running out of resources.
 */
static int
ksocknal_connd_check_start(time64_t sec, long *timeout)
{
	char name[16];
	int rc;
	int total = ksocknal_data.ksnd_connd_starting +
		    ksocknal_data.ksnd_connd_running;

	if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
		/* still in initializing */
		return 0;
	}

	if (total >= *ksocknal_tunables.ksnd_nconnds_max ||
	    total > ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV) {
		/* can't create more connds, or still have enough
		 * threads to handle more connecting */
		return 0;
	}

	if (list_empty(&ksocknal_data.ksnd_connd_routes)) {
		/* no pending connecting request */
		return 0;
	}

	if (sec - ksocknal_data.ksnd_connd_failed_stamp <= 1) {
		/* may run out of resource, retry later */
		*timeout = cfs_time_seconds(1);
		return 0;
	}

	if (ksocknal_data.ksnd_connd_starting > 0) {
		/* serialize starting to avoid flood */
		return 0;
	}

	ksocknal_data.ksnd_connd_starting_stamp = sec;
	ksocknal_data.ksnd_connd_starting++;
	spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);

	/* NB: total is the next id */
	snprintf(name, sizeof(name), "socknal_cd%02d", total);
	rc = ksocknal_thread_start(ksocknal_connd, NULL, name);

	spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
	if (rc == 0)
		return 1;

	/* we tried ... */
	LASSERT(ksocknal_data.ksnd_connd_starting > 0);
	ksocknal_data.ksnd_connd_starting--;
	ksocknal_data.ksnd_connd_failed_stamp = ktime_get_real_seconds();

	return 1;
}
/*
 * Check whether the current thread can exit; it returns 1 if there are too
 * many threads and no thread has been created in the past 120 seconds.
 * This function may also update @timeout so the caller comes back later
 * to recheck these conditions.
 */
static int
ksocknal_connd_check_stop(time64_t sec, long *timeout)
{
	int val;

	if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
		/* still in initializing */
		return 0;
	}

	if (ksocknal_data.ksnd_connd_starting > 0) {
		/* in progress of starting new thread */
		return 0;
	}

	if (ksocknal_data.ksnd_connd_running <=
	    *ksocknal_tunables.ksnd_nconnds) { /* can't shrink */
		return 0;
	}

	/* created thread in past 120 seconds? */
	val = (int)(ksocknal_data.ksnd_connd_starting_stamp +
		    SOCKNAL_CONND_TIMEOUT - sec);

	*timeout = (val > 0) ? cfs_time_seconds(val) :
			       cfs_time_seconds(SOCKNAL_CONND_TIMEOUT);
	if (val > 0)
		return 0;

	/* no thread created in the past 120 seconds */
	return ksocknal_data.ksnd_connd_running >
	       ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV;
}
/* Go through connd_routes queue looking for a route that we can process
 * right now, @timeout_p can be updated if we need to come back later */
static ksock_route_t *
ksocknal_connd_get_route_locked(signed long *timeout_p)
{
	ksock_route_t *route;
	unsigned long now;

	now = cfs_time_current();

	/* connd_routes can contain both pending and ordinary routes */
	list_for_each_entry(route, &ksocknal_data.ksnd_connd_routes,
			    ksnr_connd_list) {
		if (route->ksnr_retry_interval == 0 ||
		    cfs_time_aftereq(now, route->ksnr_timeout))
			return route;

		if (*timeout_p == MAX_SCHEDULE_TIMEOUT ||
		    (int)*timeout_p > (int)(route->ksnr_timeout - now))
			*timeout_p = (int)(route->ksnr_timeout - now);
	}

	return NULL;
}
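/*
 * Connection daemon: accepts connection requests handed over by the
 * listener, initiates outgoing connections for queued routes, and starts
 * or stops connd threads as the load requires.
 */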
int
ksocknal_connd(void *arg)
{
	spinlock_t *connd_lock = &ksocknal_data.ksnd_connd_lock;
	ksock_connreq_t *cr;
	wait_queue_t wait;
	int nloops = 0;
	int cons_retry = 0;

	cfs_block_allsigs();

	init_waitqueue_entry(&wait, current);

	spin_lock_bh(connd_lock);

	LASSERT(ksocknal_data.ksnd_connd_starting > 0);
	ksocknal_data.ksnd_connd_starting--;
	ksocknal_data.ksnd_connd_running++;

	while (!ksocknal_data.ksnd_shuttingdown) {
		ksock_route_t *route = NULL;
		time64_t sec = ktime_get_real_seconds();
		long timeout = MAX_SCHEDULE_TIMEOUT;
		int dropped_lock = 0;

		if (ksocknal_connd_check_stop(sec, &timeout)) {
			/* wakeup another one to check stop */
			wake_up(&ksocknal_data.ksnd_connd_waitq);
			break;
		}

		if (ksocknal_connd_check_start(sec, &timeout)) {
			/* created new thread */
			dropped_lock = 1;
		}

		if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) {
			/* Connection accepted by the listener */
			cr = list_entry(ksocknal_data.ksnd_connd_connreqs.next,
					ksock_connreq_t, ksncr_list);

			list_del(&cr->ksncr_list);
			spin_unlock_bh(connd_lock);
			dropped_lock = 1;

			ksocknal_create_conn(cr->ksncr_ni, NULL,
					     cr->ksncr_sock, SOCKLND_CONN_NONE);
			lnet_ni_decref(cr->ksncr_ni);
			LIBCFS_FREE(cr, sizeof(*cr));

			spin_lock_bh(connd_lock);
		}

		/* Only handle an outgoing connection request if there
		 * is a thread left to handle incoming connections and
		 * create new connd */
		if (ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV <
		    ksocknal_data.ksnd_connd_running) {
			route = ksocknal_connd_get_route_locked(&timeout);
		}
		if (route != NULL) {
			list_del(&route->ksnr_connd_list);
			ksocknal_data.ksnd_connd_connecting++;
			spin_unlock_bh(connd_lock);
			dropped_lock = 1;

			if (ksocknal_connect(route)) {
				/* consecutive retry */
				if (cons_retry++ > SOCKNAL_INSANITY_RECONN) {
					CWARN("massive consecutive re-connecting to %pI4h\n",
					      &route->ksnr_ipaddr);
					cons_retry = 0;
				}
			} else {
				cons_retry = 0;
			}

			ksocknal_route_decref(route);

			spin_lock_bh(connd_lock);
			ksocknal_data.ksnd_connd_connecting--;
		}

		if (dropped_lock) {
			if (++nloops < SOCKNAL_RESCHED)
				continue;
			spin_unlock_bh(connd_lock);
			nloops = 0;
			cond_resched();
			spin_lock_bh(connd_lock);
			continue;
		}

		/* Nothing to do for 'timeout' */
		set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue_exclusive(&ksocknal_data.ksnd_connd_waitq, &wait);
		spin_unlock_bh(connd_lock);

		nloops = 0;
		schedule_timeout(timeout);

		remove_wait_queue(&ksocknal_data.ksnd_connd_waitq, &wait);
		spin_lock_bh(connd_lock);
	}
	ksocknal_data.ksnd_connd_running--;
	spin_unlock_bh(connd_lock);

	ksocknal_thread_fini();
	return 0;
}
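/*
 * Return the first connection of @peer that has hit a socket error, an
 * incomplete-receive timeout, or a send timeout, with a reference taken
 * on it; return NULL if all its connections look healthy.
 */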
static ksock_conn_t *
ksocknal_find_timed_out_conn(ksock_peer_t *peer)
{
	/* We're called with a shared lock on ksnd_global_lock */
	ksock_conn_t *conn;
	struct list_head *ctmp;

	list_for_each(ctmp, &peer->ksnp_conns) {
		int error;

		conn = list_entry(ctmp, ksock_conn_t, ksnc_list);

		/* Don't need the {get,put}connsock dance to deref ksnc_sock */
		LASSERT(!conn->ksnc_closing);

		/* SOCK_ERROR will reset error code of socket in
		 * some platform (like Darwin8.x) */
		error = conn->ksnc_sock->sk->sk_err;
		if (error != 0) {
			ksocknal_conn_addref(conn);

			switch (error) {
			case ECONNRESET:
				CNETERR("A connection with %s (%pI4h:%d) was reset; it may have rebooted.\n",
					libcfs_id2str(peer->ksnp_id),
					&conn->ksnc_ipaddr,
					conn->ksnc_port);
				break;
			case ETIMEDOUT:
				CNETERR("A connection with %s (%pI4h:%d) timed out; the network or node may be down.\n",
					libcfs_id2str(peer->ksnp_id),
					&conn->ksnc_ipaddr,
					conn->ksnc_port);
				break;
			default:
				CNETERR("An unexpected network error %d occurred with %s (%pI4h:%d)\n",
					error,
					libcfs_id2str(peer->ksnp_id),
					&conn->ksnc_ipaddr,
					conn->ksnc_port);
				break;
			}

			return conn;
		}

		if (conn->ksnc_rx_started &&
		    cfs_time_aftereq(cfs_time_current(),
				     conn->ksnc_rx_deadline)) {
			/* Timed out incomplete incoming message */
			ksocknal_conn_addref(conn);
			CNETERR("Timeout receiving from %s (%pI4h:%d), state %d wanted %d left %d\n",
				libcfs_id2str(peer->ksnp_id),
				&conn->ksnc_ipaddr,
				conn->ksnc_port,
				conn->ksnc_rx_state,
				conn->ksnc_rx_nob_wanted,
				conn->ksnc_rx_nob_left);
			return conn;
		}

		if ((!list_empty(&conn->ksnc_tx_queue) ||
		     conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
		    cfs_time_aftereq(cfs_time_current(),
				     conn->ksnc_tx_deadline)) {
			/* Timed out messages queued for sending or
			 * buffered in the socket's send buffer */
			ksocknal_conn_addref(conn);
			CNETERR("Timeout sending data to %s (%pI4h:%d); the network or that node may be down.\n",
				libcfs_id2str(peer->ksnp_id),
				&conn->ksnc_ipaddr,
				conn->ksnc_port);
			return conn;
		}
	}

	return NULL;
}
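/*
 * Move every tx on the peer's queue whose deadline has passed onto a
 * private list and complete them with an error.
 */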
static inline void
ksocknal_flush_stale_txs(ksock_peer_t *peer)
{
	ksock_tx_t *tx;
	LIST_HEAD(stale_txs);

	write_lock_bh(&ksocknal_data.ksnd_global_lock);

	while (!list_empty(&peer->ksnp_tx_queue)) {
		tx = list_entry(peer->ksnp_tx_queue.next,
				ksock_tx_t, tx_list);

		if (!cfs_time_aftereq(cfs_time_current(),
				      tx->tx_deadline))
			break;

		list_del(&tx->tx_list);
		list_add_tail(&tx->tx_list, &stale_txs);
	}

	write_unlock_bh(&ksocknal_data.ksnd_global_lock);

	ksocknal_txlist_done(peer->ksnp_ni, &stale_txs, 1);
}
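/*
 * Queue a noop keepalive PING for @peer if it speaks V3 and has been
 * quiet for longer than the keepalive interval.  Called with
 * ksnd_global_lock held for read; any non-zero return means the lock was
 * dropped and re-taken, so the caller must restart its scan.
 */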
static int
ksocknal_send_keepalive_locked(ksock_peer_t *peer)
{
	ksock_sched_t *sched;
	ksock_conn_t *conn;
	ksock_tx_t *tx;

	if (list_empty(&peer->ksnp_conns)) /* last_alive will be updated by create_conn */
		return 0;

	if (peer->ksnp_proto != &ksocknal_protocol_v3x)
		return 0;

	if (*ksocknal_tunables.ksnd_keepalive <= 0 ||
	    time_before(cfs_time_current(),
			cfs_time_add(peer->ksnp_last_alive,
				     cfs_time_seconds(*ksocknal_tunables.ksnd_keepalive))))
		return 0;

	if (time_before(cfs_time_current(), peer->ksnp_send_keepalive))
		return 0;

	/* retry 10 secs later, so we don't put pressure on this peer if
	 * we failed to send keepalive this time */
	peer->ksnp_send_keepalive = cfs_time_shift(10);

	conn = ksocknal_find_conn_locked(peer, NULL, 1);
	if (conn != NULL) {
		sched = conn->ksnc_scheduler;

		spin_lock_bh(&sched->kss_lock);
		if (!list_empty(&conn->ksnc_tx_queue)) {
			spin_unlock_bh(&sched->kss_lock);
			/* there is a queued ACK, don't need keepalive */
			return 0;
		}

		spin_unlock_bh(&sched->kss_lock);
	}

	read_unlock(&ksocknal_data.ksnd_global_lock);

	/* cookie = 1 is reserved for keepalive PING */
	tx = ksocknal_alloc_tx_noop(1, 1);
	if (tx == NULL) {
		read_lock(&ksocknal_data.ksnd_global_lock);
		return -ENOMEM;
	}

	if (ksocknal_launch_packet(peer->ksnp_ni, tx, peer->ksnp_id) == 0) {
		read_lock(&ksocknal_data.ksnd_global_lock);
		return 1;
	}

	ksocknal_free_tx(tx);
	read_lock(&ksocknal_data.ksnd_global_lock);

	return -EIO;
}
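/*
 * Walk one bucket of the peer hash table: send keepalives where needed,
 * close connections that have timed out, flush stale txs still queued on
 * the peer, and report/close connections holding stale zero-copy requests.
 */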
static void
ksocknal_check_peer_timeouts(int idx)
{
	struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
	ksock_peer_t *peer;
	ksock_conn_t *conn;
	ksock_tx_t *tx;

 again:
	/* NB. We expect to have a look at all the peers and not find any
	 * connections to time out, so we just use a shared lock while we
	 * take a look... */
	read_lock(&ksocknal_data.ksnd_global_lock);

	list_for_each_entry(peer, peers, ksnp_list) {
		unsigned long deadline = 0;
		int resid = 0;
		int n = 0;

		if (ksocknal_send_keepalive_locked(peer) != 0) {
			read_unlock(&ksocknal_data.ksnd_global_lock);
			goto again;
		}

		conn = ksocknal_find_timed_out_conn(peer);

		if (conn != NULL) {
			read_unlock(&ksocknal_data.ksnd_global_lock);

			ksocknal_close_conn_and_siblings(conn, -ETIMEDOUT);

			/* NB we won't find this one again, but we can't
			 * just proceed with the next peer, since we dropped
			 * ksnd_global_lock and it might be dead already! */
			ksocknal_conn_decref(conn);
			goto again;
		}

		/* we can't process stale txs right here because we're
		 * holding only shared lock */
		if (!list_empty(&peer->ksnp_tx_queue)) {
			ksock_tx_t *tx =
				list_entry(peer->ksnp_tx_queue.next,
					   ksock_tx_t, tx_list);

			if (cfs_time_aftereq(cfs_time_current(),
					     tx->tx_deadline)) {
				ksocknal_peer_addref(peer);
				read_unlock(&ksocknal_data.ksnd_global_lock);

				ksocknal_flush_stale_txs(peer);

				ksocknal_peer_decref(peer);
				goto again;
			}
		}

		if (list_empty(&peer->ksnp_zc_req_list))
			continue;

		spin_lock(&peer->ksnp_lock);
		list_for_each_entry(tx, &peer->ksnp_zc_req_list, tx_zc_list) {
			if (!cfs_time_aftereq(cfs_time_current(),
					      tx->tx_deadline))
				break;
			/* ignore the TX if connection is being closed */
			if (tx->tx_conn->ksnc_closing)
				continue;
			n++;
		}

		if (n == 0) {
			spin_unlock(&peer->ksnp_lock);
			continue;
		}

		tx = list_entry(peer->ksnp_zc_req_list.next,
				ksock_tx_t, tx_zc_list);
		deadline = tx->tx_deadline;
		resid = tx->tx_resid;
		conn = tx->tx_conn;
		ksocknal_conn_addref(conn);

		spin_unlock(&peer->ksnp_lock);
		read_unlock(&ksocknal_data.ksnd_global_lock);

		CERROR("Total %d stale ZC_REQs for peer %s detected; the oldest(%p) timed out %ld secs ago, resid: %d, wmem: %d\n",
		       n, libcfs_nid2str(peer->ksnp_id.nid), tx,
		       cfs_duration_sec(cfs_time_current() - deadline),
		       resid, conn->ksnc_sock->sk->sk_wmem_queued);

		ksocknal_close_conn_and_siblings(conn, -ETIMEDOUT);
		ksocknal_conn_decref(conn);
		goto again;
	}

	read_unlock(&ksocknal_data.ksnd_global_lock);
}
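/*
 * Reaper thread: terminates connections on the deathrow list, destroys
 * zombie connections, requeues connections that stalled with ENOMEM, and
 * periodically sweeps part of the peer table for timeouts.
 */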
int
ksocknal_reaper(void *arg)
{
	wait_queue_t wait;
	ksock_conn_t *conn;
	ksock_sched_t *sched;
	struct list_head enomem_conns;
	int nenomem_conns;
	long timeout;
	int i;
	int peer_index = 0;
	unsigned long deadline = cfs_time_current();

	cfs_block_allsigs();

	INIT_LIST_HEAD(&enomem_conns);
	init_waitqueue_entry(&wait, current);

	spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

	while (!ksocknal_data.ksnd_shuttingdown) {
		if (!list_empty(&ksocknal_data.ksnd_deathrow_conns)) {
			conn = list_entry(ksocknal_data.ksnd_deathrow_conns.next,
					  ksock_conn_t, ksnc_list);
			list_del(&conn->ksnc_list);

			spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);

			ksocknal_terminate_conn(conn);
			ksocknal_conn_decref(conn);

			spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
			continue;
		}

		if (!list_empty(&ksocknal_data.ksnd_zombie_conns)) {
			conn = list_entry(ksocknal_data.ksnd_zombie_conns.next,
					  ksock_conn_t, ksnc_list);
			list_del(&conn->ksnc_list);

			spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);

			ksocknal_destroy_conn(conn);

			spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
			continue;
		}

		if (!list_empty(&ksocknal_data.ksnd_enomem_conns)) {
			list_add(&enomem_conns,
				 &ksocknal_data.ksnd_enomem_conns);
			list_del_init(&ksocknal_data.ksnd_enomem_conns);
		}

		spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);

		/* reschedule all the connections that stalled with ENOMEM... */
		nenomem_conns = 0;
		while (!list_empty(&enomem_conns)) {
			conn = list_entry(enomem_conns.next,
					  ksock_conn_t, ksnc_tx_list);
			list_del(&conn->ksnc_tx_list);

			sched = conn->ksnc_scheduler;

			spin_lock_bh(&sched->kss_lock);

			LASSERT(conn->ksnc_tx_scheduled);
			conn->ksnc_tx_ready = 1;
			list_add_tail(&conn->ksnc_tx_list,
				      &sched->kss_tx_conns);
			wake_up(&sched->kss_waitq);

			spin_unlock_bh(&sched->kss_lock);
			nenomem_conns++;
		}

		/* careful with the jiffy wrap... */
		while ((timeout = cfs_time_sub(deadline,
					       cfs_time_current())) <= 0) {
			const int n = 4;
			const int p = 1;
			int chunk = ksocknal_data.ksnd_peer_hash_size;

			/* Time to check for timeouts on a few more peers: I do
			 * checks every 'p' seconds on a proportion of the peer
			 * table and I need to check every connection 'n' times
			 * within a timeout interval, to ensure I detect a
			 * timeout on any connection within (n+1)/n times the
			 * timeout interval. */
			if (*ksocknal_tunables.ksnd_timeout > n * p)
				chunk = (chunk * n * p) /
					*ksocknal_tunables.ksnd_timeout;
			if (chunk == 0)
				chunk = 1;

			for (i = 0; i < chunk; i++) {
				ksocknal_check_peer_timeouts(peer_index);
				peer_index = (peer_index + 1) %
					     ksocknal_data.ksnd_peer_hash_size;
			}

			deadline = cfs_time_add(deadline, cfs_time_seconds(p));
		}

		if (nenomem_conns != 0) {
			/* Reduce my timeout if I rescheduled ENOMEM conns.
			 * This also prevents me getting woken immediately
			 * if any go back on my enomem list. */
			timeout = SOCKNAL_ENOMEM_RETRY;
		}
		ksocknal_data.ksnd_reaper_waketime =
			cfs_time_add(cfs_time_current(), timeout);

		set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait);

		if (!ksocknal_data.ksnd_shuttingdown &&
		    list_empty(&ksocknal_data.ksnd_deathrow_conns) &&
		    list_empty(&ksocknal_data.ksnd_zombie_conns))
			schedule_timeout(timeout);

		set_current_state(TASK_RUNNING);
		remove_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait);

		spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
	}

	spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);

	ksocknal_thread_fini();
	return 0;
}