start repo

This commit is contained in:
Ra
2025-08-22 05:08:00 -07:00
commit 5048db99c7
15 changed files with 1470 additions and 0 deletions

18
lib/agent_coordinator.ex Normal file
View File

@@ -0,0 +1,18 @@
defmodule AgentCoordinator do
@moduledoc """
Documentation for `AgentCoordinator`.
"""
@doc """
Hello world.
## Examples
iex> AgentCoordinator.hello()
:world
"""
def hello do
:world
end
end

View File

@@ -0,0 +1,66 @@
defmodule AgentCoordinator.Agent do
@moduledoc """
Agent data structure for the coordination system.
"""
defstruct [
:id,
:name,
:capabilities,
:status,
:current_task_id,
:last_heartbeat,
:metadata
]
@type status :: :idle | :busy | :offline | :error
@type capability :: :coding | :testing | :documentation | :analysis | :review
@type t :: %__MODULE__{
id: String.t(),
name: String.t(),
capabilities: [capability()],
status: status(),
current_task_id: String.t() | nil,
last_heartbeat: DateTime.t(),
metadata: map()
}
def new(name, capabilities, opts \\ []) do
%__MODULE__{
id: UUID.uuid4(),
name: name,
capabilities: capabilities,
status: :idle,
current_task_id: nil,
last_heartbeat: DateTime.utc_now(),
metadata: Keyword.get(opts, :metadata, %{})
}
end
def heartbeat(agent) do
%{agent | last_heartbeat: DateTime.utc_now()}
end
def assign_task(agent, task_id) do
%{agent | status: :busy, current_task_id: task_id}
end
def complete_task(agent) do
%{agent | status: :idle, current_task_id: nil}
end
def is_online?(agent) do
DateTime.diff(DateTime.utc_now(), agent.last_heartbeat, :second) < 30
end
def can_handle?(agent, task) do
# Simple capability matching - can be enhanced
required_capabilities = Map.get(task.metadata, :required_capabilities, [])
case required_capabilities do
[] -> true
caps -> Enum.any?(caps, fn cap -> cap in agent.capabilities end)
end
end
end

View File

@@ -0,0 +1,43 @@
defmodule AgentCoordinator.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
@impl true
def start(_type, _args) do
children = [
# Registry for agent inboxes
{Registry, keys: :unique, name: AgentCoordinator.InboxRegistry},
# PubSub for real-time updates
{Phoenix.PubSub, name: AgentCoordinator.PubSub},
# Persistence layer
{AgentCoordinator.Persistence, nats: nats_config()},
# Task registry with NATS integration
{AgentCoordinator.TaskRegistry, nats: nats_config()},
# MCP server
AgentCoordinator.MCPServer,
# Dynamic supervisor for agent inboxes
{DynamicSupervisor, name: AgentCoordinator.InboxSupervisor, strategy: :one_for_one}
]
opts = [strategy: :one_for_one, name: AgentCoordinator.Supervisor]
Supervisor.start_link(children, opts)
end
defp nats_config do
[
host: System.get_env("NATS_HOST", "localhost"),
port: String.to_integer(System.get_env("NATS_PORT", "4222")),
connection_settings: [
name: :agent_coordinator
]
]
end
end

View File

