Fix inbox creation issues in agent coordinator

- Fixed Task.new/3 to handle both maps and keyword lists
- Added robust inbox existence checking in find_available_agent
- Ensure inbox creation during agent registration and task assignment
- Add helper function ensure_inbox_exists to avoid crashes
This commit is contained in:
Ra
2025-08-23 14:46:28 -07:00
parent 5048db99c7
commit 943d8ad4d7
40 changed files with 7798 additions and 404 deletions

View File

@@ -2,13 +2,27 @@ defmodule AgentCoordinator.Agent do
@moduledoc """
Agent data structure for the coordination system.
"""
@derive {Jason.Encoder,
only: [
:id,
:name,
:capabilities,
:status,
:current_task_id,
:codebase_id,
:workspace_path,
:last_heartbeat,
:metadata
]}
defstruct [
:id,
:name,
:capabilities,
:status,
:current_task_id,
:codebase_id,
:workspace_path,
:last_heartbeat,
:metadata
]
@@ -17,14 +31,16 @@ defmodule AgentCoordinator.Agent do
@type capability :: :coding | :testing | :documentation | :analysis | :review
@type t :: %__MODULE__{
id: String.t(),
name: String.t(),
capabilities: [capability()],
status: status(),
current_task_id: String.t() | nil,
last_heartbeat: DateTime.t(),
metadata: map()
}
id: String.t(),
name: String.t(),
capabilities: [capability()],
status: status(),
current_task_id: String.t() | nil,
codebase_id: String.t(),
workspace_path: String.t() | nil,
last_heartbeat: DateTime.t(),
metadata: map()
}
def new(name, capabilities, opts \\ []) do
%__MODULE__{
@@ -33,6 +49,8 @@ defmodule AgentCoordinator.Agent do
capabilities: capabilities,
status: :idle,
current_task_id: nil,
codebase_id: Keyword.get(opts, :codebase_id, "default"),
workspace_path: Keyword.get(opts, :workspace_path),
last_heartbeat: DateTime.utc_now(),
metadata: Keyword.get(opts, :metadata, %{})
}
@@ -55,12 +73,22 @@ defmodule AgentCoordinator.Agent do
end
def can_handle?(agent, task) do
# Check if agent is in the same codebase or can handle cross-codebase tasks
codebase_compatible = agent.codebase_id == task.codebase_id or
Map.get(agent.metadata, :cross_codebase_capable, false)
# Simple capability matching - can be enhanced
required_capabilities = Map.get(task.metadata, :required_capabilities, [])
case required_capabilities do
capability_match = case required_capabilities do
[] -> true
caps -> Enum.any?(caps, fn cap -> cap in agent.capabilities end)
end
codebase_compatible and capability_match
end
end
def can_work_cross_codebase?(agent) do
Map.get(agent.metadata, :cross_codebase_capable, false)
end
end

View File

@@ -7,37 +7,54 @@ defmodule AgentCoordinator.Application do
@impl true
def start(_type, _args) do
# Check if persistence should be enabled (useful for testing)
enable_persistence = Application.get_env(:agent_coordinator, :enable_persistence, true)
children = [
# Registry for agent inboxes
{Registry, keys: :unique, name: AgentCoordinator.InboxRegistry},
# PubSub for real-time updates
{Phoenix.PubSub, name: AgentCoordinator.PubSub},
# Persistence layer
{AgentCoordinator.Persistence, nats: nats_config()},
# Task registry with NATS integration
{AgentCoordinator.TaskRegistry, nats: nats_config()},
# Codebase registry for multi-codebase coordination
{AgentCoordinator.CodebaseRegistry, nats: if(enable_persistence, do: nats_config(), else: nil)},
# Task registry with NATS integration (conditionally add persistence)
{AgentCoordinator.TaskRegistry, nats: if(enable_persistence, do: nats_config(), else: nil)},
# MCP server
AgentCoordinator.MCPServer,
# Auto-heartbeat manager
AgentCoordinator.AutoHeartbeat,
# Enhanced MCP server with automatic heartbeats
AgentCoordinator.EnhancedMCPServer,
# Dynamic supervisor for agent inboxes
{DynamicSupervisor, name: AgentCoordinator.InboxSupervisor, strategy: :one_for_one}
]
# Add persistence layer if enabled
children =
if enable_persistence do
[{AgentCoordinator.Persistence, nats: nats_config()} | children]
else
children
end
opts = [strategy: :one_for_one, name: AgentCoordinator.Supervisor]
Supervisor.start_link(children, opts)
end
defp nats_config do
[
%{
host: System.get_env("NATS_HOST", "localhost"),
port: String.to_integer(System.get_env("NATS_PORT", "4222")),
connection_settings: [
connection_settings: %{
name: :agent_coordinator
]
]
}
}
end
end

View File

@@ -0,0 +1,231 @@
defmodule AgentCoordinator.AutoHeartbeat do
  @moduledoc """
  Automatic heartbeat management for agents.

  This module provides:
  1. Automatic heartbeat sending with every MCP action
  2. Background heartbeat timer for idle periods
  3. Heartbeat wrapper functions for all operations
  """
  use GenServer

  alias AgentCoordinator.{MCPServer, TaskRegistry}

  # Heartbeat every 10 seconds when idle
  @heartbeat_interval 10_000

  # timers: agent_id => Process.send_after/3 timer ref
  # agent_contexts: agent_id => opaque context supplied at registration
  defstruct [
    :timers,
    :agent_contexts
  ]

  # Client API

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Register an agent with automatic heartbeat management.

  Registers the agent through the MCP server and, on success, starts a
  background heartbeat timer for it. Returns `{:ok, agent_id}` or
  `{:error, reason}`.
  """
  def register_agent_with_heartbeat(name, capabilities, agent_context \\ %{}) do
    # The MCP server expects string capabilities; accept atoms for convenience.
    string_capabilities =
      Enum.map(capabilities, fn
        cap when is_atom(cap) -> Atom.to_string(cap)
        cap when is_binary(cap) -> cap
      end)

    # First register the agent normally
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "register_agent",
        "arguments" => %{"name" => name, "capabilities" => string_capabilities}
      }
    }

    case MCPServer.handle_mcp_request(request) do
      %{"result" => %{"content" => [%{"text" => response_json}]}} ->
        case Jason.decode(response_json) do
          {:ok, %{"agent_id" => agent_id}} ->
            # Start automatic heartbeat for this agent
            GenServer.call(__MODULE__, {:start_heartbeat, agent_id, agent_context})
            {:ok, agent_id}

          {:ok, other} ->
            # Valid JSON but no "agent_id" key. The previous version had no
            # clause for this shape and crashed with a CaseClauseError.
            {:error, {:missing_agent_id, other}}

          {:error, reason} ->
            {:error, reason}
        end

      %{"error" => %{"message" => message}} ->
        {:error, message}

      _ ->
        {:error, "Unexpected response format"}
    end
  end

  @doc """
  Wrapper for any MCP action that automatically sends heartbeat.

  Sends a heartbeat before and after the action, resets the agent's idle
  timer, and (on success) attaches both heartbeat results under the
  `"_heartbeat_status"` key of the response.
  """
  def mcp_action_with_heartbeat(agent_id, action_request) do
    # Send heartbeat before action
    heartbeat_result = send_heartbeat(agent_id)

    # Perform the actual action
    action_result = MCPServer.handle_mcp_request(action_request)

    # Send heartbeat after action (to update last activity)
    post_heartbeat_result = send_heartbeat(agent_id)

    # Reset the idle timer for this agent
    GenServer.cast(__MODULE__, {:reset_timer, agent_id})

    # Only successful responses carry the heartbeat status; errors pass through
    case action_result do
      %{"result" => _} = success ->
        Map.put(success, "_heartbeat_status", %{
          pre: heartbeat_result,
          post: post_heartbeat_result
        })

      error_result ->
        error_result
    end
  end

  @doc """
  Convenience functions for common operations with automatic heartbeats
  """
  def create_task_with_heartbeat(agent_id, title, description, opts \\ %{}) do
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "create_task",
        "arguments" =>
          Map.merge(
            %{
              "title" => title,
              "description" => description
            },
            opts
          )
      }
    }

    mcp_action_with_heartbeat(agent_id, request)
  end

  def get_next_task_with_heartbeat(agent_id) do
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "get_next_task",
        "arguments" => %{"agent_id" => agent_id}
      }
    }

    mcp_action_with_heartbeat(agent_id, request)
  end

  def complete_task_with_heartbeat(agent_id) do
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "complete_task",
        "arguments" => %{"agent_id" => agent_id}
      }
    }

    mcp_action_with_heartbeat(agent_id, request)
  end

  def get_task_board_with_heartbeat(agent_id) do
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "get_task_board",
        "arguments" => %{}
      }
    }

    mcp_action_with_heartbeat(agent_id, request)
  end

  @doc """
  Stop heartbeat management for an agent (when they disconnect)
  """
  def stop_heartbeat(agent_id) do
    GenServer.call(__MODULE__, {:stop_heartbeat, agent_id})
  end

  # Server callbacks

  @impl true
  def init(_opts) do
    {:ok, %__MODULE__{timers: %{}, agent_contexts: %{}}}
  end

  @impl true
  def handle_call({:start_heartbeat, agent_id, context}, _from, state) do
    state = restart_timer(state, agent_id)
    state = %{state | agent_contexts: Map.put(state.agent_contexts, agent_id, context)}
    {:reply, :ok, state}
  end

  def handle_call({:stop_heartbeat, agent_id}, _from, state) do
    cancel_timer(state, agent_id)

    new_state = %{
      state
      | timers: Map.delete(state.timers, agent_id),
        agent_contexts: Map.delete(state.agent_contexts, agent_id)
    }

    {:reply, :ok, new_state}
  end

  @impl true
  def handle_cast({:reset_timer, agent_id}, state) do
    {:noreply, restart_timer(state, agent_id)}
  end

  @impl true
  def handle_info({:heartbeat_timer, agent_id}, state) do
    # Send heartbeat, then schedule the next tick. cancel_timer/2 inside
    # restart_timer/2 is a harmless no-op here since this timer just fired.
    send_heartbeat(agent_id)
    {:noreply, restart_timer(state, agent_id)}
  end

  # Private helpers

  # Cancel any pending timer for agent_id and schedule a fresh heartbeat tick.
  # Previously this cancel+reschedule sequence was duplicated in three callbacks.
  defp restart_timer(state, agent_id) do
    cancel_timer(state, agent_id)
    timer_ref = Process.send_after(self(), {:heartbeat_timer, agent_id}, @heartbeat_interval)
    %{state | timers: Map.put(state.timers, agent_id, timer_ref)}
  end

  # No-op when no timer is tracked for this agent.
  defp cancel_timer(state, agent_id) do
    case Map.fetch(state.timers, agent_id) do
      {:ok, ref} -> Process.cancel_timer(ref)
      :error -> :ok
    end
  end

  # Normalize the registry reply to :ok | {:error, reason}; any other shape
  # is a contract violation and will raise (assertive on purpose).
  defp send_heartbeat(agent_id) do
    case TaskRegistry.heartbeat_agent(agent_id) do
      :ok -> :ok
      {:error, reason} -> {:error, reason}
    end
  end
end

View File

@@ -2,34 +2,35 @@ defmodule AgentCoordinator.CLI do
@moduledoc """
Command line interface for testing the agent coordination system.
"""
alias AgentCoordinator.{MCPServer, TaskRegistry, Inbox, Agent, Task}
alias AgentCoordinator.{MCPServer, Inbox}
def main(args \\ []) do
case args do
["register", name | capabilities] ->
register_agent(name, capabilities)
["create-task", title, description | opts] ->
create_task(title, description, parse_task_opts(opts))
["board"] ->
show_task_board()
["agent-status", agent_id] ->
show_agent_status(agent_id)
["help"] ->
show_help()
_ ->
IO.puts("Invalid command. Use 'help' for usage information.")
end
end
defp register_agent(name, capabilities) do
caps = Enum.map(capabilities, &String.to_existing_atom/1)
# Note: capabilities should be passed as strings to the MCP server
# The server will handle the validation
request = %{
"method" => "tools/call",
"params" => %{
@@ -40,14 +41,14 @@ defmodule AgentCoordinator.CLI do
}
}
}
case MCPServer.handle_mcp_request(request) do
%{"result" => %{"content" => [%{"text" => result}]}} ->
data = Jason.decode!(result)
IO.puts("✓ Agent registered successfully!")
IO.puts(" Agent ID: #{data["agent_id"]}")
IO.puts(" Status: #{data["status"]}")
%{"error" => %{"message" => message}} ->
IO.puts("✗ Registration failed: #{message}")
end
@@ -58,24 +59,28 @@ defmodule AgentCoordinator.CLI do
"method" => "tools/call",
"params" => %{
"name" => "create_task",
"arguments" => Map.merge(%{
"title" => title,
"description" => description
}, opts)
"arguments" =>
Map.merge(
%{
"title" => title,
"description" => description
},
opts
)
}
}
case MCPServer.handle_mcp_request(request) do
%{"result" => %{"content" => [%{"text" => result}]}} ->
data = Jason.decode!(result)
IO.puts("✓ Task created successfully!")
IO.puts(" Task ID: #{data["task_id"]}")
IO.puts(" Status: #{data["status"]}")
if Map.has_key?(data, "assigned_to") do
IO.puts(" Assigned to: #{data["assigned_to"]}")
end
%{"error" => %{"message" => message}} ->
IO.puts("✗ Task creation failed: #{message}")
end
@@ -89,20 +94,20 @@ defmodule AgentCoordinator.CLI do
"arguments" => %{}
}
}
case MCPServer.handle_mcp_request(request) do
%{"result" => %{"content" => [%{"text" => result}]}} ->
%{"agents" => agents} = Jason.decode!(result)
IO.puts("\n📋 Task Board")
IO.puts(String.duplicate("=", 50))
if Enum.empty?(agents) do
IO.puts("No agents registered.")
else
Enum.each(agents, &print_agent_summary/1)
end
error ->
IO.puts("✗ Failed to fetch task board: #{inspect(error)}")
end
@@ -115,11 +120,11 @@ defmodule AgentCoordinator.CLI do
IO.puts(String.duplicate("-", 30))
IO.puts("Pending tasks: #{status.pending_count}")
IO.puts("Completed tasks: #{status.completed_count}")
case status.current_task do
nil ->
IO.puts("Current task: None")
task ->
IO.puts("Current task: #{task.title}")
IO.puts(" Description: #{task.description}")
@@ -129,23 +134,24 @@ defmodule AgentCoordinator.CLI do
end
defp print_agent_summary(agent) do
status_icon = case agent["status"] do
"idle" -> "💤"
"busy" -> "🔧"
"offline" -> ""
_ -> ""
end
status_icon =
case agent["status"] do
"idle" -> "💤"
"busy" -> "🔧"
"offline" -> ""
_ -> ""
end
online_status = if agent["online"], do: "🟢", else: "🔴"
IO.puts("\n#{status_icon} #{agent["name"]} (#{agent["agent_id"]}) #{online_status}")
IO.puts(" Capabilities: #{Enum.join(agent["capabilities"], ", ")}")
IO.puts(" Pending: #{agent["pending_tasks"]} | Completed: #{agent["completed_tasks"]}")
case agent["current_task"] do
nil ->
IO.puts(" Current: No active task")
task ->
IO.puts(" Current: #{task["title"]}")
end
@@ -156,13 +162,13 @@ defmodule AgentCoordinator.CLI do
case String.split(opt, "=", parts: 2) do
["priority", value] ->
Map.put(acc, "priority", value)
["files", files] ->
Map.put(acc, "file_paths", String.split(files, ","))
["caps", capabilities] ->
Map.put(acc, "required_capabilities", String.split(capabilities, ","))
_ ->
acc
end
@@ -197,4 +203,4 @@ defmodule AgentCoordinator.CLI do
agent-status abc-123-def
""")
end
end
end

View File

