1
0

sess.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975
  1. package kcp
  2. import (
  3. "crypto/rand"
  4. "encoding/binary"
  5. "hash/crc32"
  6. "io"
  7. "net"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/pkg/errors"
  12. "golang.org/x/net/ipv4"
  13. )
  14. type errTimeout struct {
  15. error
  16. }
  17. func (errTimeout) Timeout() bool { return true }
  18. func (errTimeout) Temporary() bool { return true }
  19. func (errTimeout) Error() string { return "i/o timeout" }
  20. const (
  21. // 16-bytes magic number for each packet
  22. nonceSize = 16
  23. // 4-bytes packet checksum
  24. crcSize = 4
  25. // overall crypto header size
  26. cryptHeaderSize = nonceSize + crcSize
  27. // maximum packet size
  28. mtuLimit = 1500
  29. // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
  30. rxFECMulti = 3
  31. // accept backlog
  32. acceptBacklog = 128
  33. // prerouting(to session) queue
  34. qlen = 128
  35. )
  36. const (
  37. errBrokenPipe = "broken pipe"
  38. errInvalidOperation = "invalid operation"
  39. )
  40. var (
  41. // global packet buffer
  42. // shared among sending/receiving/FEC
  43. xmitBuf sync.Pool
  44. )
  45. func init() {
  46. xmitBuf.New = func() interface{} {
  47. return make([]byte, mtuLimit)
  48. }
  49. }
  50. type (
  51. // UDPSession defines a KCP session implemented by UDP
  52. UDPSession struct {
  53. updaterIdx int // record slice index in updater
  54. conn net.PacketConn // the underlying packet connection
  55. kcp *KCP // KCP ARQ protocol
  56. l *Listener // point to the Listener if it's accepted by Listener
  57. block BlockCrypt // block encryption
  58. // kcp receiving is based on packets
  59. // recvbuf turns packets into stream
  60. recvbuf []byte
  61. bufptr []byte
  62. // extended output buffer(with header)
  63. ext []byte
  64. // FEC
  65. fecDecoder *fecDecoder
  66. fecEncoder *fecEncoder
  67. // settings
  68. remote net.Addr // remote peer address
  69. rd time.Time // read deadline
  70. wd time.Time // write deadline
  71. headerSize int // the overall header size added before KCP frame
  72. ackNoDelay bool // send ack immediately for each incoming packet
  73. writeDelay bool // delay kcp.flush() for Write() for bulk transfer
  74. dup int // duplicate udp packets
  75. // notifications
  76. die chan struct{} // notify session has Closed
  77. chReadEvent chan struct{} // notify Read() can be called without blocking
  78. chWriteEvent chan struct{} // notify Write() can be called without blocking
  79. chErrorEvent chan error // notify Read() have an error
  80. isClosed bool // flag the session has Closed
  81. mu sync.Mutex
  82. }
  83. setReadBuffer interface {
  84. SetReadBuffer(bytes int) error
  85. }
  86. setWriteBuffer interface {
  87. SetWriteBuffer(bytes int) error
  88. }
  89. )
  90. // newUDPSession create a new udp session for client or server
  91. func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, remote net.Addr, block BlockCrypt) *UDPSession {
  92. sess := new(UDPSession)
  93. sess.die = make(chan struct{})
  94. sess.chReadEvent = make(chan struct{}, 1)
  95. sess.chWriteEvent = make(chan struct{}, 1)
  96. sess.chErrorEvent = make(chan error, 1)
  97. sess.remote = remote
  98. sess.conn = conn
  99. sess.l = l
  100. sess.block = block
  101. sess.recvbuf = make([]byte, mtuLimit)
  102. // FEC initialization
  103. sess.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
  104. if sess.block != nil {
  105. sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
  106. } else {
  107. sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
  108. }
  109. // calculate header size
  110. if sess.block != nil {
  111. sess.headerSize += cryptHeaderSize
  112. }
  113. if sess.fecEncoder != nil {
  114. sess.headerSize += fecHeaderSizePlus2
  115. }
  116. // only allocate extended packet buffer
  117. // when the extra header is required
  118. if sess.headerSize > 0 {
  119. sess.ext = make([]byte, mtuLimit)
  120. }
  121. sess.kcp = NewKCP(conv, func(buf []byte, size int) {
  122. if size >= IKCP_OVERHEAD {
  123. sess.output(buf[:size])
  124. }
  125. })
  126. sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
  127. // add current session to the global updater,
  128. // which periodically calls sess.update()
  129. updater.addSession(sess)
  130. if sess.l == nil { // it's a client connection
  131. go sess.readLoop()
  132. atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
  133. } else {
  134. atomic.AddUint64(&DefaultSnmp.PassiveOpens, 1)
  135. }
  136. currestab := atomic.AddUint64(&DefaultSnmp.CurrEstab, 1)
  137. maxconn := atomic.LoadUint64(&DefaultSnmp.MaxConn)
  138. if currestab > maxconn {
  139. atomic.CompareAndSwapUint64(&DefaultSnmp.MaxConn, maxconn, currestab)
  140. }
  141. return sess
  142. }
  143. // Read implements net.Conn
  144. func (s *UDPSession) Read(b []byte) (n int, err error) {
  145. for {
  146. s.mu.Lock()
  147. if len(s.bufptr) > 0 { // copy from buffer into b
  148. n = copy(b, s.bufptr)
  149. s.bufptr = s.bufptr[n:]
  150. s.mu.Unlock()
  151. return n, nil
  152. }
  153. if s.isClosed {
  154. s.mu.Unlock()
  155. return 0, errors.New(errBrokenPipe)
  156. }
  157. if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
  158. atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
  159. if len(b) >= size { // direct write to b
  160. s.kcp.Recv(b)
  161. s.mu.Unlock()
  162. return size, nil
  163. }
  164. // resize kcp receive buffer
  165. // to make sure recvbuf has enough capacity
  166. if cap(s.recvbuf) < size {
  167. s.recvbuf = make([]byte, size)
  168. }
  169. // resize recvbuf slice length
  170. s.recvbuf = s.recvbuf[:size]
  171. s.kcp.Recv(s.recvbuf)
  172. n = copy(b, s.recvbuf) // copy to b
  173. s.bufptr = s.recvbuf[n:] // update pointer
  174. s.mu.Unlock()
  175. return n, nil
  176. }
  177. // read deadline
  178. var timeout *time.Timer
  179. var c <-chan time.Time
  180. if !s.rd.IsZero() {
  181. if time.Now().After(s.rd) {
  182. s.mu.Unlock()
  183. return 0, errTimeout{}
  184. }
  185. delay := s.rd.Sub(time.Now())
  186. timeout = time.NewTimer(delay)
  187. c = timeout.C
  188. }
  189. s.mu.Unlock()
  190. // wait for read event or timeout
  191. select {
  192. case <-s.chReadEvent:
  193. case <-c:
  194. case <-s.die:
  195. case err = <-s.chErrorEvent:
  196. if timeout != nil {
  197. timeout.Stop()
  198. }
  199. return n, err
  200. }
  201. if timeout != nil {
  202. timeout.Stop()
  203. }
  204. }
  205. }
  206. // Write implements net.Conn
  207. func (s *UDPSession) Write(b []byte) (n int, err error) {
  208. for {
  209. s.mu.Lock()
  210. if s.isClosed {
  211. s.mu.Unlock()
  212. return 0, errors.New(errBrokenPipe)
  213. }
  214. // api flow control
  215. if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
  216. n = len(b)
  217. for {
  218. if len(b) <= int(s.kcp.mss) {
  219. s.kcp.Send(b)
  220. break
  221. } else {
  222. s.kcp.Send(b[:s.kcp.mss])
  223. b = b[s.kcp.mss:]
  224. }
  225. }
  226. if !s.writeDelay {
  227. s.kcp.flush(false)
  228. }
  229. s.mu.Unlock()
  230. atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
  231. return n, nil
  232. }
  233. // write deadline
  234. var timeout *time.Timer
  235. var c <-chan time.Time
  236. if !s.wd.IsZero() {
  237. if time.Now().After(s.wd) {
  238. s.mu.Unlock()
  239. return 0, errTimeout{}
  240. }
  241. delay := s.wd.Sub(time.Now())
  242. timeout = time.NewTimer(delay)
  243. c = timeout.C
  244. }
  245. s.mu.Unlock()
  246. // wait for write event or timeout
  247. select {
  248. case <-s.chWriteEvent:
  249. case <-c:
  250. case <-s.die:
  251. }
  252. if timeout != nil {
  253. timeout.Stop()
  254. }
  255. }
  256. }
  257. // Close closes the connection.
  258. func (s *UDPSession) Close() error {
  259. // remove this session from updater & listener(if necessary)
  260. updater.removeSession(s)
  261. if s.l != nil { // notify listener
  262. s.l.closeSession(sessionKey{
  263. addr: s.remote.String(),
  264. convID: s.kcp.conv,
  265. })
  266. }
  267. s.mu.Lock()
  268. defer s.mu.Unlock()
  269. if s.isClosed {
  270. return errors.New(errBrokenPipe)
  271. }
  272. close(s.die)
  273. s.isClosed = true
  274. atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0))
  275. if s.l == nil { // client socket close
  276. return s.conn.Close()
  277. }
  278. return nil
  279. }
  280. // LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
  281. func (s *UDPSession) LocalAddr() net.Addr { return s.conn.LocalAddr() }
  282. // RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
  283. func (s *UDPSession) RemoteAddr() net.Addr { return s.remote }
  284. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  285. func (s *UDPSession) SetDeadline(t time.Time) error {
  286. s.mu.Lock()
  287. defer s.mu.Unlock()
  288. s.rd = t
  289. s.wd = t
  290. return nil
  291. }
  292. // SetReadDeadline implements the Conn SetReadDeadline method.
  293. func (s *UDPSession) SetReadDeadline(t time.Time) error {
  294. s.mu.Lock()
  295. defer s.mu.Unlock()
  296. s.rd = t
  297. return nil
  298. }
  299. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  300. func (s *UDPSession) SetWriteDeadline(t time.Time) error {
  301. s.mu.Lock()
  302. defer s.mu.Unlock()
  303. s.wd = t
  304. return nil
  305. }
  306. // SetWriteDelay delays write for bulk transfer until the next update interval
  307. func (s *UDPSession) SetWriteDelay(delay bool) {
  308. s.mu.Lock()
  309. defer s.mu.Unlock()
  310. s.writeDelay = delay
  311. }
  312. // SetWindowSize set maximum window size
  313. func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
  314. s.mu.Lock()
  315. defer s.mu.Unlock()
  316. s.kcp.WndSize(sndwnd, rcvwnd)
  317. }
  318. // SetMtu sets the maximum transmission unit(not including UDP header)
  319. func (s *UDPSession) SetMtu(mtu int) bool {
  320. if mtu > mtuLimit {
  321. return false
  322. }
  323. s.mu.Lock()
  324. defer s.mu.Unlock()
  325. s.kcp.SetMtu(mtu - s.headerSize)
  326. return true
  327. }
  328. // SetStreamMode toggles the stream mode on/off
  329. func (s *UDPSession) SetStreamMode(enable bool) {
  330. s.mu.Lock()
  331. defer s.mu.Unlock()
  332. if enable {
  333. s.kcp.stream = 1
  334. } else {
  335. s.kcp.stream = 0
  336. }
  337. }
  338. // SetACKNoDelay changes ack flush option, set true to flush ack immediately,
  339. func (s *UDPSession) SetACKNoDelay(nodelay bool) {
  340. s.mu.Lock()
  341. defer s.mu.Unlock()
  342. s.ackNoDelay = nodelay
  343. }
  344. // SetDUP duplicates udp packets for kcp output, for testing purpose only
  345. func (s *UDPSession) SetDUP(dup int) {
  346. s.mu.Lock()
  347. defer s.mu.Unlock()
  348. s.dup = dup
  349. }
  350. // SetNoDelay calls nodelay() of kcp
  351. // https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration
  352. func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
  353. s.mu.Lock()
  354. defer s.mu.Unlock()
  355. s.kcp.NoDelay(nodelay, interval, resend, nc)
  356. }
  357. // SetDSCP sets the 6bit DSCP field of IP header, no effect if it's accepted from Listener
  358. func (s *UDPSession) SetDSCP(dscp int) error {
  359. s.mu.Lock()
  360. defer s.mu.Unlock()
  361. if s.l == nil {
  362. if nc, ok := s.conn.(*connectedUDPConn); ok {
  363. return ipv4.NewConn(nc.UDPConn).SetTOS(dscp << 2)
  364. } else if nc, ok := s.conn.(net.Conn); ok {
  365. return ipv4.NewConn(nc).SetTOS(dscp << 2)
  366. }
  367. }
  368. return errors.New(errInvalidOperation)
  369. }
  370. // SetReadBuffer sets the socket read buffer, no effect if it's accepted from Listener
  371. func (s *UDPSession) SetReadBuffer(bytes int) error {
  372. s.mu.Lock()
  373. defer s.mu.Unlock()
  374. if s.l == nil {
  375. if nc, ok := s.conn.(setReadBuffer); ok {
  376. return nc.SetReadBuffer(bytes)
  377. }
  378. }
  379. return errors.New(errInvalidOperation)
  380. }
  381. // SetWriteBuffer sets the socket write buffer, no effect if it's accepted from Listener
  382. func (s *UDPSession) SetWriteBuffer(bytes int) error {
  383. s.mu.Lock()
  384. defer s.mu.Unlock()
  385. if s.l == nil {
  386. if nc, ok := s.conn.(setWriteBuffer); ok {
  387. return nc.SetWriteBuffer(bytes)
  388. }
  389. }
  390. return errors.New(errInvalidOperation)
  391. }
  392. // output pipeline entry
  393. // steps for output data processing:
  394. // 0. Header extends
  395. // 1. FEC
  396. // 2. CRC32
  397. // 3. Encryption
  398. // 4. WriteTo kernel
  399. func (s *UDPSession) output(buf []byte) {
  400. var ecc [][]byte
  401. // 0. extend buf's header space(if necessary)
  402. ext := buf
  403. if s.headerSize > 0 {
  404. ext = s.ext[:s.headerSize+len(buf)]
  405. copy(ext[s.headerSize:], buf)
  406. }
  407. // 1. FEC encoding
  408. if s.fecEncoder != nil {
  409. ecc = s.fecEncoder.encode(ext)
  410. }
  411. // 2&3. crc32 & encryption
  412. if s.block != nil {
  413. io.ReadFull(rand.Reader, ext[:nonceSize])
  414. checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
  415. binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
  416. s.block.Encrypt(ext, ext)
  417. for k := range ecc {
  418. io.ReadFull(rand.Reader, ecc[k][:nonceSize])
  419. checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
  420. binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
  421. s.block.Encrypt(ecc[k], ecc[k])
  422. }
  423. }
  424. // 4. WriteTo kernel
  425. nbytes := 0
  426. npkts := 0
  427. for i := 0; i < s.dup+1; i++ {
  428. if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
  429. nbytes += n
  430. npkts++
  431. }
  432. }
  433. for k := range ecc {
  434. if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
  435. nbytes += n
  436. npkts++
  437. }
  438. }
  439. atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
  440. atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
  441. }
  442. // kcp update, returns interval for next calling
  443. func (s *UDPSession) update() (interval time.Duration) {
  444. s.mu.Lock()
  445. s.kcp.flush(false)
  446. if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
  447. s.notifyWriteEvent()
  448. }
  449. interval = time.Duration(s.kcp.interval) * time.Millisecond
  450. s.mu.Unlock()
  451. return
  452. }
  453. // GetConv gets conversation id of a session
  454. func (s *UDPSession) GetConv() uint32 { return s.kcp.conv }
  455. func (s *UDPSession) notifyReadEvent() {
  456. select {
  457. case s.chReadEvent <- struct{}{}:
  458. default:
  459. }
  460. }
  461. func (s *UDPSession) notifyWriteEvent() {
  462. select {
  463. case s.chWriteEvent <- struct{}{}:
  464. default:
  465. }
  466. }
  467. func (s *UDPSession) kcpInput(data []byte) {
  468. var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
  469. if s.fecDecoder != nil {
  470. f := s.fecDecoder.decodeBytes(data)
  471. s.mu.Lock()
  472. if f.flag == typeData {
  473. if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
  474. kcpInErrors++
  475. }
  476. }
  477. if f.flag == typeData || f.flag == typeFEC {
  478. if f.flag == typeFEC {
  479. fecParityShards++
  480. }
  481. recovers := s.fecDecoder.decode(f)
  482. for _, r := range recovers {
  483. if len(r) >= 2 { // must be larger than 2bytes
  484. sz := binary.LittleEndian.Uint16(r)
  485. if int(sz) <= len(r) && sz >= 2 {
  486. if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
  487. fecRecovered++
  488. } else {
  489. kcpInErrors++
  490. }
  491. } else {
  492. fecErrs++
  493. }
  494. } else {
  495. fecErrs++
  496. }
  497. }
  498. }
  499. // notify reader
  500. if n := s.kcp.PeekSize(); n > 0 {
  501. s.notifyReadEvent()
  502. }
  503. s.mu.Unlock()
  504. } else {
  505. s.mu.Lock()
  506. if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
  507. kcpInErrors++
  508. }
  509. // notify reader
  510. if n := s.kcp.PeekSize(); n > 0 {
  511. s.notifyReadEvent()
  512. }
  513. s.mu.Unlock()
  514. }
  515. atomic.AddUint64(&DefaultSnmp.InPkts, 1)
  516. atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
  517. if fecParityShards > 0 {
  518. atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
  519. }
  520. if kcpInErrors > 0 {
  521. atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
  522. }
  523. if fecErrs > 0 {
  524. atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
  525. }
  526. if fecRecovered > 0 {
  527. atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
  528. }
  529. }
  530. func (s *UDPSession) receiver(ch chan<- []byte) {
  531. for {
  532. data := xmitBuf.Get().([]byte)[:mtuLimit]
  533. if n, _, err := s.conn.ReadFrom(data); err == nil && n >= s.headerSize+IKCP_OVERHEAD {
  534. select {
  535. case ch <- data[:n]:
  536. case <-s.die:
  537. return
  538. }
  539. } else if err != nil {
  540. s.chErrorEvent <- err
  541. return
  542. } else {
  543. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  544. }
  545. }
  546. }
  547. // read loop for client session
  548. func (s *UDPSession) readLoop() {
  549. chPacket := make(chan []byte, qlen)
  550. go s.receiver(chPacket)
  551. for {
  552. select {
  553. case data := <-chPacket:
  554. raw := data
  555. dataValid := false
  556. if s.block != nil {
  557. s.block.Decrypt(data, data)
  558. data = data[nonceSize:]
  559. checksum := crc32.ChecksumIEEE(data[crcSize:])
  560. if checksum == binary.LittleEndian.Uint32(data) {
  561. data = data[crcSize:]
  562. dataValid = true
  563. } else {
  564. atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
  565. }
  566. } else if s.block == nil {
  567. dataValid = true
  568. }
  569. if dataValid {
  570. s.kcpInput(data)
  571. }
  572. xmitBuf.Put(raw)
  573. case <-s.die:
  574. return
  575. }
  576. }
  577. }
  578. type (
  579. sessionKey struct {
  580. addr string
  581. convID uint32
  582. }
  583. // Listener defines a server listening for connections
  584. Listener struct {
  585. block BlockCrypt // block encryption
  586. dataShards int // FEC data shard
  587. parityShards int // FEC parity shard
  588. fecDecoder *fecDecoder // FEC mock initialization
  589. conn net.PacketConn // the underlying packet connection
  590. sessions map[sessionKey]*UDPSession // all sessions accepted by this Listener
  591. chAccepts chan *UDPSession // Listen() backlog
  592. chSessionClosed chan sessionKey // session close queue
  593. headerSize int // the overall header size added before KCP frame
  594. die chan struct{} // notify the listener has closed
  595. rd atomic.Value // read deadline for Accept()
  596. wd atomic.Value
  597. }
  598. // incoming packet
  599. inPacket struct {
  600. from net.Addr
  601. data []byte
  602. }
  603. )
  604. // monitor incoming data for all connections of server
  605. func (l *Listener) monitor() {
  606. // cache last session
  607. var lastKey sessionKey
  608. var lastSession *UDPSession
  609. chPacket := make(chan inPacket, qlen)
  610. go l.receiver(chPacket)
  611. for {
  612. select {
  613. case p := <-chPacket:
  614. raw := p.data
  615. data := p.data
  616. from := p.from
  617. dataValid := false
  618. if l.block != nil {
  619. l.block.Decrypt(data, data)
  620. data = data[nonceSize:]
  621. checksum := crc32.ChecksumIEEE(data[crcSize:])
  622. if checksum == binary.LittleEndian.Uint32(data) {
  623. data = data[crcSize:]
  624. dataValid = true
  625. } else {
  626. atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
  627. }
  628. } else if l.block == nil {
  629. dataValid = true
  630. }
  631. if dataValid {
  632. var conv uint32
  633. convValid := false
  634. if l.fecDecoder != nil {
  635. isfec := binary.LittleEndian.Uint16(data[4:])
  636. if isfec == typeData {
  637. conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
  638. convValid = true
  639. }
  640. } else {
  641. conv = binary.LittleEndian.Uint32(data)
  642. convValid = true
  643. }
  644. if convValid {
  645. key := sessionKey{
  646. addr: from.String(),
  647. convID: conv,
  648. }
  649. var s *UDPSession
  650. var ok bool
  651. // packets received from an address always come in batch.
  652. // cache the session for next packet, without querying map.
  653. if key == lastKey {
  654. s, ok = lastSession, true
  655. } else if s, ok = l.sessions[key]; ok {
  656. lastSession = s
  657. lastKey = key
  658. }
  659. if !ok { // new session
  660. if len(l.chAccepts) < cap(l.chAccepts) && len(l.sessions) < 4096 { // do not let new session overwhelm accept queue and connection count
  661. s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
  662. s.kcpInput(data)
  663. l.sessions[key] = s
  664. l.chAccepts <- s
  665. }
  666. } else {
  667. s.kcpInput(data)
  668. }
  669. }
  670. }
  671. xmitBuf.Put(raw)
  672. case key := <-l.chSessionClosed:
  673. if key == lastKey {
  674. lastKey = sessionKey{}
  675. }
  676. delete(l.sessions, key)
  677. case <-l.die:
  678. return
  679. }
  680. }
  681. }
  682. func (l *Listener) receiver(ch chan<- inPacket) {
  683. for {
  684. data := xmitBuf.Get().([]byte)[:mtuLimit]
  685. if n, from, err := l.conn.ReadFrom(data); err == nil && n >= l.headerSize+IKCP_OVERHEAD {
  686. select {
  687. case ch <- inPacket{from, data[:n]}:
  688. case <-l.die:
  689. return
  690. }
  691. } else if err != nil {
  692. return
  693. } else {
  694. atomic.AddUint64(&DefaultSnmp.InErrs, 1)
  695. }
  696. }
  697. }
  698. // SetReadBuffer sets the socket read buffer for the Listener
  699. func (l *Listener) SetReadBuffer(bytes int) error {
  700. if nc, ok := l.conn.(setReadBuffer); ok {
  701. return nc.SetReadBuffer(bytes)
  702. }
  703. return errors.New(errInvalidOperation)
  704. }
  705. // SetWriteBuffer sets the socket write buffer for the Listener
  706. func (l *Listener) SetWriteBuffer(bytes int) error {
  707. if nc, ok := l.conn.(setWriteBuffer); ok {
  708. return nc.SetWriteBuffer(bytes)
  709. }
  710. return errors.New(errInvalidOperation)
  711. }
  712. // SetDSCP sets the 6bit DSCP field of IP header
  713. func (l *Listener) SetDSCP(dscp int) error {
  714. if nc, ok := l.conn.(net.Conn); ok {
  715. return ipv4.NewConn(nc).SetTOS(dscp << 2)
  716. }
  717. return errors.New(errInvalidOperation)
  718. }
  719. // Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
  720. func (l *Listener) Accept() (net.Conn, error) {
  721. return l.AcceptKCP()
  722. }
  723. // AcceptKCP accepts a KCP connection
  724. func (l *Listener) AcceptKCP() (*UDPSession, error) {
  725. var timeout <-chan time.Time
  726. if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
  727. timeout = time.After(tdeadline.Sub(time.Now()))
  728. }
  729. select {
  730. case <-timeout:
  731. return nil, &errTimeout{}
  732. case c := <-l.chAccepts:
  733. return c, nil
  734. case <-l.die:
  735. return nil, errors.New(errBrokenPipe)
  736. }
  737. }
  738. // SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
  739. func (l *Listener) SetDeadline(t time.Time) error {
  740. l.SetReadDeadline(t)
  741. l.SetWriteDeadline(t)
  742. return nil
  743. }
  744. // SetReadDeadline implements the Conn SetReadDeadline method.
  745. func (l *Listener) SetReadDeadline(t time.Time) error {
  746. l.rd.Store(t)
  747. return nil
  748. }
  749. // SetWriteDeadline implements the Conn SetWriteDeadline method.
  750. func (l *Listener) SetWriteDeadline(t time.Time) error {
  751. l.wd.Store(t)
  752. return nil
  753. }
  754. // Close stops listening on the UDP address. Already Accepted connections are not closed.
  755. func (l *Listener) Close() error {
  756. close(l.die)
  757. return l.conn.Close()
  758. }
  759. // closeSession notify the listener that a session has closed
  760. func (l *Listener) closeSession(key sessionKey) bool {
  761. select {
  762. case l.chSessionClosed <- key:
  763. return true
  764. case <-l.die:
  765. return false
  766. }
  767. }
  768. // Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
  769. func (l *Listener) Addr() net.Addr { return l.conn.LocalAddr() }
  770. // Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
  771. func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }
  772. // ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption,
  773. // dataShards, parityShards defines Reed-Solomon Erasure Coding parameters
  774. func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
  775. udpaddr, err := net.ResolveUDPAddr("udp", laddr)
  776. if err != nil {
  777. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  778. }
  779. conn, err := net.ListenUDP("udp", udpaddr)
  780. if err != nil {
  781. return nil, errors.Wrap(err, "net.ListenUDP")
  782. }
  783. return ServeConn(block, dataShards, parityShards, conn)
  784. }
  785. // ServeConn serves KCP protocol for a single packet connection.
  786. func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
  787. l := new(Listener)
  788. l.conn = conn
  789. l.sessions = make(map[sessionKey]*UDPSession)
  790. l.chAccepts = make(chan *UDPSession, acceptBacklog)
  791. l.chSessionClosed = make(chan sessionKey)
  792. l.die = make(chan struct{})
  793. l.dataShards = dataShards
  794. l.parityShards = parityShards
  795. l.block = block
  796. l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
  797. // calculate header size
  798. if l.block != nil {
  799. l.headerSize += cryptHeaderSize
  800. }
  801. if l.fecDecoder != nil {
  802. l.headerSize += fecHeaderSizePlus2
  803. }
  804. go l.monitor()
  805. return l, nil
  806. }
  807. // Dial connects to the remote address "raddr" on the network "udp"
  808. func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
  809. // DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
  810. func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
  811. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  812. if err != nil {
  813. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  814. }
  815. udpconn, err := net.DialUDP("udp", nil, udpaddr)
  816. if err != nil {
  817. return nil, errors.Wrap(err, "net.DialUDP")
  818. }
  819. return NewConn(raddr, block, dataShards, parityShards, &connectedUDPConn{udpconn})
  820. }
  821. // NewConn establishes a session and talks KCP protocol over a packet connection.
  822. func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
  823. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  824. if err != nil {
  825. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  826. }
  827. var convid uint32
  828. binary.Read(rand.Reader, binary.LittleEndian, &convid)
  829. return newUDPSession(convid, dataShards, parityShards, nil, conn, udpaddr, block), nil
  830. }
  831. func NewConnEx(convid uint32, connected bool, raddr string, block BlockCrypt, dataShards, parityShards int, conn *net.UDPConn) (*UDPSession, error) {
  832. udpaddr, err := net.ResolveUDPAddr("udp", raddr)
  833. if err != nil {
  834. return nil, errors.Wrap(err, "net.ResolveUDPAddr")
  835. }
  836. var pConn net.PacketConn = conn
  837. if connected {
  838. pConn = &connectedUDPConn{conn}
  839. }
  840. return newUDPSession(convid, dataShards, parityShards, nil, pConn, udpaddr, block), nil
  841. }
  842. // returns current time in milliseconds
  843. func currentMs() uint32 { return uint32(time.Now().UnixNano() / int64(time.Millisecond)) }
  844. // connectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
  845. // to Write syscalls that are 4 times faster on some OS'es. This should only be
  846. // used for connections that were produced by a net.Dial* call.
  847. type connectedUDPConn struct{ *net.UDPConn }
  848. // WriteTo redirects all writes to the Write syscall, which is 4 times faster.
  849. func (c *connectedUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { return c.Write(b) }