Add comprehensive agent activity tracking

- Enhanced Agent struct with current_activity, current_files, and activity_history fields
- Created ActivityTracker module to infer activities from tool calls
- Integrated activity tracking into MCP server tool routing
- Updated task board APIs to include activity information
- Agents now show real-time status like 'Reading file.ex', 'Editing main.py', 'Sequential thinking', etc.
- Added activity history to track recent agent actions
- All file operations and tool calls are now tracked and displayed
This commit is contained in:
Ra
2025-09-06 09:58:59 -07:00
parent 1056672e7c
commit b1f55799ec
27 changed files with 4761 additions and 321 deletions

View File

@@ -0,0 +1,291 @@
defmodule AgentCoordinator.ActivityTracker do
@moduledoc """
Tracks agent activities based on tool calls and infers human-readable activity descriptions.
"""
alias AgentCoordinator.{Agent, TaskRegistry}
@doc """
Infer activity description and files from tool name and arguments.
Returns {activity_description, files_list}.
"""
def infer_activity(tool_name, args) do
case tool_name do
# File operations
"read_file" ->
file_path = extract_file_path(args)
{"Reading #{Path.basename(file_path || "file")}", [file_path]}
"read_text_file" ->
file_path = extract_file_path(args)
{"Reading #{Path.basename(file_path || "file")}", [file_path]}
"read_multiple_files" ->
files = Map.get(args, "paths", [])
file_names = Enum.map(files, &Path.basename/1)
{"Reading #{length(files)} files: #{Enum.join(file_names, ", ")}", files}
"write_file" ->
file_path = extract_file_path(args)
{"Writing #{Path.basename(file_path || "file")}", [file_path]}
"edit_file" ->
file_path = extract_file_path(args)
{"Editing #{Path.basename(file_path || "file")}", [file_path]}
"create_file" ->
file_path = extract_file_path(args)
{"Creating #{Path.basename(file_path || "file")}", [file_path]}
"move_file" ->
source = Map.get(args, "source")
dest = Map.get(args, "destination")
files = [source, dest] |> Enum.filter(&(&1))
{"Moving #{Path.basename(source || "file")} to #{Path.basename(dest || "destination")}", files}
# VS Code operations
"vscode_read_file" ->
file_path = extract_file_path(args)
{"Reading #{Path.basename(file_path || "file")} in VS Code", [file_path]}
"vscode_write_file" ->
file_path = extract_file_path(args)
{"Writing #{Path.basename(file_path || "file")} in VS Code", [file_path]}
"vscode_set_editor_content" ->
file_path = Map.get(args, "file_path")
if file_path do
{"Editing #{Path.basename(file_path)} in VS Code", [file_path]}
else
{"Editing active file in VS Code", []}
end
"vscode_get_active_editor" ->
{"Viewing active editor in VS Code", []}
"vscode_get_selection" ->
{"Viewing text selection in VS Code", []}
# Directory operations
"list_directory" ->
path = extract_file_path(args)
{"Browsing directory #{Path.basename(path || ".")}", []}
"list_directory_with_sizes" ->
path = extract_file_path(args)
{"Browsing directory #{Path.basename(path || ".")} with sizes", []}
"directory_tree" ->
path = extract_file_path(args)
{"Exploring directory tree for #{Path.basename(path || ".")}", []}
"create_directory" ->
path = extract_file_path(args)
{"Creating directory #{Path.basename(path || "directory")}", []}
# Search operations
"search_files" ->
pattern = Map.get(args, "pattern", "files")
{"Searching for #{pattern}", []}
"grep_search" ->
query = Map.get(args, "query", "text")
{"Searching for '#{query}' in files", []}
"semantic_search" ->
query = Map.get(args, "query", "content")
{"Semantic search for '#{query}'", []}
# Thinking operations
"sequentialthinking" ->
thought = Map.get(args, "thought", "")
thought_summary = String.slice(thought, 0, 50) |> String.trim()
{"Sequential thinking: #{thought_summary}...", []}
# Terminal operations
"run_in_terminal" ->
command = Map.get(args, "command", "command")
command_summary = String.slice(command, 0, 30) |> String.trim()
{"Running: #{command_summary}...", []}
"get_terminal_output" ->
{"Checking terminal output", []}
# Test operations
"runTests" ->
files = Map.get(args, "files", [])
if files != [] do
file_names = Enum.map(files, &Path.basename/1)
{"Running tests in #{Enum.join(file_names, ", ")}", files}
else
{"Running all tests", []}
end
# Task management
"create_task" ->
title = Map.get(args, "title", "task")
{"Creating task: #{title}", []}
"get_next_task" ->
{"Getting next task", []}
"complete_task" ->
{"Completing current task", []}
# Knowledge operations
"create_entities" ->
entities = Map.get(args, "entities", [])
count = length(entities)
{"Creating #{count} knowledge entities", []}
"create_relations" ->
relations = Map.get(args, "relations", [])
count = length(relations)
{"Creating #{count} knowledge relations", []}
"search_nodes" ->
query = Map.get(args, "query", "nodes")
{"Searching knowledge graph for '#{query}'", []}
"read_graph" ->
{"Reading knowledge graph", []}
# HTTP/Web operations
"fetch_webpage" ->
urls = Map.get(args, "urls", [])
if urls != [] do
{"Fetching #{length(urls)} webpages", []}
else
{"Fetching webpage", []}
end
# Development operations
"get_errors" ->
files = Map.get(args, "filePaths", [])
if files != [] do
file_names = Enum.map(files, &Path.basename/1)
{"Checking errors in #{Enum.join(file_names, ", ")}", files}
else
{"Checking all errors", []}
end
"list_code_usages" ->
symbol = Map.get(args, "symbolName", "symbol")
{"Finding usages of #{symbol}", []}
# Elixir-specific operations
"elixir-definition" ->
symbol = Map.get(args, "symbol", "symbol")
{"Finding definition of #{symbol}", []}
"elixir-docs" ->
modules = Map.get(args, "modules", [])
if modules != [] do
{"Getting docs for #{Enum.join(modules, ", ")}", []}
else
{"Getting Elixir documentation", []}
end
"elixir-environment" ->
location = Map.get(args, "location", "code")
{"Analyzing Elixir environment at #{location}", []}
# Python operations
"pylanceRunCodeSnippet" ->
{"Running Python code snippet", []}
"pylanceFileSyntaxErrors" ->
file_uri = Map.get(args, "fileUri")
if file_uri do
file_path = uri_to_path(file_uri)
{"Checking syntax errors in #{Path.basename(file_path)}", [file_path]}
else
{"Checking Python syntax errors", []}
end
# Default cases
tool_name when is_binary(tool_name) ->
cond do
String.starts_with?(tool_name, "vscode_") ->
action = String.replace(tool_name, "vscode_", "") |> String.replace("_", " ")
{"VS Code: #{action}", []}
String.starts_with?(tool_name, "elixir-") ->
action = String.replace(tool_name, "elixir-", "") |> String.replace("-", " ")
{"Elixir: #{action}", []}
String.starts_with?(tool_name, "pylance") ->
action = String.replace(tool_name, "pylance", "") |> humanize_string()
{"Python: #{action}", []}
String.contains?(tool_name, "_") ->
action = String.replace(tool_name, "_", " ") |> String.capitalize()
{action, []}
true ->
{String.capitalize(tool_name), []}
end
_ ->
{"Unknown activity", []}
end
end
@doc """
Update an agent's activity based on a tool call.
"""
def update_agent_activity(agent_id, tool_name, args) do
{activity, files} = infer_activity(tool_name, args)
case TaskRegistry.get_agent(agent_id) do
{:ok, agent} ->
updated_agent = Agent.update_activity(agent, activity, files)
# Update the agent in the registry
TaskRegistry.update_agent(agent_id, updated_agent)
{:error, _} ->
# Agent not found, ignore
:ok
end
end
@doc """
Clear an agent's activity (e.g., when they go idle).
"""
def clear_agent_activity(agent_id) do
case TaskRegistry.get_agent(agent_id) do
{:ok, agent} ->
updated_agent = Agent.clear_activity(agent)
TaskRegistry.update_agent(agent_id, updated_agent)
{:error, _} ->
:ok
end
end
# Private helper functions
defp extract_file_path(args) do
# Try various common parameter names for file paths
args["path"] || args["filePath"] || args["file_path"] ||
args["source"] || args["destination"] || args["fileUri"] |> uri_to_path()
end
defp uri_to_path(nil), do: nil
defp uri_to_path(uri) when is_binary(uri) do
if String.starts_with?(uri, "file://") do
String.replace_prefix(uri, "file://", "")
else
uri
end
end
defp humanize_string(str) do
str
|> String.split(~r/[A-Z]/)
|> Enum.map(&String.downcase/1)
|> Enum.filter(&(&1 != ""))
|> Enum.join(" ")
|> String.capitalize()
end
end

View File

@@ -13,7 +13,10 @@ defmodule AgentCoordinator.Agent do
:codebase_id,
:workspace_path,
:last_heartbeat,
:metadata
:metadata,
:current_activity,
:current_files,
:activity_history
]}
defstruct [
:id,
@@ -24,7 +27,10 @@ defmodule AgentCoordinator.Agent do
:codebase_id,
:workspace_path,
:last_heartbeat,
:metadata
:metadata,
:current_activity,
:current_files,
:activity_history
]
@type status :: :idle | :busy | :offline | :error
@@ -39,20 +45,45 @@ defmodule AgentCoordinator.Agent do
codebase_id: String.t(),
workspace_path: String.t() | nil,
last_heartbeat: DateTime.t(),
metadata: map()
metadata: map(),
current_activity: String.t() | nil,
current_files: [String.t()],
activity_history: [map()]
}
def new(name, capabilities, opts \\ []) do
workspace_path = Keyword.get(opts, :workspace_path)
# Use smart codebase identification
codebase_id = case Keyword.get(opts, :codebase_id) do
nil when workspace_path ->
# Auto-detect from workspace
case AgentCoordinator.CodebaseIdentifier.identify_codebase(workspace_path) do
%{canonical_id: canonical_id} -> canonical_id
_ -> Path.basename(workspace_path || "default")
end
nil ->
"default"
explicit_id ->
# Normalize the provided ID
AgentCoordinator.CodebaseIdentifier.normalize_codebase_reference(explicit_id, workspace_path)
end
%__MODULE__{
id: UUID.uuid4(),
name: name,
capabilities: capabilities,
status: :idle,
current_task_id: nil,
codebase_id: Keyword.get(opts, :codebase_id, "default"),
workspace_path: Keyword.get(opts, :workspace_path),
codebase_id: codebase_id,
workspace_path: workspace_path,
last_heartbeat: DateTime.utc_now(),
metadata: Keyword.get(opts, :metadata, %{})
metadata: Keyword.get(opts, :metadata, %{}),
current_activity: nil,
current_files: [],
activity_history: []
}
end
@@ -60,6 +91,33 @@ defmodule AgentCoordinator.Agent do
%{agent | last_heartbeat: DateTime.utc_now()}
end
def update_activity(agent, activity, files \\ []) do
# Add to activity history (keep last 10 activities)
activity_entry = %{
activity: activity,
files: files,
timestamp: DateTime.utc_now()
}
new_history = [activity_entry | agent.activity_history]
|> Enum.take(10)
%{agent |
current_activity: activity,
current_files: files,
activity_history: new_history,
last_heartbeat: DateTime.utc_now()
}
end
def clear_activity(agent) do
%{agent |
current_activity: nil,
current_files: [],
last_heartbeat: DateTime.utc_now()
}
end
def assign_task(agent, task_id) do
%{agent | status: :busy, current_task_id: task_id}
end

