123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998 |
- // Package kcp - A Fast and Reliable ARQ Protocol
- package kcp
- import (
- "encoding/binary"
- "sync/atomic"
- )
- const (
- IKCP_RTO_NDL = 30 // no delay min rto
- IKCP_RTO_MIN = 100 // normal min rto
- IKCP_RTO_DEF = 200
- IKCP_RTO_MAX = 60000
- IKCP_CMD_PUSH = 81 // cmd: push data
- IKCP_CMD_ACK = 82 // cmd: ack
- IKCP_CMD_WASK = 83 // cmd: window probe (ask)
- IKCP_CMD_WINS = 84 // cmd: window size (tell)
- IKCP_ASK_SEND = 1 // need to send IKCP_CMD_WASK
- IKCP_ASK_TELL = 2 // need to send IKCP_CMD_WINS
- IKCP_WND_SND = 32
- IKCP_WND_RCV = 32
- IKCP_MTU_DEF = 1400
- IKCP_ACK_FAST = 3
- IKCP_INTERVAL = 100
- IKCP_OVERHEAD = 24
- IKCP_DEADLINK = 20
- IKCP_THRESH_INIT = 2
- IKCP_THRESH_MIN = 2
- IKCP_PROBE_INIT = 7000 // 7 secs to probe window size
- IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window
- )
- // output_callback is a prototype which ought capture conn and call conn.Write
- type output_callback func(buf []byte, size int)
- /* encode 8 bits unsigned int */
- func ikcp_encode8u(p []byte, c byte) []byte {
- p[0] = c
- return p[1:]
- }
- /* decode 8 bits unsigned int */
- func ikcp_decode8u(p []byte, c *byte) []byte {
- *c = p[0]
- return p[1:]
- }
- /* encode 16 bits unsigned int (lsb) */
- func ikcp_encode16u(p []byte, w uint16) []byte {
- binary.LittleEndian.PutUint16(p, w)
- return p[2:]
- }
- /* decode 16 bits unsigned int (lsb) */
- func ikcp_decode16u(p []byte, w *uint16) []byte {
- *w = binary.LittleEndian.Uint16(p)
- return p[2:]
- }
- /* encode 32 bits unsigned int (lsb) */
- func ikcp_encode32u(p []byte, l uint32) []byte {
- binary.LittleEndian.PutUint32(p, l)
- return p[4:]
- }
- /* decode 32 bits unsigned int (lsb) */
- func ikcp_decode32u(p []byte, l *uint32) []byte {
- *l = binary.LittleEndian.Uint32(p)
- return p[4:]
- }
- func _imin_(a, b uint32) uint32 {
- if a <= b {
- return a
- }
- return b
- }
- func _imax_(a, b uint32) uint32 {
- if a >= b {
- return a
- }
- return b
- }
- func _ibound_(lower, middle, upper uint32) uint32 {
- return _imin_(_imax_(lower, middle), upper)
- }
- func _itimediff(later, earlier uint32) int32 {
- return (int32)(later - earlier)
- }
- // segment defines a KCP segment
- type segment struct {
- conv uint32
- cmd uint8
- frg uint8
- wnd uint16
- ts uint32
- sn uint32
- una uint32
- rto uint32
- xmit uint32
- resendts uint32
- fastack uint32
- data []byte
- }
- // encode a segment into buffer
- func (seg *segment) encode(ptr []byte) []byte {
- ptr = ikcp_encode32u(ptr, seg.conv)
- ptr = ikcp_encode8u(ptr, seg.cmd)
- ptr = ikcp_encode8u(ptr, seg.frg)
- ptr = ikcp_encode16u(ptr, seg.wnd)
- ptr = ikcp_encode32u(ptr, seg.ts)
- ptr = ikcp_encode32u(ptr, seg.sn)
- ptr = ikcp_encode32u(ptr, seg.una)
- ptr = ikcp_encode32u(ptr, uint32(len(seg.data)))
- atomic.AddUint64(&DefaultSnmp.OutSegs, 1)
- return ptr
- }
- // KCP defines a single KCP connection
- type KCP struct {
- conv, mtu, mss, state uint32
- snd_una, snd_nxt, rcv_nxt uint32
- ssthresh uint32
- rx_rttvar, rx_srtt int32
- rx_rto, rx_minrto uint32
- snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32
- interval, ts_flush uint32
- nodelay, updated uint32
- ts_probe, probe_wait uint32
- dead_link, incr uint32
- fastresend int32
- nocwnd, stream int32
- snd_queue []segment
- rcv_queue []segment
- snd_buf []segment
- rcv_buf []segment
- acklist []ackItem
- buffer []byte
- output output_callback
- }
- type ackItem struct {
- sn uint32
- ts uint32
- }
- // NewKCP create a new kcp control object, 'conv' must equal in two endpoint
- // from the same connection.
- func NewKCP(conv uint32, output output_callback) *KCP {
- kcp := new(KCP)
- kcp.conv = conv
- kcp.snd_wnd = IKCP_WND_SND
- kcp.rcv_wnd = IKCP_WND_RCV
- kcp.rmt_wnd = IKCP_WND_RCV
- kcp.mtu = IKCP_MTU_DEF
- kcp.mss = kcp.mtu - IKCP_OVERHEAD
- kcp.buffer = make([]byte, (kcp.mtu+IKCP_OVERHEAD)*3)
- kcp.rx_rto = IKCP_RTO_DEF
- kcp.rx_minrto = IKCP_RTO_MIN
- kcp.interval = IKCP_INTERVAL
- kcp.ts_flush = IKCP_INTERVAL
- kcp.ssthresh = IKCP_THRESH_INIT
- kcp.dead_link = IKCP_DEADLINK
- kcp.output = output
- return kcp
- }
- // newSegment creates a KCP segment
- func (kcp *KCP) newSegment(size int) (seg segment) {
- seg.data = xmitBuf.Get().([]byte)[:size]
- return
- }
- // delSegment recycles a KCP segment
- func (kcp *KCP) delSegment(seg segment) {
- xmitBuf.Put(seg.data)
- }
- // PeekSize checks the size of next message in the recv queue
- func (kcp *KCP) PeekSize() (length int) {
- if len(kcp.rcv_queue) == 0 {
- return -1
- }
- seg := &kcp.rcv_queue[0]
- if seg.frg == 0 {
- return len(seg.data)
- }
- if len(kcp.rcv_queue) < int(seg.frg+1) {
- return -1
- }
- for k := range kcp.rcv_queue {
- seg := &kcp.rcv_queue[k]
- length += len(seg.data)
- if seg.frg == 0 {
- break
- }
- }
- return
- }
- // Recv is user/upper level recv: returns size, returns below zero for EAGAIN
- func (kcp *KCP) Recv(buffer []byte) (n int) {
- if len(kcp.rcv_queue) == 0 {
- return -1
- }
- peeksize := kcp.PeekSize()
- if peeksize < 0 {
- return -2
- }
- if peeksize > len(buffer) {
- return -3
- }
- var fast_recover bool
- if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) {
- fast_recover = true
- }
- // merge fragment
- count := 0
- for k := range kcp.rcv_queue {
- seg := &kcp.rcv_queue[k]
- copy(buffer, seg.data)
- buffer = buffer[len(seg.data):]
- n += len(seg.data)
- count++
- kcp.delSegment(*seg)
- if seg.frg == 0 {
- break
- }
- }
- if count > 0 {
- kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
- }
- // move available data from rcv_buf -> rcv_queue
- count = 0
- for k := range kcp.rcv_buf {
- seg := &kcp.rcv_buf[k]
- if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
- kcp.rcv_nxt++
- count++
- } else {
- break
- }
- }
- if count > 0 {
- kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
- kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
- }
- // fast recover
- if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
- // ready to send back IKCP_CMD_WINS in ikcp_flush
- // tell remote my window size
- kcp.probe |= IKCP_ASK_TELL
- }
- return
- }
- // Send is user/upper level send, returns below zero for error
- func (kcp *KCP) Send(buffer []byte) int {
- var count int
- if len(buffer) == 0 {
- return -1
- }
- // append to previous segment in streaming mode (if possible)
- if kcp.stream != 0 {
- n := len(kcp.snd_queue)
- if n > 0 {
- seg := &kcp.snd_queue[n-1]
- if len(seg.data) < int(kcp.mss) {
- capacity := int(kcp.mss) - len(seg.data)
- extend := capacity
- if len(buffer) < capacity {
- extend = len(buffer)
- }
- // grow slice, the underlying cap is guaranteed to
- // be larger than kcp.mss
- oldlen := len(seg.data)
- seg.data = seg.data[:oldlen+extend]
- copy(seg.data[oldlen:], buffer)
- buffer = buffer[extend:]
- }
- }
- if len(buffer) == 0 {
- return 0
- }
- }
- if len(buffer) <= int(kcp.mss) {
- count = 1
- } else {
- count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
- }
- if count > 255 {
- return -2
- }
- if count == 0 {
- count = 1
- }
- for i := 0; i < count; i++ {
- var size int
- if len(buffer) > int(kcp.mss) {
- size = int(kcp.mss)
- } else {
- size = len(buffer)
- }
- seg := kcp.newSegment(size)
- copy(seg.data, buffer[:size])
- if kcp.stream == 0 { // message mode
- seg.frg = uint8(count - i - 1)
- } else { // stream mode
- seg.frg = 0
- }
- kcp.snd_queue = append(kcp.snd_queue, seg)
- buffer = buffer[size:]
- }
- return 0
- }
- func (kcp *KCP) update_ack(rtt int32) {
- // https://tools.ietf.org/html/rfc6298
- var rto uint32
- if kcp.rx_srtt == 0 {
- kcp.rx_srtt = rtt
- kcp.rx_rttvar = rtt >> 1
- } else {
- delta := rtt - kcp.rx_srtt
- kcp.rx_srtt += delta >> 3
- if delta < 0 {
- delta = -delta
- }
- if rtt < kcp.rx_srtt-kcp.rx_rttvar {
- // if the new RTT sample is below the bottom of the range of
- // what an RTT measurement is expected to be.
- // give an 8x reduced weight versus its normal weighting
- kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 5
- } else {
- kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 2
- }
- }
- rto = uint32(kcp.rx_srtt) + _imax_(kcp.interval, uint32(kcp.rx_rttvar)<<2)
- kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX)
- }
- func (kcp *KCP) shrink_buf() {
- if len(kcp.snd_buf) > 0 {
- seg := &kcp.snd_buf[0]
- kcp.snd_una = seg.sn
- } else {
- kcp.snd_una = kcp.snd_nxt
- }
- }
- func (kcp *KCP) parse_ack(sn uint32) {
- if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
- return
- }
- for k := range kcp.snd_buf {
- seg := &kcp.snd_buf[k]
- if sn == seg.sn {
- kcp.delSegment(*seg)
- copy(kcp.snd_buf[k:], kcp.snd_buf[k+1:])
- kcp.snd_buf[len(kcp.snd_buf)-1] = segment{}
- kcp.snd_buf = kcp.snd_buf[:len(kcp.snd_buf)-1]
- break
- }
- if _itimediff(sn, seg.sn) < 0 {
- break
- }
- }
- }
- func (kcp *KCP) parse_fastack(sn uint32) {
- if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
- return
- }
- for k := range kcp.snd_buf {
- seg := &kcp.snd_buf[k]
- if _itimediff(sn, seg.sn) < 0 {
- break
- } else if sn != seg.sn {
- seg.fastack++
- }
- }
- }
- func (kcp *KCP) parse_una(una uint32) {
- count := 0
- for k := range kcp.snd_buf {
- seg := &kcp.snd_buf[k]
- if _itimediff(una, seg.sn) > 0 {
- kcp.delSegment(*seg)
- count++
- } else {
- break
- }
- }
- if count > 0 {
- kcp.snd_buf = kcp.remove_front(kcp.snd_buf, count)
- }
- }
- // ack append
- func (kcp *KCP) ack_push(sn, ts uint32) {
- kcp.acklist = append(kcp.acklist, ackItem{sn, ts})
- }
- func (kcp *KCP) parse_data(newseg segment) {
- sn := newseg.sn
- if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
- _itimediff(sn, kcp.rcv_nxt) < 0 {
- kcp.delSegment(newseg)
- return
- }
- n := len(kcp.rcv_buf) - 1
- insert_idx := 0
- repeat := false
- for i := n; i >= 0; i-- {
- seg := &kcp.rcv_buf[i]
- if seg.sn == sn {
- repeat = true
- atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
- break
- }
- if _itimediff(sn, seg.sn) > 0 {
- insert_idx = i + 1
- break
- }
- }
- if !repeat {
- if insert_idx == n+1 {
- kcp.rcv_buf = append(kcp.rcv_buf, newseg)
- } else {
- kcp.rcv_buf = append(kcp.rcv_buf, segment{})
- copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:])
- kcp.rcv_buf[insert_idx] = newseg
- }
- } else {
- kcp.delSegment(newseg)
- }
- // move available data from rcv_buf -> rcv_queue
- count := 0
- for k := range kcp.rcv_buf {
- seg := &kcp.rcv_buf[k]
- if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
- kcp.rcv_nxt++
- count++
- } else {
- break
- }
- }
- if count > 0 {
- kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
- kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
- }
- }
- // Input when you received a low level packet (eg. UDP packet), call it
- // regular indicates a regular packet has received(not from FEC)
- func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
- una := kcp.snd_una
- if len(data) < IKCP_OVERHEAD {
- return -1
- }
- var maxack uint32
- var lastackts uint32
- var flag int
- var inSegs uint64
- for {
- var ts, sn, length, una, conv uint32
- var wnd uint16
- var cmd, frg uint8
- if len(data) < int(IKCP_OVERHEAD) {
- break
- }
- data = ikcp_decode32u(data, &conv)
- if conv != kcp.conv {
- return -1
- }
- data = ikcp_decode8u(data, &cmd)
- data = ikcp_decode8u(data, &frg)
- data = ikcp_decode16u(data, &wnd)
- data = ikcp_decode32u(data, &ts)
- data = ikcp_decode32u(data, &sn)
- data = ikcp_decode32u(data, &una)
- data = ikcp_decode32u(data, &length)
- if len(data) < int(length) {
- return -2
- }
- if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
- cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS {
- return -3
- }
- // only trust window updates from regular packets. i.e: latest update
- if regular {
- kcp.rmt_wnd = uint32(wnd)
- }
- kcp.parse_una(una)
- kcp.shrink_buf()
- if cmd == IKCP_CMD_ACK {
- kcp.parse_ack(sn)
- kcp.shrink_buf()
- if flag == 0 {
- flag = 1
- maxack = sn
- } else if _itimediff(sn, maxack) > 0 {
- maxack = sn
- }
- lastackts = ts
- } else if cmd == IKCP_CMD_PUSH {
- if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
- kcp.ack_push(sn, ts)
- if _itimediff(sn, kcp.rcv_nxt) >= 0 {
- seg := kcp.newSegment(int(length))
- seg.conv = conv
- seg.cmd = cmd
- seg.frg = frg
- seg.wnd = wnd
- seg.ts = ts
- seg.sn = sn
- seg.una = una
- copy(seg.data, data[:length])
- kcp.parse_data(seg)
- } else {
- atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
- }
- } else {
- atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
- }
- } else if cmd == IKCP_CMD_WASK {
- // ready to send back IKCP_CMD_WINS in Ikcp_flush
- // tell remote my window size
- kcp.probe |= IKCP_ASK_TELL
- } else if cmd == IKCP_CMD_WINS {
- // do nothing
- } else {
- return -3
- }
- inSegs++
- data = data[length:]
- }
- atomic.AddUint64(&DefaultSnmp.InSegs, inSegs)
- if flag != 0 && regular {
- kcp.parse_fastack(maxack)
- current := currentMs()
- if _itimediff(current, lastackts) >= 0 {
- kcp.update_ack(_itimediff(current, lastackts))
- }
- }
- if _itimediff(kcp.snd_una, una) > 0 {
- if kcp.cwnd < kcp.rmt_wnd {
- mss := kcp.mss
- if kcp.cwnd < kcp.ssthresh {
- kcp.cwnd++
- kcp.incr += mss
- } else {
- if kcp.incr < mss {
- kcp.incr = mss
- }
- kcp.incr += (mss*mss)/kcp.incr + (mss / 16)
- if (kcp.cwnd+1)*mss <= kcp.incr {
- kcp.cwnd++
- }
- }
- if kcp.cwnd > kcp.rmt_wnd {
- kcp.cwnd = kcp.rmt_wnd
- kcp.incr = kcp.rmt_wnd * mss
- }
- }
- }
- if ackNoDelay && len(kcp.acklist) > 0 { // ack immediately
- kcp.flush(true)
- } else if kcp.rmt_wnd == 0 && len(kcp.acklist) > 0 { // window zero
- kcp.flush(true)
- }
- return 0
- }
- func (kcp *KCP) wnd_unused() uint16 {
- if len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
- return uint16(int(kcp.rcv_wnd) - len(kcp.rcv_queue))
- }
- return 0
- }
- // flush pending data
- func (kcp *KCP) flush(ackOnly bool) {
- var seg segment
- seg.conv = kcp.conv
- seg.cmd = IKCP_CMD_ACK
- seg.wnd = kcp.wnd_unused()
- seg.una = kcp.rcv_nxt
- buffer := kcp.buffer
- // flush acknowledges
- ptr := buffer
- for i, ack := range kcp.acklist {
- size := len(buffer) - len(ptr)
- if size+IKCP_OVERHEAD > int(kcp.mtu) {
- kcp.output(buffer, size)
- ptr = buffer
- }
- // filter jitters caused by bufferbloat
- if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
- seg.sn, seg.ts = ack.sn, ack.ts
- ptr = seg.encode(ptr)
- }
- }
- kcp.acklist = kcp.acklist[0:0]
- if ackOnly { // flash remain ack segments
- size := len(buffer) - len(ptr)
- if size > 0 {
- kcp.output(buffer, size)
- }
- return
- }
- // probe window size (if remote window size equals zero)
- if kcp.rmt_wnd == 0 {
- current := currentMs()
- if kcp.probe_wait == 0 {
- kcp.probe_wait = IKCP_PROBE_INIT
- kcp.ts_probe = current + kcp.probe_wait
- } else {
- if _itimediff(current, kcp.ts_probe) >= 0 {
- if kcp.probe_wait < IKCP_PROBE_INIT {
- kcp.probe_wait = IKCP_PROBE_INIT
- }
- kcp.probe_wait += kcp.probe_wait / 2
- if kcp.probe_wait > IKCP_PROBE_LIMIT {
- kcp.probe_wait = IKCP_PROBE_LIMIT
- }
- kcp.ts_probe = current + kcp.probe_wait
- kcp.probe |= IKCP_ASK_SEND
- }
- }
- } else {
- kcp.ts_probe = 0
- kcp.probe_wait = 0
- }
- // flush window probing commands
- if (kcp.probe & IKCP_ASK_SEND) != 0 {
- seg.cmd = IKCP_CMD_WASK
- size := len(buffer) - len(ptr)
- if size+IKCP_OVERHEAD > int(kcp.mtu) {
- kcp.output(buffer, size)
- ptr = buffer
- }
- ptr = seg.encode(ptr)
- }
- // flush window probing commands
- if (kcp.probe & IKCP_ASK_TELL) != 0 {
- seg.cmd = IKCP_CMD_WINS
- size := len(buffer) - len(ptr)
- if size+IKCP_OVERHEAD > int(kcp.mtu) {
- kcp.output(buffer, size)
- ptr = buffer
- }
- ptr = seg.encode(ptr)
- }
- kcp.probe = 0
- // calculate window size
- cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
- if kcp.nocwnd == 0 {
- cwnd = _imin_(kcp.cwnd, cwnd)
- }
- // sliding window, controlled by snd_nxt && sna_una+cwnd
- newSegsCount := 0
- for k := range kcp.snd_queue {
- if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
- break
- }
- newseg := kcp.snd_queue[k]
- newseg.conv = kcp.conv
- newseg.cmd = IKCP_CMD_PUSH
- newseg.sn = kcp.snd_nxt
- kcp.snd_buf = append(kcp.snd_buf, newseg)
- kcp.snd_nxt++
- newSegsCount++
- kcp.snd_queue[k].data = nil
- }
- if newSegsCount > 0 {
- kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
- }
- // calculate resent
- resent := uint32(kcp.fastresend)
- if kcp.fastresend <= 0 {
- resent = 0xffffffff
- }
- // check for retransmissions
- current := currentMs()
- var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
- for k := range kcp.snd_buf {
- segment := &kcp.snd_buf[k]
- needsend := false
- if segment.xmit == 0 { // initial transmit
- needsend = true
- segment.rto = kcp.rx_rto
- segment.resendts = current + segment.rto
- } else if _itimediff(current, segment.resendts) >= 0 { // RTO
- needsend = true
- if kcp.nodelay == 0 {
- segment.rto += kcp.rx_rto
- } else {
- segment.rto += kcp.rx_rto / 2
- }
- segment.resendts = current + segment.rto
- lost++
- lostSegs++
- } else if segment.fastack >= resent { // fast retransmit
- needsend = true
- segment.fastack = 0
- segment.rto = kcp.rx_rto
- segment.resendts = current + segment.rto
- change++
- fastRetransSegs++
- } else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
- needsend = true
- segment.fastack = 0
- segment.rto = kcp.rx_rto
- segment.resendts = current + segment.rto
- change++
- earlyRetransSegs++
- }
- if needsend {
- segment.xmit++
- segment.ts = current
- segment.wnd = seg.wnd
- segment.una = seg.una
- size := len(buffer) - len(ptr)
- need := IKCP_OVERHEAD + len(segment.data)
- if size+need > int(kcp.mtu) {
- kcp.output(buffer, size)
- current = currentMs() // time update for a blocking call
- ptr = buffer
- }
- ptr = segment.encode(ptr)
- copy(ptr, segment.data)
- ptr = ptr[len(segment.data):]
- if segment.xmit >= kcp.dead_link {
- kcp.state = 0xFFFFFFFF
- }
- }
- }
- // flash remain segments
- size := len(buffer) - len(ptr)
- if size > 0 {
- kcp.output(buffer, size)
- }
- // counter updates
- sum := lostSegs
- if lostSegs > 0 {
- atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
- }
- if fastRetransSegs > 0 {
- atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
- sum += fastRetransSegs
- }
- if earlyRetransSegs > 0 {
- atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
- sum += earlyRetransSegs
- }
- if sum > 0 {
- atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
- }
- // update ssthresh
- // rate halving, https://tools.ietf.org/html/rfc6937
- if change > 0 {
- inflight := kcp.snd_nxt - kcp.snd_una
- kcp.ssthresh = inflight / 2
- if kcp.ssthresh < IKCP_THRESH_MIN {
- kcp.ssthresh = IKCP_THRESH_MIN
- }
- kcp.cwnd = kcp.ssthresh + resent
- kcp.incr = kcp.cwnd * kcp.mss
- }
- // congestion control, https://tools.ietf.org/html/rfc5681
- if lost > 0 {
- kcp.ssthresh = cwnd / 2
- if kcp.ssthresh < IKCP_THRESH_MIN {
- kcp.ssthresh = IKCP_THRESH_MIN
- }
- kcp.cwnd = 1
- kcp.incr = kcp.mss
- }
- if kcp.cwnd < 1 {
- kcp.cwnd = 1
- kcp.incr = kcp.mss
- }
- }
- // Update updates state (call it repeatedly, every 10ms-100ms), or you can ask
- // ikcp_check when to call it again (without ikcp_input/_send calling).
- // 'current' - current timestamp in millisec.
- func (kcp *KCP) Update() {
- var slap int32
- current := currentMs()
- if kcp.updated == 0 {
- kcp.updated = 1
- kcp.ts_flush = current
- }
- slap = _itimediff(current, kcp.ts_flush)
- if slap >= 10000 || slap < -10000 {
- kcp.ts_flush = current
- slap = 0
- }
- if slap >= 0 {
- kcp.ts_flush += kcp.interval
- if _itimediff(current, kcp.ts_flush) >= 0 {
- kcp.ts_flush = current + kcp.interval
- }
- kcp.flush(false)
- }
- }
- // Check determines when should you invoke ikcp_update:
- // returns when you should invoke ikcp_update in millisec, if there
- // is no ikcp_input/_send calling. you can call ikcp_update in that
- // time, instead of call update repeatly.
- // Important to reduce unnacessary ikcp_update invoking. use it to
- // schedule ikcp_update (eg. implementing an epoll-like mechanism,
- // or optimize ikcp_update when handling massive kcp connections)
- func (kcp *KCP) Check() uint32 {
- current := currentMs()
- ts_flush := kcp.ts_flush
- tm_flush := int32(0x7fffffff)
- tm_packet := int32(0x7fffffff)
- minimal := uint32(0)
- if kcp.updated == 0 {
- return current
- }
- if _itimediff(current, ts_flush) >= 10000 ||
- _itimediff(current, ts_flush) < -10000 {
- ts_flush = current
- }
- if _itimediff(current, ts_flush) >= 0 {
- return current
- }
- tm_flush = _itimediff(ts_flush, current)
- for k := range kcp.snd_buf {
- seg := &kcp.snd_buf[k]
- diff := _itimediff(seg.resendts, current)
- if diff <= 0 {
- return current
- }
- if diff < tm_packet {
- tm_packet = diff
- }
- }
- minimal = uint32(tm_packet)
- if tm_packet >= tm_flush {
- minimal = uint32(tm_flush)
- }
- if minimal >= kcp.interval {
- minimal = kcp.interval
- }
- return current + minimal
- }
- // SetMtu changes MTU size, default is 1400
- func (kcp *KCP) SetMtu(mtu int) int {
- if mtu < 50 || mtu < IKCP_OVERHEAD {
- return -1
- }
- buffer := make([]byte, (mtu+IKCP_OVERHEAD)*3)
- if buffer == nil {
- return -2
- }
- kcp.mtu = uint32(mtu)
- kcp.mss = kcp.mtu - IKCP_OVERHEAD
- kcp.buffer = buffer
- return 0
- }
- // NoDelay options
- // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
- // nodelay: 0:disable(default), 1:enable
- // interval: internal update timer interval in millisec, default is 100ms
- // resend: 0:disable fast resend(default), 1:enable fast resend
- // nc: 0:normal congestion control(default), 1:disable congestion control
- func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int {
- if nodelay >= 0 {
- kcp.nodelay = uint32(nodelay)
- if nodelay != 0 {
- kcp.rx_minrto = IKCP_RTO_NDL
- } else {
- kcp.rx_minrto = IKCP_RTO_MIN
- }
- }
- if interval >= 0 {
- if interval > 5000 {
- interval = 5000
- } else if interval < 10 {
- interval = 10
- }
- kcp.interval = uint32(interval)
- }
- if resend >= 0 {
- kcp.fastresend = int32(resend)
- }
- if nc >= 0 {
- kcp.nocwnd = int32(nc)
- }
- return 0
- }
- // WndSize sets maximum window size: sndwnd=32, rcvwnd=32 by default
- func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int {
- if sndwnd > 0 {
- kcp.snd_wnd = uint32(sndwnd)
- }
- if rcvwnd > 0 {
- kcp.rcv_wnd = uint32(rcvwnd)
- }
- return 0
- }
- // WaitSnd gets how many packet is waiting to be sent
- func (kcp *KCP) WaitSnd() int {
- return len(kcp.snd_buf) + len(kcp.snd_queue)
- }
- // remove front n elements from queue
- func (kcp *KCP) remove_front(q []segment, n int) []segment {
- newn := copy(q, q[n:])
- for i := newn; i < len(q); i++ {
- q[i] = segment{} // manual set nil for GC
- }
- return q[:newn]
- }
|