@@ -0,0 +1,200 @@
defmodule AgentCoordinator.CLI do
@moduledoc """
Command line interface for testing the agent coordination system.
"""
alias AgentCoordinator.{MCPServer, TaskRegistry, Inbox, Agent, Task}
def main(args \\ []) do
case args do
["register", name | capabilities] ->
register_agent(name, capabilities)
["create-task", title, description | opts] ->
create_task(title, description, parse_task_opts(opts))
["board"] ->
show_task_board()
["agent-status", agent_id] ->
show_agent_status(agent_id)
["help"] ->
show_help()
_ ->
IO.puts("Invalid command. Use 'help' for usage information.")
end
end
defp register_agent(name, capabilities) do
caps = Enum.map(capabilities, &String.to_existing_atom/1)
request = %{
"method" => "tools/call",
"params" => %{
"name" => "register_agent",
"arguments" => %{
"name" => name,
"capabilities" => capabilities
}
}
}
case MCPServer.handle_mcp_request(request) do
%{"result" => %{"content" => [%{"text" => result}]}} ->
data = Jason.decode!(result)
IO.puts("✓ Agent registered successfully!")
IO.puts(" Agent ID: #{data["agent_id"]}")
IO.puts(" Status: #{data["status"]}")
%{"error" => %{"message" => message}} ->
IO.puts("✗ Registration failed: #{message}")
end
end
defp create_task(title, description, opts) do
request = %{
"method" => "tools/call",
"params" => %{
"name" => "create_task",
"arguments" => Map.merge(%{
"title" => title,
"description" => description
}, opts)
}
}
case MCPServer.handle_mcp_request(request) do
%{"result" => %{"content" => [%{"text" => result}]}} ->
data = Jason.decode!(result)
IO.puts("✓ Task created successfully!")
IO.puts(" Task ID: #{data["task_id"]}")
IO.puts(" Status: #{data["status"]}")
if Map.has_key?(data, "assigned_to") do
IO.puts(" Assigned to: #{data["assigned_to"]}")
end
%{"error" => %{"message" => message}} ->
IO.puts("✗ Task creation failed: #{message}")
end
end
defp show_task_board do
request = %{
"method" => "tools/call",
"params" => %{
"name" => "get_task_board",
"arguments" => %{}
}
}
case MCPServer.handle_mcp_request(request) do
%{"result" => %{"content" => [%{"text" => result}]}} ->
%{"agents" => agents} = Jason.decode!(result)
IO.puts("\n📋 Task Board")
IO.puts(String.duplicate("=", 50))
if Enum.empty?(agents) do
IO.puts("No agents registered.")
else
Enum.each(agents, &print_agent_summary/1)
end
error ->
IO.puts("✗ Failed to fetch task board: #{inspect(error)}")
end
end
defp show_agent_status(agent_id) do
case Inbox.get_status(agent_id) do
status ->
IO.puts("\n👤 Agent Status: #{agent_id}")
IO.puts(String.duplicate("-", 30))
IO.puts("Pending tasks: #{status.pending_count}")
IO.puts("Completed tasks: #{status.completed_count}")
case status.current_task do
nil ->
IO.puts("Current task: None")
task ->
IO.puts("Current task: #{task.title}")
IO.puts(" Description: #{task.description}")
IO.puts(" Priority: #{task.priority}")
end
end
end
defp print_agent_summary(agent) do
status_icon = case agent["status"] do
"idle" -> "💤"
"busy" -> "🔧"
"offline" -> ""
_ -> ""
end
online_status = if agent["online"], do: "🟢", else: "🔴"
IO.puts("\n#{status_icon} #{agent["name"]} (#{agent["agent_id"]}) #{online_status}")
IO.puts(" Capabilities: #{Enum.join(agent["capabilities"], ", ")}")
IO.puts(" Pending: #{agent["pending_tasks"]} | Completed: #{agent["completed_tasks"]}")
case agent["current_task"] do
nil ->
IO.puts(" Current: No active task")
task ->
IO.puts(" Current: #{task["title"]}")
end
end
defp parse_task_opts(opts) do
Enum.reduce(opts, %{}, fn opt, acc ->
case String.split(opt, "=", parts: 2) do
["priority", value] ->
Map.put(acc, "priority", value)
["files", files] ->
Map.put(acc, "file_paths", String.split(files, ","))
["caps", capabilities] ->
Map.put(acc, "required_capabilities", String.split(capabilities, ","))
_ ->
acc
end
end)
end
defp show_help do
IO.puts("""
Agent Coordinator CLI
Commands:
register <name> <capability1> <capability2> ...
Register a new agent with specified capabilities
Capabilities: coding, testing, documentation, analysis, review
create-task <title> <description> [priority=<low|normal|high|urgent>] [files=<file1,file2>] [caps=<cap1,cap2>]
Create a new task with optional parameters
board
Show current task board with all agents and their status
agent-status <agent-id>
Show detailed status for a specific agent
help
Show this help message
Examples:
register "CodeBot" coding testing
create-task "Fix login bug" "User login fails with 500 error" priority=high files=auth.ex,login.ex
board
agent-status abc-123-def
""")
end
end

