Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,10 @@ func (c *DaemonCmd) run(_ *cobra.Command, _ []string) error {

select {
case <-daemonCtx.Done():
logger.Info("Shutting down daemon")
logger.Info("Shutting down daemon...")
err := <-runErr // Wait for cleanup and deferred logging.
return err // Graceful Ctrl+C / SIGTERM.
logger.Info("Shutdown complete")
return err // Graceful Ctrl+C / SIGTERM.
case err := <-runErr:
logger.Error("daemon exited with error", "error", err)
return err // Propagate daemon failure.
Expand Down
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/danielgtaylor/huma/v2 v2.34.1
github.com/go-chi/chi/v5 v5.2.2
github.com/hashicorp/go-hclog v1.6.3
github.com/mark3labs/mcp-go v0.33.0
github.com/mark3labs/mcp-go v0.37.0
github.com/spf13/cobra v1.9.1
github.com/spf13/pflag v1.0.6
github.com/stretchr/testify v1.10.0
Expand All @@ -16,17 +16,22 @@ require (
)

require (
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/jsonschema v0.13.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/cast v1.9.2 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
golang.org/x/sys v0.33.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
Expand Down
15 changes: 13 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/cpuguy83/go-md2man/v2 v2.0.7 h1:zbFlGlXEAKlwXpmvle3d8Oe3YnkKIK4xSRTd3sHPnBo=
github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
Expand All @@ -23,15 +27,20 @@ github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB1
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E=
github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mark3labs/mcp-go v0.33.0 h1:naxhjnTIs/tyPZmWUZFuG0lDmdA6sUyYGGf3gsHvTCc=
github.com/mark3labs/mcp-go v0.33.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mark3labs/mcp-go v0.37.0 h1:BywvZLPRT6Zx6mMG/MJfxLSZQkTGIcJSEGKsvr4DsoQ=
github.com/mark3labs/mcp-go v0.37.0/go.mod h1:T7tUa2jO6MavG+3P25Oy/jR7iCeJPHImCZHRymCn39g=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
Expand All @@ -56,6 +65,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
Expand Down
4 changes: 2 additions & 2 deletions internal/contracts/mcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ type MCPHealthMonitor interface {
// MCPClientAccessor provides a way to interact with MCP servers through a client.
type MCPClientAccessor interface {
// Add registers a client and its tools by server name.
Add(name string, c *client.Client, tools []string)
Add(name string, c client.MCPClient, tools []string)

// Client returns the client for the given server name.
// It returns a boolean to indicate whether the client was found.
Client(name string) (*client.Client, bool)
Client(name string) (client.MCPClient, bool)

// Tools returns the tools for the given server name.
// It returns a boolean to indicate whether the tools were found.
Expand Down
8 changes: 4 additions & 4 deletions internal/daemon/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ import (
// It is safe for concurrent use by multiple goroutines.
type ClientManager struct {
mu sync.RWMutex
clients map[string]*client.Client
clients map[string]client.MCPClient
serverTools map[string][]string
}

// NewClientManager creates an empty, concurrency-safe ClientManager.
func NewClientManager() *ClientManager {
return &ClientManager{
clients: make(map[string]*client.Client),
clients: make(map[string]client.MCPClient),
serverTools: make(map[string][]string),
}
}

// Add registers a client and its tools by server name.
// This method is safe for concurrent use.
func (cm *ClientManager) Add(name string, c *client.Client, tools []string) {
func (cm *ClientManager) Add(name string, c client.MCPClient, tools []string) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.clients[name] = c
Expand All @@ -34,7 +34,7 @@ func (cm *ClientManager) Add(name string, c *client.Client, tools []string) {
// Client returns the client for the given server name.
// It returns a boolean to indicate whether the client was found.
// This method is safe for concurrent use.
func (cm *ClientManager) Client(name string) (*client.Client, bool) {
func (cm *ClientManager) Client(name string) (client.MCPClient, bool) {
cm.mu.RLock()
defer cm.mu.RUnlock()
c, ok := cm.clients[name]
Expand Down
114 changes: 108 additions & 6 deletions internal/daemon/client_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,121 @@
package daemon

import (
"context"
"fmt"
"sync"
"testing"

"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/require"
)

// mockMCPClient is a test implementation of client.MCPClient
type mockMCPClient struct{}

func (m *mockMCPClient) Initialize(ctx context.Context, request mcp.InitializeRequest) (*mcp.InitializeResult, error) {
return nil, nil
}

func (m *mockMCPClient) Ping(ctx context.Context) error {
return nil
}

func (m *mockMCPClient) ListResourcesByPage(
ctx context.Context,
request mcp.ListResourcesRequest,
) (*mcp.ListResourcesResult, error) {
return nil, nil
}

func (m *mockMCPClient) ListResources(
ctx context.Context,
request mcp.ListResourcesRequest,
) (*mcp.ListResourcesResult, error) {
return nil, nil
}

func (m *mockMCPClient) ListResourceTemplatesByPage(
ctx context.Context,
request mcp.ListResourceTemplatesRequest,
) (*mcp.ListResourceTemplatesResult, error) {
return nil, nil
}

func (m *mockMCPClient) ListResourceTemplates(
ctx context.Context,
request mcp.ListResourceTemplatesRequest,
) (*mcp.ListResourceTemplatesResult, error) {
return nil, nil
}

func (m *mockMCPClient) ReadResource(
ctx context.Context,
request mcp.ReadResourceRequest,
) (*mcp.ReadResourceResult, error) {
return nil, nil
}

func (m *mockMCPClient) Subscribe(ctx context.Context, request mcp.SubscribeRequest) error {
return nil
}

func (m *mockMCPClient) Unsubscribe(ctx context.Context, request mcp.UnsubscribeRequest) error {
return nil
}

func (m *mockMCPClient) ListPromptsByPage(
ctx context.Context,
request mcp.ListPromptsRequest,
) (*mcp.ListPromptsResult, error) {
return nil, nil
}

func (m *mockMCPClient) ListPrompts(
ctx context.Context,
request mcp.ListPromptsRequest,
) (*mcp.ListPromptsResult, error) {
return nil, nil
}

func (m *mockMCPClient) GetPrompt(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
return nil, nil
}

func (m *mockMCPClient) ListToolsByPage(
ctx context.Context,
request mcp.ListToolsRequest,
) (*mcp.ListToolsResult, error) {
return nil, nil
}

func (m *mockMCPClient) ListTools(ctx context.Context, request mcp.ListToolsRequest) (*mcp.ListToolsResult, error) {
return nil, nil
}

func (m *mockMCPClient) CallTool(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
return nil, nil
}

func (m *mockMCPClient) SetLevel(ctx context.Context, request mcp.SetLevelRequest) error {
return nil
}

func (m *mockMCPClient) Complete(ctx context.Context, request mcp.CompleteRequest) (*mcp.CompleteResult, error) {
return nil, nil
}

func (m *mockMCPClient) Close() error {
return nil
}

func (m *mockMCPClient) OnNotification(handler func(notification mcp.JSONRPCNotification)) {}

func TestClientManager_Add_Client_Tools(t *testing.T) {
t.Parallel()
cm := NewClientManager()

c := &client.Client{}
c := &mockMCPClient{}
tools := []string{"tool1", "tool2"}
name := "server1"

Expand All @@ -34,8 +136,8 @@ func TestClientManager_List(t *testing.T) {
t.Parallel()
cm := NewClientManager()

cm.Add("server1", &client.Client{}, []string{"a"})
cm.Add("server2", &client.Client{}, []string{"b"})
cm.Add("server1", &mockMCPClient{}, []string{"a"})
cm.Add("server2", &mockMCPClient{}, []string{"b"})

names := cm.List()
require.Len(t, names, 2)
Expand All @@ -46,7 +148,7 @@ func TestClientManager_Remove(t *testing.T) {
t.Parallel()
cm := NewClientManager()

cm.Add("server1", &client.Client{}, []string{"tool"})
cm.Add("server1", &mockMCPClient{}, []string{"tool"})
cm.Remove("server1")

_, ok := cm.Client("server1")
Expand Down Expand Up @@ -83,7 +185,7 @@ func TestClientManager_ConcurrentAccess(t *testing.T) {
name := fmt.Sprintf("server-%d", i)
go func() {
defer wg.Done()
cm.Add(name, &client.Client{}, []string{"tool"})
cm.Add(name, &mockMCPClient{}, []string{"tool"})
}()
go func() {
defer wg.Done()
Expand Down
72 changes: 63 additions & 9 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/mozilla-ai/mcpd/v2/internal/runtime"
)

// Daemon manages MCP server lifecycles, client connections, and health monitoring.
// It should only be created using NewDaemon to ensure proper initialization.
type Daemon struct {
apiServer *ApiServer
logger hclog.Logger
Expand Down Expand Up @@ -60,6 +62,8 @@ func NewDaemonOpts(logger hclog.Logger, cfgLoader config.Loader, ctxLoader confi
}, nil
}

// NewDaemon creates a new Daemon instance with proper initialization.
// Use this function instead of directly creating a Daemon struct.
func NewDaemon(apiAddr string, opts *Opts) (*Daemon, error) {
if err := IsValidAddr(apiAddr); err != nil {
return nil, fmt.Errorf("invalid API address '%s': %w", apiAddr, err)
Expand Down Expand Up @@ -109,15 +113,7 @@ func NewDaemon(apiAddr string, opts *Opts) (*Daemon, error) {
// It launches regular health checks on the MCP servers, with statuses visible via API routes.
func (d *Daemon) StartAndManage(ctx context.Context) error {
// Handle clean-up.
defer func() {
d.logger.Info("Shutting down MCP servers and client connections")
for _, n := range d.clientManager.List() {
if c, ok := d.clientManager.Client(n); ok {
d.logger.Info(fmt.Sprintf("Closing client %s", n))
_ = c.Close()
}
}
}()
defer d.closeAllClients()

// Launch servers
if err := d.startMCPServers(ctx); err != nil {
Expand Down Expand Up @@ -430,3 +426,61 @@ func loadConfig(cfgLoader config.Loader, ctxLoader configcontext.Loader) ([]runt

return runtime.AggregateConfigs(cfg, execCtx)
}

// closeAllClients gracefully closes all managed clients with individual timeouts.
// It closes all clients concurrently and waits for all to complete or timeout.
func (d *Daemon) closeAllClients() {
d.logger.Info("Shutting down MCP servers and client connections")

clients := d.clientManager.List()
if len(clients) == 0 {
return
}

var wg sync.WaitGroup
timeout := 5 * time.Second

// Start closing all clients concurrently
for _, n := range clients {
name := n
c, ok := d.clientManager.Client(name)
if !ok {
continue
}

wg.Add(1)
go func() {
defer wg.Done()
d.closeClientWithTimeout(name, c, timeout)
}()
}

wg.Wait()
}

// closeClientWithTimeout closes a single client with a timeout.
func (d *Daemon) closeClientWithTimeout(name string, c client.MCPClient, timeout time.Duration) {
d.logger.Info(fmt.Sprintf("Closing client %s", name))

done := make(chan struct{})
go func() {
err := c.Close()
if err != nil {
// 'errors' can result in things like SIGINT which returns exit code 130,
// we still log the error but only for debugging purposes.
d.logger.Debug("Closing client", "client", name, "error", err)
}
d.logger.Info(fmt.Sprintf("Closed client %s", name))
close(done)
}()

// Wait for this specific client to close or timeout.
// NOTE: this could leak if we just time out clients,
// but since we're exiting mcpd it isn't an issue.
select {
case <-done:
// Closed successfully.
case <-time.After(timeout):
d.logger.Warn(fmt.Sprintf("Timeout (%s) closing client %s", timeout.String(), name))
}
}
Loading