nathole.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/fatedier/frp/models/msg"
  9. "github.com/fatedier/frp/utils/errors"
  10. "github.com/fatedier/frp/utils/log"
  11. "github.com/fatedier/frp/utils/pool"
  12. "github.com/fatedier/frp/utils/util"
  13. )
  14. // Timeout seconds.
  15. var NatHoleTimeout int64 = 10
  16. type NatHoleController struct {
  17. listener *net.UDPConn
  18. clientCfgs map[string]*NatHoleClientCfg
  19. sessions map[string]*NatHoleSession
  20. mu sync.RWMutex
  21. }
  22. func NewNatHoleController(udpBindAddr string) (nc *NatHoleController, err error) {
  23. addr, err := net.ResolveUDPAddr("udp", udpBindAddr)
  24. if err != nil {
  25. return nil, err
  26. }
  27. lconn, err := net.ListenUDP("udp", addr)
  28. if err != nil {
  29. return nil, err
  30. }
  31. nc = &NatHoleController{
  32. listener: lconn,
  33. clientCfgs: make(map[string]*NatHoleClientCfg),
  34. sessions: make(map[string]*NatHoleSession),
  35. }
  36. return nc, nil
  37. }
  38. func (nc *NatHoleController) ListenClient(name string, sk string) (sidCh chan string) {
  39. clientCfg := &NatHoleClientCfg{
  40. Name: name,
  41. Sk: sk,
  42. SidCh: make(chan string),
  43. }
  44. nc.mu.Lock()
  45. nc.clientCfgs[name] = clientCfg
  46. nc.mu.Unlock()
  47. return clientCfg.SidCh
  48. }
  49. func (nc *NatHoleController) CloseClient(name string) {
  50. nc.mu.Lock()
  51. defer nc.mu.Unlock()
  52. delete(nc.clientCfgs, name)
  53. }
  54. func (nc *NatHoleController) Run() {
  55. for {
  56. buf := pool.GetBuf(1024)
  57. n, raddr, err := nc.listener.ReadFromUDP(buf)
  58. if err != nil {
  59. log.Trace("nat hole listener read from udp error: %v", err)
  60. return
  61. }
  62. rd := bytes.NewReader(buf[:n])
  63. rawMsg, err := msg.ReadMsg(rd)
  64. if err != nil {
  65. log.Trace("read nat hole message error: %v", err)
  66. continue
  67. }
  68. switch m := rawMsg.(type) {
  69. case *msg.NatHoleVisitor:
  70. go nc.HandleVisitor(m, raddr)
  71. case *msg.NatHoleClient:
  72. go nc.HandleClient(m, raddr)
  73. default:
  74. log.Trace("error nat hole message type")
  75. continue
  76. }
  77. pool.PutBuf(buf)
  78. }
  79. }
  80. func (nc *NatHoleController) GenSid() string {
  81. t := time.Now().Unix()
  82. id, _ := util.RandId()
  83. return fmt.Sprintf("%d%s", t, id)
  84. }
  85. func (nc *NatHoleController) HandleVisitor(m *msg.NatHoleVisitor, raddr *net.UDPAddr) {
  86. sid := nc.GenSid()
  87. session := &NatHoleSession{
  88. Sid: sid,
  89. VisitorAddr: raddr,
  90. NotifyCh: make(chan struct{}, 0),
  91. }
  92. nc.mu.Lock()
  93. clientCfg, ok := nc.clientCfgs[m.ProxyName]
  94. if !ok || m.SignKey != util.GetAuthKey(clientCfg.Sk, m.Timestamp) {
  95. nc.mu.Unlock()
  96. return
  97. }
  98. nc.sessions[sid] = session
  99. nc.mu.Unlock()
  100. log.Trace("handle visitor message, sid [%s]", sid)
  101. defer func() {
  102. nc.mu.Lock()
  103. delete(nc.sessions, sid)
  104. nc.mu.Unlock()
  105. }()
  106. err := errors.PanicToError(func() {
  107. clientCfg.SidCh <- sid
  108. })
  109. if err != nil {
  110. return
  111. }
  112. // Wait client connections.
  113. select {
  114. case <-session.NotifyCh:
  115. resp := nc.GenNatHoleResponse(raddr, session)
  116. log.Trace("send nat hole response to visitor")
  117. nc.listener.WriteToUDP(resp, raddr)
  118. case <-time.After(time.Duration(NatHoleTimeout) * time.Second):
  119. return
  120. }
  121. }
  122. func (nc *NatHoleController) HandleClient(m *msg.NatHoleClient, raddr *net.UDPAddr) {
  123. nc.mu.RLock()
  124. session, ok := nc.sessions[m.Sid]
  125. nc.mu.RUnlock()
  126. if !ok {
  127. return
  128. }
  129. log.Trace("handle client message, sid [%s]", session.Sid)
  130. session.ClientAddr = raddr
  131. session.NotifyCh <- struct{}{}
  132. resp := nc.GenNatHoleResponse(raddr, session)
  133. log.Trace("send nat hole response to client")
  134. nc.listener.WriteToUDP(resp, raddr)
  135. }
  136. func (nc *NatHoleController) GenNatHoleResponse(raddr *net.UDPAddr, session *NatHoleSession) []byte {
  137. m := &msg.NatHoleResp{
  138. Sid: session.Sid,
  139. VisitorAddr: session.VisitorAddr.String(),
  140. ClientAddr: session.ClientAddr.String(),
  141. }
  142. b := bytes.NewBuffer(nil)
  143. err := msg.WriteMsg(b, m)
  144. if err != nil {
  145. return []byte("")
  146. }
  147. return b.Bytes()
  148. }
  149. type NatHoleSession struct {
  150. Sid string
  151. VisitorAddr *net.UDPAddr
  152. ClientAddr *net.UDPAddr
  153. NotifyCh chan struct{}
  154. }
  155. type NatHoleClientCfg struct {
  156. Name string
  157. Sk string
  158. SidCh chan string
  159. }