Fix docker startup so that it works properly with stdio mode. Probably worthwhile to toss majority of this readme, less confusing

This commit is contained in:
Ra
2025-09-08 19:34:32 -07:00
parent 5d3e04c5f8
commit 74a8574778
13 changed files with 386 additions and 564 deletions

View File

@@ -26,7 +26,7 @@ defmodule AgentCoordinator.HttpInterface do
def start_link(opts \\ []) do
port = Keyword.get(opts, :port, 8080)
Logger.info("Starting Agent Coordinator HTTP interface on port #{port}")
IO.puts(:stderr, "Starting Agent Coordinator HTTP interface on port #{port}")
Plug.Cowboy.http(__MODULE__, [],
port: port,
@@ -158,7 +158,7 @@ defmodule AgentCoordinator.HttpInterface do
send_json_response(conn, 400, %{error: error})
unexpected ->
Logger.error("Unexpected MCP response: #{inspect(unexpected)}")
IO.puts(:stderr, "Unexpected MCP response: #{inspect(unexpected)}")
send_json_response(conn, 500, %{
error: %{
code: -32603,
@@ -317,7 +317,7 @@ defmodule AgentCoordinator.HttpInterface do
rescue
# Client disconnected
_ ->
Logger.info("SSE client disconnected")
IO.puts(:stderr, "SSE client disconnected")
conn
end
end
@@ -411,7 +411,7 @@ defmodule AgentCoordinator.HttpInterface do
origin
else
# For production, be more restrictive
Logger.warning("Potentially unsafe origin: #{origin}")
IO.puts(:stderr, "Potentially unsafe origin: #{origin}")
"*" # Fallback for now, could be more restrictive
end
_ -> "*"
@@ -487,7 +487,7 @@ defmodule AgentCoordinator.HttpInterface do
validated: true
}}
{:error, reason} ->
Logger.warning("Invalid MCP session token: #{reason}")
IO.puts(:stderr, "Invalid MCP session token: #{reason}")
# Fall back to generating anonymous session
anonymous_id = "http_anonymous_" <> (:crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower))
{anonymous_id, %{validated: false, reason: reason}}
@@ -559,7 +559,7 @@ defmodule AgentCoordinator.HttpInterface do
send_json_response(conn, 400, response)
unexpected ->
Logger.error("Unexpected MCP response: #{inspect(unexpected)}")
IO.puts(:stderr, "Unexpected MCP response: #{inspect(unexpected)}")
send_json_response(conn, 500, %{
jsonrpc: "2.0",
id: Map.get(mcp_request, "id"),

View File

@@ -102,7 +102,7 @@ defmodule AgentCoordinator.InterfaceManager do
metrics: initialize_metrics()
}
Logger.info("Interface Manager starting with config: #{inspect(config.enabled_interfaces)}")
IO.puts(:stderr, "Interface Manager starting with config: #{inspect(config.enabled_interfaces)}")
# Start enabled interfaces
{:ok, state, {:continue, :start_interfaces}}
@@ -114,11 +114,11 @@ defmodule AgentCoordinator.InterfaceManager do
updated_state = Enum.reduce(state.config.enabled_interfaces, state, fn interface_type, acc ->
case start_interface_server(interface_type, state.config, acc) do
{:ok, interface_info} ->
Logger.info("Started #{interface_type} interface")
IO.puts(:stderr, "Started #{interface_type} interface")
%{acc | interfaces: Map.put(acc.interfaces, interface_type, interface_info)}
{:error, reason} ->
Logger.error("Failed to start #{interface_type} interface: #{reason}")
IO.puts(:stderr, "Failed to start #{interface_type} interface: #{reason}")
acc
end
end)
@@ -152,11 +152,11 @@ defmodule AgentCoordinator.InterfaceManager do
updated_interfaces = Map.put(state.interfaces, interface_type, interface_info)
updated_state = %{state | interfaces: updated_interfaces}
Logger.info("Started #{interface_type} interface on demand")
IO.puts(:stderr, "Started #{interface_type} interface on demand")
{:reply, {:ok, interface_info}, updated_state}
{:error, reason} ->
Logger.error("Failed to start #{interface_type} interface: #{reason}")
IO.puts(:stderr, "Failed to start #{interface_type} interface: #{reason}")
{:reply, {:error, reason}, state}
end
else
@@ -176,11 +176,11 @@ defmodule AgentCoordinator.InterfaceManager do
updated_interfaces = Map.delete(state.interfaces, interface_type)
updated_state = %{state | interfaces: updated_interfaces}
Logger.info("Stopped #{interface_type} interface")
IO.puts(:stderr, "Stopped #{interface_type} interface")
{:reply, :ok, updated_state}
{:error, reason} ->
Logger.error("Failed to stop #{interface_type} interface: #{reason}")
IO.puts(:stderr, "Failed to stop #{interface_type} interface: #{reason}")
{:reply, {:error, reason}, state}
end
end
@@ -202,7 +202,7 @@ defmodule AgentCoordinator.InterfaceManager do
updated_interfaces = Map.put(state.interfaces, interface_type, new_interface_info)
updated_state = %{state | interfaces: updated_interfaces}
Logger.info("Restarted #{interface_type} interface")
IO.puts(:stderr, "Restarted #{interface_type} interface")
{:reply, {:ok, new_interface_info}, updated_state}
{:error, reason} ->
@@ -210,12 +210,12 @@ defmodule AgentCoordinator.InterfaceManager do
updated_interfaces = Map.delete(state.interfaces, interface_type)
updated_state = %{state | interfaces: updated_interfaces}
Logger.error("Failed to restart #{interface_type} interface: #{reason}")
IO.puts(:stderr, "Failed to restart #{interface_type} interface: #{reason}")
{:reply, {:error, reason}, updated_state}
end
{:error, reason} ->
Logger.error("Failed to stop #{interface_type} interface for restart: #{reason}")
IO.puts(:stderr, "Failed to stop #{interface_type} interface for restart: #{reason}")
{:reply, {:error, reason}, state}
end
end
@@ -253,7 +253,7 @@ defmodule AgentCoordinator.InterfaceManager do
updated_registry = Map.put(state.session_registry, session_id, session_data)
updated_state = %{state | session_registry: updated_registry}
Logger.debug("Registered session #{session_id} for #{interface_type}")
IO.puts(:stderr, "Registered session #{session_id} for #{interface_type}")
{:noreply, updated_state}
end
@@ -261,14 +261,14 @@ defmodule AgentCoordinator.InterfaceManager do
def handle_cast({:unregister_session, session_id}, state) do
case Map.get(state.session_registry, session_id) do
nil ->
Logger.debug("Attempted to unregister unknown session: #{session_id}")
IO.puts(:stderr, "Attempted to unregister unknown session: #{session_id}")
{:noreply, state}
_session_data ->
updated_registry = Map.delete(state.session_registry, session_id)
updated_state = %{state | session_registry: updated_registry}
Logger.debug("Unregistered session #{session_id}")
IO.puts(:stderr, "Unregistered session #{session_id}")
{:noreply, updated_state}
end
end
@@ -278,7 +278,7 @@ defmodule AgentCoordinator.InterfaceManager do
# Handle interface process crashes
case find_interface_by_pid(pid, state.interfaces) do
{interface_type, _interface_info} ->
Logger.error("#{interface_type} interface crashed: #{inspect(reason)}")
IO.puts(:stderr, "#{interface_type} interface crashed: #{inspect(reason)}")
# Remove from running interfaces
updated_interfaces = Map.delete(state.interfaces, interface_type)
@@ -286,14 +286,14 @@ defmodule AgentCoordinator.InterfaceManager do
# Optionally restart if configured
if should_auto_restart?(interface_type, state.config) do
Logger.info("Auto-restarting #{interface_type} interface")
IO.puts(:stderr, "Auto-restarting #{interface_type} interface")
Process.send_after(self(), {:restart_interface, interface_type}, 5000)
end
{:noreply, updated_state}
nil ->
Logger.debug("Unknown process died: #{inspect(pid)}")
IO.puts(:stderr, "Unknown process died: #{inspect(pid)}")
{:noreply, state}
end
end
@@ -305,18 +305,18 @@ defmodule AgentCoordinator.InterfaceManager do
updated_interfaces = Map.put(state.interfaces, interface_type, interface_info)
updated_state = %{state | interfaces: updated_interfaces}
Logger.info("Auto-restarted #{interface_type} interface")
IO.puts(:stderr, "Auto-restarted #{interface_type} interface")
{:noreply, updated_state}
{:error, reason} ->
Logger.error("Failed to auto-restart #{interface_type} interface: #{reason}")
IO.puts(:stderr, "Failed to auto-restart #{interface_type} interface: #{reason}")
{:noreply, state}
end
end
@impl GenServer
def handle_info(message, state) do
Logger.debug("Interface Manager received unexpected message: #{inspect(message)}")
IO.puts(:stderr, "Interface Manager received unexpected message: #{inspect(message)}")
{:noreply, state}
end
@@ -516,18 +516,46 @@ defmodule AgentCoordinator.InterfaceManager do
defp handle_stdio_loop(state) do
# Handle MCP JSON-RPC messages from STDIO
# Use different approaches for Docker vs regular environments
if docker_environment?() do
handle_stdio_docker_loop(state)
else
handle_stdio_regular_loop(state)
end
end
defp handle_stdio_regular_loop(state) do
case IO.read(:stdio, :line) do
:eof ->
Logger.info("STDIO interface shutting down (EOF)")
IO.puts(:stderr, "STDIO interface shutting down (EOF)")
exit(:normal)
{:error, reason} ->
Logger.error("STDIO error: #{inspect(reason)}")
IO.puts(:stderr, "STDIO error: #{inspect(reason)}")
exit({:error, reason})
line ->
handle_stdio_message(String.trim(line), state)
handle_stdio_loop(state)
handle_stdio_regular_loop(state)
end
end
defp handle_stdio_docker_loop(state) do
# In Docker, use regular IO.read instead of Port.open({:fd, 0, 1})
# to avoid "driver_select stealing control of fd=0" conflicts with external MCP servers
# This allows external servers to use pipes while Agent Coordinator reads from stdin
case IO.read(:stdio, :line) do
:eof ->
IO.puts(:stderr, "STDIO interface shutting down (EOF)")
exit(:normal)
{:error, reason} ->
IO.puts(:stderr, "STDIO error: #{inspect(reason)}")
exit({:error, reason})
line ->
handle_stdio_message(String.trim(line), state)
handle_stdio_docker_loop(state)
end
end
@@ -646,4 +674,21 @@ defmodule AgentCoordinator.InterfaceManager do
end
defp deep_merge(_left, right), do: right
# Check if running in Docker environment
defp docker_environment? do
# Check common Docker environment indicators
System.get_env("DOCKER_CONTAINER") != nil or
System.get_env("container") != nil or
System.get_env("DOCKERIZED") != nil or
File.exists?("/.dockerenv") or
File.exists?("/proc/1/cgroup") and
(File.read!("/proc/1/cgroup") |> String.contains?("docker")) or
String.contains?(to_string(System.get_env("PATH", "")), "/app/") or
# Check if we're running under a container init system
case File.read("/proc/1/comm") do
{:ok, comm} -> String.trim(comm) in ["bash", "sh", "docker-init", "tini"]
_ -> false
end
end
end

View File

@@ -654,7 +654,7 @@ defmodule AgentCoordinator.MCPServer do
}}
{:error, reason} ->
Logger.error("Failed to create session for agent #{agent.id}: #{inspect(reason)}")
IO.puts(:stderr, "Failed to create session for agent #{agent.id}: #{inspect(reason)}")
# Still return success but without session token for backward compatibility
{:ok, %{agent_id: agent.id, codebase_id: agent.codebase_id, status: "registered"}}
end
@@ -1226,17 +1226,15 @@ defmodule AgentCoordinator.MCPServer do
tools: []
}
# Initialize the server and get tools
# Initialize the server and get tools with shorter timeout
case initialize_external_server(server_info) do
{:ok, tools} ->
{:ok, %{server_info | tools: tools}}
{:error, reason} ->
# Cleanup on initialization failure
cleanup_external_pid_file(pid_file_path)
kill_external_process(os_pid)
if Port.info(port), do: Port.close(port)
{:error, reason}
# Log error but don't fail - continue with empty tools
IO.puts(:stderr, "Failed to initialize #{name}: #{reason}")
{:ok, %{server_info | tools: []}}
end
{:error, reason} ->
@@ -1276,6 +1274,8 @@ defmodule AgentCoordinator.MCPServer do
env_list =
Enum.map(env, fn {key, value} -> {String.to_charlist(key), String.to_charlist(value)} end)
# Use pipe communication but allow stdin/stdout for MCP protocol
# Remove :nouse_stdio since MCP servers need stdin/stdout to communicate
port_options = [
:binary,
:stream,
@@ -1357,7 +1357,7 @@ defmodule AgentCoordinator.MCPServer do
Port.command(server_info.port, request_json)
# Collect full response by reading multiple lines if needed
response_data = collect_external_response(server_info.port, "", 30_000)
response_data = collect_external_response(server_info.port, "", 5_000)
cond do
response_data == "" ->
@@ -1503,35 +1503,6 @@ defmodule AgentCoordinator.MCPServer do
pid_file_path
end
defp cleanup_external_pid_file(pid_file_path) do
if File.exists?(pid_file_path) do
File.rm(pid_file_path)
end
end
defp kill_external_process(os_pid) when is_integer(os_pid) do
try do
case System.cmd("kill", ["-TERM", to_string(os_pid)]) do
{_, 0} ->
IO.puts(:stderr, "Successfully terminated process #{os_pid}")
:ok
{_, _} ->
case System.cmd("kill", ["-KILL", to_string(os_pid)]) do
{_, 0} ->
IO.puts(:stderr, "Force killed process #{os_pid}")
:ok
{_, _} ->
IO.puts(:stderr, "Failed to kill process #{os_pid}")
:error
end
end
rescue
_ -> :error
end
end
defp get_all_unified_tools_from_state(state) do
# Combine coordinator tools with external server tools from state
coordinator_tools = @mcp_tools
@@ -1560,12 +1531,12 @@ defmodule AgentCoordinator.MCPServer do
defp route_tool_call(tool_name, args, state) do
# Extract agent_id for activity tracking
agent_id = Map.get(args, "agent_id")
# Update agent activity before processing the tool call
if agent_id do
ActivityTracker.update_agent_activity(agent_id, tool_name, args)
end
# Check if it's a coordinator tool first
coordinator_tool_names = Enum.map(@mcp_tools, & &1["name"])
@@ -1583,12 +1554,12 @@ defmodule AgentCoordinator.MCPServer do
# Try to route to external server
route_to_external_server(tool_name, args, state)
end
# Clear agent activity after tool call completes (optional - could keep until next call)
# if agent_id do
# ActivityTracker.clear_agent_activity(agent_id)
# end
result
end

View File

@@ -78,7 +78,7 @@ defmodule AgentCoordinator.SessionManager do
}
}
Logger.info("SessionManager started with #{state.config.expiry_minutes}min expiry")
IO.puts(:stderr, "SessionManager started with #{state.config.expiry_minutes}min expiry")
{:ok, state}
end
@@ -99,7 +99,7 @@ defmodule AgentCoordinator.SessionManager do
new_sessions = Map.put(state.sessions, session_token, session_data)
new_state = %{state | sessions: new_sessions}
Logger.debug("Created session #{session_token} for agent #{agent_id}")
IO.puts(:stderr, "Created session #{session_token} for agent #{agent_id}")
{:reply, {:ok, session_token}, new_state}
end
@@ -136,7 +136,7 @@ defmodule AgentCoordinator.SessionManager do
session_data ->
new_sessions = Map.delete(state.sessions, session_token)
new_state = %{state | sessions: new_sessions}
Logger.debug("Invalidated session #{session_token} for agent #{session_data.agent_id}")
IO.puts(:stderr, "Invalidated session #{session_token} for agent #{session_data.agent_id}")
{:reply, :ok, new_state}
end
end
@@ -161,7 +161,7 @@ defmodule AgentCoordinator.SessionManager do
end)
if length(expired_sessions) > 0 do
Logger.debug("Cleaned up #{length(expired_sessions)} expired sessions")
IO.puts(:stderr, "Cleaned up #{length(expired_sessions)} expired sessions")
end
new_state = %{state | sessions: Map.new(active_sessions)}