View File

@@ -24,9 +24,15 @@ defmodule AgentCoordinator.Application do
# Task registry with NATS integration (conditionally add persistence)
{AgentCoordinator.TaskRegistry, nats: if(enable_persistence, do: nats_config(), else: nil)},
# Session manager for MCP session token handling
AgentCoordinator.SessionManager,
# Unified MCP server (includes external server management, session tracking, and auto-registration)
AgentCoordinator.MCPServer,
# Interface manager for multiple MCP interface modes
AgentCoordinator.InterfaceManager,
# Auto-heartbeat manager
AgentCoordinator.AutoHeartbeat,

View File

@@ -0,0 +1,313 @@
defmodule AgentCoordinator.CodebaseIdentifier do
@moduledoc """
Smart codebase identification system that works across local and remote scenarios.
Generates canonical codebase identifiers using multiple strategies:
1. Git repository detection (preferred)
2. Local folder name fallback
3. Remote workspace mapping
4. Custom identifier override
"""
require Logger
@type codebase_info :: %{
canonical_id: String.t(),
display_name: String.t(),
workspace_path: String.t(),
repository_url: String.t() | nil,
git_remote: String.t() | nil,
branch: String.t() | nil,
commit_hash: String.t() | nil,
identification_method: :git_remote | :git_local | :folder_name | :custom
}
@doc """
Identify a codebase from a workspace path, generating a canonical ID.
Priority order:
1. Git remote URL (most reliable for distributed teams)
2. Git local repository info
3. Folder name (fallback for non-git projects)
4. Custom override from metadata
## Examples
# Git repository with remote
iex> identify_codebase("/home/user/my-project")
%{
canonical_id: "github.com/owner/my-project",
display_name: "my-project",
workspace_path: "/home/user/my-project",
repository_url: "https://github.com/owner/my-project.git",
git_remote: "origin",
branch: "main",
identification_method: :git_remote
}
# Local folder (no git)
iex> identify_codebase("/home/user/local-project")
%{
canonical_id: "local:/home/user/local-project",
display_name: "local-project",
workspace_path: "/home/user/local-project",
repository_url: nil,
identification_method: :folder_name
}
"""
def identify_codebase(workspace_path, opts \\ [])
def identify_codebase(nil, opts) do
custom_id = Keyword.get(opts, :custom_id, "default")
build_custom_codebase_info(nil, custom_id)
end
def identify_codebase(workspace_path, opts) do
custom_id = Keyword.get(opts, :custom_id)
cond do
custom_id ->
build_custom_codebase_info(workspace_path, custom_id)
git_repository?(workspace_path) ->
identify_git_codebase(workspace_path)
true ->
identify_folder_codebase(workspace_path)
end
end
@doc """
Normalize different codebase references to canonical IDs.
Handles cases where agents specify different local paths for same repository.
"""
def normalize_codebase_reference(codebase_ref, workspace_path) do
case codebase_ref do
# Already canonical
id when is_binary(id) ->
if String.contains?(id, ".com/") or String.starts_with?(id, "local:") do
id
else
# Folder name - try to resolve to canonical
case identify_codebase(workspace_path) do
%{canonical_id: canonical_id} -> canonical_id
_ -> "local:#{id}"
end
end
_ ->
# Fallback to folder-based ID
Path.basename(workspace_path || "/unknown")
end
end
@doc """
Check if two workspace paths refer to the same codebase.
Useful for detecting when agents from different machines work on same project.
"""
def same_codebase?(workspace_path1, workspace_path2) do
info1 = identify_codebase(workspace_path1)
info2 = identify_codebase(workspace_path2)
info1.canonical_id == info2.canonical_id
end
# Private functions
defp build_custom_codebase_info(workspace_path, custom_id) do
%{
canonical_id: custom_id,
display_name: custom_id,
workspace_path: workspace_path,
repository_url: nil,
git_remote: nil,
branch: nil,
commit_hash: nil,
identification_method: :custom
}
end
defp identify_git_codebase(workspace_path) do
with {:ok, git_info} <- get_git_info(workspace_path) do
canonical_id = case git_info.remote_url do
nil ->
# Local git repo without remote
"git-local:#{git_info.repo_name}"
remote_url ->
# Extract canonical identifier from remote URL
extract_canonical_from_remote(remote_url)
end
%{
canonical_id: canonical_id,
display_name: git_info.repo_name,
workspace_path: workspace_path,
repository_url: git_info.remote_url,
git_remote: git_info.remote_name,
branch: git_info.branch,
commit_hash: git_info.commit_hash,
identification_method: if(git_info.remote_url, do: :git_remote, else: :git_local)
}
else
_ ->
identify_folder_codebase(workspace_path)
end
end
defp identify_folder_codebase(workspace_path) when is_nil(workspace_path) do
%{
canonical_id: "default",
display_name: "default",
workspace_path: nil,
repository_url: nil,
git_remote: nil,
branch: nil,
commit_hash: nil,
identification_method: :folder_name
}
end
defp identify_folder_codebase(workspace_path) do
folder_name = Path.basename(workspace_path)
%{
canonical_id: "local:#{workspace_path}",
display_name: folder_name,
workspace_path: workspace_path,
repository_url: nil,
git_remote: nil,
branch: nil,
commit_hash: nil,
identification_method: :folder_name
}
end
defp git_repository?(workspace_path) when is_nil(workspace_path), do: false
defp git_repository?(workspace_path) do
File.exists?(Path.join(workspace_path, ".git"))
end
defp get_git_info(workspace_path) do
try do
# Get repository name
repo_name = Path.basename(workspace_path)
# Get current branch
{branch, 0} = System.cmd("git", ["branch", "--show-current"], cd: workspace_path)
branch = String.trim(branch)
# Get current commit
{commit_hash, 0} = System.cmd("git", ["rev-parse", "HEAD"], cd: workspace_path)
commit_hash = String.trim(commit_hash)
# Try to get remote URL
{remote_info, _remote_result_use_me?} = case System.cmd("git", ["remote", "-v"], cd: workspace_path) do
{output, 0} when output != "" ->
# Parse remote output to extract origin URL
lines = String.split(String.trim(output), "\n")
origin_line = Enum.find(lines, fn line ->
String.starts_with?(line, "origin") and String.contains?(line, "(fetch)")
end)
case origin_line do
nil -> {nil, :no_origin}
line ->
# Extract URL from "origin <url> (fetch)"
url = line
|> String.split()
|> Enum.at(1)
{url, :ok}
end
_ -> {nil, :no_remotes}
end
git_info = %{
repo_name: repo_name,
branch: branch,
commit_hash: commit_hash,
remote_url: remote_info,
remote_name: if(remote_info, do: "origin", else: nil)
}
{:ok, git_info}
rescue
_ -> {:error, :git_command_failed}
end
end
defp extract_canonical_from_remote(remote_url) do
cond do
# GitHub HTTPS
String.contains?(remote_url, "github.com") ->
extract_github_id(remote_url)
# GitLab HTTPS
String.contains?(remote_url, "gitlab.com") ->
extract_gitlab_id(remote_url)
# SSH format
String.contains?(remote_url, "@") and String.contains?(remote_url, ":") ->
extract_ssh_id(remote_url)
# Other HTTPS
String.starts_with?(remote_url, "https://") ->
extract_https_id(remote_url)
true ->
# Fallback - use raw URL
"remote:#{remote_url}"
end
end
defp extract_github_id(url) do
# Extract "owner/repo" from various GitHub URL formats
regex = ~r/github\.com[\/:]([^\/]+)\/([^\/\.]+)/
case Regex.run(regex, url) do
[_, owner, repo] ->
"github.com/#{owner}/#{repo}"
_ ->
"github.com/unknown"
end
end
defp extract_gitlab_id(url) do
# Similar logic for GitLab
regex = ~r/gitlab\.com[\/:]([^\/]+)\/([^\/\.]+)/
case Regex.run(regex, url) do
[_, owner, repo] ->
"gitlab.com/#{owner}/#{repo}"
_ ->
"gitlab.com/unknown"
end
end
defp extract_ssh_id(url) do
# SSH format: git@host:owner/repo.git
case String.split(url, ":") do
[host_part, path_part] ->
host = String.replace(host_part, ~r/.*@/, "")
path = String.replace(path_part, ".git", "")
"#{host}/#{path}"
_ ->
"ssh:#{url}"
end
end
defp extract_https_id(url) do
# Extract from general HTTPS URLs
uri = URI.parse(url)
host = uri.host
path = String.replace(uri.path || "", ~r/^\//, "")
path = String.replace(path, ".git", "")
if host && path != "" do
"#{host}/#{path}"
else
"https:#{url}"
end
end
end

View File

@@ -0,0 +1,597 @@
defmodule AgentCoordinator.HttpInterface do
@moduledoc """
HTTP and WebSocket interface for the Agent Coordinator MCP server.
This module provides:
- HTTP REST API for MCP requests
- WebSocket support for real-time communication
- Remote client detection and tool filtering
- CORS support for web clients
- Session management across HTTP requests
"""
use Plug.Router
require Logger
alias AgentCoordinator.{MCPServer, ToolFilter, SessionManager}
plug Plug.Logger
plug :match
plug Plug.Parsers, parsers: [:json], json_decoder: Jason
plug :put_cors_headers
plug :dispatch
@doc """
Start the HTTP server on the specified port.
"""
def start_link(opts \\ []) do
port = Keyword.get(opts, :port, 8080)
Logger.info("Starting Agent Coordinator HTTP interface on port #{port}")
Plug.Cowboy.http(__MODULE__, [],
port: port,
dispatch: cowboy_dispatch()
)
end
# HTTP Routes
get "/health" do
send_json_response(conn, 200, %{
status: "healthy",
service: "agent-coordinator",
version: AgentCoordinator.version(),
timestamp: DateTime.utc_now()
})
end
get "/mcp/capabilities" do
context = extract_client_context(conn)
# Get filtered tools based on client context
all_tools = MCPServer.get_tools()
filtered_tools = ToolFilter.filter_tools(all_tools, context)
capabilities = %{
protocolVersion: "2024-11-05",
serverInfo: %{
name: "agent-coordinator-http",
version: AgentCoordinator.version(),
description: "Agent Coordinator HTTP/WebSocket interface"
},
capabilities: %{
tools: %{},
coordination: %{
automatic_task_tracking: true,
agent_management: true,
multi_server_proxy: true,
heartbeat_coverage: true,
session_tracking: true,
tool_filtering: true
}
},
tools: filtered_tools,
context: %{
connection_type: context.connection_type,
security_level: context.security_level,
tool_count: length(filtered_tools)
}
}
send_json_response(conn, 200, capabilities)
end
get "/mcp/tools" do
context = extract_client_context(conn)
all_tools = MCPServer.get_tools()
filtered_tools = ToolFilter.filter_tools(all_tools, context)
filter_stats = ToolFilter.get_filter_stats(all_tools, context)
response = %{
tools: filtered_tools,
_meta: %{
filter_stats: filter_stats,
context: %{
connection_type: context.connection_type,
security_level: context.security_level
}
}
}
send_json_response(conn, 200, response)
end
post "/mcp/tools/:tool_name" do
context = extract_client_context(conn)
# Check if tool is allowed for this client
all_tools = MCPServer.get_tools()
filtered_tools = ToolFilter.filter_tools(all_tools, context)
tool_allowed = Enum.any?(filtered_tools, fn tool ->
Map.get(tool, "name") == tool_name
end)
if not tool_allowed do
send_json_response(conn, 403, %{
error: %{
code: -32601,
message: "Tool not available for remote clients: #{tool_name}",
data: %{
available_tools: Enum.map(filtered_tools, &Map.get(&1, "name")),
connection_type: context.connection_type
}
}
})
else
# Execute the tool call
args = Map.get(conn.body_params, "arguments", %{})
# Create MCP request format
mcp_request = %{
"jsonrpc" => "2.0",
"id" => Map.get(conn.body_params, "id", generate_request_id()),
"method" => "tools/call",
"params" => %{
"name" => tool_name,
"arguments" => args
}
}
# Add session tracking
mcp_request = add_session_info(mcp_request, conn, context)
# Execute through MCP server
case MCPServer.handle_mcp_request(mcp_request) do
%{"result" => result} ->
send_json_response(conn, 200, %{
result: result,
_meta: %{
tool_name: tool_name,
request_id: mcp_request["id"],
context: context.connection_type
}
})
%{"error" => error} ->
send_json_response(conn, 400, %{error: error})
unexpected ->
Logger.error("Unexpected MCP response: #{inspect(unexpected)}")
send_json_response(conn, 500, %{
error: %{
code: -32603,
message: "Internal server error"
}
})
end
end
end
post "/mcp/request" do
context = extract_client_context(conn)
# Validate MCP request format
case validate_mcp_request(conn.body_params) do
{:ok, mcp_request} ->
method = Map.get(mcp_request, "method")
# Validate session for this method
case validate_session_for_method(method, conn, context) do
{:ok, _session_info} ->
# Add session tracking
enhanced_request = add_session_info(mcp_request, conn, context)
# For tool calls, check tool filtering
case method do
"tools/call" ->
tool_name = get_in(enhanced_request, ["params", "name"])
if tool_allowed_for_context?(tool_name, context) do
execute_mcp_request(conn, enhanced_request, context)
else
send_json_response(conn, 403, %{
jsonrpc: "2.0",
id: Map.get(enhanced_request, "id"),
error: %{
code: -32601,
message: "Tool not available: #{tool_name}"
}
})
end
"tools/list" ->
# Override tools/list to return filtered tools
handle_filtered_tools_list(conn, enhanced_request, context)
_ ->
# Other methods pass through normally
execute_mcp_request(conn, enhanced_request, context)
end
{:error, auth_error} ->
send_json_response(conn, 401, %{
jsonrpc: "2.0",
id: Map.get(mcp_request, "id"),
error: auth_error
})
end
{:error, reason} ->
send_json_response(conn, 400, %{
jsonrpc: "2.0",
id: Map.get(conn.body_params, "id"),
error: %{
code: -32700,
message: "Invalid request: #{reason}"
}
})
end
end
get "/mcp/ws" do
conn
|> WebSockAdapter.upgrade(AgentCoordinator.WebSocketHandler, %{}, timeout: 60_000)
end
get "/agents" do
context = extract_client_context(conn)
# Only allow agent status for authorized clients
case context.security_level do
level when level in [:trusted, :sandboxed] ->
mcp_request = %{
"jsonrpc" => "2.0",
"id" => generate_request_id(),
"method" => "tools/call",
"params" => %{
"name" => "get_task_board",
"arguments" => %{"agent_id" => "http_interface"}
}
}
case MCPServer.handle_mcp_request(mcp_request) do
%{"result" => %{"content" => [%{"text" => text}]}} ->
data = Jason.decode!(text)
send_json_response(conn, 200, data)
%{"error" => error} ->
send_json_response(conn, 500, %{error: error})
end
_ ->
send_json_response(conn, 403, %{
error: "Insufficient privileges to view agent status"
})
end
end
# Server-Sent Events (SSE) endpoint for real-time MCP streaming.
# Implements MCP Streamable HTTP transport for live updates.
get "/mcp/stream" do
context = extract_client_context(conn)
# Validate session for SSE stream
case validate_session_for_method("stream/subscribe", conn, context) do
{:ok, session_info} ->
# Set up SSE headers
conn = conn
|> put_resp_content_type("text/event-stream")
|> put_mcp_headers()
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("connection", "keep-alive")
|> put_resp_header("access-control-allow-credentials", "true")
|> send_chunked(200)
# Send initial connection event
{:ok, conn} = chunk(conn, format_sse_event("connected", %{
session_id: Map.get(session_info, :agent_id, "anonymous"),
protocol_version: "2025-06-18",
timestamp: DateTime.utc_now() |> DateTime.to_iso8601()
}))
# Start streaming loop
stream_mcp_events(conn, session_info, context)
{:error, auth_error} ->
send_json_response(conn, 401, auth_error)
end
end
defp stream_mcp_events(conn, session_info, context) do
# This is a basic implementation - in production you'd want to:
# 1. Subscribe to a GenServer/PubSub for real-time events
# 2. Handle client disconnections gracefully
# 3. Implement proper backpressure
# Send periodic heartbeat for now
try do
:timer.sleep(1000)
{:ok, conn} = chunk(conn, format_sse_event("heartbeat", %{
timestamp: DateTime.utc_now() |> DateTime.to_iso8601(),
session_id: Map.get(session_info, :agent_id, "anonymous")
}))
# Continue streaming (this would be event-driven in production)
stream_mcp_events(conn, session_info, context)
rescue
# Client disconnected
_ ->
Logger.info("SSE client disconnected")
conn
end
end
defp format_sse_event(event_type, data) do
"event: #{event_type}\ndata: #{Jason.encode!(data)}\n\n"
end
# Catch-all for unmatched routes
match _ do
send_json_response(conn, 404, %{
error: "Not found",
available_endpoints: [
"GET /health",
"GET /mcp/capabilities",
"GET /mcp/tools",
"POST /mcp/tools/:tool_name",
"POST /mcp/request",
"GET /mcp/stream (SSE)",
"GET /mcp/ws",
"GET /agents"
]
})
end
# Private helper functions
defp cowboy_dispatch do
[
{:_, [
{"/mcp/ws", AgentCoordinator.WebSocketHandler, []},
{:_, Plug.Cowboy.Handler, {__MODULE__, []}}
]}
]
end
defp extract_client_context(conn) do
remote_ip = get_remote_ip(conn)
user_agent = get_req_header(conn, "user-agent") |> List.first()
origin = get_req_header(conn, "origin") |> List.first()
connection_info = %{
transport: :http,
remote_ip: remote_ip,
user_agent: user_agent,
origin: origin,
secure: conn.scheme == :https,
headers: conn.req_headers
}
ToolFilter.detect_client_context(connection_info)
end
defp get_remote_ip(conn) do
# Check for forwarded headers first (for reverse proxies)
forwarded_for = get_req_header(conn, "x-forwarded-for") |> List.first()
real_ip = get_req_header(conn, "x-real-ip") |> List.first()
cond do
forwarded_for ->
forwarded_for |> String.split(",") |> List.first() |> String.trim()
real_ip ->
real_ip
true ->
conn.remote_ip |> :inet.ntoa() |> to_string()
end
end
defp put_cors_headers(conn, _opts) do
# Validate origin for enhanced security
origin = get_req_header(conn, "origin") |> List.first()
allowed_origin = validate_origin(origin)
conn
|> put_resp_header("access-control-allow-origin", allowed_origin)
|> put_resp_header("access-control-allow-methods", "GET, POST, OPTIONS")
|> put_resp_header("access-control-allow-headers", "content-type, authorization, mcp-session-id, mcp-protocol-version, x-session-id")
|> put_resp_header("access-control-expose-headers", "mcp-protocol-version, server")
|> put_resp_header("access-control-max-age", "86400")
end
defp validate_origin(nil), do: "*" # No origin header (direct API calls)
defp validate_origin(origin) do
# Allow localhost and development origins
case URI.parse(origin) do
%URI{host: host} when host in ["localhost", "127.0.0.1", "::1"] -> origin
%URI{host: host} when is_binary(host) ->
# Allow HTTPS origins and known development domains
if String.starts_with?(origin, "https://") or
String.contains?(host, ["localhost", "127.0.0.1", "dev", "local"]) do
origin
else
# For production, be more restrictive
Logger.warning("Potentially unsafe origin: #{origin}")
"*" # Fallback for now, could be more restrictive
end
_ -> "*"
end
end
defp send_json_response(conn, status, data) do
conn
|> put_resp_content_type("application/json")
|> put_mcp_headers()
|> send_resp(status, Jason.encode!(data))
end
defp put_mcp_headers(conn) do
conn
|> put_resp_header("mcp-protocol-version", "2025-06-18")
|> put_resp_header("server", "AgentCoordinator/1.0")
end
defp validate_mcp_request(params) when is_map(params) do
required_fields = ["jsonrpc", "method"]
missing_fields = Enum.filter(required_fields, fn field ->
not Map.has_key?(params, field)
end)
cond do
not Enum.empty?(missing_fields) ->
{:error, "Missing required fields: #{Enum.join(missing_fields, ", ")}"}
Map.get(params, "jsonrpc") != "2.0" ->
{:error, "Invalid jsonrpc version, must be '2.0'"}
not is_binary(Map.get(params, "method")) ->
{:error, "Method must be a string"}
true ->
{:ok, params}
end
end
defp validate_mcp_request(_), do: {:error, "Request must be a JSON object"}
defp add_session_info(mcp_request, conn, context) do
# Extract and validate MCP session token
{session_id, session_info} = get_session_info(conn)
# Add context metadata to request params
enhanced_params = Map.get(mcp_request, "params", %{})
|> Map.put("_session_id", session_id)
|> Map.put("_session_info", session_info)
|> Map.put("_client_context", %{
connection_type: context.connection_type,
security_level: context.security_level,
remote_ip: get_remote_ip(conn),
user_agent: context.user_agent
})
Map.put(mcp_request, "params", enhanced_params)
end
defp get_session_info(conn) do
# Check for MCP-Session-Id header (MCP compliant)
case get_req_header(conn, "mcp-session-id") do
[session_token] when byte_size(session_token) > 0 ->
case SessionManager.validate_session(session_token) do
{:ok, session_info} ->
{session_info.agent_id, %{
token: session_token,
agent_id: session_info.agent_id,
capabilities: session_info.capabilities,
expires_at: session_info.expires_at,
validated: true
}}
{:error, reason} ->
Logger.warning("Invalid MCP session token: #{reason}")
# Fall back to generating anonymous session
anonymous_id = "http_anonymous_" <> (:crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower))
{anonymous_id, %{validated: false, reason: reason}}
end
[] ->
# Check legacy X-Session-Id header for backward compatibility
case get_req_header(conn, "x-session-id") do
[session_id] when byte_size(session_id) > 0 ->
{session_id, %{validated: false, legacy: true}}
_ ->
# No session header, generate anonymous session
anonymous_id = "http_anonymous_" <> (:crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower))
{anonymous_id, %{validated: false, anonymous: true}}
end
end
end
defp require_authenticated_session(conn, _context) do
{_session_id, session_info} = get_session_info(conn)
case Map.get(session_info, :validated, false) do
true ->
{:ok, session_info}
false ->
reason = Map.get(session_info, :reason, "Session not authenticated")
{:error, %{
code: -32001,
message: "Authentication required",
data: %{reason: reason}
}}
end
end
defp validate_session_for_method(method, conn, context) do
# Define which methods require authenticated sessions
authenticated_methods = MapSet.new([
"agents/register",
"agents/unregister",
"agents/heartbeat",
"tasks/create",
"tasks/complete",
"codebase/register",
"stream/subscribe"
])
if MapSet.member?(authenticated_methods, method) do
require_authenticated_session(conn, context)
else
{:ok, %{anonymous: true}}
end
end
defp tool_allowed_for_context?(tool_name, context) do
all_tools = MCPServer.get_tools()
filtered_tools = ToolFilter.filter_tools(all_tools, context)
Enum.any?(filtered_tools, fn tool ->
Map.get(tool, "name") == tool_name
end)
end
defp execute_mcp_request(conn, mcp_request, _context) do
case MCPServer.handle_mcp_request(mcp_request) do
%{"result" => _} = response ->
send_json_response(conn, 200, response)
%{"error" => _} = response ->
send_json_response(conn, 400, response)
unexpected ->
Logger.error("Unexpected MCP response: #{inspect(unexpected)}")
send_json_response(conn, 500, %{
jsonrpc: "2.0",
id: Map.get(mcp_request, "id"),
error: %{
code: -32603,
message: "Internal server error"
}
})
end
end
defp handle_filtered_tools_list(conn, mcp_request, context) do
all_tools = MCPServer.get_tools()
filtered_tools = ToolFilter.filter_tools(all_tools, context)
response = %{
"jsonrpc" => "2.0",
"id" => Map.get(mcp_request, "id"),
"result" => %{
"tools" => filtered_tools,
"_meta" => %{
"filtered_for" => context.connection_type,
"original_count" => length(all_tools),
"filtered_count" => length(filtered_tools)
}
}
}
send_json_response(conn, 200, response)
end
defp generate_request_id do
"http_req_" <> (:crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower))
end
end

