package client import ( "bufio" "fmt" "net" "strings" "sync" "time" "github.com/rs/zerolog" ) // AsteriskServer defines an Asterisk AMI endpoint type AsteriskServer struct { Host string `yaml:"host"` AMIPort int `yaml:"amiPort"` } // AsteriskClient manages AMI connections to Asterisk servers type AsteriskClient struct { servers []AsteriskServer user string secret string logger zerolog.Logger } // NewAsteriskClient creates a new Asterisk AMI client func NewAsteriskClient(servers []AsteriskServer, user, secret string, logger zerolog.Logger) *AsteriskClient { return &AsteriskClient{ servers: servers, user: user, secret: secret, logger: logger.With().Str("client", "asterisk").Logger(), } } // ReloadPJSIP sends a PJSIP reload command to all Asterisk servers func (c *AsteriskClient) ReloadPJSIP() []ServerResult { return c.runOnAll("pjsip reload") } // ReloadDialplan sends a dialplan reload to all Asterisk servers func (c *AsteriskClient) ReloadDialplan() []ServerResult { return c.runOnAll("dialplan reload") } // ReloadAll reloads both PJSIP and dialplan on all servers func (c *AsteriskClient) ReloadAll() []ServerResult { results := c.runOnAll("core reload") return results } // GetChannelCount returns active channel count from each server func (c *AsteriskClient) GetChannelCount() []ServerResult { return c.runOnAll("core show channels count") } // GetUptime returns uptime from each server func (c *AsteriskClient) GetUptime() []ServerResult { return c.runOnAll("core show uptime") } // ServerResult holds the result from an AMI command on a single server type ServerResult struct { Host string `json:"host"` Success bool `json:"success"` Output string `json:"output"` Error string `json:"error,omitempty"` } // runOnAll executes an AMI command on all Asterisk servers in parallel func (c *AsteriskClient) runOnAll(command string) []ServerResult { var wg sync.WaitGroup results := make([]ServerResult, len(c.servers)) for i, srv := range c.servers { wg.Add(1) go func(idx int, server AsteriskServer) { defer wg.Done() results[idx] = c.execAMI(server, command) }(i, srv) } wg.Wait() return results } // execAMI connects to a single Asterisk AMI server and executes a command func (c *AsteriskClient) execAMI(server AsteriskServer, command string) ServerResult { addr := fmt.Sprintf("%s:%d", server.Host, server.AMIPort) result := ServerResult{Host: server.Host} conn, err := net.DialTimeout("tcp", addr, 5*time.Second) if err != nil { result.Error = fmt.Sprintf("connection failed: %v", err) c.logger.Warn().Str("host", server.Host).Err(err).Msg("AMI connection failed") return result } defer conn.Close() conn.SetDeadline(time.Now().Add(10 * time.Second)) reader := bufio.NewReader(conn) // Read AMI banner banner, err := reader.ReadString('\n') if err != nil { result.Error = fmt.Sprintf("failed to read banner: %v", err) return result } if !strings.HasPrefix(banner, "Asterisk Call Manager") { result.Error = fmt.Sprintf("unexpected banner: %s", strings.TrimSpace(banner)) return result } // Login loginMsg := fmt.Sprintf("Action: Login\r\nUsername: %s\r\nSecret: %s\r\n\r\n", c.user, c.secret) if _, err := conn.Write([]byte(loginMsg)); err != nil { result.Error = fmt.Sprintf("login write failed: %v", err) return result } // Read login response loginResp, err := readAMIResponse(reader) if err != nil { result.Error = fmt.Sprintf("login response failed: %v", err) return result } if !strings.Contains(loginResp, "Success") { result.Error = fmt.Sprintf("login failed: %s", loginResp) return result } // Execute command cmdMsg := fmt.Sprintf("Action: Command\r\nCommand: %s\r\n\r\n", command) if _, err := conn.Write([]byte(cmdMsg)); err != nil { result.Error = fmt.Sprintf("command write failed: %v", err) return result } // Read command response cmdResp, err := readAMIResponse(reader) if err != nil { result.Error = fmt.Sprintf("command response failed: %v", err) return result } // Logoff conn.Write([]byte("Action: Logoff\r\n\r\n")) result.Success = true result.Output = cmdResp c.logger.Debug().Str("host", server.Host).Str("command", command).Msg("AMI command executed") return result } // readAMIResponse reads a complete AMI response (terminated by blank line) func readAMIResponse(reader *bufio.Reader) (string, error) { var lines []string for { line, err := reader.ReadString('\n') if err != nil { return strings.Join(lines, "\n"), err } trimmed := strings.TrimSpace(line) if trimmed == "" { break } lines = append(lines, trimmed) } return strings.Join(lines, "\n"), nil } // Health checks connectivity to all Asterisk servers func (c *AsteriskClient) Health() error { for _, srv := range c.servers { addr := fmt.Sprintf("%s:%d", srv.Host, srv.AMIPort) conn, err := net.DialTimeout("tcp", addr, 3*time.Second) if err != nil { return fmt.Errorf("asterisk %s unreachable: %w", srv.Host, err) } conn.Close() } return nil } // Servers returns the configured server list func (c *AsteriskClient) Servers() []AsteriskServer { return c.servers }