Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
622d0ca
Merge pull request #3172 from simstudioai/fix/notifs
waleedlatif1 Feb 9, 2026
b3dbb44
improvement(jsm): destructured outputs for jsm, jira, and added 1pass…
waleedlatif1 Feb 10, 2026
e5d3049
fix(slack): resolve file metadata via files.info when event payload i…
waleedlatif1 Feb 10, 2026
190f12f
feat(copilot): copilot mcp + server side copilot execution (#3173)
Sg312 Feb 10, 2026
8b4b3af
fix(mcp): harden notification system against race conditions (#3168)
waleedlatif1 Feb 10, 2026
e321f88
improvement(preview): added trigger mode context for deploy preview (…
waleedlatif1 Feb 10, 2026
73540e3
feat(logs): add skill icon to trace spans (#3181)
emir-karabeg Feb 10, 2026
be3cdcf
Merge pull request #3179 from simstudioai/improvement/file-download-t…
icecrasher321 Feb 10, 2026
20b230d
improvement(schema): centralize derivation of block schemas (#3175)
icecrasher321 Feb 11, 2026
c5dd90e
feat(copilot): enterprise configuration (#3184)
Sg312 Feb 11, 2026
f8e9614
improvement(helm): support copilot-only deployments (#3185)
waleedlatif1 Feb 11, 2026
6d16f21
improvement(mcp): improved mcp sse events notifs, update jira to hand…
waleedlatif1 Feb 11, 2026
78fef22
fix(execution): scope execution state per workflow to prevent cross-w…
waleedlatif1 Feb 11, 2026
f5dc180
fix(memory): upgrade bun from 1.3.3 to 1.3.9 (#3186)
waleedlatif1 Feb 11, 2026
c471627
fix(posthog): replace proxy rewrite with route handler for reliable b…
waleedlatif1 Feb 11, 2026
8a24b56
improvement(terminal): increase workflow logs limit from 1k to 5k per…
waleedlatif1 Feb 11, 2026
af01dce
fix(terminal): subflow logs rendering (#3189)
icecrasher321 Feb 11, 2026
13a9111
fix(logs): surface handled errors as info in logs (#3190)
waleedlatif1 Feb 11, 2026
3d5bd00
fix(triggers): add copilot as a trigger type (#3191)
waleedlatif1 Feb 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix(mcp): harden notification system against race conditions (#3168)
* fix(mcp): harden notification system against race conditions

- Guard concurrent connect() calls in connection manager with connectingServers Set
- Suppress post-disconnect notification handler firing in MCP client
- Clean up Redis event listeners in pub/sub dispose()
- Add tests for all three hardening fixes (11 new tests)

* updated tests

* plugged in new mcp event based system and create sse route to publish notifs

* ack commetns

* fix reconnect timer

* cleanup when running onClose

* fixed spacing on mcp settings tab

* keep error listeners before quiet in redis
  • Loading branch information
waleedlatif1 authored Feb 10, 2026
commit 8b4b3af1208a67c20c58e2f61135d35636aaf85f
98 changes: 98 additions & 0 deletions apps/sim/app/api/mcp/events/route.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Tests for MCP SSE events endpoint
*
* @vitest-environment node
*/
import { createMockRequest, mockAuth, mockConsoleLogger } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'

mockConsoleLogger()
const auth = mockAuth()

const mockGetUserEntityPermissions = vi.fn()
vi.doMock('@/lib/workspaces/permissions/utils', () => ({
getUserEntityPermissions: mockGetUserEntityPermissions,
}))

vi.doMock('@/lib/mcp/connection-manager', () => ({
mcpConnectionManager: null,
}))

vi.doMock('@/lib/mcp/pubsub', () => ({
mcpPubSub: null,
}))

const { GET } = await import('./route')

describe('MCP Events SSE Endpoint', () => {
beforeEach(() => {
vi.clearAllMocks()
})

it('returns 401 when session is missing', async () => {
auth.setUnauthenticated()

const request = createMockRequest(
'GET',
undefined,
{},
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
)

const response = await GET(request as any)

expect(response.status).toBe(401)
const text = await response.text()
expect(text).toBe('Unauthorized')
})

it('returns 400 when workspaceId is missing', async () => {
auth.setAuthenticated()

const request = createMockRequest('GET', undefined, {}, 'http://localhost:3000/api/mcp/events')

const response = await GET(request as any)

expect(response.status).toBe(400)
const text = await response.text()
expect(text).toBe('Missing workspaceId query parameter')
})

it('returns 403 when user lacks workspace access', async () => {
auth.setAuthenticated()
mockGetUserEntityPermissions.mockResolvedValue(null)

const request = createMockRequest(
'GET',
undefined,
{},
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
)

const response = await GET(request as any)

expect(response.status).toBe(403)
const text = await response.text()
expect(text).toBe('Access denied to workspace')
expect(mockGetUserEntityPermissions).toHaveBeenCalledWith('user-123', 'workspace', 'ws-123')
})

it('returns SSE stream when authorized', async () => {
auth.setAuthenticated()
mockGetUserEntityPermissions.mockResolvedValue({ read: true })

const request = createMockRequest(
'GET',
undefined,
{},
'http://localhost:3000/api/mcp/events?workspaceId=ws-123'
)

const response = await GET(request as any)

expect(response.status).toBe(200)
expect(response.headers.get('Content-Type')).toBe('text/event-stream')
expect(response.headers.get('Cache-Control')).toBe('no-cache')
expect(response.headers.get('Connection')).toBe('keep-alive')
})
})
111 changes: 111 additions & 0 deletions apps/sim/app/api/mcp/events/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* SSE endpoint for MCP tool-change events.
*
* Pushes `tools_changed` events to the browser when:
* - An external MCP server sends `notifications/tools/list_changed` (via connection manager)
* - A workflow CRUD route modifies workflow MCP server tools (via pub/sub)
*
* Auth is handled via session cookies (EventSource sends cookies automatically).
*/

import { createLogger } from '@sim/logger'
import type { NextRequest } from 'next/server'
import { getSession } from '@/lib/auth'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

const logger = createLogger('McpEventsSSE')

export const dynamic = 'force-dynamic'

const HEARTBEAT_INTERVAL_MS = 30_000

export async function GET(request: NextRequest) {
const session = await getSession()
if (!session?.user?.id) {
return new Response('Unauthorized', { status: 401 })
}

const { searchParams } = new URL(request.url)
const workspaceId = searchParams.get('workspaceId')
if (!workspaceId) {
return new Response('Missing workspaceId query parameter', { status: 400 })
}

const permissions = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId)
if (!permissions) {
return new Response('Access denied to workspace', { status: 403 })
}

const encoder = new TextEncoder()
const unsubscribers: Array<() => void> = []

const stream = new ReadableStream({
start(controller) {
const send = (eventName: string, data: Record<string, unknown>) => {
try {
controller.enqueue(
encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`)
)
} catch {
// Stream already closed
}
}

// Subscribe to external MCP server tool changes
if (mcpConnectionManager) {
const unsub = mcpConnectionManager.subscribe((event) => {
if (event.workspaceId !== workspaceId) return
send('tools_changed', {
source: 'external',
serverId: event.serverId,
timestamp: event.timestamp,
})
})
unsubscribers.push(unsub)
}

// Subscribe to workflow CRUD tool changes
if (mcpPubSub) {
const unsub = mcpPubSub.onWorkflowToolsChanged((event) => {
if (event.workspaceId !== workspaceId) return
send('tools_changed', {
source: 'workflow',
serverId: event.serverId,
timestamp: Date.now(),
})
})
unsubscribers.push(unsub)
}

// Heartbeat to keep the connection alive
const heartbeat = setInterval(() => {
try {
controller.enqueue(encoder.encode(': heartbeat\n\n'))
} catch {
clearInterval(heartbeat)
}
}, HEARTBEAT_INTERVAL_MS)
unsubscribers.push(() => clearInterval(heartbeat))

// Cleanup when client disconnects
request.signal.addEventListener('abort', () => {
for (const unsub of unsubscribers) {
unsub()
}
try {
controller.close()
} catch {
// Already closed
}
logger.info(`SSE connection closed for workspace ${workspaceId}`)
})

logger.info(`SSE connection opened for workspace ${workspaceId}`)
},
})

return new Response(stream, { headers: SSE_HEADERS })
}
3 changes: 3 additions & 0 deletions apps/sim/app/api/mcp/workflow-servers/[id]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'

const logger = createLogger('WorkflowMcpServerAPI')
Expand Down Expand Up @@ -146,6 +147,8 @@ export const DELETE = withMcpAuth<RouteParams>('admin')(

logger.info(`[${requestId}] Successfully deleted workflow MCP server: ${serverId}`)

mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })

return createMcpSuccessResponse({ message: `Server ${serverId} deleted successfully` })
} catch (error) {
logger.error(`[${requestId}] Error deleting workflow MCP server:`, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'

Expand Down Expand Up @@ -115,6 +116,8 @@ export const PATCH = withMcpAuth<RouteParams>('write')(

logger.info(`[${requestId}] Successfully updated tool ${toolId}`)

mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })

return createMcpSuccessResponse({ tool: updatedTool })
} catch (error) {
logger.error(`[${requestId}] Error updating tool:`, error)
Expand Down Expand Up @@ -160,6 +163,8 @@ export const DELETE = withMcpAuth<RouteParams>('write')(

logger.info(`[${requestId}] Successfully deleted tool ${toolId}`)

mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })

return createMcpSuccessResponse({ message: `Tool ${toolId} deleted successfully` })
} catch (error) {
logger.error(`[${requestId}] Error deleting tool:`, error)
Expand Down
3 changes: 3 additions & 0 deletions apps/sim/app/api/mcp/workflow-servers/[id]/tools/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
Expand Down Expand Up @@ -188,6 +189,8 @@ export const POST = withMcpAuth<RouteParams>('write')(
`[${requestId}] Successfully added tool ${toolName} (workflow: ${body.workflowId}) to server ${serverId}`
)

mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })

return createMcpSuccessResponse({ tool }, 201)
} catch (error) {
logger.error(`[${requestId}] Error adding tool:`, error)
Expand Down
5 changes: 5 additions & 0 deletions apps/sim/app/api/mcp/workflow-servers/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { eq, inArray, sql } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
Expand Down Expand Up @@ -174,6 +175,10 @@ export const POST = withMcpAuth('write')(
`[${requestId}] Added ${addedTools.length} tools to server ${serverId}:`,
addedTools.map((t) => t.toolName)
)

if (addedTools.length > 0) {
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
}
}

logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ import {
type CustomTool as CustomToolDefinition,
useCustomTools,
} from '@/hooks/queries/custom-tools'
import { useForceRefreshMcpTools, useMcpServers, useStoredMcpTools } from '@/hooks/queries/mcp'
import {
useForceRefreshMcpTools,
useMcpServers,
useMcpToolsEvents,
useStoredMcpTools,
} from '@/hooks/queries/mcp'
import {
useChildDeploymentStatus,
useDeployChildWorkflow,
Expand Down Expand Up @@ -1035,6 +1040,7 @@ export const ToolInput = memo(function ToolInput({
const { data: mcpServers = [], isLoading: mcpServersLoading } = useMcpServers(workspaceId)
const { data: storedMcpTools = [] } = useStoredMcpTools(workspaceId)
const forceRefreshMcpTools = useForceRefreshMcpTools()
useMcpToolsEvents(workspaceId)
const openSettingsModal = useSettingsModalStore((state) => state.openModal)
const mcpDataLoading = mcpLoading || mcpServersLoading

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,14 +894,14 @@ export function MCP({ initialServerId }: MCPProps) {
disabled={!hasParams}
>
<div className='flex-1'>
<div className='flex items-center gap-[8px]'>
<p className='font-medium text-[13px] text-[var(--text-primary)]'>
<div className='flex h-[16px] items-center gap-[6px]'>
<p className='font-medium text-[13px] text-[var(--text-primary)] leading-none'>
{tool.name}
</p>
{issues.length > 0 && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<div>
<div className='flex items-center'>
<Badge
variant={getIssueBadgeVariant(issues[0].issue)}
size='sm'
Expand Down
Loading
Loading