Fix MCP server communication: remove line limit and improve JSON parsing
- Remove {:line, 1024} port option that was truncating long JSON responses
- Rewrite collect_response to handle binary data instead of line-based data
- Add extract_json_messages function to properly parse complete JSON from mixed stdout
- Filter out log messages while preserving complete JSON responses
- Add timeouts to all GenServer calls to prevent blocking
- This fixes the 'unexpected end of input at position 1024' JSON decode errors
This commit is contained in:
@@ -27,35 +27,35 @@ defmodule AgentCoordinator.MCPServerManager do
|
|||||||
Get all tools from all managed servers plus Agent Coordinator tools
|
Get all tools from all managed servers plus Agent Coordinator tools
|
||||||
"""
|
"""
|
||||||
def get_unified_tools do
|
def get_unified_tools do
|
||||||
GenServer.call(__MODULE__, :get_unified_tools)
|
GenServer.call(__MODULE__, :get_unified_tools, 60_000)
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Route a tool call to the appropriate server
|
Route a tool call to the appropriate server
|
||||||
"""
|
"""
|
||||||
def route_tool_call(tool_name, arguments, agent_context) do
|
def route_tool_call(tool_name, arguments, agent_context) do
|
||||||
GenServer.call(__MODULE__, {:route_tool_call, tool_name, arguments, agent_context})
|
GenServer.call(__MODULE__, {:route_tool_call, tool_name, arguments, agent_context}, 60_000)
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Get status of all managed servers
|
Get status of all managed servers
|
||||||
"""
|
"""
|
||||||
def get_server_status do
|
def get_server_status do
|
||||||
GenServer.call(__MODULE__, :get_server_status)
|
GenServer.call(__MODULE__, :get_server_status, 15_000)
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Restart a specific server
|
Restart a specific server
|
||||||
"""
|
"""
|
||||||
def restart_server(server_name) do
|
def restart_server(server_name) do
|
||||||
GenServer.call(__MODULE__, {:restart_server, server_name})
|
GenServer.call(__MODULE__, {:restart_server, server_name}, 30_000)
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Refresh tool registry by re-discovering tools from all servers
|
Refresh tool registry by re-discovering tools from all servers
|
||||||
"""
|
"""
|
||||||
def refresh_tools do
|
def refresh_tools do
|
||||||
GenServer.call(__MODULE__, :refresh_tools)
|
GenServer.call(__MODULE__, :refresh_tools, 60_000)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Server callbacks
|
# Server callbacks
|
||||||
@@ -432,7 +432,6 @@ defmodule AgentCoordinator.MCPServerManager do
|
|||||||
port_options = [
|
port_options = [
|
||||||
:binary,
|
:binary,
|
||||||
:stream,
|
:stream,
|
||||||
{:line, 1024},
|
|
||||||
{:env, env_list},
|
{:env, env_list},
|
||||||
:exit_status,
|
:exit_status,
|
||||||
:hide
|
:hide
|
||||||
@@ -567,6 +566,13 @@ defmodule AgentCoordinator.MCPServerManager do
|
|||||||
# Collect full response by reading multiple lines if needed
|
# Collect full response by reading multiple lines if needed
|
||||||
response_data = collect_response(server_info.port, "", 30_000)
|
response_data = collect_response(server_info.port, "", 30_000)
|
||||||
|
|
||||||
|
cond do
|
||||||
|
# Check if we got any response data
|
||||||
|
response_data == "" ->
|
||||||
|
{:error, "No response received from server #{server_info.name}"}
|
||||||
|
|
||||||
|
# Try to decode JSON response
|
||||||
|
true ->
|
||||||
case Jason.decode(response_data) do
|
case Jason.decode(response_data) do
|
||||||
{:ok, response} -> {:ok, response}
|
{:ok, response} -> {:ok, response}
|
||||||
{:error, %Jason.DecodeError{} = error} ->
|
{:error, %Jason.DecodeError{} = error} ->
|
||||||
@@ -577,22 +583,23 @@ defmodule AgentCoordinator.MCPServerManager do
|
|||||||
{:error, "JSON decode error: #{inspect(reason)}"}
|
{:error, "JSON decode error: #{inspect(reason)}"}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
||||||
defp collect_response(port, acc, timeout) do
|
defp collect_response(port, acc, timeout) do
|
||||||
receive do
|
receive do
|
||||||
{^port, {:data, {_eol, response_line}}} ->
|
{^port, {:data, data}} ->
|
||||||
# Accumulate the response line
|
# Accumulate binary data
|
||||||
new_acc = acc <> response_line
|
new_acc = acc <> data
|
||||||
|
|
||||||
# Check if we have a complete JSON object
|
# Try to extract complete JSON messages from the accumulated data
|
||||||
case Jason.decode(new_acc) do
|
case extract_json_messages(new_acc) do
|
||||||
{:ok, _} ->
|
{json_message, _remaining} when json_message != nil ->
|
||||||
# Successfully decoded, return the complete response
|
# We found a complete JSON message, return it
|
||||||
new_acc
|
json_message
|
||||||
|
|
||||||
{:error, _} ->
|
{nil, remaining} ->
|
||||||
# Not complete yet, continue collecting
|
# No complete JSON message yet, continue collecting
|
||||||
collect_response(port, new_acc, timeout)
|
collect_response(port, remaining, timeout)
|
||||||
end
|
end
|
||||||
|
|
||||||
{^port, {:exit_status, status}} ->
|
{^port, {:exit_status, status}} ->
|
||||||
@@ -606,6 +613,93 @@ defmodule AgentCoordinator.MCPServerManager do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Extract complete JSON messages from accumulated binary data
|
||||||
|
defp extract_json_messages(data) do
|
||||||
|
lines = String.split(data, "\n", trim: false)
|
||||||
|
|
||||||
|
# Process each line to find JSON messages and skip log messages
|
||||||
|
{json_lines, _remaining_data} = extract_json_from_lines(lines, [])
|
||||||
|
|
||||||
|
case json_lines do
|
||||||
|
[] ->
|
||||||
|
# No complete JSON found, return the last partial line if any
|
||||||
|
last_line = List.last(lines) || ""
|
||||||
|
if String.trim(last_line) != "" and not String.ends_with?(data, "\n") do
|
||||||
|
{nil, last_line}
|
||||||
|
else
|
||||||
|
{nil, ""}
|
||||||
|
end
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
# Join all JSON lines and try to parse
|
||||||
|
json_data = Enum.join(json_lines, "\n")
|
||||||
|
|
||||||
|
case Jason.decode(json_data) do
|
||||||
|
{:ok, _} ->
|
||||||
|
# Valid JSON found
|
||||||
|
{json_data, ""}
|
||||||
|
|
||||||
|
{:error, _} ->
|
||||||
|
# Invalid JSON, might be incomplete
|
||||||
|
{nil, data}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp extract_json_from_lines([], acc), do: {Enum.reverse(acc), ""}
|
||||||
|
|
||||||
|
defp extract_json_from_lines([line], acc) do
|
||||||
|
# This is the last line, it might be incomplete
|
||||||
|
trimmed = String.trim(line)
|
||||||
|
|
||||||
|
cond do
|
||||||
|
trimmed == "" ->
|
||||||
|
{Enum.reverse(acc), ""}
|
||||||
|
|
||||||
|
# Skip log messages
|
||||||
|
Regex.match?(~r/^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}/, trimmed) ->
|
||||||
|
{Enum.reverse(acc), ""}
|
||||||
|
|
||||||
|
Regex.match?(~r/^\d{2}:\d{2}:\d{2}\.\d+\s+\[(info|warning|error|debug)\]/, trimmed) ->
|
||||||
|
{Enum.reverse(acc), ""}
|
||||||
|
|
||||||
|
# Check if this looks like JSON
|
||||||
|
String.starts_with?(trimmed, ["{"]) ->
|
||||||
|
{Enum.reverse([line | acc]), ""}
|
||||||
|
|
||||||
|
true ->
|
||||||
|
# Non-JSON line, might be incomplete
|
||||||
|
{Enum.reverse(acc), line}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp extract_json_from_lines([line | rest], acc) do
|
||||||
|
trimmed = String.trim(line)
|
||||||
|
|
||||||
|
cond do
|
||||||
|
trimmed == "" ->
|
||||||
|
extract_json_from_lines(rest, acc)
|
||||||
|
|
||||||
|
# Skip log messages
|
||||||
|
Regex.match?(~r/^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}/, trimmed) ->
|
||||||
|
Logger.debug("Skipping log message from MCP server: #{trimmed}")
|
||||||
|
extract_json_from_lines(rest, acc)
|
||||||
|
|
||||||
|
Regex.match?(~r/^\d{2}:\d{2}:\d{2}\.\d+\s+\[(info|warning|error|debug)\]/, trimmed) ->
|
||||||
|
Logger.debug("Skipping log message from MCP server: #{trimmed}")
|
||||||
|
extract_json_from_lines(rest, acc)
|
||||||
|
|
||||||
|
# Check if this looks like JSON
|
||||||
|
String.starts_with?(trimmed, ["{"]) ->
|
||||||
|
extract_json_from_lines(rest, [line | acc])
|
||||||
|
|
||||||
|
true ->
|
||||||
|
# Skip non-JSON lines
|
||||||
|
Logger.debug("Skipping non-JSON line from MCP server: #{trimmed}")
|
||||||
|
extract_json_from_lines(rest, acc)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
defp refresh_tool_registry(state) do
|
defp refresh_tool_registry(state) do
|
||||||
new_registry =
|
new_registry =
|
||||||
Enum.reduce(state.servers, %{}, fn {name, server_info}, acc ->
|
Enum.reduce(state.servers, %{}, fn {name, server_info}, acc ->
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ defmodule AgentCoordinator.UnifiedMCPServer do
|
|||||||
Handle MCP request from GitHub Copilot
|
Handle MCP request from GitHub Copilot
|
||||||
"""
|
"""
|
||||||
def handle_mcp_request(request) do
|
def handle_mcp_request(request) do
|
||||||
GenServer.call(__MODULE__, {:handle_request, request})
|
GenServer.call(__MODULE__, {:handle_request, request}, 60_000)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Server callbacks
|
# Server callbacks
|
||||||
|
|||||||
Reference in New Issue
Block a user