From b69cce62377ce2d382920227c278c6ea628d3d70 Mon Sep 17 00:00:00 2001 From: randy1568 Date: Wed, 15 Oct 2025 17:38:11 +0800 Subject: [PATCH] feat: add A2A Client Presenter --- package.json | 4 +- .../presenter/A2APresenter/A2AClientAction.ts | 319 ++++++++++++++++++ src/main/presenter/A2APresenter/index.ts | 116 +++++++ .../presenter/A2APresenter/serverManager.ts | 43 +++ src/main/presenter/A2APresenter/types.ts | 113 +++++++ .../types/presenters/legacy.presenters.d.ts | 18 + 6 files changed, 612 insertions(+), 1 deletion(-) create mode 100644 src/main/presenter/A2APresenter/A2AClientAction.ts create mode 100644 src/main/presenter/A2APresenter/index.ts create mode 100644 src/main/presenter/A2APresenter/serverManager.ts create mode 100644 src/main/presenter/A2APresenter/types.ts diff --git a/package.json b/package.json index 12e9653..a2e30af 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "cleanRuntime": "rm -rf runtime/uv runtime/bun runtime/node" }, "dependencies": { + "@a2a-js/sdk": "^0.3.4", "@anthropic-ai/sdk": "^0.53.0", "@aws-sdk/client-bedrock": "^3.879.0", "@aws-sdk/client-bedrock-runtime": "^3.879.0", @@ -92,6 +93,7 @@ "tokenx": "^0.4.1", "turndown": "^7.2.1", "undici": "^7.15.0", + "uuid": "^13.0.0", "xlsx": "https://cdn.sheetjs.com/xlsx-0.20.3/xlsx-0.20.3.tgz", "xml2js": "^0.6.2", "zod": "^3.25.76" @@ -194,4 +196,4 @@ "vue-demi" ] } -} +} \ No newline at end of file diff --git a/src/main/presenter/A2APresenter/A2AClientAction.ts b/src/main/presenter/A2APresenter/A2AClientAction.ts new file mode 100644 index 0000000..7d5ffbf --- /dev/null +++ b/src/main/presenter/A2APresenter/A2AClientAction.ts @@ -0,0 +1,319 @@ +import { A2AClient } from '@a2a-js/sdk/client' +import type { + Task, + MessageSendParams, + AgentCard, + Message, + TaskStatusUpdateEvent, + TaskArtifactUpdateEvent, + JSONRPCErrorResponse, + SendMessageSuccessResponse, + Part, + DataPart, + TextPart, + FilePart, + Artifact +} from '@a2a-js/sdk' + +import { A2AResponseData, A2APart, A2AArtifact, A2A_INNER_ERROR_CODE } from './types' + +export type { Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, AgentCard } + +export class A2AClientAction { + sdkClient: A2AClient + agentCardUrl: string = '' + agentCard: AgentCard | undefined = undefined + constructor( + serverUrl: string, + private timeout: number = 120000 + ) { + this.sdkClient = new A2AClient(serverUrl) + + if (serverUrl.endsWith('/.well-known/agent-card.json')) { + this.agentCardUrl = serverUrl + } else { + this.agentCardUrl = serverUrl + '/.well-known/agent-card.json' + } + + this.initialize(serverUrl).catch((error) => { + console.error(`[A2A] Failed to initialize client:`, error.message) + throw error + }) + } + + async initialize(serverUrl: string): Promise { + await this.fetchAgentCard(serverUrl) + } + /** + * get agentCard + */ + async fetchAgentCard(serverUrl: string): Promise { + try { + console.log(`[A2A] Fetching agent card ${serverUrl}`) + + // Get agent card from client + this.agentCard = await this.sdkClient.getAgentCard() + } catch (error) { + console.error(`[A2A] Failed to connect to server ${serverUrl}:`, error) + throw error + } + } + + /** + * Check if client is connected + */ + async isConnected(): Promise { + const response = await fetch(this.agentCardUrl) + if (response.ok) { + console.log('[A2A] JS Server is connected') + return true + } else { + console.error('[A2A] JS Server response error:', response.status) + } + return false + } + + async getAgentCard(): Promise { + if (this.agentCard == undefined) { + await this.fetchAgentCard(this.agentCardUrl) + } + return this.agentCard as AgentCard + } + /** + * Send a message and create a task (non-streaming) + */ + async sendMessage(params: MessageSendParams): Promise { + if (!(await this.isConnected())) { + throw new Error('Client not connected') + } + try { + const sendParams: MessageSendParams = { + message: params.message, + configuration: { + blocking: true // Non-streaming mode + } + } + console.log(`[A2A] Sending message`) + let timeoutHandle: NodeJS.Timeout | null = null + const response = await Promise.race([ + this.sdkClient.sendMessage(sendParams), + new Promise( + (_, reject) => + (timeoutHandle = setTimeout( + () => reject(new Error(`[A2A] sendMessage timed out after ${this.timeout} ms`)), + this.timeout + )) + ) + ]).finally(() => { + if (timeoutHandle) { + clearTimeout(timeoutHandle) + timeoutHandle = null + } + }) + // Check if response is error + if (typeof response === 'object' && response !== null && 'error' in response) { + const errorResponse = response as JSONRPCErrorResponse + console.error(`[A2A] Error: ${errorResponse.error.message}`) + return { + kind: 'error', + serverName: this?.agentCard?.name || '', + error: { + code: errorResponse.error.code, + message: errorResponse.error.message + } + } + } + // Get task from result + let successResponse = response as SendMessageSuccessResponse + return this.formatEventToResponse(successResponse.result) + } catch (error) { + const errorResponse: A2AResponseData = { + kind: 'error', + serverName: this?.agentCard?.name || '', + error: { + code: A2A_INNER_ERROR_CODE.STREAMING_MESSAGE_ERROR, + message: `send message error: ${error instanceof Error ? error.message : String(error)}` + } + } + return errorResponse + } + } + + /** + * Send a streaming message + */ + async *sendStreamingMessage(params: MessageSendParams): AsyncGenerator { + if (!(await this.isConnected())) { + throw new Error('Client not connected') + } + try { + console.log(`[A2A] Starting streaming message`) + const streamIterator = this.sdkClient.sendMessageStream(params) + // Helper function to race event with timeout + async function nextWithTimeout( + iterator: AsyncIterator, + ms: number + ): Promise> { + let timeoutHandle: NodeJS.Timeout | null = null + + const timeoutPromise = new Promise>((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`[A2A] Streaming message timed out after ${ms} ms`)) + }, ms) + }) + return Promise.race([iterator.next(), timeoutPromise]).finally(() => { + if (timeoutHandle) { + clearTimeout(timeoutHandle) + timeoutHandle = null + } + }) + } + // 处理流式事件 + while (true) { + const result = await nextWithTimeout(streamIterator, this.timeout) + if (result.done) { + console.log(`[A2A] Stream completed`) + break + } + const event = result.value + // 将事件转换为统一的 A2AResponseData 格式 + const formatted = this.formatEventToResponse(event) + yield formatted + } + streamIterator.return() + } catch (error) { + const errorResponse: A2AResponseData = { + kind: 'error', + serverName: this?.agentCard?.name || '', + error: { + code: A2A_INNER_ERROR_CODE.STREAMING_MESSAGE_ERROR, + message: `Streaming message error: ${error instanceof Error ? error.message : String(error)}` + } + } + yield errorResponse + } + } + + /** + * Cancel a task + */ + async cancelTask(taskId: string): Promise { + if (!(await this.isConnected())) { + throw new Error('Client not connected') + } + try { + await this.sdkClient.cancelTask({ id: taskId }) + + console.log(`[A2A] Task cancelled: ${taskId}`) + } catch (error) { + console.error('[A2A] Failed to cancel task:', error) + throw error + } + } + + private formatEventToResponse( + event: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent + ): A2AResponseData { + if (event.kind === 'message') { + return this.resolveMessage(event as Message) + } else if (event.kind === 'task') { + return this.resolveTask(event as Task) + } else if (event.kind === 'status-update') { + return this.resolveTaskStatusUpdate(event as TaskStatusUpdateEvent) + } else if (event.kind === 'artifact-update') { + return this.resolveTaskArtifactUpdate(event as TaskArtifactUpdateEvent) + } + throw new Error('Unknown event type') + } + + private resolveMessage(message: Message): A2AResponseData { + return { + kind: 'message', + serverName: this?.agentCard?.name || '', + contextId: message.contextId, + message: { + parts: this.convertParts(message.parts) + } + } + } + + private resolveTask(task: Task): A2AResponseData { + return { + kind: 'task', + serverName: this?.agentCard?.name || '', + contextId: task.contextId, + taskId: task.id, + task: { + status: { + state: task.status.state, + parts: task.status.message?.parts ? this.convertParts(task.status.message.parts) : [] + }, + artifacts: task.artifacts ? this.convertArtifacts(task.artifacts) : [] + } + } + } + + private resolveTaskStatusUpdate(event: TaskStatusUpdateEvent): A2AResponseData { + return { + kind: 'status-update', + serverName: this?.agentCard?.name || '', + contextId: event.contextId, + taskId: event.taskId, + statusUpdate: { + status: { + state: event.status.state, + parts: event.status.message?.parts ? this.convertParts(event.status.message.parts) : [] + }, + final: event.final + } + } + } + + private resolveTaskArtifactUpdate(event: TaskArtifactUpdateEvent): A2AResponseData { + return { + kind: 'artifact-update', + serverName: this?.agentCard?.name || '', + contextId: event.contextId, + taskId: event.taskId, + artifactUpdate: { + artifact: { + name: event.artifact?.name || '', + parts: event.artifact.parts ? this.convertParts(event.artifact.parts) : [] + }, + ...(event?.append ? { append: event.append } : {}), + ...(event?.lastChunk ? { lastChunk: event.lastChunk } : {}) + } + } + } + + private convertParts(parts: Part[]): A2APart[] { + const result: A2APart[] = [] + for (const part of parts || []) { + if (part.kind === 'text') { + result.push({ type: 'text', text: (part as TextPart).text }) + } else if (part.kind === 'data') { + result.push({ type: 'data', data: (part as DataPart).data }) + } else if (part.kind === 'file') { + const fp = part as FilePart + const file: any = (fp as any).file || {} + result.push({ + type: 'file', + file: { + name: file.name, + mimeType: file.mimeType, + uri: file.uri, + bytes: file.bytes + } + }) + } + } + return result + } + + private convertArtifacts(artifacts: Artifact[]): A2AArtifact[] { + return artifacts.map((artifact) => ({ + name: artifact?.name || '', + parts: artifact.parts ? this.convertParts(artifact.parts) : [] + })) + } +} diff --git a/src/main/presenter/A2APresenter/index.ts b/src/main/presenter/A2APresenter/index.ts new file mode 100644 index 0000000..9875117 --- /dev/null +++ b/src/main/presenter/A2APresenter/index.ts @@ -0,0 +1,116 @@ +/** + * A2A Presenter Implementation + * Main interface exposing all A2A operations and integrating with the existing system + */ + +import { serverManager } from './serverManager' +import type { MessageSendParams, AgentCard } from '@a2a-js/sdk' + +// Local interfaces for presenter API +export interface TaskQueryParams { + taskId: string + contextId?: string +} + +export interface TaskIdParams { + taskId: string +} +import { IA2APresenter } from '@shared/presenter' +import { A2AClientAction } from './A2AClientAction' +import { A2AResponseData } from './types' + +export class A2APresenter implements IA2APresenter { + private manager: serverManager + + constructor() { + this.manager = new serverManager() + } + + /** + * Get all A2A server configurations + */ + async getA2AServers(): Promise> { + return this.manager.getA2AServers() + } + + /** + * Add a new A2A server + */ + async addA2AServer(serverID: string): Promise { + try { + // Check if server already exists + const existingServers = await this.getA2AServers() + if (existingServers[serverID]) { + console.error(`[A2A] Failed to add A2A server: Server "${serverID}" already exists.`) + return false + } + + this.manager.addA2AServer(serverID) + console.log(`[A2A] Added server: ${serverID}`) + return true + } catch (error) { + console.error(`[A2A] Failed to add server ${serverID}:`, error) + throw error + } + } + /** + * Remove an A2A server + */ + async removeA2AServer(serverID: string): Promise { + try { + this.manager.removeA2AServer(serverID) + console.log(`[A2A] Removed server: ${serverID}`) + } catch (error) { + console.error(`[A2A] Failed to remove server ${serverID}:`, error) + throw error + } + } + + /** + * Check if a server is running + */ + async isServerRunning(serverID: string): Promise { + return await this.manager.isA2AServerRunning(serverID) + } + + /** + * Send a message to an A2A server + */ + async sendMessage( + serverID: string, + params: MessageSendParams + ): Promise> { + const client = this.manager.getA2AClient(serverID) + if (!client) { + throw new Error(`A2A server '${serverID}' is not running`) + } + const agentCard = await client.getAgentCard() + const isStreaming = agentCard.capabilities?.streaming === true + if (isStreaming) { + return client.sendStreamingMessage(params) + } else { + return client.sendMessage(params) + } + } + /** + * Cancel a task + */ + async cancelTask(serverID: string, taskID: string): Promise { + const client = this.manager.getA2AClient(serverID) + if (!client) { + throw new Error(`A2A server '${serverID}' is not running`) + } + await client.cancelTask(taskID) + } + + /** + * Get agent card for a server + */ + async getAgentCard(serverName: string): Promise { + const client = this.manager.getA2AClient(serverName) + if (!client) { + throw new Error(`A2A server '${serverName}' is not running`) + } + return await client.getAgentCard() + } +} diff --git a/src/main/presenter/A2APresenter/serverManager.ts b/src/main/presenter/A2APresenter/serverManager.ts new file mode 100644 index 0000000..3a31697 --- /dev/null +++ b/src/main/presenter/A2APresenter/serverManager.ts @@ -0,0 +1,43 @@ +import { A2AClientAction } from './A2AClientAction' + +export class serverManager { + private clients: Map = new Map() + + /** + * Add a new A2A server + */ + addA2AServer(serverID: string) { + if (this.clients.has(serverID)) { + throw new Error(`A2A server ${serverID} already exists`) + } + this.clients.set(serverID, new A2AClientAction(serverID)) + } + + /** + * Remove an A2A server + */ + removeA2AServer(serverID: string) { + this.clients.delete(serverID) + } + + getA2AServers(): Record { + return Object.fromEntries(this.clients.entries()) + } + /** + * Check if a server is running + */ + async isA2AServerRunning(serverID: string): Promise { + if (!this.clients.has(serverID)) { + return false + } + const client = this.clients.get(serverID) + return client ? await client.isConnected() : false + } + + /** + * Get a running client by name + */ + getA2AClient(serverID: string): A2AClientAction | undefined { + return this.clients.get(serverID) + } +} diff --git a/src/main/presenter/A2APresenter/types.ts b/src/main/presenter/A2APresenter/types.ts new file mode 100644 index 0000000..3832006 --- /dev/null +++ b/src/main/presenter/A2APresenter/types.ts @@ -0,0 +1,113 @@ +/** + * A2A Presenter Type Definitions + * + * This file contains all custom types and interfaces used by the A2A Presenter. + * It also re-exports necessary types from the @a2a-js/sdk for convenience. + */ + +// Re-export SDK types +export type { + AgentCard, + Task, + TaskState, + TaskStatusUpdateEvent, + TaskArtifactUpdateEvent, + Message, + MessageSendParams, + Part, + TextPart, + FilePart, + DataPart, + JSONRPCErrorResponse +} from '@a2a-js/sdk' + +export enum A2A_INNER_ERROR_CODE { + MESSAGE_ERROR = -1, + STREAMING_MESSAGE_ERROR = -2 +} + +/** + * Unified response data format for A2A interactions + * Used for frontend display and event propagation + */ +export interface A2AResponseData { + /** Response type discriminator */ + kind: 'message' | 'task' | 'status-update' | 'artifact-update' | 'error' + /** Timestamp when this response was created */ + /** Name of the A2A server that generated this response */ + serverName: string + contextId?: string + taskId?: string + + /** Message data (when type is 'message') */ + message?: { + parts: A2APart[] + } + + /** Task data (when type is 'task') */ + task?: { + status: { + state: string // TaskState + parts?: A2APart[] + } + artifacts?: A2AArtifact[] + } + + /** Status update data (when type is 'task-status-update') */ + statusUpdate?: { + status: { + state: string // TaskState + parts?: A2APart[] + } + final: boolean + } + + /** Artifact update data (when type is 'task-artifact-update') */ + artifactUpdate?: { + artifact: A2AArtifact + /** If true, the content of this artifact should be appended to a previously sent artifact with the same ID. */ + append?: boolean + /** If true, this is the final chunk of the artifact. */ + lastChunk?: boolean + } + + /** Error data (when type is 'error') */ + error?: { + code: number + message: string + data?: unknown + } +} + +/** + * Simplified part representation for frontend consumption + */ +export interface A2APart { + /** Part type */ + type: 'text' | 'data' | 'file' + /** Text content (for text parts) */ + text?: string + /** Structured data (for data parts) */ + data?: unknown + /** File information (for file parts) */ + file?: { + name?: string + mimeType?: string + uri?: string + bytes?: string // base64 encoded + } +} + +/** + * Artifact representation for frontend consumption + */ +export interface A2AArtifact { + /** + * An optional, human-readable name for the artifact. + */ + name?: string + /** + * An array of content parts that make up the artifact. + */ + parts: A2APart[] +} diff --git a/src/shared/types/presenters/legacy.presenters.d.ts b/src/shared/types/presenters/legacy.presenters.d.ts index d48e612..ff72744 100644 --- a/src/shared/types/presenters/legacy.presenters.d.ts +++ b/src/shared/types/presenters/legacy.presenters.d.ts @@ -5,6 +5,7 @@ import { ShowResponse } from 'ollama' import { ShortcutKeySetting } from '@/presenter/configPresenter/shortcutKeySettings' import { ModelType } from '@shared/model' import { ProviderChange, ProviderBatchUpdate } from './provider-operations' +import { A2AServerConfig } from '@/presenter/A2APresenter/serverManager' export type SQLITE_MESSAGE = { id: string @@ -692,6 +693,23 @@ export interface ILlmProviderPresenter { ): Promise } +export interface IA2APresenter { + // Server management + getA2AServers(): Promise> + addA2AServer(name: string, config: A2AServerConfig): Promise + removeA2AServer(name: string): Promise + + // Server lifecycle + isServerRunning(name: string): Promise + + // Task operations + // sendMessage(serverName: string, params: MessageSendParams): Promise + cancelTask(serverName: string, params: TaskIdParams): Promise + + // Agent information + getAgentCard(serverName: string): Promise +} + export type CONVERSATION_SETTINGS = { systemPrompt: string temperature: number -- Gitee