diff --git a/lib/agent_coordinator/mcp_server_manager.ex b/lib/agent_coordinator/mcp_server_manager.ex index 8d0ce87..3e66e66 100644 --- a/lib/agent_coordinator/mcp_server_manager.ex +++ b/lib/agent_coordinator/mcp_server_manager.ex @@ -27,35 +27,35 @@ defmodule AgentCoordinator.MCPServerManager do Get all tools from all managed servers plus Agent Coordinator tools """ def get_unified_tools do - GenServer.call(__MODULE__, :get_unified_tools) + GenServer.call(__MODULE__, :get_unified_tools, 60_000) end @doc """ Route a tool call to the appropriate server """ def route_tool_call(tool_name, arguments, agent_context) do - GenServer.call(__MODULE__, {:route_tool_call, tool_name, arguments, agent_context}) + GenServer.call(__MODULE__, {:route_tool_call, tool_name, arguments, agent_context}, 60_000) end @doc """ Get status of all managed servers """ def get_server_status do - GenServer.call(__MODULE__, :get_server_status) + GenServer.call(__MODULE__, :get_server_status, 15_000) end @doc """ Restart a specific server """ def restart_server(server_name) do - GenServer.call(__MODULE__, {:restart_server, server_name}) + GenServer.call(__MODULE__, {:restart_server, server_name}, 30_000) end @doc """ Refresh tool registry by re-discovering tools from all servers """ def refresh_tools do - GenServer.call(__MODULE__, :refresh_tools) + GenServer.call(__MODULE__, :refresh_tools, 60_000) end # Server callbacks @@ -432,7 +432,6 @@ defmodule AgentCoordinator.MCPServerManager do port_options = [ :binary, :stream, - {:line, 1024}, {:env, env_list}, :exit_status, :hide @@ -567,32 +566,40 @@ defmodule AgentCoordinator.MCPServerManager do # Collect full response by reading multiple lines if needed response_data = collect_response(server_info.port, "", 30_000) - case Jason.decode(response_data) do - {:ok, response} -> {:ok, response} - {:error, %Jason.DecodeError{} = error} -> - Logger.error("JSON decode error for server #{server_info.name}: #{Exception.message(error)}") - Logger.debug("Raw response data: #{inspect(response_data)}") - {:error, "JSON decode error: #{Exception.message(error)}"} - {:error, reason} -> - {:error, "JSON decode error: #{inspect(reason)}"} + cond do + # Check if we got any response data + response_data == "" -> + {:error, "No response received from server #{server_info.name}"} + + # Try to decode JSON response + true -> + case Jason.decode(response_data) do + {:ok, response} -> {:ok, response} + {:error, %Jason.DecodeError{} = error} -> + Logger.error("JSON decode error for server #{server_info.name}: #{Exception.message(error)}") + Logger.debug("Raw response data: #{inspect(response_data)}") + {:error, "JSON decode error: #{Exception.message(error)}"} + {:error, reason} -> + {:error, "JSON decode error: #{inspect(reason)}"} + end end end defp collect_response(port, acc, timeout) do receive do - {^port, {:data, {_eol, response_line}}} -> - # Accumulate the response line - new_acc = acc <> response_line - - # Check if we have a complete JSON object - case Jason.decode(new_acc) do - {:ok, _} -> - # Successfully decoded, return the complete response - new_acc - - {:error, _} -> - # Not complete yet, continue collecting - collect_response(port, new_acc, timeout) + {^port, {:data, data}} -> + # Accumulate binary data + new_acc = acc <> data + + # Try to extract complete JSON messages from the accumulated data + case extract_json_messages(new_acc) do + {json_message, _remaining} when json_message != nil -> + # We found a complete JSON message, return it + json_message + + {nil, remaining} -> + # No complete JSON message yet, continue collecting + collect_response(port, remaining, timeout) end {^port, {:exit_status, status}} -> @@ -606,6 +613,93 @@ defmodule AgentCoordinator.MCPServerManager do end end + # Extract complete JSON messages from accumulated binary data + defp extract_json_messages(data) do + lines = String.split(data, "\n", trim: false) + + # Process each line to find JSON messages and skip log messages + {json_lines, _remaining_data} = extract_json_from_lines(lines, []) + + case json_lines do + [] -> + # No complete JSON found, return the last partial line if any + last_line = List.last(lines) || "" + if String.trim(last_line) != "" and not String.ends_with?(data, "\n") do + {nil, last_line} + else + {nil, ""} + end + + _ -> + # Join all JSON lines and try to parse + json_data = Enum.join(json_lines, "\n") + + case Jason.decode(json_data) do + {:ok, _} -> + # Valid JSON found + {json_data, ""} + + {:error, _} -> + # Invalid JSON, might be incomplete + {nil, data} + end + end + end + + defp extract_json_from_lines([], acc), do: {Enum.reverse(acc), ""} + + defp extract_json_from_lines([line], acc) do + # This is the last line, it might be incomplete + trimmed = String.trim(line) + + cond do + trimmed == "" -> + {Enum.reverse(acc), ""} + + # Skip log messages + Regex.match?(~r/^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}/, trimmed) -> + {Enum.reverse(acc), ""} + + Regex.match?(~r/^\d{2}:\d{2}:\d{2}\.\d+\s+\[(info|warning|error|debug)\]/, trimmed) -> + {Enum.reverse(acc), ""} + + # Check if this looks like JSON + String.starts_with?(trimmed, ["{"]) -> + {Enum.reverse([line | acc]), ""} + + true -> + # Non-JSON line, might be incomplete + {Enum.reverse(acc), line} + end + end + + defp extract_json_from_lines([line | rest], acc) do + trimmed = String.trim(line) + + cond do + trimmed == "" -> + extract_json_from_lines(rest, acc) + + # Skip log messages + Regex.match?(~r/^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}/, trimmed) -> + Logger.debug("Skipping log message from MCP server: #{trimmed}") + extract_json_from_lines(rest, acc) + + Regex.match?(~r/^\d{2}:\d{2}:\d{2}\.\d+\s+\[(info|warning|error|debug)\]/, trimmed) -> + Logger.debug("Skipping log message from MCP server: #{trimmed}") + extract_json_from_lines(rest, acc) + + # Check if this looks like JSON + String.starts_with?(trimmed, ["{"]) -> + extract_json_from_lines(rest, [line | acc]) + + true -> + # Skip non-JSON lines + Logger.debug("Skipping non-JSON line from MCP server: #{trimmed}") + extract_json_from_lines(rest, acc) + end + end + defp refresh_tool_registry(state) do new_registry = Enum.reduce(state.servers, %{}, fn {name, server_info}, acc -> diff --git a/lib/agent_coordinator/unified_mcp_server.ex b/lib/agent_coordinator/unified_mcp_server.ex index 0098555..c4dc5f2 100644 --- a/lib/agent_coordinator/unified_mcp_server.ex +++ b/lib/agent_coordinator/unified_mcp_server.ex @@ -26,7 +26,7 @@ defmodule AgentCoordinator.UnifiedMCPServer do Handle MCP request from GitHub Copilot """ def handle_mcp_request(request) do - GenServer.call(__MODULE__, {:handle_request, request}) + GenServer.call(__MODULE__, {:handle_request, request}, 60_000) end # Server callbacks