View File

@@ -0,0 +1,160 @@
defmodule AgentCoordinator.Inbox do
@moduledoc """
Agent inbox management using GenServer for each agent's task queue.
"""
use GenServer
alias AgentCoordinator.{Task, Agent}
defstruct [
:agent_id,
:pending_tasks,
:in_progress_task,
:completed_tasks,
:max_history
]
@type t :: %__MODULE__{
agent_id: String.t(),
pending_tasks: [Task.t()],
in_progress_task: Task.t() | nil,
completed_tasks: [Task.t()],
max_history: non_neg_integer()
}
# Client API
def start_link(agent_id, opts \\ []) do
GenServer.start_link(__MODULE__, {agent_id, opts}, name: via_tuple(agent_id))
end
def add_task(agent_id, task) do
GenServer.call(via_tuple(agent_id), {:add_task, task})
end
def get_next_task(agent_id) do
GenServer.call(via_tuple(agent_id), :get_next_task)
end
def complete_current_task(agent_id) do
GenServer.call(via_tuple(agent_id), :complete_current_task)
end
def get_status(agent_id) do
GenServer.call(via_tuple(agent_id), :get_status)
end
def list_tasks(agent_id) do
GenServer.call(via_tuple(agent_id), :list_tasks)
end
# Server callbacks
def init({agent_id, opts}) do
state = %__MODULE__{
agent_id: agent_id,
pending_tasks: [],
in_progress_task: nil,
completed_tasks: [],
max_history: Keyword.get(opts, :max_history, 100)
}
{:ok, state}
end
def handle_call({:add_task, task}, _from, state) do
# Insert task based on priority
pending_tasks = insert_by_priority(state.pending_tasks, task)
new_state = %{state | pending_tasks: pending_tasks}
# Broadcast task added
Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "agent:#{state.agent_id}",
{:task_added, task})
{:reply, :ok, new_state}
end
def handle_call(:get_next_task, _from, state) do
case state.pending_tasks do
[] ->
{:reply, nil, state}
[next_task | remaining_tasks] ->
updated_task = Task.assign_to_agent(next_task, state.agent_id)
new_state = %{state |
pending_tasks: remaining_tasks,
in_progress_task: updated_task
}
# Broadcast task started
Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "global",
{:task_started, updated_task})
{:reply, updated_task, new_state}
end
end
def handle_call(:complete_current_task, _from, state) do
case state.in_progress_task do
nil ->
{:reply, {:error, :no_task_in_progress}, state}
task ->
completed_task = Task.complete(task)
# Add to completed tasks with history limit
completed_tasks = [completed_task | state.completed_tasks]
|> Enum.take(state.max_history)
new_state = %{state |
in_progress_task: nil,
completed_tasks: completed_tasks
}
# Broadcast task completed
Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "global",
{:task_completed, completed_task})
{:reply, completed_task, new_state}
end
end
def handle_call(:get_status, _from, state) do
status = %{
agent_id: state.agent_id,
pending_count: length(state.pending_tasks),
current_task: state.in_progress_task,
completed_count: length(state.completed_tasks)
}
{:reply, status, state}
end
def handle_call(:list_tasks, _from, state) do
tasks = %{
pending: state.pending_tasks,
in_progress: state.in_progress_task,
completed: Enum.take(state.completed_tasks, 10) # Recent 10
}
{:reply, tasks, state}
end
# Private helpers
defp via_tuple(agent_id) do
{:via, Registry, {AgentCoordinator.InboxRegistry, agent_id}}
end
defp insert_by_priority(tasks, new_task) do
priority_order = %{urgent: 0, high: 1, normal: 2, low: 3}
new_priority = Map.get(priority_order, new_task.priority, 2)
{before, after} = Enum.split_while(tasks, fn task ->
task_priority = Map.get(priority_order, task.priority, 2)
task_priority <= new_priority
end)
before ++ [new_task] ++ after
end
end

View File

