// Package kcp - A Fast and Reliable ARQ Protocol
package kcp

import (
	"encoding/binary"
	"sync/atomic"
)

const (
	IKCP_RTO_NDL     = 30  // no delay min rto
	IKCP_RTO_MIN     = 100 // normal min rto
	IKCP_RTO_DEF     = 200 // default rto
	IKCP_RTO_MAX     = 60000
	IKCP_CMD_PUSH    = 81 // cmd: push data
	IKCP_CMD_ACK     = 82 // cmd: ack
	IKCP_CMD_WASK    = 83 // cmd: window probe (ask)
	IKCP_CMD_WINS    = 84 // cmd: window size (tell)
	IKCP_ASK_SEND    = 1  // need to send IKCP_CMD_WASK
	IKCP_ASK_TELL    = 2  // need to send IKCP_CMD_WINS
	IKCP_WND_SND     = 32 // default send window (segments)
	IKCP_WND_RCV     = 32 // default receive window (segments)
	IKCP_MTU_DEF     = 1400
	IKCP_ACK_FAST    = 3
	IKCP_INTERVAL    = 100 // default internal update interval (ms)
	IKCP_OVERHEAD    = 24  // size of the KCP header in bytes
	IKCP_DEADLINK    = 20  // max retransmissions before the link is considered dead
	IKCP_THRESH_INIT = 2
	IKCP_THRESH_MIN  = 2
	IKCP_PROBE_INIT  = 7000   // 7 secs to probe window size
	IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window
)
// output_callback is a prototype which ought to capture conn and call conn.Write
type output_callback func(buf []byte, size int)

/* encode 8 bits unsigned int */
func ikcp_encode8u(p []byte, c byte) []byte {
	p[0] = c
	return p[1:]
}

/* decode 8 bits unsigned int */
func ikcp_decode8u(p []byte, c *byte) []byte {
	*c = p[0]
	return p[1:]
}

/* encode 16 bits unsigned int (lsb) */
func ikcp_encode16u(p []byte, w uint16) []byte {
	binary.LittleEndian.PutUint16(p, w)
	return p[2:]
}

/* decode 16 bits unsigned int (lsb) */
func ikcp_decode16u(p []byte, w *uint16) []byte {
	*w = binary.LittleEndian.Uint16(p)
	return p[2:]
}

/* encode 32 bits unsigned int (lsb) */
func ikcp_encode32u(p []byte, l uint32) []byte {
	binary.LittleEndian.PutUint32(p, l)
	return p[4:]
}

/* decode 32 bits unsigned int (lsb) */
func ikcp_decode32u(p []byte, l *uint32) []byte {
	*l = binary.LittleEndian.Uint32(p)
	return p[4:]
}

func _imin_(a, b uint32) uint32 {
	if a <= b {
		return a
	}
	return b
}

func _imax_(a, b uint32) uint32 {
	if a >= b {
		return a
	}
	return b
}

func _ibound_(lower, middle, upper uint32) uint32 {
	return _imin_(_imax_(lower, middle), upper)
}

// _itimediff returns the signed difference later-earlier, so comparisons stay
// correct when 32-bit timestamps or sequence numbers wrap around.
func _itimediff(later, earlier uint32) int32 {
	return (int32)(later - earlier)
}
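// For example (illustrative values, not from the original source): because the
// subtraction is performed in uint32 and then reinterpreted as int32, the
// helper still yields a small delta across a wraparound:
//
//	_itimediff(2, 0xFFFFFFFF) == 3  // 2 - (2^32-1) wraps to +3
//	_itimediff(0xFFFFFFFF, 2) == -3 // and the reverse is negative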
// segment defines a KCP segment
type segment struct {
	conv     uint32
	cmd      uint8
	frg      uint8
	wnd      uint16
	ts       uint32
	sn       uint32
	una      uint32
	rto      uint32
	xmit     uint32
	resendts uint32
	fastack  uint32
	data     []byte
}

// encode a segment into buffer
func (seg *segment) encode(ptr []byte) []byte {
	ptr = ikcp_encode32u(ptr, seg.conv)
	ptr = ikcp_encode8u(ptr, seg.cmd)
	ptr = ikcp_encode8u(ptr, seg.frg)
	ptr = ikcp_encode16u(ptr, seg.wnd)
	ptr = ikcp_encode32u(ptr, seg.ts)
	ptr = ikcp_encode32u(ptr, seg.sn)
	ptr = ikcp_encode32u(ptr, seg.una)
	ptr = ikcp_encode32u(ptr, uint32(len(seg.data)))
	atomic.AddUint64(&DefaultSnmp.OutSegs, 1)
	return ptr
}
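// The header written by encode is the fixed 24-byte (IKCP_OVERHEAD) KCP
// header, little-endian, followed by len(data) bytes of payload:
//
//	offset  size  field
//	0       4     conv
//	4       1     cmd
//	5       1     frg
//	6       2     wnd
//	8       4     ts
//	12      4     sn
//	16      4     una
//	20      4     len (length of data)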
// KCP defines a single KCP connection
type KCP struct {
	conv, mtu, mss, state                  uint32
	snd_una, snd_nxt, rcv_nxt              uint32
	ssthresh                               uint32
	rx_rttvar, rx_srtt                     int32
	rx_rto, rx_minrto                      uint32
	snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32
	interval, ts_flush                     uint32
	nodelay, updated                       uint32
	ts_probe, probe_wait                   uint32
	dead_link, incr                        uint32
	fastresend                             int32
	nocwnd, stream                         int32

	snd_queue []segment
	rcv_queue []segment
	snd_buf   []segment
	rcv_buf   []segment

	acklist []ackItem

	buffer []byte
	output output_callback
}

type ackItem struct {
	sn uint32
	ts uint32
}
// NewKCP creates a new KCP control object. 'conv' must be equal on both
// endpoints of the same connection.
func NewKCP(conv uint32, output output_callback) *KCP {
	kcp := new(KCP)
	kcp.conv = conv
	kcp.snd_wnd = IKCP_WND_SND
	kcp.rcv_wnd = IKCP_WND_RCV
	kcp.rmt_wnd = IKCP_WND_RCV
	kcp.mtu = IKCP_MTU_DEF
	kcp.mss = kcp.mtu - IKCP_OVERHEAD
	kcp.buffer = make([]byte, (kcp.mtu+IKCP_OVERHEAD)*3)
	kcp.rx_rto = IKCP_RTO_DEF
	kcp.rx_minrto = IKCP_RTO_MIN
	kcp.interval = IKCP_INTERVAL
	kcp.ts_flush = IKCP_INTERVAL
	kcp.ssthresh = IKCP_THRESH_INIT
	kcp.dead_link = IKCP_DEADLINK
	kcp.output = output
	return kcp
}
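// A minimal usage sketch (illustrative only, not part of the original file):
// wire the output callback to a pre-dialed *net.UDPConn named conn, which is
// assumed to exist in the caller's code:
//
//	kcp := NewKCP(0x11223344, func(buf []byte, size int) {
//		conn.Write(buf[:size]) // hand the raw KCP packet to UDP
//	})
//	kcp.WndSize(128, 128)    // optional: enlarge send/receive windows
//	kcp.NoDelay(1, 10, 2, 1) // optional: low-latency profile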
// newSegment creates a KCP segment
func (kcp *KCP) newSegment(size int) (seg segment) {
	seg.data = xmitBuf.Get().([]byte)[:size]
	return
}

// delSegment recycles a KCP segment
func (kcp *KCP) delSegment(seg segment) {
	xmitBuf.Put(seg.data)
}

// PeekSize checks the size of the next message in the recv queue
func (kcp *KCP) PeekSize() (length int) {
	if len(kcp.rcv_queue) == 0 {
		return -1
	}

	seg := &kcp.rcv_queue[0]
	if seg.frg == 0 {
		return len(seg.data)
	}

	if len(kcp.rcv_queue) < int(seg.frg+1) {
		return -1
	}

	for k := range kcp.rcv_queue {
		seg := &kcp.rcv_queue[k]
		length += len(seg.data)
		if seg.frg == 0 {
			break
		}
	}
	return
}
// Recv is the user/upper-level receive function: it returns the size of the
// next message copied into buffer, or a value below zero for EAGAIN.
func (kcp *KCP) Recv(buffer []byte) (n int) {
	if len(kcp.rcv_queue) == 0 {
		return -1
	}

	peeksize := kcp.PeekSize()
	if peeksize < 0 {
		return -2
	}

	if peeksize > len(buffer) {
		return -3
	}

	var fast_recover bool
	if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) {
		fast_recover = true
	}

	// merge fragment
	count := 0
	for k := range kcp.rcv_queue {
		seg := &kcp.rcv_queue[k]
		copy(buffer, seg.data)
		buffer = buffer[len(seg.data):]
		n += len(seg.data)
		count++
		kcp.delSegment(*seg)
		if seg.frg == 0 {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
	}

	// move available data from rcv_buf -> rcv_queue
	count = 0
	for k := range kcp.rcv_buf {
		seg := &kcp.rcv_buf[k]
		if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
			kcp.rcv_nxt++
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
		kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
	}

	// fast recover
	if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
		// ready to send back IKCP_CMD_WINS in ikcp_flush
		// tell remote my window size
		kcp.probe |= IKCP_ASK_TELL
	}
	return
}
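// A receive-loop sketch (illustrative only): size the destination buffer with
// PeekSize before calling Recv, so the -3 "buffer too small" case cannot
// occur; handle is an assumed application callback:
//
//	if sz := kcp.PeekSize(); sz > 0 {
//		msg := make([]byte, sz)
//		if n := kcp.Recv(msg); n > 0 {
//			handle(msg[:n])
//		}
//	}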
// Send is the user/upper-level send function; it returns a value below zero
// on error.
func (kcp *KCP) Send(buffer []byte) int {
	var count int
	if len(buffer) == 0 {
		return -1
	}

	// append to previous segment in streaming mode (if possible)
	if kcp.stream != 0 {
		n := len(kcp.snd_queue)
		if n > 0 {
			seg := &kcp.snd_queue[n-1]
			if len(seg.data) < int(kcp.mss) {
				capacity := int(kcp.mss) - len(seg.data)
				extend := capacity
				if len(buffer) < capacity {
					extend = len(buffer)
				}

				// grow slice, the underlying cap is guaranteed to
				// be larger than kcp.mss
				oldlen := len(seg.data)
				seg.data = seg.data[:oldlen+extend]
				copy(seg.data[oldlen:], buffer)
				buffer = buffer[extend:]
			}
		}

		if len(buffer) == 0 {
			return 0
		}
	}

	if len(buffer) <= int(kcp.mss) {
		count = 1
	} else {
		count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
	}

	if count > 255 {
		return -2
	}

	if count == 0 {
		count = 1
	}

	for i := 0; i < count; i++ {
		var size int
		if len(buffer) > int(kcp.mss) {
			size = int(kcp.mss)
		} else {
			size = len(buffer)
		}
		seg := kcp.newSegment(size)
		copy(seg.data, buffer[:size])
		if kcp.stream == 0 { // message mode
			seg.frg = uint8(count - i - 1)
		} else { // stream mode
			seg.frg = 0
		}
		kcp.snd_queue = append(kcp.snd_queue, seg)
		buffer = buffer[size:]
	}
	return 0
}
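// A send-side sketch (illustrative only): in message mode each Send call is
// fragmented into at most 255 MSS-sized segments, so callers whose messages
// may exceed 255*mss bytes should split the payload themselves:
//
//	if kcp.Send(payload) < 0 {
//		// payload empty, or too large to fragment (would need > 255 segments)
//	}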
func (kcp *KCP) update_ack(rtt int32) {
	// https://tools.ietf.org/html/rfc6298
	var rto uint32
	if kcp.rx_srtt == 0 {
		kcp.rx_srtt = rtt
		kcp.rx_rttvar = rtt >> 1
	} else {
		delta := rtt - kcp.rx_srtt
		kcp.rx_srtt += delta >> 3
		if delta < 0 {
			delta = -delta
		}
		if rtt < kcp.rx_srtt-kcp.rx_rttvar {
			// if the new RTT sample is below the bottom of the range of
			// what an RTT measurement is expected to be.
			// give an 8x reduced weight versus its normal weighting
			kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 5
		} else {
			kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 2
		}
	}
	rto = uint32(kcp.rx_srtt) + _imax_(kcp.interval, uint32(kcp.rx_rttvar)<<2)
	kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX)
}
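// In RFC 6298 terms, the shift operations above approximate the standard
// estimator with alpha = 1/8 and beta = 1/4:
//
//	SRTT   <- (1 - 1/8) * SRTT + 1/8 * RTT
//	RTTVAR <- (1 - 1/4) * RTTVAR + 1/4 * |RTT - SRTT|
//	RTO    <- SRTT + max(interval, 4 * RTTVAR), clamped to [rx_minrto, IKCP_RTO_MAX]
//
// with KCP substituting its update interval for RFC 6298's clock granularity G.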
func (kcp *KCP) shrink_buf() {
	if len(kcp.snd_buf) > 0 {
		seg := &kcp.snd_buf[0]
		kcp.snd_una = seg.sn
	} else {
		kcp.snd_una = kcp.snd_nxt
	}
}

func (kcp *KCP) parse_ack(sn uint32) {
	if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
		return
	}

	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		if sn == seg.sn {
			kcp.delSegment(*seg)
			copy(kcp.snd_buf[k:], kcp.snd_buf[k+1:])
			kcp.snd_buf[len(kcp.snd_buf)-1] = segment{}
			kcp.snd_buf = kcp.snd_buf[:len(kcp.snd_buf)-1]
			break
		}
		if _itimediff(sn, seg.sn) < 0 {
			break
		}
	}
}

func (kcp *KCP) parse_fastack(sn uint32) {
	if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
		return
	}

	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		if _itimediff(sn, seg.sn) < 0 {
			break
		} else if sn != seg.sn {
			seg.fastack++
		}
	}
}

func (kcp *KCP) parse_una(una uint32) {
	count := 0
	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		if _itimediff(una, seg.sn) > 0 {
			kcp.delSegment(*seg)
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.snd_buf = kcp.remove_front(kcp.snd_buf, count)
	}
}

// ack append
func (kcp *KCP) ack_push(sn, ts uint32) {
	kcp.acklist = append(kcp.acklist, ackItem{sn, ts})
}
func (kcp *KCP) parse_data(newseg segment) {
	sn := newseg.sn
	if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
		_itimediff(sn, kcp.rcv_nxt) < 0 {
		kcp.delSegment(newseg)
		return
	}

	n := len(kcp.rcv_buf) - 1
	insert_idx := 0
	repeat := false
	for i := n; i >= 0; i-- {
		seg := &kcp.rcv_buf[i]
		if seg.sn == sn {
			repeat = true
			atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
			break
		}
		if _itimediff(sn, seg.sn) > 0 {
			insert_idx = i + 1
			break
		}
	}

	if !repeat {
		if insert_idx == n+1 {
			kcp.rcv_buf = append(kcp.rcv_buf, newseg)
		} else {
			kcp.rcv_buf = append(kcp.rcv_buf, segment{})
			copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:])
			kcp.rcv_buf[insert_idx] = newseg
		}
	} else {
		kcp.delSegment(newseg)
	}

	// move available data from rcv_buf -> rcv_queue
	count := 0
	for k := range kcp.rcv_buf {
		seg := &kcp.rcv_buf[k]
		if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
			kcp.rcv_nxt++
			count++
		} else {
			break
		}
	}
	if count > 0 {
		kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
		kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
	}
}
// Input is called when a low-level packet (e.g. a UDP packet) is received.
// 'regular' indicates that the packet is a regular packet (not recovered from FEC).
func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
	una := kcp.snd_una
	if len(data) < IKCP_OVERHEAD {
		return -1
	}

	var maxack uint32
	var lastackts uint32
	var flag int
	var inSegs uint64

	for {
		var ts, sn, length, una, conv uint32
		var wnd uint16
		var cmd, frg uint8

		if len(data) < int(IKCP_OVERHEAD) {
			break
		}

		data = ikcp_decode32u(data, &conv)
		if conv != kcp.conv {
			return -1
		}

		data = ikcp_decode8u(data, &cmd)
		data = ikcp_decode8u(data, &frg)
		data = ikcp_decode16u(data, &wnd)
		data = ikcp_decode32u(data, &ts)
		data = ikcp_decode32u(data, &sn)
		data = ikcp_decode32u(data, &una)
		data = ikcp_decode32u(data, &length)
		if len(data) < int(length) {
			return -2
		}

		if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
			cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS {
			return -3
		}

		// only trust window updates from regular packets. i.e: latest update
		if regular {
			kcp.rmt_wnd = uint32(wnd)
		}
		kcp.parse_una(una)
		kcp.shrink_buf()

		if cmd == IKCP_CMD_ACK {
			kcp.parse_ack(sn)
			kcp.shrink_buf()
			if flag == 0 {
				flag = 1
				maxack = sn
			} else if _itimediff(sn, maxack) > 0 {
				maxack = sn
			}
			lastackts = ts
		} else if cmd == IKCP_CMD_PUSH {
			if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
				kcp.ack_push(sn, ts)
				if _itimediff(sn, kcp.rcv_nxt) >= 0 {
					seg := kcp.newSegment(int(length))
					seg.conv = conv
					seg.cmd = cmd
					seg.frg = frg
					seg.wnd = wnd
					seg.ts = ts
					seg.sn = sn
					seg.una = una
					copy(seg.data, data[:length])
					kcp.parse_data(seg)
				} else {
					atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
				}
			} else {
				atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
			}
		} else if cmd == IKCP_CMD_WASK {
			// ready to send back IKCP_CMD_WINS in Ikcp_flush
			// tell remote my window size
			kcp.probe |= IKCP_ASK_TELL
		} else if cmd == IKCP_CMD_WINS {
			// do nothing
		} else {
			return -3
		}

		inSegs++
		data = data[length:]
	}
	atomic.AddUint64(&DefaultSnmp.InSegs, inSegs)

	if flag != 0 && regular {
		kcp.parse_fastack(maxack)
		current := currentMs()
		if _itimediff(current, lastackts) >= 0 {
			kcp.update_ack(_itimediff(current, lastackts))
		}
	}

	if _itimediff(kcp.snd_una, una) > 0 {
		if kcp.cwnd < kcp.rmt_wnd {
			mss := kcp.mss
			if kcp.cwnd < kcp.ssthresh {
				kcp.cwnd++
				kcp.incr += mss
			} else {
				if kcp.incr < mss {
					kcp.incr = mss
				}
				kcp.incr += (mss*mss)/kcp.incr + (mss / 16)
				if (kcp.cwnd+1)*mss <= kcp.incr {
					kcp.cwnd++
				}
			}
			if kcp.cwnd > kcp.rmt_wnd {
				kcp.cwnd = kcp.rmt_wnd
				kcp.incr = kcp.rmt_wnd * mss
			}
		}
	}

	if ackNoDelay && len(kcp.acklist) > 0 { // ack immediately
		kcp.flush(true)
	} else if kcp.rmt_wnd == 0 && len(kcp.acklist) > 0 { // window zero
		kcp.flush(true)
	}
	return 0
}
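// A read-loop sketch (illustrative only): feed every datagram read from the
// assumed *net.UDPConn conn into Input, marking it as a regular packet that
// did not come from FEC recovery:
//
//	buf := make([]byte, 2048)
//	for {
//		n, err := conn.Read(buf)
//		if err != nil {
//			break
//		}
//		kcp.Input(buf[:n], true, false)
//	}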
func (kcp *KCP) wnd_unused() uint16 {
	if len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
		return uint16(int(kcp.rcv_wnd) - len(kcp.rcv_queue))
	}
	return 0
}
// flush pending data
func (kcp *KCP) flush(ackOnly bool) {
	var seg segment
	seg.conv = kcp.conv
	seg.cmd = IKCP_CMD_ACK
	seg.wnd = kcp.wnd_unused()
	seg.una = kcp.rcv_nxt

	buffer := kcp.buffer
	// flush acknowledges
	ptr := buffer
	for i, ack := range kcp.acklist {
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		// filter jitters caused by bufferbloat
		if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
			seg.sn, seg.ts = ack.sn, ack.ts
			ptr = seg.encode(ptr)
		}
	}
	kcp.acklist = kcp.acklist[0:0]

	if ackOnly { // flush remaining ack segments
		size := len(buffer) - len(ptr)
		if size > 0 {
			kcp.output(buffer, size)
		}
		return
	}

	// probe window size (if remote window size equals zero)
	if kcp.rmt_wnd == 0 {
		current := currentMs()
		if kcp.probe_wait == 0 {
			kcp.probe_wait = IKCP_PROBE_INIT
			kcp.ts_probe = current + kcp.probe_wait
		} else {
			if _itimediff(current, kcp.ts_probe) >= 0 {
				if kcp.probe_wait < IKCP_PROBE_INIT {
					kcp.probe_wait = IKCP_PROBE_INIT
				}
				kcp.probe_wait += kcp.probe_wait / 2
				if kcp.probe_wait > IKCP_PROBE_LIMIT {
					kcp.probe_wait = IKCP_PROBE_LIMIT
				}
				kcp.ts_probe = current + kcp.probe_wait
				kcp.probe |= IKCP_ASK_SEND
			}
		}
	} else {
		kcp.ts_probe = 0
		kcp.probe_wait = 0
	}

	// flush window probing commands (ask)
	if (kcp.probe & IKCP_ASK_SEND) != 0 {
		seg.cmd = IKCP_CMD_WASK
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		ptr = seg.encode(ptr)
	}

	// flush window probing commands (tell)
	if (kcp.probe & IKCP_ASK_TELL) != 0 {
		seg.cmd = IKCP_CMD_WINS
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		ptr = seg.encode(ptr)
	}

	kcp.probe = 0
	// calculate window size
	cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
	if kcp.nocwnd == 0 {
		cwnd = _imin_(kcp.cwnd, cwnd)
	}

	// sliding window, controlled by snd_nxt && snd_una+cwnd
	newSegsCount := 0
	for k := range kcp.snd_queue {
		if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
			break
		}
		newseg := kcp.snd_queue[k]
		newseg.conv = kcp.conv
		newseg.cmd = IKCP_CMD_PUSH
		newseg.sn = kcp.snd_nxt
		kcp.snd_buf = append(kcp.snd_buf, newseg)
		kcp.snd_nxt++
		newSegsCount++
		kcp.snd_queue[k].data = nil
	}
	if newSegsCount > 0 {
		kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
	}

	// calculate resent
	resent := uint32(kcp.fastresend)
	if kcp.fastresend <= 0 {
		resent = 0xffffffff
	}

	// check for retransmissions
	current := currentMs()
	var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
	for k := range kcp.snd_buf {
		segment := &kcp.snd_buf[k]
		needsend := false
		if segment.xmit == 0 { // initial transmit
			needsend = true
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
		} else if _itimediff(current, segment.resendts) >= 0 { // RTO
			needsend = true
			if kcp.nodelay == 0 {
				segment.rto += kcp.rx_rto
			} else {
				segment.rto += kcp.rx_rto / 2
			}
			segment.resendts = current + segment.rto
			lost++
			lostSegs++
		} else if segment.fastack >= resent { // fast retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			fastRetransSegs++
		} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			earlyRetransSegs++
		}

		if needsend {
			segment.xmit++
			segment.ts = current
			segment.wnd = seg.wnd
			segment.una = seg.una

			size := len(buffer) - len(ptr)
			need := IKCP_OVERHEAD + len(segment.data)

			if size+need > int(kcp.mtu) {
				kcp.output(buffer, size)
				current = currentMs() // time update for a blocking call
				ptr = buffer
			}

			ptr = segment.encode(ptr)
			copy(ptr, segment.data)
			ptr = ptr[len(segment.data):]

			if segment.xmit >= kcp.dead_link {
				kcp.state = 0xFFFFFFFF
			}
		}
	}

	// flush remaining segments
	size := len(buffer) - len(ptr)
	if size > 0 {
		kcp.output(buffer, size)
	}

	// counter updates
	sum := lostSegs
	if lostSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
	}
	if fastRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
		sum += fastRetransSegs
	}
	if earlyRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
		sum += earlyRetransSegs
	}
	if sum > 0 {
		atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
	}

	// update ssthresh
	// rate halving, https://tools.ietf.org/html/rfc6937
	if change > 0 {
		inflight := kcp.snd_nxt - kcp.snd_una
		kcp.ssthresh = inflight / 2
		if kcp.ssthresh < IKCP_THRESH_MIN {
			kcp.ssthresh = IKCP_THRESH_MIN
		}
		kcp.cwnd = kcp.ssthresh + resent
		kcp.incr = kcp.cwnd * kcp.mss
	}

	// congestion control, https://tools.ietf.org/html/rfc5681
	if lost > 0 {
		kcp.ssthresh = cwnd / 2
		if kcp.ssthresh < IKCP_THRESH_MIN {
			kcp.ssthresh = IKCP_THRESH_MIN
		}
		kcp.cwnd = 1
		kcp.incr = kcp.mss
	}

	if kcp.cwnd < 1 {
		kcp.cwnd = 1
		kcp.incr = kcp.mss
	}
}
// Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
// Check when to call it again (when there is no intervening Input/Send call).
// The current timestamp is taken internally from currentMs().
func (kcp *KCP) Update() {
	var slap int32

	current := currentMs()
	if kcp.updated == 0 {
		kcp.updated = 1
		kcp.ts_flush = current
	}

	slap = _itimediff(current, kcp.ts_flush)
	if slap >= 10000 || slap < -10000 {
		kcp.ts_flush = current
		slap = 0
	}

	if slap >= 0 {
		kcp.ts_flush += kcp.interval
		if _itimediff(current, kcp.ts_flush) >= 0 {
			kcp.ts_flush = current + kcp.interval
		}
		kcp.flush(false)
	}
}
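// A driver-loop sketch (illustrative only): the simplest way to drive the
// state machine is a fixed-interval ticker; see the Check-based variant below
// Check for a lower-overhead schedule:
//
//	ticker := time.NewTicker(10 * time.Millisecond)
//	defer ticker.Stop()
//	for range ticker.C {
//		kcp.Update()
//	}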
// Check determines when you should invoke Update:
// it returns the time (in millisec) at which Update should next be invoked,
// assuming there is no intervening Input/Send call. You can call Update at
// that time instead of calling it repeatedly.
// This is important to reduce unnecessary Update invocations; use it to
// schedule Update (e.g. when implementing an epoll-like mechanism, or to
// optimize Update when handling massive numbers of kcp connections).
func (kcp *KCP) Check() uint32 {
	current := currentMs()
	ts_flush := kcp.ts_flush
	tm_flush := int32(0x7fffffff)
	tm_packet := int32(0x7fffffff)
	minimal := uint32(0)
	if kcp.updated == 0 {
		return current
	}

	if _itimediff(current, ts_flush) >= 10000 ||
		_itimediff(current, ts_flush) < -10000 {
		ts_flush = current
	}

	if _itimediff(current, ts_flush) >= 0 {
		return current
	}

	tm_flush = _itimediff(ts_flush, current)

	for k := range kcp.snd_buf {
		seg := &kcp.snd_buf[k]
		diff := _itimediff(seg.resendts, current)
		if diff <= 0 {
			return current
		}
		if diff < tm_packet {
			tm_packet = diff
		}
	}

	minimal = uint32(tm_packet)
	if tm_packet >= tm_flush {
		minimal = uint32(tm_flush)
	}
	if minimal >= kcp.interval {
		minimal = kcp.interval
	}

	return current + minimal
}
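// A Check-based scheduling sketch (illustrative only): sleep until the next
// deadline reported by Check instead of ticking at a fixed rate:
//
//	for {
//		kcp.Update()
//		next := kcp.Check() // absolute time in ms
//		if wait := _itimediff(next, currentMs()); wait > 0 {
//			time.Sleep(time.Duration(wait) * time.Millisecond)
//		}
//	}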
// SetMtu changes MTU size, default is 1400
func (kcp *KCP) SetMtu(mtu int) int {
	if mtu < 50 || mtu < IKCP_OVERHEAD {
		return -1
	}
	buffer := make([]byte, (mtu+IKCP_OVERHEAD)*3)
	if buffer == nil {
		return -2
	}
	kcp.mtu = uint32(mtu)
	kcp.mss = kcp.mtu - IKCP_OVERHEAD
	kcp.buffer = buffer
	return 0
}
// NoDelay options:
// fastest: NoDelay(1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), otherwise the duplicate-ACK count that triggers fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int {
	if nodelay >= 0 {
		kcp.nodelay = uint32(nodelay)
		if nodelay != 0 {
			kcp.rx_minrto = IKCP_RTO_NDL
		} else {
			kcp.rx_minrto = IKCP_RTO_MIN
		}
	}
	if interval >= 0 {
		if interval > 5000 {
			interval = 5000
		} else if interval < 10 {
			interval = 10
		}
		kcp.interval = uint32(interval)
	}
	if resend >= 0 {
		kcp.fastresend = int32(resend)
	}
	if nc >= 0 {
		kcp.nocwnd = int32(nc)
	}
	return 0
}
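// Two commonly cited profiles from the upstream KCP documentation (hedged
// suggestions; tune for your own link):
//
//	kcp.NoDelay(0, 40, 0, 0) // "normal" mode: TCP-like, congestion control on
//	kcp.NoDelay(1, 10, 2, 1) // "turbo" mode: nodelay, 10ms interval,
//	                         // fast resend after 2 duplicate ACKs, no cwnd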
// WndSize sets maximum window size: sndwnd=32, rcvwnd=32 by default
func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int {
	if sndwnd > 0 {
		kcp.snd_wnd = uint32(sndwnd)
	}
	if rcvwnd > 0 {
		kcp.rcv_wnd = uint32(rcvwnd)
	}
	return 0
}

// WaitSnd returns how many packets are waiting to be sent
func (kcp *KCP) WaitSnd() int {
	return len(kcp.snd_buf) + len(kcp.snd_queue)
}
// remove_front removes the front n elements from queue q
func (kcp *KCP) remove_front(q []segment, n int) []segment {
	newn := copy(q, q[n:])
	for i := newn; i < len(q); i++ {
		q[i] = segment{} // clear the tail so the GC can reclaim segment data
	}
	return q[:newn]
}