View File

@@ -37,7 +37,7 @@ defmodule AgentCoordinator.WebSocketHandler do
# Start heartbeat timer
Process.send_after(self(), :heartbeat, @heartbeat_interval)
Logger.info("WebSocket connection established: #{session_id}")
IO.puts(:stderr, "WebSocket connection established: #{session_id}")
{:ok, state}
end
@@ -64,7 +64,7 @@ defmodule AgentCoordinator.WebSocketHandler do
@impl WebSock
def handle_in({_binary, [opcode: :binary]}, state) do
Logger.warning("Received unexpected binary data on WebSocket")
IO.puts(:stderr, "Received unexpected binary data on WebSocket")
{:ok, state}
end
@@ -95,20 +95,20 @@ defmodule AgentCoordinator.WebSocketHandler do
@impl WebSock
def handle_info(message, state) do
Logger.debug("Received unexpected message: #{inspect(message)}")
IO.puts(:stderr, "Received unexpected message: #{inspect(message)}")
{:ok, state}
end
@impl WebSock
def terminate(:remote, state) do
Logger.info("WebSocket connection closed by client: #{state.session_id}")
IO.puts(:stderr, "WebSocket connection closed by client: #{state.session_id}")
cleanup_session(state)
:ok
end
@impl WebSock
def terminate(reason, state) do
Logger.info("WebSocket connection terminated: #{state.session_id}, reason: #{inspect(reason)}")
IO.puts(:stderr, "WebSocket connection terminated: #{state.session_id}, reason: #{inspect(reason)}")
cleanup_session(state)
:ok
end
@@ -245,7 +245,7 @@ defmodule AgentCoordinator.WebSocketHandler do
{:reply, {:text, Jason.encode!(response)}, updated_state}
unexpected ->
Logger.error("Unexpected MCP response: #{inspect(unexpected)}")
IO.puts(:stderr, "Unexpected MCP response: #{inspect(unexpected)}")
error_response = %{
"jsonrpc" => "2.0",
"id" => Map.get(message, "id"),
@@ -287,7 +287,7 @@ defmodule AgentCoordinator.WebSocketHandler do
defp handle_initialized_notification(_message, state) do
# Client is ready to receive notifications
Logger.info("WebSocket client initialized: #{state.session_id}")
IO.puts(:stderr, "WebSocket client initialized: #{state.session_id}")
{:ok, state}
end
@@ -304,7 +304,7 @@ defmodule AgentCoordinator.WebSocketHandler do
{:ok, state}
unexpected ->
Logger.error("Unexpected MCP response: #{inspect(unexpected)}")
IO.puts(:stderr, "Unexpected MCP response: #{inspect(unexpected)}")
{:ok, state}
end
else