@@ -0,0 +1,263 @@
defmodule AgentCoordinator.MCPServer do
@moduledoc """
MCP (Model Context Protocol) server for agent coordination.
Provides tools for agents to interact with the task coordination system.
"""
use GenServer
alias AgentCoordinator.{TaskRegistry, Inbox, Agent, Task}
@mcp_tools [
%{
"name" => "register_agent",
"description" => "Register a new agent with the coordination system",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"name" => %{"type" => "string"},
"capabilities" => %{
"type" => "array",
"items" => %{"type" => "string", "enum" => ["coding", "testing", "documentation", "analysis", "review"]}
}
},
"required" => ["name", "capabilities"]
}
},
%{
"name" => "create_task",
"description" => "Create a new task in the coordination system",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"title" => %{"type" => "string"},
"description" => %{"type" => "string"},
"priority" => %{"type" => "string", "enum" => ["low", "normal", "high", "urgent"]},
"file_paths" => %{"type" => "array", "items" => %{"type" => "string"}},
"required_capabilities" => %{
"type" => "array",
"items" => %{"type" => "string"}
}
},
"required" => ["title", "description"]
}
},
%{
"name" => "get_next_task",
"description" => "Get the next task for an agent",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"agent_id" => %{"type" => "string"}
},
"required" => ["agent_id"]
}
},
%{
"name" => "complete_task",
"description" => "Mark current task as completed",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"agent_id" => %{"type" => "string"}
},
"required" => ["agent_id"]
}
},
%{
"name" => "get_task_board",
"description" => "Get overview of all agents and their current tasks",
"inputSchema" => %{
"type" => "object",
"properties" => %{}
}
},
%{
"name" => "heartbeat",
"description" => "Send heartbeat to maintain agent status",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"agent_id" => %{"type" => "string"}
},
"required" => ["agent_id"]
}
}
]
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def handle_mcp_request(request) do
GenServer.call(__MODULE__, {:mcp_request, request})
end
def get_tools do
@mcp_tools
end
# Server callbacks
def init(_opts) do
{:ok, %{}}
end
def handle_call({:mcp_request, request}, _from, state) do
response = process_mcp_request(request)
{:reply, response, state}
end
# MCP request processing
defp process_mcp_request(%{"method" => "tools/list"}) do
%{
"jsonrpc" => "2.0",
"result" => %{"tools" => @mcp_tools}
}
end
defp process_mcp_request(%{
"method" => "tools/call",
"params" => %{"name" => tool_name, "arguments" => args}
} = request) do
id = Map.get(request, "id", nil)
result = case tool_name do
"register_agent" -> register_agent(args)
"create_task" -> create_task(args)
"get_next_task" -> get_next_task(args)
"complete_task" -> complete_task(args)
"get_task_board" -> get_task_board(args)
"heartbeat" -> heartbeat(args)
_ -> {:error, "Unknown tool: #{tool_name}"}
end
case result do
{:ok, data} ->
%{
"jsonrpc" => "2.0",
"id" => id,
"result" => %{"content" => [%{"type" => "text", "text" => Jason.encode!(data)}]}
}
{:error, reason} ->
%{
"jsonrpc" => "2.0",
"id" => id,
"error" => %{"code" => -1, "message" => reason}
}
end
end
defp process_mcp_request(_request) do
%{
"jsonrpc" => "2.0",
"error" => %{"code" => -32601, "message" => "Method not found"}
}
end
# Tool implementations
defp register_agent(%{"name" => name, "capabilities" => capabilities}) do
caps = Enum.map(capabilities, &String.to_existing_atom/1)
agent = Agent.new(name, caps)
case TaskRegistry.register_agent(agent) do
:ok ->
# Start inbox for the agent
{:ok, _pid} = Inbox.start_link(agent.id)
{:ok, %{agent_id: agent.id, status: "registered"}}
{:error, reason} ->
{:error, "Failed to register agent: #{reason}"}
end
end
defp create_task(%{"title" => title, "description" => description} = args) do
opts = [
priority: String.to_existing_atom(Map.get(args, "priority", "normal")),
file_paths: Map.get(args, "file_paths", []),
metadata: %{
required_capabilities: Map.get(args, "required_capabilities", [])
}
]
task = Task.new(title, description, opts)
case TaskRegistry.assign_task(task) do
{:ok, agent_id} ->
{:ok, %{task_id: task.id, assigned_to: agent_id, status: "assigned"}}
{:error, :no_available_agents} ->
# Add to global pending queue
TaskRegistry.add_to_pending(task)
{:ok, %{task_id: task.id, status: "queued"}}
end
end
defp get_next_task(%{"agent_id" => agent_id}) do
case Inbox.get_next_task(agent_id) do
nil ->
{:ok, %{message: "No tasks available"}}
task ->
{:ok, %{
task_id: task.id,
title: task.title,
description: task.description,
file_paths: task.file_paths,
priority: task.priority
}}
end
end
defp complete_task(%{"agent_id" => agent_id}) do
case Inbox.complete_current_task(agent_id) do
{:error, reason} ->
{:error, "Failed to complete task: #{reason}"}
completed_task ->
{:ok, %{
task_id: completed_task.id,
status: "completed",
completed_at: completed_task.updated_at
}}
end
end
defp get_task_board(_args) do
agents = TaskRegistry.list_agents()
board = Enum.map(agents, fn agent ->
status = Inbox.get_status(agent.id)
%{
agent_id: agent.id,
name: agent.name,
capabilities: agent.capabilities,
status: agent.status,
online: Agent.is_online?(agent),
current_task: status.current_task && %{
id: status.current_task.id,
title: status.current_task.title
},
pending_tasks: status.pending_count,
completed_tasks: status.completed_count
}
end)
{:ok, %{agents: board}}
end
defp heartbeat(%{"agent_id" => agent_id}) do
case TaskRegistry.heartbeat_agent(agent_id) do
:ok ->
{:ok, %{status: "heartbeat_received"}}
{:error, reason} ->
{:error, "Heartbeat failed: #{reason}"}
end
end
end