View File

@@ -0,0 +1,649 @@
defmodule AgentCoordinator.InterfaceManager do
@moduledoc """
Centralized manager for multiple MCP interface modes.
This module coordinates between different interface types:
- STDIO interface (for local MCP clients like VSCode)
- HTTP REST interface (for remote API access)
- WebSocket interface (for real-time web clients)
Responsibilities:
- Start/stop interface servers based on configuration
- Coordinate session state across interfaces
- Apply appropriate tool filtering per interface
- Monitor interface health and restart if needed
- Provide unified metrics and monitoring
"""
use GenServer
require Logger
alias AgentCoordinator.{HttpInterface, ToolFilter}
defstruct [
:config,
:interfaces,
:stdio_handler,
:session_registry,
:metrics
]
@interface_types [:stdio, :http, :websocket]
# Client API
@doc """
Start the interface manager with configuration.
"""
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Get current interface status.
"""
def get_status do
GenServer.call(__MODULE__, :get_status)
end
@doc """
Start a specific interface type.
"""
def start_interface(interface_type, opts \\ []) do
GenServer.call(__MODULE__, {:start_interface, interface_type, opts})
end
@doc """
Stop a specific interface type.
"""
def stop_interface(interface_type) do
GenServer.call(__MODULE__, {:stop_interface, interface_type})
end
@doc """
Restart an interface.
"""
def restart_interface(interface_type) do
GenServer.call(__MODULE__, {:restart_interface, interface_type})
end
@doc """
Get metrics for all interfaces.
"""
def get_metrics do
GenServer.call(__MODULE__, :get_metrics)
end
@doc """
Register a session across interfaces.
"""
def register_session(session_id, interface_type, session_info) do
GenServer.cast(__MODULE__, {:register_session, session_id, interface_type, session_info})
end
@doc """
Unregister a session.
"""
def unregister_session(session_id) do
GenServer.cast(__MODULE__, {:unregister_session, session_id})
end
# Server callbacks
@impl GenServer
def init(opts) do
# Load configuration
config = load_interface_config(opts)
state = %__MODULE__{
config: config,
interfaces: %{},
stdio_handler: nil,
session_registry: %{},
metrics: initialize_metrics()
}
Logger.info("Interface Manager starting with config: #{inspect(config.enabled_interfaces)}")
# Start enabled interfaces
{:ok, state, {:continue, :start_interfaces}}
end
@impl GenServer
def handle_continue(:start_interfaces, state) do
# Start each enabled interface
updated_state = Enum.reduce(state.config.enabled_interfaces, state, fn interface_type, acc ->
case start_interface_server(interface_type, state.config, acc) do
{:ok, interface_info} ->
Logger.info("Started #{interface_type} interface")
%{acc | interfaces: Map.put(acc.interfaces, interface_type, interface_info)}
{:error, reason} ->
Logger.error("Failed to start #{interface_type} interface: #{reason}")
acc
end
end)
{:noreply, updated_state}
end
@impl GenServer
def handle_call(:get_status, _from, state) do
status = %{
enabled_interfaces: state.config.enabled_interfaces,
running_interfaces: Map.keys(state.interfaces),
active_sessions: map_size(state.session_registry),
config: %{
stdio: state.config.stdio,
http: state.config.http,
websocket: state.config.websocket
},
uptime: get_uptime(),
metrics: state.metrics
}
{:reply, status, state}
end
@impl GenServer
def handle_call({:start_interface, interface_type, opts}, _from, state) do
if interface_type in @interface_types do
case start_interface_server(interface_type, state.config, state, opts) do
{:ok, interface_info} ->
updated_interfaces = Map.put(state.interfaces, interface_type, interface_info)
updated_state = %{state | interfaces: updated_interfaces}
Logger.info("Started #{interface_type} interface on demand")
{:reply, {:ok, interface_info}, updated_state}
{:error, reason} ->
Logger.error("Failed to start #{interface_type} interface: #{reason}")
{:reply, {:error, reason}, state}
end
else
{:reply, {:error, "Unknown interface type: #{interface_type}"}, state}
end
end
@impl GenServer
def handle_call({:stop_interface, interface_type}, _from, state) do
case Map.get(state.interfaces, interface_type) do
nil ->
{:reply, {:error, "Interface not running: #{interface_type}"}, state}
interface_info ->
case stop_interface_server(interface_type, interface_info) do
:ok ->
updated_interfaces = Map.delete(state.interfaces, interface_type)
updated_state = %{state | interfaces: updated_interfaces}
Logger.info("Stopped #{interface_type} interface")
{:reply, :ok, updated_state}
{:error, reason} ->
Logger.error("Failed to stop #{interface_type} interface: #{reason}")
{:reply, {:error, reason}, state}
end
end
end
@impl GenServer
def handle_call({:restart_interface, interface_type}, _from, state) do
case Map.get(state.interfaces, interface_type) do
nil ->
{:reply, {:error, "Interface not running: #{interface_type}"}, state}
interface_info ->
# Stop the interface
case stop_interface_server(interface_type, interface_info) do
:ok ->
# Start it again
case start_interface_server(interface_type, state.config, state) do
{:ok, new_interface_info} ->
updated_interfaces = Map.put(state.interfaces, interface_type, new_interface_info)
updated_state = %{state | interfaces: updated_interfaces}
Logger.info("Restarted #{interface_type} interface")
{:reply, {:ok, new_interface_info}, updated_state}
{:error, reason} ->
# Remove from running interfaces since it failed to restart
updated_interfaces = Map.delete(state.interfaces, interface_type)
updated_state = %{state | interfaces: updated_interfaces}
Logger.error("Failed to restart #{interface_type} interface: #{reason}")
{:reply, {:error, reason}, updated_state}
end
{:error, reason} ->
Logger.error("Failed to stop #{interface_type} interface for restart: #{reason}")
{:reply, {:error, reason}, state}
end
end
end
@impl GenServer
def handle_call(:get_metrics, _from, state) do
# Collect metrics from all running interfaces
interface_metrics = Enum.map(state.interfaces, fn {interface_type, interface_info} ->
{interface_type, get_interface_metrics(interface_type, interface_info)}
end) |> Enum.into(%{})
metrics = %{
interfaces: interface_metrics,
sessions: %{
total: map_size(state.session_registry),
by_interface: get_sessions_by_interface(state.session_registry)
},
uptime: get_uptime(),
timestamp: DateTime.utc_now()
}
{:reply, metrics, state}
end
@impl GenServer
def handle_cast({:register_session, session_id, interface_type, session_info}, state) do
session_data = %{
interface_type: interface_type,
info: session_info,
registered_at: DateTime.utc_now(),
last_activity: DateTime.utc_now()
}
updated_registry = Map.put(state.session_registry, session_id, session_data)
updated_state = %{state | session_registry: updated_registry}
Logger.debug("Registered session #{session_id} for #{interface_type}")
{:noreply, updated_state}
end
@impl GenServer
def handle_cast({:unregister_session, session_id}, state) do
case Map.get(state.session_registry, session_id) do
nil ->
Logger.debug("Attempted to unregister unknown session: #{session_id}")
{:noreply, state}
_session_data ->
updated_registry = Map.delete(state.session_registry, session_id)
updated_state = %{state | session_registry: updated_registry}
Logger.debug("Unregistered session #{session_id}")
{:noreply, updated_state}
end
end
@impl GenServer
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
# Handle interface process crashes
case find_interface_by_pid(pid, state.interfaces) do
{interface_type, _interface_info} ->
Logger.error("#{interface_type} interface crashed: #{inspect(reason)}")
# Remove from running interfaces
updated_interfaces = Map.delete(state.interfaces, interface_type)
updated_state = %{state | interfaces: updated_interfaces}
# Optionally restart if configured
if should_auto_restart?(interface_type, state.config) do
Logger.info("Auto-restarting #{interface_type} interface")
Process.send_after(self(), {:restart_interface, interface_type}, 5000)
end
{:noreply, updated_state}
nil ->
Logger.debug("Unknown process died: #{inspect(pid)}")
{:noreply, state}
end
end
@impl GenServer
def handle_info({:restart_interface, interface_type}, state) do
case start_interface_server(interface_type, state.config, state) do
{:ok, interface_info} ->
updated_interfaces = Map.put(state.interfaces, interface_type, interface_info)
updated_state = %{state | interfaces: updated_interfaces}
Logger.info("Auto-restarted #{interface_type} interface")
{:noreply, updated_state}
{:error, reason} ->
Logger.error("Failed to auto-restart #{interface_type} interface: #{reason}")
{:noreply, state}
end
end
@impl GenServer
def handle_info(message, state) do
Logger.debug("Interface Manager received unexpected message: #{inspect(message)}")
{:noreply, state}
end
# Private helper functions
defp load_interface_config(opts) do
# Load from application config and override with opts
base_config = Application.get_env(:agent_coordinator, :interfaces, %{})
# Default configuration
default_config = %{
enabled_interfaces: [:stdio],
stdio: %{
enabled: true,
handle_stdio: true
},
http: %{
enabled: false,
port: 8080,
host: "localhost",
cors_enabled: true
},
websocket: %{
enabled: false,
port: 8081,
host: "localhost"
},
auto_restart: %{
stdio: false,
http: true,
websocket: true
}
}
# Merge configurations
config = deep_merge(default_config, base_config)
config = deep_merge(config, Enum.into(opts, %{}))
# Determine enabled interfaces from environment or config
enabled = determine_enabled_interfaces(config)
# Update individual interface enabled flags based on environment
config = update_interface_enabled_flags(config, enabled)
%{config | enabled_interfaces: enabled}
end
defp determine_enabled_interfaces(config) do
# Check environment variables
interface_mode = System.get_env("MCP_INTERFACE_MODE", "stdio")
case interface_mode do
"stdio" -> [:stdio]
"http" -> [:http]
"websocket" -> [:websocket]
"all" -> [:stdio, :http, :websocket]
"remote" -> [:http, :websocket]
_ ->
# Check for comma-separated list
if String.contains?(interface_mode, ",") do
interface_mode
|> String.split(",")
|> Enum.map(&String.trim/1)
|> Enum.map(&String.to_atom/1)
|> Enum.filter(&(&1 in @interface_types))
else
# Fall back to config
Map.get(config, :enabled_interfaces, [:stdio])
end
end
end
defp update_interface_enabled_flags(config, enabled_interfaces) do
# Update individual interface enabled flags based on which interfaces are enabled
config
|> update_in([:stdio, :enabled], fn _ -> :stdio in enabled_interfaces end)
|> update_in([:http, :enabled], fn _ -> :http in enabled_interfaces end)
|> update_in([:websocket, :enabled], fn _ -> :websocket in enabled_interfaces end)
# Also update ports from environment if set
|> update_http_config_from_env()
end
defp update_http_config_from_env(config) do
config = case System.get_env("MCP_HTTP_PORT") do
nil -> config
port_str ->
case Integer.parse(port_str) do
{port, ""} -> put_in(config, [:http, :port], port)
_ -> config
end
end
case System.get_env("MCP_HTTP_HOST") do
nil -> config
host -> put_in(config, [:http, :host], host)
end
end
# Declare defaults once
defp start_interface_server(type, config, state, opts \\ %{})
defp start_interface_server(:stdio, config, state, _opts) do
if config.stdio.enabled and config.stdio.handle_stdio do
# Start stdio handler
stdio_handler = spawn_link(fn -> handle_stdio_loop(state) end)
interface_info = %{
type: :stdio,
pid: stdio_handler,
started_at: DateTime.utc_now(),
config: config.stdio
}
{:ok, interface_info}
else
{:error, "STDIO interface not enabled"}
end
end
defp start_interface_server(:http, config, _state, opts) do
if config.http.enabled do
http_opts = [
port: Map.get(opts, :port, config.http.port),
host: Map.get(opts, :host, config.http.host)
]
case HttpInterface.start_link(http_opts) do
{:ok, pid} ->
# Monitor the process
ref = Process.monitor(pid)
interface_info = %{
type: :http,
pid: pid,
monitor_ref: ref,
started_at: DateTime.utc_now(),
config: Map.merge(config.http, Enum.into(opts, %{})),
port: http_opts[:port]
}
{:ok, interface_info}
{:error, reason} ->
{:error, reason}
end
else
{:error, "HTTP interface not enabled"}
end
end
defp start_interface_server(:websocket, config, _state, _opts) do
if config.websocket.enabled do
# WebSocket is handled by the HTTP server, so just mark it as enabled
interface_info = %{
type: :websocket,
pid: :embedded, # Embedded in HTTP server
started_at: DateTime.utc_now(),
config: config.websocket
}
{:ok, interface_info}
else
{:error, "WebSocket interface not enabled"}
end
end
defp start_interface_server(unknown_type, _config, _state, _opts) do
{:error, "Unknown interface type: #{unknown_type}"}
end
defp stop_interface_server(:stdio, interface_info) do
if Process.alive?(interface_info.pid) do
Process.exit(interface_info.pid, :shutdown)
:ok
else
:ok
end
end
defp stop_interface_server(:http, interface_info) do
if Process.alive?(interface_info.pid) do
Process.exit(interface_info.pid, :shutdown)
:ok
else
:ok
end
end
defp stop_interface_server(:websocket, _interface_info) do
# WebSocket is embedded in HTTP server, so nothing to stop separately
:ok
end
defp stop_interface_server(_type, _interface_info) do
{:error, "Unknown interface type"}
end
defp handle_stdio_loop(state) do
# Handle MCP JSON-RPC messages from STDIO
case IO.read(:stdio, :line) do
:eof ->
Logger.info("STDIO interface shutting down (EOF)")
exit(:normal)
{:error, reason} ->
Logger.error("STDIO error: #{inspect(reason)}")
exit({:error, reason})
line ->
handle_stdio_message(String.trim(line), state)
handle_stdio_loop(state)
end
end
defp handle_stdio_message("", _state), do: :ok
defp handle_stdio_message(json_line, _state) do
try do
request = Jason.decode!(json_line)
# Create local client context for stdio
_client_context = ToolFilter.local_context()
# Process through MCP server with full tool access
response = AgentCoordinator.MCPServer.handle_mcp_request(request)
# Send response
IO.puts(Jason.encode!(response))
rescue
e in Jason.DecodeError ->
error_response = %{
"jsonrpc" => "2.0",
"id" => nil,
"error" => %{
"code" => -32700,
"message" => "Parse error: #{Exception.message(e)}"
}
}
IO.puts(Jason.encode!(error_response))
e ->
# Try to get the ID from the malformed request
id = try do
partial = Jason.decode!(json_line)
Map.get(partial, "id")
rescue
_ -> nil
end
error_response = %{
"jsonrpc" => "2.0",
"id" => id,
"error" => %{
"code" => -32603,
"message" => "Internal error: #{Exception.message(e)}"
}
}
IO.puts(Jason.encode!(error_response))
end
end
defp get_interface_metrics(:stdio, interface_info) do
%{
type: :stdio,
status: if(Process.alive?(interface_info.pid), do: :running, else: :stopped),
uptime: DateTime.diff(DateTime.utc_now(), interface_info.started_at, :second),
pid: interface_info.pid
}
end
defp get_interface_metrics(:http, interface_info) do
%{
type: :http,
status: if(Process.alive?(interface_info.pid), do: :running, else: :stopped),
uptime: DateTime.diff(DateTime.utc_now(), interface_info.started_at, :second),
port: interface_info.port,
pid: interface_info.pid
}
end
defp get_interface_metrics(:websocket, interface_info) do
%{
type: :websocket,
status: :running, # Embedded in HTTP server
uptime: DateTime.diff(DateTime.utc_now(), interface_info.started_at, :second),
embedded: true
}
end
defp get_sessions_by_interface(session_registry) do
Enum.reduce(session_registry, %{}, fn {_session_id, session_data}, acc ->
interface_type = session_data.interface_type
count = Map.get(acc, interface_type, 0)
Map.put(acc, interface_type, count + 1)
end)
end
defp find_interface_by_pid(pid, interfaces) do
Enum.find(interfaces, fn {_type, interface_info} ->
interface_info.pid == pid
end)
end
defp should_auto_restart?(interface_type, config) do
Map.get(config.auto_restart, interface_type, false)
end
defp initialize_metrics do
%{
started_at: DateTime.utc_now(),
requests_total: 0,
errors_total: 0,
sessions_total: 0
}
end
defp get_uptime do
{uptime_ms, _} = :erlang.statistics(:wall_clock)
div(uptime_ms, 1000)
end
# Deep merge helper for configuration
defp deep_merge(left, right) when is_map(left) and is_map(right) do
Map.merge(left, right, fn _key, left_val, right_val ->
deep_merge(left_val, right_val)
end)
end
defp deep_merge(_left, right), do: right
end

View File

@@ -11,7 +11,7 @@ defmodule AgentCoordinator.MCPServer do
use GenServer
require Logger
alias AgentCoordinator.{TaskRegistry, Inbox, Agent, Task, CodebaseRegistry, VSCodeToolProvider}
alias AgentCoordinator.{TaskRegistry, Inbox, Agent, Task, CodebaseRegistry, VSCodeToolProvider, ToolFilter, SessionManager, ActivityTracker}
# State for tracking external servers and agent sessions
defstruct [
@@ -38,7 +38,7 @@ defmodule AgentCoordinator.MCPServer do
"enum" => ["coding", "testing", "documentation", "analysis", "review"]
}
},
"codebase_id" => %{"type" => "string"},
"codebase_id" => %{"type" => "string", "description" => "If the project is found locally on the machine, use the name of the directory in which you are currently at (.). If it is remote, use the git registered codebase ID, if it is a multicodebase project, and there is no apparently folder to base as the rootmost -- ask."},
"workspace_path" => %{"type" => "string"},
"cross_codebase_capable" => %{"type" => "boolean"}
},
@@ -71,7 +71,7 @@ defmodule AgentCoordinator.MCPServer do
"title" => %{"type" => "string"},
"description" => %{"type" => "string"},
"priority" => %{"type" => "string", "enum" => ["low", "normal", "high", "urgent"]},
"codebase_id" => %{"type" => "string"},
"codebase_id" => %{"type" => "string", "description" => "If the project is found locally on the machine, use the name of the directory in which you are currently at (.). If it is remote, use the git registered codebase ID, if it is a multicodebase project, and there is no apparently folder to base as the rootmost -- ask."},
"file_paths" => %{"type" => "array", "items" => %{"type" => "string"}},
"required_capabilities" => %{
"type" => "array",
@@ -331,6 +331,25 @@ defmodule AgentCoordinator.MCPServer do
},
"required" => ["agent_id"]
}
},
%{
"name" => "discover_codebase_info",
"description" => "Intelligently discover codebase information from workspace path, including git repository details, canonical ID generation, and project identification.",
"inputSchema" => %{
"type" => "object",
"properties" => %{
"agent_id" => %{"type" => "string"},
"workspace_path" => %{
"type" => "string",
"description" => "Path to the workspace/project directory"
},
"custom_id" => %{
"type" => "string",
"description" => "Optional: Override automatic codebase ID detection"
}
},
"required" => ["agent_id", "workspace_path"]
}
}
]
@@ -344,8 +363,8 @@ defmodule AgentCoordinator.MCPServer do
GenServer.call(__MODULE__, {:mcp_request, request})
end
def get_tools do
case GenServer.call(__MODULE__, :get_all_tools, 5000) do
def get_tools(client_context \\ nil) do
case GenServer.call(__MODULE__, {:get_all_tools, client_context}, 5000) do
tools when is_list(tools) -> tools
_ -> @mcp_tools
end
@@ -464,7 +483,20 @@ defmodule AgentCoordinator.MCPServer do
end
end
def handle_call({:get_all_tools, client_context}, _from, state) do
all_tools = get_all_unified_tools_from_state(state)
# Apply tool filtering if client context is provided
filtered_tools = case client_context do
nil -> all_tools # No filtering for nil context (backward compatibility)
context -> ToolFilter.filter_tools(all_tools, context)
end
{:reply, filtered_tools, state}
end
def handle_call(:get_all_tools, _from, state) do
# Backward compatibility - no filtering
all_tools = get_all_unified_tools_from_state(state)
{:reply, all_tools, state}
end
@@ -599,10 +631,33 @@ defmodule AgentCoordinator.MCPServer do
:ok
end
# Track the session if we have caller info
track_agent_session(agent.id, name, capabilities)
# Generate session token for the agent
session_metadata = %{
name: name,
capabilities: capabilities,
codebase_id: agent.codebase_id,
workspace_path: opts[:workspace_path],
registered_at: DateTime.utc_now()
}
{:ok, %{agent_id: agent.id, codebase_id: agent.codebase_id, status: "registered"}}
case SessionManager.create_session(agent.id, session_metadata) do
{:ok, session_token} ->
# Track the session if we have caller info
track_agent_session(agent.id, name, capabilities)
{:ok, %{
agent_id: agent.id,
codebase_id: agent.codebase_id,
status: "registered",
session_token: session_token,
expires_at: DateTime.add(DateTime.utc_now(), 60, :minute) |> DateTime.to_iso8601()
}}
{:error, reason} ->
Logger.error("Failed to create session for agent #{agent.id}: #{inspect(reason)}")
# Still return success but without session token for backward compatibility
{:ok, %{agent_id: agent.id, codebase_id: agent.codebase_id, status: "registered"}}
end
{:error, reason} ->
{:error, "Failed to register agent: #{reason}"}
@@ -775,6 +830,8 @@ defmodule AgentCoordinator.MCPServer do
workspace_path: agent.workspace_path,
online: Agent.is_online?(agent),
cross_codebase_capable: Agent.can_work_cross_codebase?(agent),
current_activity: agent.current_activity,
current_files: agent.current_files || [],
current_task:
status.current_task &&
%{
@@ -1008,6 +1065,9 @@ defmodule AgentCoordinator.MCPServer do
online: Agent.is_online?(agent),
cross_codebase_capable: Agent.can_work_cross_codebase?(agent),
last_heartbeat: agent.last_heartbeat,
current_activity: agent.current_activity,
current_files: agent.current_files || [],
activity_history: agent.activity_history || [],
tasks: task_info
}
end)
@@ -1074,6 +1134,77 @@ defmodule AgentCoordinator.MCPServer do
end
end
# NEW: Codebase discovery function
defp discover_codebase_info(%{"agent_id" => agent_id, "workspace_path" => workspace_path} = args) do
custom_id = Map.get(args, "custom_id")
# Use the CodebaseIdentifier to analyze the workspace
opts = if custom_id, do: [custom_id: custom_id], else: []
case AgentCoordinator.CodebaseIdentifier.identify_codebase(workspace_path, opts) do
codebase_info ->
# Also check if this codebase is already registered
existing_codebase = case CodebaseRegistry.get_codebase(codebase_info.canonical_id) do
{:ok, codebase} -> codebase
{:error, :not_found} -> nil
end
# Check for other agents working on same codebase
agents = TaskRegistry.list_agents()
related_agents = Enum.filter(agents, fn agent ->
agent.codebase_id == codebase_info.canonical_id and agent.id != agent_id
end)
response = %{
codebase_info: codebase_info,
already_registered: existing_codebase != nil,
existing_codebase: existing_codebase,
related_agents: Enum.map(related_agents, fn agent ->
%{
agent_id: agent.id,
name: agent.name,
capabilities: agent.capabilities,
status: agent.status,
workspace_path: agent.workspace_path,
online: Agent.is_online?(agent)
}
end),
recommendations: generate_codebase_recommendations(codebase_info, existing_codebase, related_agents)
}
{:ok, response}
end
end
defp generate_codebase_recommendations(codebase_info, existing_codebase, related_agents) do
recommendations = []
# Recommend registration if not already registered
recommendations = if existing_codebase == nil do
["Consider registering this codebase with register_codebase for better coordination" | recommendations]
else
recommendations
end
# Recommend coordination if other agents are working on same codebase
recommendations = if length(related_agents) > 0 do
agent_names = Enum.map(related_agents, & &1.name) |> Enum.join(", ")
["Other agents working on this codebase: #{agent_names}. Consider coordination." | recommendations]
else
recommendations
end
# Recommend git setup if local folder without git
recommendations = if codebase_info.identification_method == :folder_name do
["Consider initializing git repository for better distributed coordination" | recommendations]
else
recommendations
end
Enum.reverse(recommendations)
end
# External MCP server management functions
defp start_external_server(name, %{type: :stdio} = config) do
@@ -1427,17 +1558,24 @@ defmodule AgentCoordinator.MCPServer do
end
defp route_tool_call(tool_name, args, state) do
# Extract agent_id for activity tracking
agent_id = Map.get(args, "agent_id")
# Update agent activity before processing the tool call
if agent_id do
ActivityTracker.update_agent_activity(agent_id, tool_name, args)
end
# Check if it's a coordinator tool first
coordinator_tool_names = Enum.map(@mcp_tools, & &1["name"])
cond do
result = cond do
tool_name in coordinator_tool_names ->
handle_coordinator_tool(tool_name, args)
# Check if it's a VS Code tool
String.starts_with?(tool_name, "vscode_") ->
# Route to VS Code Tool Provider with agent context
agent_id = Map.get(args, "agent_id")
context = if agent_id, do: %{agent_id: agent_id}, else: %{}
VSCodeToolProvider.handle_tool_call(tool_name, args, context)
@@ -1445,6 +1583,13 @@ defmodule AgentCoordinator.MCPServer do
# Try to route to external server
route_to_external_server(tool_name, args, state)
end
# Clear agent activity after tool call completes (optional - could keep until next call)
# if agent_id do
# ActivityTracker.clear_agent_activity(agent_id)
# end
result
end
defp handle_coordinator_tool(tool_name, args) do
@@ -1465,6 +1610,7 @@ defmodule AgentCoordinator.MCPServer do
"create_agent_task" -> create_agent_task(args)
"get_detailed_task_board" -> get_detailed_task_board(args)
"get_agent_task_history" -> get_agent_task_history(args)
"discover_codebase_info" -> discover_codebase_info(args)
_ -> {:error, "Unknown coordinator tool: #{tool_name}"}
end
end

View File

@@ -0,0 +1,192 @@
defmodule AgentCoordinator.SessionManager do
@moduledoc """
Session management for MCP agents with token-based authentication.
Implements MCP-compliant session management where:
1. Agents register and receive session tokens
2. Session tokens must be included in Mcp-Session-Id headers
3. Session tokens are cryptographically secure and time-limited
4. Sessions are tied to specific agent IDs
"""
use GenServer
require Logger
defstruct [
:sessions,
:config
]
@session_expiry_minutes 60
@cleanup_interval_minutes 5
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Generate a new session token for an agent.
Returns {:ok, session_token} or {:error, reason}
"""
def create_session(agent_id, metadata \\ %{}) do
GenServer.call(__MODULE__, {:create_session, agent_id, metadata})
end
@doc """
Validate a session token and return agent information.
Returns {:ok, agent_id, metadata} or {:error, reason}
"""
def validate_session(session_token) do
GenServer.call(__MODULE__, {:validate_session, session_token})
end
@doc """
Invalidate a session token.
"""
def invalidate_session(session_token) do
GenServer.call(__MODULE__, {:invalidate_session, session_token})
end
@doc """
Get all active sessions for an agent.
"""
def get_agent_sessions(agent_id) do
GenServer.call(__MODULE__, {:get_agent_sessions, agent_id})
end
@doc """
Clean up expired sessions.
"""
def cleanup_expired_sessions do
GenServer.cast(__MODULE__, :cleanup_expired)
end
# Server implementation
@impl GenServer
def init(opts) do
# Start periodic cleanup
schedule_cleanup()
state = %__MODULE__{
sessions: %{},
config: %{
expiry_minutes: Keyword.get(opts, :expiry_minutes, @session_expiry_minutes),
cleanup_interval: Keyword.get(opts, :cleanup_interval, @cleanup_interval_minutes)
}
}
Logger.info("SessionManager started with #{state.config.expiry_minutes}min expiry")
{:ok, state}
end
@impl GenServer
def handle_call({:create_session, agent_id, metadata}, _from, state) do
session_token = generate_session_token()
expires_at = DateTime.add(DateTime.utc_now(), state.config.expiry_minutes, :minute)
session_data = %{
agent_id: agent_id,
token: session_token,
created_at: DateTime.utc_now(),
expires_at: expires_at,
metadata: metadata,
last_activity: DateTime.utc_now()
}
new_sessions = Map.put(state.sessions, session_token, session_data)
new_state = %{state | sessions: new_sessions}
Logger.debug("Created session #{session_token} for agent #{agent_id}")
{:reply, {:ok, session_token}, new_state}
end
@impl GenServer
def handle_call({:validate_session, session_token}, _from, state) do
case Map.get(state.sessions, session_token) do
nil ->
{:reply, {:error, :session_not_found}, state}
session_data ->
if DateTime.compare(DateTime.utc_now(), session_data.expires_at) == :gt do
# Session expired, remove it
new_sessions = Map.delete(state.sessions, session_token)
new_state = %{state | sessions: new_sessions}
{:reply, {:error, :session_expired}, new_state}
else
# Session valid, update last activity
updated_session = %{session_data | last_activity: DateTime.utc_now()}
new_sessions = Map.put(state.sessions, session_token, updated_session)
new_state = %{state | sessions: new_sessions}
result = {:ok, session_data.agent_id, session_data.metadata}
{:reply, result, new_state}
end
end
end
@impl GenServer
def handle_call({:invalidate_session, session_token}, _from, state) do
case Map.get(state.sessions, session_token) do
nil ->
{:reply, {:error, :session_not_found}, state}
session_data ->
new_sessions = Map.delete(state.sessions, session_token)
new_state = %{state | sessions: new_sessions}
Logger.debug("Invalidated session #{session_token} for agent #{session_data.agent_id}")
{:reply, :ok, new_state}
end
end
@impl GenServer
def handle_call({:get_agent_sessions, agent_id}, _from, state) do
agent_sessions =
state.sessions
|> Enum.filter(fn {_token, session} -> session.agent_id == agent_id end)
|> Enum.map(fn {token, session} -> {token, session} end)
{:reply, agent_sessions, state}
end
@impl GenServer
def handle_cast(:cleanup_expired, state) do
now = DateTime.utc_now()
{expired_sessions, active_sessions} =
Enum.split_with(state.sessions, fn {_token, session} ->
DateTime.compare(now, session.expires_at) == :gt
end)
if length(expired_sessions) > 0 do
Logger.debug("Cleaned up #{length(expired_sessions)} expired sessions")
end
new_state = %{state | sessions: Map.new(active_sessions)}
schedule_cleanup()
{:noreply, new_state}
end
@impl GenServer
def handle_info(:cleanup_expired, state) do
handle_cast(:cleanup_expired, state)
end
# Private functions
defp generate_session_token do
# Generate cryptographically secure session token
# Format: "mcp_" + base64url(32 random bytes) + "_" + timestamp
random_bytes = :crypto.strong_rand_bytes(32)
timestamp = DateTime.utc_now() |> DateTime.to_unix()
token_body = Base.url_encode64(random_bytes, padding: false)
"mcp_#{token_body}_#{timestamp}"
end
defp schedule_cleanup do
Process.send_after(self(), :cleanup_expired, @cleanup_interval_minutes * 60 * 1000)
end
end

