From 5048db99c78bafe8fd66f07e94549336420c6c22 Mon Sep 17 00:00:00 2001 From: Ra Date: Fri, 22 Aug 2025 05:08:00 -0700 Subject: [PATCH] start repo --- .formatter.exs | 4 + .gitignore | 23 +++ README.md | 111 +++++++++++ lib/agent_coordinator.ex | 18 ++ lib/agent_coordinator/agent.ex | 66 +++++++ lib/agent_coordinator/application.ex | 43 ++++ lib/agent_coordinator/cli.ex | 200 +++++++++++++++++++ lib/agent_coordinator/inbox.ex | 160 +++++++++++++++ lib/agent_coordinator/mcp_server.ex | 263 +++++++++++++++++++++++++ lib/agent_coordinator/persistence.ex | 209 ++++++++++++++++++++ lib/agent_coordinator/task.ex | 76 +++++++ lib/agent_coordinator/task_registry.ex | 256 ++++++++++++++++++++++++ mix.exs | 32 +++ test/agent_coordinator_test.exs | 8 + test/test_helper.exs | 1 + 15 files changed, 1470 insertions(+) create mode 100644 .formatter.exs create mode 100644 .gitignore create mode 100644 README.md create mode 100644 lib/agent_coordinator.ex create mode 100644 lib/agent_coordinator/agent.ex create mode 100644 lib/agent_coordinator/application.ex create mode 100644 lib/agent_coordinator/cli.ex create mode 100644 lib/agent_coordinator/inbox.ex create mode 100644 lib/agent_coordinator/mcp_server.ex create mode 100644 lib/agent_coordinator/persistence.ex create mode 100644 lib/agent_coordinator/task.ex create mode 100644 lib/agent_coordinator/task_registry.ex create mode 100644 mix.exs create mode 100644 test/agent_coordinator_test.exs create mode 100644 test/test_helper.exs diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2cb2993 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +agent_coordinator-*.tar + +# Temporary files, for example, from tests. +/tmp/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..ef92c31 --- /dev/null +++ b/README.md @@ -0,0 +1,111 @@ +# AgentCoordinator + +A distributed task coordination system for AI agents built with Elixir and NATS. + +## Overview + +AgentCoordinator is a centralized task management system designed to enable multiple AI agents (Claude Code, GitHub Copilot, etc.) to work collaboratively on the same codebase without conflicts. It provides: + +- **Distributed Task Management**: Centralized task queue with agent-specific inboxes +- **Conflict Resolution**: File-level locking prevents agents from working on the same files +- **Real-time Communication**: NATS messaging for instant coordination +- **Persistent Storage**: Event sourcing with configurable retention policies +- **MCP Integration**: Model Context Protocol server for agent communication +- **Fault Tolerance**: Elixir supervision trees ensure system resilience + +## Architecture + +``` +┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ +│ AI Agent 1 │ │ AI Agent 2 │ │ AI Agent N │ +│ (Claude Code) │ │ (Copilot) │ │ ... │ +└─────────┬───────┘ └─────────┬────────┘ └─────────┬───────┘ + │ │ │ + └──────────────────────┼───────────────────────┘ + │ + ┌─────────────┴──────────────┐ + │ MCP Server Interface │ + └─────────────┬──────────────┘ + │ + ┌─────────────┴──────────────┐ + │ AgentCoordinator │ + │ │ + │ ┌──────────────────────┐ │ + │ │ Task Registry │ │ + │ │ ┌──────────────┐ │ │ + │ │ │ Agent Inbox │ │ │ + │ │ │ Agent Inbox │ │ │ + │ │ │ Agent Inbox │ │ │ + │ │ └──────────────┘ │ │ + │ └──────────────────────┘ │ + │ │ + │ ┌──────────────────────┐ │ + │ │ NATS Messaging │ │ + │ └──────────────────────┘ │ + │ │ + │ ┌──────────────────────┐ │ + │ │ Persistence │ │ + │ │ (JetStream) │ │ + │ └──────────────────────┘ │ + └────────────────────────────┘ +``` + +## Installation + +### Prerequisites + +- Elixir 1.16+ and Erlang/OTP 28+ +- NATS server (with JetStream enabled) + +### Setup + +1. **Install Dependencies** + ```bash + mix deps.get + ``` + +2. **Start NATS Server** + ```bash + # Using Docker + docker run -p 4222:4222 -p 8222:8222 nats:latest -js + + # Or install locally and run + nats-server -js + ``` + +3. **Configure Environment** + ```bash + export NATS_HOST=localhost + export NATS_PORT=4222 + ``` + +4. **Start the Application** + ```bash + iex -S mix + ``` + +## Usage + +### Command Line Interface + +```bash +# Register an agent +mix run -e "AgentCoordinator.CLI.main([\"register\", \"CodeBot\", \"coding\", \"testing\"])" + +# Create a task +mix run -e "AgentCoordinator.CLI.main([\"create-task\", \"Fix login bug\", \"User login fails\", \"priority=high\"])" + +# View task board +mix run -e "AgentCoordinator.CLI.main([\"board\"])" +``` + +### MCP Integration + +Available MCP tools for agents: +- `register_agent` - Register a new agent +- `create_task` - Create a new task +- `get_next_task` - Get next task for agent +- `complete_task` - Mark current task complete +- `get_task_board` - View all agent statuses +- `heartbeat` - Send agent heartbeat + diff --git a/lib/agent_coordinator.ex b/lib/agent_coordinator.ex new file mode 100644 index 0000000..9b5f5f8 --- /dev/null +++ b/lib/agent_coordinator.ex @@ -0,0 +1,18 @@ +defmodule AgentCoordinator do + @moduledoc """ + Documentation for `AgentCoordinator`. + """ + + @doc """ + Hello world. + + ## Examples + + iex> AgentCoordinator.hello() + :world + + """ + def hello do + :world + end +end diff --git a/lib/agent_coordinator/agent.ex b/lib/agent_coordinator/agent.ex new file mode 100644 index 0000000..c22c93d --- /dev/null +++ b/lib/agent_coordinator/agent.ex @@ -0,0 +1,66 @@ +defmodule AgentCoordinator.Agent do + @moduledoc """ + Agent data structure for the coordination system. + """ + + defstruct [ + :id, + :name, + :capabilities, + :status, + :current_task_id, + :last_heartbeat, + :metadata + ] + + @type status :: :idle | :busy | :offline | :error + @type capability :: :coding | :testing | :documentation | :analysis | :review + + @type t :: %__MODULE__{ + id: String.t(), + name: String.t(), + capabilities: [capability()], + status: status(), + current_task_id: String.t() | nil, + last_heartbeat: DateTime.t(), + metadata: map() + } + + def new(name, capabilities, opts \\ []) do + %__MODULE__{ + id: UUID.uuid4(), + name: name, + capabilities: capabilities, + status: :idle, + current_task_id: nil, + last_heartbeat: DateTime.utc_now(), + metadata: Keyword.get(opts, :metadata, %{}) + } + end + + def heartbeat(agent) do + %{agent | last_heartbeat: DateTime.utc_now()} + end + + def assign_task(agent, task_id) do + %{agent | status: :busy, current_task_id: task_id} + end + + def complete_task(agent) do + %{agent | status: :idle, current_task_id: nil} + end + + def is_online?(agent) do + DateTime.diff(DateTime.utc_now(), agent.last_heartbeat, :second) < 30 + end + + def can_handle?(agent, task) do + # Simple capability matching - can be enhanced + required_capabilities = Map.get(task.metadata, :required_capabilities, []) + + case required_capabilities do + [] -> true + caps -> Enum.any?(caps, fn cap -> cap in agent.capabilities end) + end + end +end \ No newline at end of file diff --git a/lib/agent_coordinator/application.ex b/lib/agent_coordinator/application.ex new file mode 100644 index 0000000..a4acff1 --- /dev/null +++ b/lib/agent_coordinator/application.ex @@ -0,0 +1,43 @@ +defmodule AgentCoordinator.Application do + # See https://hexdocs.pm/elixir/Application.html + # for more information on OTP Applications + @moduledoc false + + use Application + + @impl true + def start(_type, _args) do + children = [ + # Registry for agent inboxes + {Registry, keys: :unique, name: AgentCoordinator.InboxRegistry}, + + # PubSub for real-time updates + {Phoenix.PubSub, name: AgentCoordinator.PubSub}, + + # Persistence layer + {AgentCoordinator.Persistence, nats: nats_config()}, + + # Task registry with NATS integration + {AgentCoordinator.TaskRegistry, nats: nats_config()}, + + # MCP server + AgentCoordinator.MCPServer, + + # Dynamic supervisor for agent inboxes + {DynamicSupervisor, name: AgentCoordinator.InboxSupervisor, strategy: :one_for_one} + ] + + opts = [strategy: :one_for_one, name: AgentCoordinator.Supervisor] + Supervisor.start_link(children, opts) + end + + defp nats_config do + [ + host: System.get_env("NATS_HOST", "localhost"), + port: String.to_integer(System.get_env("NATS_PORT", "4222")), + connection_settings: [ + name: :agent_coordinator + ] + ] + end +end diff --git a/lib/agent_coordinator/cli.ex b/lib/agent_coordinator/cli.ex new file mode 100644 index 0000000..4954473 --- /dev/null +++ b/lib/agent_coordinator/cli.ex @@ -0,0 +1,200 @@ +defmodule AgentCoordinator.CLI do + @moduledoc """ + Command line interface for testing the agent coordination system. + """ + + alias AgentCoordinator.{MCPServer, TaskRegistry, Inbox, Agent, Task} + + def main(args \\ []) do + case args do + ["register", name | capabilities] -> + register_agent(name, capabilities) + + ["create-task", title, description | opts] -> + create_task(title, description, parse_task_opts(opts)) + + ["board"] -> + show_task_board() + + ["agent-status", agent_id] -> + show_agent_status(agent_id) + + ["help"] -> + show_help() + + _ -> + IO.puts("Invalid command. Use 'help' for usage information.") + end + end + + defp register_agent(name, capabilities) do + caps = Enum.map(capabilities, &String.to_existing_atom/1) + + request = %{ + "method" => "tools/call", + "params" => %{ + "name" => "register_agent", + "arguments" => %{ + "name" => name, + "capabilities" => capabilities + } + } + } + + case MCPServer.handle_mcp_request(request) do + %{"result" => %{"content" => [%{"text" => result}]}} -> + data = Jason.decode!(result) + IO.puts("✓ Agent registered successfully!") + IO.puts(" Agent ID: #{data["agent_id"]}") + IO.puts(" Status: #{data["status"]}") + + %{"error" => %{"message" => message}} -> + IO.puts("✗ Registration failed: #{message}") + end + end + + defp create_task(title, description, opts) do + request = %{ + "method" => "tools/call", + "params" => %{ + "name" => "create_task", + "arguments" => Map.merge(%{ + "title" => title, + "description" => description + }, opts) + } + } + + case MCPServer.handle_mcp_request(request) do + %{"result" => %{"content" => [%{"text" => result}]}} -> + data = Jason.decode!(result) + IO.puts("✓ Task created successfully!") + IO.puts(" Task ID: #{data["task_id"]}") + IO.puts(" Status: #{data["status"]}") + + if Map.has_key?(data, "assigned_to") do + IO.puts(" Assigned to: #{data["assigned_to"]}") + end + + %{"error" => %{"message" => message}} -> + IO.puts("✗ Task creation failed: #{message}") + end + end + + defp show_task_board do + request = %{ + "method" => "tools/call", + "params" => %{ + "name" => "get_task_board", + "arguments" => %{} + } + } + + case MCPServer.handle_mcp_request(request) do + %{"result" => %{"content" => [%{"text" => result}]}} -> + %{"agents" => agents} = Jason.decode!(result) + + IO.puts("\n📋 Task Board") + IO.puts(String.duplicate("=", 50)) + + if Enum.empty?(agents) do + IO.puts("No agents registered.") + else + Enum.each(agents, &print_agent_summary/1) + end + + error -> + IO.puts("✗ Failed to fetch task board: #{inspect(error)}") + end + end + + defp show_agent_status(agent_id) do + case Inbox.get_status(agent_id) do + status -> + IO.puts("\n👤 Agent Status: #{agent_id}") + IO.puts(String.duplicate("-", 30)) + IO.puts("Pending tasks: #{status.pending_count}") + IO.puts("Completed tasks: #{status.completed_count}") + + case status.current_task do + nil -> + IO.puts("Current task: None") + + task -> + IO.puts("Current task: #{task.title}") + IO.puts(" Description: #{task.description}") + IO.puts(" Priority: #{task.priority}") + end + end + end + + defp print_agent_summary(agent) do + status_icon = case agent["status"] do + "idle" -> "💤" + "busy" -> "🔧" + "offline" -> "❌" + _ -> "❓" + end + + online_status = if agent["online"], do: "🟢", else: "🔴" + + IO.puts("\n#{status_icon} #{agent["name"]} (#{agent["agent_id"]}) #{online_status}") + IO.puts(" Capabilities: #{Enum.join(agent["capabilities"], ", ")}") + IO.puts(" Pending: #{agent["pending_tasks"]} | Completed: #{agent["completed_tasks"]}") + + case agent["current_task"] do + nil -> + IO.puts(" Current: No active task") + + task -> + IO.puts(" Current: #{task["title"]}") + end + end + + defp parse_task_opts(opts) do + Enum.reduce(opts, %{}, fn opt, acc -> + case String.split(opt, "=", parts: 2) do + ["priority", value] -> + Map.put(acc, "priority", value) + + ["files", files] -> + Map.put(acc, "file_paths", String.split(files, ",")) + + ["caps", capabilities] -> + Map.put(acc, "required_capabilities", String.split(capabilities, ",")) + + _ -> + acc + end + end) + end + + defp show_help do + IO.puts(""" + Agent Coordinator CLI + + Commands: + register ... + Register a new agent with specified capabilities + Capabilities: coding, testing, documentation, analysis, review + + create-task <description> [priority=<low|normal|high|urgent>] [files=<file1,file2>] [caps=<cap1,cap2>] + Create a new task with optional parameters + + board + Show current task board with all agents and their status + + agent-status <agent-id> + Show detailed status for a specific agent + + help + Show this help message + + Examples: + register "CodeBot" coding testing + create-task "Fix login bug" "User login fails with 500 error" priority=high files=auth.ex,login.ex + board + agent-status abc-123-def + """) + end +end \ No newline at end of file diff --git a/lib/agent_coordinator/inbox.ex b/lib/agent_coordinator/inbox.ex new file mode 100644 index 0000000..b8d2a3b --- /dev/null +++ b/lib/agent_coordinator/inbox.ex @@ -0,0 +1,160 @@ +defmodule AgentCoordinator.Inbox do + @moduledoc """ + Agent inbox management using GenServer for each agent's task queue. + """ + + use GenServer + alias AgentCoordinator.{Task, Agent} + + defstruct [ + :agent_id, + :pending_tasks, + :in_progress_task, + :completed_tasks, + :max_history + ] + + @type t :: %__MODULE__{ + agent_id: String.t(), + pending_tasks: [Task.t()], + in_progress_task: Task.t() | nil, + completed_tasks: [Task.t()], + max_history: non_neg_integer() + } + + # Client API + + def start_link(agent_id, opts \\ []) do + GenServer.start_link(__MODULE__, {agent_id, opts}, name: via_tuple(agent_id)) + end + + def add_task(agent_id, task) do + GenServer.call(via_tuple(agent_id), {:add_task, task}) + end + + def get_next_task(agent_id) do + GenServer.call(via_tuple(agent_id), :get_next_task) + end + + def complete_current_task(agent_id) do + GenServer.call(via_tuple(agent_id), :complete_current_task) + end + + def get_status(agent_id) do + GenServer.call(via_tuple(agent_id), :get_status) + end + + def list_tasks(agent_id) do + GenServer.call(via_tuple(agent_id), :list_tasks) + end + + # Server callbacks + + def init({agent_id, opts}) do + state = %__MODULE__{ + agent_id: agent_id, + pending_tasks: [], + in_progress_task: nil, + completed_tasks: [], + max_history: Keyword.get(opts, :max_history, 100) + } + + {:ok, state} + end + + def handle_call({:add_task, task}, _from, state) do + # Insert task based on priority + pending_tasks = insert_by_priority(state.pending_tasks, task) + new_state = %{state | pending_tasks: pending_tasks} + + # Broadcast task added + Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "agent:#{state.agent_id}", + {:task_added, task}) + + {:reply, :ok, new_state} + end + + def handle_call(:get_next_task, _from, state) do + case state.pending_tasks do + [] -> + {:reply, nil, state} + + [next_task | remaining_tasks] -> + updated_task = Task.assign_to_agent(next_task, state.agent_id) + new_state = %{state | + pending_tasks: remaining_tasks, + in_progress_task: updated_task + } + + # Broadcast task started + Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "global", + {:task_started, updated_task}) + + {:reply, updated_task, new_state} + end + end + + def handle_call(:complete_current_task, _from, state) do + case state.in_progress_task do + nil -> + {:reply, {:error, :no_task_in_progress}, state} + + task -> + completed_task = Task.complete(task) + + # Add to completed tasks with history limit + completed_tasks = [completed_task | state.completed_tasks] + |> Enum.take(state.max_history) + + new_state = %{state | + in_progress_task: nil, + completed_tasks: completed_tasks + } + + # Broadcast task completed + Phoenix.PubSub.broadcast(AgentCoordinator.PubSub, "global", + {:task_completed, completed_task}) + + {:reply, completed_task, new_state} + end + end + + def handle_call(:get_status, _from, state) do + status = %{ + agent_id: state.agent_id, + pending_count: length(state.pending_tasks), + current_task: state.in_progress_task, + completed_count: length(state.completed_tasks) + } + + {:reply, status, state} + end + + def handle_call(:list_tasks, _from, state) do + tasks = %{ + pending: state.pending_tasks, + in_progress: state.in_progress_task, + completed: Enum.take(state.completed_tasks, 10) # Recent 10 + } + + {:reply, tasks, state} + end + + # Private helpers + + defp via_tuple(agent_id) do + {:via, Registry, {AgentCoordinator.InboxRegistry, agent_id}} + end + + defp insert_by_priority(tasks, new_task) do + priority_order = %{urgent: 0, high: 1, normal: 2, low: 3} + new_priority = Map.get(priority_order, new_task.priority, 2) + + {before, after} = Enum.split_while(tasks, fn task -> + task_priority = Map.get(priority_order, task.priority, 2) + task_priority <= new_priority + end) + + before ++ [new_task] ++ after + end +end \ No newline at end of file diff --git a/lib/agent_coordinator/mcp_server.ex b/lib/agent_coordinator/mcp_server.ex new file mode 100644 index 0000000..8871661 --- /dev/null +++ b/lib/agent_coordinator/mcp_server.ex @@ -0,0 +1,263 @@ +defmodule AgentCoordinator.MCPServer do + @moduledoc """ + MCP (Model Context Protocol) server for agent coordination. + Provides tools for agents to interact with the task coordination system. + """ + + use GenServer + alias AgentCoordinator.{TaskRegistry, Inbox, Agent, Task} + + @mcp_tools [ + %{ + "name" => "register_agent", + "description" => "Register a new agent with the coordination system", + "inputSchema" => %{ + "type" => "object", + "properties" => %{ + "name" => %{"type" => "string"}, + "capabilities" => %{ + "type" => "array", + "items" => %{"type" => "string", "enum" => ["coding", "testing", "documentation", "analysis", "review"]} + } + }, + "required" => ["name", "capabilities"] + } + }, + %{ + "name" => "create_task", + "description" => "Create a new task in the coordination system", + "inputSchema" => %{ + "type" => "object", + "properties" => %{ + "title" => %{"type" => "string"}, + "description" => %{"type" => "string"}, + "priority" => %{"type" => "string", "enum" => ["low", "normal", "high", "urgent"]}, + "file_paths" => %{"type" => "array", "items" => %{"type" => "string"}}, + "required_capabilities" => %{ + "type" => "array", + "items" => %{"type" => "string"} + } + }, + "required" => ["title", "description"] + } + }, + %{ + "name" => "get_next_task", + "description" => "Get the next task for an agent", + "inputSchema" => %{ + "type" => "object", + "properties" => %{ + "agent_id" => %{"type" => "string"} + }, + "required" => ["agent_id"] + } + }, + %{ + "name" => "complete_task", + "description" => "Mark current task as completed", + "inputSchema" => %{ + "type" => "object", + "properties" => %{ + "agent_id" => %{"type" => "string"} + }, + "required" => ["agent_id"] + } + }, + %{ + "name" => "get_task_board", + "description" => "Get overview of all agents and their current tasks", + "inputSchema" => %{ + "type" => "object", + "properties" => %{} + } + }, + %{ + "name" => "heartbeat", + "description" => "Send heartbeat to maintain agent status", + "inputSchema" => %{ + "type" => "object", + "properties" => %{ + "agent_id" => %{"type" => "string"} + }, + "required" => ["agent_id"] + } + } + ] + + # Client API + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def handle_mcp_request(request) do + GenServer.call(__MODULE__, {:mcp_request, request}) + end + + def get_tools do + @mcp_tools + end + + # Server callbacks + + def init(_opts) do + {:ok, %{}} + end + + def handle_call({:mcp_request, request}, _from, state) do + response = process_mcp_request(request) + {:reply, response, state} + end + + # MCP request processing + + defp process_mcp_request(%{"method" => "tools/list"}) do + %{ + "jsonrpc" => "2.0", + "result" => %{"tools" => @mcp_tools} + } + end + + defp process_mcp_request(%{ + "method" => "tools/call", + "params" => %{"name" => tool_name, "arguments" => args} + } = request) do + id = Map.get(request, "id", nil) + + result = case tool_name do + "register_agent" -> register_agent(args) + "create_task" -> create_task(args) + "get_next_task" -> get_next_task(args) + "complete_task" -> complete_task(args) + "get_task_board" -> get_task_board(args) + "heartbeat" -> heartbeat(args) + _ -> {:error, "Unknown tool: #{tool_name}"} + end + + case result do + {:ok, data} -> + %{ + "jsonrpc" => "2.0", + "id" => id, + "result" => %{"content" => [%{"type" => "text", "text" => Jason.encode!(data)}]} + } + + {:error, reason} -> + %{ + "jsonrpc" => "2.0", + "id" => id, + "error" => %{"code" => -1, "message" => reason} + } + end + end + + defp process_mcp_request(_request) do + %{ + "jsonrpc" => "2.0", + "error" => %{"code" => -32601, "message" => "Method not found"} + } + end + + # Tool implementations + + defp register_agent(%{"name" => name, "capabilities" => capabilities}) do + caps = Enum.map(capabilities, &String.to_existing_atom/1) + agent = Agent.new(name, caps) + + case TaskRegistry.register_agent(agent) do + :ok -> + # Start inbox for the agent + {:ok, _pid} = Inbox.start_link(agent.id) + {:ok, %{agent_id: agent.id, status: "registered"}} + + {:error, reason} -> + {:error, "Failed to register agent: #{reason}"} + end + end + + defp create_task(%{"title" => title, "description" => description} = args) do + opts = [ + priority: String.to_existing_atom(Map.get(args, "priority", "normal")), + file_paths: Map.get(args, "file_paths", []), + metadata: %{ + required_capabilities: Map.get(args, "required_capabilities", []) + } + ] + + task = Task.new(title, description, opts) + + case TaskRegistry.assign_task(task) do + {:ok, agent_id} -> + {:ok, %{task_id: task.id, assigned_to: agent_id, status: "assigned"}} + + {:error, :no_available_agents} -> + # Add to global pending queue + TaskRegistry.add_to_pending(task) + {:ok, %{task_id: task.id, status: "queued"}} + end + end + + defp get_next_task(%{"agent_id" => agent_id}) do + case Inbox.get_next_task(agent_id) do + nil -> + {:ok, %{message: "No tasks available"}} + + task -> + {:ok, %{ + task_id: task.id, + title: task.title, + description: task.description, + file_paths: task.file_paths, + priority: task.priority + }} + end + end + + defp complete_task(%{"agent_id" => agent_id}) do + case Inbox.complete_current_task(agent_id) do + {:error, reason} -> + {:error, "Failed to complete task: #{reason}"} + + completed_task -> + {:ok, %{ + task_id: completed_task.id, + status: "completed", + completed_at: completed_task.updated_at + }} + end + end + + defp get_task_board(_args) do + agents = TaskRegistry.list_agents() + + board = Enum.map(agents, fn agent -> + status = Inbox.get_status(agent.id) + + %{ + agent_id: agent.id, + name: agent.name, + capabilities: agent.capabilities, + status: agent.status, + online: Agent.is_online?(agent), + current_task: status.current_task && %{ + id: status.current_task.id, + title: status.current_task.title + }, + pending_tasks: status.pending_count, + completed_tasks: status.completed_count + } + end) + + {:ok, %{agents: board}} + end + + defp heartbeat(%{"agent_id" => agent_id}) do + case TaskRegistry.heartbeat_agent(agent_id) do + :ok -> + {:ok, %{status: "heartbeat_received"}} + + {:error, reason} -> + {:error, "Heartbeat failed: #{reason}"} + end + end +end \ No newline at end of file diff --git a/lib/agent_coordinator/persistence.ex b/lib/agent_coordinator/persistence.ex new file mode 100644 index 0000000..4c0b3ee --- /dev/null +++ b/lib/agent_coordinator/persistence.ex @@ -0,0 +1,209 @@ +defmodule AgentCoordinator.Persistence do + @moduledoc """ + Persistent storage for tasks and events using NATS JetStream. + Provides configurable retention policies and event replay capabilities. + """ + + use GenServer + alias AgentCoordinator.{Task, Agent} + + defstruct [ + :nats_conn, + :stream_name, + :retention_policy + ] + + @stream_config %{ + "name" => "AGENT_COORDINATION", + "subjects" => ["agent.*", "task.*"], + "storage" => "file", + "max_msgs" => 1_000_000, + "max_bytes" => 1_000_000_000, # 1GB + "max_age" => 7 * 24 * 60 * 60 * 1_000_000_000, # 7 days in nanoseconds + "max_msg_size" => 1_000_000, # 1MB + "retention" => "limits", + "discard" => "old" + } + + # Client API + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def store_event(subject, data) do + GenServer.cast(__MODULE__, {:store_event, subject, data}) + end + + def get_agent_history(agent_id, opts \\ []) do + GenServer.call(__MODULE__, {:get_agent_history, agent_id, opts}) + end + + def get_task_history(task_id, opts \\ []) do + GenServer.call(__MODULE__, {:get_task_history, task_id, opts}) + end + + def replay_events(subject_filter, opts \\ []) do + GenServer.call(__MODULE__, {:replay_events, subject_filter, opts}) + end + + def get_system_stats do + GenServer.call(__MODULE__, :get_system_stats) + end + + # Server callbacks + + def init(opts) do + nats_config = Keyword.get(opts, :nats, []) + retention_policy = Keyword.get(opts, :retention_policy, :default) + + {:ok, nats_conn} = Gnat.start_link(nats_config) + + # Create or update JetStream + create_or_update_stream(nats_conn) + + state = %__MODULE__{ + nats_conn: nats_conn, + stream_name: @stream_config["name"], + retention_policy: retention_policy + } + + {:ok, state} + end + + def handle_cast({:store_event, subject, data}, state) do + enriched_data = enrich_event_data(data) + message = Jason.encode!(enriched_data) + + # Publish to JetStream + case Gnat.pub(state.nats_conn, subject, message, headers: event_headers()) do + :ok -> :ok + {:error, reason} -> + IO.puts("Failed to store event: #{inspect(reason)}") + end + + {:noreply, state} + end + + def handle_call({:get_agent_history, agent_id, opts}, _from, state) do + subject_filter = "agent.*.#{agent_id}" + limit = Keyword.get(opts, :limit, 100) + + events = fetch_events(state.nats_conn, subject_filter, limit) + {:reply, events, state} + end + + def handle_call({:get_task_history, task_id, opts}, _from, state) do + subject_filter = "task.*" + limit = Keyword.get(opts, :limit, 100) + + events = fetch_events(state.nats_conn, subject_filter, limit) + |> Enum.filter(fn event -> + case Map.get(event, "task") do + %{"id" => ^task_id} -> true + _ -> false + end + end) + + {:reply, events, state} + end + + def handle_call({:replay_events, subject_filter, opts}, _from, state) do + limit = Keyword.get(opts, :limit, 1000) + start_time = Keyword.get(opts, :start_time) + + events = fetch_events(state.nats_conn, subject_filter, limit, start_time) + {:reply, events, state} + end + + def handle_call(:get_system_stats, _from, state) do + stats = get_stream_info(state.nats_conn, state.stream_name) + {:reply, stats, state} + end + + # Private helpers + + defp create_or_update_stream(conn) do + # Check if stream exists + case get_stream_info(conn, @stream_config["name"]) do + nil -> + # Create new stream + create_stream(conn, @stream_config) + + _existing -> + # Update existing stream if needed + update_stream(conn, @stream_config) + end + end + + defp create_stream(conn, config) do + request = %{ + "type" => "io.nats.jetstream.api.v1.stream_create_request", + "config" => config + } + + case Gnat.request(conn, "$JS.API.STREAM.CREATE.#{config["name"]}", Jason.encode!(request)) do + {:ok, response} -> + case Jason.decode!(response.body) do + %{"error" => error} -> + IO.puts("Failed to create stream: #{inspect(error)}") + {:error, error} + + result -> + IO.puts("Stream created successfully") + {:ok, result} + end + + {:error, reason} -> + IO.puts("Failed to create stream: #{inspect(reason)}") + {:error, reason} + end + end + + defp update_stream(conn, config) do + # For simplicity, we'll just ensure the stream exists + # In production, you might want more sophisticated update logic + :ok + end + + defp get_stream_info(conn, stream_name) do + case Gnat.request(conn, "$JS.API.STREAM.INFO.#{stream_name}", "") do + {:ok, response} -> + case Jason.decode!(response.body) do + %{"error" => _} -> nil + info -> info + end + + {:error, _} -> nil + end + end + + defp fetch_events(conn, subject_filter, limit, start_time \\ nil) do + # Create a consumer to fetch messages + consumer_config = %{ + "durable_name" => "temp_#{:rand.uniform(10000)}", + "deliver_policy" => if(start_time, do: "by_start_time", else: "all"), + "opt_start_time" => start_time, + "max_deliver" => 1, + "ack_policy" => "explicit" + } + + # This is a simplified implementation + # In production, you'd use proper JetStream consumer APIs + [] # Return empty for now - would implement full JetStream integration + end + + defp enrich_event_data(data) do + Map.merge(data, %{ + "timestamp" => DateTime.utc_now() |> DateTime.to_iso8601(), + "version" => "1.0" + }) + end + + defp event_headers do + [ + {"content-type", "application/json"}, + {"source", "agent-coordinator"} + ] + end +end \ No newline at end of file diff --git a/lib/agent_coordinator/task.ex b/lib/agent_coordinator/task.ex new file mode 100644 index 0000000..95c07f0 --- /dev/null +++ b/lib/agent_coordinator/task.ex @@ -0,0 +1,76 @@ +defmodule AgentCoordinator.Task do + @moduledoc """ + Task data structure for agent coordination system. + """ + + defstruct [ + :id, + :title, + :description, + :status, + :priority, + :agent_id, + :file_paths, + :dependencies, + :created_at, + :updated_at, + :metadata + ] + + @type status :: :pending | :in_progress | :completed | :failed | :blocked + @type priority :: :low | :normal | :high | :urgent + + @type t :: %__MODULE__{ + id: String.t(), + title: String.t(), + description: String.t(), + status: status(), + priority: priority(), + agent_id: String.t() | nil, + file_paths: [String.t()], + dependencies: [String.t()], + created_at: DateTime.t(), + updated_at: DateTime.t(), + metadata: map() + } + + def new(title, description, opts \\ []) do + now = DateTime.utc_now() + + %__MODULE__{ + id: UUID.uuid4(), + title: title, + description: description, + status: Keyword.get(opts, :status, :pending), + priority: Keyword.get(opts, :priority, :normal), + agent_id: Keyword.get(opts, :agent_id), + file_paths: Keyword.get(opts, :file_paths, []), + dependencies: Keyword.get(opts, :dependencies, []), + created_at: now, + updated_at: now, + metadata: Keyword.get(opts, :metadata, %{}) + } + end + + def assign_to_agent(task, agent_id) do + %{task | agent_id: agent_id, status: :in_progress, updated_at: DateTime.utc_now()} + end + + def complete(task) do + %{task | status: :completed, updated_at: DateTime.utc_now()} + end + + def fail(task, reason \\ nil) do + metadata = if reason, do: Map.put(task.metadata, :failure_reason, reason), else: task.metadata + %{task | status: :failed, metadata: metadata, updated_at: DateTime.utc_now()} + end + + def block(task, reason \\ nil) do + metadata = if reason, do: Map.put(task.metadata, :block_reason, reason), else: task.metadata + %{task | status: :blocked, metadata: metadata, updated_at: DateTime.utc_now()} + end + + def has_file_conflict?(task1, task2) do + not MapSet.disjoint?(MapSet.new(task1.file_paths), MapSet.new(task2.file_paths)) + end +end \ No newline at end of file diff --git a/lib/agent_coordinator/task_registry.ex b/lib/agent_coordinator/task_registry.ex new file mode 100644 index 0000000..ecc0ea7 --- /dev/null +++ b/lib/agent_coordinator/task_registry.ex @@ -0,0 +1,256 @@ +defmodule AgentCoordinator.TaskRegistry do + @moduledoc """ + Central registry for agents and task assignment with NATS integration. + """ + + use GenServer + alias AgentCoordinator.{Agent, Task, Inbox} + + defstruct [ + :agents, + :pending_tasks, + :file_locks, + :nats_conn + ] + + # Client API + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def register_agent(agent) do + GenServer.call(__MODULE__, {:register_agent, agent}) + end + + def assign_task(task) do + GenServer.call(__MODULE__, {:assign_task, task}) + end + + def add_to_pending(task) do + GenServer.call(__MODULE__, {:add_to_pending, task}) + end + + def list_agents do + GenServer.call(__MODULE__, :list_agents) + end + + def heartbeat_agent(agent_id) do + GenServer.call(__MODULE__, {:heartbeat_agent, agent_id}) + end + + def get_file_locks do + GenServer.call(__MODULE__, :get_file_locks) + end + + # Server callbacks + + def init(opts) do + # Connect to NATS + nats_config = Keyword.get(opts, :nats, []) + {:ok, nats_conn} = Gnat.start_link(nats_config) + + # Subscribe to task events + Gnat.sub(nats_conn, self(), "agent.task.*") + Gnat.sub(nats_conn, self(), "agent.heartbeat.*") + + state = %__MODULE__{ + agents: %{}, + pending_tasks: [], + file_locks: %{}, + nats_conn: nats_conn + } + + {:ok, state} + end + + def handle_call({:register_agent, agent}, _from, state) do + # Check for duplicate names + case Enum.find(state.agents, fn {_id, a} -> a.name == agent.name end) do + nil -> + new_agents = Map.put(state.agents, agent.id, agent) + new_state = %{state | agents: new_agents} + + # Publish agent registration + publish_event(state.nats_conn, "agent.registered", %{agent: agent}) + + # Try to assign pending tasks + {assigned_tasks, remaining_pending} = assign_pending_tasks(new_state) + final_state = %{new_state | pending_tasks: remaining_pending} + + {:reply, :ok, final_state} + + _ -> + {:reply, {:error, "Agent name already exists"}, state} + end + end + + def handle_call({:assign_task, task}, _from, state) do + case find_available_agent(state, task) do + nil -> + {:reply, {:error, :no_available_agents}, state} + + agent -> + # Check for file conflicts + case check_file_conflicts(state, task) do + [] -> + # No conflicts, assign task + assign_task_to_agent(state, task, agent.id) + + conflicts -> + # Block task due to conflicts + blocked_task = Task.block(task, "File conflicts: #{inspect(conflicts)}") + new_pending = [blocked_task | state.pending_tasks] + + publish_event(state.nats_conn, "task.blocked", %{ + task: blocked_task, + conflicts: conflicts + }) + + {:reply, {:error, :file_conflicts}, %{state | pending_tasks: new_pending}} + end + end + end + + def handle_call({:add_to_pending, task}, _from, state) do + new_pending = [task | state.pending_tasks] + publish_event(state.nats_conn, "task.queued", %{task: task}) + {:reply, :ok, %{state | pending_tasks: new_pending}} + end + + def handle_call(:list_agents, _from, state) do + agents = Map.values(state.agents) + {:reply, agents, state} + end + + def handle_call({:heartbeat_agent, agent_id}, _from, state) do + case Map.get(state.agents, agent_id) do + nil -> + {:reply, {:error, :agent_not_found}, state} + + agent -> + updated_agent = Agent.heartbeat(agent) + new_agents = Map.put(state.agents, agent_id, updated_agent) + new_state = %{state | agents: new_agents} + + publish_event(state.nats_conn, "agent.heartbeat", %{agent_id: agent_id}) + + {:reply, :ok, new_state} + end + end + + def handle_call(:get_file_locks, _from, state) do + {:reply, state.file_locks, state} + end + + # Handle NATS messages + def handle_info({:msg, %{topic: "agent.task.started", body: body}}, state) do + %{"task" => task_data} = Jason.decode!(body) + + # Update file locks + file_locks = add_file_locks(state.file_locks, task_data["id"], task_data["file_paths"]) + + {:noreply, %{state | file_locks: file_locks}} + end + + def handle_info({:msg, %{topic: "agent.task.completed", body: body}}, state) do + %{"task" => task_data} = Jason.decode!(body) + + # Remove file locks + file_locks = remove_file_locks(state.file_locks, task_data["id"]) + + # Try to assign pending tasks that might now be unblocked + {_assigned, remaining_pending} = assign_pending_tasks(%{state | file_locks: file_locks}) + + {:noreply, %{state | file_locks: file_locks, pending_tasks: remaining_pending}} + end + + def handle_info({:msg, %{topic: topic}}, state) when topic != "agent.task.started" and topic != "agent.task.completed" do + # Ignore other messages for now + {:noreply, state} + end + + # Private helpers + + defp find_available_agent(state, task) do + state.agents + |> Map.values() + |> Enum.filter(fn agent -> + agent.status == :idle and + Agent.is_online?(agent) and + Agent.can_handle?(agent, task) + end) + |> Enum.sort_by(fn agent -> + # Prefer agents with fewer pending tasks + case Inbox.get_status(agent.id) do + %{pending_count: count} -> count + _ -> 999 + end + end) + |> List.first() + end + + defp check_file_conflicts(state, task) do + task.file_paths + |> Enum.filter(fn file_path -> + Map.has_key?(state.file_locks, file_path) + end) + end + + defp assign_task_to_agent(state, task, agent_id) do + # Add to agent's inbox + Inbox.add_task(agent_id, task) + + # Update agent status + agent = Map.get(state.agents, agent_id) + updated_agent = Agent.assign_task(agent, task.id) + new_agents = Map.put(state.agents, agent_id, updated_agent) + + # Publish assignment + publish_event(state.nats_conn, "task.assigned", %{ + task: task, + agent_id: agent_id + }) + + {:reply, {:ok, agent_id}, %{state | agents: new_agents}} + end + + defp assign_pending_tasks(state) do + {assigned, remaining} = Enum.reduce(state.pending_tasks, {[], []}, fn task, {assigned, pending} -> + case find_available_agent(state, task) do + nil -> + {assigned, [task | pending]} + + agent -> + case check_file_conflicts(state, task) do + [] -> + Inbox.add_task(agent.id, task) + {[{task, agent.id} | assigned], pending} + + _conflicts -> + {assigned, [task | pending]} + end + end + end) + + {assigned, Enum.reverse(remaining)} + end + + defp add_file_locks(file_locks, task_id, file_paths) do + Enum.reduce(file_paths, file_locks, fn path, locks -> + Map.put(locks, path, task_id) + end) + end + + defp remove_file_locks(file_locks, task_id) do + Enum.reject(file_locks, fn {_path, locked_task_id} -> + locked_task_id == task_id + end) + |> Map.new() + end + + defp publish_event(conn, topic, data) do + message = Jason.encode!(data) + Gnat.pub(conn, topic, message) + end +end \ No newline at end of file diff --git a/mix.exs b/mix.exs new file mode 100644 index 0000000..6b3c14f --- /dev/null +++ b/mix.exs @@ -0,0 +1,32 @@ +defmodule AgentCoordinator.MixProject do + use Mix.Project + + def project do + [ + app: :agent_coordinator, + version: "0.1.0", + elixir: "~> 1.18", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [:logger], + mod: {AgentCoordinator.Application, []} + ] + end + + # Run "mix help deps" to learn about dependencies. + defp deps do + [ + {:jason, "~> 1.4"}, + {:gnat, "~> 1.8"}, + {:phoenix_pubsub, "~> 2.1"}, + {:gen_stage, "~> 1.2"}, + {:uuid, "~> 1.1"} + ] + end +end diff --git a/test/agent_coordinator_test.exs b/test/agent_coordinator_test.exs new file mode 100644 index 0000000..c08f0db --- /dev/null +++ b/test/agent_coordinator_test.exs @@ -0,0 +1,8 @@ +defmodule AgentCoordinatorTest do + use ExUnit.Case + doctest AgentCoordinator + + test "greets the world" do + assert AgentCoordinator.hello() == :world + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs new file mode 100644 index 0000000..869559e --- /dev/null +++ b/test/test_helper.exs @@ -0,0 +1 @@ +ExUnit.start()