1
0

task.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. // Copyright 2014 beego Author. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package toolbox
  15. import (
  16. "log"
  17. "math"
  18. "sort"
  19. "strconv"
  20. "strings"
  21. "time"
  22. )
  23. // bounds provides a range of acceptable values (plus a map of name to value).
  24. type bounds struct {
  25. min, max uint
  26. names map[string]uint
  27. }
  28. // The bounds for each field.
  29. var (
  30. AdminTaskList map[string]Tasker
  31. stop chan bool
  32. changed chan bool
  33. isstart bool
  34. seconds = bounds{0, 59, nil}
  35. minutes = bounds{0, 59, nil}
  36. hours = bounds{0, 23, nil}
  37. days = bounds{1, 31, nil}
  38. months = bounds{1, 12, map[string]uint{
  39. "jan": 1,
  40. "feb": 2,
  41. "mar": 3,
  42. "apr": 4,
  43. "may": 5,
  44. "jun": 6,
  45. "jul": 7,
  46. "aug": 8,
  47. "sep": 9,
  48. "oct": 10,
  49. "nov": 11,
  50. "dec": 12,
  51. }}
  52. weeks = bounds{0, 6, map[string]uint{
  53. "sun": 0,
  54. "mon": 1,
  55. "tue": 2,
  56. "wed": 3,
  57. "thu": 4,
  58. "fri": 5,
  59. "sat": 6,
  60. }}
  61. )
  62. const (
  63. // Set the top bit if a star was included in the expression.
  64. starBit = 1 << 63
  65. )
  66. // Schedule time taks schedule
  67. type Schedule struct {
  68. Second uint64
  69. Minute uint64
  70. Hour uint64
  71. Day uint64
  72. Month uint64
  73. Week uint64
  74. }
  75. // TaskFunc task func type
  76. type TaskFunc func() error
  77. // Tasker task interface
  78. type Tasker interface {
  79. GetSpec() string
  80. GetStatus() string
  81. Run() error
  82. SetNext(time.Time)
  83. GetNext() time.Time
  84. SetPrev(time.Time)
  85. GetPrev() time.Time
  86. }
  87. // task error
  88. type taskerr struct {
  89. t time.Time
  90. errinfo string
  91. }
  92. // Task task struct
  93. type Task struct {
  94. Taskname string
  95. Spec *Schedule
  96. SpecStr string
  97. DoFunc TaskFunc
  98. Prev time.Time
  99. Next time.Time
  100. Errlist []*taskerr // like errtime:errinfo
  101. ErrLimit int // max length for the errlist, 0 stand for no limit
  102. }
  103. // NewTask add new task with name, time and func
  104. func NewTask(tname string, spec string, f TaskFunc) *Task {
  105. task := &Task{
  106. Taskname: tname,
  107. DoFunc: f,
  108. ErrLimit: 100,
  109. SpecStr: spec,
  110. }
  111. task.SetCron(spec)
  112. return task
  113. }
  114. // GetSpec get spec string
  115. func (t *Task) GetSpec() string {
  116. return t.SpecStr
  117. }
  118. // GetStatus get current task status
  119. func (t *Task) GetStatus() string {
  120. var str string
  121. for _, v := range t.Errlist {
  122. str += v.t.String() + ":" + v.errinfo + "<br>"
  123. }
  124. return str
  125. }
  126. // Run run all tasks
  127. func (t *Task) Run() error {
  128. err := t.DoFunc()
  129. if err != nil {
  130. if t.ErrLimit > 0 && t.ErrLimit > len(t.Errlist) {
  131. t.Errlist = append(t.Errlist, &taskerr{t: t.Next, errinfo: err.Error()})
  132. }
  133. }
  134. return err
  135. }
  136. // SetNext set next time for this task
  137. func (t *Task) SetNext(now time.Time) {
  138. t.Next = t.Spec.Next(now)
  139. }
  140. // GetNext get the next call time of this task
  141. func (t *Task) GetNext() time.Time {
  142. return t.Next
  143. }
  144. // SetPrev set prev time of this task
  145. func (t *Task) SetPrev(now time.Time) {
  146. t.Prev = now
  147. }
  148. // GetPrev get prev time of this task
  149. func (t *Task) GetPrev() time.Time {
  150. return t.Prev
  151. }
  152. // six columns mean:
  153. // second:0-59
  154. // minute:0-59
  155. // hour:1-23
  156. // day:1-31
  157. // month:1-12
  158. // week:0-6(0 means Sunday)
  159. // SetCron some signals:
  160. // *: any time
  161. // ,:  separate signal
  162. //   -:duration
  163. // /n : do as n times of time duration
  164. /////////////////////////////////////////////////////////
  165. // 0/30 * * * * * every 30s
  166. // 0 43 21 * * * 21:43
  167. // 0 15 05 * * *    05:15
  168. // 0 0 17 * * * 17:00
  169. // 0 0 17 * * 1 17:00 in every Monday
  170. // 0 0,10 17 * * 0,2,3 17:00 and 17:10 in every Sunday, Tuesday and Wednesday
  171. // 0 0-10 17 1 * * 17:00 to 17:10 in 1 min duration each time on the first day of month
  172. // 0 0 0 1,15 * 1 0:00 on the 1st day and 15th day of month
  173. // 0 42 4 1 * *     4:42 on the 1st day of month
  174. // 0 0 21 * * 1-6   21:00 from Monday to Saturday
  175. // 0 0,10,20,30,40,50 * * * *  every 10 min duration
  176. // 0 */10 * * * *        every 10 min duration
  177. // 0 * 1 * * *         1:00 to 1:59 in 1 min duration each time
  178. // 0 0 1 * * *         1:00
  179. // 0 0 */1 * * *        0 min of hour in 1 hour duration
  180. // 0 0 * * * *         0 min of hour in 1 hour duration
  181. // 0 2 8-20/3 * * *       8:02, 11:02, 14:02, 17:02, 20:02
  182. // 0 30 5 1,15 * *       5:30 on the 1st day and 15th day of month
  183. func (t *Task) SetCron(spec string) {
  184. t.Spec = t.parse(spec)
  185. }
  186. func (t *Task) parse(spec string) *Schedule {
  187. if len(spec) > 0 && spec[0] == '@' {
  188. return t.parseSpec(spec)
  189. }
  190. // Split on whitespace. We require 5 or 6 fields.
  191. // (second) (minute) (hour) (day of month) (month) (day of week, optional)
  192. fields := strings.Fields(spec)
  193. if len(fields) != 5 && len(fields) != 6 {
  194. log.Panicf("Expected 5 or 6 fields, found %d: %s", len(fields), spec)
  195. }
  196. // If a sixth field is not provided (DayOfWeek), then it is equivalent to star.
  197. if len(fields) == 5 {
  198. fields = append(fields, "*")
  199. }
  200. schedule := &Schedule{
  201. Second: getField(fields[0], seconds),
  202. Minute: getField(fields[1], minutes),
  203. Hour: getField(fields[2], hours),
  204. Day: getField(fields[3], days),
  205. Month: getField(fields[4], months),
  206. Week: getField(fields[5], weeks),
  207. }
  208. return schedule
  209. }
  210. func (t *Task) parseSpec(spec string) *Schedule {
  211. switch spec {
  212. case "@yearly", "@annually":
  213. return &Schedule{
  214. Second: 1 << seconds.min,
  215. Minute: 1 << minutes.min,
  216. Hour: 1 << hours.min,
  217. Day: 1 << days.min,
  218. Month: 1 << months.min,
  219. Week: all(weeks),
  220. }
  221. case "@monthly":
  222. return &Schedule{
  223. Second: 1 << seconds.min,
  224. Minute: 1 << minutes.min,
  225. Hour: 1 << hours.min,
  226. Day: 1 << days.min,
  227. Month: all(months),
  228. Week: all(weeks),
  229. }
  230. case "@weekly":
  231. return &Schedule{
  232. Second: 1 << seconds.min,
  233. Minute: 1 << minutes.min,
  234. Hour: 1 << hours.min,
  235. Day: all(days),
  236. Month: all(months),
  237. Week: 1 << weeks.min,
  238. }
  239. case "@daily", "@midnight":
  240. return &Schedule{
  241. Second: 1 << seconds.min,
  242. Minute: 1 << minutes.min,
  243. Hour: 1 << hours.min,
  244. Day: all(days),
  245. Month: all(months),
  246. Week: all(weeks),
  247. }
  248. case "@hourly":
  249. return &Schedule{
  250. Second: 1 << seconds.min,
  251. Minute: 1 << minutes.min,
  252. Hour: all(hours),
  253. Day: all(days),
  254. Month: all(months),
  255. Week: all(weeks),
  256. }
  257. }
  258. log.Panicf("Unrecognized descriptor: %s", spec)
  259. return nil
  260. }
  261. // Next set schedule to next time
  262. func (s *Schedule) Next(t time.Time) time.Time {
  263. // Start at the earliest possible time (the upcoming second).
  264. t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond)
  265. // This flag indicates whether a field has been incremented.
  266. added := false
  267. // If no time is found within five years, return zero.
  268. yearLimit := t.Year() + 5
  269. WRAP:
  270. if t.Year() > yearLimit {
  271. return time.Time{}
  272. }
  273. // Find the first applicable month.
  274. // If it's this month, then do nothing.
  275. for 1<<uint(t.Month())&s.Month == 0 {
  276. // If we have to add a month, reset the other parts to 0.
  277. if !added {
  278. added = true
  279. // Otherwise, set the date at the beginning (since the current time is irrelevant).
  280. t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, t.Location())
  281. }
  282. t = t.AddDate(0, 1, 0)
  283. // Wrapped around.
  284. if t.Month() == time.January {
  285. goto WRAP
  286. }
  287. }
  288. // Now get a day in that month.
  289. for !dayMatches(s, t) {
  290. if !added {
  291. added = true
  292. t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
  293. }
  294. t = t.AddDate(0, 0, 1)
  295. if t.Day() == 1 {
  296. goto WRAP
  297. }
  298. }
  299. for 1<<uint(t.Hour())&s.Hour == 0 {
  300. if !added {
  301. added = true
  302. t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
  303. }
  304. t = t.Add(1 * time.Hour)
  305. if t.Hour() == 0 {
  306. goto WRAP
  307. }
  308. }
  309. for 1<<uint(t.Minute())&s.Minute == 0 {
  310. if !added {
  311. added = true
  312. t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location())
  313. }
  314. t = t.Add(1 * time.Minute)
  315. if t.Minute() == 0 {
  316. goto WRAP
  317. }
  318. }
  319. for 1<<uint(t.Second())&s.Second == 0 {
  320. if !added {
  321. added = true
  322. t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), 0, t.Location())
  323. }
  324. t = t.Add(1 * time.Second)
  325. if t.Second() == 0 {
  326. goto WRAP
  327. }
  328. }
  329. return t
  330. }
  331. func dayMatches(s *Schedule, t time.Time) bool {
  332. var (
  333. domMatch = 1<<uint(t.Day())&s.Day > 0
  334. dowMatch = 1<<uint(t.Weekday())&s.Week > 0
  335. )
  336. if s.Day&starBit > 0 || s.Week&starBit > 0 {
  337. return domMatch && dowMatch
  338. }
  339. return domMatch || dowMatch
  340. }
  341. // StartTask start all tasks
  342. func StartTask() {
  343. if isstart {
  344. //If already started, no need to start another goroutine.
  345. return
  346. }
  347. isstart = true
  348. go run()
  349. }
  350. func run() {
  351. now := time.Now().Local()
  352. for _, t := range AdminTaskList {
  353. t.SetNext(now)
  354. }
  355. for {
  356. sortList := NewMapSorter(AdminTaskList)
  357. sortList.Sort()
  358. var effective time.Time
  359. if len(AdminTaskList) == 0 || sortList.Vals[0].GetNext().IsZero() {
  360. // If there are no entries yet, just sleep - it still handles new entries
  361. // and stop requests.
  362. effective = now.AddDate(10, 0, 0)
  363. } else {
  364. effective = sortList.Vals[0].GetNext()
  365. }
  366. select {
  367. case now = <-time.After(effective.Sub(now)):
  368. // Run every entry whose next time was this effective time.
  369. for _, e := range sortList.Vals {
  370. if e.GetNext() != effective {
  371. break
  372. }
  373. go e.Run()
  374. e.SetPrev(e.GetNext())
  375. e.SetNext(effective)
  376. }
  377. continue
  378. case <-changed:
  379. continue
  380. case <-stop:
  381. return
  382. }
  383. }
  384. }
  385. // StopTask stop all tasks
  386. func StopTask() {
  387. if isstart {
  388. isstart = false
  389. stop <- true
  390. }
  391. }
  392. // AddTask add task with name
  393. func AddTask(taskname string, t Tasker) {
  394. AdminTaskList[taskname] = t
  395. if isstart {
  396. changed <- true
  397. }
  398. }
  399. // DeleteTask delete task with name
  400. func DeleteTask(taskname string) {
  401. delete(AdminTaskList, taskname)
  402. if isstart {
  403. changed <- true
  404. }
  405. }
  406. // MapSorter sort map for tasker
  407. type MapSorter struct {
  408. Keys []string
  409. Vals []Tasker
  410. }
  411. // NewMapSorter create new tasker map
  412. func NewMapSorter(m map[string]Tasker) *MapSorter {
  413. ms := &MapSorter{
  414. Keys: make([]string, 0, len(m)),
  415. Vals: make([]Tasker, 0, len(m)),
  416. }
  417. for k, v := range m {
  418. ms.Keys = append(ms.Keys, k)
  419. ms.Vals = append(ms.Vals, v)
  420. }
  421. return ms
  422. }
  423. // Sort sort tasker map
  424. func (ms *MapSorter) Sort() {
  425. sort.Sort(ms)
  426. }
  427. func (ms *MapSorter) Len() int { return len(ms.Keys) }
  428. func (ms *MapSorter) Less(i, j int) bool {
  429. if ms.Vals[i].GetNext().IsZero() {
  430. return false
  431. }
  432. if ms.Vals[j].GetNext().IsZero() {
  433. return true
  434. }
  435. return ms.Vals[i].GetNext().Before(ms.Vals[j].GetNext())
  436. }
  437. func (ms *MapSorter) Swap(i, j int) {
  438. ms.Vals[i], ms.Vals[j] = ms.Vals[j], ms.Vals[i]
  439. ms.Keys[i], ms.Keys[j] = ms.Keys[j], ms.Keys[i]
  440. }
  441. func getField(field string, r bounds) uint64 {
  442. // list = range {"," range}
  443. var bits uint64
  444. ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' })
  445. for _, expr := range ranges {
  446. bits |= getRange(expr, r)
  447. }
  448. return bits
  449. }
  450. // getRange returns the bits indicated by the given expression:
  451. // number | number "-" number [ "/" number ]
  452. func getRange(expr string, r bounds) uint64 {
  453. var (
  454. start, end, step uint
  455. rangeAndStep = strings.Split(expr, "/")
  456. lowAndHigh = strings.Split(rangeAndStep[0], "-")
  457. singleDigit = len(lowAndHigh) == 1
  458. )
  459. var extrastar uint64
  460. if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" {
  461. start = r.min
  462. end = r.max
  463. extrastar = starBit
  464. } else {
  465. start = parseIntOrName(lowAndHigh[0], r.names)
  466. switch len(lowAndHigh) {
  467. case 1:
  468. end = start
  469. case 2:
  470. end = parseIntOrName(lowAndHigh[1], r.names)
  471. default:
  472. log.Panicf("Too many hyphens: %s", expr)
  473. }
  474. }
  475. switch len(rangeAndStep) {
  476. case 1:
  477. step = 1
  478. case 2:
  479. step = mustParseInt(rangeAndStep[1])
  480. // Special handling: "N/step" means "N-max/step".
  481. if singleDigit {
  482. end = r.max
  483. }
  484. default:
  485. log.Panicf("Too many slashes: %s", expr)
  486. }
  487. if start < r.min {
  488. log.Panicf("Beginning of range (%d) below minimum (%d): %s", start, r.min, expr)
  489. }
  490. if end > r.max {
  491. log.Panicf("End of range (%d) above maximum (%d): %s", end, r.max, expr)
  492. }
  493. if start > end {
  494. log.Panicf("Beginning of range (%d) beyond end of range (%d): %s", start, end, expr)
  495. }
  496. return getBits(start, end, step) | extrastar
  497. }
  498. // parseIntOrName returns the (possibly-named) integer contained in expr.
  499. func parseIntOrName(expr string, names map[string]uint) uint {
  500. if names != nil {
  501. if namedInt, ok := names[strings.ToLower(expr)]; ok {
  502. return namedInt
  503. }
  504. }
  505. return mustParseInt(expr)
  506. }
  507. // mustParseInt parses the given expression as an int or panics.
  508. func mustParseInt(expr string) uint {
  509. num, err := strconv.Atoi(expr)
  510. if err != nil {
  511. log.Panicf("Failed to parse int from %s: %s", expr, err)
  512. }
  513. if num < 0 {
  514. log.Panicf("Negative number (%d) not allowed: %s", num, expr)
  515. }
  516. return uint(num)
  517. }
  518. // getBits sets all bits in the range [min, max], modulo the given step size.
  519. func getBits(min, max, step uint) uint64 {
  520. var bits uint64
  521. // If step is 1, use shifts.
  522. if step == 1 {
  523. return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min)
  524. }
  525. // Else, use a simple loop.
  526. for i := min; i <= max; i += step {
  527. bits |= 1 << i
  528. }
  529. return bits
  530. }
  531. // all returns all bits within the given bounds. (plus the star bit)
  532. func all(r bounds) uint64 {
  533. return getBits(r.min, r.max, 1) | starBit
  534. }
  535. func init() {
  536. AdminTaskList = make(map[string]Tasker)
  537. stop = make(chan bool)
  538. changed = make(chan bool)
  539. }