View File

@@ -79,6 +79,10 @@ defmodule AgentCoordinator.TaskRegistry do
GenServer.call(__MODULE__, {:complete_task, agent_id}, 30_000)
end
def update_agent(agent_id, updated_agent) do
GenServer.call(__MODULE__, {:update_agent, agent_id, updated_agent})
end
def get_task_board do
GenServer.call(__MODULE__, :get_task_board)
end
@@ -423,6 +427,18 @@ defmodule AgentCoordinator.TaskRegistry do
end
end
def handle_call({:update_agent, agent_id, updated_agent}, _from, state) do
case Map.get(state.agents, agent_id) do
nil ->
{:reply, {:error, :agent_not_found}, state}
_current_agent ->
new_agents = Map.put(state.agents, agent_id, updated_agent)
new_state = %{state | agents: new_agents}
{:reply, :ok, new_state}
end
end
def handle_call(:get_task_board, _from, state) do
agents_info =
Enum.map(state.agents, fn {_id, agent} ->
@@ -439,7 +455,9 @@ defmodule AgentCoordinator.TaskRegistry do
capabilities: agent.capabilities,
current_task: current_task,
last_heartbeat: agent.last_heartbeat,
online: Agent.is_online?(agent)
online: Agent.is_online?(agent),
current_activity: agent.current_activity,
current_files: agent.current_files || []
}
end)

