1
0

udp.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package net
  15. import (
  16. "fmt"
  17. "io"
  18. "net"
  19. "sync"
  20. "time"
  21. "github.com/fatedier/frp/utils/log"
  22. "github.com/fatedier/frp/utils/pool"
  23. )
  24. type UdpPacket struct {
  25. Buf []byte
  26. LocalAddr net.Addr
  27. RemoteAddr net.Addr
  28. }
  29. type FakeUdpConn struct {
  30. log.Logger
  31. l *UdpListener
  32. localAddr net.Addr
  33. remoteAddr net.Addr
  34. packets chan []byte
  35. closeFlag bool
  36. lastActive time.Time
  37. mu sync.RWMutex
  38. }
  39. func NewFakeUdpConn(l *UdpListener, laddr, raddr net.Addr) *FakeUdpConn {
  40. fc := &FakeUdpConn{
  41. Logger: log.NewPrefixLogger(""),
  42. l: l,
  43. localAddr: laddr,
  44. remoteAddr: raddr,
  45. packets: make(chan []byte, 20),
  46. }
  47. go func() {
  48. for {
  49. time.Sleep(5 * time.Second)
  50. fc.mu.RLock()
  51. if time.Now().Sub(fc.lastActive) > 10*time.Second {
  52. fc.mu.RUnlock()
  53. fc.Close()
  54. break
  55. }
  56. fc.mu.RUnlock()
  57. }
  58. }()
  59. return fc
  60. }
  61. func (c *FakeUdpConn) putPacket(content []byte) {
  62. defer func() {
  63. if err := recover(); err != nil {
  64. }
  65. }()
  66. select {
  67. case c.packets <- content:
  68. default:
  69. }
  70. }
  71. func (c *FakeUdpConn) Read(b []byte) (n int, err error) {
  72. content, ok := <-c.packets
  73. if !ok {
  74. return 0, io.EOF
  75. }
  76. c.mu.Lock()
  77. c.lastActive = time.Now()
  78. c.mu.Unlock()
  79. if len(b) < len(content) {
  80. n = len(b)
  81. } else {
  82. n = len(content)
  83. }
  84. copy(b, content)
  85. return n, nil
  86. }
  87. func (c *FakeUdpConn) Write(b []byte) (n int, err error) {
  88. c.mu.RLock()
  89. if c.closeFlag {
  90. c.mu.RUnlock()
  91. return 0, io.ErrClosedPipe
  92. }
  93. c.mu.RUnlock()
  94. packet := &UdpPacket{
  95. Buf: b,
  96. LocalAddr: c.localAddr,
  97. RemoteAddr: c.remoteAddr,
  98. }
  99. c.l.writeUdpPacket(packet)
  100. c.mu.Lock()
  101. c.lastActive = time.Now()
  102. c.mu.Unlock()
  103. return len(b), nil
  104. }
  105. func (c *FakeUdpConn) Close() error {
  106. c.mu.Lock()
  107. defer c.mu.Unlock()
  108. if !c.closeFlag {
  109. c.closeFlag = true
  110. close(c.packets)
  111. }
  112. return nil
  113. }
  114. func (c *FakeUdpConn) IsClosed() bool {
  115. c.mu.RLock()
  116. defer c.mu.RUnlock()
  117. return c.closeFlag
  118. }
  119. func (c *FakeUdpConn) LocalAddr() net.Addr {
  120. return c.localAddr
  121. }
  122. func (c *FakeUdpConn) RemoteAddr() net.Addr {
  123. return c.remoteAddr
  124. }
  125. func (c *FakeUdpConn) SetDeadline(t time.Time) error {
  126. return nil
  127. }
  128. func (c *FakeUdpConn) SetReadDeadline(t time.Time) error {
  129. return nil
  130. }
  131. func (c *FakeUdpConn) SetWriteDeadline(t time.Time) error {
  132. return nil
  133. }
  134. type UdpListener struct {
  135. net.Addr
  136. accept chan Conn
  137. writeCh chan *UdpPacket
  138. readConn net.Conn
  139. closeFlag bool
  140. fakeConns map[string]*FakeUdpConn
  141. log.Logger
  142. }
  143. func ListenUDP(bindAddr string, bindPort int) (l *UdpListener, err error) {
  144. udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", bindAddr, bindPort))
  145. if err != nil {
  146. return l, err
  147. }
  148. readConn, err := net.ListenUDP("udp", udpAddr)
  149. l = &UdpListener{
  150. Addr: udpAddr,
  151. accept: make(chan Conn),
  152. writeCh: make(chan *UdpPacket, 1000),
  153. fakeConns: make(map[string]*FakeUdpConn),
  154. Logger: log.NewPrefixLogger(""),
  155. }
  156. // for reading
  157. go func() {
  158. for {
  159. buf := pool.GetBuf(1450)
  160. n, remoteAddr, err := readConn.ReadFromUDP(buf)
  161. if err != nil {
  162. close(l.accept)
  163. close(l.writeCh)
  164. return
  165. }
  166. fakeConn, exist := l.fakeConns[remoteAddr.String()]
  167. if !exist || fakeConn.IsClosed() {
  168. fakeConn = NewFakeUdpConn(l, l.Addr, remoteAddr)
  169. l.fakeConns[remoteAddr.String()] = fakeConn
  170. }
  171. fakeConn.putPacket(buf[:n])
  172. l.accept <- fakeConn
  173. }
  174. }()
  175. // for writing
  176. go func() {
  177. for {
  178. packet, ok := <-l.writeCh
  179. if !ok {
  180. return
  181. }
  182. if addr, ok := packet.RemoteAddr.(*net.UDPAddr); ok {
  183. readConn.WriteToUDP(packet.Buf, addr)
  184. }
  185. }
  186. }()
  187. return
  188. }
  189. func (l *UdpListener) writeUdpPacket(packet *UdpPacket) (err error) {
  190. defer func() {
  191. if errRet := recover(); errRet != nil {
  192. err = fmt.Errorf("udp write closed listener")
  193. l.Info("udp write closed listener")
  194. }
  195. }()
  196. l.writeCh <- packet
  197. return
  198. }
  199. func (l *UdpListener) WriteMsg(buf []byte, remoteAddr *net.UDPAddr) (err error) {
  200. // only set remote addr here
  201. packet := &UdpPacket{
  202. Buf: buf,
  203. RemoteAddr: remoteAddr,
  204. }
  205. err = l.writeUdpPacket(packet)
  206. return
  207. }
  208. func (l *UdpListener) Accept() (Conn, error) {
  209. conn, ok := <-l.accept
  210. if !ok {
  211. return conn, fmt.Errorf("channel for udp listener closed")
  212. }
  213. return conn, nil
  214. }
  215. func (l *UdpListener) Close() error {
  216. if !l.closeFlag {
  217. l.closeFlag = true
  218. l.readConn.Close()
  219. }
  220. return nil
  221. }