1
0

proxy.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net"
  20. "strings"
  21. "sync"
  22. "time"
  23. "github.com/fatedier/frp/models/config"
  24. "github.com/fatedier/frp/models/msg"
  25. "github.com/fatedier/frp/models/proto/udp"
  26. "github.com/fatedier/frp/utils/errors"
  27. frpIo "github.com/fatedier/frp/utils/io"
  28. "github.com/fatedier/frp/utils/log"
  29. frpNet "github.com/fatedier/frp/utils/net"
  30. "github.com/fatedier/frp/utils/util"
  31. "github.com/fatedier/frp/utils/vhost"
  32. )
  33. type Proxy interface {
  34. Run() (remoteAddr string, err error)
  35. GetControl() *Control
  36. GetName() string
  37. GetConf() config.ProxyConf
  38. GetWorkConnFromPool() (workConn frpNet.Conn, err error)
  39. GetUsedPortsNum() int
  40. Close()
  41. log.Logger
  42. }
  43. type BaseProxy struct {
  44. name string
  45. ctl *Control
  46. listeners []frpNet.Listener
  47. usedPortsNum int
  48. mu sync.RWMutex
  49. log.Logger
  50. }
  51. func (pxy *BaseProxy) GetName() string {
  52. return pxy.name
  53. }
  54. func (pxy *BaseProxy) GetControl() *Control {
  55. return pxy.ctl
  56. }
  57. func (pxy *BaseProxy) GetUsedPortsNum() int {
  58. return pxy.usedPortsNum
  59. }
  60. func (pxy *BaseProxy) Close() {
  61. pxy.Info("proxy closing")
  62. for _, l := range pxy.listeners {
  63. l.Close()
  64. }
  65. }
  66. func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
  67. ctl := pxy.GetControl()
  68. // try all connections from the pool
  69. for i := 0; i < ctl.poolCount+1; i++ {
  70. if workConn, err = ctl.GetWorkConn(); err != nil {
  71. pxy.Warn("failed to get work connection: %v", err)
  72. return
  73. }
  74. pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
  75. workConn.AddLogPrefix(pxy.GetName())
  76. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  77. ProxyName: pxy.GetName(),
  78. })
  79. if err != nil {
  80. workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  81. workConn.Close()
  82. } else {
  83. break
  84. }
  85. }
  86. if err != nil {
  87. pxy.Error("try to get work connection failed in the end")
  88. return
  89. }
  90. return
  91. }
  92. // startListenHandler start a goroutine handler for each listener.
  93. // p: p will just be passed to handler(Proxy, frpNet.Conn).
  94. // handler: each proxy type can set different handler function to deal with connections accepted from listeners.
  95. func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn)) {
  96. for _, listener := range pxy.listeners {
  97. go func(l frpNet.Listener) {
  98. for {
  99. // block
  100. // if listener is closed, err returned
  101. c, err := l.Accept()
  102. if err != nil {
  103. pxy.Info("listener is closed")
  104. return
  105. }
  106. pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
  107. go handler(p, c)
  108. }
  109. }(listener)
  110. }
  111. }
  112. func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) {
  113. basePxy := BaseProxy{
  114. name: pxyConf.GetName(),
  115. ctl: ctl,
  116. listeners: make([]frpNet.Listener, 0),
  117. Logger: log.NewPrefixLogger(ctl.runId),
  118. }
  119. switch cfg := pxyConf.(type) {
  120. case *config.TcpProxyConf:
  121. basePxy.usedPortsNum = 1
  122. pxy = &TcpProxy{
  123. BaseProxy: basePxy,
  124. cfg: cfg,
  125. }
  126. case *config.HttpProxyConf:
  127. pxy = &HttpProxy{
  128. BaseProxy: basePxy,
  129. cfg: cfg,
  130. }
  131. case *config.HttpsProxyConf:
  132. pxy = &HttpsProxy{
  133. BaseProxy: basePxy,
  134. cfg: cfg,
  135. }
  136. case *config.UdpProxyConf:
  137. basePxy.usedPortsNum = 1
  138. pxy = &UdpProxy{
  139. BaseProxy: basePxy,
  140. cfg: cfg,
  141. }
  142. case *config.StcpProxyConf:
  143. pxy = &StcpProxy{
  144. BaseProxy: basePxy,
  145. cfg: cfg,
  146. }
  147. case *config.XtcpProxyConf:
  148. pxy = &XtcpProxy{
  149. BaseProxy: basePxy,
  150. cfg: cfg,
  151. }
  152. default:
  153. return pxy, fmt.Errorf("proxy type not support")
  154. }
  155. pxy.AddLogPrefix(pxy.GetName())
  156. return
  157. }
  158. type TcpProxy struct {
  159. BaseProxy
  160. cfg *config.TcpProxyConf
  161. realPort int
  162. }
  163. func (pxy *TcpProxy) Run() (remoteAddr string, err error) {
  164. pxy.realPort, err = pxy.ctl.svr.tcpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
  165. if err != nil {
  166. return
  167. }
  168. defer func() {
  169. if err != nil {
  170. pxy.ctl.svr.tcpPortManager.Release(pxy.realPort)
  171. }
  172. }()
  173. remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
  174. pxy.cfg.RemotePort = pxy.realPort
  175. listener, errRet := frpNet.ListenTcp(config.ServerCommonCfg.ProxyBindAddr, pxy.realPort)
  176. if errRet != nil {
  177. err = errRet
  178. return
  179. }
  180. listener.AddLogPrefix(pxy.name)
  181. pxy.listeners = append(pxy.listeners, listener)
  182. pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort)
  183. pxy.startListenHandler(pxy, HandleUserTcpConnection)
  184. return
  185. }
  186. func (pxy *TcpProxy) GetConf() config.ProxyConf {
  187. return pxy.cfg
  188. }
  189. func (pxy *TcpProxy) Close() {
  190. pxy.BaseProxy.Close()
  191. pxy.ctl.svr.tcpPortManager.Release(pxy.realPort)
  192. }
  193. type HttpProxy struct {
  194. BaseProxy
  195. cfg *config.HttpProxyConf
  196. closeFuncs []func()
  197. }
  198. func (pxy *HttpProxy) Run() (remoteAddr string, err error) {
  199. routeConfig := vhost.VhostRouteConfig{
  200. RewriteHost: pxy.cfg.HostHeaderRewrite,
  201. Username: pxy.cfg.HttpUser,
  202. Password: pxy.cfg.HttpPwd,
  203. CreateConnFn: pxy.GetRealConn,
  204. }
  205. locations := pxy.cfg.Locations
  206. if len(locations) == 0 {
  207. locations = []string{""}
  208. }
  209. addrs := make([]string, 0)
  210. for _, domain := range pxy.cfg.CustomDomains {
  211. routeConfig.Domain = domain
  212. for _, location := range locations {
  213. routeConfig.Location = location
  214. err = pxy.ctl.svr.httpReverseProxy.Register(routeConfig)
  215. if err != nil {
  216. return
  217. }
  218. tmpDomain := routeConfig.Domain
  219. tmpLocation := routeConfig.Location
  220. addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(config.ServerCommonCfg.VhostHttpPort)))
  221. pxy.closeFuncs = append(pxy.closeFuncs, func() {
  222. pxy.ctl.svr.httpReverseProxy.UnRegister(tmpDomain, tmpLocation)
  223. })
  224. pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
  225. }
  226. }
  227. if pxy.cfg.SubDomain != "" {
  228. routeConfig.Domain = pxy.cfg.SubDomain + "." + config.ServerCommonCfg.SubDomainHost
  229. for _, location := range locations {
  230. routeConfig.Location = location
  231. err = pxy.ctl.svr.httpReverseProxy.Register(routeConfig)
  232. if err != nil {
  233. return
  234. }
  235. tmpDomain := routeConfig.Domain
  236. tmpLocation := routeConfig.Location
  237. addrs = append(addrs, util.CanonicalAddr(tmpDomain, int(config.ServerCommonCfg.VhostHttpPort)))
  238. pxy.closeFuncs = append(pxy.closeFuncs, func() {
  239. pxy.ctl.svr.httpReverseProxy.UnRegister(tmpDomain, tmpLocation)
  240. })
  241. pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
  242. }
  243. }
  244. remoteAddr = strings.Join(addrs, ",")
  245. return
  246. }
  247. func (pxy *HttpProxy) GetConf() config.ProxyConf {
  248. return pxy.cfg
  249. }
  250. func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) {
  251. tmpConn, errRet := pxy.GetWorkConnFromPool()
  252. if errRet != nil {
  253. err = errRet
  254. return
  255. }
  256. var rwc io.ReadWriteCloser = tmpConn
  257. if pxy.cfg.UseEncryption {
  258. rwc, err = frpIo.WithEncryption(rwc, []byte(config.ServerCommonCfg.PrivilegeToken))
  259. if err != nil {
  260. pxy.Error("create encryption stream error: %v", err)
  261. return
  262. }
  263. }
  264. if pxy.cfg.UseCompression {
  265. rwc = frpIo.WithCompression(rwc)
  266. }
  267. workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn)
  268. workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
  269. StatsOpenConnection(pxy.GetName())
  270. return
  271. }
  272. func (pxy *HttpProxy) updateStatsAfterClosedConn(totalRead, totalWrite int64) {
  273. name := pxy.GetName()
  274. StatsCloseConnection(name)
  275. StatsAddTrafficIn(name, totalWrite)
  276. StatsAddTrafficOut(name, totalRead)
  277. }
  278. func (pxy *HttpProxy) Close() {
  279. pxy.BaseProxy.Close()
  280. for _, closeFn := range pxy.closeFuncs {
  281. closeFn()
  282. }
  283. }
  284. type HttpsProxy struct {
  285. BaseProxy
  286. cfg *config.HttpsProxyConf
  287. }
  288. func (pxy *HttpsProxy) Run() (remoteAddr string, err error) {
  289. routeConfig := &vhost.VhostRouteConfig{}
  290. addrs := make([]string, 0)
  291. for _, domain := range pxy.cfg.CustomDomains {
  292. routeConfig.Domain = domain
  293. l, errRet := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
  294. if errRet != nil {
  295. err = errRet
  296. return
  297. }
  298. l.AddLogPrefix(pxy.name)
  299. pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
  300. pxy.listeners = append(pxy.listeners, l)
  301. addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(config.ServerCommonCfg.VhostHttpsPort)))
  302. }
  303. if pxy.cfg.SubDomain != "" {
  304. routeConfig.Domain = pxy.cfg.SubDomain + "." + config.ServerCommonCfg.SubDomainHost
  305. l, errRet := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
  306. if errRet != nil {
  307. err = errRet
  308. return
  309. }
  310. l.AddLogPrefix(pxy.name)
  311. pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
  312. pxy.listeners = append(pxy.listeners, l)
  313. addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(config.ServerCommonCfg.VhostHttpsPort)))
  314. }
  315. pxy.startListenHandler(pxy, HandleUserTcpConnection)
  316. remoteAddr = strings.Join(addrs, ",")
  317. return
  318. }
  319. func (pxy *HttpsProxy) GetConf() config.ProxyConf {
  320. return pxy.cfg
  321. }
  322. func (pxy *HttpsProxy) Close() {
  323. pxy.BaseProxy.Close()
  324. }
  325. type StcpProxy struct {
  326. BaseProxy
  327. cfg *config.StcpProxyConf
  328. }
  329. func (pxy *StcpProxy) Run() (remoteAddr string, err error) {
  330. listener, errRet := pxy.ctl.svr.visitorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
  331. if errRet != nil {
  332. err = errRet
  333. return
  334. }
  335. listener.AddLogPrefix(pxy.name)
  336. pxy.listeners = append(pxy.listeners, listener)
  337. pxy.Info("stcp proxy custom listen success")
  338. pxy.startListenHandler(pxy, HandleUserTcpConnection)
  339. return
  340. }
  341. func (pxy *StcpProxy) GetConf() config.ProxyConf {
  342. return pxy.cfg
  343. }
  344. func (pxy *StcpProxy) Close() {
  345. pxy.BaseProxy.Close()
  346. pxy.ctl.svr.visitorManager.CloseListener(pxy.GetName())
  347. }
  348. type XtcpProxy struct {
  349. BaseProxy
  350. cfg *config.XtcpProxyConf
  351. closeCh chan struct{}
  352. }
  353. func (pxy *XtcpProxy) Run() (remoteAddr string, err error) {
  354. if pxy.ctl.svr.natHoleController == nil {
  355. pxy.Error("udp port for xtcp is not specified.")
  356. err = fmt.Errorf("xtcp is not supported in frps")
  357. return
  358. }
  359. sidCh := pxy.ctl.svr.natHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk)
  360. go func() {
  361. for {
  362. select {
  363. case <-pxy.closeCh:
  364. break
  365. case sid := <-sidCh:
  366. workConn, errRet := pxy.GetWorkConnFromPool()
  367. if errRet != nil {
  368. continue
  369. }
  370. m := &msg.NatHoleSid{
  371. Sid: sid,
  372. }
  373. errRet = msg.WriteMsg(workConn, m)
  374. if errRet != nil {
  375. pxy.Warn("write nat hole sid package error, %v", errRet)
  376. }
  377. }
  378. }
  379. }()
  380. return
  381. }
  382. func (pxy *XtcpProxy) GetConf() config.ProxyConf {
  383. return pxy.cfg
  384. }
  385. func (pxy *XtcpProxy) Close() {
  386. pxy.BaseProxy.Close()
  387. pxy.ctl.svr.natHoleController.CloseClient(pxy.GetName())
  388. errors.PanicToError(func() {
  389. close(pxy.closeCh)
  390. })
  391. }
  392. type UdpProxy struct {
  393. BaseProxy
  394. cfg *config.UdpProxyConf
  395. realPort int
  396. // udpConn is the listener of udp packages
  397. udpConn *net.UDPConn
  398. // there are always only one workConn at the same time
  399. // get another one if it closed
  400. workConn net.Conn
  401. // sendCh is used for sending packages to workConn
  402. sendCh chan *msg.UdpPacket
  403. // readCh is used for reading packages from workConn
  404. readCh chan *msg.UdpPacket
  405. // checkCloseCh is used for watching if workConn is closed
  406. checkCloseCh chan int
  407. isClosed bool
  408. }
  409. func (pxy *UdpProxy) Run() (remoteAddr string, err error) {
  410. pxy.realPort, err = pxy.ctl.svr.udpPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
  411. if err != nil {
  412. return
  413. }
  414. defer func() {
  415. if err != nil {
  416. pxy.ctl.svr.udpPortManager.Release(pxy.realPort)
  417. }
  418. }()
  419. remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
  420. pxy.cfg.RemotePort = pxy.realPort
  421. addr, errRet := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", config.ServerCommonCfg.ProxyBindAddr, pxy.realPort))
  422. if errRet != nil {
  423. err = errRet
  424. return
  425. }
  426. udpConn, errRet := net.ListenUDP("udp", addr)
  427. if errRet != nil {
  428. err = errRet
  429. pxy.Warn("listen udp port error: %v", err)
  430. return
  431. }
  432. pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
  433. pxy.udpConn = udpConn
  434. pxy.sendCh = make(chan *msg.UdpPacket, 1024)
  435. pxy.readCh = make(chan *msg.UdpPacket, 1024)
  436. pxy.checkCloseCh = make(chan int)
  437. // read message from workConn, if it returns any error, notify proxy to start a new workConn
  438. workConnReaderFn := func(conn net.Conn) {
  439. for {
  440. var (
  441. rawMsg msg.Message
  442. errRet error
  443. )
  444. pxy.Trace("loop waiting message from udp workConn")
  445. // client will send heartbeat in workConn for keeping alive
  446. conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
  447. if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
  448. pxy.Warn("read from workConn for udp error: %v", errRet)
  449. conn.Close()
  450. // notify proxy to start a new work connection
  451. // ignore error here, it means the proxy is closed
  452. errors.PanicToError(func() {
  453. pxy.checkCloseCh <- 1
  454. })
  455. return
  456. }
  457. conn.SetReadDeadline(time.Time{})
  458. switch m := rawMsg.(type) {
  459. case *msg.Ping:
  460. pxy.Trace("udp work conn get ping message")
  461. continue
  462. case *msg.UdpPacket:
  463. if errRet := errors.PanicToError(func() {
  464. pxy.Trace("get udp message from workConn: %s", m.Content)
  465. pxy.readCh <- m
  466. StatsAddTrafficOut(pxy.GetName(), int64(len(m.Content)))
  467. }); errRet != nil {
  468. conn.Close()
  469. pxy.Info("reader goroutine for udp work connection closed")
  470. return
  471. }
  472. }
  473. }
  474. }
  475. // send message to workConn
  476. workConnSenderFn := func(conn net.Conn, ctx context.Context) {
  477. var errRet error
  478. for {
  479. select {
  480. case udpMsg, ok := <-pxy.sendCh:
  481. if !ok {
  482. pxy.Info("sender goroutine for udp work connection closed")
  483. return
  484. }
  485. if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
  486. pxy.Info("sender goroutine for udp work connection closed: %v", errRet)
  487. conn.Close()
  488. return
  489. } else {
  490. pxy.Trace("send message to udp workConn: %s", udpMsg.Content)
  491. StatsAddTrafficIn(pxy.GetName(), int64(len(udpMsg.Content)))
  492. continue
  493. }
  494. case <-ctx.Done():
  495. pxy.Info("sender goroutine for udp work connection closed")
  496. return
  497. }
  498. }
  499. }
  500. go func() {
  501. // Sleep a while for waiting control send the NewProxyResp to client.
  502. time.Sleep(500 * time.Millisecond)
  503. for {
  504. workConn, err := pxy.GetWorkConnFromPool()
  505. if err != nil {
  506. time.Sleep(1 * time.Second)
  507. // check if proxy is closed
  508. select {
  509. case _, ok := <-pxy.checkCloseCh:
  510. if !ok {
  511. return
  512. }
  513. default:
  514. }
  515. continue
  516. }
  517. // close the old workConn and replac it with a new one
  518. if pxy.workConn != nil {
  519. pxy.workConn.Close()
  520. }
  521. pxy.workConn = workConn
  522. ctx, cancel := context.WithCancel(context.Background())
  523. go workConnReaderFn(workConn)
  524. go workConnSenderFn(workConn, ctx)
  525. _, ok := <-pxy.checkCloseCh
  526. cancel()
  527. if !ok {
  528. return
  529. }
  530. }
  531. }()
  532. // Read from user connections and send wrapped udp message to sendCh (forwarded by workConn).
  533. // Client will transfor udp message to local udp service and waiting for response for a while.
  534. // Response will be wrapped to be forwarded by work connection to server.
  535. // Close readCh and sendCh at the end.
  536. go func() {
  537. udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
  538. pxy.Close()
  539. }()
  540. return remoteAddr, nil
  541. }
  542. func (pxy *UdpProxy) GetConf() config.ProxyConf {
  543. return pxy.cfg
  544. }
  545. func (pxy *UdpProxy) Close() {
  546. pxy.mu.Lock()
  547. defer pxy.mu.Unlock()
  548. if !pxy.isClosed {
  549. pxy.isClosed = true
  550. pxy.BaseProxy.Close()
  551. if pxy.workConn != nil {
  552. pxy.workConn.Close()
  553. }
  554. pxy.udpConn.Close()
  555. // all channels only closed here
  556. close(pxy.checkCloseCh)
  557. close(pxy.readCh)
  558. close(pxy.sendCh)
  559. }
  560. pxy.ctl.svr.udpPortManager.Release(pxy.realPort)
  561. }
  562. // HandleUserTcpConnection is used for incoming tcp user connections.
  563. // It can be used for tcp, http, https type.
  564. func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) {
  565. defer userConn.Close()
  566. // try all connections from the pool
  567. workConn, err := pxy.GetWorkConnFromPool()
  568. if err != nil {
  569. return
  570. }
  571. defer workConn.Close()
  572. var local io.ReadWriteCloser = workConn
  573. cfg := pxy.GetConf().GetBaseInfo()
  574. if cfg.UseEncryption {
  575. local, err = frpIo.WithEncryption(local, []byte(config.ServerCommonCfg.PrivilegeToken))
  576. if err != nil {
  577. pxy.Error("create encryption stream error: %v", err)
  578. return
  579. }
  580. }
  581. if cfg.UseCompression {
  582. local = frpIo.WithCompression(local)
  583. }
  584. pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  585. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  586. StatsOpenConnection(pxy.GetName())
  587. inCount, outCount := frpIo.Join(local, userConn)
  588. StatsCloseConnection(pxy.GetName())
  589. StatsAddTrafficIn(pxy.GetName(), inCount)
  590. StatsAddTrafficOut(pxy.GetName(), outCount)
  591. pxy.Debug("join connections closed")
  592. }