View File

@@ -0,0 +1,209 @@
defmodule AgentCoordinator.Persistence do
@moduledoc """
Persistent storage for tasks and events using NATS JetStream.
Provides configurable retention policies and event replay capabilities.
"""
use GenServer
alias AgentCoordinator.{Task, Agent}
defstruct [
:nats_conn,
:stream_name,
:retention_policy
]
@stream_config %{
"name" => "AGENT_COORDINATION",
"subjects" => ["agent.*", "task.*"],
"storage" => "file",
"max_msgs" => 1_000_000,
"max_bytes" => 1_000_000_000, # 1GB
"max_age" => 7 * 24 * 60 * 60 * 1_000_000_000, # 7 days in nanoseconds
"max_msg_size" => 1_000_000, # 1MB
"retention" => "limits",
"discard" => "old"
}
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def store_event(subject, data) do
GenServer.cast(__MODULE__, {:store_event, subject, data})
end
def get_agent_history(agent_id, opts \\ []) do
GenServer.call(__MODULE__, {:get_agent_history, agent_id, opts})
end
def get_task_history(task_id, opts \\ []) do
GenServer.call(__MODULE__, {:get_task_history, task_id, opts})
end
def replay_events(subject_filter, opts \\ []) do
GenServer.call(__MODULE__, {:replay_events, subject_filter, opts})
end
def get_system_stats do
GenServer.call(__MODULE__, :get_system_stats)
end
# Server callbacks
def init(opts) do
nats_config = Keyword.get(opts, :nats, [])
retention_policy = Keyword.get(opts, :retention_policy, :default)
{:ok, nats_conn} = Gnat.start_link(nats_config)
# Create or update JetStream
create_or_update_stream(nats_conn)
state = %__MODULE__{
nats_conn: nats_conn,
stream_name: @stream_config["name"],
retention_policy: retention_policy
}
{:ok, state}
end
def handle_cast({:store_event, subject, data}, state) do
enriched_data = enrich_event_data(data)
message = Jason.encode!(enriched_data)
# Publish to JetStream
case Gnat.pub(state.nats_conn, subject, message, headers: event_headers()) do
:ok -> :ok
{:error, reason} ->
IO.puts("Failed to store event: #{inspect(reason)}")
end
{:noreply, state}
end
def handle_call({:get_agent_history, agent_id, opts}, _from, state) do
subject_filter = "agent.*.#{agent_id}"
limit = Keyword.get(opts, :limit, 100)
events = fetch_events(state.nats_conn, subject_filter, limit)
{:reply, events, state}
end
def handle_call({:get_task_history, task_id, opts}, _from, state) do
subject_filter = "task.*"
limit = Keyword.get(opts, :limit, 100)
events = fetch_events(state.nats_conn, subject_filter, limit)
|> Enum.filter(fn event ->
case Map.get(event, "task") do
%{"id" => ^task_id} -> true
_ -> false
end
end)
{:reply, events, state}
end
def handle_call({:replay_events, subject_filter, opts}, _from, state) do
limit = Keyword.get(opts, :limit, 1000)
start_time = Keyword.get(opts, :start_time)
events = fetch_events(state.nats_conn, subject_filter, limit, start_time)
{:reply, events, state}
end
def handle_call(:get_system_stats, _from, state) do
stats = get_stream_info(state.nats_conn, state.stream_name)
{:reply, stats, state}
end
# Private helpers
defp create_or_update_stream(conn) do
# Check if stream exists
case get_stream_info(conn, @stream_config["name"]) do
nil ->
# Create new stream
create_stream(conn, @stream_config)
_existing ->
# Update existing stream if needed
update_stream(conn, @stream_config)
end
end
defp create_stream(conn, config) do
request = %{
"type" => "io.nats.jetstream.api.v1.stream_create_request",
"config" => config
}
case Gnat.request(conn, "$JS.API.STREAM.CREATE.#{config["name"]}", Jason.encode!(request)) do
{:ok, response} ->
case Jason.decode!(response.body) do
%{"error" => error} ->
IO.puts("Failed to create stream: #{inspect(error)}")
{:error, error}
result ->
IO.puts("Stream created successfully")
{:ok, result}
end
{:error, reason} ->
IO.puts("Failed to create stream: #{inspect(reason)}")
{:error, reason}
end
end
defp update_stream(conn, config) do
# For simplicity, we'll just ensure the stream exists
# In production, you might want more sophisticated update logic
:ok
end
defp get_stream_info(conn, stream_name) do
case Gnat.request(conn, "$JS.API.STREAM.INFO.#{stream_name}", "") do
{:ok, response} ->
case Jason.decode!(response.body) do
%{"error" => _} -> nil
info -> info
end
{:error, _} -> nil
end
end
defp fetch_events(conn, subject_filter, limit, start_time \\ nil) do
# Create a consumer to fetch messages
consumer_config = %{
"durable_name" => "temp_#{:rand.uniform(10000)}",
"deliver_policy" => if(start_time, do: "by_start_time", else: "all"),
"opt_start_time" => start_time,
"max_deliver" => 1,
"ack_policy" => "explicit"
}
# This is a simplified implementation
# In production, you'd use proper JetStream consumer APIs
[] # Return empty for now - would implement full JetStream integration
end
defp enrich_event_data(data) do
Map.merge(data, %{
"timestamp" => DateTime.utc_now() |> DateTime.to_iso8601(),
"version" => "1.0"
})
end
defp event_headers do
[
{"content-type", "application/json"},
{"source", "agent-coordinator"}
]
end
end

