Rename state to app and furhter cleanups
[pstop.git] / i_s / processlist / public.go
1 // i_s - library routines for pstop.
2 //
3 // This file contains the library routines for managing the
4 // table_io_waits_by_table table.
5 package processlist
6
7 import (
8         "database/sql"
9         "fmt"
10         "github.com/sjmudd/pstop/lib"
11         "github.com/sjmudd/pstop/p_s"
12         "regexp"
13         "strings"
14         "time"
15 )
16
17 type map_string_int map[string]int
18
19 // a table of rows
20 type Object struct {
21         p_s.RelativeStats
22         p_s.InitialTime
23         current table_rows      // processlist
24         results pl_by_user_rows // results by user
25         totals  pl_by_user_row  // totals of results
26 }
27
28 // Collect() collects data from the db, updating initial
29 // values if needed, and then subtracting initial values if we want
30 // relative values, after which it stores totals.
31 func (t *Object) Collect(dbh *sql.DB) {
32         lib.Logger.Println("Object.Collect() - starting collection of data")
33         start := time.Now()
34
35         t.current = select_processlist(dbh)
36         lib.Logger.Println("t.current collected", len(t.current), "row(s) from SELECT")
37
38         t.processlist2by_user()
39
40         t.results.Sort()
41         // lib.Logger.Println( "- collecting t.totals from t.results" )
42         t.totals = t.results.totals()
43
44         lib.Logger.Println("Object.Collect() END, took:", time.Duration(time.Since(start)).String())
45 }
46
47 func (t *Object) Headings() string {
48         return t.results.Headings()
49 }
50
51 func (t Object) EmptyRowContent() string {
52         return t.results.emptyRowContent()
53 }
54
55 func (t Object) TotalRowContent() string {
56         return t.totals.row_content(t.totals)
57 }
58
59 func (t Object) RowContent(max_rows int) []string {
60         rows := make([]string, 0, max_rows)
61
62         for i := range t.results {
63                 if i < max_rows {
64                         rows = append(rows, t.results[i].row_content(t.totals))
65                 }
66         }
67
68         return rows
69 }
70
71 func (t Object) Description() string {
72         count := t.count_rows()
73         return fmt.Sprintf("Activity by Username (processlist) %d rows", count)
74 }
75
76 func (t Object) count_rows() int {
77         var count int
78         for row := range t.results {
79                 if t.results[row].username != "" {
80                         count++
81                 }
82         }
83         return count
84 }
85
86 // return the hostname without the port part
87 func get_hostname(h_p string) string {
88         i := strings.Index(h_p, ":")
89         if i >= 0 {
90                 return h_p[0:i]
91         } else {
92                 return h_p // shouldn't happen !!!
93         }
94 }
95
96 // read in processlist and add the appropriate values into a new pl_by_user table
97 func (t *Object) processlist2by_user() {
98         lib.Logger.Println("Object.processlist2by_user() START")
99
100         var re_active_repl_master_thread *regexp.Regexp = regexp.MustCompile("Sending binlog event to slave")
101         var re_select *regexp.Regexp = regexp.MustCompile(`(?i)SELECT`) // make case insensitive
102         var re_insert *regexp.Regexp = regexp.MustCompile(`(?i)INSERT`) // make case insensitive
103         var re_update *regexp.Regexp = regexp.MustCompile(`(?i)UPDATE`) // make case insensitive
104         var re_delete *regexp.Regexp = regexp.MustCompile(`(?i)DELETE`) // make case insensitive
105
106         var row pl_by_user_row
107         var results pl_by_user_rows
108         var my_hosts map_string_int
109         var my_db map_string_int
110         var ok bool
111
112         row_by_user := make(map[string]pl_by_user_row)
113         hosts_by_user := make(map[string]map_string_int)
114         dbs_by_user := make(map[string]map_string_int)
115
116         for i := range t.current {
117                 // munge the username for special purposes (event scheduler, replication threads etc)
118                 id := t.current[i].ID
119                 username := t.current[i].USER // limit size for display
120                 host := get_hostname(t.current[i].HOST)
121                 command := t.current[i].COMMAND
122                 db := t.current[i].DB
123                 info := t.current[i].INFO
124                 state := t.current[i].STATE
125
126                 lib.Logger.Println("- id/user/host:", id, username, host)
127
128                 if old_row, ok := row_by_user[username]; ok {
129                         lib.Logger.Println("- found old row in row_by_user")
130                         row = old_row // get old row
131                 } else {
132                         lib.Logger.Println("- NOT found old row in row_by_user")
133                         // create new row - RESET THE VALUES !!!!
134                         rowp := new(pl_by_user_row)
135                         row = *rowp
136                         row.username = t.current[i].USER
137                         row_by_user[username] = row
138                 }
139                 row.connections++
140                 // ignore system SQL threads (may be more to filter out)
141                 if username != "system user" && host != "" && command != "Binlog Dump" {
142                         if command == "Sleep" {
143                                 row.sleeptime += t.current[i].TIME
144                         } else {
145                                 row.runtime += t.current[i].TIME
146                                 row.active++
147                         }
148                 }
149                 if command == "Binlog Dump" && re_active_repl_master_thread.MatchString(state) {
150                         row.active++
151                 }
152
153                 // add the host if not known already
154                 if host != "" {
155                         if my_hosts, ok = hosts_by_user[username]; !ok {
156                                 my_hosts = make(map_string_int)
157                         }
158                         my_hosts[host] = 1 // whatever - value doesn't matter
159                         hosts_by_user[username] = my_hosts
160                 }
161                 row.hosts = uint64(len(hosts_by_user[username]))
162
163                 // add the db count if not known already
164                 if db != "" {
165                         if my_db, ok = dbs_by_user[username]; !ok {
166                                 my_db = make(map_string_int)
167                         }
168                         my_db[db] = 1 // whatever - value doesn't matter
169                         dbs_by_user[username] = my_db
170                 }
171                 row.dbs = uint64(len(dbs_by_user[username]))
172
173                 if re_select.MatchString(info) == true {
174                         row.selects++
175                 }
176                 if re_insert.MatchString(info) == true {
177                         row.inserts++
178                 }
179                 if re_update.MatchString(info) == true {
180                         row.updates++
181                 }
182                 if re_delete.MatchString(info) == true {
183                         row.deletes++
184                 }
185
186                 row_by_user[username] = row
187         }
188
189         results = make(pl_by_user_rows, 0, len(row_by_user))
190         for _, v := range row_by_user {
191                 results = append(results, v)
192         }
193         t.results = results
194         t.results.Sort() // sort output
195
196         t.totals = t.results.totals()
197
198         lib.Logger.Println("Object.processlist2by_user() END")
199 }