1
0

proxy_manager.go 7.5 KB


  1. package client
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/fatedier/frp/models/config"
  6. "github.com/fatedier/frp/models/msg"
  7. "github.com/fatedier/frp/utils/errors"
  8. "github.com/fatedier/frp/utils/log"
  9. frpNet "github.com/fatedier/frp/utils/net"
  10. )
  11. const (
  12. ProxyStatusNew = "new"
  13. ProxyStatusStartErr = "start error"
  14. ProxyStatusWaitStart = "wait start"
  15. ProxyStatusRunning = "running"
  16. ProxyStatusClosed = "closed"
  17. )
  18. type ProxyManager struct {
  19. ctl *Control
  20. proxies map[string]*ProxyWrapper
  21. visitorCfgs map[string]config.ProxyConf
  22. visitors map[string]Visitor
  23. sendCh chan (msg.Message)
  24. closed bool
  25. mu sync.RWMutex
  26. log.Logger
  27. }
  28. type ProxyWrapper struct {
  29. Name string
  30. Type string
  31. Status string
  32. Err string
  33. Cfg config.ProxyConf
  34. RemoteAddr string
  35. pxy Proxy
  36. mu sync.RWMutex
  37. }
  38. type ProxyStatus struct {
  39. Name string `json:"name"`
  40. Type string `json:"type"`
  41. Status string `json:"status"`
  42. Err string `json:"err"`
  43. Cfg config.ProxyConf `json:"cfg"`
  44. // Got from server.
  45. RemoteAddr string `json:"remote_addr"`
  46. }
  47. func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper {
  48. return &ProxyWrapper{
  49. Name: cfg.GetName(),
  50. Type: cfg.GetType(),
  51. Status: ProxyStatusNew,
  52. Cfg: cfg,
  53. pxy: nil,
  54. }
  55. }
  56. func (pw *ProxyWrapper) GetStatusStr() string {
  57. pw.mu.RLock()
  58. defer pw.mu.RUnlock()
  59. return pw.Status
  60. }
  61. func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
  62. pw.mu.RLock()
  63. defer pw.mu.RUnlock()
  64. ps := &ProxyStatus{
  65. Name: pw.Name,
  66. Type: pw.Type,
  67. Status: pw.Status,
  68. Err: pw.Err,
  69. Cfg: pw.Cfg,
  70. RemoteAddr: pw.RemoteAddr,
  71. }
  72. return ps
  73. }
  74. func (pw *ProxyWrapper) WaitStart() {
  75. pw.mu.Lock()
  76. defer pw.mu.Unlock()
  77. pw.Status = ProxyStatusWaitStart
  78. }
  79. func (pw *ProxyWrapper) Start(remoteAddr string, serverRespErr string) error {
  80. if pw.pxy != nil {
  81. pw.pxy.Close()
  82. pw.pxy = nil
  83. }
  84. if serverRespErr != "" {
  85. pw.mu.Lock()
  86. pw.Status = ProxyStatusStartErr
  87. pw.RemoteAddr = remoteAddr
  88. pw.Err = serverRespErr
  89. pw.mu.Unlock()
  90. return fmt.Errorf(serverRespErr)
  91. }
  92. pxy := NewProxy(pw.Cfg)
  93. pw.mu.Lock()
  94. defer pw.mu.Unlock()
  95. pw.RemoteAddr = remoteAddr
  96. if err := pxy.Run(); err != nil {
  97. pw.Status = ProxyStatusStartErr
  98. pw.Err = err.Error()
  99. return err
  100. }
  101. pw.Status = ProxyStatusRunning
  102. pw.Err = ""
  103. pw.pxy = pxy
  104. return nil
  105. }
  106. func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
  107. pw.mu.RLock()
  108. pxy := pw.pxy
  109. pw.mu.RUnlock()
  110. if pxy != nil {
  111. workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  112. go pxy.InWorkConn(workConn)
  113. } else {
  114. workConn.Close()
  115. }
  116. }
  117. func (pw *ProxyWrapper) Close() {
  118. pw.mu.Lock()
  119. defer pw.mu.Unlock()
  120. if pw.pxy != nil {
  121. pw.pxy.Close()
  122. pw.pxy = nil
  123. }
  124. pw.Status = ProxyStatusClosed
  125. }
  126. func NewProxyManager(ctl *Control, msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
  127. return &ProxyManager{
  128. ctl: ctl,
  129. proxies: make(map[string]*ProxyWrapper),
  130. visitorCfgs: make(map[string]config.ProxyConf),
  131. visitors: make(map[string]Visitor),
  132. sendCh: msgSendCh,
  133. closed: false,
  134. Logger: log.NewPrefixLogger(logPrefix),
  135. }
  136. }
  137. func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) {
  138. pm.mu.Lock()
  139. defer pm.mu.Unlock()
  140. pm.closed = false
  141. pm.sendCh = msgSendCh
  142. pm.ClearLogPrefix()
  143. pm.AddLogPrefix(logPrefix)
  144. }
  145. // Must hold the lock before calling this function.
  146. func (pm *ProxyManager) sendMsg(m msg.Message) error {
  147. err := errors.PanicToError(func() {
  148. pm.sendCh <- m
  149. })
  150. if err != nil {
  151. pm.closed = true
  152. }
  153. return err
  154. }
  155. func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
  156. pm.mu.Lock()
  157. defer pm.mu.Unlock()
  158. if pm.closed {
  159. return fmt.Errorf("ProxyManager is closed now")
  160. }
  161. pxy, ok := pm.proxies[name]
  162. if !ok {
  163. return fmt.Errorf("no proxy found")
  164. }
  165. if err := pxy.Start(remoteAddr, serverRespErr); err != nil {
  166. errRet := err
  167. err = pm.sendMsg(&msg.CloseProxy{
  168. ProxyName: name,
  169. })
  170. if err != nil {
  171. errRet = fmt.Errorf("send CloseProxy message error")
  172. }
  173. return errRet
  174. }
  175. return nil
  176. }
  177. func (pm *ProxyManager) CloseProxies() {
  178. pm.mu.RLock()
  179. defer pm.mu.RUnlock()
  180. for _, pxy := range pm.proxies {
  181. pxy.Close()
  182. }
  183. }
  184. // pxyStatus: check and start proxies in which status
  185. func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) {
  186. pm.mu.RLock()
  187. defer pm.mu.RUnlock()
  188. if pm.closed {
  189. pm.Warn("CheckAndStartProxy error: ProxyManager is closed now")
  190. return
  191. }
  192. for _, pxy := range pm.proxies {
  193. status := pxy.GetStatusStr()
  194. for _, s := range pxyStatus {
  195. if status == s {
  196. var newProxyMsg msg.NewProxy
  197. pxy.Cfg.UnMarshalToMsg(&newProxyMsg)
  198. err := pm.sendMsg(&newProxyMsg)
  199. if err != nil {
  200. pm.Warn("[%s] proxy send NewProxy message error")
  201. return
  202. }
  203. pxy.WaitStart()
  204. break
  205. }
  206. }
  207. }
  208. for _, cfg := range pm.visitorCfgs {
  209. if _, exist := pm.visitors[cfg.GetName()]; !exist {
  210. pm.Info("try to start visitor [%s]", cfg.GetName())
  211. visitor := NewVisitor(pm.ctl, cfg)
  212. err := visitor.Run()
  213. if err != nil {
  214. visitor.Warn("start error: %v", err)
  215. continue
  216. }
  217. pm.visitors[cfg.GetName()] = visitor
  218. visitor.Info("start visitor success")
  219. }
  220. }
  221. }
  222. func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf, startNow bool) error {
  223. pm.mu.Lock()
  224. defer func() {
  225. pm.mu.Unlock()
  226. if startNow {
  227. go pm.CheckAndStartProxy([]string{ProxyStatusNew})
  228. }
  229. }()
  230. if pm.closed {
  231. err := fmt.Errorf("Reload error: ProxyManager is closed now")
  232. pm.Warn(err.Error())
  233. return err
  234. }
  235. delPxyNames := make([]string, 0)
  236. for name, pxy := range pm.proxies {
  237. del := false
  238. cfg, ok := pxyCfgs[name]
  239. if !ok {
  240. del = true
  241. } else {
  242. if !pxy.Cfg.Compare(cfg) {
  243. del = true
  244. }
  245. }
  246. if del {
  247. delPxyNames = append(delPxyNames, name)
  248. delete(pm.proxies, name)
  249. pxy.Close()
  250. err := pm.sendMsg(&msg.CloseProxy{
  251. ProxyName: name,
  252. })
  253. if err != nil {
  254. err = fmt.Errorf("Reload error: ProxyManager is closed now")
  255. pm.Warn(err.Error())
  256. return err
  257. }
  258. }
  259. }
  260. pm.Info("proxy removed: %v", delPxyNames)
  261. addPxyNames := make([]string, 0)
  262. for name, cfg := range pxyCfgs {
  263. if _, ok := pm.proxies[name]; !ok {
  264. pxy := NewProxyWrapper(cfg)
  265. pm.proxies[name] = pxy
  266. addPxyNames = append(addPxyNames, name)
  267. }
  268. }
  269. pm.Info("proxy added: %v", addPxyNames)
  270. delVisitorName := make([]string, 0)
  271. for name, oldVisitorCfg := range pm.visitorCfgs {
  272. del := false
  273. cfg, ok := visitorCfgs[name]
  274. if !ok {
  275. del = true
  276. } else {
  277. if !oldVisitorCfg.Compare(cfg) {
  278. del = true
  279. }
  280. }
  281. if del {
  282. delVisitorName = append(delVisitorName, name)
  283. delete(pm.visitorCfgs, name)
  284. if visitor, ok := pm.visitors[name]; ok {
  285. visitor.Close()
  286. }
  287. delete(pm.visitors, name)
  288. }
  289. }
  290. pm.Info("visitor removed: %v", delVisitorName)
  291. addVisitorName := make([]string, 0)
  292. for name, visitorCfg := range visitorCfgs {
  293. if _, ok := pm.visitorCfgs[name]; !ok {
  294. pm.visitorCfgs[name] = visitorCfg
  295. addVisitorName = append(addVisitorName, name)
  296. }
  297. }
  298. pm.Info("visitor added: %v", addVisitorName)
  299. return nil
  300. }
  301. func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
  302. pm.mu.RLock()
  303. pw, ok := pm.proxies[name]
  304. pm.mu.RUnlock()
  305. if ok {
  306. pw.InWorkConn(workConn)
  307. } else {
  308. workConn.Close()
  309. }
  310. }
  311. func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
  312. ps := make([]*ProxyStatus, 0)
  313. pm.mu.RLock()
  314. defer pm.mu.RUnlock()
  315. for _, pxy := range pm.proxies {
  316. ps = append(ps, pxy.GetStatus())
  317. }
  318. return ps
  319. }