View File

@@ -0,0 +1,76 @@
defmodule AgentCoordinator.Task do
@moduledoc """
Task data structure for agent coordination system.
"""
defstruct [
:id,
:title,
:description,
:status,
:priority,
:agent_id,
:file_paths,
:dependencies,
:created_at,
:updated_at,
:metadata
]
@type status :: :pending | :in_progress | :completed | :failed | :blocked
@type priority :: :low | :normal | :high | :urgent
@type t :: %__MODULE__{
id: String.t(),
title: String.t(),
description: String.t(),
status: status(),
priority: priority(),
agent_id: String.t() | nil,
file_paths: [String.t()],
dependencies: [String.t()],
created_at: DateTime.t(),
updated_at: DateTime.t(),
metadata: map()
}
def new(title, description, opts \\ []) do
now = DateTime.utc_now()
%__MODULE__{
id: UUID.uuid4(),
title: title,
description: description,
status: Keyword.get(opts, :status, :pending),
priority: Keyword.get(opts, :priority, :normal),
agent_id: Keyword.get(opts, :agent_id),
file_paths: Keyword.get(opts, :file_paths, []),
dependencies: Keyword.get(opts, :dependencies, []),
created_at: now,
updated_at: now,
metadata: Keyword.get(opts, :metadata, %{})
}
end
def assign_to_agent(task, agent_id) do
%{task | agent_id: agent_id, status: :in_progress, updated_at: DateTime.utc_now()}
end
def complete(task) do
%{task | status: :completed, updated_at: DateTime.utc_now()}
end
def fail(task, reason \\ nil) do
metadata = if reason, do: Map.put(task.metadata, :failure_reason, reason), else: task.metadata
%{task | status: :failed, metadata: metadata, updated_at: DateTime.utc_now()}
end
def block(task, reason \\ nil) do
metadata = if reason, do: Map.put(task.metadata, :block_reason, reason), else: task.metadata
%{task | status: :blocked, metadata: metadata, updated_at: DateTime.utc_now()}
end
def has_file_conflict?(task1, task2) do
not MapSet.disjoint?(MapSet.new(task1.file_paths), MapSet.new(task2.file_paths))
end
end

