From 7cb42e413160494ae0ca52b103236f1df0f36942 Mon Sep 17 00:00:00 2001 From: Ra Date: Wed, 3 Sep 2025 00:24:06 -0700 Subject: [PATCH] Remove duplicate MCP server files - consolidation complete - Deleted mcp_server_manager.ex, enhanced_mcp_server.ex, unified_mcp_server.ex - All functionality successfully consolidated into single mcp_server.ex - Server starts correctly with all external servers (context7, filesystem, memory, sequentialthinking) - HTTP server support working for mcp_figma - All 15+ agent coordination tools properly registered - Codebase is now clean with no duplicate files --- lib/agent_coordinator/enhanced_mcp_server.ex | 266 ----- lib/agent_coordinator/mcp_server_manager.ex | 1107 ------------------ lib/agent_coordinator/unified_mcp_server.ex | 251 ---- 3 files changed, 1624 deletions(-) delete mode 100644 lib/agent_coordinator/enhanced_mcp_server.ex delete mode 100644 lib/agent_coordinator/mcp_server_manager.ex delete mode 100644 lib/agent_coordinator/unified_mcp_server.ex diff --git a/lib/agent_coordinator/enhanced_mcp_server.ex b/lib/agent_coordinator/enhanced_mcp_server.ex deleted file mode 100644 index 3f57fa7..0000000 --- a/lib/agent_coordinator/enhanced_mcp_server.ex +++ /dev/null @@ -1,266 +0,0 @@ -defmodule AgentCoordinator.EnhancedMCPServer do - @moduledoc """ - Enhanced MCP server with automatic heartbeat management and collision detection. - - This module extends the base MCP server with: - 1. Automatic heartbeats on every operation - 2. Agent session tracking - 3. Enhanced collision detection - 4. Automatic agent cleanup on disconnect - """ - - use GenServer - alias AgentCoordinator.{MCPServer, AutoHeartbeat, TaskRegistry} - - # Track active agent sessions - defstruct [ - :agent_sessions, - :session_monitors - ] - - # Client API - - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end - - @doc """ - Enhanced MCP request handler with automatic heartbeat management - """ - def handle_enhanced_mcp_request(request, session_info \\ %{}) do - GenServer.call(__MODULE__, {:enhanced_mcp_request, request, session_info}) - end - - @doc """ - Register an agent with enhanced session tracking - """ - def register_agent_with_session(name, capabilities, session_pid \\ self()) do - GenServer.call(__MODULE__, {:register_agent_with_session, name, capabilities, session_pid}) - end - - # Server callbacks - - def init(_opts) do - state = %__MODULE__{ - agent_sessions: %{}, - session_monitors: %{} - } - - {:ok, state} - end - - def handle_call({:enhanced_mcp_request, request, session_info}, {from_pid, _}, state) do - # Extract agent_id from session or request - agent_id = extract_agent_id(request, session_info, state) - - # If we have an agent_id, send heartbeat before and after operation - enhanced_result = - case agent_id do - nil -> - # No agent context, use normal MCP processing - MCPServer.handle_mcp_request(request) - - id -> - # Send pre-operation heartbeat - pre_heartbeat = TaskRegistry.heartbeat_agent(id) - - # Process the request - result = MCPServer.handle_mcp_request(request) - - # Send post-operation heartbeat and update session activity - post_heartbeat = TaskRegistry.heartbeat_agent(id) - update_session_activity(state, id, from_pid) - - # Add heartbeat metadata to successful responses - case result do - %{"result" => _} = success -> - Map.put(success, "_heartbeat_metadata", %{ - agent_id: id, - pre_heartbeat: pre_heartbeat, - post_heartbeat: post_heartbeat, - timestamp: DateTime.utc_now() - }) - - error_result -> - error_result 
- end - end - - {:reply, enhanced_result, state} - end - - def handle_call({:register_agent_with_session, name, capabilities, session_pid}, _from, state) do - # Convert capabilities to strings if they're atoms - string_capabilities = - Enum.map(capabilities, fn - cap when is_atom(cap) -> Atom.to_string(cap) - cap when is_binary(cap) -> cap - end) - - # Register the agent normally first - case MCPServer.handle_mcp_request(%{ - "method" => "tools/call", - "params" => %{ - "name" => "register_agent", - "arguments" => %{"name" => name, "capabilities" => string_capabilities} - } - }) do - %{"result" => %{"content" => [%{"text" => response_json}]}} -> - case Jason.decode(response_json) do - {:ok, %{"agent_id" => agent_id}} -> - # Track the session - monitor_ref = Process.monitor(session_pid) - - new_state = %{ - state - | agent_sessions: - Map.put(state.agent_sessions, agent_id, %{ - pid: session_pid, - name: name, - capabilities: capabilities, - registered_at: DateTime.utc_now(), - last_activity: DateTime.utc_now() - }), - session_monitors: Map.put(state.session_monitors, monitor_ref, agent_id) - } - - # Start automatic heartbeat management - AutoHeartbeat.start_link([]) - - AutoHeartbeat.register_agent_with_heartbeat(name, capabilities, %{ - session_pid: session_pid, - enhanced_server: true - }) - - {:reply, {:ok, agent_id}, new_state} - - {:error, reason} -> - {:reply, {:error, reason}, state} - end - - %{"error" => %{"message" => message}} -> - {:reply, {:error, message}, state} - - _ -> - {:reply, {:error, "Unexpected response format"}, state} - end - end - - def handle_call(:get_enhanced_task_board, _from, state) do - # Get the regular task board - case MCPServer.handle_mcp_request(%{ - "method" => "tools/call", - "params" => %{"name" => "get_task_board", "arguments" => %{}} - }) do - %{"result" => %{"content" => [%{"text" => response_json}]}} -> - case Jason.decode(response_json) do - {:ok, %{"agents" => agents}} -> - # Enhance with session information - enhanced_agents = - Enum.map(agents, fn agent -> - agent_id = agent["agent_id"] - session_info = Map.get(state.agent_sessions, agent_id, %{}) - - Map.merge(agent, %{ - "session_active" => Map.has_key?(state.agent_sessions, agent_id), - "last_activity" => Map.get(session_info, :last_activity), - "session_duration" => calculate_session_duration(session_info) - }) - end) - - result = %{ - "agents" => enhanced_agents, - "active_sessions" => map_size(state.agent_sessions) - } - - {:reply, {:ok, result}, state} - - {:error, reason} -> - {:reply, {:error, reason}, state} - end - - %{"error" => %{"message" => message}} -> - {:reply, {:error, message}, state} - end - end - - # Handle process monitoring - cleanup when agent session dies - def handle_info({:DOWN, monitor_ref, :process, _pid, _reason}, state) do - case Map.get(state.session_monitors, monitor_ref) do - nil -> - {:noreply, state} - - agent_id -> - # Clean up the agent session - new_state = %{ - state - | agent_sessions: Map.delete(state.agent_sessions, agent_id), - session_monitors: Map.delete(state.session_monitors, monitor_ref) - } - - # Stop heartbeat management - AutoHeartbeat.stop_heartbeat(agent_id) - - # Mark agent as offline in registry - # (This could be enhanced to gracefully handle ongoing tasks) - - {:noreply, new_state} - end - end - - # Private helpers - - defp extract_agent_id(request, session_info, state) do - # Try to get agent_id from various sources - cond do - # From request arguments - Map.get(request, "params", %{}) - |> Map.get("arguments", %{}) - |> 
Map.get("agent_id") -> - request["params"]["arguments"]["agent_id"] - - # From session info - Map.get(session_info, :agent_id) -> - session_info.agent_id - - # From session lookup by PID - session_pid = Map.get(session_info, :session_pid, self()) -> - find_agent_by_session_pid(state, session_pid) - - true -> - nil - end - end - - defp find_agent_by_session_pid(state, session_pid) do - Enum.find_value(state.agent_sessions, fn {agent_id, session_data} -> - if session_data.pid == session_pid, do: agent_id, else: nil - end) - end - - defp update_session_activity(state, agent_id, _session_pid) do - case Map.get(state.agent_sessions, agent_id) do - nil -> - :ok - - session_data -> - _updated_session = %{session_data | last_activity: DateTime.utc_now()} - # Note: This doesn't update the state since we're in a call handler - # In a real implementation, you might want to use cast for this - :ok - end - end - - @doc """ - Get enhanced task board with session information - """ - def get_enhanced_task_board do - GenServer.call(__MODULE__, :get_enhanced_task_board) - end - - defp calculate_session_duration(%{registered_at: start_time}) do - DateTime.diff(DateTime.utc_now(), start_time, :second) - end - - defp calculate_session_duration(_), do: nil -end diff --git a/lib/agent_coordinator/mcp_server_manager.ex b/lib/agent_coordinator/mcp_server_manager.ex deleted file mode 100644 index 90b35a9..0000000 --- a/lib/agent_coordinator/mcp_server_manager.ex +++ /dev/null @@ -1,1107 +0,0 @@ -defmodule AgentCoordinator.MCPServerManager do - @moduledoc """ - Manages external MCP servers as internal clients. - - This module starts, monitors, and communicates with external MCP servers, - acting as a client to each while presenting their tools through the - unified Agent Coordinator interface. 
- """ - - use GenServer - require Logger - - defstruct [ - :servers, - :server_processes, - :tool_registry, - :config - ] - - # Client API - - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end - - @doc """ - Get all tools from all managed servers plus Agent Coordinator tools - """ - def get_unified_tools do - GenServer.call(__MODULE__, :get_unified_tools, 60_000) - end - - @doc """ - Route a tool call to the appropriate server - """ - def route_tool_call(tool_name, arguments, agent_context) do - GenServer.call(__MODULE__, {:route_tool_call, tool_name, arguments, agent_context}, 60_000) - end - - @doc """ - Get status of all managed servers - """ - def get_server_status do - GenServer.call(__MODULE__, :get_server_status, 15_000) - end - - @doc """ - Restart a specific server - """ - def restart_server(server_name) do - GenServer.call(__MODULE__, {:restart_server, server_name}, 30_000) - end - - @doc """ - Refresh tool registry by re-discovering tools from all servers - """ - def refresh_tools do - GenServer.call(__MODULE__, :refresh_tools, 60_000) - end - - # Server callbacks - - def init(opts) do - config = load_server_config(opts) - - state = %__MODULE__{ - servers: %{}, - server_processes: %{}, - tool_registry: %{}, - config: config - } - - # Start all configured servers - {:ok, state, {:continue, :start_servers}} - end - - def handle_continue(:start_servers, state) do - IO.puts(:stderr, "Starting external MCP servers...") - - new_state = - Enum.reduce(state.config.servers, state, fn {name, config}, acc -> - case start_server(name, config) do - {:ok, server_info} -> - IO.puts(:stderr, "Started MCP server: #{name}") - - %{ - acc - | servers: Map.put(acc.servers, name, server_info), - server_processes: Map.put(acc.server_processes, name, server_info.pid) - } - - {:error, reason} -> - IO.puts(:stderr, "Failed to start MCP server #{name}: #{reason}") - acc - end - end) - - # Build initial tool registry - updated_state = refresh_tool_registry(new_state) - - {:noreply, updated_state} - end - - def handle_call(:get_unified_tools, _from, state) do - # Combine Agent Coordinator tools with external server tools - coordinator_tools = get_coordinator_tools() - external_tools = Map.values(state.tool_registry) |> List.flatten() - - all_tools = coordinator_tools ++ external_tools - - {:reply, all_tools, state} - end - - def handle_call({:route_tool_call, tool_name, arguments, agent_context}, _from, state) do - case find_tool_server(tool_name, state) do - {:coordinator, _} -> - # Route to Agent Coordinator's own tools - result = handle_coordinator_tool(tool_name, arguments, agent_context) - {:reply, result, state} - - {:external, server_name} -> - # Route to external server - result = call_external_tool(server_name, tool_name, arguments, agent_context, state) - {:reply, result, state} - - :not_found -> - error_result = %{ - "error" => %{ - "code" => -32601, - "message" => "Tool not found: #{tool_name}" - } - } - - {:reply, error_result, state} - end - end - - def handle_call(:get_server_status, _from, state) do - status = - Enum.map(state.servers, fn {name, server_info} -> - {name, - %{ - status: if(Process.alive?(server_info.pid), do: :running, else: :stopped), - pid: server_info.pid, - tools_count: length(Map.get(state.tool_registry, name, [])), - started_at: server_info.started_at - }} - end) - |> Map.new() - - {:reply, status, state} - end - - def handle_call({:restart_server, server_name}, _from, state) do - case Map.get(state.servers, server_name) do - 
nil -> - {:reply, {:error, "Server not found"}, state} - - server_info -> - # Stop existing server - if Process.alive?(server_info.pid) do - Process.exit(server_info.pid, :kill) - end - - # Start new server - server_config = Map.get(state.config.servers, server_name) - - case start_server(server_name, server_config) do - {:ok, new_server_info} -> - new_state = %{ - state - | servers: Map.put(state.servers, server_name, new_server_info), - server_processes: - Map.put(state.server_processes, server_name, new_server_info.pid) - } - - updated_state = refresh_tool_registry(new_state) - {:reply, {:ok, new_server_info}, updated_state} - - {:error, reason} -> - {:reply, {:error, reason}, state} - end - end - end - - def handle_call(:refresh_tools, _from, state) do - # Re-discover tools from all running servers - updated_state = rediscover_all_tools(state) - - all_tools = - get_coordinator_tools() ++ (Map.values(updated_state.tool_registry) |> List.flatten()) - - IO.puts(:stderr, "Refreshed tool registry: found #{length(all_tools)} total tools") - - {:reply, {:ok, length(all_tools)}, updated_state} - end - - def handle_info({:DOWN, _ref, :port, port, reason}, state) do - # Handle server port death - case find_server_by_port(port, state.servers) do - {server_name, server_info} -> - IO.puts(:stderr, "MCP server #{server_name} port died: #{reason}") - - # Cleanup PID file and kill external process - if server_info.pid_file_path do - cleanup_pid_file(server_info.pid_file_path) - end - - if server_info.os_pid do - kill_external_process(server_info.os_pid) - end - - # Remove from state - new_state = %{ - state - | servers: Map.delete(state.servers, server_name), - server_processes: Map.delete(state.server_processes, server_name), - tool_registry: Map.delete(state.tool_registry, server_name) - } - - # Attempt restart if configured - if should_auto_restart?(server_name, state.config) do - IO.puts(:stderr, "Auto-restarting MCP server: #{server_name}") - Process.send_after(self(), {:restart_server, server_name}, 1000) - end - - {:noreply, new_state} - - nil -> - {:noreply, state} - end - end - - def handle_info({:restart_server, server_name}, state) do - server_config = Map.get(state.config.servers, server_name) - - case start_server(server_name, server_config) do - {:ok, server_info} -> - IO.puts(:stderr, "Auto-restarted MCP server: #{server_name}") - - new_state = %{ - state - | servers: Map.put(state.servers, server_name, server_info), - server_processes: Map.put(state.server_processes, server_name, server_info.pid) - } - - updated_state = refresh_tool_registry(new_state) - {:noreply, updated_state} - - {:error, reason} -> - IO.puts(:stderr, - "Failed to auto-restart MCP server #{server_name}: #{reason}" - ) - - {:noreply, state} - end - end - - def handle_info(_msg, state) do - {:noreply, state} - end - - # Private functions - - defp load_server_config(_opts) do # We should probably use opts, but idk how to fix it, so we're using env var for a single var - # Allow override from opts or config file - config_file = System.get_env("MCP_CONFIG_FILE", "mcp_servers.json") - - if File.exists?(config_file) do - IO.puts(:stderr, "Loading MCP server config from #{config_file}") - try do - case Jason.decode!(File.read!(config_file)) do - %{"servers" => servers} = full_config -> - # Convert string types to atoms and normalize server configs - normalized_servers = - Enum.into(servers, %{}, fn {name, config} -> - normalized_config = - config - |> Map.update("type", :stdio, fn - "stdio" -> :stdio - "http" -> :http - 
type when is_atom(type) -> type - type -> String.to_existing_atom(type) - end) - |> Enum.into(%{}, fn - {"type", type} -> {:type, type} - {key, value} -> {String.to_atom(key), value} - end) - - {name, normalized_config} - end) - - base_config = %{servers: normalized_servers} - - # Add any additional config from the JSON file - case Map.get(full_config, "config") do - nil -> - base_config - - additional_config -> - Map.merge(base_config, %{config: additional_config}) - end - - _ -> - IO.puts(:stderr, - "Invalid config file format in #{config_file}, using defaults" - ) - - get_default_config() - end - rescue - e -> - IO.puts(:stderr, - "Failed to load config file #{config_file}: #{Exception.message(e)}, using defaults" - ) - - get_default_config() - end - else - IO.puts(:stderr, "Config file #{config_file} not found, using defaults") - get_default_config() - end - end - - defp get_default_config do - %{ - servers: %{ - "mcp_context7" => %{ - type: :stdio, - command: "uvx", - args: ["mcp-server-context7"], - auto_restart: true, - description: "Context7 library documentation server" - }, - "mcp_figma" => %{ - type: :stdio, - command: "bunx", - args: ["-y", "@figma/mcp-server-figma"], - auto_restart: true, - description: "Figma design integration server" - }, - "mcp_filesystem" => %{ - type: :stdio, - command: "bunx", - args: ["-y", "@modelcontextprotocol/server-filesystem", "/home/ra"], - auto_restart: true, - description: "Filesystem operations server with heartbeat coverage" - }, - "mcp_firebase" => %{ - type: :stdio, - command: "bunx", - args: ["-y", "@firebase/mcp-server"], - auto_restart: true, - description: "Firebase integration server" - }, - "mcp_memory" => %{ - type: :stdio, - command: "bunx", - args: ["-y", "@modelcontextprotocol/server-memory"], - auto_restart: true, - description: "Memory and knowledge graph server" - }, - "mcp_sequentialthi" => %{ - type: :stdio, - command: "bunx", - args: ["-y", "@modelcontextprotocol/server-sequential-thinking"], - auto_restart: true, - description: "Sequential thinking and reasoning server" - } - } - } - end - - defp start_server(name, %{type: :stdio} = config) do - case start_stdio_server(name, config) do - {:ok, os_pid, port, pid_file_path} -> - # Monitor the port (not the OS PID) - port_ref = Port.monitor(port) - - server_info = %{ - name: name, - type: :stdio, - # Use port as the "pid" for process tracking - pid: port, - os_pid: os_pid, - port: port, - pid_file_path: pid_file_path, - port_ref: port_ref, - started_at: DateTime.utc_now(), - tools: [] - } - - # Initialize the server and get tools - case initialize_server(server_info) do - {:ok, tools} -> - {:ok, %{server_info | tools: tools}} - - {:error, reason} -> - # Cleanup on initialization failure - cleanup_pid_file(pid_file_path) - kill_external_process(os_pid) - # Only close port if it's still open - if Port.info(port) do - Port.close(port) - end - - {:error, reason} - end - - {:error, reason} -> - {:error, reason} - end - end - - defp start_server(name, %{type: :http} = config) do - # For HTTP servers, we don't spawn processes - just store connection info - server_info = %{ - name: name, - type: :http, - url: Map.get(config, :url), - # No process to track for HTTP - pid: nil, - os_pid: nil, - port: nil, - pid_file_path: nil, - port_ref: nil, - started_at: DateTime.utc_now(), - tools: [] - } - - # For HTTP servers, we can try to get tools but don't need process management - case initialize_http_server(server_info) do - {:ok, tools} -> - {:ok, %{server_info | tools: tools}} - - 
{:error, reason} -> - {:error, reason} - end - end - - defp start_stdio_server(name, config) do - command = Map.get(config, :command, "npx") - args = Map.get(config, :args, []) - env = Map.get(config, :env, %{}) - - # Convert env map to list format expected by Port.open - env_list = - Enum.map(env, fn {key, value} -> {String.to_charlist(key), String.to_charlist(value)} end) - - port_options = [ - :binary, - :stream, - {:env, env_list}, - :exit_status, - :hide - ] - - try do - port = - Port.open( - {:spawn_executable, System.find_executable(command)}, - [{:args, args} | port_options] - ) - - # Get the OS PID of the spawned process - {:os_pid, os_pid} = Port.info(port, :os_pid) - - # Create PID file for cleanup - pid_file_path = create_pid_file(name, os_pid) - - IO.puts(:stderr, "Started MCP server #{name} with OS PID #{os_pid}") - - {:ok, os_pid, port, pid_file_path} - rescue - e -> - IO.puts(:stderr, - "Failed to start stdio server #{name}: #{Exception.message(e)}" - ) - - {:error, Exception.message(e)} - end - end - - defp create_pid_file(server_name, os_pid) do - pid_dir = Path.join(System.tmp_dir(), "mcp_servers") - File.mkdir_p!(pid_dir) - - pid_file_path = Path.join(pid_dir, "#{server_name}.pid") - File.write!(pid_file_path, to_string(os_pid)) - - pid_file_path - end - - defp cleanup_pid_file(pid_file_path) do - if File.exists?(pid_file_path) do - File.rm(pid_file_path) - end - end - - defp kill_external_process(os_pid) when is_integer(os_pid) do - try do - case System.cmd("kill", ["-TERM", to_string(os_pid)]) do - {_, 0} -> - IO.puts(:stderr, "Successfully terminated process #{os_pid}") - :ok - - {_, _} -> - # Try force kill - case System.cmd("kill", ["-KILL", to_string(os_pid)]) do - {_, 0} -> - IO.puts(:stderr, "Force killed process #{os_pid}") - :ok - - {_, _} -> - IO.puts(:stderr, "Failed to kill process #{os_pid}") - :error - end - end - rescue - _ -> :error - end - end - - defp find_server_by_port(port, servers) do - Enum.find(servers, fn {_name, server_info} -> - server_info.port == port - end) - end - - defp initialize_server(server_info) do - # Send initialize request - init_request = %{ - "jsonrpc" => "2.0", - "id" => 1, - "method" => "initialize", - "params" => %{ - "protocolVersion" => "2024-11-05", - "capabilities" => %{}, - "clientInfo" => %{ - "name" => "agent-coordinator", - "version" => "0.1.0" - } - } - } - - with {:ok, _init_response} <- send_server_request(server_info, init_request), - {:ok, tools_response} <- get_server_tools(server_info) do - {:ok, tools_response} - else - {:error, reason} -> {:error, reason} - end - end - - defp initialize_http_server(server_info) do - # For HTTP servers, we would make HTTP requests instead of using ports - # For now, return empty tools list as we need to implement HTTP client logic - IO.puts(:stderr, - "HTTP server support not fully implemented yet for #{server_info.name}" - ) - - {:ok, []} - rescue - e -> - {:error, "HTTP server initialization failed: #{Exception.message(e)}"} - end - - defp get_server_tools(server_info) do - tools_request = %{ - "jsonrpc" => "2.0", - "id" => 2, - "method" => "tools/list" - } - - case send_server_request(server_info, tools_request) do - {:ok, %{"result" => %{"tools" => tools}}} -> - {:ok, tools} - - {:ok, unexpected} -> - IO.puts(:stderr, - "Unexpected tools response from #{server_info.name}: #{inspect(unexpected)}" - ) - - {:ok, []} - - {:error, reason} -> - {:error, reason} - end - end - - defp send_server_request(server_info, request) do - request_json = Jason.encode!(request) <> "\n" - - 
Port.command(server_info.port, request_json) - - # Collect full response by reading multiple lines if needed - response_data = collect_response(server_info.port, "", 30_000) - - cond do - # Check if we got any response data - response_data == "" -> - {:error, "No response received from server #{server_info.name}"} - - # Try to decode JSON response - true -> - case Jason.decode(response_data) do - {:ok, response} -> - {:ok, response} - - {:error, %Jason.DecodeError{} = error} -> - IO.puts(:stderr, - "JSON decode error for server #{server_info.name}: #{Exception.message(error)}" - ) - - IO.puts(:stderr, "Raw response data: #{inspect(response_data)}") - {:error, "JSON decode error: #{Exception.message(error)}"} - end - end - end - - defp collect_response(port, acc, timeout) do - receive do - {^port, {:data, data}} -> - # Accumulate binary data - new_acc = acc <> data - - # Try to extract complete JSON messages from the accumulated data - case extract_json_messages(new_acc) do - {json_message, _remaining} when json_message != nil -> - # We found a complete JSON message, return it - json_message - - {nil, remaining} -> - # No complete JSON message yet, continue collecting - collect_response(port, remaining, timeout) - end - - {^port, {:exit_status, status}} -> - IO.puts(:stderr, "Server exited with status: #{status}") - acc - after - timeout -> - IO.puts(:stderr, "Server request timeout after #{timeout}ms") - acc - end - end - - # Extract complete JSON messages from accumulated binary data - defp extract_json_messages(data) do - lines = String.split(data, "\n", trim: false) - - # Process each line to find JSON messages and skip log messages - {json_lines, _remaining_data} = extract_json_from_lines(lines, []) - - case json_lines do - [] -> - # No complete JSON found, return the last partial line if any - last_line = List.last(lines) || "" - - if String.trim(last_line) != "" and not String.ends_with?(data, "\n") do - {nil, last_line} - else - {nil, ""} - end - - _ -> - # Join all JSON lines and try to parse - json_data = Enum.join(json_lines, "\n") - - case Jason.decode(json_data) do - {:ok, _} -> - # Valid JSON found - {json_data, ""} - - {:error, _} -> - # Invalid JSON, might be incomplete - {nil, data} - end - end - end - - defp extract_json_from_lines([], acc), do: {Enum.reverse(acc), ""} - - defp extract_json_from_lines([line], acc) do - # This is the last line, it might be incomplete - trimmed = String.trim(line) - - cond do - trimmed == "" -> - {Enum.reverse(acc), ""} - - # Skip log messages - Regex.match?(~r/^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}/, trimmed) -> - {Enum.reverse(acc), ""} - - Regex.match?(~r/^\d{2}:\d{2}:\d{2}\.\d+\s+\[(info|warning|error|debug)\]/, trimmed) -> - {Enum.reverse(acc), ""} - - # Check if this looks like JSON - String.starts_with?(trimmed, ["{"]) -> - {Enum.reverse([line | acc]), ""} - - true -> - # Non-JSON line, might be incomplete - {Enum.reverse(acc), line} - end - end - - defp extract_json_from_lines([line | rest], acc) do - trimmed = String.trim(line) - - cond do - trimmed == "" -> - extract_json_from_lines(rest, acc) - - # Skip log messages - Regex.match?(~r/^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}/, trimmed) -> - IO.puts(:stderr, "Skipping log message from MCP server: #{trimmed}") - extract_json_from_lines(rest, acc) - - Regex.match?(~r/^\d{2}:\d{2}:\d{2}\.\d+\s+\[(info|warning|error|debug)\]/, trimmed) -> - IO.puts(:stderr, "Skipping log message from MCP server: #{trimmed}") - extract_json_from_lines(rest, acc) - - # Check if this looks like JSON - 
String.starts_with?(trimmed, ["{"]) -> - extract_json_from_lines(rest, [line | acc]) - - true -> - # Skip non-JSON lines - IO.puts(:stderr, "Skipping non-JSON line from MCP server: #{trimmed}") - extract_json_from_lines(rest, acc) - end - end - - defp refresh_tool_registry(state) do - new_registry = - Enum.reduce(state.servers, %{}, fn {name, server_info}, acc -> - tools = Map.get(server_info, :tools, []) - Map.put(acc, name, tools) - end) - - %{state | tool_registry: new_registry} - end - - defp rediscover_all_tools(state) do - # Re-query all running servers for their current tools - updated_servers = - Enum.reduce(state.servers, state.servers, fn {name, server_info}, acc -> - # Check if server is alive (handle both PID and Port) - server_alive = - case server_info.pid do - nil -> false - pid when is_pid(pid) -> Process.alive?(pid) - port when is_port(port) -> Port.info(port) != nil - _ -> false - end - - if server_alive do - case get_server_tools(server_info) do - {:ok, new_tools} -> - IO.puts(:stderr, "Rediscovered #{length(new_tools)} tools from #{name}") - Map.put(acc, name, %{server_info | tools: new_tools}) - - {:error, reason} -> - IO.puts(:stderr, - "Failed to rediscover tools from #{name}: #{inspect(reason)}" - ) - - acc - end - else - IO.puts(:stderr, "Server #{name} is not alive, skipping tool rediscovery") - acc - end - end) - - # Update state with new server info and refresh tool registry - new_state = %{state | servers: updated_servers} - refresh_tool_registry(new_state) - end - - defp find_tool_server(tool_name, state) do - # Check all tool registries (both coordinator and external servers) - # Start with coordinator tools - coordinator_tools = get_coordinator_tools() - - if Enum.any?(coordinator_tools, fn tool -> tool["name"] == tool_name end) do - {:coordinator, tool_name} - else - # Check external servers - case find_external_tool_server(tool_name, state.tool_registry) do - nil -> :not_found - server_name -> {:external, server_name} - end - end - end - - defp find_external_tool_server(tool_name, tool_registry) do - Enum.find_value(tool_registry, fn {server_name, tools} -> - if Enum.any?(tools, fn tool -> tool["name"] == tool_name end) do - server_name - else - nil - end - end) - end - - defp get_coordinator_tools do - # Get Agent Coordinator native tools - coordinator_native_tools = [ - %{ - "name" => "register_agent", - "description" => "Register a new agent with the coordination system", - "inputSchema" => %{ - "type" => "object", - "properties" => %{ - "name" => %{"type" => "string"}, - "capabilities" => %{ - "type" => "array", - "items" => %{"type" => "string"} - }, - "metadata" => %{ - "type" => "object", - "description" => "Optional metadata about the agent (e.g., client_type, session_id)" - } - }, - "required" => ["name", "capabilities"] - } - }, - %{ - "name" => "create_task", - "description" => "Create a new task in the coordination system", - "inputSchema" => %{ - "type" => "object", - "properties" => %{ - "title" => %{"type" => "string"}, - "description" => %{"type" => "string"}, - "priority" => %{"type" => "string", "enum" => ["low", "normal", "high", "urgent"]}, - "required_capabilities" => %{ - "type" => "array", - "items" => %{"type" => "string"} - }, - "file_paths" => %{ - "type" => "array", - "items" => %{"type" => "string"} - } - }, - "required" => ["title", "description"] - } - }, - %{ - "name" => "get_next_task", - "description" => "Get the next task for an agent", - "inputSchema" => %{ - "type" => "object", - "properties" => %{ - "agent_id" => 
%{"type" => "string"} - }, - "required" => ["agent_id"] - } - }, - %{ - "name" => "complete_task", - "description" => "Mark current task as completed", - "inputSchema" => %{ - "type" => "object", - "properties" => %{ - "agent_id" => %{"type" => "string"} - }, - "required" => ["agent_id"] - } - }, - %{ - "name" => "get_task_board", - "description" => "Get overview of all agents and their current tasks", - "inputSchema" => %{ - "type" => "object", - "properties" => %{} - } - }, - %{ - "name" => "heartbeat", - "description" => "Send heartbeat to maintain agent status", - "inputSchema" => %{ - "type" => "object", - "properties" => %{ - "agent_id" => %{"type" => "string"} - }, - "required" => ["agent_id"] - } - } - ] - - # Get VS Code tools only if VS Code functionality is available - vscode_tools = - try do - if Code.ensure_loaded?(AgentCoordinator.VSCodeToolProvider) do - AgentCoordinator.VSCodeToolProvider.get_tools() - else - IO.puts(:stderr, "VS Code tools not available - module not loaded") - [] - end - rescue - _ -> - IO.puts(:stderr, "VS Code tools not available - error loading") - [] - end - - # Combine all coordinator tools - coordinator_native_tools ++ vscode_tools - end - - # Removed get_coordinator_tool_names - now using dynamic tool discovery - - defp handle_coordinator_tool(tool_name, arguments, agent_context) do - # Route to existing Agent Coordinator functionality or VS Code tools - case tool_name do - "register_agent" -> - opts = - case arguments["metadata"] do - nil -> [] - metadata -> [metadata: metadata] - end - - AgentCoordinator.TaskRegistry.register_agent( - arguments["name"], - arguments["capabilities"], - opts - ) - - "create_task" -> - AgentCoordinator.TaskRegistry.create_task( - arguments["title"], - arguments["description"], - Map.take(arguments, ["priority", "required_capabilities", "file_paths"]) - ) - - "get_next_task" -> - AgentCoordinator.TaskRegistry.get_next_task(arguments["agent_id"]) - - "complete_task" -> - AgentCoordinator.TaskRegistry.complete_task(arguments["agent_id"]) - - "get_task_board" -> - AgentCoordinator.TaskRegistry.get_task_board() - - "heartbeat" -> - AgentCoordinator.TaskRegistry.heartbeat_agent(arguments["agent_id"]) - - # VS Code tools - route to VS Code Tool Provider - "vscode_" <> _rest -> - AgentCoordinator.VSCodeToolProvider.handle_tool_call(tool_name, arguments, agent_context) - - _ -> - %{"error" => %{"code" => -32601, "message" => "Unknown coordinator tool: #{tool_name}"}} - end - end - - defp call_external_tool(server_name, tool_name, arguments, agent_context, state) do - case Map.get(state.servers, server_name) do - nil -> - %{"error" => %{"code" => -32603, "message" => "Server not available: #{server_name}"}} - - server_info -> - # Send heartbeat before tool call if agent context available - if agent_context && agent_context.agent_id do - AgentCoordinator.TaskRegistry.heartbeat_agent(agent_context.agent_id) - - # Auto-create/update current task for this tool usage - update_current_task(agent_context.agent_id, tool_name, arguments) - end - - # Make the actual tool call - tool_request = %{ - "jsonrpc" => "2.0", - "id" => System.unique_integer([:positive]), - "method" => "tools/call", - "params" => %{ - "name" => tool_name, - "arguments" => arguments - } - } - - result = - case send_server_request(server_info, tool_request) do - {:ok, response} -> - # Send heartbeat after successful tool call - if agent_context && agent_context.agent_id do - AgentCoordinator.TaskRegistry.heartbeat_agent(agent_context.agent_id) - end - - response - - 
{:error, reason} -> - %{"error" => %{"code" => -32603, "message" => reason}} - end - - result - end - end - - defp update_current_task(agent_id, tool_name, arguments) do - # Create a descriptive task title based on the tool being used - task_title = generate_task_title(tool_name, arguments) - task_description = generate_task_description(tool_name, arguments) - - # Check if agent has current task, if not create one - case AgentCoordinator.TaskRegistry.get_agent_current_task(agent_id) do - nil -> - # Create new auto-task - AgentCoordinator.TaskRegistry.create_task( - task_title, - task_description, - %{ - priority: "normal", - auto_generated: true, - tool_name: tool_name, - assigned_agent: agent_id - } - ) - - # Auto-assign to this agent - case AgentCoordinator.TaskRegistry.get_next_task(agent_id) do - {:ok, _task} -> :ok - _ -> :ok - end - - existing_task -> - # Update existing task with latest activity - AgentCoordinator.TaskRegistry.update_task_activity( - existing_task.id, - tool_name, - arguments - ) - end - end - - # TODO: Perhaps... copilot should supply what it thinks it is doing? - # Or, we need to fill these out with every possible tool - defp generate_task_title(tool_name, arguments) do - case tool_name do - "read_file" -> - "Reading file: #{Path.basename(arguments["path"] || "unknown")}" - - "write_file" -> - "Writing file: #{Path.basename(arguments["path"] || "unknown")}" - - "list_directory" -> - "Exploring directory: #{Path.basename(arguments["path"] || "unknown")}" - - "mcp_context7_get-library-docs" -> - "Researching: #{arguments["context7CompatibleLibraryID"] || "library"}" - - "mcp_figma_get_code" -> - "Generating Figma code: #{arguments["nodeId"] || "component"}" - - "mcp_firebase_firestore_get_documents" -> - "Fetching Firestore documents" - - "mcp_memory_search_nodes" -> - "Searching memory: #{arguments["query"] || "query"}" - - "mcp_sequentialthi_sequentialthinking" -> - "Thinking through problem" - - _ -> - "Using tool: #{tool_name}" - end - end - - # TODO: See Line [1042](#L1042) - defp generate_task_description(tool_name, arguments) do - case tool_name do - "read_file" -> - "Reading and analyzing file content from #{arguments["path"]}" - - "write_file" -> - "Creating or updating file at #{arguments["path"]}" - - "list_directory" -> - "Exploring directory structure at #{arguments["path"]}" - - "mcp_context7_get-library-docs" -> - "Researching documentation for #{arguments["context7CompatibleLibraryID"]} library" - - "mcp_figma_get_code" -> - "Generating code for Figma component #{arguments["nodeId"]}" - - "mcp_firebase_firestore_get_documents" -> - "Retrieving documents from Firestore: #{inspect(arguments["paths"])}" - - "mcp_memory_search_nodes" -> - "Searching knowledge graph for: #{arguments["query"]}" - - "mcp_sequentialthi_sequentialthinking" -> - "Using sequential thinking to solve complex problem" - - _ -> - "Executing #{tool_name} with arguments: #{inspect(arguments)}" - end - end - - defp should_auto_restart?(server_name, config) do - server_config = Map.get(config.servers, server_name, %{}) - Map.get(server_config, :auto_restart, false) - end -end diff --git a/lib/agent_coordinator/unified_mcp_server.ex b/lib/agent_coordinator/unified_mcp_server.ex deleted file mode 100644 index f588ebf..0000000 --- a/lib/agent_coordinator/unified_mcp_server.ex +++ /dev/null @@ -1,251 +0,0 @@ -defmodule AgentCoordinator.UnifiedMCPServer do - @moduledoc """ - Unified MCP Server that aggregates all external MCP servers and Agent Coordinator tools. 
- - This is the single MCP server that GitHub Copilot sees, which internally manages - all other MCP servers and provides automatic task tracking for any tool usage. - """ - - use GenServer - require Logger - - alias AgentCoordinator.{MCPServerManager, TaskRegistry} - - defstruct [ - :agent_sessions, - :request_id_counter - ] - - # Client API - - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end - - @doc """ - Handle MCP request from GitHub Copilot - """ - def handle_mcp_request(request) do - GenServer.call(__MODULE__, {:handle_request, request}, 60_000) - end - - # Server callbacks - - def init(_opts) do - state = %__MODULE__{ - agent_sessions: %{}, - request_id_counter: 0 - } - - IO.puts(:stderr, "Unified MCP Server starting...") - - {:ok, state} - end - - def handle_call({:handle_request, request}, _from, state) do - response = process_mcp_request(request, state) - {:reply, response, state} - end - - def handle_call({:register_agent_session, agent_id, session_info}, _from, state) do - new_state = %{state | agent_sessions: Map.put(state.agent_sessions, agent_id, session_info)} - {:reply, :ok, new_state} - end - - def handle_info(_msg, state) do - {:noreply, state} - end - - # Private functions - - defp process_mcp_request(request, state) do - method = Map.get(request, "method") - id = Map.get(request, "id") - - case method do - "initialize" -> - handle_initialize(request, id) - - "tools/list" -> - handle_tools_list(request, id) - - "tools/call" -> - handle_tools_call(request, id, state) - - _ -> - error_response(id, -32601, "Method not found: #{method}") - end - end - - defp handle_initialize(_request, id) do - %{ - "jsonrpc" => "2.0", - "id" => id, - "result" => %{ - "protocolVersion" => "2024-11-05", - "capabilities" => %{ - "tools" => %{}, - "coordination" => %{ - "automatic_task_tracking" => true, - "agent_management" => true, - "multi_server_proxy" => true, - "heartbeat_coverage" => true - } - }, - "serverInfo" => %{ - "name" => "agent-coordinator-unified", - "version" => "0.1.0", - "description" => - "Unified MCP server with automatic task tracking and agent coordination" - } - } - } - end - - defp handle_tools_list(_request, id) do - case MCPServerManager.get_unified_tools() do - tools when is_list(tools) -> - %{ - "jsonrpc" => "2.0", - "id" => id, - "result" => %{ - "tools" => tools - } - } - - {:error, reason} -> - error_response(id, -32603, "Failed to get tools: #{reason}") - end - end - - defp handle_tools_call(request, id, state) do - params = Map.get(request, "params", %{}) - tool_name = Map.get(params, "name") - arguments = Map.get(params, "arguments", %{}) - - # Determine agent context from the request or session - agent_context = determine_agent_context(request, arguments, state) - - case MCPServerManager.route_tool_call(tool_name, arguments, agent_context) do - %{"error" => _} = error_result -> - Map.put(error_result, "id", id) - - result -> - # Wrap successful results in MCP format - success_response = %{ - "jsonrpc" => "2.0", - "id" => id, - "result" => format_tool_result(result, tool_name, agent_context) - } - - success_response - end - end - - defp determine_agent_context(request, arguments, state) do - # Try to determine agent from various sources: - - # 1. Explicit agent_id in arguments - case Map.get(arguments, "agent_id") do - agent_id when is_binary(agent_id) -> - %{agent_id: agent_id} - - _ -> - # 2. 
Try to extract from request metadata - case extract_agent_from_request(request) do - agent_id when is_binary(agent_id) -> - %{agent_id: agent_id} - - _ -> - # 3. Use a default session for GitHub Copilot - default_agent_context(state) - end - end - end - - defp extract_agent_from_request(_request) do - # Look for agent info in request headers, params, etc. - # This could be extended to support various ways of identifying the agent - nil - end - - defp default_agent_context(state) do - # Create or use a default agent session for GitHub Copilot - default_agent_id = "github_copilot_session" - - case Map.get(state.agent_sessions, default_agent_id) do - nil -> - # Auto-register GitHub Copilot as an agent - case TaskRegistry.register_agent("GitHub Copilot", [ - "coding", - "analysis", - "review", - "documentation" - ]) do - {:ok, %{agent_id: agent_id}} -> - session_info = %{ - agent_id: agent_id, - name: "GitHub Copilot", - auto_registered: true, - created_at: DateTime.utc_now() - } - - GenServer.call(self(), {:register_agent_session, agent_id, session_info}) - %{agent_id: agent_id} - - _ -> - %{agent_id: default_agent_id} - end - - session_info -> - %{agent_id: session_info.agent_id} - end - end - - defp format_tool_result(result, tool_name, agent_context) do - # Format the result according to MCP tool call response format - base_result = - case result do - %{"result" => content} when is_map(content) -> - # Already properly formatted - content - - {:ok, content} -> - # Convert tuple response to content - %{"content" => [%{"type" => "text", "text" => inspect(content)}]} - - %{} = map_result -> - # Convert map to text content - %{"content" => [%{"type" => "text", "text" => Jason.encode!(map_result)}]} - - binary when is_binary(binary) -> - # Simple text result - %{"content" => [%{"type" => "text", "text" => binary}]} - - other -> - # Fallback for any other type - %{"content" => [%{"type" => "text", "text" => inspect(other)}]} - end - - # Add metadata about the operation - metadata = %{ - "tool_name" => tool_name, - "agent_id" => agent_context.agent_id, - "timestamp" => DateTime.utc_now() |> DateTime.to_iso8601(), - "auto_tracked" => true - } - - Map.put(base_result, "_metadata", metadata) - end - - defp error_response(id, code, message) do - %{ - "jsonrpc" => "2.0", - "id" => id, - "error" => %{ - "code" => code, - "message" => message - } - } - end -end
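
Reviewer note: the behaviour deleted above is not lost; per the commit message it was consolidated into mcp_server.ex. The one pattern all three removed files shared was wrapping every tool call in agent heartbeats before dispatching it, either to the coordinator's own tools or to an external MCP server. For reference, a minimal sketch of that pattern follows. It is illustrative only, not the consolidated implementation: the module and function names below are invented for the example, while MCPServer.handle_mcp_request/1 and TaskRegistry.heartbeat_agent/1 are the actual calls the deleted code used.

defmodule AgentCoordinator.HeartbeatDispatchSketch do
  @moduledoc false
  # Illustrative sketch of the heartbeat-wrapped dispatch removed from
  # enhanced_mcp_server.ex / mcp_server_manager.ex. Not the consolidated code.

  alias AgentCoordinator.{MCPServer, TaskRegistry}

  # Dispatch a JSON-RPC tool call, sending a heartbeat before and after so the
  # agent stays marked as alive in the task registry for the duration of the call.
  def dispatch(request, agent_id) when is_binary(agent_id) do
    TaskRegistry.heartbeat_agent(agent_id)
    result = MCPServer.handle_mcp_request(request)
    TaskRegistry.heartbeat_agent(agent_id)

    case result do
      %{"result" => _} = ok ->
        # Tag successful responses so callers can see the call was tracked.
        Map.put(ok, "_heartbeat_metadata", %{
          agent_id: agent_id,
          timestamp: DateTime.utc_now()
        })

      error ->
        error
    end
  end

  # Calls with no agent context fall through to the plain MCP handler untouched.
  def dispatch(request, nil), do: MCPServer.handle_mcp_request(request)
end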