@@ -0,0 +1,317 @@
defmodule AgentCoordinator.Client do
  @moduledoc """
  Client wrapper for agents to interact with the coordination system.
  This module provides a high-level API that automatically handles:
  - Heartbeat management
  - Session tracking
  - Error handling and retries
  - Collision detection
  Usage:
  ```elixir
  # Start a client session
  {:ok, client} = AgentCoordinator.Client.start_session("MyAgent", [:coding, :analysis])
  # All operations automatically include heartbeats
  {:ok, task} = AgentCoordinator.Client.get_next_task(client)
  {:ok, result} = AgentCoordinator.Client.complete_task(client)
  ```
  """
  use GenServer
  alias AgentCoordinator.{EnhancedMCPServer, AutoHeartbeat}
  # Session state:
  #   agent_id               - id returned by register_agent_with_session/3
  #   agent_name             - human-readable name given at start_session/3
  #   capabilities           - capability list passed through to registration
  #   session_pid            - this GenServer's pid, handed to the server for tracking
  #   heartbeat_interval     - ms between automatic heartbeats (default 10_000)
  #   last_heartbeat         - updated after every heartbeat-bearing operation
  #   auto_heartbeat_enabled - drives the background timer in handle_info/2
  defstruct [
    :agent_id,
    :agent_name,
    :capabilities,
    :session_pid,
    :heartbeat_interval,
    :last_heartbeat,
    :auto_heartbeat_enabled
  ]
  # Client API
  @doc """
  Start a new agent session with automatic heartbeat management
  """
  def start_session(agent_name, capabilities, opts \\ []) do
    heartbeat_interval = Keyword.get(opts, :heartbeat_interval, 10_000)
    auto_heartbeat = Keyword.get(opts, :auto_heartbeat, true)
    GenServer.start_link(__MODULE__, %{
      agent_name: agent_name,
      capabilities: capabilities,
      heartbeat_interval: heartbeat_interval,
      auto_heartbeat_enabled: auto_heartbeat
    })
  end
  @doc """
  Get the next task for this agent (with automatic heartbeat)
  """
  def get_next_task(client_pid) do
    GenServer.call(client_pid, :get_next_task)
  end
  @doc """
  Create a task (with automatic heartbeat)
  """
  def create_task(client_pid, title, description, opts \\ %{}) do
    GenServer.call(client_pid, {:create_task, title, description, opts})
  end
  @doc """
  Complete the current task (with automatic heartbeat)
  """
  def complete_task(client_pid) do
    GenServer.call(client_pid, :complete_task)
  end
  @doc """
  Get task board with enhanced information (with automatic heartbeat)
  """
  def get_task_board(client_pid) do
    GenServer.call(client_pid, :get_task_board)
  end
  @doc """
  Send manual heartbeat
  """
  def heartbeat(client_pid) do
    GenServer.call(client_pid, :manual_heartbeat)
  end
  @doc """
  Get client session information
  """
  def get_session_info(client_pid) do
    GenServer.call(client_pid, :get_session_info)
  end
  @doc """
  Stop the client session (cleanly disconnects the agent)
  """
  def stop_session(client_pid) do
    GenServer.call(client_pid, :stop_session)
  end
  @doc """
  Unregister the agent (e.g., when waiting for user input)
  """
  def unregister_agent(client_pid, reason \\ "Waiting for user input") do
    GenServer.call(client_pid, {:unregister_agent, reason})
  end
  # Server callbacks
  # Registers the agent with the enhanced MCP server before the session is
  # considered started; a registration failure stops the process with
  # {:registration_failed, reason} so start_session/3 returns an error.
  def init(config) do
    # Register with enhanced MCP server
    case EnhancedMCPServer.register_agent_with_session(
           config.agent_name,
           config.capabilities,
           self()
         ) do
      {:ok, agent_id} ->
        state = %__MODULE__{
          agent_id: agent_id,
          agent_name: config.agent_name,
          capabilities: config.capabilities,
          session_pid: self(),
          heartbeat_interval: config.heartbeat_interval,
          last_heartbeat: DateTime.utc_now(),
          auto_heartbeat_enabled: config.auto_heartbeat_enabled
        }
        # Start automatic heartbeat timer if enabled
        if config.auto_heartbeat_enabled do
          schedule_heartbeat(state.heartbeat_interval)
        end
        {:ok, state}
      {:error, reason} ->
        {:stop, {:registration_failed, reason}}
    end
  end
  def handle_call(:get_next_task, _from, state) do
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "get_next_task",
        "arguments" => %{"agent_id" => state.agent_id}
      }
    }
    result = enhanced_mcp_call(request, state)
    {:reply, result, update_last_heartbeat(state)}
  end
  def handle_call({:create_task, title, description, opts}, _from, state) do
    # opts is a map of extra MCP arguments merged over the required fields
    arguments = Map.merge(%{
      "title" => title,
      "description" => description
    }, opts)
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "create_task",
        "arguments" => arguments
      }
    }
    result = enhanced_mcp_call(request, state)
    {:reply, result, update_last_heartbeat(state)}
  end
  def handle_call(:complete_task, _from, state) do
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "complete_task",
        "arguments" => %{"agent_id" => state.agent_id}
      }
    }
    result = enhanced_mcp_call(request, state)
    {:reply, result, update_last_heartbeat(state)}
  end
  # Task board bypasses enhanced_mcp_call/2 and uses the dedicated server API;
  # last_heartbeat is only bumped on success.
  def handle_call(:get_task_board, _from, state) do
    case EnhancedMCPServer.get_enhanced_task_board() do
      {:ok, board} ->
        {:reply, {:ok, board}, update_last_heartbeat(state)}
      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end
  def handle_call(:manual_heartbeat, _from, state) do
    result = send_heartbeat(state.agent_id)
    {:reply, result, update_last_heartbeat(state)}
  end
  def handle_call(:get_session_info, _from, state) do
    # NOTE(review): despite the name, session_duration is the number of
    # seconds since last_heartbeat (which is reset on every operation),
    # not the time since the session started — confirm intended semantics.
    info = %{
      agent_id: state.agent_id,
      agent_name: state.agent_name,
      capabilities: state.capabilities,
      last_heartbeat: state.last_heartbeat,
      heartbeat_interval: state.heartbeat_interval,
      auto_heartbeat_enabled: state.auto_heartbeat_enabled,
      session_duration: DateTime.diff(DateTime.utc_now(), state.last_heartbeat, :second)
    }
    {:reply, {:ok, info}, state}
  end
  def handle_call({:unregister_agent, reason}, _from, state) do
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "unregister_agent",
        "arguments" => %{"agent_id" => state.agent_id, "reason" => reason}
      }
    }
    result = enhanced_mcp_call(request, state)
    case result do
      {:ok, _data} ->
        # Successfully unregistered, stop heartbeats but keep session alive
        updated_state = %{state | auto_heartbeat_enabled: false}
        {:reply, result, updated_state}
      {:error, _reason} ->
        # Failed to unregister, keep current state
        {:reply, result, state}
    end
  end
  def handle_call(:stop_session, _from, state) do
    # Clean shutdown - could include task cleanup here
    {:stop, :normal, :ok, state}
  end
  # Handle automatic heartbeat timer. When auto heartbeats are disabled the
  # timer chain simply stops (no reschedule); note last_heartbeat is updated
  # either way.
  def handle_info(:heartbeat_timer, state) do
    if state.auto_heartbeat_enabled do
      send_heartbeat(state.agent_id)
      schedule_heartbeat(state.heartbeat_interval)
    end
    {:noreply, update_last_heartbeat(state)}
  end
  # Handle unexpected messages
  def handle_info(_msg, state) do
    {:noreply, state}
  end
  # Cleanup on termination
  def terminate(_reason, state) do
    # Stop heartbeat management
    if state.agent_id do
      AutoHeartbeat.stop_heartbeat(state.agent_id)
    end
    :ok
  end
  # Private helpers
  # Route a request through the enhanced server (which wraps it with
  # heartbeats) and decode the JSON payload into {:ok, data} / {:error, _}.
  defp enhanced_mcp_call(request, state) do
    session_info = %{
      agent_id: state.agent_id,
      session_pid: state.session_pid
    }
    case EnhancedMCPServer.handle_enhanced_mcp_request(request, session_info) do
      %{"result" => %{"content" => [%{"text" => response_json}]}} = response ->
        case Jason.decode(response_json) do
          {:ok, data} ->
            # "_heartbeat_metadata" lives on the outer response map, put there
            # by the enhanced server; copy it into the decoded payload
            metadata = Map.get(response, "_heartbeat_metadata", %{})
            {:ok, Map.put(data, "_heartbeat_metadata", metadata)}
          {:error, reason} ->
            {:error, {:json_decode_error, reason}}
        end
      %{"error" => %{"message" => message}} ->
        {:error, message}
      unexpected ->
        {:error, {:unexpected_response, unexpected}}
    end
  end
  # Heartbeat via the enhanced server without session info (uses the
  # server's default empty session context).
  defp send_heartbeat(agent_id) do
    request = %{
      "method" => "tools/call",
      "params" => %{
        "name" => "heartbeat",
        "arguments" => %{"agent_id" => agent_id}
      }
    }
    case EnhancedMCPServer.handle_enhanced_mcp_request(request) do
      %{"result" => _} -> :ok
      %{"error" => %{"message" => message}} -> {:error, message}
      _ -> {:error, :unknown_heartbeat_error}
    end
  end
  defp schedule_heartbeat(interval) do
    Process.send_after(self(), :heartbeat_timer, interval)
  end
  defp update_last_heartbeat(state) do
    %{state | last_heartbeat: DateTime.utc_now()}
  end
end

View File

@@ -0,0 +1,354 @@
defmodule AgentCoordinator.CodebaseRegistry do
  @moduledoc """
  Registry for managing multiple codebases and their metadata.
  Tracks codebase state, dependencies, and cross-codebase coordination.
  """
  use GenServer
  # codebases: id => codebase map (see @type codebase)
  # cross_codebase_dependencies: "source->target" string key => dependency map
  # nats_conn: Gnat connection pid, or nil when NATS is disabled/unavailable
  defstruct [
    :codebases,
    :cross_codebase_dependencies,
    :nats_conn
  ]
  @type codebase :: %{
          id: String.t(),
          name: String.t(),
          workspace_path: String.t(),
          description: String.t() | nil,
          agents: [String.t()],
          active_tasks: [String.t()],
          metadata: map(),
          created_at: DateTime.t(),
          updated_at: DateTime.t()
        }
  # Client API
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  def register_codebase(codebase_data) do
    GenServer.call(__MODULE__, {:register_codebase, codebase_data})
  end
  def update_codebase(codebase_id, updates) do
    GenServer.call(__MODULE__, {:update_codebase, codebase_id, updates})
  end
  def get_codebase(codebase_id) do
    GenServer.call(__MODULE__, {:get_codebase, codebase_id})
  end
  def list_codebases do
    GenServer.call(__MODULE__, :list_codebases)
  end
  def add_agent_to_codebase(codebase_id, agent_id) do
    GenServer.call(__MODULE__, {:add_agent_to_codebase, codebase_id, agent_id})
  end
  def remove_agent_from_codebase(codebase_id, agent_id) do
    GenServer.call(__MODULE__, {:remove_agent_from_codebase, codebase_id, agent_id})
  end
  def add_cross_codebase_dependency(
        source_codebase,
        target_codebase,
        dependency_type,
        metadata \\ %{}
      ) do
    GenServer.call(
      __MODULE__,
      {:add_cross_dependency, source_codebase, target_codebase, dependency_type, metadata}
    )
  end
  def get_codebase_dependencies(codebase_id) do
    GenServer.call(__MODULE__, {:get_dependencies, codebase_id})
  end
  def get_codebase_stats(codebase_id) do
    GenServer.call(__MODULE__, {:get_stats, codebase_id})
  end
  def can_execute_cross_codebase_task?(source_codebase, target_codebase) do
    GenServer.call(__MODULE__, {:can_execute_cross_task, source_codebase, target_codebase})
  end
  # Server callbacks
  # NOTE(review): a failed Gnat connection silently degrades to nats_conn=nil
  # (no events published, no subscriptions) — confirm this best-effort
  # behavior is intended rather than a supervised retry.
  def init(opts) do
    nats_config = Keyword.get(opts, :nats, [])
    nats_conn =
      case nats_config do
        [] ->
          nil
        config ->
          case Gnat.start_link(config) do
            {:ok, conn} ->
              # Subscribe to codebase events
              Gnat.sub(conn, self(), "codebase.>")
              conn
            {:error, _reason} ->
              nil
          end
      end
    # Register default codebase
    default_codebase = create_default_codebase()
    state = %__MODULE__{
      codebases: %{"default" => default_codebase},
      cross_codebase_dependencies: %{},
      nats_conn: nats_conn
    }
    {:ok, state}
  end
  def handle_call({:register_codebase, codebase_data}, _from, state) do
    # Accepts both string keys (JSON/MCP callers) and atom keys (internal
    # callers); a missing id gets a generated UUID.
    codebase_id = Map.get(codebase_data, "id") || Map.get(codebase_data, :id) || UUID.uuid4()
    codebase = %{
      id: codebase_id,
      name: Map.get(codebase_data, "name") || Map.get(codebase_data, :name, "Unnamed Codebase"),
      workspace_path:
        Map.get(codebase_data, "workspace_path") || Map.get(codebase_data, :workspace_path),
      description: Map.get(codebase_data, "description") || Map.get(codebase_data, :description),
      agents: [],
      active_tasks: [],
      metadata: Map.get(codebase_data, "metadata") || Map.get(codebase_data, :metadata, %{}),
      created_at: DateTime.utc_now(),
      updated_at: DateTime.utc_now()
    }
    case Map.has_key?(state.codebases, codebase_id) do
      true ->
        {:reply, {:error, "Codebase already exists"}, state}
      false ->
        new_codebases = Map.put(state.codebases, codebase_id, codebase)
        new_state = %{state | codebases: new_codebases}
        # Publish codebase registration event
        if state.nats_conn do
          publish_event(state.nats_conn, "codebase.registered", %{codebase: codebase})
        end
        {:reply, {:ok, codebase_id}, new_state}
    end
  end
  def handle_call({:update_codebase, codebase_id, updates}, _from, state) do
    case Map.get(state.codebases, codebase_id) do
      nil ->
        {:reply, {:error, "Codebase not found"}, state}
      codebase ->
        # Shallow merge; updated_at always wins over any caller-supplied value
        updated_codebase =
          Map.merge(codebase, updates)
          |> Map.put(:updated_at, DateTime.utc_now())
        new_codebases = Map.put(state.codebases, codebase_id, updated_codebase)
        new_state = %{state | codebases: new_codebases}
        # Publish update event
        if state.nats_conn do
          publish_event(state.nats_conn, "codebase.updated", %{
            codebase_id: codebase_id,
            updates: updates
          })
        end
        {:reply, {:ok, updated_codebase}, new_state}
    end
  end
  # Returns the codebase map or nil (not a tagged tuple)
  def handle_call({:get_codebase, codebase_id}, _from, state) do
    codebase = Map.get(state.codebases, codebase_id)
    {:reply, codebase, state}
  end
  def handle_call(:list_codebases, _from, state) do
    codebases = Map.values(state.codebases)
    {:reply, codebases, state}
  end
  def handle_call({:add_agent_to_codebase, codebase_id, agent_id}, _from, state) do
    case Map.get(state.codebases, codebase_id) do
      nil ->
        {:reply, {:error, "Codebase not found"}, state}
      codebase ->
        # Enum.uniq keeps the membership idempotent on repeated adds
        updated_agents = Enum.uniq([agent_id | codebase.agents])
        updated_codebase = %{codebase | agents: updated_agents, updated_at: DateTime.utc_now()}
        new_codebases = Map.put(state.codebases, codebase_id, updated_codebase)
        {:reply, :ok, %{state | codebases: new_codebases}}
    end
  end
  def handle_call({:remove_agent_from_codebase, codebase_id, agent_id}, _from, state) do
    case Map.get(state.codebases, codebase_id) do
      nil ->
        {:reply, {:error, "Codebase not found"}, state}
      codebase ->
        updated_agents = Enum.reject(codebase.agents, &(&1 == agent_id))
        updated_codebase = %{codebase | agents: updated_agents, updated_at: DateTime.utc_now()}
        new_codebases = Map.put(state.codebases, codebase_id, updated_codebase)
        {:reply, :ok, %{state | codebases: new_codebases}}
    end
  end
  # Dependencies are directional: keyed "source->target", so A->B does not
  # imply B->A (see has_cross_dependency?/3). Re-adding overwrites.
  def handle_call({:add_cross_dependency, source_id, target_id, dep_type, metadata}, _from, state) do
    dependency = %{
      source: source_id,
      target: target_id,
      type: dep_type,
      metadata: metadata,
      created_at: DateTime.utc_now()
    }
    key = "#{source_id}->#{target_id}"
    new_dependencies = Map.put(state.cross_codebase_dependencies, key, dependency)
    # Publish cross-codebase dependency event
    if state.nats_conn do
      publish_event(state.nats_conn, "codebase.dependency.added", %{
        dependency: dependency
      })
    end
    {:reply, :ok, %{state | cross_codebase_dependencies: new_dependencies}}
  end
  # Returns dependencies where the codebase is either endpoint
  def handle_call({:get_dependencies, codebase_id}, _from, state) do
    dependencies =
      state.cross_codebase_dependencies
      |> Map.values()
      |> Enum.filter(fn dep -> dep.source == codebase_id or dep.target == codebase_id end)
    {:reply, dependencies, state}
  end
  def handle_call({:get_stats, codebase_id}, _from, state) do
    case Map.get(state.codebases, codebase_id) do
      nil ->
        {:reply, {:error, "Codebase not found"}, state}
      codebase ->
        stats = %{
          id: codebase.id,
          name: codebase.name,
          agent_count: length(codebase.agents),
          active_task_count: length(codebase.active_tasks),
          dependencies: get_dependency_stats(state, codebase_id),
          last_updated: codebase.updated_at
        }
        {:reply, {:ok, stats}, state}
    end
  end
  def handle_call({:can_execute_cross_task, source_id, target_id}, _from, state) do
    # Check if both codebases exist
    source_exists = Map.has_key?(state.codebases, source_id)
    target_exists = Map.has_key?(state.codebases, target_id)
    # Same-codebase tasks are always allowed; otherwise an explicit
    # source->target dependency must have been registered.
    can_execute =
      source_exists and target_exists and
        (source_id == target_id or has_cross_dependency?(state, source_id, target_id))
    {:reply, can_execute, state}
  end
  # Handle NATS messages: track active task ids per codebase.
  # NOTE(review): Jason.decode! and the map match will crash this server on a
  # malformed event body — confirm crash-and-restart is the desired policy.
  def handle_info({:msg, %{topic: "codebase.task.started", body: body}}, state) do
    %{"codebase_id" => codebase_id, "task_id" => task_id} = Jason.decode!(body)
    case Map.get(state.codebases, codebase_id) do
      nil ->
        {:noreply, state}
      codebase ->
        updated_tasks = Enum.uniq([task_id | codebase.active_tasks])
        updated_codebase = %{codebase | active_tasks: updated_tasks}
        new_codebases = Map.put(state.codebases, codebase_id, updated_codebase)
        {:noreply, %{state | codebases: new_codebases}}
    end
  end
  def handle_info({:msg, %{topic: "codebase.task.completed", body: body}}, state) do
    %{"codebase_id" => codebase_id, "task_id" => task_id} = Jason.decode!(body)
    case Map.get(state.codebases, codebase_id) do
      nil ->
        {:noreply, state}
      codebase ->
        updated_tasks = Enum.reject(codebase.active_tasks, &(&1 == task_id))
        updated_codebase = %{codebase | active_tasks: updated_tasks}
        new_codebases = Map.put(state.codebases, codebase_id, updated_codebase)
        {:noreply, %{state | codebases: new_codebases}}
    end
  end
  def handle_info({:msg, _msg}, state) do
    # Ignore other messages
    {:noreply, state}
  end
  # Private helpers
  # The "default" codebase is always present so agents without an explicit
  # codebase assignment have somewhere to land.
  defp create_default_codebase do
    %{
      id: "default",
      name: "Default Codebase",
      workspace_path: nil,
      description: "Default codebase for agents without specific codebase assignment",
      agents: [],
      active_tasks: [],
      metadata: %{},
      created_at: DateTime.utc_now(),
      updated_at: DateTime.utc_now()
    }
  end
  # Directional lookup only: checks source->target, never the reverse
  defp has_cross_dependency?(state, source_id, target_id) do
    key = "#{source_id}->#{target_id}"
    Map.has_key?(state.cross_codebase_dependencies, key)
  end
  defp get_dependency_stats(state, codebase_id) do
    incoming =
      state.cross_codebase_dependencies
      |> Map.values()
      |> Enum.filter(fn dep -> dep.target == codebase_id end)
      |> length()
    outgoing =
      state.cross_codebase_dependencies
      |> Map.values()
      |> Enum.filter(fn dep -> dep.source == codebase_id end)
      |> length()
    %{incoming: incoming, outgoing: outgoing}
  end
  # conn is re-checked here even though all call sites guard on state.nats_conn
  defp publish_event(conn, topic, data) do
    if conn do
      message = Jason.encode!(data)
      Gnat.pub(conn, topic, message)
    end
  end
