alils.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package alils
  2. import (
  3. "encoding/json"
  4. "github.com/astaxie/beego/logs"
  5. "github.com/gogo/protobuf/proto"
  6. "strings"
  7. "sync"
  8. "time"
  9. )
  // Package-level tuning constants for the Aliyun Log Service adapter.
  const (
  	// CacheSize is the upper bound applied to AliLSConfig.FlushWhen: a log
  	// group never buffers more than this many entries before being flushed.
  	CacheSize int = 64

  	// Delimiter separates the topic from the rest of a message. WriteMsg
  	// looks for its first occurrence to decide which log group to use.
  	Delimiter string = "##"
  )
  // AliLSConfig holds the JSON-decodable settings of the Aliyun Log Service
  // adapter. It is embedded in aliLSWriter and filled in by Init.
  type AliLSConfig struct {
  	// Project is the log project name.
  	Project string `json:"project"`
  	// Endpoint is the service endpoint of the log project.
  	Endpoint string `json:"endpoint"`
  	// KeyID is the access key id used for the log project.
  	KeyID string `json:"key_id"`
  	// KeySecret is the access key secret used for the log project.
  	KeySecret string `json:"key_secret"`
  	// LogStore is the name of the log store that receives the logs.
  	LogStore string `json:"log_store"`
  	// Topics lists the extra topics; one log group is created per topic in
  	// addition to the default group whose topic is empty.
  	Topics []string `json:"topics"`
  	// Source is recorded as the source of every log group.
  	Source string `json:"source"`
  	// Level is the threshold: messages whose level is greater are dropped.
  	Level int `json:"level"`
  	// FlushWhen is the number of buffered entries in a group that triggers
  	// a flush. Values above CacheSize are reduced to CacheSize.
  	FlushWhen int `json:"flush_when"`
  }
  25. // aliLSWriter implements LoggerInterface.
  26. // it writes messages in keep-live tcp connection.
  27. type aliLSWriter struct {
  28. store *LogStore
  29. group []*LogGroup
  30. withMap bool
  31. groupMap map[string]*LogGroup
  32. lock *sync.Mutex
  33. AliLSConfig
  34. }
  35. // 创建提供Logger接口的日志服务
  36. func NewAliLS() logs.Logger {
  37. alils := new(aliLSWriter)
  38. alils.Level = logs.LevelTrace
  39. return alils
  40. }
  41. // 读取配置
  42. // 初始化必要的数据结构
  43. func (c *aliLSWriter) Init(jsonConfig string) (err error) {
  44. json.Unmarshal([]byte(jsonConfig), c)
  45. if c.FlushWhen > CacheSize {
  46. c.FlushWhen = CacheSize
  47. }
  48. // 初始化Project
  49. prj := &LogProject{
  50. Name: c.Project,
  51. Endpoint: c.Endpoint,
  52. AccessKeyId: c.KeyID,
  53. AccessKeySecret: c.KeySecret,
  54. }
  55. // 获取logstore
  56. c.store, err = prj.GetLogStore(c.LogStore)
  57. if err != nil {
  58. return err
  59. }
  60. // 创建默认Log Group
  61. c.group = append(c.group, &LogGroup{
  62. Topic: proto.String(""),
  63. Source: proto.String(c.Source),
  64. Logs: make([]*Log, 0, c.FlushWhen),
  65. })
  66. // 创建其它Log Group
  67. c.groupMap = make(map[string]*LogGroup)
  68. for _, topic := range c.Topics {
  69. lg := &LogGroup{
  70. Topic: proto.String(topic),
  71. Source: proto.String(c.Source),
  72. Logs: make([]*Log, 0, c.FlushWhen),
  73. }
  74. c.group = append(c.group, lg)
  75. c.groupMap[topic] = lg
  76. }
  77. if len(c.group) == 1 {
  78. c.withMap = false
  79. } else {
  80. c.withMap = true
  81. }
  82. c.lock = &sync.Mutex{}
  83. return nil
  84. }
  85. // WriteMsg write message in connection.
  86. // if connection is down, try to re-connect.
  87. func (c *aliLSWriter) WriteMsg(when time.Time, msg string, level int) (err error) {
  88. if level > c.Level {
  89. return nil
  90. }
  91. var topic string
  92. var content string
  93. var lg *LogGroup
  94. if c.withMap {
  95. // 解析出Topic,并匹配LogGroup
  96. strs := strings.SplitN(msg, Delimiter, 2)
  97. if len(strs) == 2 {
  98. pos := strings.LastIndex(strs[0], " ")
  99. topic = strs[0][pos+1 : len(strs[0])]
  100. content = strs[0][0:pos] + strs[1]
  101. lg = c.groupMap[topic]
  102. }
  103. // 默认发到空Topic
  104. if lg == nil {
  105. topic = ""
  106. content = msg
  107. lg = c.group[0]
  108. }
  109. } else {
  110. topic = ""
  111. content = msg
  112. lg = c.group[0]
  113. }
  114. // 生成日志
  115. c1 := &Log_Content{
  116. Key: proto.String("msg"),
  117. Value: proto.String(content),
  118. }
  119. l := &Log{
  120. Time: proto.Uint32(uint32(when.Unix())), // 填写日志时间
  121. Contents: []*Log_Content{
  122. c1,
  123. },
  124. }
  125. c.lock.Lock()
  126. lg.Logs = append(lg.Logs, l)
  127. c.lock.Unlock()
  128. // 满足条件则Flush
  129. if len(lg.Logs) >= c.FlushWhen {
  130. c.flush(lg)
  131. }
  132. return nil
  133. }
  134. // Flush implementing method. empty.
  135. func (c *aliLSWriter) Flush() {
  136. // flush所有group
  137. for _, lg := range c.group {
  138. c.flush(lg)
  139. }
  140. }
  141. // Destroy destroy connection writer and close tcp listener.
  142. func (c *aliLSWriter) Destroy() {
  143. }
  144. func (c *aliLSWriter) flush(lg *LogGroup) {
  145. c.lock.Lock()
  146. defer c.lock.Unlock()
  147. // 把以上的LogGroup推送到SLS服务器,
  148. // SLS服务器会根据该logstore的shard个数自动进行负载均衡。
  149. err := c.store.PutLogs(lg)
  150. if err != nil {
  151. return
  152. }
  153. lg.Logs = make([]*Log, 0, c.FlushWhen)
  154. }
  155. func init() {
  156. logs.Register(logs.AdapterAliLS, NewAliLS)
  157. }