Move to i_s / p_s and add user info (not quite complete)
[pstop.git] / i_s / processlist.go
1 // i_s - library routines for pstop.
2 //
3 // This file contains the library routines for collecting the
4 // processlist and summarising it by user.
5 package i_s
6
7 import (
8         "database/sql"
9         "fmt"
10         "github.com/sjmudd/pstop/lib"
11         "github.com/sjmudd/pstop/p_s"
12         "regexp"
13         "strings"
14         "time"
15 )
16
// map_string_string is a string-to-string map. In this file it is used as
// a set: only the keys matter and each value simply repeats its key.
type map_string_string map[string]string
18
19 // a table of rows
20 type Processlist struct {
21         p_s.RelativeStats
22         p_s.InitialTime
23         current processlist_rows // processlist
24         results pl_by_user_rows  // results by user
25         totals  pl_by_user_row   // totals of results
26 }
27
28 // Collect() collects data from the db, updating initial
29 // values if needed, and then subtracting initial values if we want
30 // relative values, after which it stores totals.
31 func (t *Processlist) Collect(dbh *sql.DB) {
32         lib.Logger.Println("Processlist.Collect() - starting collection of data")
33         start := time.Now()
34
35         t.current = select_processlist(dbh)
36         lib.Logger.Println("t.current collected", len(t.current), "row(s) from SELECT")
37
38         t.processlist2by_user()
39
40         t.results.Sort()
41         // lib.Logger.Println( "- collecting t.totals from t.results" )
42         t.totals = t.results.totals()
43
44         lib.Logger.Println("Processlist.Collect() END, took:", time.Duration(time.Since(start)).String())
45 }
46
47 func (t *Processlist) Headings() string {
48         return t.results.Headings()
49 }
50
51 func (t Processlist) EmptyRowContent() string {
52         return t.results.emptyRowContent()
53 }
54
55 func (t Processlist) TotalRowContent() string {
56         return t.totals.row_content(t.totals)
57 }
58
59 func (t Processlist) RowContent(max_rows int) []string {
60         rows := make([]string, 0, max_rows)
61
62         for i := range t.results {
63                 if i < max_rows {
64                         rows = append(rows, t.results[i].row_content(t.totals))
65                 }
66         }
67
68         return rows
69 }
70
71 func (t Processlist) Description() string {
72         count := t.count_rows()
73         return fmt.Sprintf("User Information (processlist) %d rows", count)
74 }
75
76 func (t Processlist) count_rows() int {
77         var count int
78         for row := range t.results {
79                 if t.results[row].username != "" {
80                         count++
81                 }
82         }
83         return count
84 }
85
// get_hostname returns the host part of a "host:port" string, i.e.
// everything before the last colon.
//
// Splitting at the last colon (rather than the first) keeps hosts that
// themselves contain colons intact, e.g. "::1:3306" yields "::1".
// A value that contains no colon is returned unchanged.
//
// NOTE(review): this assumes a port is always appended whenever the host
// itself contains a colon - confirm against the server's HOST format.
func get_hostname(h_p string) string {
	if i := strings.LastIndex(h_p, ":"); i >= 0 {
		return h_p[:i]
	}
	return h_p
}
95
96 // read in processlist and add the appropriate values into a new pl_by_user table
97 func (t *Processlist) processlist2by_user() {
98         lib.Logger.Println("Processlist.processlist2by_user() START")
99
100         var re_active_repl_master_thread *regexp.Regexp = regexp.MustCompile("Sending binlog event to slave")
101         var re_select *regexp.Regexp = regexp.MustCompile(`SELECT(?i)`) // make case insensitive
102         var re_insert *regexp.Regexp = regexp.MustCompile(`INSERT(?i)`) // make case insensitive
103         var re_update *regexp.Regexp = regexp.MustCompile(`UPDATE(?i)`) // make case insensitive
104         var re_delete *regexp.Regexp = regexp.MustCompile(`DELETE(?i)`) // make case insensitive
105
106         var row pl_by_user_row
107         var results pl_by_user_rows
108         var my_hosts map_string_string
109         var my_db map_string_string
110         var ok bool
111         // map username to row details
112         users := make(map[string]pl_by_user_row)
113         hosts_by_user := make(map[string]map_string_string)
114         dbs_by_user := make(map[string]map_string_string)
115         // var dbs map[string]map_string_string
116         //                                                                                                Command                  state
117         // | 3859522915 | m1m1repl        | bc210bprdb-01.lhr4.prod.booking.com:58703            | NULL | Binlog Dump |  4165475 | Master has sent all binlog to slave; waiting for binlog to be updated | NULL                                                                                                 |
118         // | 4179949288 | m1m1repl        | xc238bprdb-01.lhr4.prod.booking.com:34391            | NULL | Binlog Dump |  3053011 | Master has sent all binlog to slave; waiting for binlog to be updated | NULL                                                                                                 |
119         // | 4336765784 | m1m1repl        | bc279bprdb-01.lhr4.prod.booking.com:50991            | NULL | Binlog Dump |  2523403 | Sending binlog event to slave                                         | NULL                                                                                                 |
120
121         for i := range t.current {
122                 // munge the username for special purposes (event scheduler, replication threads etc)
123                 id := t.current[i].ID
124                 username := t.current[i].USER // limit size for display
125                 host := get_hostname(t.current[i].HOST)
126                 command := t.current[i].COMMAND
127                 db := t.current[i].DB
128                 info := t.current[i].INFO
129                 state := t.current[i].STATE
130
131                 lib.Logger.Println("- id/user/host:", id, username, host)
132
133                 if old_row, ok := users[username]; ok {
134                         lib.Logger.Println("- found old row in users")
135                         row = old_row // get old row
136                 } else {
137                         lib.Logger.Println("- NOT found old row in users")
138                         // create new row - RESET THE VALUES !!!!
139                         rowp := new(pl_by_user_row)
140                         row = *rowp
141                         row.username = t.current[i].USER
142                         users[username] = row
143                 }
144                 row.connections++
145                 // ignore system SQL threads (may be more to filter out)
146                 if username != "system user" && host != "" && command != "Sleep" && command != "Binlog Dump" {
147                         row.runtime += t.current[i].TIME
148                         row.active++
149                 }
150                 if command == "Binlog Dump" && re_active_repl_master_thread.MatchString(state) {
151                         row.active++
152                 }
153
154                 // add the host if not known already
155                 if host != "" {
156                         if my_hosts, ok = hosts_by_user[username]; !ok {
157                                 my_hosts = make(map_string_string)
158                         }
159                         my_hosts[host] = host // whatever - value doesn't matter
160                         hosts_by_user[username] = my_hosts
161                 }
162                 row.hosts = uint64(len(hosts_by_user[username]))
163
164                 // add the db count if not known already
165                 if db != "" {
166                         if my_db, ok = dbs_by_user[username]; !ok {
167                                 my_db = make(map_string_string)
168                         }
169                         my_db[db] = db // whatever - value doesn't matter
170                         dbs_by_user[username] = my_db
171                 }
172                 row.dbs = uint64(len(dbs_by_user[username]))
173
174                 // selects
175                 if re_select.MatchString(info) == true {
176                         row.selects++
177                 }
178                 if re_insert.MatchString(info) == true {
179                         row.inserts++
180                 }
181                 if re_update.MatchString(info) == true {
182                         row.updates++
183                 }
184                 if re_delete.MatchString(info) == true {
185                         row.deletes++
186                 }
187
188                 users[username] = row
189         }
190
191         results = make(pl_by_user_rows, 0, len(users))
192         for _, v := range users {
193                 results = append(results, v)
194         }
195         t.results = results
196         t.results.Sort() // sort output
197
198         t.totals = t.results.totals()
199
200         lib.Logger.Println("Processlist.processlist2by_user() END")
201 }