end

View File

@@ -0,0 +1,266 @@
defmodule AgentCoordinator.EnhancedMCPServer do
@moduledoc """
Enhanced MCP server with automatic heartbeat management and collision detection.
This module extends the base MCP server with:
1. Automatic heartbeats on every operation
2. Agent session tracking
3. Enhanced collision detection
4. Automatic agent cleanup on disconnect
"""
use GenServer
alias AgentCoordinator.{MCPServer, AutoHeartbeat, TaskRegistry}
# Track active agent sessions
defstruct [
:agent_sessions,
:session_monitors
]
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Enhanced MCP request handler with automatic heartbeat management
"""
def handle_enhanced_mcp_request(request, session_info \\ %{}) do
GenServer.call(__MODULE__, {:enhanced_mcp_request, request, session_info})
end
@doc """
Register an agent with enhanced session tracking
"""
def register_agent_with_session(name, capabilities, session_pid \\ self()) do
GenServer.call(__MODULE__, {:register_agent_with_session, name, capabilities, session_pid})
end
# Server callbacks
def init(_opts) do
state = %__MODULE__{
agent_sessions: %{},
session_monitors: %{}
}
{:ok, state}
end
def handle_call({:enhanced_mcp_request, request, session_info}, {from_pid, _}, state) do
# Extract agent_id from session or request
agent_id = extract_agent_id(request, session_info, state)
# If we have an agent_id, send heartbeat before and after operation
enhanced_result =
case agent_id do
nil ->
# No agent context, use normal MCP processing
MCPServer.handle_mcp_request(request)
id ->
# Send pre-operation heartbeat
pre_heartbeat = TaskRegistry.heartbeat_agent(id)
# Process the request
result = MCPServer.handle_mcp_request(request)
# Send post-operation heartbeat and update session activity
post_heartbeat = TaskRegistry.heartbeat_agent(id)
update_session_activity(state, id, from_pid)
# Add heartbeat metadata to successful responses
case result do
%{"result" => _} = success ->
Map.put(success, "_heartbeat_metadata", %{
agent_id: id,
pre_heartbeat: pre_heartbeat,
post_heartbeat: post_heartbeat,
timestamp: DateTime.utc_now()
})
error_result ->
error_result
end
end
{:reply, enhanced_result, state}
end
# Registers an agent through the regular MCP "register_agent" tool, then
# tracks the caller's session so the agent is cleaned up automatically when
# the session process dies (see the :DOWN handler).
def handle_call({:register_agent_with_session, name, capabilities, session_pid}, _from, state) do
  # Convert capabilities to strings if they're atoms
  string_capabilities =
    Enum.map(capabilities, fn
      cap when is_atom(cap) -> Atom.to_string(cap)
      cap when is_binary(cap) -> cap
    end)

  # Register the agent normally first
  case MCPServer.handle_mcp_request(%{
         "method" => "tools/call",
         "params" => %{
           "name" => "register_agent",
           "arguments" => %{"name" => name, "capabilities" => string_capabilities}
         }
       }) do
    %{"result" => %{"content" => [%{"text" => response_json}]}} ->
      case Jason.decode(response_json) do
        {:ok, %{"agent_id" => agent_id}} ->
          # Monitor the session so its death triggers cleanup
          monitor_ref = Process.monitor(session_pid)

          new_state = %{
            state
            | agent_sessions:
                Map.put(state.agent_sessions, agent_id, %{
                  pid: session_pid,
                  name: name,
                  capabilities: capabilities,
                  registered_at: DateTime.utc_now(),
                  last_activity: DateTime.utc_now()
                }),
              session_monitors: Map.put(state.session_monitors, monitor_ref, agent_id)
          }

          # Start automatic heartbeat management. start_link/1 may return
          # {:error, {:already_started, pid}} after the first registration;
          # that is expected and intentionally ignored.
          AutoHeartbeat.start_link([])

          AutoHeartbeat.register_agent_with_heartbeat(name, capabilities, %{
            session_pid: session_pid,
            enhanced_server: true
          })

          {:reply, {:ok, agent_id}, new_state}

        {:ok, _decoded_without_agent_id} ->
          # Previously this shape crashed the server with a CaseClauseError.
          {:reply, {:error, "Registration response missing agent_id"}, state}

        {:error, reason} ->
          {:reply, {:error, reason}, state}
      end

    %{"error" => %{"message" => message}} ->
      {:reply, {:error, message}, state}

    _ ->
      {:reply, {:error, "Unexpected response format"}, state}
  end
end
# Fetches the regular task board and enriches each agent entry with live
# session data (active flag, last activity, session duration in seconds).
def handle_call(:get_enhanced_task_board, _from, state) do
  # Get the regular task board
  case MCPServer.handle_mcp_request(%{
         "method" => "tools/call",
         "params" => %{"name" => "get_task_board", "arguments" => %{}}
       }) do
    %{"result" => %{"content" => [%{"text" => response_json}]}} ->
      case Jason.decode(response_json) do
        {:ok, %{"agents" => agents}} ->
          # Enhance with session information
          enhanced_agents =
            Enum.map(agents, fn agent ->
              agent_id = agent["agent_id"]
              session_info = Map.get(state.agent_sessions, agent_id, %{})

              Map.merge(agent, %{
                "session_active" => Map.has_key?(state.agent_sessions, agent_id),
                "last_activity" => Map.get(session_info, :last_activity),
                "session_duration" => calculate_session_duration(session_info)
              })
            end)

          result = %{
            "agents" => enhanced_agents,
            "active_sessions" => map_size(state.agent_sessions)
          }

          {:reply, {:ok, result}, state}

        {:ok, _decoded_without_agents} ->
          # Previously this shape crashed the server with a CaseClauseError.
          {:reply, {:error, "Task board response missing agents"}, state}

        {:error, reason} ->
          {:reply, {:error, reason}, state}
      end

    %{"error" => %{"message" => message}} ->
      {:reply, {:error, message}, state}

    _ ->
      # Fallback for unexpected response shapes, consistent with the
      # register_agent_with_session handler.
      {:reply, {:error, "Unexpected response format"}, state}
  end
end
# Handle process monitoring - cleanup when agent session dies
def handle_info({:DOWN, monitor_ref, :process, _pid, _reason}, state) do
  case Map.get(state.session_monitors, monitor_ref) do
    nil ->
      # Not one of our session monitors; ignore.
      {:noreply, state}

    agent_id ->
      # Clean up the agent session
      new_state = %{
        state
        | agent_sessions: Map.delete(state.agent_sessions, agent_id),
          session_monitors: Map.delete(state.session_monitors, monitor_ref)
      }

      # Stop heartbeat management
      AutoHeartbeat.stop_heartbeat(agent_id)
      # Mark agent as offline in registry
      # (This could be enhanced to gracefully handle ongoing tasks)
      # NOTE(review): nothing here actually notifies TaskRegistry, so the
      # agent presumably stays "online" until its heartbeat times out —
      # confirm this is intended.
      {:noreply, new_state}
  end
end
# Private helpers
# Best-effort resolution of the acting agent's id, trying in order: an
# explicit "agent_id" in the request arguments, the session info map, and
# finally a reverse lookup of the caller's pid in the tracked sessions.
defp extract_agent_id(request, session_info, state) do
  # Try to get agent_id from various sources
  cond do
    # From request arguments
    Map.get(request, "params", %{})
    |> Map.get("arguments", %{})
    |> Map.get("agent_id") ->
      request["params"]["arguments"]["agent_id"]

    # From session info
    Map.get(session_info, :agent_id) ->
      session_info.agent_id

    # From session lookup by PID. This match binds session_pid and is always
    # truthy (Map.get defaults to self(), a pid), so the clause doubles as
    # the catch-all; find_agent_by_session_pid/2 may still return nil, which
    # makes the `true ->` clause below effectively unreachable.
    session_pid = Map.get(session_info, :session_pid, self()) ->
      find_agent_by_session_pid(state, session_pid)

    true ->
      nil
  end
end
# Reverse lookup: the agent_id whose tracked session owns `session_pid`,
# or nil when no session matches.
defp find_agent_by_session_pid(state, session_pid) do
  state.agent_sessions
  |> Enum.find(fn {_agent_id, session_data} -> session_data.pid == session_pid end)
  |> case do
    {agent_id, _session_data} -> agent_id
    nil -> nil
  end
end
# Intended to refresh the session's last_activity timestamp. Currently a
# no-op: the updated map is built but discarded because this runs inside a
# call handler that cannot thread new state back; always returns :ok.
defp update_session_activity(state, agent_id, _session_pid) do
  case Map.get(state.agent_sessions, agent_id) do
    nil ->
      :ok

    session_data ->
      _updated_session = %{session_data | last_activity: DateTime.utc_now()}
      # Note: This doesn't update the state since we're in a call handler
      # In a real implementation, you might want to use cast for this
      :ok
  end
end
@doc """
Get enhanced task board with session information.

