X-Git-Url: http://git.iain.cx/?a=blobdiff_plain;f=i_s%2Fprocesslist.go;fp=i_s%2Fprocesslist.go;h=2be6d3372dd9cc60d0b187df2875048a8dde80a2;hb=52d3c00ce729f908f342e10f48818b112dec0130;hp=0000000000000000000000000000000000000000;hpb=b958730449ac1d927c267373a68b9ae54b72c454;p=pstop.git diff --git a/i_s/processlist.go b/i_s/processlist.go new file mode 100644 index 0000000..2be6d33 --- /dev/null +++ b/i_s/processlist.go @@ -0,0 +1,201 @@ +// i_s - library routines for pstop. +// +// This file contains the library routines for managing the +// table_io_waits_by_table table. +package i_s + +import ( + "database/sql" + "fmt" + "github.com/sjmudd/pstop/lib" + "github.com/sjmudd/pstop/p_s" + "regexp" + "strings" + "time" +) + +type map_string_string map[string]string + +// a table of rows +type Processlist struct { + p_s.RelativeStats + p_s.InitialTime + current processlist_rows // processlist + results pl_by_user_rows // results by user + totals pl_by_user_row // totals of results +} + +// Collect() collects data from the db, updating initial +// values if needed, and then subtracting initial values if we want +// relative values, after which it stores totals. +func (t *Processlist) Collect(dbh *sql.DB) { + lib.Logger.Println("Processlist.Collect() - starting collection of data") + start := time.Now() + + t.current = select_processlist(dbh) + lib.Logger.Println("t.current collected", len(t.current), "row(s) from SELECT") + + t.processlist2by_user() + + t.results.Sort() + // lib.Logger.Println( "- collecting t.totals from t.results" ) + t.totals = t.results.totals() + + lib.Logger.Println("Processlist.Collect() END, took:", time.Duration(time.Since(start)).String()) +} + +func (t *Processlist) Headings() string { + return t.results.Headings() +} + +func (t Processlist) EmptyRowContent() string { + return t.results.emptyRowContent() +} + +func (t Processlist) TotalRowContent() string { + return t.totals.row_content(t.totals) +} + +func (t Processlist) RowContent(max_rows int) []string { + rows := make([]string, 0, max_rows) + + for i := range t.results { + if i < max_rows { + rows = append(rows, t.results[i].row_content(t.totals)) + } + } + + return rows +} + +func (t Processlist) Description() string { + count := t.count_rows() + return fmt.Sprintf("User Information (processlist) %d rows", count) +} + +func (t Processlist) count_rows() int { + var count int + for row := range t.results { + if t.results[row].username != "" { + count++ + } + } + return count +} + +// return the hostname without the port part +func get_hostname(h_p string) string { + i := strings.Index(h_p, ":") + if i >= 0 { + return h_p[0:i] + } else { + return h_p // shouldn't happen !!! + } +} + +// read in processlist and add the appropriate values into a new pl_by_user table +func (t *Processlist) processlist2by_user() { + lib.Logger.Println("Processlist.processlist2by_user() START") + + var re_active_repl_master_thread *regexp.Regexp = regexp.MustCompile("Sending binlog event to slave") + var re_select *regexp.Regexp = regexp.MustCompile(`SELECT(?i)`) // make case insensitive + var re_insert *regexp.Regexp = regexp.MustCompile(`INSERT(?i)`) // make case insensitive + var re_update *regexp.Regexp = regexp.MustCompile(`UPDATE(?i)`) // make case insensitive + var re_delete *regexp.Regexp = regexp.MustCompile(`DELETE(?i)`) // make case insensitive + + var row pl_by_user_row + var results pl_by_user_rows + var my_hosts map_string_string + var my_db map_string_string + var ok bool + // map username to row details + users := make(map[string]pl_by_user_row) + hosts_by_user := make(map[string]map_string_string) + dbs_by_user := make(map[string]map_string_string) + // var dbs map[string]map_string_string + // Command state + // | 3859522915 | m1m1repl | bc210bprdb-01.lhr4.prod.booking.com:58703 | NULL | Binlog Dump | 4165475 | Master has sent all binlog to slave; waiting for binlog to be updated | NULL | + // | 4179949288 | m1m1repl | xc238bprdb-01.lhr4.prod.booking.com:34391 | NULL | Binlog Dump | 3053011 | Master has sent all binlog to slave; waiting for binlog to be updated | NULL | + // | 4336765784 | m1m1repl | bc279bprdb-01.lhr4.prod.booking.com:50991 | NULL | Binlog Dump | 2523403 | Sending binlog event to slave | NULL | + + for i := range t.current { + // munge the username for special purposes (event scheduler, replication threads etc) + id := t.current[i].ID + username := t.current[i].USER // limit size for display + host := get_hostname(t.current[i].HOST) + command := t.current[i].COMMAND + db := t.current[i].DB + info := t.current[i].INFO + state := t.current[i].STATE + + lib.Logger.Println("- id/user/host:", id, username, host) + + if old_row, ok := users[username]; ok { + lib.Logger.Println("- found old row in users") + row = old_row // get old row + } else { + lib.Logger.Println("- NOT found old row in users") + // create new row - RESET THE VALUES !!!! + rowp := new(pl_by_user_row) + row = *rowp + row.username = t.current[i].USER + users[username] = row + } + row.connections++ + // ignore system SQL threads (may be more to filter out) + if username != "system user" && host != "" && command != "Sleep" && command != "Binlog Dump" { + row.runtime += t.current[i].TIME + row.active++ + } + if command == "Binlog Dump" && re_active_repl_master_thread.MatchString(state) { + row.active++ + } + + // add the host if not known already + if host != "" { + if my_hosts, ok = hosts_by_user[username]; !ok { + my_hosts = make(map_string_string) + } + my_hosts[host] = host // whatever - value doesn't matter + hosts_by_user[username] = my_hosts + } + row.hosts = uint64(len(hosts_by_user[username])) + + // add the db count if not known already + if db != "" { + if my_db, ok = dbs_by_user[username]; !ok { + my_db = make(map_string_string) + } + my_db[db] = db // whatever - value doesn't matter + dbs_by_user[username] = my_db + } + row.dbs = uint64(len(dbs_by_user[username])) + + // selects + if re_select.MatchString(info) == true { + row.selects++ + } + if re_insert.MatchString(info) == true { + row.inserts++ + } + if re_update.MatchString(info) == true { + row.updates++ + } + if re_delete.MatchString(info) == true { + row.deletes++ + } + + users[username] = row + } + + results = make(pl_by_user_rows, 0, len(users)) + for _, v := range users { + results = append(results, v) + } + t.results = results + t.results.Sort() // sort output + + t.totals = t.results.totals() + + lib.Logger.Println("Processlist.processlist2by_user() END") +}