package alils

import (
	"encoding/json"
	"strings"
	"sync"
	"time"

	"github.com/astaxie/beego/logs"
	"github.com/gogo/protobuf/proto"
)
const (
	// CacheSize is the upper bound for FlushWhen, i.e. the maximum number
	// of log entries buffered per LogGroup before a flush is forced.
	CacheSize int = 64
	// Delimiter separates the topic from the message body in a log message.
	Delimiter string = "##"
)
// AliLSConfig holds the JSON configuration for the Alibaba Cloud
// Log Service (SLS) adapter.
type AliLSConfig struct {
	Project   string   `json:"project"`
	Endpoint  string   `json:"endpoint"`
	KeyID     string   `json:"key_id"`
	KeySecret string   `json:"key_secret"`
	LogStore  string   `json:"log_store"`
	Topics    []string `json:"topics"`
	Source    string   `json:"source"`
	Level     int      `json:"level"`
	FlushWhen int      `json:"flush_when"`
}
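
// The adapter is configured with a JSON document matching the fields above.
// A minimal illustrative config; every value is a placeholder, not a real
// endpoint or credential, and "level" uses beego's numeric log levels
// (7 corresponds to LevelDebug/LevelTrace):
//
//	{
//	    "project":    "my-project",
//	    "endpoint":   "cn-hangzhou.log.aliyuncs.com",
//	    "key_id":     "your-access-key-id",
//	    "key_secret": "your-access-key-secret",
//	    "log_store":  "my-logstore",
//	    "topics":     ["web", "worker"],
//	    "source":     "192.168.1.10",
//	    "level":      7,
//	    "flush_when": 32
//	}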
// aliLSWriter implements LoggerInterface.
// It buffers log messages per topic and pushes them to the
// Alibaba Cloud Log Service (SLS).
type aliLSWriter struct {
	store    *LogStore
	group    []*LogGroup
	withMap  bool
	groupMap map[string]*LogGroup
	lock     *sync.Mutex
	AliLSConfig
}
// NewAliLS creates a logger that sends messages to the Alibaba Cloud
// Log Service and implements the Logger interface.
func NewAliLS() logs.Logger {
	alils := new(aliLSWriter)
	alils.Level = logs.LevelTrace
	return alils
}
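
// A minimal usage sketch with placeholder values: importing this package
// registers the adapter through the init function below, after which it can
// be selected via beego's logs package.
//
//	l := logs.NewLogger()
//	_ = l.SetLogger(logs.AdapterAliLS, `{"project":"my-project","endpoint":"cn-hangzhou.log.aliyuncs.com","key_id":"...","key_secret":"...","log_store":"my-logstore","flush_when":32}`)
//	l.Informational("hello from beego")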
// Init reads the JSON config and sets up the log project, the logstore
// and the per-topic LogGroups.
func (c *aliLSWriter) Init(jsonConfig string) (err error) {
	err = json.Unmarshal([]byte(jsonConfig), c)
	if err != nil {
		return err
	}
	if c.FlushWhen > CacheSize {
		c.FlushWhen = CacheSize
	}

	// Initialize the project that owns the logstore.
	prj := &LogProject{
		Name:            c.Project,
		Endpoint:        c.Endpoint,
		AccessKeyId:     c.KeyID,
		AccessKeySecret: c.KeySecret,
	}

	// Look up the logstore.
	c.store, err = prj.GetLogStore(c.LogStore)
	if err != nil {
		return err
	}

	// Create the default LogGroup (empty topic).
	c.group = append(c.group, &LogGroup{
		Topic:  proto.String(""),
		Source: proto.String(c.Source),
		Logs:   make([]*Log, 0, c.FlushWhen),
	})

	// Create one LogGroup per configured topic.
	c.groupMap = make(map[string]*LogGroup)
	for _, topic := range c.Topics {
		lg := &LogGroup{
			Topic:  proto.String(topic),
			Source: proto.String(c.Source),
			Logs:   make([]*Log, 0, c.FlushWhen),
		}
		c.group = append(c.group, lg)
		c.groupMap[topic] = lg
	}

	// Topic routing is only needed when extra topics are configured.
	c.withMap = len(c.group) > 1

	c.lock = &sync.Mutex{}
	return nil
}
// WriteMsg appends the message to the LogGroup that matches its topic
// and flushes the group once it holds FlushWhen entries.
func (c *aliLSWriter) WriteMsg(when time.Time, msg string, level int) (err error) {
	if level > c.Level {
		return nil
	}
	var topic string
	var content string
	var lg *LogGroup
	if c.withMap {
		// Parse the topic out of the message and match it to a LogGroup.
		strs := strings.SplitN(msg, Delimiter, 2)
		if len(strs) == 2 {
			pos := strings.LastIndex(strs[0], " ")
			if pos >= 0 {
				topic = strs[0][pos+1:]
				content = strs[0][:pos] + strs[1]
			} else {
				topic = strs[0]
				content = strs[1]
			}
			lg = c.groupMap[topic]
		}

		// Fall back to the empty topic by default.
		if lg == nil {
			topic = ""
			content = msg
			lg = c.group[0]
		}
	} else {
		topic = ""
		content = msg
		lg = c.group[0]
	}

	// Build the log entry.
	c1 := &Log_Content{
		Key:   proto.String("msg"),
		Value: proto.String(content),
	}
	l := &Log{
		Time: proto.Uint32(uint32(when.Unix())), // log time in Unix seconds
		Contents: []*Log_Content{
			c1,
		},
	}

	c.lock.Lock()
	lg.Logs = append(lg.Logs, l)
	c.lock.Unlock()

	// Flush once the buffer reaches the configured threshold.
	if len(lg.Logs) >= c.FlushWhen {
		c.flush(lg)
	}
	return nil
}
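
// Topic routing sketch: when more than one topic is configured, WriteMsg
// splits the message on Delimiter ("##"); the last space-separated token
// before the delimiter becomes the topic and is removed from the content.
// Messages with no delimiter or an unknown topic go to the default
// (empty-topic) group. An illustrative direct call, assuming the writer was
// initialized with a config whose "topics" include "web":
//
//	w := NewAliLS()
//	// ... w.Init(...) with the config sketched above ...
//	_ = w.WriteMsg(time.Now(), "web"+Delimiter+"request served", logs.LevelInformational)
//	// parsed as topic "web", content "request served"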
// Flush pushes every buffered LogGroup to the server.
func (c *aliLSWriter) Flush() {
	// Flush all groups, including the default one.
	for _, lg := range c.group {
		c.flush(lg)
	}
}
// Destroy is a no-op for this adapter; there is no connection to close.
func (c *aliLSWriter) Destroy() {
}
func (c *aliLSWriter) flush(lg *LogGroup) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Push the LogGroup to the SLS server. The server balances the load
	// across the logstore's shards automatically.
	err := c.store.PutLogs(lg)
	if err != nil {
		// Keep the buffered logs so they are retried on the next flush.
		return
	}
	lg.Logs = make([]*Log, 0, c.FlushWhen)
}
func init() {
	logs.Register(logs.AdapterAliLS, NewAliLS)
}