View File

@@ -0,0 +1,282 @@
defmodule AgentCoordinator.ToolFilter do
@moduledoc """
Intelligent tool filtering system that adapts available tools based on client context.
This module determines which tools should be available to different types of clients:
- Local clients: Full tool access including filesystem and VSCode tools
- Remote clients: Limited to agent coordination and safe remote tools
- Web clients: Browser-safe tools only
Tool filtering is based on:
- Tool capabilities and requirements
- Client connection type (local/remote)
- Security considerations
- Tool metadata annotations
"""
require Logger
@doc """
Context information about the client connection.
"""
defstruct [
:connection_type, # :local, :remote, :web
:client_info, # Client identification
:capabilities, # Client declared capabilities
:security_level, # :trusted, :sandboxed, :restricted
:origin, # For web clients, the origin domain
:user_agent # Client user agent string
]
@type client_context :: %__MODULE__{
connection_type: :local | :remote | :web,
client_info: map(),
capabilities: [String.t()],
security_level: :trusted | :sandboxed | :restricted,
origin: String.t() | nil,
user_agent: String.t() | nil
}
# Tool name patterns that indicate local-only functionality (defined as function to avoid compilation issues)
defp local_only_patterns do
[
~r/^(read_file|write_file|create_file|delete_file)/,
~r/^(list_dir|search_files|move_file)/,
~r/^vscode_/,
~r/^(run_in_terminal|get_terminal)/,
~r/filesystem/,
~r/directory/
]
end
# Tools that are always safe for remote access
@always_safe_tools [
# Agent coordination tools
"register_agent",
"create_task",
"get_next_task",
"complete_task",
"get_task_board",
"get_detailed_task_board",
"get_agent_task_history",
"heartbeat",
"unregister_agent",
"register_task_set",
"create_agent_task",
"create_cross_codebase_task",
"list_codebases",
"register_codebase",
"get_codebase_status",
"add_codebase_dependency",
# Memory and knowledge graph (safe for remote)
"create_entities",
"create_relations",
"read_graph",
"search_nodes",
"open_nodes",
"add_observations",
"delete_entities",
"delete_relations",
"delete_observations",
# Sequential thinking (safe for remote)
"sequentialthinking",
# Library documentation (safe for remote)
"get-library-docs",
"resolve-library-id"
]
@doc """
Filter tools based on client context.
Returns a filtered list of tools appropriate for the client's context.
"""
@spec filter_tools([map()], client_context()) :: [map()]
def filter_tools(tools, %__MODULE__{} = context) do
tools
|> Enum.filter(&should_include_tool?(&1, context))
|> maybe_annotate_tools(context)
end
@doc """
Determine if a tool should be included for the given client context.
"""
@spec should_include_tool?(map(), client_context()) :: boolean()
def should_include_tool?(tool, context) do
tool_name = Map.get(tool, "name", "")
cond do
# Always include safe tools
tool_name in @always_safe_tools ->
true
# Local clients get everything
context.connection_type == :local ->
true
# Remote/web clients get filtered access
context.connection_type in [:remote, :web] ->
not is_local_only_tool?(tool, context)
# Default to restrictive
true ->
tool_name in @always_safe_tools
end
end
@doc """
Detect client context from connection information.
"""
@spec detect_client_context(map()) :: client_context()
def detect_client_context(connection_info) do
connection_type = determine_connection_type(connection_info)
security_level = determine_security_level(connection_type, connection_info)
%__MODULE__{
connection_type: connection_type,
client_info: Map.get(connection_info, :client_info, %{}),
capabilities: Map.get(connection_info, :capabilities, []),
security_level: security_level,
origin: Map.get(connection_info, :origin),
user_agent: Map.get(connection_info, :user_agent)
}
end
@doc """
Create a local client context (for stdio and direct connections).
"""
@spec local_context() :: client_context()
def local_context do
%__MODULE__{
connection_type: :local,
client_info: %{type: "local_stdio"},
capabilities: ["full_access"],
security_level: :trusted,
origin: nil,
user_agent: "agent-coordinator-local"
}
end
@doc """
Create a remote client context.
"""
@spec remote_context(map()) :: client_context()
def remote_context(opts \\ %{}) do
%__MODULE__{
connection_type: :remote,
client_info: Map.get(opts, :client_info, %{type: "remote_http"}),
capabilities: Map.get(opts, :capabilities, ["coordination"]),
security_level: :sandboxed,
origin: Map.get(opts, :origin),
user_agent: Map.get(opts, :user_agent, "unknown")
}
end
@doc """
Get tool filtering statistics for monitoring.
"""
@spec get_filter_stats([map()], client_context()) :: map()
def get_filter_stats(original_tools, context) do
filtered_tools = filter_tools(original_tools, context)
%{
original_count: length(original_tools),
filtered_count: length(filtered_tools),
removed_count: length(original_tools) - length(filtered_tools),
connection_type: context.connection_type,
security_level: context.security_level,
filtered_at: DateTime.utc_now()
}
end
# Private helpers
defp is_local_only_tool?(tool, _context) do
tool_name = Map.get(tool, "name", "")
description = Map.get(tool, "description", "")
# Check against known local-only tool names
name_is_local = tool_name in get_local_only_tool_names() or
Enum.any?(local_only_patterns(), &Regex.match?(&1, tool_name))
# Check description for local-only indicators
description_is_local = String.contains?(String.downcase(description),
["filesystem", "file system", "vscode", "terminal", "local file", "directory"])
# Check tool schema for local-only parameters
schema_is_local = has_local_only_parameters?(tool)
name_is_local or description_is_local or schema_is_local
end
defp get_local_only_tool_names do
[
# Filesystem tools
"read_file", "write_file", "create_file", "delete_file",
"list_directory", "search_files", "move_file", "get_file_info",
"list_allowed_directories", "directory_tree", "edit_file",
"read_text_file", "read_multiple_files", "read_media_file",
# VSCode tools
"vscode_create_file", "vscode_write_file", "vscode_read_file",
"vscode_delete_file", "vscode_list_directory", "vscode_get_active_editor",
"vscode_set_editor_content", "vscode_get_selection", "vscode_set_selection",
"vscode_show_message", "vscode_run_command", "vscode_get_workspace_folders",
# Terminal/process tools
"run_in_terminal", "get_terminal_output", "terminal_last_command",
"terminal_selection"
]
end
defp has_local_only_parameters?(tool) do
schema = Map.get(tool, "inputSchema", %{})
properties = Map.get(schema, "properties", %{})
# Look for file path parameters or other local indicators
Enum.any?(properties, fn {param_name, param_schema} ->
param_name in ["path", "filePath", "file_path", "directory", "workspace_path"] or
String.contains?(Map.get(param_schema, "description", ""),
["file path", "directory", "workspace", "local"])
end)
end
defp determine_connection_type(connection_info) do
cond do
Map.get(connection_info, :transport) == :stdio -> :local
Map.get(connection_info, :transport) == :websocket -> :web
Map.get(connection_info, :transport) == :http -> :remote
Map.get(connection_info, :remote_ip) == "127.0.0.1" -> :local
Map.get(connection_info, :remote_ip) == "::1" -> :local
Map.has_key?(connection_info, :remote_ip) -> :remote
true -> :local # Default to local for stdio
end
end
defp determine_security_level(connection_type, connection_info) do
case connection_type do
:local -> :trusted
:remote ->
if Map.get(connection_info, :secure, false) do
:sandboxed
else
:restricted
end
:web -> :sandboxed
end
end
defp maybe_annotate_tools(tools, context) do
# Add context information to tools if needed
if context.connection_type == :remote do
Enum.map(tools, fn tool ->
Map.put(tool, "_filtered_for", "remote_client")
end)
else
tools
end
end
end