View File

@@ -0,0 +1,256 @@
defmodule AgentCoordinator.TaskRegistry do
@moduledoc """
Central registry for agents and task assignment with NATS integration.
"""
use GenServer
alias AgentCoordinator.{Agent, Task, Inbox}
defstruct [
:agents,
:pending_tasks,
:file_locks,
:nats_conn
]
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def register_agent(agent) do
GenServer.call(__MODULE__, {:register_agent, agent})
end
def assign_task(task) do
GenServer.call(__MODULE__, {:assign_task, task})
end
def add_to_pending(task) do
GenServer.call(__MODULE__, {:add_to_pending, task})
end
def list_agents do
GenServer.call(__MODULE__, :list_agents)
end
def heartbeat_agent(agent_id) do
GenServer.call(__MODULE__, {:heartbeat_agent, agent_id})
end
def get_file_locks do
GenServer.call(__MODULE__, :get_file_locks)
end
# Server callbacks
def init(opts) do
# Connect to NATS
nats_config = Keyword.get(opts, :nats, [])
{:ok, nats_conn} = Gnat.start_link(nats_config)
# Subscribe to task events
Gnat.sub(nats_conn, self(), "agent.task.*")
Gnat.sub(nats_conn, self(), "agent.heartbeat.*")
state = %__MODULE__{
agents: %{},
pending_tasks: [],
file_locks: %{},
nats_conn: nats_conn
}
{:ok, state}
end
def handle_call({:register_agent, agent}, _from, state) do
# Check for duplicate names
case Enum.find(state.agents, fn {_id, a} -> a.name == agent.name end) do
nil ->
new_agents = Map.put(state.agents, agent.id, agent)
new_state = %{state | agents: new_agents}
# Publish agent registration
publish_event(state.nats_conn, "agent.registered", %{agent: agent})
# Try to assign pending tasks
{assigned_tasks, remaining_pending} = assign_pending_tasks(new_state)
final_state = %{new_state | pending_tasks: remaining_pending}
{:reply, :ok, final_state}
_ ->
{:reply, {:error, "Agent name already exists"}, state}
end
end
def handle_call({:assign_task, task}, _from, state) do
case find_available_agent(state, task) do
nil ->
{:reply, {:error, :no_available_agents}, state}
agent ->
# Check for file conflicts
case check_file_conflicts(state, task) do
[] ->
# No conflicts, assign task
assign_task_to_agent(state, task, agent.id)
conflicts ->
# Block task due to conflicts
blocked_task = Task.block(task, "File conflicts: #{inspect(conflicts)}")
new_pending = [blocked_task | state.pending_tasks]
publish_event(state.nats_conn, "task.blocked", %{
task: blocked_task,
conflicts: conflicts
})
{:reply, {:error, :file_conflicts}, %{state | pending_tasks: new_pending}}
end
end
end
def handle_call({:add_to_pending, task}, _from, state) do
new_pending = [task | state.pending_tasks]
publish_event(state.nats_conn, "task.queued", %{task: task})
{:reply, :ok, %{state | pending_tasks: new_pending}}
end
def handle_call(:list_agents, _from, state) do
agents = Map.values(state.agents)
{:reply, agents, state}
end
def handle_call({:heartbeat_agent, agent_id}, _from, state) do
case Map.get(state.agents, agent_id) do
nil ->
{:reply, {:error, :agent_not_found}, state}
agent ->
updated_agent = Agent.heartbeat(agent)
new_agents = Map.put(state.agents, agent_id, updated_agent)
new_state = %{state | agents: new_agents}
publish_event(state.nats_conn, "agent.heartbeat", %{agent_id: agent_id})
{:reply, :ok, new_state}
end
end
def handle_call(:get_file_locks, _from, state) do
{:reply, state.file_locks, state}
end
# Handle NATS messages
def handle_info({:msg, %{topic: "agent.task.started", body: body}}, state) do
%{"task" => task_data} = Jason.decode!(body)
# Update file locks
file_locks = add_file_locks(state.file_locks, task_data["id"], task_data["file_paths"])
{:noreply, %{state | file_locks: file_locks}}
end
def handle_info({:msg, %{topic: "agent.task.completed", body: body}}, state) do
%{"task" => task_data} = Jason.decode!(body)
# Remove file locks
file_locks = remove_file_locks(state.file_locks, task_data["id"])
# Try to assign pending tasks that might now be unblocked
{_assigned, remaining_pending} = assign_pending_tasks(%{state | file_locks: file_locks})
{:noreply, %{state | file_locks: file_locks, pending_tasks: remaining_pending}}
end
def handle_info({:msg, %{topic: topic}}, state) when topic != "agent.task.started" and topic != "agent.task.completed" do
# Ignore other messages for now
{:noreply, state}
end
# Private helpers
defp find_available_agent(state, task) do
state.agents
|> Map.values()
|> Enum.filter(fn agent ->
agent.status == :idle and
Agent.is_online?(agent) and
Agent.can_handle?(agent, task)
end)
|> Enum.sort_by(fn agent ->
# Prefer agents with fewer pending tasks
case Inbox.get_status(agent.id) do
%{pending_count: count} -> count
_ -> 999
end
end)
|> List.first()
end
defp check_file_conflicts(state, task) do
task.file_paths
|> Enum.filter(fn file_path ->
Map.has_key?(state.file_locks, file_path)
end)
end
defp assign_task_to_agent(state, task, agent_id) do
# Add to agent's inbox
Inbox.add_task(agent_id, task)
# Update agent status
agent = Map.get(state.agents, agent_id)
updated_agent = Agent.assign_task(agent, task.id)
new_agents = Map.put(state.agents, agent_id, updated_agent)
# Publish assignment
publish_event(state.nats_conn, "task.assigned", %{
task: task,
agent_id: agent_id
})
{:reply, {:ok, agent_id}, %{state | agents: new_agents}}
end
defp assign_pending_tasks(state) do
{assigned, remaining} = Enum.reduce(state.pending_tasks, {[], []}, fn task, {assigned, pending} ->
case find_available_agent(state, task) do
nil ->
{assigned, [task | pending]}
agent ->
case check_file_conflicts(state, task) do
[] ->
Inbox.add_task(agent.id, task)
{[{task, agent.id} | assigned], pending}
_conflicts ->
{assigned, [task | pending]}
end
end
end)
{assigned, Enum.reverse(remaining)}
end
defp add_file_locks(file_locks, task_id, file_paths) do
Enum.reduce(file_paths, file_locks, fn path, locks ->
Map.put(locks, path, task_id)
end)
end
defp remove_file_locks(file_locks, task_id) do
Enum.reject(file_locks, fn {_path, locked_task_id} ->
locked_task_id == task_id
end)
|> Map.new()
end
defp publish_event(conn, topic, data) do
message = Jason.encode!(data)
Gnat.pub(conn, topic, message)
end
end