Returns `{:ok, %{"agents" => [...], "active_sessions" => count}}` where each
agent entry carries "session_active", "last_activity" and "session_duration",
or `{:error, reason}` on failure.
"""
def get_enhanced_task_board do
  GenServer.call(__MODULE__, :get_enhanced_task_board)
end
# Seconds elapsed since the session registered; nil when the session map
# carries no :registered_at (e.g. unknown or empty session info).
defp calculate_session_duration(session_info) do
  case session_info do
    %{registered_at: started_at} -> DateTime.diff(DateTime.utc_now(), started_at, :second)
    _ -> nil
  end
end
end

View File

@@ -2,9 +2,9 @@ defmodule AgentCoordinator.Inbox do
@moduledoc """
Agent inbox management using GenServer for each agent's task queue.
"""
use GenServer
alias AgentCoordinator.{Task, Agent}
alias AgentCoordinator.Task
defstruct [
:agent_id,
@@ -15,15 +15,15 @@ defmodule AgentCoordinator.Inbox do
]
@type t :: %__MODULE__{
agent_id: String.t(),
pending_tasks: [Task.t()],
in_progress_task: Task.t() | nil,
completed_tasks: [Task.t()],
max_history: non_neg_integer()
}
agent_id: String.t(),
pending_tasks: [Task.t()],
in_progress_task: Task.t() | nil,
completed_tasks: [Task.t()],
max_history: non_neg_integer()
}
# Client API
def start_link(agent_id, opts \\ []) do
GenServer.start_link(__MODULE__, {agent_id, opts}, name: via_tuple(agent_id))
end
@@ -48,6 +48,21 @@ defmodule AgentCoordinator.Inbox do
GenServer.call(via_tuple(agent_id), :list_tasks)
end
def get_current_task(agent_id) do
GenServer.call(via_tuple(agent_id), :get_current_task)
end
def stop(agent_id) do
case Registry.lookup(AgentCoordinator.InboxRegistry, agent_id) do
[{pid, _}] ->
GenServer.stop(pid, :normal)
:ok
[] ->
{:error, :not_found}
end
end
# Server callbacks
def init({agent_id, opts}) do
@@ -58,7 +73,7 @@ defmodule AgentCoordinator.Inbox do
completed_tasks: [],
max_history: Keyword.get(opts, :max_history, 100)
}
{:ok, state}
end
@@ -66,55 +81,55 @@ defmodule AgentCoordinator.Inbox do
# Insert task based on priority
pending_tasks = insert_by_priority(state.pending_tasks, task)
new_state = %{state | pending_tasks: pending_tasks}
# Broadcast task added
Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "agent:#{state.agent_id}",
{:task_added, task})
Phoenix.PubSub.broadcast(
AgentCoordinator.PubSub,
"agent:#{state.agent_id}",
{:task_added, task}
)
{:reply, :ok, new_state}
end
def handle_call(:get_next_task, _from, state) do
case state.pending_tasks do
[] ->
[] ->
{:reply, nil, state}
[next_task | remaining_tasks] ->
updated_task = Task.assign_to_agent(next_task, state.agent_id)
new_state = %{state |
pending_tasks: remaining_tasks,
in_progress_task: updated_task
}
new_state = %{state | pending_tasks: remaining_tasks, in_progress_task: updated_task}
# Broadcast task started
Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "global",
{:task_started, updated_task})
Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "global", {:task_started, updated_task})
{:reply, updated_task, new_state}
end
end
def handle_call(:complete_current_task, _from, state) do
case state.in_progress_task do
nil ->
nil ->
{:reply, {:error, :no_task_in_progress}, state}
task ->
completed_task = Task.complete(task)
# Add to completed tasks with history limit
completed_tasks = [completed_task | state.completed_tasks]
completed_tasks =
[completed_task | state.completed_tasks]
|> Enum.take(state.max_history)
new_state = %{state |
in_progress_task: nil,
completed_tasks: completed_tasks
}
new_state = %{state | in_progress_task: nil, completed_tasks: completed_tasks}
# Broadcast task completed
Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "global",
{:task_completed, completed_task})
Phoenix.PubSub.broadcast(
AgentCoordinator.PubSub,
"global",
{:task_completed, completed_task}
)
{:reply, completed_task, new_state}
end
end
@@ -126,7 +141,7 @@ defmodule AgentCoordinator.Inbox do
current_task: state.in_progress_task,
completed_count: length(state.completed_tasks)
}
{:reply, status, state}
end
@@ -134,12 +149,17 @@ defmodule AgentCoordinator.Inbox do
tasks = %{
pending: state.pending_tasks,
in_progress: state.in_progress_task,
completed: Enum.take(state.completed_tasks, 10) # Recent 10
# Recent 10
completed: Enum.take(state.completed_tasks, 10)
}
{:reply, tasks, state}
end
def handle_call(:get_current_task, _from, state) do
{:reply, state.in_progress_task, state}
end
# Private helpers
defp via_tuple(agent_id) do
@@ -149,12 +169,13 @@ defmodule AgentCoordinator.Inbox do
defp insert_by_priority(tasks, new_task) do
priority_order = %{urgent: 0, high: 1, normal: 2, low: 3}
new_priority = Map.get(priority_order, new_task.priority, 2)
{before, after} = Enum.split_while(tasks, fn task ->
task_priority = Map.get(priority_order, task.priority, 2)
task_priority <= new_priority
end)
before ++ [new_task] ++ after
{before, after_tasks} =
Enum.split_while(tasks, fn task ->
task_priority = Map.get(priority_order, task.priority, 2)
task_priority <= new_priority
end)
before ++ [new_task] ++ after_tasks
end
end
end

View File

@@ -3,9 +3,9 @@ defmodule AgentCoordinator.MCPServer do
MCP (Model Context Protocol) server for agent coordination.
Provides tools for agents to interact with the task coordination system.
"""
use GenServer
alias AgentCoordinator.{TaskRegistry, Inbox, Agent, Task}
alias AgentCoordinator.{TaskRegistry, Inbox, Agent, Task, CodebaseRegistry}
@mcp_tools [
%{
@@ -17,12 +17,33 @@ defmodule AgentCoordinator.MCPServer do
"name" => %{"type" => "string"},
"capabilities" => %{
"type" => "array",
"items" => %{"type" => "string", "enum" => ["coding", "testing", "documentation", "analysis", "review"]}
}
"items" => %{
"type" => "string",
"enum" => ["coding", "testing", "documentation", "analysis", "review"]
}
},
"codebase_id" => %{"type" => "string"},
"workspace_path" => %{"type" => "string"},
"cross_codebase_capable" => %{"type" => "boolean"}
},
"required" => ["name", "capabilities"]
}
},
%{
"name" => "register_codebase",
"description" => "Register a new codebase in the coordination system",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"id" => %{"type" => "string"},
"name" => %{"type" => "string"},
"workspace_path" => %{"type" => "string"},
"description" => %{"type" => "string"},
"metadata" => %{"type" => "object"}
},
"required" => ["name", "workspace_path"]
}
},
%{
"name" => "create_task",
"description" => "Create a new task in the coordination system",
@@ -32,15 +53,47 @@ defmodule AgentCoordinator.MCPServer do
"title" => %{"type" => "string"},
"description" => %{"type" => "string"},
"priority" => %{"type" => "string", "enum" => ["low", "normal", "high", "urgent"]},
"codebase_id" => %{"type" => "string"},
"file_paths" => %{"type" => "array", "items" => %{"type" => "string"}},
"required_capabilities" => %{
"type" => "array",
"type" => "array",
"items" => %{"type" => "string"}
},
"cross_codebase_dependencies" => %{
"type" => "array",
"items" => %{
"type" => "object",
"properties" => %{
"codebase_id" => %{"type" => "string"},
"task_id" => %{"type" => "string"}
}
}
}
},
"required" => ["title", "description"]
}
},
%{
"name" => "create_cross_codebase_task",
"description" => "Create a task that spans multiple codebases",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"title" => %{"type" => "string"},
"description" => %{"type" => "string"},
"primary_codebase_id" => %{"type" => "string"},
"affected_codebases" => %{
"type" => "array",
"items" => %{"type" => "string"}
},
"coordination_strategy" => %{
"type" => "string",
"enum" => ["sequential", "parallel", "leader_follower"]
}
},
"required" => ["title", "description", "primary_codebase_id", "affected_codebases"]
}
},
%{
"name" => "get_next_task",
"description" => "Get the next task for an agent",
@@ -66,11 +119,46 @@ defmodule AgentCoordinator.MCPServer do
%{
"name" => "get_task_board",
"description" => "Get overview of all agents and their current tasks",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"codebase_id" => %{"type" => "string"}
}
}
},
%{
"name" => "get_codebase_status",
"description" => "Get status and statistics for a specific codebase",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"codebase_id" => %{"type" => "string"}
},
"required" => ["codebase_id"]
}
},
%{
"name" => "list_codebases",
"description" => "List all registered codebases",
"inputSchema" => %{
"type" => "object",
"properties" => %{}
}
},
%{
"name" => "add_codebase_dependency",
"description" => "Add a dependency relationship between codebases",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"source_codebase_id" => %{"type" => "string"},
"target_codebase_id" => %{"type" => "string"},
"dependency_type" => %{"type" => "string"},
"metadata" => %{"type" => "object"}
},
"required" => ["source_codebase_id", "target_codebase_id", "dependency_type"]
}
},
%{
"name" => "heartbeat",
"description" => "Send heartbeat to maintain agent status",
@@ -81,11 +169,23 @@ defmodule AgentCoordinator.MCPServer do
},
"required" => ["agent_id"]
}
},
%{
"name" => "unregister_agent",
"description" => "Unregister an agent from the coordination system (e.g., when waiting for user input)",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"agent_id" => %{"type" => "string"},
"reason" => %{"type" => "string"}
},
"required" => ["agent_id"]
}
}
]
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@@ -111,28 +211,57 @@ defmodule AgentCoordinator.MCPServer do
# MCP request processing
defp process_mcp_request(%{"method" => "tools/list"}) do
defp process_mcp_request(%{"method" => "initialize"} = request) do
id = Map.get(request, "id", nil)
%{
"jsonrpc" => "2.0",
"id" => id,
"result" => %{
"protocolVersion" => "2024-11-05",
"capabilities" => %{
"tools" => %{}
},
"serverInfo" => %{
"name" => "agent-coordinator",
"version" => "0.1.0"
}
}
}
end
defp process_mcp_request(%{"method" => "tools/list"} = request) do
id = Map.get(request, "id", nil)
%{
"jsonrpc" => "2.0",
"id" => id,
"result" => %{"tools" => @mcp_tools}
}
end
defp process_mcp_request(%{
"method" => "tools/call",
"params" => %{"name" => tool_name, "arguments" => args}
} = request) do
defp process_mcp_request(
%{
"method" => "tools/call",
"params" => %{"name" => tool_name, "arguments" => args}
} = request
) do
id = Map.get(request, "id", nil)
result = case tool_name do
"register_agent" -> register_agent(args)
"create_task" -> create_task(args)
"get_next_task" -> get_next_task(args)
"complete_task" -> complete_task(args)
"get_task_board" -> get_task_board(args)
"heartbeat" -> heartbeat(args)
_ -> {:error, "Unknown tool: #{tool_name}"}
end
result =
case tool_name do
"register_agent" -> register_agent(args)
"register_codebase" -> register_codebase(args)
"create_task" -> create_task(args)
"create_cross_codebase_task" -> create_cross_codebase_task(args)
"get_next_task" -> get_next_task(args)
"complete_task" -> complete_task(args)
"get_task_board" -> get_task_board(args)
"get_codebase_status" -> get_codebase_status(args)
"list_codebases" -> list_codebases(args)
"add_codebase_dependency" -> add_codebase_dependency(args)
"heartbeat" -> heartbeat(args)
"unregister_agent" -> unregister_agent(args)
_ -> {:error, "Unknown tool: #{tool_name}"}
end
case result do
{:ok, data} ->
@@ -141,7 +270,7 @@ defmodule AgentCoordinator.MCPServer do
"id" => id,
"result" => %{"content" => [%{"type" => "text", "text" => Jason.encode!(data)}]}
}
{:error, reason} ->
%{
"jsonrpc" => "2.0",
@@ -151,65 +280,150 @@ defmodule AgentCoordinator.MCPServer do
end
end
defp process_mcp_request(_request) do
defp process_mcp_request(request) do
id = Map.get(request, "id", nil)
%{
"jsonrpc" => "2.0",
"id" => id,
"error" => %{"code" => -32601, "message" => "Method not found"}
}
end
# Tool implementations
defp register_agent(%{"name" => name, "capabilities" => capabilities}) do
caps = Enum.map(capabilities, &String.to_existing_atom/1)
agent = Agent.new(name, caps)
defp register_agent(%{"name" => name, "capabilities" => capabilities} = args) do
caps = Enum.map(capabilities, &String.to_atom/1)
opts = [
codebase_id: Map.get(args, "codebase_id", "default"),
workspace_path: Map.get(args, "workspace_path"),
metadata: %{
cross_codebase_capable: Map.get(args, "cross_codebase_capable", false)
}
]
agent = Agent.new(name, caps, opts)
case TaskRegistry.register_agent(agent) do
:ok ->
# Add agent to codebase registry
CodebaseRegistry.add_agent_to_codebase(agent.codebase_id, agent.id)
# Start inbox for the agent
{:ok, _pid} = Inbox.start_link(agent.id)
{:ok, %{agent_id: agent.id, status: "registered"}}
{:ok, %{agent_id: agent.id, codebase_id: agent.codebase_id, status: "registered"}}
{:error, reason} ->
{:error, "Failed to register agent: #{reason}"}
end
end
defp register_codebase(args) do
case CodebaseRegistry.register_codebase(args) do
{:ok, codebase_id} ->
{:ok, %{codebase_id: codebase_id, status: "registered"}}
{:error, reason} ->
{:error, "Failed to register codebase: #{reason}"}
end
end
defp create_task(%{"title" => title, "description" => description} = args) do
opts = [
priority: String.to_existing_atom(Map.get(args, "priority", "normal")),
priority: String.to_atom(Map.get(args, "priority", "normal")),
codebase_id: Map.get(args, "codebase_id", "default"),
file_paths: Map.get(args, "file_paths", []),
cross_codebase_dependencies: Map.get(args, "cross_codebase_dependencies", []),
metadata: %{
required_capabilities: Map.get(args, "required_capabilities", [])
}
]
task = Task.new(title, description, opts)
case TaskRegistry.assign_task(task) do
{:ok, agent_id} ->
{:ok, %{task_id: task.id, assigned_to: agent_id, status: "assigned"}}
{:ok, %{task_id: task.id, assigned_to: agent_id, codebase_id: task.codebase_id, status: "assigned"}}
{:error, :no_available_agents} ->
# Add to global pending queue
TaskRegistry.add_to_pending(task)
{:ok, %{task_id: task.id, status: "queued"}}
{:ok, %{task_id: task.id, codebase_id: task.codebase_id, status: "queued"}}
end
end
defp create_cross_codebase_task(%{"title" => title, "description" => description} = args) do
primary_codebase = Map.get(args, "primary_codebase_id")
affected_codebases = Map.get(args, "affected_codebases", [])
strategy = Map.get(args, "coordination_strategy", "sequential")
# Create main task in primary codebase
main_task_opts = [
codebase_id: primary_codebase,
metadata: %{
cross_codebase_task: true,
coordination_strategy: strategy,
affected_codebases: affected_codebases
}
]
main_task = Task.new(title, description, main_task_opts)
# Create dependent tasks in other codebases
dependent_tasks =
Enum.map(affected_codebases, fn codebase_id ->
if codebase_id != primary_codebase do
dependent_opts = [
codebase_id: codebase_id,
cross_codebase_dependencies: [%{codebase_id: primary_codebase, task_id: main_task.id}],
metadata: %{
cross_codebase_task: true,
primary_task_id: main_task.id,
coordination_strategy: strategy
}
]
Task.new("#{title} (#{codebase_id})", "Cross-codebase task: #{description}", dependent_opts)
end
end)
|> Enum.filter(&(&1 != nil))
# Try to assign all tasks
all_tasks = [main_task | dependent_tasks]
results =
Enum.map(all_tasks, fn task ->
case TaskRegistry.assign_task(task) do
{:ok, agent_id} -> %{task_id: task.id, codebase_id: task.codebase_id, agent_id: agent_id, status: "assigned"}
{:error, :no_available_agents} ->
TaskRegistry.add_to_pending(task)
%{task_id: task.id, codebase_id: task.codebase_id, status: "queued"}
end
end)
{:ok, %{
main_task_id: main_task.id,
primary_codebase: primary_codebase,
coordination_strategy: strategy,
tasks: results,
status: "created"
}}
end
defp get_next_task(%{"agent_id" => agent_id}) do
case Inbox.get_next_task(agent_id) do
nil ->
{:ok, %{message: "No tasks available"}}
task ->
{:ok, %{
task_id: task.id,
title: task.title,
description: task.description,
file_paths: task.file_paths,
priority: task.priority
}}
{:ok,
%{
task_id: task.id,
title: task.title,
description: task.description,
file_paths: task.file_paths,
priority: task.priority
}}
end
end
@@ -217,47 +431,122 @@ defmodule AgentCoordinator.MCPServer do
case Inbox.complete_current_task(agent_id) do
{:error, reason} ->
{:error, "Failed to complete task: #{reason}"}
completed_task ->
{:ok, %{
task_id: completed_task.id,
status: "completed",
completed_at: completed_task.updated_at
}}
{:ok,
%{
task_id: completed_task.id,
status: "completed",
completed_at: completed_task.updated_at
}}
end
end
defp get_task_board(_args) do
defp get_task_board(args) do
codebase_id = Map.get(args, "codebase_id")
agents = TaskRegistry.list_agents()
board = Enum.map(agents, fn agent ->
status = Inbox.get_status(agent.id)
%{
agent_id: agent.id,
name: agent.name,
capabilities: agent.capabilities,
status: agent.status,
online: Agent.is_online?(agent),
current_task: status.current_task && %{
id: status.current_task.id,
title: status.current_task.title
},
pending_tasks: status.pending_count,
completed_tasks: status.completed_count
}
end)
{:ok, %{agents: board}}
# Filter agents by codebase if specified
filtered_agents =
case codebase_id do
nil -> agents
id -> Enum.filter(agents, fn agent -> agent.codebase_id == id end)
end
board =
Enum.map(filtered_agents, fn agent ->
status = Inbox.get_status(agent.id)
%{
agent_id: agent.id,
name: agent.name,
capabilities: agent.capabilities,
status: agent.status,
codebase_id: agent.codebase_id,
workspace_path: agent.workspace_path,
online: Agent.is_online?(agent),
cross_codebase_capable: Agent.can_work_cross_codebase?(agent),
current_task:
status.current_task &&
%{
id: status.current_task.id,
title: status.current_task.title,
codebase_id: status.current_task.codebase_id
},
pending_tasks: status.pending_count,
completed_tasks: status.completed_count
}
end)
{:ok, %{agents: board, codebase_filter: codebase_id}}
end
defp get_codebase_status(%{"codebase_id" => codebase_id}) do
case CodebaseRegistry.get_codebase_stats(codebase_id) do
{:ok, stats} ->
{:ok, stats}
{:error, reason} ->
{:error, "Failed to get codebase status: #{reason}"}
end
end
defp list_codebases(_args) do
codebases = CodebaseRegistry.list_codebases()
codebase_summaries =
Enum.map(codebases, fn codebase ->
%{
id: codebase.id,
name: codebase.name,
workspace_path: codebase.workspace_path,
description: codebase.description,
agent_count: length(codebase.agents),
active_task_count: length(codebase.active_tasks),
created_at: codebase.created_at,
updated_at: codebase.updated_at
}
end)
{:ok, %{codebases: codebase_summaries}}
end
defp add_codebase_dependency(%{"source_codebase_id" => source, "target_codebase_id" => target, "dependency_type" => dep_type} = args) do
metadata = Map.get(args, "metadata", %{})
case CodebaseRegistry.add_cross_codebase_dependency(source, target, dep_type, metadata) do
:ok ->
{:ok, %{
source_codebase: source,
target_codebase: target,
dependency_type: dep_type,
status: "added"
}}
{:error, reason} ->
{:error, "Failed to add dependency: #{reason}"}
end
end
defp heartbeat(%{"agent_id" => agent_id}) do
case TaskRegistry.heartbeat_agent(agent_id) do
:ok ->
{:ok, %{status: "heartbeat_received"}}
{:error, reason} ->
{:error, "Heartbeat failed: #{reason}"}
end
end
end
defp unregister_agent(%{"agent_id" => agent_id} = args) do
reason = Map.get(args, "reason", "Agent unregistered")
case TaskRegistry.unregister_agent(agent_id, reason) do
:ok ->
{:ok, %{status: "agent_unregistered", agent_id: agent_id, reason: reason}}
{:error, reason} ->
{:error, "Unregister failed: #{reason}"}
end
end
end

View File

@@ -0,0 +1,888 @@
defmodule AgentCoordinator.MCPServerManager do
@moduledoc """
Manages external MCP servers as internal clients.
This module starts, monitors, and communicates with external MCP servers,
acting as a client to each while presenting their tools through the
unified Agent Coordinator interface.
"""
use GenServer
require Logger
defstruct [
  # map of server name => server_info (pid, os_pid, pid_file_path, started_at, ...)
  :servers,
  # map of server name => the server's port/pid, for quick liveness checks
  :server_processes,
  # map of server name => [tool definitions] fetched from that server
  :tool_registry,
  # normalized configuration (see load_server_config/1)
  :config
]
# Client API
# Starts the manager as a locally registered singleton.
def start_link(opts \\ []) do
  GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Get all tools from all managed servers plus Agent Coordinator tools.

Returns a single flat list of tool definition maps.
"""
def get_unified_tools do
  GenServer.call(__MODULE__, :get_unified_tools)
end
@doc """
Route a tool call to the appropriate server.

Dispatches to the coordinator's own tools or to the external server that
owns `tool_name`; unknown tools yield a JSON-RPC style error map.
"""
def route_tool_call(tool_name, arguments, agent_context) do
  GenServer.call(__MODULE__, {:route_tool_call, tool_name, arguments, agent_context})
end
@doc """
Get status of all managed servers.

Returns a map of server name => %{status, pid, tools_count, started_at},
where status is :running or :stopped.
"""
def get_server_status do
  GenServer.call(__MODULE__, :get_server_status)
end
@doc """
Restart a specific server.

Returns `{:ok, server_info}` on success or `{:error, reason}` (including
"Server not found" for unknown names).
"""
def restart_server(server_name) do
  GenServer.call(__MODULE__, {:restart_server, server_name})
end
# Server callbacks
# init/1 stays fast: config is loaded here, while the (potentially slow)
# spawning of external servers is deferred to handle_continue(:start_servers).
def init(opts) do
  initial = %__MODULE__{
    servers: %{},
    server_processes: %{},
    tool_registry: %{},
    config: load_server_config(opts)
  }

  {:ok, initial, {:continue, :start_servers}}
end
# Spawns every configured external server after init/1 returns, so per-server
# startup failures are logged without crashing the manager.
def handle_continue(:start_servers, state) do
  Logger.info("Starting external MCP servers...")

  new_state =
    Enum.reduce(state.config.servers, state, fn {name, config}, acc ->
      case start_server(name, config) do
        {:ok, server_info} ->
          Logger.info("Started MCP server: #{name}")

          %{
            acc
            | servers: Map.put(acc.servers, name, server_info),
              server_processes: Map.put(acc.server_processes, name, server_info.pid)
          }

        {:error, reason} ->
          # Failed servers are skipped here; no retry is scheduled at startup.
          Logger.error("Failed to start MCP server #{name}: #{reason}")
          acc
      end
    end)

  # Build initial tool registry
  updated_state = refresh_tool_registry(new_state)
  {:noreply, updated_state}
end
# Presents one flat tool list: coordinator-native tools first, followed by
# every tool collected from the external servers in the registry.
def handle_call(:get_unified_tools, _from, state) do
  external_tools =
    state.tool_registry
    |> Map.values()
    |> List.flatten()

  {:reply, get_coordinator_tools() ++ external_tools, state}
end
# Dispatches a tool call to either the coordinator's built-in tools or the
# external server that registered the tool name.
def handle_call({:route_tool_call, tool_name, arguments, agent_context}, _from, state) do
  case find_tool_server(tool_name, state) do
    {:coordinator, _} ->
      # Route to Agent Coordinator's own tools
      result = handle_coordinator_tool(tool_name, arguments, agent_context)
      {:reply, result, state}

    {:external, server_name} ->
      # Route to external server
      result = call_external_tool(server_name, tool_name, arguments, agent_context, state)
      {:reply, result, state}

    :not_found ->
      # JSON-RPC "method not found" (-32601) shaped error for unknown tools.
      error_result = %{
        "error" => %{
          "code" => -32601,
          "message" => "Tool not found: #{tool_name}"
        }
      }

      {:reply, error_result, state}
  end
end
# Snapshot of every managed server: liveness, pid, tool count and start time.
def handle_call(:get_server_status, _from, state) do
  status =
    Map.new(state.servers, fn {name, server_info} ->
      liveness = if Process.alive?(server_info.pid), do: :running, else: :stopped

      {name,
       %{
         status: liveness,
         pid: server_info.pid,
         tools_count: length(Map.get(state.tool_registry, name, [])),
         started_at: server_info.started_at
       }}
    end)

  {:reply, status, state}
end
# Synchronous, operator-initiated restart of one managed server.
def handle_call({:restart_server, server_name}, _from, state) do
  case Map.get(state.servers, server_name) do
    nil ->
      {:reply, {:error, "Server not found"}, state}

    server_info ->
      # Stop existing server
      # NOTE(review): unlike the :DOWN cleanup path, this does not remove the
      # pid file or kill the external OS process (os_pid) — confirm whether a
      # brutal :kill here can leak the spawned child process.
      if Process.alive?(server_info.pid) do
        Process.exit(server_info.pid, :kill)
      end

      # Start new server
      server_config = Map.get(state.config.servers, server_name)

      case start_server(server_name, server_config) do
        {:ok, new_server_info} ->
          new_state = %{
            state
            | servers: Map.put(state.servers, server_name, new_server_info),
              server_processes:
                Map.put(state.server_processes, server_name, new_server_info.pid)
          }

          updated_state = refresh_tool_registry(new_state)
          {:reply, {:ok, new_server_info}, updated_state}

        {:error, reason} ->
          {:reply, {:error, reason}, state}
      end
  end
end
# Cleanup when a managed server's port dies: remove its pid file, reap the
# external OS process, drop the server from state, and optionally schedule
# an auto-restart after a 1s delay.
def handle_info({:DOWN, _ref, :port, port, reason}, state) do
  # Handle server port death
  case find_server_by_port(port, state.servers) do
    {server_name, server_info} ->
      Logger.warning("MCP server #{server_name} port died: #{reason}")

      # Cleanup PID file and kill external process
      if server_info.pid_file_path do
        cleanup_pid_file(server_info.pid_file_path)
      end

      if server_info.os_pid do
        kill_external_process(server_info.os_pid)
      end

      # Remove from state
      new_state = %{
        state
        | servers: Map.delete(state.servers, server_name),
          server_processes: Map.delete(state.server_processes, server_name),
          tool_registry: Map.delete(state.tool_registry, server_name)
      }

      # Attempt restart if configured
      if should_auto_restart?(server_name, state.config) do
        Logger.info("Auto-restarting MCP server: #{server_name}")
        # 1s delay before the {:restart_server, name} message fires.
        Process.send_after(self(), {:restart_server, server_name}, 1000)
      end

      {:noreply, new_state}

    nil ->
      # Port did not belong to a tracked server; ignore.
      {:noreply, state}
  end
end
# Deferred auto-restart requested by the :DOWN handler. Guards against the
# server having no configuration by the time the message fires; previously a
# nil config crashed start_server/2 with a FunctionClauseError.
def handle_info({:restart_server, server_name}, state) do
  case Map.get(state.config.servers, server_name) do
    nil ->
      Logger.error("Cannot auto-restart MCP server #{server_name}: no configuration found")
      {:noreply, state}

    server_config ->
      case start_server(server_name, server_config) do
        {:ok, server_info} ->
          Logger.info("Auto-restarted MCP server: #{server_name}")

          new_state = %{
            state
            | servers: Map.put(state.servers, server_name, server_info),
              server_processes: Map.put(state.server_processes, server_name, server_info.pid)
          }

          {:noreply, refresh_tool_registry(new_state)}

        {:error, reason} ->
          Logger.error("Failed to auto-restart MCP server #{server_name}: #{reason}")
          {:noreply, state}
      end
  end
end
# Catch-all: silently drop any other message so stray mail cannot crash the
# manager or accumulate in the mailbox unhandled.
def handle_info(_msg, state) do
  {:noreply, state}
end
# Private functions
# Loads server definitions from a JSON config file (default "mcp_servers.json",
# overridable via opts[:config_file]), falling back to get_default_config/0 on
# a missing file, bad JSON, or an unexpected top-level shape.
defp load_server_config(opts) do
  # Allow override from opts or config file
  config_file = Keyword.get(opts, :config_file, "mcp_servers.json")

  if File.exists?(config_file) do
    try do
      case Jason.decode!(File.read!(config_file)) do
        %{"servers" => servers} = full_config ->
          # Convert string types to atoms and normalize server configs
          normalized_servers =
            Enum.into(servers, %{}, fn {name, config} ->
              normalized_config =
                config
                # "type" becomes :stdio/:http; other strings go through
                # String.to_existing_atom, which raises inside this try for
                # unknown atoms — caught by the rescue clause below.
                |> Map.update("type", :stdio, fn
                  "stdio" -> :stdio
                  "http" -> :http
                  type when is_atom(type) -> type
                  type -> String.to_existing_atom(type)
                end)
                # Remaining string keys become atom keys.
                # NOTE(review): String.to_atom/1 creates atoms from file
                # content — only acceptable while the config file is trusted.
                |> Enum.into(%{}, fn
                  {"type", type} -> {:type, type}
                  {key, value} -> {String.to_atom(key), value}
                end)

              {name, normalized_config}
            end)

          base_config = %{servers: normalized_servers}

          # Add any additional config from the JSON file
          case Map.get(full_config, "config") do
            nil -> base_config
            additional_config ->
              Map.merge(base_config, %{config: additional_config})
          end

        _ ->
          Logger.warning("Invalid config file format in #{config_file}, using defaults")
          get_default_config()
      end
    rescue
      e ->
        Logger.warning("Failed to load config file #{config_file}: #{Exception.message(e)}, using defaults")
        get_default_config()
    end
  else
    Logger.warning("Config file #{config_file} not found, using defaults")
    get_default_config()
  end
end
# Built-in fallback configuration used when no config file is available.
# All default servers are stdio transports with auto-restart enabled.
defp get_default_config do
  %{
    servers: %{
      "mcp_context7" =>
        default_stdio_server("uvx", ["mcp-server-context7"], "Context7 library documentation server"),
      "mcp_figma" =>
        default_stdio_server("npx", ["-y", "@figma/mcp-server-figma"], "Figma design integration server"),
      "mcp_filesystem" =>
        default_stdio_server(
          "npx",
          ["-y", "@modelcontextprotocol/server-filesystem", "/home/ra"],
          "Filesystem operations server with heartbeat coverage"
        ),
      "mcp_firebase" =>
        default_stdio_server("npx", ["-y", "@firebase/mcp-server"], "Firebase integration server"),
      "mcp_memory" =>
        default_stdio_server(
          "npx",
          ["-y", "@modelcontextprotocol/server-memory"],
          "Memory and knowledge graph server"
        ),
      "mcp_sequentialthi" =>
        default_stdio_server(
          "npx",
          ["-y", "@modelcontextprotocol/server-sequential-thinking"],
          "Sequential thinking and reasoning server"
        )
    }
  }
end

# Shared shape for a default stdio server entry.
defp default_stdio_server(command, args, description) do
  %{
    type: :stdio,
    command: command,
    args: args,
    auto_restart: true,
    description: description
  }
end
# Starts a stdio-based MCP server: spawns the external OS process, monitors
# its Erlang port, and performs the MCP initialize handshake to discover the
# server's tools.
#
# Returns {:ok, server_info} with the populated tool list, or {:error, reason}.
# On handshake failure the spawned OS process, its PID file, and the port are
# all cleaned up before returning, so no orphan process is left behind.
defp start_server(name, %{type: :stdio} = config) do
  case start_stdio_server(name, config) do
    {:ok, os_pid, port, pid_file_path} ->
      # Monitor the port (not the OS PID) — port death is what the
      # handle_info crash handler reacts to
      port_ref = Port.monitor(port)
      server_info = %{
        name: name,
        type: :stdio,
        # Use port as the "pid" for process tracking
        pid: port,
        os_pid: os_pid,
        port: port,
        pid_file_path: pid_file_path,
        port_ref: port_ref,
        started_at: DateTime.utc_now(),
        tools: []
      }
      # Initialize the server and get tools
      case initialize_server(server_info) do
        {:ok, tools} ->
          {:ok, %{server_info | tools: tools}}
        {:error, reason} ->
          # Cleanup on initialization failure: remove PID file, terminate the
          # external process, then close our side of the port
          cleanup_pid_file(pid_file_path)
          kill_external_process(os_pid)
          # Only close port if it's still open (Port.info/1 is nil once closed)
          if Port.info(port) do
            Port.close(port)
          end
          {:error, reason}
      end
    {:error, reason} ->
      {:error, reason}
  end
end
# HTTP-based MCP servers need no local OS process: we only record the
# connection info and attempt tool discovery.
#
# Returns {:ok, server_info} or {:error, reason}.
defp start_server(name, %{type: :http} = config) do
  server_info = %{
    name: name,
    type: :http,
    url: Map.get(config, :url),
    # No process, port, or PID file to manage for HTTP transports
    pid: nil,
    os_pid: nil,
    port: nil,
    pid_file_path: nil,
    port_ref: nil,
    started_at: DateTime.utc_now(),
    tools: []
  }

  # A failing initialize falls through unchanged as {:error, reason}
  with {:ok, tools} <- initialize_http_server(server_info) do
    {:ok, %{server_info | tools: tools}}
  end
end
# Spawns the external stdio MCP server process via Port.open/2.
#
# Returns {:ok, os_pid, port, pid_file_path} on success, {:error, reason}
# otherwise. The PID file allows cleanup of the external process even after
# the BEAM node dies.
defp start_stdio_server(name, config) do
  command = Map.get(config, :command, "npx")
  args = Map.get(config, :args, [])
  env = Map.get(config, :env, %{})

  # Convert env map to the charlist tuple list expected by Port.open
  env_list =
    Enum.map(env, fn {key, value} -> {String.to_charlist(key), String.to_charlist(value)} end)

  port_options = [
    :binary,
    :stream,
    {:line, 1024},
    {:env, env_list},
    :exit_status,
    :hide
  ]

  # Resolve the executable up front: System.find_executable/1 returns nil for
  # an unknown command, and passing nil to Port.open raises an opaque :badarg.
  # Failing fast here produces an actionable error message instead.
  case System.find_executable(command) do
    nil ->
      Logger.error("Failed to start stdio server #{name}: executable not found: #{command}")
      {:error, "executable not found: #{command}"}

    executable ->
      try do
        port = Port.open({:spawn_executable, executable}, [{:args, args} | port_options])
        # Get the OS PID of the spawned process
        {:os_pid, os_pid} = Port.info(port, :os_pid)
        # Create PID file for cleanup
        pid_file_path = create_pid_file(name, os_pid)
        Logger.info("Started MCP server #{name} with OS PID #{os_pid}")
        {:ok, os_pid, port, pid_file_path}
      rescue
        e ->
          Logger.error("Failed to start stdio server #{name}: #{Exception.message(e)}")
          {:error, Exception.message(e)}
      end
  end
end
# Writes the external OS PID to <tmp>/mcp_servers/<server_name>.pid and
# returns the file path (raises on filesystem errors).
defp create_pid_file(server_name, os_pid) do
  dir = Path.join(System.tmp_dir(), "mcp_servers")
  File.mkdir_p!(dir)

  path = Path.join(dir, "#{server_name}.pid")
  File.write!(path, to_string(os_pid))
  path
end
# Best-effort removal of a server's PID file; a missing file is a no-op.
defp cleanup_pid_file(pid_file_path) do
  if File.exists?(pid_file_path), do: File.rm(pid_file_path)
end
# Terminates an external OS process: SIGTERM first, escalating to SIGKILL
# when TERM fails. Returns :ok on success, :error when neither signal
# succeeded or the `kill` binary itself could not be run.
defp kill_external_process(os_pid) when is_integer(os_pid) do
  try do
    case System.cmd("kill", ["-TERM", to_string(os_pid)]) do
      {_, 0} ->
        Logger.info("Successfully terminated process #{os_pid}")
        :ok

      {_, _} ->
        # TERM failed (process may trap/ignore it); try force kill
        case System.cmd("kill", ["-KILL", to_string(os_pid)]) do
          {_, 0} ->
            Logger.info("Force killed process #{os_pid}")
            :ok

          {_, _} ->
            Logger.warning("Failed to kill process #{os_pid}")
            :error
        end
    end
  rescue
    # System.cmd raises e.g. when `kill` is not on PATH
    _ -> :error
  end
end

# Nothing to kill when no OS PID was ever captured (e.g. HTTP servers store
# os_pid: nil). Previously only the integer-guarded clause existed, so a
# nil/invalid PID crashed the caller with a FunctionClauseError.
defp kill_external_process(_os_pid), do: :ok
# Looks up the {name, server_info} entry whose stdio port matches `port`.
# Returns nil when no tracked server owns the port.
defp find_server_by_port(port, servers) do
  Enum.find(servers, fn {_name, %{port: server_port}} -> server_port == port end)
end
# Performs the MCP initialize handshake with a freshly started server, then
# fetches its advertised tool list.
#
# Returns {:ok, tools} or {:error, reason} (from either step; a failing
# clause falls through the `with` unchanged).
defp initialize_server(server_info) do
  handshake = %{
    "jsonrpc" => "2.0",
    "id" => 1,
    "method" => "initialize",
    "params" => %{
      "protocolVersion" => "2024-11-05",
      "capabilities" => %{},
      "clientInfo" => %{
        "name" => "agent-coordinator",
        "version" => "0.1.0"
      }
    }
  }

  with {:ok, _init_response} <- send_server_request(server_info, handshake) do
    get_server_tools(server_info)
  end
end
# Placeholder for HTTP transport initialization: a real implementation would
# perform the MCP handshake over HTTP. Until then, HTTP servers report an
# empty tool list.
defp initialize_http_server(server_info) do
  try do
    Logger.warning("HTTP server support not fully implemented yet for #{server_info.name}")
    {:ok, []}
  rescue
    e ->
      {:error, "HTTP server initialization failed: #{Exception.message(e)}"}
  end
end
# Requests the server's tool list via tools/list. A malformed-but-successful
# response degrades to an empty tool list (with a warning) rather than an
# error; transport failures are returned as {:error, reason}.
defp get_server_tools(server_info) do
  request = %{
    "jsonrpc" => "2.0",
    "id" => 2,
    "method" => "tools/list"
  }

  with {:ok, response} <- send_server_request(server_info, request) do
    case response do
      %{"result" => %{"tools" => tools}} ->
        {:ok, tools}

      unexpected ->
        Logger.warning(
          "Unexpected tools response from #{server_info.name}: #{inspect(unexpected)}"
        )

        {:ok, []}
    end
  end
end
# Sends a JSON-RPC request over the server's stdio port and decodes the
# (possibly multi-line) JSON response.
#
# Returns {:ok, decoded_map} or {:error, reason}. Blocks the caller for up
# to 30 seconds while collecting the response.
defp send_server_request(server_info, request) do
  request_json = Jason.encode!(request) <> "\n"
  Port.command(server_info.port, request_json)

  # Collect full response by reading multiple lines if needed
  response_data = collect_response(server_info.port, "", 30_000)

  case Jason.decode(response_data) do
    {:ok, response} ->
      {:ok, response}

    # Jason.decode/1 only ever fails with %Jason.DecodeError{}; the previous
    # extra {:error, reason} catch-all clause was unreachable dead code.
    {:error, %Jason.DecodeError{} = error} ->
      Logger.error("JSON decode error for server #{server_info.name}: #{Exception.message(error)}")
      Logger.debug("Raw response data: #{inspect(response_data)}")
      {:error, "JSON decode error: #{Exception.message(error)}"}
  end
end
# Accumulates stdio output from `port` until the buffer parses as a complete
# JSON document, the port exits, or `timeout` milliseconds elapse. Returns
# whatever was accumulated (the caller's Jason.decode reports incompleteness).
#
# Fix: the timeout is now a total deadline. Previously each recursive call
# waited the full `timeout` again, so a server emitting incomplete JSON line
# by line could stall the caller for `timeout` ms *per line*, unbounded total.
defp collect_response(port, acc, timeout) do
  deadline = System.monotonic_time(:millisecond) + timeout
  do_collect_response(port, acc, deadline, timeout)
end

# Receive loop carrying the absolute deadline; `timeout` is kept only for the
# timeout log message.
defp do_collect_response(port, acc, deadline, timeout) do
  remaining = max(deadline - System.monotonic_time(:millisecond), 0)

  receive do
    {^port, {:data, {_eol, response_line}}} ->
      new_acc = acc <> response_line

      # A successful decode means the JSON document is complete
      case Jason.decode(new_acc) do
        {:ok, _} ->
          new_acc

        {:error, _} ->
          # Not complete yet, continue collecting against the same deadline
          do_collect_response(port, new_acc, deadline, timeout)
      end

    {^port, {:exit_status, status}} ->
      Logger.error("Server exited with status: #{status}")
      acc
  after
    remaining ->
      Logger.error("Server request timeout after #{timeout}ms")
      acc
  end
end
# Rebuilds the tool registry (server name -> tool list) from each server's
# cached tools.
defp refresh_tool_registry(state) do
  registry =
    Map.new(state.servers, fn {name, server_info} ->
      {name, Map.get(server_info, :tools, [])}
    end)

  %{state | tool_registry: registry}
end
# Resolves which server handles `tool_name`.
# Coordinator-native tools take precedence over external servers.
# Returns {:coordinator, tool_name}, {:external, server_name}, or :not_found.
defp find_tool_server(tool_name, state) do
  cond do
    tool_name in get_coordinator_tool_names() ->
      {:coordinator, tool_name}

    server_name = find_external_tool_server(tool_name, state.tool_registry) ->
      {:external, server_name}

    true ->
      :not_found
  end
end
# Returns the name of the first external server advertising `tool_name`,
# or nil when no server lists it.
defp find_external_tool_server(tool_name, tool_registry) do
  Enum.find_value(tool_registry, fn {server_name, tools} ->
    Enum.any?(tools, &(&1["name"] == tool_name)) && server_name
  end)
end
# JSON-Schema tool definitions exposed natively by the Agent Coordinator.
# Keep this list in sync with the dispatch in handle_coordinator_tool/3 and
# the names in get_coordinator_tool_names/0.
defp get_coordinator_tools do
  [
    # Agent lifecycle: join the coordination system with a capability list
    %{
      "name" => "register_agent",
      "description" => "Register a new agent with the coordination system",
      "inputSchema" => %{
        "type" => "object",
        "properties" => %{
          "name" => %{"type" => "string"},
          "capabilities" => %{
            "type" => "array",
            "items" => %{"type" => "string"}
          }
        },
        "required" => ["name", "capabilities"]
      }
    },
    # Task creation; priority/capabilities/file_paths are optional
    %{
      "name" => "create_task",
      "description" => "Create a new task in the coordination system",
      "inputSchema" => %{
        "type" => "object",
        "properties" => %{
          "title" => %{"type" => "string"},
          "description" => %{"type" => "string"},
          "priority" => %{"type" => "string", "enum" => ["low", "normal", "high", "urgent"]},
          "required_capabilities" => %{
            "type" => "array",
            "items" => %{"type" => "string"}
          },
          "file_paths" => %{
            "type" => "array",
            "items" => %{"type" => "string"}
          }
        },
        "required" => ["title", "description"]
      }
    },
    # Pull-based assignment: agent requests its next task
    %{
      "name" => "get_next_task",
      "description" => "Get the next task for an agent",
      "inputSchema" => %{
        "type" => "object",
        "properties" => %{
          "agent_id" => %{"type" => "string"}
        },
        "required" => ["agent_id"]
      }
    },
    # Marks the agent's *current* task done (no task id parameter)
    %{
      "name" => "complete_task",
      "description" => "Mark current task as completed",
      "inputSchema" => %{
        "type" => "object",
        "properties" => %{
          "agent_id" => %{"type" => "string"}
        },
        "required" => ["agent_id"]
      }
    },
    # Read-only overview; takes no arguments
    %{
      "name" => "get_task_board",
      "description" => "Get overview of all agents and their current tasks",
      "inputSchema" => %{
        "type" => "object",
        "properties" => %{}
      }
    },
    # Liveness signal so the registry doesn't consider the agent dead
    %{
      "name" => "heartbeat",
      "description" => "Send heartbeat to maintain agent status",
      "inputSchema" => %{
        "type" => "object",
        "properties" => %{
          "agent_id" => %{"type" => "string"}
        },
        "required" => ["agent_id"]
      }
    }
  ]
end
# Names of the coordinator-native tools; must match get_coordinator_tools/0.
defp get_coordinator_tool_names do
  [
    "register_agent",
    "create_task",
    "get_next_task",
    "complete_task",
    "get_task_board",
    "heartbeat"
  ]
end
# Dispatches a coordinator-native tool call to the TaskRegistry.
# Returns the registry's result unchanged, or a JSON-RPC style error map for
# an unrecognized tool name.
defp handle_coordinator_tool(tool_name, arguments, _agent_context) do
  alias AgentCoordinator.TaskRegistry

  case tool_name do
    "register_agent" ->
      TaskRegistry.register_agent(arguments["name"], arguments["capabilities"])

    "create_task" ->
      TaskRegistry.create_task(
        arguments["title"],
        arguments["description"],
        Map.take(arguments, ["priority", "required_capabilities", "file_paths"])
      )

    "get_next_task" ->
      TaskRegistry.get_next_task(arguments["agent_id"])

    "complete_task" ->
      TaskRegistry.complete_task(arguments["agent_id"])

    "get_task_board" ->
      TaskRegistry.get_task_board()

    "heartbeat" ->
      TaskRegistry.heartbeat_agent(arguments["agent_id"])

    _ ->
      %{"error" => %{"code" => -32601, "message" => "Unknown coordinator tool: #{tool_name}"}}
  end
end
# Routes a tool call to an external MCP server, wrapping the call with agent
# heartbeats (before and after) so long-running tool usage keeps the agent
# marked alive in the TaskRegistry.
#
# Returns the server's raw JSON-RPC response map, or a JSON-RPC style error
# map when the server is unknown or the request fails.
defp call_external_tool(server_name, tool_name, arguments, agent_context, state) do
  case Map.get(state.servers, server_name) do
    nil ->
      %{"error" => %{"code" => -32603, "message" => "Server not available: #{server_name}"}}
    server_info ->
      # Send heartbeat before tool call if agent context available
      if agent_context && agent_context.agent_id do
        AgentCoordinator.TaskRegistry.heartbeat_agent(agent_context.agent_id)
        # Auto-create/update current task for this tool usage
        update_current_task(agent_context.agent_id, tool_name, arguments)
      end
      # Make the actual tool call; unique id keeps concurrent requests distinct
      tool_request = %{
        "jsonrpc" => "2.0",
        "id" => System.unique_integer([:positive]),
        "method" => "tools/call",
        "params" => %{
          "name" => tool_name,
          "arguments" => arguments
        }
      }
      result =
        case send_server_request(server_info, tool_request) do
          {:ok, response} ->
            # Send heartbeat after successful tool call
            if agent_context && agent_context.agent_id do
              AgentCoordinator.TaskRegistry.heartbeat_agent(agent_context.agent_id)
            end
            response
          {:error, reason} ->
            %{"error" => %{"code" => -32603, "message" => reason}}
        end
      result
  end
end
# Ensures the agent has a current task reflecting this tool usage: records
# activity on the existing task when there is one, otherwise auto-creates a
# task and assigns it to the agent.
defp update_current_task(agent_id, tool_name, arguments) do
  case AgentCoordinator.TaskRegistry.get_agent_current_task(agent_id) do
    nil ->
      start_auto_task(agent_id, tool_name, arguments)

    existing_task ->
      # Update existing task with latest activity
      AgentCoordinator.TaskRegistry.update_task_activity(
        existing_task.id,
        tool_name,
        arguments
      )
  end
end

# Creates an auto-generated task describing the tool usage and immediately
# pulls it so it becomes the agent's current task. Assignment failures are
# deliberately ignored (best effort).
defp start_auto_task(agent_id, tool_name, arguments) do
  AgentCoordinator.TaskRegistry.create_task(
    generate_task_title(tool_name, arguments),
    generate_task_description(tool_name, arguments),
    %{
      priority: "normal",
      auto_generated: true,
      tool_name: tool_name,
      assigned_agent: agent_id
    }
  )

  case AgentCoordinator.TaskRegistry.get_next_task(agent_id) do
    {:ok, _task} -> :ok
    _ -> :ok
  end
end
# Builds a short human-readable title describing a tool invocation, used for
# auto-generated tasks.
defp generate_task_title(tool_name, arguments) do
  case tool_name do
    "read_file" ->
      "Reading file: #{path_basename_or_unknown(arguments)}"

    "write_file" ->
      "Writing file: #{path_basename_or_unknown(arguments)}"

    "list_directory" ->
      "Exploring directory: #{path_basename_or_unknown(arguments)}"

    "mcp_context7_get-library-docs" ->
      "Researching: #{arguments["context7CompatibleLibraryID"] || "library"}"

    "mcp_figma_get_code" ->
      "Generating Figma code: #{arguments["nodeId"] || "component"}"

    "mcp_firebase_firestore_get_documents" ->
      "Fetching Firestore documents"

    "mcp_memory_search_nodes" ->
      "Searching memory: #{arguments["query"] || "query"}"

    "mcp_sequentialthi_sequentialthinking" ->
      "Thinking through problem"

    _ ->
      "Using tool: #{tool_name}"
  end
end

# Basename of the "path" argument, or "unknown" when absent.
defp path_basename_or_unknown(arguments) do
  Path.basename(arguments["path"] || "unknown")
end
# Builds a longer description of a tool invocation for auto-generated tasks.
# Unlike generate_task_title/2, missing arguments interpolate as empty
# strings rather than "unknown" (matching historical behavior).
defp generate_task_description(tool_name, args) do
  case tool_name do
    "read_file" ->
      "Reading and analyzing file content from #{args["path"]}"

    "write_file" ->
      "Creating or updating file at #{args["path"]}"

    "list_directory" ->
      "Exploring directory structure at #{args["path"]}"

    "mcp_context7_get-library-docs" ->
      "Researching documentation for #{args["context7CompatibleLibraryID"]} library"

    "mcp_figma_get_code" ->
      "Generating code for Figma component #{args["nodeId"]}"

    "mcp_firebase_firestore_get_documents" ->
      "Retrieving documents from Firestore: #{inspect(args["paths"])}"

    "mcp_memory_search_nodes" ->
      "Searching knowledge graph for: #{args["query"]}"

    "mcp_sequentialthi_sequentialthinking" ->
      "Using sequential thinking to solve complex problem"

    _ ->
      "Executing #{tool_name} with arguments: #{inspect(args)}"
  end
end
# True when the named server's config opts into auto-restart; unknown
# servers and servers without the flag default to false.
defp should_auto_restart?(server_name, config) do
  config.servers
  |> Map.get(server_name, %{})
  |> Map.get(:auto_restart, false)
end
end

View File

@@ -3,9 +3,8 @@ defmodule AgentCoordinator.Persistence do
Persistent storage for tasks and events using NATS JetStream.
Provides configurable retention policies and event replay capabilities.
"""
use GenServer
alias AgentCoordinator.{Task, Agent}
defstruct [
:nats_conn,
@@ -15,12 +14,15 @@ defmodule AgentCoordinator.Persistence do
@stream_config %{
"name" => "AGENT_COORDINATION",
"subjects" => ["agent.*", "task.*"],
"subjects" => ["agent.>", "task.>", "codebase.>", "cross-codebase.>"],
"storage" => "file",
"max_msgs" => 1_000_000,
"max_bytes" => 1_000_000_000, # 1GB
"max_age" => 7 * 24 * 60 * 60 * 1_000_000_000, # 7 days in nanoseconds
"max_msg_size" => 1_000_000, # 1MB
"max_msgs" => 10_000_000,
# 10GB
"max_bytes" => 10_000_000_000,
# 30 days in nanoseconds
"max_age" => 30 * 24 * 60 * 60 * 1_000_000_000,
# 1MB
"max_msg_size" => 1_000_000,
"retention" => "limits",
"discard" => "old"
}
@@ -56,68 +58,109 @@ defmodule AgentCoordinator.Persistence do
def init(opts) do
nats_config = Keyword.get(opts, :nats, [])
retention_policy = Keyword.get(opts, :retention_policy, :default)
{:ok, nats_conn} = Gnat.start_link(nats_config)
# Create or update JetStream
create_or_update_stream(nats_conn)
# Only connect to NATS if config is provided
nats_conn =
case nats_config do
[] ->
nil
config ->
case Gnat.start_link(config) do
{:ok, conn} -> conn
{:error, _reason} -> nil
end
end
# Only create stream if we have a connection
if nats_conn do
create_or_update_stream(nats_conn)
end
state = %__MODULE__{
nats_conn: nats_conn,
stream_name: @stream_config["name"],
retention_policy: retention_policy
}
{:ok, state}
end
def handle_cast({:store_event, subject, data}, state) do
enriched_data = enrich_event_data(data)
message = Jason.encode!(enriched_data)
# Publish to JetStream
case Gnat.pub(state.nats_conn, subject, message, headers: event_headers()) do
:ok -> :ok
{:error, reason} ->
IO.puts("Failed to store event: #{inspect(reason)}")
# Only publish if we have a NATS connection
if state.nats_conn do
case Gnat.pub(state.nats_conn, subject, message, headers: event_headers()) do
:ok ->
:ok
{:error, reason} ->
IO.puts("Failed to store event: #{inspect(reason)}")
end
end
{:noreply, state}
end
def handle_call({:get_agent_history, agent_id, opts}, _from, state) do
subject_filter = "agent.*.#{agent_id}"
limit = Keyword.get(opts, :limit, 100)
events = fetch_events(state.nats_conn, subject_filter, limit)
{:reply, events, state}
case state.nats_conn do
nil ->
{:reply, [], state}
conn ->
subject_filter = "agent.*.#{agent_id}"
limit = Keyword.get(opts, :limit, 100)
events = fetch_events(conn, subject_filter, limit)
{:reply, events, state}
end
end
def handle_call({:get_task_history, task_id, opts}, _from, state) do
subject_filter = "task.*"
limit = Keyword.get(opts, :limit, 100)
events = fetch_events(state.nats_conn, subject_filter, limit)
|> Enum.filter(fn event ->
case Map.get(event, "task") do
%{"id" => ^task_id} -> true
_ -> false
end
end)
{:reply, events, state}
case state.nats_conn do
nil ->
{:reply, [], state}
conn ->
subject_filter = "task.*"
limit = Keyword.get(opts, :limit, 100)
events =
fetch_events(conn, subject_filter, limit)
|> Enum.filter(fn event ->
case Map.get(event, "task") do
%{"id" => ^task_id} -> true
_ -> false
end
end)
{:reply, events, state}
end
end
def handle_call({:replay_events, subject_filter, opts}, _from, state) do
limit = Keyword.get(opts, :limit, 1000)
start_time = Keyword.get(opts, :start_time)
events = fetch_events(state.nats_conn, subject_filter, limit, start_time)
{:reply, events, state}
case state.nats_conn do
nil ->
{:reply, [], state}
conn ->
limit = Keyword.get(opts, :limit, 1000)
start_time = Keyword.get(opts, :start_time)
events = fetch_events(conn, subject_filter, limit, start_time)
{:reply, events, state}
end
end
def handle_call(:get_system_stats, _from, state) do
stats = get_stream_info(state.nats_conn, state.stream_name)
stats =
case state.nats_conn do
nil -> %{connected: false}
conn -> get_stream_info(conn, state.stream_name) || %{connected: true}
end
{:reply, stats, state}
end
@@ -129,7 +172,7 @@ defmodule AgentCoordinator.Persistence do
nil ->
# Create new stream
create_stream(conn, @stream_config)
_existing ->
# Update existing stream if needed
update_stream(conn, @stream_config)
@@ -141,26 +184,26 @@ defmodule AgentCoordinator.Persistence do
"type" => "io.nats.jetstream.api.v1.stream_create_request",
"config" => config
}
case Gnat.request(conn, "$JS.API.STREAM.CREATE.#{config["name"]}", Jason.encode!(request)) do
{:ok, response} ->
case Jason.decode!(response.body) do
%{"error" => error} ->
%{"error" => error} ->
IO.puts("Failed to create stream: #{inspect(error)}")
{:error, error}
result ->
result ->
IO.puts("Stream created successfully")
{:ok, result}
end
{:error, reason} ->
IO.puts("Failed to create stream: #{inspect(reason)}")
{:error, reason}
end
end
defp update_stream(conn, config) do
defp update_stream(_conn, _config) do
# For simplicity, we'll just ensure the stream exists
# In production, you might want more sophisticated update logic
:ok
@@ -173,24 +216,26 @@ defmodule AgentCoordinator.Persistence do
%{"error" => _} -> nil
info -> info
end
{:error, _} -> nil
{:error, _} ->
nil
end
end
defp fetch_events(conn, subject_filter, limit, start_time \\ nil) do
defp fetch_events(_conn, _subject_filter, _limit, start_time \\ nil) do
# Create a consumer to fetch messages
consumer_config = %{
_consumer_config = %{
"durable_name" => "temp_#{:rand.uniform(10000)}",
"deliver_policy" => if(start_time, do: "by_start_time", else: "all"),
"opt_start_time" => start_time,
"max_deliver" => 1,
"ack_policy" => "explicit"
}
# This is a simplified implementation
# In production, you'd use proper JetStream consumer APIs
[] # Return empty for now - would implement full JetStream integration
# Return empty for now - would implement full JetStream integration
[]
end
defp enrich_event_data(data) do
@@ -206,4 +251,4 @@ defmodule AgentCoordinator.Persistence do
{"source", "agent-coordinator"}
]
end
end
end

View File

@@ -2,7 +2,23 @@ defmodule AgentCoordinator.Task do
@moduledoc """
Task data structure for agent coordination system.
"""
@derive {Jason.Encoder,
only: [
:id,
:title,
:description,
:status,
:priority,
:agent_id,
:codebase_id,
:file_paths,
:dependencies,
:cross_codebase_dependencies,
:created_at,
:updated_at,
:metadata
]}
defstruct [
:id,
:title,
@@ -10,8 +26,10 @@ defmodule AgentCoordinator.Task do
:status,
:priority,
:agent_id,
:codebase_id,
:file_paths,
:dependencies,
:cross_codebase_dependencies,
:created_at,
:updated_at,
:metadata
@@ -21,34 +39,46 @@ defmodule AgentCoordinator.Task do
@type priority :: :low | :normal | :high | :urgent
@type t :: %__MODULE__{
id: String.t(),
title: String.t(),
description: String.t(),
status: status(),
priority: priority(),
agent_id: String.t() | nil,
file_paths: [String.t()],
dependencies: [String.t()],
created_at: DateTime.t(),
updated_at: DateTime.t(),
metadata: map()
}
id: String.t(),
title: String.t(),
description: String.t(),
status: status(),
priority: priority(),
agent_id: String.t() | nil,
codebase_id: String.t(),
file_paths: [String.t()],
dependencies: [String.t()],
cross_codebase_dependencies: [%{codebase_id: String.t(), task_id: String.t()}],
created_at: DateTime.t(),
updated_at: DateTime.t(),
metadata: map()
}
def new(title, description, opts \\ []) do
now = DateTime.utc_now()
# Handle both keyword lists and maps
get_opt = fn key, default ->
case opts do
opts when is_map(opts) -> Map.get(opts, key, default)
opts when is_list(opts) -> Keyword.get(opts, key, default)
end
end
%__MODULE__{
id: UUID.uuid4(),
title: title,
description: description,
status: Keyword.get(opts, :status, :pending),
priority: Keyword.get(opts, :priority, :normal),
agent_id: Keyword.get(opts, :agent_id),
file_paths: Keyword.get(opts, :file_paths, []),
dependencies: Keyword.get(opts, :dependencies, []),
status: get_opt.(:status, :pending),
priority: get_opt.(:priority, :normal),
agent_id: get_opt.(:agent_id, nil),
codebase_id: get_opt.(:codebase_id, "default"),
file_paths: get_opt.(:file_paths, []),
dependencies: get_opt.(:dependencies, []),
cross_codebase_dependencies: get_opt.(:cross_codebase_dependencies, []),
created_at: now,
updated_at: now,
metadata: Keyword.get(opts, :metadata, %{})
metadata: get_opt.(:metadata, %{})
}
end
@@ -71,6 +101,18 @@ defmodule AgentCoordinator.Task do
end
def has_file_conflict?(task1, task2) do
not MapSet.disjoint?(MapSet.new(task1.file_paths), MapSet.new(task2.file_paths))
# Only check conflicts within the same codebase
task1.codebase_id == task2.codebase_id and
not MapSet.disjoint?(MapSet.new(task1.file_paths), MapSet.new(task2.file_paths))
end
end
def is_cross_codebase?(task) do
not Enum.empty?(task.cross_codebase_dependencies)
end
def add_cross_codebase_dependency(task, codebase_id, task_id) do
dependency = %{codebase_id: codebase_id, task_id: task_id}
dependencies = [dependency | task.cross_codebase_dependencies]
%{task | cross_codebase_dependencies: dependencies, updated_at: DateTime.utc_now()}
end
end

View File

@@ -1,20 +1,24 @@
defmodule AgentCoordinator.TaskRegistry do
@moduledoc """
Central registry for agents and task assignment with NATS integration.
Enhanced to support multi-codebase coordination and cross-codebase task management.
"""
use GenServer
require Logger
alias AgentCoordinator.{Agent, Task, Inbox}
defstruct [
:agents,
:pending_tasks,
:file_locks,
:codebase_file_locks,
:cross_codebase_tasks,
:nats_conn
]
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@@ -39,28 +43,78 @@ defmodule AgentCoordinator.TaskRegistry do
GenServer.call(__MODULE__, {:heartbeat_agent, agent_id})
end
def unregister_agent(agent_id, reason \\ "Agent requested unregistration") do
GenServer.call(__MODULE__, {:unregister_agent, agent_id, reason})
end
def get_file_locks do
GenServer.call(__MODULE__, :get_file_locks)
end
def get_agent_current_task(agent_id) do
GenServer.call(__MODULE__, {:get_agent_current_task, agent_id})
end
def update_task_activity(task_id, tool_name, arguments) do
GenServer.call(__MODULE__, {:update_task_activity, task_id, tool_name, arguments})
end
def create_task(title, description, opts \\ %{}) do
GenServer.call(__MODULE__, {:create_task, title, description, opts})
end
def get_next_task(agent_id) do
GenServer.call(__MODULE__, {:get_next_task, agent_id})
end
def complete_task(agent_id) do
GenServer.call(__MODULE__, {:complete_task, agent_id})
end
def get_task_board do
GenServer.call(__MODULE__, :get_task_board)
end
def register_agent(name, capabilities) do
agent = Agent.new(name, capabilities)
GenServer.call(__MODULE__, {:register_agent, agent})
end
# Server callbacks
def init(opts) do
# Connect to NATS
# Connect to NATS if config provided
nats_config = Keyword.get(opts, :nats, [])
{:ok, nats_conn} = Gnat.start_link(nats_config)
# Subscribe to task events
Gnat.sub(nats_conn, self(), "agent.task.*")
Gnat.sub(nats_conn, self(), "agent.heartbeat.*")
nats_conn =
case nats_config do
[] ->
nil
config ->
case Gnat.start_link(config) do
{:ok, conn} ->
# Subscribe to task events
Gnat.sub(conn, self(), "agent.task.*")
Gnat.sub(conn, self(), "agent.heartbeat.*")
Gnat.sub(conn, self(), "codebase.>")
Gnat.sub(conn, self(), "cross-codebase.>")
conn
{:error, _reason} ->
nil
end
end
state = %__MODULE__{
agents: %{},
pending_tasks: [],
file_locks: %{},
codebase_file_locks: %{},
cross_codebase_tasks: %{},
nats_conn: nats_conn
}
{:ok, state}
end
@@ -70,16 +124,33 @@ defmodule AgentCoordinator.TaskRegistry do
nil ->
new_agents = Map.put(state.agents, agent.id, agent)
new_state = %{state | agents: new_agents}
# Publish agent registration
publish_event(state.nats_conn, "agent.registered", %{agent: agent})
# Create inbox for the agent
case DynamicSupervisor.start_child(
AgentCoordinator.InboxSupervisor,
{Inbox, agent.id}
) do
{:ok, _pid} ->
Logger.info("Created inbox for agent #{agent.id}")
{:error, {:already_started, _pid}} ->
Logger.info("Inbox already exists for agent #{agent.id}")
{:error, reason} ->
Logger.warning("Failed to create inbox for agent #{agent.id}: #{inspect(reason)}")
end
# Publish agent registration with codebase info
if state.nats_conn do
publish_event(state.nats_conn, "agent.registered.#{agent.codebase_id}", %{agent: agent})
end
# Try to assign pending tasks
{assigned_tasks, remaining_pending} = assign_pending_tasks(new_state)
{_assigned_tasks, remaining_pending} = assign_pending_tasks(new_state)
final_state = %{new_state | pending_tasks: remaining_pending}
{:reply, :ok, final_state}
_ ->
{:reply, {:error, "Agent name already exists"}, state}
end
@@ -89,24 +160,26 @@ defmodule AgentCoordinator.TaskRegistry do
case find_available_agent(state, task) do
nil ->
{:reply, {:error, :no_available_agents}, state}
agent ->
# Check for file conflicts
# Check for file conflicts within the same codebase
case check_file_conflicts(state, task) do
[] ->
# No conflicts, assign task
assign_task_to_agent(state, task, agent.id)
conflicts ->
# Block task due to conflicts
blocked_task = Task.block(task, "File conflicts: #{inspect(conflicts)}")
new_pending = [blocked_task | state.pending_tasks]
publish_event(state.nats_conn, "task.blocked", %{
task: blocked_task,
conflicts: conflicts
})
if state.nats_conn do
publish_event(state.nats_conn, "task.blocked.#{task.codebase_id}", %{
task: blocked_task,
conflicts: conflicts
})
end
{:reply, {:error, :file_conflicts}, %{state | pending_tasks: new_pending}}
end
end
@@ -114,7 +187,11 @@ defmodule AgentCoordinator.TaskRegistry do
def handle_call({:add_to_pending, task}, _from, state) do
new_pending = [task | state.pending_tasks]
publish_event(state.nats_conn, "task.queued", %{task: task})
if state.nats_conn do
publish_event(state.nats_conn, "task.queued.#{task.codebase_id}", %{task: task})
end
{:reply, :ok, %{state | pending_tasks: new_pending}}
end
@@ -127,130 +204,545 @@ defmodule AgentCoordinator.TaskRegistry do
case Map.get(state.agents, agent_id) do
nil ->
{:reply, {:error, :agent_not_found}, state}
agent ->
updated_agent = Agent.heartbeat(agent)
new_agents = Map.put(state.agents, agent_id, updated_agent)
new_state = %{state | agents: new_agents}
publish_event(state.nats_conn, "agent.heartbeat", %{agent_id: agent_id})
if state.nats_conn do
publish_event(state.nats_conn, "agent.heartbeat.#{agent_id}", %{
agent_id: agent_id,
codebase_id: updated_agent.codebase_id
})
end
{:reply, :ok, new_state}
end
end
def handle_call({:unregister_agent, agent_id, reason}, _from, state) do
case Map.get(state.agents, agent_id) do
nil ->
{:reply, {:error, :agent_not_found}, state}
agent ->
# Check if agent has current tasks
case agent.current_task_id do
nil ->
# Agent is idle, safe to unregister
unregister_agent_safely(state, agent_id, agent, reason)
task_id ->
# Agent has active task, handle accordingly
case Map.get(state, :allow_force_unregister, false) do
true ->
# Force unregister, reassign task to pending
unregister_agent_with_task_reassignment(state, agent_id, agent, task_id, reason)
false ->
{:reply,
{:error,
"Agent has active task #{task_id}. Complete task first or use force unregister."},
state}
end
end
end
end
def handle_call(:get_file_locks, _from, state) do
{:reply, state.file_locks, state}
{:reply, state.codebase_file_locks || %{}, state}
end
def handle_call({:get_agent_current_task, agent_id}, _from, state) do
case Map.get(state.agents, agent_id) do
nil ->
{:reply, nil, state}
agent ->
case agent.current_task_id do
nil ->
{:reply, nil, state}
task_id ->
# Get task details from inbox or pending tasks
task = find_task_by_id(state, task_id)
{:reply, task, state}
end
end
end
def handle_call({:update_task_activity, task_id, tool_name, arguments}, _from, state) do
# Update task with latest activity
# This could store activity logs or update task metadata
if state.nats_conn do
publish_event(state.nats_conn, "task.activity_updated", %{
task_id: task_id,
tool_name: tool_name,
arguments: arguments,
timestamp: DateTime.utc_now()
})
end
{:reply, :ok, state}
end
def handle_call({:create_task, title, description, opts}, _from, state) do
task = Task.new(title, description, opts)
# Add to pending tasks
new_pending = [task | state.pending_tasks]
new_state = %{state | pending_tasks: new_pending}
# Try to assign immediately
case find_available_agent(new_state, task) do
nil ->
if state.nats_conn do
publish_event(state.nats_conn, "task.created", %{task: task})
end
{:reply, {:ok, task}, new_state}
agent ->
case check_file_conflicts(new_state, task) do
[] ->
# Assign immediately
case assign_task_to_agent(new_state, task, agent.id) do
{:reply, {:ok, _agent_id}, final_state} ->
# Remove from pending since it was assigned
final_state = %{final_state | pending_tasks: state.pending_tasks}
{:reply, {:ok, task}, final_state}
error ->
error
end
_conflicts ->
# Keep in pending due to conflicts
{:reply, {:ok, task}, new_state}
end
end
end
# Pops the next task from the agent's inbox and marks the agent busy.
# Replies {:error, :agent_not_found}, {:error, :no_tasks}, or {:ok, task}.
def handle_call({:get_next_task, agent_id}, _from, state) do
  case Map.get(state.agents, agent_id) do
    nil ->
      {:reply, {:error, :agent_not_found}, state}
    agent ->
      # First ensure the agent's inbox exists (it may not have been started
      # yet, e.g. after a coordinator restart).
      case ensure_inbox_started(agent_id) do
        :ok ->
          case Inbox.get_next_task(agent_id) do
            nil ->
              {:reply, {:error, :no_tasks}, state}
            task ->
              # Update agent status to busy with this task id.
              updated_agent = Agent.assign_task(agent, task.id)
              new_agents = Map.put(state.agents, agent_id, updated_agent)
              new_state = %{state | agents: new_agents}
              # Best-effort broadcast; skipped when NATS is not connected.
              if state.nats_conn do
                publish_event(state.nats_conn, "task.started", %{
                  task: task,
                  agent_id: agent_id
                })
              end
              {:reply, {:ok, task}, new_state}
          end
        {:error, reason} ->
          # Inbox supervisor could not start the inbox process.
          {:reply, {:error, reason}, state}
      end
  end
end
# Completes the agent's current task: marks it done in the inbox, returns
# the agent to idle, announces completion, and retries pending assignments
# that the freed agent/file locks may now unblock.
def handle_call({:complete_task, agent_id}, _from, state) do
  case Map.get(state.agents, agent_id) do
    nil ->
      {:reply, {:error, :agent_not_found}, state}
    agent ->
      case agent.current_task_id do
        nil ->
          {:reply, {:error, :no_current_task}, state}
        task_id ->
          # Mark task as completed in inbox
          case Inbox.complete_current_task(agent_id) do
            task when is_map(task) ->
              # Update agent status back to idle
              updated_agent = Agent.complete_task(agent)
              new_agents = Map.put(state.agents, agent_id, updated_agent)
              new_state = %{state | agents: new_agents}
              # Best-effort broadcast; skipped when NATS is not connected.
              if state.nats_conn do
                publish_event(state.nats_conn, "task.completed", %{
                  task_id: task_id,
                  agent_id: agent_id
                })
              end
              # Try to assign pending tasks now that this agent is idle.
              {_assigned, remaining_pending} = assign_pending_tasks(new_state)
              final_state = %{new_state | pending_tasks: remaining_pending}
              {:reply, :ok, final_state}
            {:error, reason} ->
              # Inbox disagreed (e.g. no current task recorded there).
              {:reply, {:error, reason}, state}
          end
      end
  end
end
# Builds a read-only snapshot of the coordination state: one summary map
# per registered agent plus queue-level counters.
def handle_call(:get_task_board, _from, state) do
  agents_info =
    for {_id, agent} <- state.agents do
      %{
        agent_id: agent.id,
        name: agent.name,
        status: agent.status,
        capabilities: agent.capabilities,
        # Resolve the current task id to full details, nil when idle.
        current_task:
          if(agent.current_task_id, do: find_task_by_id(state, agent.current_task_id)),
        last_heartbeat: agent.last_heartbeat,
        online: Agent.is_online?(agent)
      }
    end

  board = %{
    agents: agents_info,
    pending_tasks: state.pending_tasks,
    total_agents: map_size(state.agents),
    active_tasks: Enum.count(state.agents, fn {_id, a} -> a.current_task_id != nil end),
    pending_count: length(state.pending_tasks)
  }

  {:reply, board, state}
end
# Handle NATS messages
# A task started somewhere in the cluster: record its file paths as locked
# under the owning codebase so conflicting tasks are held back.
# (Diff artifact removed: the old body decoded `body` a second time and
# called the retired 3-arity add_file_locks on a nonexistent field.)
def handle_info({:msg, %{topic: "agent.task.started", body: body}}, state) do
  %{"task" => task_data, "codebase_id" => codebase_id} = Jason.decode!(body)

  # Update codebase-specific file locks
  codebase_file_locks =
    add_file_locks(
      state.codebase_file_locks,
      codebase_id,
      task_data["id"],
      task_data["file_paths"]
    )

  {:noreply, %{state | codebase_file_locks: codebase_file_locks}}
end
def handle_info({:msg, %{topic: "agent.task.completed", body: body}}, state) do
%{"task" => task_data} = Jason.decode!(body)
# Remove file locks
file_locks = remove_file_locks(state.file_locks, task_data["id"])
%{"task" => task_data, "codebase_id" => codebase_id} = Jason.decode!(body)
# Remove codebase-specific file locks
codebase_file_locks =
remove_file_locks(
state.codebase_file_locks,
codebase_id,
task_data["id"]
)
# Try to assign pending tasks that might now be unblocked
{_assigned, remaining_pending} = assign_pending_tasks(%{state | file_locks: file_locks})
{:noreply, %{state | file_locks: file_locks, pending_tasks: remaining_pending}}
{_assigned, remaining_pending} =
assign_pending_tasks(%{state | codebase_file_locks: codebase_file_locks})
{:noreply,
%{state | codebase_file_locks: codebase_file_locks, pending_tasks: remaining_pending}}
end
# Tracks the relationship between a cross-codebase main task and its
# dependent tasks. (Diff artifact removed: a stale, body-less copy of the
# old catch-all clause head preceded this definition.)
def handle_info({:msg, %{topic: "cross-codebase.task.created", body: body}}, state) do
  %{"main_task_id" => main_task_id, "dependent_tasks" => dependent_tasks} = Jason.decode!(body)
  # Track cross-codebase task relationship
  cross_codebase_tasks = Map.put(state.cross_codebase_tasks, main_task_id, dependent_tasks)
  {:noreply, %{state | cross_codebase_tasks: cross_codebase_tasks}}
end
def handle_info({:msg, %{topic: "codebase.agent.registered", body: body}}, state) do
# Handle cross-codebase agent registration notifications
%{"agent" => _agent_data} = Jason.decode!(body)
# Could trigger reassignment of pending cross-codebase tasks
{:noreply, state}
end
# Catch-all for NATS topics this server does not act on. The guard lists
# every specifically-handled topic so this clause never shadows them,
# regardless of clause ordering.
def handle_info({:msg, %{topic: topic}}, state)
    when topic != "agent.task.started" and
           topic != "agent.task.completed" and
           topic != "cross-codebase.task.created" and
           topic != "codebase.agent.registered" do
  # Ignore other messages for now
  {:noreply, state}
end
# Private helpers

# Guarantees an Inbox process is running for `agent_id`, starting one under
# the inbox supervisor when the registry has no entry. Returns :ok or
# {:error, reason}. NOTE(review): near-duplicate of ensure_inbox_exists/1
# (which also logs) — consider consolidating.
defp ensure_inbox_started(agent_id) do
  if Registry.lookup(AgentCoordinator.InboxRegistry, agent_id) != [] do
    :ok
  else
    case DynamicSupervisor.start_child(AgentCoordinator.InboxSupervisor, {Inbox, agent_id}) do
      {:ok, _pid} -> :ok
      # Lost a race with a concurrent start — the inbox is up, so still :ok.
      {:error, {:already_started, _pid}} -> :ok
      {:error, reason} -> {:error, reason}
    end
  end
end
# Picks the best agent for `task`: idle, online, capable, and in the same
# codebase. Candidates are ranked by (codebase match, inbox backlog) so the
# least-loaded agent wins; returns nil when none qualify.
# (Diff artifact removed: the merged text contained two filter stages and
# two sort stages, which did not even parse.)
defp find_available_agent(state, task) do
  state.agents
  |> Map.values()
  |> Enum.filter(fn agent ->
    agent.codebase_id == task.codebase_id and
      agent.status == :idle and
      Agent.is_online?(agent) and
      Agent.can_handle?(agent, task)
  end)
  |> Enum.sort_by(fn agent ->
    # Prefer agents with fewer pending tasks and same codebase. (The filter
    # above already enforces codebase equality; the tiebreak key is kept
    # for stability if that filter is ever relaxed.)
    codebase_match = if agent.codebase_id == task.codebase_id, do: 0, else: 1

    # Only query the inbox when its process actually exists; a missing or
    # dying inbox counts as zero pending tasks instead of crashing here.
    pending_count =
      case Registry.lookup(AgentCoordinator.InboxRegistry, agent.id) do
        [{_pid, _}] ->
          try do
            case Inbox.get_status(agent.id) do
              %{pending_count: count} -> count
              _ -> 0
            end
          catch
            :exit, _ -> 0
          end

        [] ->
          # No inbox process exists, treat as 0 pending tasks
          0
      end

    {codebase_match, pending_count}
  end)
  |> List.first()
end
# Returns the subset of `task.file_paths` already locked by another task in
# the same codebase; an empty list means the task can be assigned now.
# (Diff artifact removed: a dead `Map.has_key?(state.file_locks, ...)` line
# referencing the retired global lock map.)
defp check_file_conflicts(state, task) do
  # Get codebase-specific file locks
  codebase_locks = Map.get(state.codebase_file_locks, task.codebase_id, %{})

  Enum.filter(task.file_paths, fn file_path ->
    Map.has_key?(codebase_locks, file_path)
  end)
end
# Queues `task` into the agent's inbox (creating the inbox if needed),
# marks the agent busy, and announces the assignment on a codebase-scoped
# topic. Returns a handle_call-style reply tuple.
# (Diff artifact removed: an older, unguarded publish to "task.assigned"
# that duplicated the guarded codebase-scoped publish below.)
defp assign_task_to_agent(state, task, agent_id) do
  # Ensure inbox exists for the agent
  ensure_inbox_exists(agent_id)
  # Add to agent's inbox
  Inbox.add_task(agent_id, task)

  # Update agent status
  agent = Map.get(state.agents, agent_id)
  updated_agent = Agent.assign_task(agent, task.id)
  new_agents = Map.put(state.agents, agent_id, updated_agent)

  # Publish assignment with codebase context (best-effort).
  if state.nats_conn do
    publish_event(state.nats_conn, "task.assigned.#{task.codebase_id}", %{
      task: task,
      agent_id: agent_id
    })
  end

  {:reply, {:ok, agent_id}, %{state | agents: new_agents}}
end
# Walks the pending queue once, assigning each conflict-free task to an
# available agent. Returns {assigned, remaining} where `assigned` is a list
# of {task, agent_id} pairs and `remaining` keeps the original queue order.
# (Diff artifact removed: the merged text ran an older reduce first, which
# would have double-added every assignable task to an inbox before its
# result was discarded by rebinding.)
defp assign_pending_tasks(state) do
  {assigned, remaining} =
    Enum.reduce(state.pending_tasks, {[], []}, fn task, {assigned, pending} ->
      case find_available_agent(state, task) do
        nil ->
          {assigned, [task | pending]}

        agent ->
          case check_file_conflicts(state, task) do
            [] ->
              # Ensure inbox exists for the agent
              ensure_inbox_exists(agent.id)
              Inbox.add_task(agent.id, task)
              {[{task, agent.id} | assigned], pending}

            _conflicts ->
              {assigned, [task | pending]}
          end
      end
    end)

  # Prepending reversed the queue; restore original order for `remaining`.
  {assigned, Enum.reverse(remaining)}
end
# Records `file_paths` as locked by `task_id` inside the lock map of one
# codebase, leaving other codebases' locks untouched.
# (Diff artifact removed: the retired 3-arity head was left unclosed in
# front of this definition, which was a syntax error.)
defp add_file_locks(codebase_file_locks, codebase_id, task_id, file_paths) do
  codebase_locks = Map.get(codebase_file_locks, codebase_id, %{})

  updated_locks =
    Enum.reduce(file_paths, codebase_locks, fn path, locks ->
      Map.put(locks, path, task_id)
    end)

  Map.put(codebase_file_locks, codebase_id, updated_locks)
end
# Releases every lock held by `task_id` within one codebase; a no-op when
# that codebase has no lock map at all.
# (Diff artifact removed: the retired 2-arity head was left unclosed in
# front of this definition, which was a syntax error.)
defp remove_file_locks(codebase_file_locks, codebase_id, task_id) do
  case Map.get(codebase_file_locks, codebase_id) do
    nil ->
      codebase_file_locks

    codebase_locks ->
      updated_locks =
        codebase_locks
        |> Enum.reject(fn {_path, locked_task_id} -> locked_task_id == task_id end)
        |> Map.new()

      Map.put(codebase_file_locks, codebase_id, updated_locks)
  end
end
# Locates a task by id. Only the pending queue is searched today; tasks
# already sitting in agent inboxes are not found and nil is returned.
# TODO: Implement proper task lookup in Inbox module
defp find_task_by_id(state, task_id) do
  Enum.find(state.pending_tasks, &(&1.id == task_id))
end
# Best-effort NATS publish: a no-op when `conn` is nil (NATS disabled).
# (Diff artifacts removed: the old unguarded body would have crashed on a
# nil connection, and a stale module-closing `end` from the pre-commit file
# layout followed this function — the module now ends after the helpers
# added below.)
defp publish_event(conn, topic, data) do
  if conn do
    message = Jason.encode!(data)
    Gnat.pub(conn, topic, message)
  end
end
# Agent unregistration helpers

# Removes an agent with no active task: drops it from the registry, tears
# down its inbox (best effort), and announces the unregistration when NATS
# is connected. Returns a handle_call-style reply tuple.
defp unregister_agent_safely(state, agent_id, agent, reason) do
  new_state = %{state | agents: Map.delete(state.agents, agent_id)}

  # Stop the inbox if present; every outcome (stopped, not found, error)
  # is deliberately ignored — unregistration proceeds regardless.
  _ = Inbox.stop(agent_id)

  if state.nats_conn do
    publish_event(state.nats_conn, "agent.unregistered", %{
      agent_id: agent_id,
      agent_name: agent.name,
      codebase_id: agent.codebase_id,
      reason: reason,
      timestamp: DateTime.utc_now()
    })
  end

  {:reply, :ok, new_state}
end
# Force-unregisters an agent that still claims a task: the task (as known
# to the inbox) is pushed back onto the pending queue so another agent can
# pick it up. Falls back to a safe unregister when the inbox has no current
# task. Returns a handle_call-style reply tuple.
defp unregister_agent_with_task_reassignment(state, agent_id, agent, task_id, reason) do
  # Get the current task from inbox
  case Inbox.get_current_task(agent_id) do
    nil ->
      # No actual task, treat as safe unregister
      unregister_agent_safely(state, agent_id, agent, reason)
    task ->
      # Reassign task to pending queue
      new_pending = [task | state.pending_tasks]
      # Remove agent
      new_agents = Map.delete(state.agents, agent_id)
      new_state = %{state | agents: new_agents, pending_tasks: new_pending}
      # Stop the agent's inbox (result intentionally ignored)
      Inbox.stop(agent_id)
      # Publish events (best effort, only when NATS is connected)
      if state.nats_conn do
        publish_event(state.nats_conn, "agent.unregistered.with_reassignment", %{
          agent_id: agent_id,
          agent_name: agent.name,
          codebase_id: agent.codebase_id,
          reason: reason,
          reassigned_task_id: task_id,
          timestamp: DateTime.utc_now()
        })
        publish_event(state.nats_conn, "task.reassigned", %{
          task_id: task_id,
          from_agent_id: agent_id,
          to_queue: "pending",
          reason: "Agent unregistered: #{reason}"
        })
      end
      {:reply, :ok, new_state}
  end
end
# Helper function to ensure an inbox exists for an agent
# Starts an Inbox under the DynamicSupervisor when the registry has no
# entry for `agent_id`. Returns :ok (including when the inbox already
# exists) or {:error, reason} when the child fails to start.
# NOTE(review): uses Logger — confirm this module has `require Logger`.
defp ensure_inbox_exists(agent_id) do
  case Registry.lookup(AgentCoordinator.InboxRegistry, agent_id) do
    [] ->
      # No inbox exists, create one
      case DynamicSupervisor.start_child(
             AgentCoordinator.InboxSupervisor,
             {Inbox, agent_id}
           ) do
        {:ok, _pid} ->
          Logger.info("Created inbox for agent #{agent_id}")
          :ok
        {:error, {:already_started, _pid}} ->
          # Lost a race with a concurrent start — still a success.
          Logger.info("Inbox already exists for agent #{agent_id}")
          :ok
        {:error, reason} ->
          Logger.warning("Failed to create inbox for agent #{agent_id}: #{inspect(reason)}")
          {:error, reason}
      end
    [{_pid, _}] ->
      # Inbox already exists
      :ok
  end
end
end

View File

@@ -0,0 +1,251 @@
defmodule AgentCoordinator.UnifiedMCPServer do
  @moduledoc """
  Unified MCP Server that aggregates all external MCP servers and Agent Coordinator tools.

  This is the single MCP server that GitHub Copilot sees, which internally manages
  all other MCP servers and provides automatic task tracking for any tool usage.
  Requests are processed inside this GenServer; tool calls are routed through
  `MCPServerManager`, and a default agent session is auto-registered for
  GitHub Copilot when no agent can be identified from the request.
  """

  use GenServer
  require Logger

  alias AgentCoordinator.{MCPServerManager, TaskRegistry}

  defstruct [
    :agent_sessions,
    :request_id_counter
  ]

  # Client API

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Handle MCP request from GitHub Copilot.
  """
  def handle_mcp_request(request) do
    GenServer.call(__MODULE__, {:handle_request, request})
  end

  # Server callbacks

  @impl true
  def init(_opts) do
    state = %__MODULE__{
      agent_sessions: %{},
      request_id_counter: 0
    }

    Logger.info("Unified MCP Server starting...")
    {:ok, state}
  end

  @impl true
  def handle_call({:handle_request, request}, _from, state) do
    response = process_mcp_request(request, state)
    {:reply, response, state}
  end

  # Kept for external callers that register sessions synchronously.
  def handle_call({:register_agent_session, agent_id, session_info}, _from, state) do
    new_state = %{state | agent_sessions: Map.put(state.agent_sessions, agent_id, session_info)}
    {:reply, :ok, new_state}
  end

  # BUGFIX: internal session registration must be asynchronous. The previous
  # code issued `GenServer.call(self(), ...)` from within request processing
  # (i.e. inside handle_call), which deadlocks this process until the call
  # times out. A cast lets the registration be applied right after the
  # current request finishes.
  @impl true
  def handle_cast({:register_agent_session, agent_id, session_info}, state) do
    new_state = %{state | agent_sessions: Map.put(state.agent_sessions, agent_id, session_info)}
    {:noreply, new_state}
  end

  @impl true
  def handle_info(_msg, state) do
    {:noreply, state}
  end

  # Private functions

  # Dispatches a decoded JSON-RPC request map to the matching MCP method
  # handler; unknown methods yield a -32601 JSON-RPC error.
  defp process_mcp_request(request, state) do
    method = Map.get(request, "method")
    id = Map.get(request, "id")

    case method do
      "initialize" ->
        handle_initialize(request, id)

      "tools/list" ->
        handle_tools_list(request, id)

      "tools/call" ->
        handle_tools_call(request, id, state)

      _ ->
        error_response(id, -32601, "Method not found: #{method}")
    end
  end

  # Advertises protocol version, capabilities, and server identity.
  defp handle_initialize(_request, id) do
    %{
      "jsonrpc" => "2.0",
      "id" => id,
      "result" => %{
        "protocolVersion" => "2024-11-05",
        "capabilities" => %{
          "tools" => %{},
          "coordination" => %{
            "automatic_task_tracking" => true,
            "agent_management" => true,
            "multi_server_proxy" => true,
            "heartbeat_coverage" => true
          }
        },
        "serverInfo" => %{
          "name" => "agent-coordinator-unified",
          "version" => "0.1.0",
          "description" =>
            "Unified MCP server with automatic task tracking and agent coordination"
        }
      }
    }
  end

  # Returns the aggregated tool list from all managed MCP servers.
  defp handle_tools_list(_request, id) do
    case MCPServerManager.get_unified_tools() do
      tools when is_list(tools) ->
        %{
          "jsonrpc" => "2.0",
          "id" => id,
          "result" => %{
            "tools" => tools
          }
        }

      {:error, reason} ->
        error_response(id, -32603, "Failed to get tools: #{reason}")
    end
  end

  # Routes a tool call to the owning MCP server with agent context attached;
  # error maps from the router are passed through with the request id set.
  defp handle_tools_call(request, id, state) do
    params = Map.get(request, "params", %{})
    tool_name = Map.get(params, "name")
    arguments = Map.get(params, "arguments", %{})

    # Determine agent context from the request or session
    agent_context = determine_agent_context(request, arguments, state)

    case MCPServerManager.route_tool_call(tool_name, arguments, agent_context) do
      %{"error" => _} = error_result ->
        Map.put(error_result, "id", id)

      result ->
        # Wrap successful results in MCP format
        %{
          "jsonrpc" => "2.0",
          "id" => id,
          "result" => format_tool_result(result, tool_name, agent_context)
        }
    end
  end

  # Resolves the calling agent, in priority order:
  # 1. explicit "agent_id" argument, 2. request metadata, 3. the default
  # GitHub Copilot session (auto-registered on first use).
  defp determine_agent_context(request, arguments, state) do
    case Map.get(arguments, "agent_id") do
      agent_id when is_binary(agent_id) ->
        %{agent_id: agent_id}

      _ ->
        case extract_agent_from_request(request) do
          agent_id when is_binary(agent_id) ->
            %{agent_id: agent_id}

          _ ->
            default_agent_context(state)
        end
    end
  end

  # Look for agent info in request headers, params, etc.
  # This could be extended to support various ways of identifying the agent.
  defp extract_agent_from_request(_request) do
    nil
  end

  # Creates or reuses a default agent session for GitHub Copilot.
  defp default_agent_context(state) do
    default_agent_id = "github_copilot_session"

    case Map.get(state.agent_sessions, default_agent_id) do
      nil ->
        # Auto-register GitHub Copilot as an agent
        case TaskRegistry.register_agent("GitHub Copilot", [
               "coding",
               "analysis",
               "review",
               "documentation"
             ]) do
          {:ok, %{agent_id: agent_id}} ->
            session_info = %{
              agent_id: agent_id,
              name: "GitHub Copilot",
              auto_registered: true,
              created_at: DateTime.utc_now()
            }

            # Asynchronous on purpose: we are executing inside this server's
            # own handle_call, so a synchronous self-call would deadlock.
            GenServer.cast(__MODULE__, {:register_agent_session, agent_id, session_info})
            %{agent_id: agent_id}

          _ ->
            %{agent_id: default_agent_id}
        end

      session_info ->
        %{agent_id: session_info.agent_id}
    end
  end

  # Normalizes arbitrary tool results into the MCP tool-call response shape
  # and stamps tracking metadata under "_metadata".
  defp format_tool_result(result, tool_name, agent_context) do
    base_result =
      case result do
        %{"result" => content} when is_map(content) ->
          # Already properly formatted
          content

        {:ok, content} ->
          # Convert tuple response to content
          %{"content" => [%{"type" => "text", "text" => inspect(content)}]}

        %{} = map_result ->
          # Convert map to text content
          %{"content" => [%{"type" => "text", "text" => Jason.encode!(map_result)}]}

        binary when is_binary(binary) ->
          # Simple text result
          %{"content" => [%{"type" => "text", "text" => binary}]}

        other ->
          # Fallback for any other type
          %{"content" => [%{"type" => "text", "text" => inspect(other)}]}
      end

    metadata = %{
      "tool_name" => tool_name,
      "agent_id" => agent_context.agent_id,
      "timestamp" => DateTime.utc_now() |> DateTime.to_iso8601(),
      "auto_tracked" => true
    }

    Map.put(base_result, "_metadata", metadata)
  end

  # Builds a JSON-RPC 2.0 error envelope.
  defp error_response(id, code, message) do
    %{
      "jsonrpc" => "2.0",
      "id" => id,
      "error" => %{
        "code" => code,
        "message" => message
      }
    }
  end
end