View File

@@ -0,0 +1,383 @@
defmodule AgentCoordinator.WebSocketHandler do
@moduledoc """
WebSocket handler for real-time MCP communication.
Provides:
- Real-time MCP JSON-RPC over WebSocket
- Tool filtering based on client context
- Session management
- Heartbeat and connection monitoring
"""
@behaviour WebSock
require Logger
alias AgentCoordinator.{MCPServer, ToolFilter}
defstruct [
:client_context,
:session_id,
:last_heartbeat,
:agent_id,
:connection_info
]
@heartbeat_interval 30_000 # 30 seconds
@impl WebSock
def init(opts) do
session_id = "ws_" <> UUID.uuid4()
# Initialize connection state
state = %__MODULE__{
session_id: session_id,
last_heartbeat: DateTime.utc_now(),
connection_info: opts
}
# Start heartbeat timer
Process.send_after(self(), :heartbeat, @heartbeat_interval)
Logger.info("WebSocket connection established: #{session_id}")
{:ok, state}
end
@impl WebSock
def handle_in({text, [opcode: :text]}, state) do
case Jason.decode(text) do
{:ok, message} ->
handle_mcp_message(message, state)
{:error, %Jason.DecodeError{} = error} ->
error_response = %{
"jsonrpc" => "2.0",
"id" => nil,
"error" => %{
"code" => -32700,
"message" => "Parse error: #{Exception.message(error)}"
}
}
{:reply, {:text, Jason.encode!(error_response)}, state}
end
end
@impl WebSock
def handle_in({_binary, [opcode: :binary]}, state) do
Logger.warning("Received unexpected binary data on WebSocket")
{:ok, state}
end
@impl WebSock
def handle_info(:heartbeat, state) do
# Send heartbeat if we have an agent registered
if state.agent_id do
heartbeat_request = %{
"jsonrpc" => "2.0",
"id" => generate_request_id(),
"method" => "tools/call",
"params" => %{
"name" => "heartbeat",
"arguments" => %{"agent_id" => state.agent_id}
}
}
# Send heartbeat to MCP server
MCPServer.handle_mcp_request(heartbeat_request)
end
# Schedule next heartbeat
Process.send_after(self(), :heartbeat, @heartbeat_interval)
updated_state = %{state | last_heartbeat: DateTime.utc_now()}
{:ok, updated_state}
end
@impl WebSock
def handle_info(message, state) do
Logger.debug("Received unexpected message: #{inspect(message)}")
{:ok, state}
end
@impl WebSock
def terminate(:remote, state) do
Logger.info("WebSocket connection closed by client: #{state.session_id}")
cleanup_session(state)
:ok
end
@impl WebSock
def terminate(reason, state) do
Logger.info("WebSocket connection terminated: #{state.session_id}, reason: #{inspect(reason)}")
cleanup_session(state)
:ok
end
# Private helper functions
defp handle_mcp_message(message, state) do
method = Map.get(message, "method")
case method do
"initialize" ->
handle_initialize(message, state)
"tools/list" ->
handle_tools_list(message, state)
"tools/call" ->
handle_tool_call(message, state)
"notifications/initialized" ->
handle_initialized_notification(message, state)
_ ->
# Forward other methods to MCP server
forward_to_mcp_server(message, state)
end
end
defp handle_initialize(message, state) do
# Extract client info from initialize message
params = Map.get(message, "params", %{})
client_info = Map.get(params, "clientInfo", %{})
# Detect client context
connection_info = %{
transport: :websocket,
client_info: client_info,
session_id: state.session_id,
capabilities: Map.get(params, "capabilities", [])
}
client_context = ToolFilter.detect_client_context(connection_info)
# Send initialize response
response = %{
"jsonrpc" => "2.0",
"id" => Map.get(message, "id"),
"result" => %{
"protocolVersion" => "2024-11-05",
"capabilities" => %{
"tools" => %{},
"coordination" => %{
"automatic_task_tracking" => true,
"agent_management" => true,
"multi_server_proxy" => true,
"heartbeat_coverage" => true,
"session_tracking" => true,
"tool_filtering" => true,
"websocket_realtime" => true
}
},
"serverInfo" => %{
"name" => "agent-coordinator-websocket",
"version" => AgentCoordinator.version(),
"description" => "Agent Coordinator WebSocket interface with tool filtering"
},
"_meta" => %{
"session_id" => state.session_id,
"connection_type" => client_context.connection_type,
"security_level" => client_context.security_level
}
}
}
updated_state = %{state |
client_context: client_context,
connection_info: connection_info
}
{:reply, {:text, Jason.encode!(response)}, updated_state}
end
defp handle_tools_list(message, state) do
if state.client_context do
# Get filtered tools based on client context
all_tools = MCPServer.get_tools()
filtered_tools = ToolFilter.filter_tools(all_tools, state.client_context)
response = %{
"jsonrpc" => "2.0",
"id" => Map.get(message, "id"),
"result" => %{
"tools" => filtered_tools,
"_meta" => %{
"filtered_for" => state.client_context.connection_type,
"original_count" => length(all_tools),
"filtered_count" => length(filtered_tools),
"session_id" => state.session_id
}
}
}
{:reply, {:text, Jason.encode!(response)}, state}
else
# Client hasn't initialized yet
error_response = %{
"jsonrpc" => "2.0",
"id" => Map.get(message, "id"),
"error" => %{
"code" => -32002,
"message" => "Client must initialize first"
}
}
{:reply, {:text, Jason.encode!(error_response)}, state}
end
end
defp handle_tool_call(message, state) do
if state.client_context do
tool_name = get_in(message, ["params", "name"])
# Check if tool is allowed for this client context
if tool_allowed_for_context?(tool_name, state.client_context) do
# Enhance message with session info
enhanced_message = add_websocket_session_info(message, state)
# Track agent ID if this is a register_agent call
updated_state = maybe_track_agent_id(message, state)
# Forward to MCP server
case MCPServer.handle_mcp_request(enhanced_message) do
response when is_map(response) ->
{:reply, {:text, Jason.encode!(response)}, updated_state}
unexpected ->
Logger.error("Unexpected MCP response: #{inspect(unexpected)}")
error_response = %{
"jsonrpc" => "2.0",
"id" => Map.get(message, "id"),
"error" => %{
"code" => -32603,
"message" => "Internal server error"
}
}
{:reply, {:text, Jason.encode!(error_response)}, updated_state}
end
else
# Tool not allowed for this client
error_response = %{
"jsonrpc" => "2.0",
"id" => Map.get(message, "id"),
"error" => %{
"code" => -32601,
"message" => "Tool not available for #{state.client_context.connection_type} clients: #{tool_name}"
}
}
{:reply, {:text, Jason.encode!(error_response)}, state}
end
else
# Client hasn't initialized yet
error_response = %{
"jsonrpc" => "2.0",
"id" => Map.get(message, "id"),
"error" => %{
"code" => -32002,
"message" => "Client must initialize first"
}
}
{:reply, {:text, Jason.encode!(error_response)}, state}
end
end
defp handle_initialized_notification(_message, state) do
# Client is ready to receive notifications
Logger.info("WebSocket client initialized: #{state.session_id}")
{:ok, state}
end
defp forward_to_mcp_server(message, state) do
if state.client_context do
enhanced_message = add_websocket_session_info(message, state)
case MCPServer.handle_mcp_request(enhanced_message) do
response when is_map(response) ->
{:reply, {:text, Jason.encode!(response)}, state}
nil ->
# Some notifications don't return responses
{:ok, state}
unexpected ->
Logger.error("Unexpected MCP response: #{inspect(unexpected)}")
{:ok, state}
end
else
error_response = %{
"jsonrpc" => "2.0",
"id" => Map.get(message, "id"),
"error" => %{
"code" => -32002,
"message" => "Client must initialize first"
}
}
{:reply, {:text, Jason.encode!(error_response)}, state}
end
end
defp add_websocket_session_info(message, state) do
# Add session tracking info to the message
params = Map.get(message, "params", %{})
enhanced_params = params
|> Map.put("_session_id", state.session_id)
|> Map.put("_transport", "websocket")
|> Map.put("_client_context", %{
connection_type: state.client_context.connection_type,
security_level: state.client_context.security_level,
session_id: state.session_id
})
Map.put(message, "params", enhanced_params)
end
defp tool_allowed_for_context?(tool_name, client_context) do
all_tools = MCPServer.get_tools()
filtered_tools = ToolFilter.filter_tools(all_tools, client_context)
Enum.any?(filtered_tools, fn tool ->
Map.get(tool, "name") == tool_name
end)
end
defp maybe_track_agent_id(message, state) do
case get_in(message, ["params", "name"]) do
"register_agent" ->
# We'll get the agent_id from the response, but for now mark that we expect one
%{state | agent_id: :pending}
_ ->
state
end
end
defp cleanup_session(state) do
# Unregister agent if one was registered through this session
if state.agent_id && state.agent_id != :pending do
unregister_request = %{
"jsonrpc" => "2.0",
"id" => generate_request_id(),
"method" => "tools/call",
"params" => %{
"name" => "unregister_agent",
"arguments" => %{
"agent_id" => state.agent_id,
"reason" => "WebSocket connection closed"
}
}
}
MCPServer.handle_mcp_request(unregister_request)
end
end
defp generate_request_id do
"ws_req_" <> (:crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower))
end
end