您现在的位置是:网站首页> AI人工智能

AI模型调用MCP服务及实用MCP服务

摘要

AI模型调用MCP服务及实用MCP服务

***Agent让调用模型变得简单***

***Python编写MCP一般使用FastMCP,支持发布为studio sse streamable_http***

方便调用AI模型的Agent编程框架

AI入门编程推荐隔壁的程序员老王,点击进入

***采用UVX NPX方式调用的MCP若没有安装则会自动从仓库安装***


1.jpg


客户端将MCP工具列表和用户问题-》大语言模型-》需要调用MCP的信息返回给调用模型的客户端,客户端调用指定MCP->客户端获得MCP结果-》大语言模型-》返回给最终客户



AI学习MCP

AI辅助生成MCP例子

MCP协议:AI界的USB-C统一接口,从知识点到实践,从使用别人的MCP到自己编写部署MCP,cherry-studio使用链接MCP

UV与Conda工具使用收集

UVX NPX介绍

Awesome MCP Servers

mcp.so

Cherry Studio 新版本 1.2.9 一键式添加MCP服务器

CherryStudio的MCP配置

详细介绍NPX或UVX配置MCP服务

MCP客户端神器全收录

C# .NET4.5用 IHttpHandler如何编写SSE服务

MCP协议官网

MCP相关视频

MCP服务相关技术收集MCP

MCP协议详细讲解

AI 模型调用 MCP 服务的过程

AI 模型调用 MCP 服务的代码示例

C# 编写 Agent 调用 ollama 及 MCP 的方法

Trae+Chrome MCP Server 让AI接管你的浏览器

MCP典型服务

Chrome MCP Server

Chrome MCP官方

mcp2mqtt动动嘴就可以控制智能家居

excel-mcp-server使用 AI 代理创建、读取和修改 Excel 工作簿

Windows MCP.Net基于 .NET 的 Windows 桌面自动化 MCP (Model Context Protocol) 服务器

Windows-MCP用于 Windows 计算机的轻量级 MCP 服务器

大模型实战 | Dify+Chrome MCP打造网页自动化AI助手

AI辅助生成MCP例子




AI学习MCP

如何源码方式安装npx的 AI MCP

stdio 方式部署的MCP 如何调用

sse 方式部署的MCP 如何调用

StreamableHTTP 方式部署的MCP 如何调用

####【豆包】

Q:如何源码方式安装npx的 AI MCP

A:...

若需要从源码方式安装并通过npx调用 AI MCP(模型组件包),通常需要手动克隆源码仓库、进行本地构建,再通过npx引用本地路径。以下是通用步骤:

1. 克隆源码仓库

首先获取 AI MCP 的源代码(假设托管在 Git 仓库):

bash

# 克隆仓库(替换为实际仓库地址)

git clone https://github.com/example/ai-mcp.git

cd ai-mcp  # 进入源码目录

2. 安装依赖并构建

源码通常需要安装依赖并执行构建(如编译 TypeScript、打包等):

bash

# 安装项目依赖

npm install


# 执行构建(根据项目脚本调整,常见命令如下)

npm run build  # 或 npm run compile

如果项目有特殊构建步骤(如需要 Python 环境、额外编译工具等),需参考源码中的README文档。

3. 链接本地包(可选)

若希望在全局环境中通过包名调用,可使用npm link创建本地链接:

bash

# 在源码目录执行,将本地包链接到全局

npm link


# 此时可直接通过包名调用(无需npx)

ai-mcp --version  # 验证是否链接成功

4. 通过 npx 调用本地源码

如果不希望全局链接,可直接通过npx引用本地路径:

bash

# 在源码目录外,通过相对路径调用

npx ./path/to/ai-mcp [命令参数]


# 例如,从源码目录的父级调用

npx ./ai-mcp --input=test.txt

5. 验证安装

执行简单命令验证功能是否正常:

bash

# 查看版本(根据组件实际命令调整)

npx ./ai-mcp --version


# 运行示例功能

npx ./ai-mcp --demo

注意事项

依赖缺失:若构建或运行时报错,检查是否缺少系统级依赖(如python、make等),根据错误提示安装。

权限问题:npm link可能需要管理员权限(Linux/macOS 加sudo,Windows 用管理员终端)。

更新源码:后续源码有更新时,需重新拉取代码并执行npm install和npm run build。


####【豆包】

Q:stdio 方式部署的MCP 如何调用

A:

import subprocess

import json

import sys


def call_mcp(request_data):

    # 启动MCP进程(假设MCP是可执行程序)

    mcp_process = subprocess.Popen(

        ["./mcp_service"],  # MCP程序路径

        stdin=subprocess.PIPE,

        stdout=subprocess.PIPE,

        stderr=subprocess.PIPE,

        text=True  # 以文本模式通信(避免二进制处理)

    )

    

    try:

        # 序列化请求数据并发送到MCP的stdin

        request_str = json.dumps(request_data) + "\n"  # 加换行符作为结束标志

        mcp_process.stdin.write(request_str)

        mcp_process.stdin.flush()  # 强制刷新缓冲区,确保MCP能立即收到

        

        # 从MCP的stdout读取响应(假设响应以换行结束)

        response_str = mcp_process.stdout.readline()

        if not response_str:

            # 读取错误信息(如果MCP崩溃)

            error = mcp_process.stderr.read()

            raise RuntimeError(f"MCP error: {error}")

        

        # 反序列化响应

        return json.loads(response_str)

    

    finally:

        # 关闭进程(如果MCP是单次请求模式,否则保持运行)

        mcp_process.stdin.close()

        mcp_process.wait()


# 调用示例

if __name__ == "__main__":

    request = {"action": "predict", "data": [1, 2, 3]}

    response = call_mcp(request)

    print("MCP响应:", response)



####【豆包】

Q:sse 方式部署的MCP 如何调用

A:

SSE(Server-Sent Events,服务器发送事件)是一种基于 HTTP 的单向通信协议,允许服务器主动向客户端持续推送数据。以 SSE 方式部署的 MCP(模型 / 服务控制程序)本质是一个 HTTP 服务器,通过text/event-stream格式向客户端推送实时响应,适用于需要持续输出的场景(如流式生成、实时日志等)

调用原理

SSE 调用的核心是 “客户端发起一次 HTTP 请求,服务器保持连接并持续推送数据”:

1.客户端向 MCP 的 SSE 接口发起 HTTP GET 请求(需指定Accept: text/event-stream头);

2.MCP 接受请求后,保持 TCP 连接不关闭,通过text/event-stream格式持续向客户端推送数据;

3.客户端持续监听响应流,解析 SSE 格式的数据并处理;

4.通信结束时,MCP 关闭连接,或客户端主动断开。

SSE 数据格式遵循特定规范,每条消息由字段组成(用\n分隔),常见字段:

data::消息内容(核心字段,可多行,以\n\n结束一条完整消息);

event::事件类型(可选,客户端可按类型处理);

id::消息 ID(可选,用于重连时恢复进度);

retry::重连间隔(可选,单位毫秒)。


1. 浏览器端调用(JavaScript)

// 初始化SSE连接(参数为MCP的SSE接口URL,可带查询参数)

const sse = new EventSource('http://localhost:8000/mcp/sse?action=stream&input=test');


// 监听默认事件(无event字段的消息)

sse.onmessage = (event) => {

  const data = JSON.parse(event.data); // 解析JSON数据

  console.log('收到消息:', data);

  // 处理数据(如更新UI)

};


// 监听特定事件(如"progress"类型)

sse.addEventListener('progress', (event) => {

  const progress = JSON.parse(event.data);

  console.log('处理进度:', progress.step);

});


// 监听完成事件

sse.addEventListener('complete', (event) => {

  const result = JSON.parse(event.data);

  console.log('最终结果:', result.final_result);

  sse.close(); // 完成后关闭连接

});


// 监听错误

sse.onerror = (error) => {

  console.error('SSE连接错误:', error);

  // 可在这里处理重连逻辑

};


2. 后端服务调用(Python)

import requests

import json


def call_sse_mcp(url, params):

    # 发起SSE请求,设置必要的头信息

    headers = {

        'Accept': 'text/event-stream',

        'Cache-Control': 'no-cache'

    }

    

    try:

        # 发送GET请求,stream=True表示流式读取

        with requests.get(url, params=params, headers=headers, stream=True) as response:

            response.raise_for_status()  # 检查HTTP错误状态码

            

            # 逐行解析响应流(SSE消息以"\n\n"分隔)

            buffer = []

            for line in response.iter_lines(decode_unicode=True):

                if not line:  # 空行表示一条消息结束

                    if buffer:

                        process_sse_message('\n'.join(buffer))

                        buffer = []

                    continue

                buffer.append(line)

                

    except requests.exceptions.RequestException as e:

        print(f"请求错误: {e}")


def process_sse_message(raw_message):

    """解析单条SSE消息"""

    event_type = 'message'  # 默认事件类型

    data = None

    

    # 拆分消息的字段(如event: progress、data: {...})

    for part in raw_message.split('\n'):

        if part.startswith('event:'):

            event_type = part.split(':', 1)[1].strip()

        elif part.startswith('data:'):

            data_str = part.split(':', 1)[1].strip()

            data = json.loads(data_str)  # 解析JSON数据

    

    # 按事件类型处理

    if event_type == 'progress':

        print(f"进度更新: {data['step']}/{data['total']}")

    elif event_type == 'complete':

        print(f"最终结果: {data['result']}")

    else:

        print(f"收到消息: {data}")


# 调用示例

if __name__ == "__main__":

    mcp_sse_url = "http://localhost:8000/mcp/sse"

    request_params = {

        "action": "stream_predict",

        "input": "需要处理的内容"

    }

    call_sse_mcp(mcp_sse_url, request_params)


3. Java 调用示例(使用 OkHttp)

import okhttp3.*;

import com.google.gson.Gson;

import java.io.IOException;


public class SseMcpClient {

    private static final Gson gson = new Gson();

    private static final String MCP_SSE_URL = "http://localhost:8000/mcp/sse";


    public static void main(String[] args) {

        OkHttpClient client = new OkHttpClient();


        // 构建请求(带参数)

        HttpUrl url = HttpUrl.parse(MCP_SSE_URL)

                .newBuilder()

                .addQueryParameter("action", "stream_predict")

                .addQueryParameter("input", "需要处理的内容")

                .build();


        Request request = new Request.Builder()

                .url(url)

                .header("Accept", "text/event-stream")

                .header("Cache-Control", "no-cache")

                .build();


        // 发送请求并处理流式响应

        client.newCall(request).enqueue(new Callback() {

            @Override

            public void onResponse(Call call, Response response) throws IOException {

                if (!response.isSuccessful()) {

                    throw new IOException("请求失败: " + response);

                }


                // 逐行读取响应体

                ResponseBody body = response.body();

                if (body == null) return;


                try (BufferedSource source = body.source()) {

                    StringBuilder buffer = new StringBuilder();

                    while (!source.exhausted()) {

                        String line = source.readUtf8Line();

                        if (line == null) break;


                        if (line.isEmpty()) {  // 空行表示一条消息结束

                            if (buffer.length() > 0) {

                                processSseMessage(buffer.toString());

                                buffer.setLength(0);

                            }

                        } else {

                            buffer.append(line).append("\n");

                        }

                    }

                } finally {

                    response.close();

                }

            }


            @Override

            public void onFailure(Call call, IOException e) {

                System.err.println("连接失败: " + e.getMessage());

            }

        });

    }


    private static void processSseMessage(String rawMessage) {

        String eventType = "message";

        String dataJson = null;


        // 解析SSE消息字段

        for (String part : rawMessage.split("\n")) {

            if (part.startsWith("event:")) {

                eventType = part.substring(6).trim();

            } else if (part.startsWith("data:")) {

                dataJson = part.substring(5).trim();

            }

        }


        // 处理数据(根据事件类型解析)

        if (dataJson != null) {

            if ("progress".equals(eventType)) {

                ProgressData progress = gson.fromJson(dataJson, ProgressData.class);

                System.out.println("进度: " + progress.step + "/" + progress.total);

            } else if ("complete".equals(eventType)) {

                CompleteData result = gson.fromJson(dataJson, CompleteData.class);

                System.out.println("最终结果: " + result.result);

            } else {

                System.out.println("收到消息: " + dataJson);

            }

        }

    }


    // 定义数据模型类(与MCP响应格式对应)

    static class ProgressData {

        int step;

        int total;

    }


    static class CompleteData {

        String result;

    }

}


####【豆包】

Q:StreamableHTTP 方式部署的MCP 如何调用

A:

StreamableHTTP 是一种基于 HTTP 协议的流式通信方式,常用于需要服务器持续返回部分结果的场景(如大模型流式生成、实时数据处理等)。与 SSE 不同,StreamableHTTP 更灵活,通常使用 POST 方法 发送请求(支持复杂输入),并通过 分块传输编码(Chunked Transfer Encoding) 流式返回响应,允许客户端边接收边处理数据。


与 SSE 的区别:

StreamableHTTP 通常用 POST 方法,支持复杂请求体;SSE 用 GET 方法,参数通过 URL 传递。

StreamableHTTP 无固定消息格式,更灵活;SSE 严格遵循 text/event-stream 规范。

两者均基于 HTTP 分块传输,但适用场景不同(StreamableHTTP 更适合需要发送大量输入的流式场景)


调用原理

StreamableHTTP 的核心是 “一次 HTTP 请求,持续分块响应”:

1.客户端通过 HTTP POST 向 MCP 接口发送请求(包含完整输入数据,如 JSON 格式);

2.MCP 接收到请求后,立即返回 HTTP 200 响应,并通过 Transfer-Encoding: chunked 头部告知客户端将采用分块传输;

3.MCP 处理数据时,将中间结果按 “数据块” 持续写入响应流,每个块包含长度标识(十六进制)和数据内容;

4.客户端持续读取响应流中的分块数据,实时解析并处理(如拼接部分结果);

5.所有数据传输完成后,MCP 发送一个长度为 0 的块(0\r\n\r\n)标识结束,客户端关闭连接。


代码示例(不同场景)

1. 浏览器端调用(JavaScript,使用 Fetch API)

浏览器可通过 fetch 配合 ReadableStream 处理流式响应:

javascript

运行

async function callStreamableMCP(inputData) {

  try {

    const response = await fetch('http://localhost:8000/mcp/stream', {

      method: 'POST',

      headers: {

        'Content-Type': 'application/json',

      },

      body: JSON.stringify({  // 请求体携带完整输入

        action: 'stream_generate',

        input: inputData,

        max_tokens: 1000

      })

    });


    if (!response.ok) {

      throw new Error(`HTTP 错误: ${response.status}`);

    }


    // 获取响应流的读取器

    const reader = response.body.getReader();

    const decoder = new TextDecoder();  // 解码二进制流为文本

    let fullResult = '';  // 累积完整结果


    while (true) {

      // 读取一个数据块(done 表示结束,value 是 Uint8Array 类型的块内容)

      const { done, value } = await reader.read();

      if (done) break;


      // 解码当前块并处理

      const chunk = decoder.decode(value, { stream: true });  // stream: true 表示后续还有数据

      console.log('收到部分结果:', chunk);

      

      // 假设每个块是 JSON 片段(如 {"token": "hello"})

      try {

        const data = JSON.parse(chunk);

        fullResult += data.token;  // 拼接结果

        // 更新 UI 等操作(如实时显示)

      } catch (e) {

        console.warn('当前块解析失败,可能是不完整数据:', e);

      }

    }


    console.log('最终完整结果:', fullResult);

    return fullResult;


  } catch (error) {

    console.error('调用失败:', error);

  }

}


// 调用示例

callStreamableMCP("请生成一篇关于人工智能的短文");

2. 后端服务调用(Python,使用 requests 库)

Python 可通过 requests 库的 stream=True 处理流式响应:

python

运行

import requests

import json


def call_streamable_mcp(input_data):

    url = "http://localhost:8000/mcp/stream"

    headers = {

        "Content-Type": "application/json"

    }

    payload = {  # 请求体参数

        "action": "stream_generate",

        "input": input_data,

        "max_tokens": 1000

    }


    try:

        # 发送 POST 请求,stream=True 启用流式读取

        with requests.post(

            url,

            headers=headers,

            json=payload,

            stream=True,

            timeout=300  # 长超时(适应流式传输)

        ) as response:

            response.raise_for_status()  # 检查 HTTP 错误状态


            full_result = ""

            # 逐行/逐块读取响应(根据 MCP 输出格式选择)

            for chunk in response.iter_lines(decode_unicode=True):

                if chunk:  # 忽略空行

                    print(f"收到块: {chunk}")

                    # 解析块数据(假设为 JSON 格式)

                    try:

                        data = json.loads(chunk)

                        full_result += data.get("token", "")

                    except json.JSONDecodeError:

                        print(f"块解析失败: {chunk}")


            print(f"最终结果: {full_result}")

            return full_result


    except requests.exceptions.RequestException as e:

        print(f"请求异常: {e}")

        return None


# 调用示例

if __name__ == "__main__":

    call_streamable_mcp("请生成一篇关于人工智能的短文")

3. Java 调用示例(使用 OkHttp)

Java 可通过 OkHttp 处理分块传输的流式响应:

java

运行

import okhttp3.*;

import com.google.gson.Gson;

import java.io.IOException;

import java.nio.charset.StandardCharsets;


public class StreamableMcpClient {

    private static final OkHttpClient client = new OkHttpClient.Builder()

            .readTimeout(300, java.util.concurrent.TimeUnit.SECONDS)  // 长超时

            .build();

    private static final Gson gson = new Gson();

    private static final String MCP_URL = "http://localhost:8000/mcp/stream";


    public static void main(String[] args) {

        // 构造请求体

        RequestBody requestBody = RequestBody.create(

                MediaType.parse("application/json; charset=utf-8"),

                gson.toJson(new McpRequest(

                        "stream_generate",

                        "请生成一篇关于人工智能的短文",

                        1000

                ))

        );


        // 构建请求

        Request request = new Request.Builder()

                .url(MCP_URL)

                .post(requestBody)

                .build();


        // 发送请求并处理流式响应

        client.newCall(request).enqueue(new Callback() {

            @Override

            public void onResponse(Call call, Response response) throws IOException {

                if (!response.isSuccessful()) {

                    throw new IOException("请求失败: " + response);

                }


                ResponseBody body = response.body();

                if (body == null) {

                    throw new IOException("响应体为空");

                }


                // 读取分块数据(OkHttp 会自动处理 Chunked 编码)

                try (BufferedSource source = body.source()) {

                    StringBuilder fullResult = new StringBuilder();

                    while (!source.exhausted()) {

                        // 读取一行(假设每个块是一行 JSON)

                        String chunk = source.readUtf8Line();

                        if (chunk == null) break;


                        System.out.println("收到块: " + chunk);

                        // 解析块数据

                        McpResponse chunkData = gson.fromJson(chunk, McpResponse.class);

                        fullResult.append(chunkData.token);

                    }


                    System.out.println("最终结果: " + fullResult.toString());

                } finally {

                    response.close();

                }

            }


            @Override

            public void onFailure(Call call, IOException e) {

                System.err.println("调用失败: " + e.getMessage());

            }

        });

    }


    // 定义请求和响应的数据模型

    static class McpRequest {

        String action;

        String input;

        int max_tokens;


        public McpRequest(String action, String input, int max_tokens) {

            this.action = action;

            this.input = input;

            this.max_tokens = max_tokens;

        }

    }


    static class McpResponse {

        String token;  // 流式返回的部分结果

        boolean done;  // 是否结束的标志

    }

}





UVX NPX介绍

点击查看原文

UVX主要针对python编写的MCP, npx主要针对nodejs编写的MCP

直接使用UVX执行MCP命令(如uvx mcp-server-time)时,UVX会检查本地缓存或临时下载所需工具包。若MCP组件已存在于Python环境中,则直接调用;否则会触发下载安装过程

npx也会自动安装mcp

安装源

更换镜像源(国内用户常用

npm config set registry https://registry.npmmirror.com/



mcp server大多使用python和typescript编写,对应的启动命令分别为uvx和npx,下面将分别具体介绍一下这两种指令。熟悉指令的基础知识,有利于我们安装和调试mcp server

示例1:

{

  "mcpServers": {

    "fetch": {

      "command": "uvx",

      "args": [

        "mcp-server-fetch"

      ]

    }

  }

}


示例2:

{

  "mcpServers": {

    "amap-maps": {

      "command": "npx",

      "args": [

        "-y",

        "@amap/amap-maps-mcp-server"

      ],

      "env": {

        "AMAP_MAPS_API_KEY": "api_key"

      }

    }

  }

}

uvx:UV运行Python包提供的命令。它是uv tool run的简写

UV是由Astral开发的新一代Python包和项目管理工具。使用Rust编写,速度比pip快10-100倍!它是一体化工具:整合包管理(替代pip)、虚拟环境管理(替代venv)、依赖锁定(替代pip-tools)、Python版本管理(替代pyenv)、脚本运行(替代pipx)等功能。


快速安装

UV的安装非常简单,有多种方式可供选择:

方法1:独立安装脚本



macOS/Linux:


curl -LsSf https://astral.sh/uv/install.sh | sh



Windows:


powershell -c "irm https://astral.sh/uv/install.ps1 | iex"



方法2:使用pip安装



pip install uv


方法

3:Homebrew安装(macOS) 



brew install uv


 安装完成后,你可以通过在命令行输入uv --version来验证是否安装成功。


1.3、uv基本用法

1.3.1、创建项目

使用UV创建一个新项目


uv init uv_demo


1.3.2、创建虚拟环境

我是在conda创建的虚拟环境中安装了uv,你可以继续使用uv创建应用的虚拟环境。


cd uv_demo

uv venv


1.3.3、安装依赖包

uv安装依赖包非常简单且迅速:


uv add numpy pandas


1.3.4、运行Python脚本

 使用UV运行Python脚本:


uv run main.py


1.3.5、依赖锁定

生成锁文件,确保项目依赖版本的一致性:


uv lock


 这会创建一个uv.lock文件,详细记录所有依赖的精确版本和来源。


1.4、高级功能


1.4.1、Python版本管理

UV可以管理多个Python版本:



# 安装特定Python版本


uv python install 3.10 3.11 3.12



# 创建特定Python版本的虚拟环境


uv venv --python 3.11




1.4.2、工具管理


像pipx一样,UV可以运行和安装Python工具



# 在临时环境中运行工具


uvx pycowsay 'Hello UV!'



# 安装工具


uv tool install ruff




1.4.3、单文件脚本支持


UV支持带有内联依赖元数据的单文件脚本:



# /// [uv]


# dependencies = ["requests"]


# ///



import requests


print(requests.get("https://astral.sh").status_code)


 

然后直接运行:



uv run example.py


 

UV会自动处理依赖并在隔离环境中运行脚本




1.4.4、pip兼容接口


UV提供了pip兼容的接口,可直接替代pip命令:



# 编译依赖


uv pip compile requirements.in --output-file requirements.txt



# 安装锁定依赖


uv pip sync requirements.txt


UV命令速查

1.png


2、npx


npx是npm包管理器自5.2.0版本起内置的工具,全称为“Node Package Execute”,核心功能是临时执行Node.js模块命令。它简化了命令行工具的调用流程,无需预先全局或本地安装依赖包即可直接运行特定命令,尤其适合一次性或实验性操作。



npm的核心操作是包的安装与管理(install、uninstall等),而npx专注于“按需执行”。npm负责持久化依赖管理,npx负责临时或一次性操作的轻量化执行。



主要优势包括:



临时安装可执行依赖包:无需全局安装,避免环境污染


自动执行依赖包命令:安装完成后自动运行


自动加载 node_modules 中的依赖:无需手动指定 $PATH


可执行特定版本的命令:便于测试不同版本

支持执行 

GitHub 代码仓库:扩展了使用场景


2.1、npx基本用法


2.1.1、执行本地依赖包命令


若项目中已安装某个包(如knex),可直接运行其命令:



npx knex migrate:latest



无需全局安装knex,也无需手动配置package.json的scripts


2.1.2、临时安装并运行包


运行未安装的包(执行后自动删除):



npx create-react-app my-app  # 临时安装 create-react-app 并执行




2.1.3、指定版本运行


运行特定版本的包:



npx eslint@7.0.0 --version  # 使用 7.0.0 版本的 ESLint




2.1.4、执行远程代码

 

直接运行GitHub仓库中的代码:



npx https://gist.github.com/user/test  # 执行远程脚本




2.1.5、交互式命令模式


 进入包的交互式环境(如node REPL):



npx node@14  # 临时使用 Node.js 14 版本


 


2.2、npx使用示例


传统方式:先全局安装脚手架工具,再使用



# 安装


npm install fast-create-app -g


# 使用


fast-create-app create-app test



传统方式的缺点:



全局污染:全局安装的包会影响整个系统环境


版本冲突:不同项目可能需要同一工具的不同版本


权限问题:全局安装通常需要管理员权限


更新困难:全局包不会随项目依赖自动更新


存储占用:长期积累大量不再使用的全局包



npx 方式:一步到位,无需全局安装



npx fast-create-app create-app test



npx 的优势:

按需临时安装,用完即删

自动使用最新版本的包

可同时使用不同版本的工具

不需要额外的管理员权限


npx执行优先级机制


当执行命令时,npx遵循三级查找逻辑



1、本地项目依赖:优先在当前项目的node_modules/.bin目录中查找目标命令,适配项目级工具调用(如项目内配置的ESLint、Jest等)。



2、全局安装包:若本地未找到,检查用户全局安装的npm包(通过npm install -g安装的模块)。



3、临时下载执行:若前两步均未找到,npx会自动从npm仓库下载目标包到临时目录,执行完成后立即清理,避免磁盘空间浪费。



MCP协议详细讲解

Model Context Protocol(MCP)是由 Anthropic 推出的一种开源协议,旨在建立 AI 模型与开发环境之间的统一上下文交互标准。以下是对其的详细讲解:

1.png

背景与起源

在 AI 领域,大语言模型的应用常常面临数据孤岛和工具集成复杂的问题,不同的数据源和工具都有自己独特的 API 和集成方式,导致开发者需要为每个工具单独编写集成代码,开发和维护成本高,工具复用性低。为解决这些问题,Anthropic 于 2024 年 11 月 25 日开源发布了 MCP

核心概念

  • MCP Hosts :如 Claude Desktop、Cursor、 Cline、trae、Cherry Studio、IDE、AI 应用等,是运行 MCP 的主应用程序,负责发起连接并管理多个客户端实例,与用户进行交互并接收用户的问题或指令,再通过 MCP Clients 向 MCP Servers 发送请求以获取所需的数据或执行相应的操作
  • MCP Clients :由 Host 创建,与特定的 MCP Server 建立一对一的连接,负责协议通信和能力协商,充当 MCP Host 和 MCP Server 之间的“连接器”或“总机”,将 Host 的请求转发给 Server,并将 Server 的响应返回给 Host
  • MCP Servers :轻量级程序,通过标准化的模型上下文协议暴露特定功能,如文件读取、API 调用、数据库查询等,为 MCP Host 提供实际的数据和功能支持,每个 Server 提供一个特定的工具或能力

工作原理

  • 通信机制 :MCP 采用客户端 - 服务器架构,基于 JSON-RPC 2.0 消息格式,支持请求、响应和通知三种消息类型,确保通信的标准化和一致性。它主要支持两种通信方式,一种是标准输入输出(Stdio),适用于在同一台机器上运行的客户端和服务器之间的通信,客户端启动服务器程序作为子进程,消息通讯通过 stdin/stdout 进行;另一种是服务器发送事件(SSE),利用 HTTP 协议实现跨网络的实时数据传输,适用于需要访问远程资源的场景
  • 工作流程 :首先,Host 初始化 Client,Client 尝试与 Server 建立连接,并询问 Server 有哪些能力,Server 回应并返回所有支持的功能,从而建立 1:1 的连接。当用户与 LLM 交互提出问题或指令时,LLM 会告诉 Client 需要调用哪些 Tools,Client 收到 Host 的请求后,将请求信息发送给对应的 Server,Server 接收到信息后开始处理,处理完成后返回结果给 Client,Client 再将信息返回给 Host,Host 根据这些信息生成最终的回复返回给用户

主要功能与特性

  • 统一接口 :提供标准化的 API 接口,简化 AI 与各类上下文信息的交互,使得开发者无需为不同的数据源或工具编写复杂的适配代码,降低了开发难度和成本
  • 无缝集成 :能够轻松集成到现有开发工作流程中,无需对现有的系统和工具进行大规模的改造,即可实现 AI 模型与各种数据源和工具的快速连接和集成,提高了开发效率
  • 多源数据支持 :支持从多种来源获取上下文信息,包括代码仓库、文档、文件系统、数据库、远程 API 等,丰富了 AI 模型可以利用的数据资源,为模型的训练和推理提供了更全面、准确的上下文信息
  • 智能上下文管理 :优化上下文信息的传递,根据 AI 模型的需求和任务场景,自动筛选、组织和更新上下文信息,提高 AI 模型理解和生成的质量,使模型能够更准确地理解和处理复杂的任务
  • 扩展性设计 :具有灵活的架构,允许添加新的数据源和功能,开发者可以根据实际需求自由扩展 MCP 的功能,不断增加新的 MCP Servers 来提供新的工具和能力,以满足不断变化的业务需求

应用场景

  • 智能开发环境 :将 AI 模型与开发工具集成,如代码编辑器、IDE 等,使 AI 能够实时理解代码上下文,为开发者提供智能代码补全、代码生成、代码解释等功能,提高开发效率和代码质量
  • 智能办公助手 :连接办公软件和在线服务,如日历、邮件、文档管理系统等,帮助用户自动完成日常办公任务,如安排会议、发送邮件、生成报告等,提升办公自动化水平
  • 智能数据分析 :将 AI 模型与数据分析工具和数据源集成,如数据库、数据仓库、BI 工具等,使 AI 能够协助分析师进行数据查询、数据分析、数据可视化等工作,为决策提供更有力的支持
  • 智能客服系统 :集成企业内部的客户信息管理系统、订单管理系统等,使 AI 客服能够快速准确地获取客户相关的信息,为客户提供更精准、高效的咨询服务,提高客户满意度


JSON-RPC 2.0 的请求和响应示例

请求示例

{

  "jsonrpc": "2.0",

  "method": "add",

  "params": [2, 3],

  "id": 1

}

响应示例

{

  "jsonrpc": "2.0",

  "result": 5,

  "id": 1

}

错误响应示例

{

  "jsonrpc": "2.0",

  "error": {

    "code": -32601,

    "message": "Method not found"

  },

  "id": 1

}


批量请求示例

[

  {

    "jsonrpc": "2.0",

    "method": "add",

    "params": [2, 3],

    "id": 1

  },

  {

    "jsonrpc": "2.0",

    "method": "subtract",

    "params": [5, 2],

    "id": 2

  }

]

批量响应示例

[

  {

    "jsonrpc": "2.0",

    "result": 5,

    "id": 1

  },

  {

    "jsonrpc": "2.0",

    "result": 3,

    "id": 2

  }

]

这些示例展示了 JSON-RPC 2.0 请求和响应的基本格式,包括请求、响应以及错误处理等场景。



AI 模型调用 MCP 服务的过程

准备工作

  • 搭建 MCP 服务器 :开发者需先搭建 MCP 服务器,可使用 Anthropic 提供的开源框架如 FastMCP 等。定义好服务器的基本配置,如名称、版本等,并通过装饰器注册工具函数,每个工具函数对应一个具体的功能,如查询天气、发送邮件等
  • 集成 MCP 客户端 :在 AI 模型所在的系统或应用中集成 MCP 客户端,如在 Python 项目中安装相应的客户端库等

调用过程

  1. 用户发起请求 :用户通过与 AI 模型交互的方式,如在聊天界面输入问题或指令,表达其想要实现的需求
  2. AI 模型解析意图 :MCP 客户端将用户请求发送给 AI 模型,AI 模型对用户输入的自然语言进行理解和解析,确定用户的需求,并选择与之匹配的 MCP 函数及相应的参数
  3. 发送调用请求 :MCP 客户端根据 AI 模型返回的函数调用信息,向 MCP 服务器发送请求。请求中包含请求 ID、函数名称及参数等,格式通常为 JSON
  4. 执行服务并返回结果 :MCP 服务器接收到请求后,执行对应的工具函数,在本地或通过调用其他外部服务完成具体的任务处理,然后将执行结果按照规定的格式返回给 MCP 客户端。
  5. 结果整合与反馈 :MCP 客户端将 MCP 服务器返回的结果整合到对话上下文中,再次发送给 AI 模型。AI 模型根据整合后的上下文生成自然语言的回复,由 MCP 客户端将最终结果呈现给用户

通信机制

  • STDIO 方式 :适用于本地服务。MCP 客户端启动 MCP 服务器程序作为子进程,通过 stdin 和 stdout 进行消息传递,消息格式遵循 JSON-RPC 2.0
  • SSE 方式 :用于远程服务。MCP 客户端通过 Server-Sent Events 与 MCP 服务器进行通信,服务器定义特定接口用于推送和接收数据,消息格式同样基于 JSON-RPC 2.0

动态上下文管理

MCP 协议具有动态上下文管理机制,通过上下文窗口存储用户偏好、会话记录和环境数据等。在每次交互过程中,上下文会不断扩展和更新,同时对非关键信息进行压缩,以保证对话的连贯性和减少资源消耗

安全与认证机制

  • API 密钥验证 :MCP 服务通常会要求进行 API 密钥验证,只有拥有正确 API 密钥的客户端才能访问服务。
  • HTTPS 加密 :采用 HTTPS 协议对数据传输进行加密,确保数据在网络传输过程中的安全性。
  • 请求限制 :设置请求频率限制和配额限制等,防止服务被滥用,保护服务的稳定性和可靠性



AI 模型调用 MCP 服务的代码示例

基于 Python 的简单 MCP 服务端

Python

pip install mcp

/*

from mcp.server.fastmcp import FastMCP

import tools


mcp=FastMCP("host info mcp")

mcp.add_tool(tools.get_host_info)


def main()->none:

   print("hello from ai-mcp!")

   mcp.run("stdio")#还有sse

   mcp.run(transport="sse", port=9000)

   


if __name__=="__main__":

   main()


tools.py内容

def get_hosts_info()->str:

...



*/


from mcp import MCP, tool


mcp = MCP()


@mcp.register_tool

class WeatherTool:

    @tool(name="get_weather", description="获取指定城市的天气信息")

    def get_weather(self, city: str):

        # 这里可以添加获取天气信息的逻辑,比如调用天气 API 等

        return f"The weather in {city} is sunny."


    @tool(name="get_weather_forecast", description="获取指定城市未来几天的天气预报")

    def get_weather_forecast(self, city: str, days: int):

        # 添加获取天气预报的逻辑

        return f"The weather forecast for {city} in the next {days} days."


if __name__ == "__main__":

    mcp.start()

基于 Java 的 MCP 服务端

java

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.core.io.ClassPathResource;

import org.springframework.core.io.Resource;

import org.springframework.util.StringUtils;

import org.springframework.web.reactive.function.server.RouterFunctions;

import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Mono;


import java.util.Collections;

import java.util.Map;


@Configuration

public class MCPConfig {


    @Bean

    public RouterFunctions.Route mcpRoute() {

        return RouterFunctions.route()

                .POST("/mcp/weather", this::getWeather)

                .build();

    }


    private Mono<ServerResponse> getWeather(ServerRequest request) {

        Map<String, String> queryParams = request.queryParams();

        String city = queryParams.get("city");

        if (StringUtils.hasText(city)) {

            String weatherInfo = "The weather in " + city + " is sunny.";

            return ServerResponse.ok().bodyValue(Collections.singletonMap("weather", weatherInfo));

        } else {

            return ServerResponse.badRequest().bodyValue(Collections.singletonMap("error", "City parameter is required"));

        }

    }

}

AI 模型调用 MCP 服务的代码示例

基于 Python 的 OpenAI 模型调用 MCP 服务

Python

import openai

import requests

from mcp import MCPClient


# 初始化 MCP 客户端

mcp_client = MCPClient()

mcp_client.connect("http://localhost:8080/mcp")  # 替换为 MCP 服务的实际地址


# 调用 MCP 服务中的工具

def call_mcp_tool(tool_name, **kwargs):

    return mcp_client.call_tool(tool_name, **kwargs)


# 使用 OpenAI 模型

openai.api_key = "your_openai_api_key"  # 替换为你的 OpenAI API 密钥


def get_ai_response(user_query):

    # 构建消息列表

    messages = [

        {"role": "system", "content": "You are a helpful assistant."},

        {"role": "user", "content": user_query}

    ]


    # 调用 OpenAI API

    response = openai.ChatCompletion.create(

        model="gpt-4-turbo-preview",

        messages=messages,

        tools=mcp_client.list_tools(),  # 提供可用的 MCP 工具

        tool_choice="auto"

    )


    # 处理模型返回的结果

    ai_response = ""

    for choice in response.choices:

        message = choice.message

        if message.content:

            ai_response += message.content + "\n"

        if message.tool_calls:

            for tool_call in message.tool_calls:

                tool_name = tool_call.function.name

                tool_args = tool_call.function.arguments

                # 调用 MCP 工具

                tool_result = call_mcp_tool(tool_name, **tool_args)

                ai_response += f"Calling tool {tool_name} with args {tool_args}\n"

                ai_response += tool_result + "\n"

                # 将工具结果添加到消息中,继续与模型对话

                messages.append({"role": "assistant", "content": "", "tool_calls": [tool_call]})

                messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": tool_result})

                # 获取下一个模型响应

                next_response = openai.ChatCompletion.create(

                    model="gpt-4-turbo-preview",

                    messages=messages,

                    tools=mcp_client.list_tools(),

                    tool_choice="auto"

                )

                if next_response.choices[0].message.content:

                    ai_response += next_response.choices[0].message.content + "\n"

    return ai_response


# 示例 usage

user_input = "What's the weather like in Beijing?"

print(get_ai_response(user_input))

基于 Java 的 Spring AI 模型调用 MCP 服务

java

import org.springframework.ai.chat.client.ChatClient;

import org.springframework.ai.chat.client.ChatClientBuilder;

import org.springframework.ai.mcp.client.McpClient;

import org.springframework.ai.mcp.client.McpClientBuilder;

import org.springframework.ai.mcp.client.McpFunctionCallback;

import org.springframework.ai.tool.ToolCallbackProvider;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;


import java.util.List;


@Configuration

public class AIConfig {


    @Bean

    public McpClient mcpClient() {

        // 配置 MCP 服务器参数

        var stdioParams = ServerParameters.builder("java")

                .args("-Dspring.ai.mcp.server.stdio=true",

                        "-Dspring.main.web-application-type=none",

                        "-Dlogging.pattern.console=",

                        "-jar",

                        "target/mcp-server-0.0.1-SNAPSHOT.jar")

                .build();

        // 创建 MCP 客户端

        return McpClient.sync(new StdioClientTransport(stdioParams));

    }


    @Bean

    public ToolCallbackProvider mcpTools(McpClient mcpClient) {

        // 获取 MCP 服务器中的工具列表并转换为 Function Callback

        return mcpClient.listTools(null)

                .tools()

                .stream()

                .map(tool -> new McpFunctionCallback(mcpClient, tool))

                .toList();

    }


    @Bean

    public ChatClient chatClient(ChatClientBuilder chatClientBuilder, ToolCallbackProvider mcpTools) {

        // 初始化聊天客户端并设置默认工具

        return chatClientBuilder.defaultFunctions(mcpTools).build();

    }

}


// 使用示例

public class AIService {

    private final ChatClient chatClient;


    public AIService(ChatClient chatClient) {

        this.chatClient = chatClient;

    }


    public String generateResponse(String userQuery) {

        return chatClient

                .prompt(userQuery)

                .call()

                .content();

    }

}


使用 ASP.NET Core 来创建一个 Web API 服务,并模拟天气查询功能

C# MCP 服务端代码示例

csharp

using Microsoft.AspNetCore.Builder;

using Microsoft.AspNetCore.Hosting;

using Microsoft.AspNetCore.Mvc;

using Microsoft.Extensions.DependencyInjection;

using Microsoft.Extensions.Hosting;


namespace MCPService

{

    public class Startup

    {

        public void ConfigureServices(IServiceCollection services)

        {

            services.AddControllers();

        }


        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)

        {

            if (env.IsDevelopment())

            {

                app.UseDeveloperExceptionPage();

            }


            app.UseRouting();


            app.UseEndpoints(endpoints =>

            {

                endpoints.MapControllers();

            });

        }

    }

    /*

这段代码定义了一个名为WeatherController的控制器类。这个类继承自ControllerBase

,并使用了[ApiController]和[Route("mcp")]特性,表明这是用于处理HTTP请求的API控制器,并指定了其基础路由为/mcp。

在控制器类中,定义了一个名为GetWeather的方法,该方法使用了[HttpGet("weather")]特性。这表明这是一个处理HTTP GET请求的方法,其对应的路由是mcp/weather。这个方法接受一个名为city的参数,通过[FromQuery]特性从查询字符串中获取。

当客户端向/mcp/weather发起一个带有查询参数(如city=Paris)的GET请求时,GetWeather方法会处理这个请求,并返回一个IActionResult类型的对象,即一个包含响应内容和状态码的对象。在实际应用中,这个方法可能会根据提供的城市名称来获取和返回相应的天气信息。

*/

    [ApiController]

    [Route("mcp")]

    public class WeatherController : ControllerBase

    {

        [HttpGet("weather")]

        public IActionResult GetWeather([FromQuery] string city)

        {

            if (string.IsNullOrEmpty(city))

            {

                return BadRequest("City parameter is required");

            }


            // 模拟天气查询逻辑

            var weatherInfo = $"The weather in {city} is sunny.";


            return Ok(new { weather = weatherInfo });

        }


        [HttpGet("weather-forecast")]

        public IActionResult GetWeatherForecast([FromQuery] string city, [FromQuery] int days)

        {

            if (string.IsNullOrEmpty(city))

            {

                return BadRequest("City parameter is required");

            }


            // 模拟天气预报逻辑

            var weatherForecast = $"The weather forecast for {city} in the next {days} days.";


            return Ok(new { forecast = weatherForecast });

        }

   

[HttpPost("weather")]

    public IActionResult PostWeather([FromBody] WeatherRequest weatherRequest)

    {

        // 处理POST请求的逻辑

        return Ok($"Received weather data for {weatherRequest.City}: {weatherRequest.Temperature}°C");

    }


// 新增的可以GET和POST调用的方法

    [HttpGet("data")]

    [HttpPost("data")]

    public IActionResult GetAndPostData([FromQuery] string name, [FromBody] string postData = null)

    {

        if (HttpContext.Request.Method == HttpMethods.Get)

        {

            return Ok($"Received GET request with name: {name}");

        }

        else if (HttpContext.Request.Method == HttpMethods.Post)

 {

                   return Ok($"Received POST request with name: {name} and post data: {postData}");

        }


        return BadRequest("Invalid request method");

    }



}


public class WeatherRequest

{

    public string City { get; set; }

    public double Temperature { get; set; }

}



    public class Program

    {

        public static void Main(string[] args)

        {

            var host = CreateHostBuilder(args).Build();

            host.Run();

        }


        public static IHostBuilder CreateHostBuilder(string[] args) =>

            Host.CreateDefaultBuilder(args)

                .ConfigureWebHostDefaults(webBuilder =>

                {

                    webBuilder.UseStartup<Startup>();

                });

    }

}

说明

项目结构:

Startup 类配置了服务和中间件。

WeatherController 是一个 ASP.NET Core 控制器,提供了两个端点 /mcp/weather 和 /mcp/weather-forecast。

功能:

/mcp/weather 接受一个 city 参数,返回该城市的天气信息。

/mcp/weather-forecast 接受 city 和 days 参数,返回该城市未来几天的天气预报。

运行服务:

创建一个 ASP.NET Core 项目并复制上述代码。

使用 dotnet run 命令启动服务,服务默认运行在 https://localhost:5001 和 http://localhost:5000。

测试服务:

使用工具如 Postman 或 curl 测试 API:

获取天气信息:GET http://localhost:5000/mcp/weather?city=Beijing

获取天气预报:GET http://localhost:5000/mcp/weather-forecast?city=Beijing&days=3


C# 用OpenAI模型调用

public class WeatherRequestFunction : IRequestFunction

{

    public string Name => "weather";


    public async Task<IDictionary<string, object>> InvokeAsync(IDictionary<string, object> parameters)

    {

        using var httpClient = new HttpClient();

        var city = parameters["city"].ToString();

        var response = await httpClient.GetAsync($"http://localhost:5000/mcp/weather?city={city}");

        response.EnsureSuccessStatusCode();

        var content = await response.Content.ReadAsStringAsync();

        return new Dictionary<string, object> { { "result", content } };

    }


    public string Serialize(IRequestFunction function)

    {

        return $"Calling tool {function.Name} with parameters {function}";

    }


    public Task SerializeAsync(IRequestFunction function)

    {

        // 实现序列化逻辑

        return Task.CompletedTask;

    }

}

代码说明

OpenAI 客户端:创建 OpenAI 客户端实例,并配置 API 密钥和日志记录器。

MCP 工具:定义一个 WeatherRequestFunction 类实现 IRequestFunction 接口,用于调用 MCP 服务中的天气查询工具。

ChatClient:创建 ChatClient 实例,传入 OpenAI 客户端和日志记录器。

聊天历史:初始化聊天历史记录。

交互式聊天:在循环中读取用户输入,将其添加到聊天历史中,并调用 OpenAI API 获取响应。打印助手的响应并将其添加到聊天历史中。

其他信息

确保你的 MCP 服务正在运行,并且可以通过 http://localhost:5000/mcp/weather 访问。

替换成您的实际 OpenAI API 密钥。

如果您没有 MCP 服务,可以参考前面的示例代码实现一个简单的 MCP 服务。



C# 编写 Agent 调用 ollama 及 MCP 的方法

调用 ollama

安装 Ollama SDK :可以使用如下命令安装 Ollama 的 C# SDK,“dotnet add package Ollama --version 1.9.0”。

简单对话 :设置模型名称和 ollama 的 API 地址,利用 OllamaApiClient 进行对话生成,示例代码如下:

**```csharp

string modelName = "qwen2:7b";

using var ollama = new OllamaApiClient(baseUri: new Uri("http://127.0.0.1:11434/api"));

Console.WriteLine("开始对话!!!");

string userInput = "";

do

{

Console.WriteLine("User:");

userInput = Console.ReadLine()!;

var enumerable = ollama.Completions.GenerateCompletionAsync(modelName, userInput);

Console.WriteLine("Agent:");

await foreach (var response in enumerable)

{

Console.Write($"{response.Response}");

}

Console.WriteLine();

} while (!string.Equals(userInput, "exit", StringComparison.OrdinalIgnoreCase));

Console.WriteLine("对话结束!!!");



**[^2^]


  * **多轮对话** :使用 Chat 对象进行分角色的多轮对话,通过 List 消息存储之前的对话记录,以便模型捕获上下文,代码示例:

**```csharp

string modelName = "glm4:9b";

using var ollama = new OllamaApiClient(baseUri: new Uri("http://127.0.0.1:11434/api"));

Console.WriteLine("开始对话!!!");

string userInput = "";

List messages = [];

do

{

    messages = messages.TakeLast(5).ToList();

    Console.WriteLine("User:");

    userInput = Console.ReadLine()!;

    messages.Add(new Message(MessageRole.User, userInput));

    var enumerable = ollama.Chat.GenerateChatCompletionAsync(modelName, messages, stream: true);

    Console.WriteLine("Agent:");

    StringBuilder builder = new();

    await foreach (var response in enumerable)

    {

        string content = response.Message.Content;

        builder.AppendLine(content);

        Console.Write(content);

    }

    messages.Add(new Message(MessageRole.Assistant, builder.ToString()));

    Console.WriteLine();

} while (!string.Equals(userInput, "exit", StringComparison.OrdinalIgnoreCase));

Console.WriteLine("对话结束!!!");



```**[^2^]


  * **Function call** :若要使用 function call 功能,需设置 autoCallTools 为 true,并给 Ollama 注册相应的工具类,代码如下:

**```csharp

string modelName = "llama3.1:8b";

using var ollama = new OllamaApiClient(baseUri: new Uri("http://127.0.0.1:11434/api"));

var chat = ollama.Chat(

    model: modelName,

    systemMessage: "You are a helpful assistant.",

    autoCallTools: true);

var mathService = new MathService();

chat.AddToolService(mathService.AsTools(), mathService.AsCalls());

while (true)

{

    try

    {

        Console.WriteLine("User>");

        var newMessage = Console.ReadLine();

        var msg = await chat.SendAsync(newMessage);

        Console.WriteLine("Agent> " + msg.Content);

    }

    finally

    {

        Console.WriteLine(chat.PrintMessages());

    }

}

```**[^2^]


### 调用 MCP

目前未查到有成熟的 C# 调用 MCP 的官方 SDK 或常用库,但可以尝试以下思路:

  * **通过 HTTP 请求调用** :如果 MCP 提供了 HTTP API 接口,可以使用 C# 中的 HttpClient 类来发送 HTTP 请求进行调用。基本步骤如下:

    1. 创建 HttpClient 对象,并设置相关请求头等信息。

    2. 构造请求内容,根据 MCP API 的要求,将请求参数进行序列化。

    3. 发送 POST 或 GET 等请求到 MCP 的 API 地址。

    4. 处理返回的响应,对响应内容进行反序列化等操作,获取所需数据。


例如,假设 MCP 的 API 地址为<http://localhost:8000/mcp/api>,需要发送一个获取数据的请求,代码可能如下:

**```csharp

using (var client = new HttpClient())

{

    client.BaseAddress = new Uri("http://localhost:8000/");

    var response = await client.GetAsync("mcp/api?param1=value1");

    if (response.IsSuccessStatusCode)

    {

        var result = await response.Content.ReadAsStringAsync();

        // 处理结果

    }

    else

    {

        // 处理错误

    }

}

```**

  * **参考 Python 实现进行移植** :上述提到的 Python 调用 MCP 的代码示例中,展示了如何创建 MCP Server、获取工具列表以及创建 Agent 等过程。可以参考其逻辑和交互方式,在 C# 中进行相应的实现。例如,对于 MCP Server 的连接,可以考虑在 C# 中使用类似的网络通信方式;对于工具的调用,可以解析 MCP 返回的工具信息,并根据用户输入的问题,选择合适的工具进行调用等[^4^]。


MathService代码:

using System;

using System.Collections.Generic;

using System.Linq;


// 定义工具接口

public interface IToolsService

{

    public string Name { get; }

    public List<ToolDefinition> DefineTools();

}


// 定义工具函数

public class MathService : IToolsService

{

    public string Name => "math";

    

    // 定义工具

    public List<ToolDefinition> DefineTools()

    {

        return new List<ToolDefinition>

        {

            new ToolDefinition("add", "Add two numbers", "a: number, b: number", Add),

            new ToolDefinition("subtract", "Subtract two numbers", "a: number, b: number", Subtract),

            new ToolDefinition("multiply", "Multiply two numbers", "a: number, b: number", Multiply),

            new ToolDefinition("divide", "Divide two numbers", "a: number, b: number", Divide)

        };

    }


    // 加法

    public double Add(double a, double b)

    {

        return a + b;

    }


    // 减法

    public double Subtract(double a, double b)

    {

        return a - b;

    }


    // 乘法

    public double Multiply(double a, double b)

    {

        return a * b;

    }


    // 除法

    public double Divide(double a, double b)

    {

        if (b == 0)

        {

            throw new DivideByZeroException("Division by zero is not allowed.");

        }

        return a / b;

    }

}


ToolDefinition类通常不是.NET标准库或框架的一部分,也不是Ollama或其他主流库的内置类。因此,您需要自己定义它,或者参考您所使用的特定库或框架的实现

using System;


public class ToolDefinition

{

    // 工具名称

    public string Name { get; set; }

    // 工具描述

    public string Description { get; set; }

    // 工具参数列表

    public string Parameters { get; set; }

    // 工具处理函数的委托类型

    public Func<object[], object> Function { get; set; }


    public ToolDefinition(string name, string description, string parameters, Func<object[], object> function)

    {

        Name = name;

        Description = description;

        Parameters = parameters;

        Function = function;

    }

}



Chrome MCP Server

点击进入源码页

Chrome MCP Server 是一个基于chrome插件的 模型上下文协议 (MCP) 服务器,它将您的 Chrome 浏览器功能暴露给 Claude 等 AI 助手,实现复杂的浏览器自动化、内容分析和语义搜索等。与传统的浏览器自动化工具(如playwright)不同,Chrome MCP server直接使用您日常使用的chrome浏览器,基于现有的用户习惯和配置、登录态,让各种大模型或者各种chatbot都可以接管你的浏览器,真正成为你的如常助手


环境要求

Node.js >= 18.19.0 和 (npm 或 pnpm)

Chrome/Chromium 浏览器

安装pnpm :npm install -g pnpm

安装步骤

1.从github上下载最新的chrome扩展

下载地址:https://github.com/hangwin/mcp-chrome/releases


2.全局安装mcp-chrome-bridge

npm


npm install -g mcp-chrome-bridge

pnpm


# 方法1:全局启用脚本(推荐)

pnpm config set enable-pre-post-scripts true

pnpm install -g mcp-chrome-bridge


# 方法2:如果 postinstall 没有运行,手动注册

pnpm install -g mcp-chrome-bridge

mcp-chrome-bridge register

注意:pnpm v7+ 默认禁用 postinstall 脚本以提高安全性。enable-pre-post-scripts 设置控制是否运行 pre/post 安装脚本。如果自动注册失败,请使用上述手动注册命令。


3.加载 Chrome 扩展


打开 Chrome 并访问 chrome://extensions/

启用"开发者模式"

点击"加载已解压的扩展程序",选择 your/dowloaded/extension/folder

点击插件图标打开插件,点击连接即可看到mcp的配置


在支持MCP协议的客户端中使用

使用streamable http的方式连接(👍🏻推荐)

将以下配置添加到客户端的 MCP 配置中以cherryStudio为例:


推荐用streamable http的连接方式


{

  "mcpServers": {

    "chrome-mcp-server": {

      "type": "streamableHttp",

      "url": "http://127.0.0.1:12306/mcp"

    }

  }

}

使用stdio的方式连接(备选)

假设你的客户端仅支持stdio的连接方式,那么请使用下面的方法:


1.先查看你刚刚安装的npm包的安装位置

# npm 查看方式

npm list -g mcp-chrome-bridge

# pnpm 查看方式

pnpm list -g mcp-chrome-bridge

假设上面的命令输出的路径是:/Users/xxx/Library/pnpm/global/5 那么你的最终路径就是:/Users/xxx/Library/pnpm/global/5/node_modules/mcp-chrome-bridge/dist/mcp/mcp-server-stdio.js


2.把下面的配置替换成你刚刚得到的最终路径

{

  "mcpServers": {

    "chrome-mcp-stdio": {

      "command": "npx",

      "args": [

        "node",

        "/Users/xxx/Library/pnpm/global/5/node_modules/mcp-chrome-bridge/dist/mcp/mcp-server-stdio.js"

      ]

    }

  }

}


mcp2mqtt动动嘴就可以控制智能家居

点击进入开源地址

本项目通过将 MCP 协议转换为 MQTT 协议,我们能够利用强大的大型语言模型(LLMs),就能轻松操控您的智能家居、机器人或其他硬件设备。



excel-mcp-server使用 AI 代理创建、读取和修改 Excel 工作簿

点击查看源码

模型上下文协议 (MCP) 服务器,无需安装 Microsoft Excel 即可作 Excel 文件。使用 AI 代理创建、读取和修改 Excel 工作簿。

Excel 作 :创建、读取、更新工作簿和工作表

数据作 :公式、格式、图表、数据透视表和 Excel 表格

数据验证 :内置范围、公式和数据完整性验证

格式: 字体样式、颜色、边框、对齐方式和条件格式

表格作 :使用自定义样式创建和管理 Excel 表格

图表创建 :生成各种图表类型(折线图、条形图、饼图、散点图等)

数据透视表 :创建用于数据分析的动态数据透视表

工作表管理 :轻松复制、重命名、删除工作表

三重传输支持 :stdio、SSE(已弃用)和可流式传输的 HTTP

远程和本地 :既可以在本地工作,也可以作为远程服务工作



Trae+Chrome MCP Server 让AI接管你的浏览器

点击查看原文

一、安装与配置

环境准备

Node.js ≥ 18.19.0 + npm

Chrome/Chromium 浏览器

快速部署和使用

1、安装Chrome扩展插件

        从 GitHub Releases 下载最新扩展包,在 chrome://extensions 中启用开发者模式并加载解压的文件夹。


2、安装桥接工具

npm install -g mcp-chrome-bridge

AI写代码

    若自动注册失败,需手动执行 mcp-chrome-bridge register。


3、打开Chrome浏览器,点击扩展程序进入配置页

1.png


4、点击连接

1.png


5、打开Trae客户端,进入MCP管理页

1.png


6、添加MCP服务

{

  "mcpServers": {

    "chrome-mcp-server": {

      "type": "streamableHttp",

      "url": "http://127.0.0.1:12306/mcp"

    }

  }

}

AI写代码

7、MCP服务需要添加到智能体才可以在Trae内使用,进入智能体管理页面

1.png


8、配置智能体,勾选刚刚配置的MCP服务

1.png


9、在Trae的对话界面选择智能体就可以对话,让浏览器帮你干活啦

1.png


二、应用场景

内容创作

        AI 自动总结网页内容 → 调用 Excalidraw 绘制思维导图。


竞品监控

        定时截图竞品网站首页,对比分析布局更新。


工作流优化

        自动填写表单、批量关闭冗余标签页、整理书签。


开发调试

        网络请求拦截与 API 结构分析。


Windows MCP.Net基于 .NET 的 Windows 桌面自动化 MCP (Model Context Protocol) 服务器

点击进入源码页

一个基于 .NET 的 Windows 桌面自动化 MCP (Model Context Protocol) 服务器,为 AI 助手提供与 Windows 桌面环境交互的能力。

重要提示: 本项目需要 .NET 10 才能运行,请先确保你的本地安装了 .NET 10。如果尚未安装,请访问 .NET 10 下载页面 进行下载和安装。


1. MCP 客户端配置

在您的 MCP 客户端配置中添加以下配置:


使用全局安装的工具(推荐)

{

    "mcpServers": {

     "Windows-MCP.Net": {

      "type": "stdio",

      "command": "dnx",

      "args": ["Windows-MCP.Net@", "--yes"],

      "env": {}

    }

    }

}

使用项目源码直接运行(开发模式)

方式一:工作区配置

在项目根目录创建 .vscode/mcp.json 文件:


{

  "mcpServers": {

    "Windows-MCP.Net-Dev": {

      "type": "stdio",

      "command": "dotnet",

      "args": ["run", "--project", "src/Windows-MCP.Net.csproj"],

      "cwd": "${workspaceFolder}",

      "env": {}

    }

  }

}

方式二:用户配置

通过VS Code命令面板运行 MCP: Open User Configuration,添加:


{

  "mcpServers": {

    "Windows-MCP.Net-Local": {

      "type": "stdio",

      "command": "dotnet",

      "args": ["run", "--project", "src/Windows-MCP.Net.csproj"],

      "env": {}

    }

  }

}

注意: 使用项目源码方式便于开发调试,修改代码后无需重新安装即可生效。VS Code 1.102+ 版本支持MCP服务器的自动发现和管理。


2. 安装和运行

方式一:全局安装(推荐)

dotnet tool install --global WindowsMCP.Net

方式二:从源码运行

# 克隆仓库

git clone https://github.com/AIDotNet/Windows-MCP.Net.git

cd Windows-MCP.Net

# 构建项目

dotnet build

# 运行项目

dotnet run --project src/Windows-MCP.Net.csproj

3. 开始使用

配置完成后,重启您的MCP客户端,即可开始使用Windows桌面自动化功能!



Windows-MCP用于 Windows 计算机的轻量级 MCP 服务器

点击查看源码

"mcpServers": {

    "windows-mcp": {

      "command": "uv",

      "args": [

        "--directory",

        "<path to the windows-mcp directory>",

        "run",

        "main.py"

      ]

    }

  }



详细介绍NPX或UVX配置MCP服务

要使用 npx 或 uvx 配置和运行 MCP(Model Context Protocol)服务,需结合具体的 MCP 服务实现工具(通常是 Node.js 或 Python 生态的包)。以下是详细步骤和说明:

一、背景知识

npx:Node.js 生态的包执行工具,可直接运行 npm 仓库中的包(无需全局安装),适用于 Node.js 编写的 MCP 服务。

uvx:Rust 编写的快速 Python 包执行工具(类似 npx,属于 uv 包管理器的一部分),适用于 Python 编写的 MCP 服务。

MCP 服务:需遵循 MCP 协议(如可流式 HTTP 传输),提供标准化的客户端 - 服务器通信能力(如模型上下文管理、消息流式传输等)。

二、使用 npx 配置 Node.js 版 MCP 服务

若 MCP 服务基于 Node.js 实现(如 @mcp/server 等包),步骤如下:

1. 安装 Node.js 和 npm

确保环境中已安装 Node.js(含 npm),推荐版本 16+。

验证:node -v 和 npm -v 可正常输出版本号。

2. 直接运行 MCP 服务包

使用 npx 临时执行 MCP 服务包,无需提前安装。基本命令格式:


bash

npx [mcp-package-name] [配置参数]

3. 常见配置参数(以假设的 @mcp/server 为例)

MCP 服务通常支持通过命令行参数或环境变量配置核心功能,例如:


bash

# 启动 MCP 服务,指定端口、端点路径和允许的 Origin

npx @mcp/server \

  --port 8080 \                 # 服务端口(默认可能为 65432)

  --endpoint /mcp \             # MCP 端点路径(符合协议要求的单一端点)

  --allowed-origins "http://localhost:3000" \  # 允许的客户端 Origin(防 DNS 重绑定)

  --log-level info \            # 日志级别

  --session-timeout 3600        # 会话超时时间(秒)

4. 环境变量配置

部分参数可通过环境变量设置(优先级可能高于命令行):


bash

# 示例:通过环境变量配置

export MCP_PORT=8080

export MCP_ALLOWED_ORIGINS="http://localhost:3000,https://example.com"

npx @mcp/server  # 自动读取环境变量

5. 验证服务是否启动

服务启动后,可通过 curl 测试端点是否可用:


bash

curl http://localhost:8080/mcp -I  # 检查是否返回 200 或 405(允许 POST/GET)

三、使用 uvx 配置 Python 版 MCP 服务

若 MCP 服务基于 Python 实现(如 mcp-server 等包),步骤如下:

1. 安装 uv 包管理器

uvx 是 uv 的子命令,需先安装 uv:

bash

# 安装 uv(适用于 Linux/macOS,Windows 需调整)

curl -LsSf https://astral.sh/uv/install.sh | sh


验证:uv --version 和 uvx --version 可正常输出。

2. 直接运行 MCP 服务包

使用 uvx 执行 Python 版 MCP 服务,命令格式:


bash

uvx [mcp-package-name] [配置参数]

3. 常见配置参数(以假设的 mcp-server 为例)

Python 版 MCP 服务的配置参数与 Node.js 版类似,例如:


bash

# 启动 MCP 服务,配置核心参数

uvx mcp-server \

  --host 127.0.0.1 \            # 绑定本地地址(符合安全要求,避免暴露公网)

  --port 8080 \

  --endpoint /mcp \

  --cors-allowed-origins "http://localhost:3000" \  # CORS 配置(对应 Origin 验证)

  --enable-sse true             # 启用 SSE 流式传输(符合可流式 HTTP 要求)

4. 配置文件方式

复杂配置可通过 JSON 或 YAML 文件指定,例如:


bash

# 使用配置文件启动

uvx mcp-server --config ./mcp-config.json


mcp-config.json 示例:


json

{

  "port": 8080,

  "endpoint": "/mcp",

  "allowed_origins": ["http://localhost:3000"],

  "session": {

    "timeout": 3600,

    "auto_create": true

  }

}

5. 验证服务

同 Node.js 版,通过 curl 测试:


bash

curl http://localhost:8080/mcp -X POST \

  -H "Content-Type: application/json" \

  -d '{"jsonrpc": "2.0", "method": "initialize", "id": 1}'


四、核心配置项说明(符合 MCP 协议要求)

无论使用 npx 还是 uvx,MCP 服务的配置需满足协议规范:


端点路径:必须提供单一 HTTP 端点(如 /mcp),支持 POST(客户端发消息)和 GET(SSE 流)。

安全验证:

必须验证客户端的 Origin 头(通过 allowed-origins 配置)。

本地运行时绑定 127.0.0.1(而非 0.0.0.0),避免未授权访问。

会话管理:支持通过 Mcp-Session-Id 头管理会话,需配置会话超时时间。

流式传输:启用 SSE(text/event-stream)支持服务器向客户端流式发送消息。

五、总结

npx 适用于 Node.js 生态的 MCP 服务,直接运行 npm 包并通过命令行 / 环境变量配置。

uvx 适用于 Python 生态的 MCP 服务,依赖 uv 工具,支持配置文件和参数化启动。

核心是确保配置符合 MCP 协议(如端点、安全验证、流式传输),具体参数需参考所用 MCP 服务包的文档。


npx 的核心作用是 无需全局安装包,直接执行对应 MCP 工具的可执行文件。

例子 1:通用 TCP 控制型 MCP 服务(基于 mcp-server)

场景

搭建简单的 MCP 服务,实现「客户端 → MCP 服务 → 目标服务器」的 TCP 连接转发,并支持连接数限制、日志输出等基础控制。

完整 npx 命令

npx mcp-server@latest \

  --protocol tcp \

  --listen-port 8080 \

  --listen-host 0.0.0.0 \

  --target-host 192.168.1.100 \

  --target-port 9090 \

  --log-level info \

  --max-connections 50 \

  --timeout 30000


mcp-server@latest 包名 + 版本 如 @1.2.0(固定版) 指定要执行的 MCP 服务包,@latest 表示使用最新版本,生产环境建议固定版本(如 @1.2.0)。


例子 2:本地自定义 MCP 服务(基于项目依赖 mcp-cli)

场景

本地 Node.js 项目中已将 mcp-cli 作为开发依赖(npm install mcp-cli --save-dev),通过 npx 直接执行本地依赖的 MCP 工具,加载自定义配置文件启动服务(适合企业内部定制化 MCP 服务,如设备管理、数据采集)。

完整 npx 命令

bash

npx mcp-cli start \

  --config ./config/mcp-service.yaml \

  --env production \

  --port 8888 \

  --watch \

  --debug \

  --log-rotate 7


mcp-cli start包名 + 命令无默认值mcp-cli 是本地项目依赖的 MCP 管理工具,start 是其内置命令(表示 “启动 MCP 服务”)。



CherryStudio的MCP配置

点击查看官方配置MCP文档

手动配置一个STADIO MCP

一个STATIO MCP配置例子,它的源码地址

1.jpg


它的uvx的json

{

  "mcpServers": {

    "fetch": {

      "command": "uvx",

      "args": ["mcp-server-fetch"]

    }

  }

}


多参数配置,一行一个参数

1.jpg


自动安装MCP

直接给指令:

帮我安装一个 filesystem mcp server



AI辅助生成MCP例子

####【豆包】

你知道AI模型调用MCP吗??

。。。

写一个MCP的完整例子并给出模型调用这个MCP例子

。。。


Python实现
mcp_weather_server.py

import json

import socket

import threading

from typing import Dict, Any


class MCPWeatherServer:

    """MCP天气服务服务器实现"""

    

    def __init__(self, host: str = 'localhost', port: int = 8000):

        self.host = host

        self.port = port

        self.server_socket = None

        self.running = False

        

        # 模拟天气数据库

        self.weather_database = {

            "北京": {"temperature": 25, "condition": "晴朗", "wind": "微风"},

            "上海": {"temperature": 28, "condition": "多云", "wind": "东风3级"},

            "广州": {"temperature": 30, "condition": "小雨", "wind": "南风2级"},

            "深圳": {"temperature": 29, "condition": "阴天", "wind": "东南风1级"},

            "成都": {"temperature": 23, "condition": "晴转多云", "wind": "西北风2级"}

        }

    

    def start(self):

        """启动MCP服务器"""

        self.running = True

        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        self.server_socket.bind((self.host, self.port))

        self.server_socket.listen(5)

        print(f"MCP天气服务已启动,监听 {self.host}:{self.port}")

        

        try:

            while self.running:

                client_socket, client_address = self.server_socket.accept()

                print(f"接收到来自 {client_address} 的连接")

                client_handler = threading.Thread(

                    target=self.handle_client,

                    args=(client_socket,)

                )

                client_handler.start()

        except KeyboardInterrupt:

            print("服务器正在关闭...")

        finally:

            self.stop()

    

    def stop(self):

        """停止MCP服务器"""

        self.running = False

        if self.server_socket:

            self.server_socket.close()

        print("MCP天气服务已关闭")

    

    def handle_client(self, client_socket: socket.socket):

        """处理客户端请求"""

        try:

            # 接收请求数据

            data = client_socket.recv(1024).decode('utf-8')

            if not data:

                return

                

            print(f"收到请求: {data}")

            

            # 解析MCP请求

            request = json.loads(data)

            

            # 处理请求

            response = self.process_request(request)

            

            # 发送响应

            client_socket.sendall(json.dumps(response).encode('utf-8'))

            print(f"发送响应: {response}")

            

        except Exception as e:

            error_response = {

                "jsonrpc": "2.0",

                "error": {

                    "code": -1,

                    "message": f"处理请求时出错: {str(e)}"

                },

                "id": request.get("id") if 'request' in locals() else None

            }

            client_socket.sendall(json.dumps(error_response).encode('utf-8'))

        finally:

            client_socket.close()

    

    def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:

        """处理MCP请求"""

        # 验证请求格式

        if "jsonrpc" not in request or request["jsonrpc"] != "2.0":

            return {

                "jsonrpc": "2.0",

                "error": {"code": -32600, "message": "无效的请求"},

                "id": request.get("id")

            }

            

        if "method" not in request:

            return {

                "jsonrpc": "2.0",

                "error": {"code": -32601, "message": "方法不存在"},

                "id": request.get("id")

            }

        

        # 处理天气查询方法

        if request["method"] == "get_weather":

            params = request.get("params", {})

            city = params.get("city")

            

            if not city:

                return {

                    "jsonrpc": "2.0",

                    "error": {"code": -32602, "message": "缺少城市参数"},

                    "id": request.get("id")

                }

            

            # 查询天气数据

            weather_data = self.weather_database.get(city)

            if weather_data:

                return {

                    "jsonrpc": "2.0",

                    "result": {

                        "city": city,

                        "weather": weather_data,

                        "source": "MCP Weather Service"

                    },

                    "id": request.get("id")

                }

            else:

                return {

                    "jsonrpc": "2.0",

                    "error": {"code": 404, "message": f"未找到城市 {city} 的天气数据"},

                    "id": request.get("id")

                }

        

        # 处理服务描述方法

        elif request["method"] == "describe":

            return {

                "jsonrpc": "2.0",

                "result": {

                    "name": "WeatherService",

                    "description": "提供城市天气查询服务",

                    "methods": {

                        "get_weather": {

                            "description": "获取指定城市的天气信息",

                            "parameters": {

                                "city": {

                                    "type": "string",

                                    "description": "城市名称"

                                }

                            },

                            "returns": {

                                "type": "object",

                                "description": "包含温度、天气状况和风力的天气信息"

                            }

                        }

                    }

                },

                "id": request.get("id")

            }

        

        # 未知方法

        else:

            return {

                "jsonrpc": "2.0",

                "error": {"code": -32601, "message": f"方法 {request['method']} 不存在"},

                "id": request.get("id")

            }


if __name__ == "__main__":

    # 创建并启动MCP服务器

    server = MCPWeatherServer(host='localhost', port=8000)

    server.start()


mcp_client.py

import json

import socket

from typing import Dict, Any, Optional


class MCPClient:

    """MCP客户端,用于与MCP服务器通信"""

    

    def __init__(self, host: str = 'localhost', port: int = 8000):

        self.host = host

        self.port = port

        self.request_id = 0

    

    def _get_next_id(self) -> int:

        """获取下一个请求ID"""

        self.request_id += 1

        return self.request_id

    

    def send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:

        """发送MCP请求到服务器"""

        # 创建请求

        request = {

            "jsonrpc": "2.0",

            "method": method,

            "id": self._get_next_id()

        }

        

        if params is not None:

            request["params"] = params

        

        # 建立连接并发送请求

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

            s.connect((self.host, self.port))

            s.sendall(json.dumps(request).encode('utf-8'))

            

            # 接收响应

            response_data = s.recv(1024).decode('utf-8')

            if not response_data:

                return {"error": "未收到响应"}

                

            return json.loads(response_data)

    

    def describe_service(self) -> Dict[str, Any]:

        """获取服务描述信息"""

        return self.send_request("describe")

    

    def get_weather(self, city: str) -> Dict[str, Any]:

        """查询指定城市的天气"""

        return self.send_request("get_weather", {"city": city})


# 模型调用MCP服务的示例

def model_use_mcp_service():

    """模拟AI模型使用MCP服务的过程"""

    print("=== AI模型开始运行 ===")

    

    # 1. 创建MCP客户端

    print("创建MCP客户端,准备连接天气服务...")

    client = MCPClient(host='localhost', port=8000)

    

    # 2. 先查询服务描述,了解可用功能

    print("\n查询MCP服务描述...")

    description = client.describe_service()

    print(f"服务描述: {json.dumps(description, ensure_ascii=False, indent=2)}")

    

    # 3. 根据服务描述,调用天气查询功能

    target_city = "北京"

    print(f"\n调用MCP服务查询 {target_city} 的天气...")

    weather_result = client.get_weather(target_city)

    

    # 4. 处理返回结果

    print(f"天气查询结果: {json.dumps(weather_result, ensure_ascii=False, indent=2)}")

    

    # 5. 根据结果生成回答

    if "result" in weather_result:

        weather_data = weather_result["result"]["weather"]

        response = (f"{target_city}当前的天气状况为:{weather_data['condition']},"

                   f"气温{weather_data['temperature']}°C,{weather_data['wind']}。")

    else:

        response = f"抱歉,无法获取{target_city}的天气信息:{weather_result.get('error', {}).get('message', '未知错误')}"

    

    print(f"\n=== AI模型生成回答 ===")

    print(response)


if __name__ == "__main__":

    # 模拟模型调用MCP服务

    model_use_mcp_service()


C#实现

Models.cs  

using System;

using System.Collections.Generic;

using System.Text.Json.Serialization;


namespace McpCommon

{

    /// <summary>

    /// MCP请求模型

    /// </summary>

    public class McpRequest

    {

        [JsonPropertyName("jsonrpc")]

        public string JsonRpc { get; set; } = "2.0";

        

        [JsonPropertyName("method")]

        public string Method { get; set; }

        

        [JsonPropertyName("params")]

        public Dictionary<string, object> Parameters { get; set; }

        

        [JsonPropertyName("id")]

        public int Id { get; set; }

    }

    

    /// <summary>

    /// MCP响应模型

    /// </summary>

    public class McpResponse

    {

        [JsonPropertyName("jsonrpc")]

        public string JsonRpc { get; set; } = "2.0";

        

        [JsonPropertyName("result")]

        public object Result { get; set; }

        

        [JsonPropertyName("error")]

        public McpError Error { get; set; }

        

        [JsonPropertyName("id")]

        public int Id { get; set; }

    }

    

    /// <summary>

    /// MCP错误模型

    /// </summary>

    public class McpError

    {

        [JsonPropertyName("code")]

        public int Code { get; set; }

        

        [JsonPropertyName("message")]

        public string Message { get; set; }

    }

    

    /// <summary>

    /// 天气数据模型

    /// </summary>

    public class WeatherData

    {

        public string City { get; set; }

        public string Condition { get; set; }

        public int Temperature { get; set; }

        public string Wind { get; set; }

        public DateTime UpdateTime { get; set; }

    }

    

    /// <summary>

    /// 服务描述模型

    /// </summary>

    public class ServiceDescription

    {

        public string Name { get; set; }

        public string Description { get; set; }

        public string Version { get; set; }

        public List<ServiceMethod> Methods { get; set; } = new List<ServiceMethod>();

    }

    

    /// <summary>

    /// 服务方法描述

    /// </summary>

    public class ServiceMethod

    {

        public string Name { get; set; }

        public string Description { get; set; }

        public List<MethodParameter> Parameters { get; set; } = new List<MethodParameter>();

    }

    

    /// <summary>

    /// 方法参数描述

    /// </summary>

    public class MethodParameter

    {

        public string Name { get; set; }

        public string Type { get; set; }

        public string Description { get; set; }

        public bool Required { get; set; }

    }

}

    

MCPServer.cs

using System;

using System.Collections.Generic;

using System.Net;

using System.Net.Sockets;

using System.Text;

using System.Text.Json;

using System.Threading;

using McpCommon;


namespace McpServer

{

    class Program

    {

        private static TcpListener _server;

        private static bool _isRunning;

        private static readonly Dictionary<string, WeatherData> _mockWeatherData = new Dictionary<string, WeatherData>

        {

            { "北京", new WeatherData { City = "北京", Condition = "晴朗", Temperature = 25, Wind = "微风", UpdateTime = DateTime.Now } },

            { "上海", new WeatherData { City = "上海", Condition = "多云", Temperature = 28, Wind = "东风3级", UpdateTime = DateTime.Now } },

            { "广州", new WeatherData { City = "广州", Condition = "小雨", Temperature = 30, Wind = "南风2级", UpdateTime = DateTime.Now } },

            { "深圳", new WeatherData { City = "深圳", Condition = "阴天", Temperature = 29, Wind = "东北风1级", UpdateTime = DateTime.Now } }

        };


        static void Main(string[] args)

        {

            StartServer();

            

            Console.WriteLine("MCP天气服务已启动,按任意键停止...");

            Console.ReadKey();

            

            StopServer();

        }


        private static void StartServer()

        {

            int port = 8000;

            _server = new TcpListener(IPAddress.Any, port);

            _server.Start();

            _isRunning = true;

            

            Console.WriteLine($"MCP服务器已启动,监听端口 {port}...");

            

            // 启动一个线程处理客户端连接

            new Thread(ListenForClients) { IsBackground = true }.Start();

        }


        private static void StopServer()

        {

            _isRunning = false;

            _server.Stop();

            Console.WriteLine("MCP服务器已停止");

        }


        private static void ListenForClients()

        {

            while (_isRunning)

            {

                try

                {

                    // 等待客户端连接

                    TcpClient client = _server.AcceptTcpClient();

                    Console.WriteLine("新客户端连接");

                    

                    // 为每个客户端创建一个线程处理

                    new Thread(HandleClient) { IsBackground = true }.Start(client);

                }

                catch (SocketException ex)

                {

                    if (_isRunning)

                    {

                        Console.WriteLine($"Socket错误: {ex.Message}");

                    }

                }

                catch (Exception ex)

                {

                    Console.WriteLine($"错误: {ex.Message}");

                }

            }

        }


        private static void HandleClient(object obj)

        {

            using (TcpClient client = (TcpClient)obj)

            using (NetworkStream stream = client.GetStream())

            {

                byte[] buffer = new byte[1024];

                int bytesRead;

                

                try

                {

                    // 读取客户端发送的数据

                    while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)

                    {

                        string requestString = Encoding.UTF8.GetString(buffer, 0, bytesRead);

                        Console.WriteLine($"收到请求: {requestString}");

                        

                        // 解析请求

                        McpRequest request = JsonSerializer.Deserialize<McpRequest>(requestString);

                        McpResponse response = ProcessRequest(request);

                        

                        // 发送响应

                        string responseString = JsonSerializer.Serialize(response);

                        byte[] responseBytes = Encoding.UTF8.GetBytes(responseString);

                        stream.Write(responseBytes, 0, responseBytes.Length);

                        

                        Console.WriteLine($"发送响应: {responseString}");

                    }

                }

                catch (Exception ex)

                {

                    Console.WriteLine($"处理客户端时出错: {ex.Message}");

                }

                

                Console.WriteLine("客户端连接已关闭");

            }

        }


        private static McpResponse ProcessRequest(McpRequest request)

        {

            var response = new McpResponse { Id = request.Id };

            

            try

            {

                switch (request.Method)

                {

                    case "describe":

                        response.Result = GetServiceDescription();

                        break;

                        

                    case "get_weather":

                        if (request.Parameters != null && request.Parameters.ContainsKey("city"))

                        {

                            string city = request.Parameters["city"].ToString();

                            response.Result = new { weather = GetWeatherData(city) };

                        }

                        else

                        {

                            response.Error = new McpError { Code = -1, Message = "缺少参数: city" };

                        }

                        break;

                        

                    default:

                        response.Error = new McpError { Code = -2, Message = $"未知方法: {request.Method}" };

                        break;

                }

            }

            catch (Exception ex)

            {

                response.Error = new McpError { Code = -999, Message = ex.Message };

            }

            

            return response;

        }


        private static ServiceDescription GetServiceDescription()

        {

            return new ServiceDescription

            {

                Name = "WeatherService",

                Description = "提供城市天气查询服务",

                Version = "1.0.0",

                Methods = new List<ServiceMethod>

                {

                    new ServiceMethod

                    {

                        Name = "describe",

                        Description = "获取服务描述信息",

                        Parameters = new List<MethodParameter>()

                    },

                    new ServiceMethod

                    {

                        Name = "get_weather",

                        Description = "查询指定城市的天气信息",

                        Parameters = new List<MethodParameter>

                        {

                            new MethodParameter

                            {

                                Name = "city",

                                Type = "string",

                                Description = "城市名称",

                                Required = true

                            }

                        }

                    }

                }

            };

        }


        private static WeatherData GetWeatherData(string city)

        {

            if (_mockWeatherData.TryGetValue(city, out var weatherData))

            {

                // 更新时间为当前时间

                weatherData.UpdateTime = DateTime.Now;

                return weatherData;

            }

            

            // 如果没有该城市的数据,返回一个默认值

            return new WeatherData

            {

                City = city,

                Condition = "未知",

                Temperature = 0,

                Wind = "未知",

                UpdateTime = DateTime.Now

            };

        }

    }

}

    

MCPClient.cs

MCP服务相关技术收集MCP

MCP协议详细讲解

AI 模型调用 MCP 服务的过程

AI 模型调用 MCP 服务的代码示例

C# 编写 Agent 调用 ollama 及 MCP 的方法

Trae+Chrome MCP Server 让AI接管你的浏览器

MCP典型服务

Chrome MCP Server

mcp2mqtt动动嘴就可以控制智能家居

excel-mcp-server使用 AI 代理创建、读取和修改 Excel 工作簿

Windows MCP.Net基于 .NET 的 Windows 桌面自动化 MCP (Model Context Protocol) 服务器

Windows-MCP用于 Windows 计算机的轻量级 MCP 服务器

AI辅助生成MCP例子



MCP协议详细讲解

Model Context Protocol(MCP)是由 Anthropic 推出的一种开源协议,旨在建立 AI 模型与开发环境之间的统一上下文交互标准。以下是对其的详细讲解:

1.png

背景与起源

在 AI 领域,大语言模型的应用常常面临数据孤岛和工具集成复杂的问题,不同的数据源和工具都有自己独特的 API 和集成方式,导致开发者需要为每个工具单独编写集成代码,开发和维护成本高,工具复用性低。为解决这些问题,Anthropic 于 2024 年 11 月 25 日开源发布了 MCP

核心概念

  • MCP Hosts :如 Claude Desktop、Cursor、 Cline、trae、Cherry Studio、IDE、AI 应用等,是运行 MCP 的主应用程序,负责发起连接并管理多个客户端实例,与用户进行交互并接收用户的问题或指令,再通过 MCP Clients 向 MCP Servers 发送请求以获取所需的数据或执行相应的操作
  • MCP Clients :由 Host 创建,与特定的 MCP Server 建立一对一的连接,负责协议通信和能力协商,充当 MCP Host 和 MCP Server 之间的“连接器”或“总机”,将 Host 的请求转发给 Server,并将 Server 的响应返回给 Host
  • MCP Servers :轻量级程序,通过标准化的模型上下文协议暴露特定功能,如文件读取、API 调用、数据库查询等,为 MCP Host 提供实际的数据和功能支持,每个 Server 提供一个特定的工具或能力

工作原理

  • 通信机制 :MCP 采用客户端 - 服务器架构,基于 JSON-RPC 2.0 消息格式,支持请求、响应和通知三种消息类型,确保通信的标准化和一致性。它主要支持两种通信方式,一种是标准输入输出(Stdio),适用于在同一台机器上运行的客户端和服务器之间的通信,客户端启动服务器程序作为子进程,消息通讯通过 stdin/stdout 进行;另一种是服务器发送事件(SSE),利用 HTTP 协议实现跨网络的实时数据传输,适用于需要访问远程资源的场景
  • 工作流程 :首先,Host 初始化 Client,Client 尝试与 Server 建立连接,并询问 Server 有哪些能力,Server 回应并返回所有支持的功能,从而建立 1:1 的连接。当用户与 LLM 交互提出问题或指令时,LLM 会告诉 Client 需要调用哪些 Tools,Client 收到 Host 的请求后,将请求信息发送给对应的 Server,Server 接收到信息后开始处理,处理完成后返回结果给 Client,Client 再将信息返回给 Host,Host 根据这些信息生成最终的回复返回给用户

主要功能与特性

  • 统一接口 :提供标准化的 API 接口,简化 AI 与各类上下文信息的交互,使得开发者无需为不同的数据源或工具编写复杂的适配代码,降低了开发难度和成本
  • 无缝集成 :能够轻松集成到现有开发工作流程中,无需对现有的系统和工具进行大规模的改造,即可实现 AI 模型与各种数据源和工具的快速连接和集成,提高了开发效率
  • 多源数据支持 :支持从多种来源获取上下文信息,包括代码仓库、文档、文件系统、数据库、远程 API 等,丰富了 AI 模型可以利用的数据资源,为模型的训练和推理提供了更全面、准确的上下文信息
  • 智能上下文管理 :优化上下文信息的传递,根据 AI 模型的需求和任务场景,自动筛选、组织和更新上下文信息,提高 AI 模型理解和生成的质量,使模型能够更准确地理解和处理复杂的任务
  • 扩展性设计 :具有灵活的架构,允许添加新的数据源和功能,开发者可以根据实际需求自由扩展 MCP 的功能,不断增加新的 MCP Servers 来提供新的工具和能力,以满足不断变化的业务需求

应用场景

  • 智能开发环境 :将 AI 模型与开发工具集成,如代码编辑器、IDE 等,使 AI 能够实时理解代码上下文,为开发者提供智能代码补全、代码生成、代码解释等功能,提高开发效率和代码质量
  • 智能办公助手 :连接办公软件和在线服务,如日历、邮件、文档管理系统等,帮助用户自动完成日常办公任务,如安排会议、发送邮件、生成报告等,提升办公自动化水平
  • 智能数据分析 :将 AI 模型与数据分析工具和数据源集成,如数据库、数据仓库、BI 工具等,使 AI 能够协助分析师进行数据查询、数据分析、数据可视化等工作,为决策提供更有力的支持
  • 智能客服系统 :集成企业内部的客户信息管理系统、订单管理系统等,使 AI 客服能够快速准确地获取客户相关的信息,为客户提供更精准、高效的咨询服务,提高客户满意度


JSON-RPC 2.0 的请求和响应示例

请求示例

{

  "jsonrpc": "2.0",

  "method": "add",

  "params": [2, 3],

  "id": 1

}

响应示例

{

  "jsonrpc": "2.0",

  "result": 5,

  "id": 1

}

错误响应示例

{

  "jsonrpc": "2.0",

  "error": {

    "code": -32601,

    "message": "Method not found"

  },

  "id": 1

}


批量请求示例

[

  {

    "jsonrpc": "2.0",

    "method": "add",

    "params": [2, 3],

    "id": 1

  },

  {

    "jsonrpc": "2.0",

    "method": "subtract",

    "params": [5, 2],

    "id": 2

  }

]

批量响应示例

[

  {

    "jsonrpc": "2.0",

    "result": 5,

    "id": 1

  },

  {

    "jsonrpc": "2.0",

    "result": 3,

    "id": 2

  }

]

这些示例展示了 JSON-RPC 2.0 请求和响应的基本格式,包括请求、响应以及错误处理等场景。



AI 模型调用 MCP 服务的过程


准备工作

  • 搭建 MCP 服务器 :开发者需先搭建 MCP 服务器,可使用 Anthropic 提供的开源框架如 FastMCP 等。定义好服务器的基本配置,如名称、版本等,并通过装饰器注册工具函数,每个工具函数对应一个具体的功能,如查询天气、发送邮件等
  • 集成 MCP 客户端 :在 AI 模型所在的系统或应用中集成 MCP 客户端,如在 Python 项目中安装相应的客户端库等

调用过程

  1. 用户发起请求 :用户通过与 AI 模型交互的方式,如在聊天界面输入问题或指令,表达其想要实现的需求
  2. AI 模型解析意图 :MCP 客户端将用户请求发送给 AI 模型,AI 模型对用户输入的自然语言进行理解和解析,确定用户的需求,并选择与之匹配的 MCP 函数及相应的参数
  3. 发送调用请求 :MCP 客户端根据 AI 模型返回的函数调用信息,向 MCP 服务器发送请求。请求中包含请求 ID、函数名称及参数等,格式通常为 JSON
  4. 执行服务并返回结果 :MCP 服务器接收到请求后,执行对应的工具函数,在本地或通过调用其他外部服务完成具体的任务处理,然后将执行结果按照规定的格式返回给 MCP 客户端。
  5. 结果整合与反馈 :MCP 客户端将 MCP 服务器返回的结果整合到对话上下文中,再次发送给 AI 模型。AI 模型根据整合后的上下文生成自然语言的回复,由 MCP 客户端将最终结果呈现给用户

通信机制

  • STDIO 方式 :适用于本地服务。MCP 客户端启动 MCP 服务器程序作为子进程,通过 stdin 和 stdout 进行消息传递,消息格式遵循 JSON-RPC 2.0
  • SSE 方式 :用于远程服务。MCP 客户端通过 Server-Sent Events 与 MCP 服务器进行通信,服务器定义特定接口用于推送和接收数据,消息格式同样基于 JSON-RPC 2.0

动态上下文管理

MCP 协议具有动态上下文管理机制,通过上下文窗口存储用户偏好、会话记录和环境数据等。在每次交互过程中,上下文会不断扩展和更新,同时对非关键信息进行压缩,以保证对话的连贯性和减少资源消耗

安全与认证机制

  • API 密钥验证 :MCP 服务通常会要求进行 API 密钥验证,只有拥有正确 API 密钥的客户端才能访问服务。
  • HTTPS 加密 :采用 HTTPS 协议对数据传输进行加密,确保数据在网络传输过程中的安全性。
  • 请求限制 :设置请求频率限制和配额限制等,防止服务被滥用,保护服务的稳定性和可靠性



AI 模型调用 MCP 服务的代码示例


基于 Python 的简单 MCP 服务端

Python

pip install mcp

/*


from mcp.server.fastmcp import FastMCP

import tools


mcp=FastMCP("host info mcp")

mcp.add_tool(tools.get_host_info)


def main()->none:

   print("hello from ai-mcp!")

   mcp.run("stdio")#还有sse

   mcp.run(transport="sse", port=9000)

   


if __name__=="__main__":

   main()


tools.py内容

def get_hosts_info()->str:

...



*/


from mcp import MCP, tool


mcp = MCP()


@mcp.register_tool

class WeatherTool:

    @tool(name="get_weather", description="获取指定城市的天气信息")

    def get_weather(self, city: str):

        # 这里可以添加获取天气信息的逻辑,比如调用天气 API 等

        return f"The weather in {city} is sunny."


    @tool(name="get_weather_forecast", description="获取指定城市未来几天的天气预报")

    def get_weather_forecast(self, city: str, days: int):

        # 添加获取天气预报的逻辑

        return f"The weather forecast for {city} in the next {days} days."


if __name__ == "__main__":

    mcp.start()

基于 Java 的 MCP 服务端

java

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.core.io.ClassPathResource;

import org.springframework.core.io.Resource;

import org.springframework.util.StringUtils;

import org.springframework.web.reactive.function.server.RouterFunctions;

import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Mono;


import java.util.Collections;

import java.util.Map;


@Configuration

public class MCPConfig {


    @Bean

    public RouterFunctions.Route mcpRoute() {

        return RouterFunctions.route()

                .POST("/mcp/weather", this::getWeather)

                .build();

    }


    private Mono<ServerResponse> getWeather(ServerRequest request) {

        Map<String, String> queryParams = request.queryParams();

        String city = queryParams.get("city");

        if (StringUtils.hasText(city)) {

            String weatherInfo = "The weather in " + city + " is sunny.";

            return ServerResponse.ok().bodyValue(Collections.singletonMap("weather", weatherInfo));

        } else {

            return ServerResponse.badRequest().bodyValue(Collections.singletonMap("error", "City parameter is required"));

        }

    }

}

AI 模型调用 MCP 服务的代码示例

基于 Python 的 OpenAI 模型调用 MCP 服务

Python

import openai

import requests

from mcp import MCPClient


# 初始化 MCP 客户端

mcp_client = MCPClient()

mcp_client.connect("http://localhost:8080/mcp")  # 替换为 MCP 服务的实际地址


# 调用 MCP 服务中的工具

def call_mcp_tool(tool_name, **kwargs):

    return mcp_client.call_tool(tool_name, **kwargs)


# 使用 OpenAI 模型

openai.api_key = "your_openai_api_key"  # 替换为你的 OpenAI API 密钥


def get_ai_response(user_query):

    # 构建消息列表

    messages = [

        {"role": "system", "content": "You are a helpful assistant."},

        {"role": "user", "content": user_query}

    ]


    # 调用 OpenAI API

    response = openai.ChatCompletion.create(

        model="gpt-4-turbo-preview",

        messages=messages,

        tools=mcp_client.list_tools(),  # 提供可用的 MCP 工具

        tool_choice="auto"

    )


    # 处理模型返回的结果

    ai_response = ""

    for choice in response.choices:

        message = choice.message

        if message.content:

            ai_response += message.content + "\n"

        if message.tool_calls:

            for tool_call in message.tool_calls:

                tool_name = tool_call.function.name

                tool_args = tool_call.function.arguments

                # 调用 MCP 工具

                tool_result = call_mcp_tool(tool_name, **tool_args)

                ai_response += f"Calling tool {tool_name} with args {tool_args}\n"

                ai_response += tool_result + "\n"

                # 将工具结果添加到消息中,继续与模型对话

                messages.append({"role": "assistant", "content": "", "tool_calls": [tool_call]})

                messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": tool_result})

                # 获取下一个模型响应

                next_response = openai.ChatCompletion.create(

                    model="gpt-4-turbo-preview",

                    messages=messages,

                    tools=mcp_client.list_tools(),

                    tool_choice="auto"

                )

                if next_response.choices[0].message.content:

                    ai_response += next_response.choices[0].message.content + "\n"

    return ai_response


# 示例 usage

user_input = "What's the weather like in Beijing?"

print(get_ai_response(user_input))

基于 Java 的 Spring AI 模型调用 MCP 服务

java

import org.springframework.ai.chat.client.ChatClient;

import org.springframework.ai.chat.client.ChatClientBuilder;

import org.springframework.ai.mcp.client.McpClient;

import org.springframework.ai.mcp.client.McpClientBuilder;

import org.springframework.ai.mcp.client.McpFunctionCallback;

import org.springframework.ai.tool.ToolCallbackProvider;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;


import java.util.List;


@Configuration

public class AIConfig {


    @Bean

    public McpClient mcpClient() {

        // 配置 MCP 服务器参数

        var stdioParams = ServerParameters.builder("java")

                .args("-Dspring.ai.mcp.server.stdio=true",

                        "-Dspring.main.web-application-type=none",

                        "-Dlogging.pattern.console=",

                        "-jar",

                        "target/mcp-server-0.0.1-SNAPSHOT.jar")

                .build();

        // 创建 MCP 客户端

        return McpClient.sync(new StdioClientTransport(stdioParams));

    }


    @Bean

    public ToolCallbackProvider mcpTools(McpClient mcpClient) {

        // 获取 MCP 服务器中的工具列表并转换为 Function Callback

        return mcpClient.listTools(null)

                .tools()

                .stream()

                .map(tool -> new McpFunctionCallback(mcpClient, tool))

                .toList();

    }


    @Bean

    public ChatClient chatClient(ChatClientBuilder chatClientBuilder, ToolCallbackProvider mcpTools) {

        // 初始化聊天客户端并设置默认工具

        return chatClientBuilder.defaultFunctions(mcpTools).build();

    }

}


// 使用示例

public class AIService {

    private final ChatClient chatClient;


    public AIService(ChatClient chatClient) {

        this.chatClient = chatClient;

    }


    public String generateResponse(String userQuery) {

        return chatClient

                .prompt(userQuery)

                .call()

                .content();

    }

}


使用 ASP.NET Core 来创建一个 Web API 服务,并模拟天气查询功能

C# MCP 服务端代码示例

csharp

using Microsoft.AspNetCore.Builder;

using Microsoft.AspNetCore.Hosting;

using Microsoft.AspNetCore.Mvc;

using Microsoft.Extensions.DependencyInjection;

using Microsoft.Extensions.Hosting;


namespace MCPService

{

    public class Startup

    {

        public void ConfigureServices(IServiceCollection services)

        {

            services.AddControllers();

        }


        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)

        {

            if (env.IsDevelopment())

            {

                app.UseDeveloperExceptionPage();

            }


            app.UseRouting();


            app.UseEndpoints(endpoints =>

            {

                endpoints.MapControllers();

            });

        }

    }

    /*

这段代码定义了一个名为WeatherController的控制器类。这个类继承自ControllerBase

,并使用了[ApiController]和[Route("mcp")]特性,表明这是用于处理HTTP请求的API控制器,并指定了其基础路由为/mcp。

在控制器类中,定义了一个名为GetWeather的方法,该方法使用了[HttpGet("weather")]特性。这表明这是一个处理HTTP GET请求的方法,其对应的路由是mcp/weather。这个方法接受一个名为city的参数,通过[FromQuery]特性从查询字符串中获取。

当客户端向/mcp/weather发起一个带有查询参数(如city=Paris)的GET请求时,GetWeather方法会处理这个请求,并返回一个IActionResult类型的对象,即一个包含响应内容和状态码的对象。在实际应用中,这个方法可能会根据提供的城市名称来获取和返回相应的天气信息。

*/

    [ApiController]

    [Route("mcp")]

    public class WeatherController : ControllerBase

    {

        [HttpGet("weather")]

        public IActionResult GetWeather([FromQuery] string city)

        {

            if (string.IsNullOrEmpty(city))

            {

                return BadRequest("City parameter is required");

            }


            // 模拟天气查询逻辑

            var weatherInfo = $"The weather in {city} is sunny.";


            return Ok(new { weather = weatherInfo });

        }


        [HttpGet("weather-forecast")]

        public IActionResult GetWeatherForecast([FromQuery] string city, [FromQuery] int days)

        {

            if (string.IsNullOrEmpty(city))

            {

                return BadRequest("City parameter is required");

            }


            // 模拟天气预报逻辑

            var weatherForecast = $"The weather forecast for {city} in the next {days} days.";


            return Ok(new { forecast = weatherForecast });

        }

   

[HttpPost("weather")]

    public IActionResult PostWeather([FromBody] WeatherRequest weatherRequest)

    {

        // 处理POST请求的逻辑

        return Ok($"Received weather data for {weatherRequest.City}: {weatherRequest.Temperature}°C");

    }



// 新增的可以GET和POST调用的方法

    [HttpGet("data")]

    [HttpPost("data")]

    public IActionResult GetAndPostData([FromQuery] string name, [FromBody] string postData = null)

    {

        if (HttpContext.Request.Method == HttpMethods.Get)

        {

            return Ok($"Received GET request with name: {name}");

        }

        else if (HttpContext.Request.Method == HttpMethods.Post)

 {

                   return Ok($"Received POST request with name: {name} and post data: {postData}");

        }


        return BadRequest("Invalid request method");

    }



}


public class WeatherRequest

{

    public string City { get; set; }

    public double Temperature { get; set; }

}



    public class Program

    {

        public static void Main(string[] args)

        {

            var host = CreateHostBuilder(args).Build();

            host.Run();

        }


        public static IHostBuilder CreateHostBuilder(string[] args) =>

            Host.CreateDefaultBuilder(args)

                .ConfigureWebHostDefaults(webBuilder =>

                {

                    webBuilder.UseStartup<Startup>();

                });

    }

}

说明

项目结构:

Startup 类配置了服务和中间件。

WeatherController 是一个 ASP.NET Core 控制器,提供了两个端点 /mcp/weather 和 /mcp/weather-forecast。

功能:

/mcp/weather 接受一个 city 参数,返回该城市的天气信息。

/mcp/weather-forecast 接受 city 和 days 参数,返回该城市未来几天的天气预报。

运行服务:

创建一个 ASP.NET Core 项目并复制上述代码。

使用 dotnet run 命令启动服务,服务默认运行在 https://localhost:5001 和 http://localhost:5000。

测试服务:

使用工具如 Postman 或 curl 测试 API:

获取天气信息:GET http://localhost:5000/mcp/weather?city=Beijing

获取天气预报:GET http://localhost:5000/mcp/weather-forecast?city=Beijing&days=3


C# 用OpenAI模型调用

public class WeatherRequestFunction : IRequestFunction

{

    public string Name => "weather";


    public async Task<IDictionary<string, object>> InvokeAsync(IDictionary<string, object> parameters)

    {

        using var httpClient = new HttpClient();

        var city = parameters["city"].ToString();

        var response = await httpClient.GetAsync($"http://localhost:5000/mcp/weather?city={city}");

        response.EnsureSuccessStatusCode();

        var content = await response.Content.ReadAsStringAsync();

        return new Dictionary<string, object> { { "result", content } };

    }


    public string Serialize(IRequestFunction function)

    {

        return $"Calling tool {function.Name} with parameters {function}";

    }


    public Task SerializeAsync(IRequestFunction function)

    {

        // 实现序列化逻辑

        return Task.CompletedTask;

    }

}

代码说明

OpenAI 客户端:创建 OpenAI 客户端实例,并配置 API 密钥和日志记录器。

MCP 工具:定义一个 WeatherRequestFunction 类实现 IRequestFunction 接口,用于调用 MCP 服务中的天气查询工具。

ChatClient:创建 ChatClient 实例,传入 OpenAI 客户端和日志记录器。

聊天历史:初始化聊天历史记录。

交互式聊天:在循环中读取用户输入,将其添加到聊天历史中,并调用 OpenAI API 获取响应。打印助手的响应并将其添加到聊天历史中。

其他信息

确保你的 MCP 服务正在运行,并且可以通过 http://localhost:5000/mcp/weather 访问。

替换成您的实际 OpenAI API 密钥。

如果您没有 MCP 服务,可以参考前面的示例代码实现一个简单的 MCP 服务。



C# 编写 Agent 调用 ollama 及 MCP 的方法


调用 ollama

安装 Ollama SDK :可以使用如下命令安装 Ollama 的 C# SDK,“dotnet add package Ollama --version 1.9.0”。

简单对话 :设置模型名称和 ollama 的 API 地址,利用 OllamaApiClient 进行对话生成,示例代码如下:

**```csharp

string modelName = "qwen2:7b";

using var ollama = new OllamaApiClient(baseUri: new Uri("http://127.0.0.1:11434/api"));

Console.WriteLine("开始对话!!!");

string userInput = "";

do

{

Console.WriteLine("User:");

userInput = Console.ReadLine()!;

var enumerable = ollama.Completions.GenerateCompletionAsync(modelName, userInput);

Console.WriteLine("Agent:");

await foreach (var response in enumerable)

{

Console.Write($"{response.Response}");

}

Console.WriteLine();

} while (!string.Equals(userInput, "exit", StringComparison.OrdinalIgnoreCase));

Console.WriteLine("对话结束!!!");



**[^2^]


  * **多轮对话** :使用 Chat 对象进行分角色的多轮对话,通过 List 消息存储之前的对话记录,以便模型捕获上下文,代码示例:

**```csharp

string modelName = "glm4:9b";

using var ollama = new OllamaApiClient(baseUri: new Uri("http://127.0.0.1:11434/api"));

Console.WriteLine("开始对话!!!");

string userInput = "";

List messages = [];

do

{

    messages = messages.TakeLast(5).ToList();

    Console.WriteLine("User:");

    userInput = Console.ReadLine()!;

    messages.Add(new Message(MessageRole.User, userInput));

    var enumerable = ollama.Chat.GenerateChatCompletionAsync(modelName, messages, stream: true);

    Console.WriteLine("Agent:");

    StringBuilder builder = new();

    await foreach (var response in enumerable)

    {

        string content = response.Message.Content;

        builder.AppendLine(content);

        Console.Write(content);

    }

    messages.Add(new Message(MessageRole.Assistant, builder.ToString()));

    Console.WriteLine();

} while (!string.Equals(userInput, "exit", StringComparison.OrdinalIgnoreCase));

Console.WriteLine("对话结束!!!");



```**[^2^]


  * **Function call** :若要使用 function call 功能,需设置 autoCallTools 为 true,并给 Ollama 注册相应的工具类,代码如下:

**```csharp

string modelName = "llama3.1:8b";

using var ollama = new OllamaApiClient(baseUri: new Uri("http://127.0.0.1:11434/api"));

var chat = ollama.Chat(

    model: modelName,

    systemMessage: "You are a helpful assistant.",

    autoCallTools: true);

var mathService = new MathService();

chat.AddToolService(mathService.AsTools(), mathService.AsCalls());

while (true)

{

    try

    {

        Console.WriteLine("User>");

        var newMessage = Console.ReadLine();

        var msg = await chat.SendAsync(newMessage);

        Console.WriteLine("Agent> " + msg.Content);

    }

    finally

    {

        Console.WriteLine(chat.PrintMessages());

    }

}

```**[^2^]


### 调用 MCP

目前未查到有成熟的 C# 调用 MCP 的官方 SDK 或常用库,但可以尝试以下思路:

  * **通过 HTTP 请求调用** :如果 MCP 提供了 HTTP API 接口,可以使用 C# 中的 HttpClient 类来发送 HTTP 请求进行调用。基本步骤如下:

    1. 创建 HttpClient 对象,并设置相关请求头等信息。

    2. 构造请求内容,根据 MCP API 的要求,将请求参数进行序列化。

    3. 发送 POST 或 GET 等请求到 MCP 的 API 地址。

    4. 处理返回的响应,对响应内容进行反序列化等操作,获取所需数据。


例如,假设 MCP 的 API 地址为<http://localhost:8000/mcp/api>,需要发送一个获取数据的请求,代码可能如下:

**```csharp

using (var client = new HttpClient())

{

    client.BaseAddress = new Uri("http://localhost:8000/");

    var response = await client.GetAsync("mcp/api?param1=value1");

    if (response.IsSuccessStatusCode)

    {

        var result = await response.Content.ReadAsStringAsync();

        // 处理结果

    }

    else

    {

        // 处理错误

    }

}

```**

  * **参考 Python 实现进行移植** :上述提到的 Python 调用 MCP 的代码示例中,展示了如何创建 MCP Server、获取工具列表以及创建 Agent 等过程。可以参考其逻辑和交互方式,在 C# 中进行相应的实现。例如,对于 MCP Server 的连接,可以考虑在 C# 中使用类似的网络通信方式;对于工具的调用,可以解析 MCP 返回的工具信息,并根据用户输入的问题,选择合适的工具进行调用等[^4^]。


MathService代码:

using System;

using System.Collections.Generic;

using System.Linq;


// 定义工具接口

public interface IToolsService

{

    public string Name { get; }

    public List<ToolDefinition> DefineTools();

}


// 定义工具函数

public class MathService : IToolsService

{

    public string Name => "math";

    

    // 定义工具

    public List<ToolDefinition> DefineTools()

    {

        return new List<ToolDefinition>

        {

            new ToolDefinition("add", "Add two numbers", "a: number, b: number", Add),

            new ToolDefinition("subtract", "Subtract two numbers", "a: number, b: number", Subtract),

            new ToolDefinition("multiply", "Multiply two numbers", "a: number, b: number", Multiply),

            new ToolDefinition("divide", "Divide two numbers", "a: number, b: number", Divide)

        };

    }


    // 加法

    public double Add(double a, double b)

    {

        return a + b;

    }


    // 减法

    public double Subtract(double a, double b)

    {

        return a - b;

    }


    // 乘法

    public double Multiply(double a, double b)

    {

        return a * b;

    }


    // 除法

    public double Divide(double a, double b)

    {

        if (b == 0)

        {

            throw new DivideByZeroException("Division by zero is not allowed.");

        }

        return a / b;

    }

}



ToolDefinition类通常不是.NET标准库或框架的一部分,也不是Ollama或其他主流库的内置类。因此,您需要自己定义它,或者参考您所使用的特定库或框架的实现

using System;


public class ToolDefinition

{

    // 工具名称

    public string Name { get; set; }

    // 工具描述

    public string Description { get; set; }

    // 工具参数列表

    public string Parameters { get; set; }

    // 工具处理函数的委托类型

    public Func<object[], object> Function { get; set; }


    public ToolDefinition(string name, string description, string parameters, Func<object[], object> function)

    {

        Name = name;

        Description = description;

        Parameters = parameters;

        Function = function;

    }

}



Chrome MCP Server

点击进入源码页

Chrome MCP Server 是一个基于chrome插件的 模型上下文协议 (MCP) 服务器,它将您的 Chrome 浏览器功能暴露给 Claude 等 AI 助手,实现复杂的浏览器自动化、内容分析和语义搜索等。与传统的浏览器自动化工具(如playwright)不同,Chrome MCP server直接使用您日常使用的chrome浏览器,基于现有的用户习惯和配置、登录态,让各种大模型或者各种chatbot都可以接管你的浏览器,真正成为你的如常助手


环境要求

Node.js >= 18.19.0 和 (npm 或 pnpm)

Chrome/Chromium 浏览器

安装pnpm :npm install -g pnpm

安装步骤

1.从github上下载最新的chrome扩展

下载地址:https://github.com/hangwin/mcp-chrome/releases


2.全局安装mcp-chrome-bridge

npm


npm install -g mcp-chrome-bridge

pnpm


# 方法1:全局启用脚本(推荐)

pnpm config set enable-pre-post-scripts true

pnpm install -g mcp-chrome-bridge


# 方法2:如果 postinstall 没有运行,手动注册

pnpm install -g mcp-chrome-bridge

mcp-chrome-bridge register

注意:pnpm v7+ 默认禁用 postinstall 脚本以提高安全性。enable-pre-post-scripts 设置控制是否运行 pre/post 安装脚本。如果自动注册失败,请使用上述手动注册命令。


3.加载 Chrome 扩展


打开 Chrome 并访问 chrome://extensions/

启用"开发者模式"

点击"加载已解压的扩展程序",选择 your/dowloaded/extension/folder

点击插件图标打开插件,点击连接即可看到mcp的配置


在支持MCP协议的客户端中使用

使用streamable http的方式连接(👍🏻推荐)

将以下配置添加到客户端的 MCP 配置中以cherryStudio为例:


推荐用streamable http的连接方式


{

  "mcpServers": {

    "chrome-mcp-server": {

      "type": "streamableHttp",

      "url": "http://127.0.0.1:12306/mcp"

    }

  }

}

使用stdio的方式连接(备选)

假设你的客户端仅支持stdio的连接方式,那么请使用下面的方法:


1.先查看你刚刚安装的npm包的安装位置

# npm 查看方式

npm list -g mcp-chrome-bridge

# pnpm 查看方式

pnpm list -g mcp-chrome-bridge

假设上面的命令输出的路径是:/Users/xxx/Library/pnpm/global/5 那么你的最终路径就是:/Users/xxx/Library/pnpm/global/5/node_modules/mcp-chrome-bridge/dist/mcp/mcp-server-stdio.js


2.把下面的配置替换成你刚刚得到的最终路径

{

  "mcpServers": {

    "chrome-mcp-stdio": {

      "command": "npx",

      "args": [

        "node",

        "/Users/xxx/Library/pnpm/global/5/node_modules/mcp-chrome-bridge/dist/mcp/mcp-server-stdio.js"

      ]

    }

  }

}


mcp2mqtt动动嘴就可以控制智能家居

点击进入开源地址

本项目通过将 MCP 协议转换为 MQTT 协议,我们能够利用强大的大型语言模型(LLMs),就能轻松操控您的智能家居、机器人或其他硬件设备。



excel-mcp-server使用 AI 代理创建、读取和修改 Excel 工作簿

点击查看源码

模型上下文协议 (MCP) 服务器,无需安装 Microsoft Excel 即可作 Excel 文件。使用 AI 代理创建、读取和修改 Excel 工作簿。

Excel 作 :创建、读取、更新工作簿和工作表

数据作 :公式、格式、图表、数据透视表和 Excel 表格

数据验证 :内置范围、公式和数据完整性验证

格式: 字体样式、颜色、边框、对齐方式和条件格式

表格作 :使用自定义样式创建和管理 Excel 表格

图表创建 :生成各种图表类型(折线图、条形图、饼图、散点图等)

数据透视表 :创建用于数据分析的动态数据透视表

工作表管理 :轻松复制、重命名、删除工作表

三重传输支持 :stdio、SSE(已弃用)和可流式传输的 HTTP

远程和本地 :既可以在本地工作,也可以作为远程服务工作



Trae+Chrome MCP Server 让AI接管你的浏览器

点击查看原文


一、安装与配置

环境准备

Node.js ≥ 18.19.0 + npm

Chrome/Chromium 浏览器

快速部署和使用

1、安装Chrome扩展插件

        从 GitHub Releases 下载最新扩展包,在 chrome://extensions 中启用开发者模式并加载解压的文件夹。


2、安装桥接工具

npm install -g mcp-chrome-bridge

AI写代码

    若自动注册失败,需手动执行 mcp-chrome-bridge register。


3、打开Chrome浏览器,点击扩展程序进入配置页

1.png


4、点击连接

1.png


5、打开Trae客户端,进入MCP管理页

1.png


6、添加MCP服务

{

  "mcpServers": {

    "chrome-mcp-server": {

      "type": "streamableHttp",

      "url": "http://127.0.0.1:12306/mcp"

    }

  }

}

AI写代码

7、MCP服务需要添加到智能体才可以在Trae内使用,进入智能体管理页面

1.png


8、配置智能体,勾选刚刚配置的MCP服务

1.png


9、在Trae的对话界面选择智能体就可以对话,让浏览器帮你干活啦

1.png


二、应用场景

内容创作

        AI 自动总结网页内容 → 调用 Excalidraw 绘制思维导图。


竞品监控

        定时截图竞品网站首页,对比分析布局更新。


工作流优化

        自动填写表单、批量关闭冗余标签页、整理书签。


开发调试

        网络请求拦截与 API 结构分析。


Windows MCP.Net基于 .NET 的 Windows 桌面自动化 MCP (Model Context Protocol) 服务器

点击进入源码页


一个基于 .NET 的 Windows 桌面自动化 MCP (Model Context Protocol) 服务器,为 AI 助手提供与 Windows 桌面环境交互的能力。

重要提示: 本项目需要 .NET 10 才能运行,请先确保你的本地安装了 .NET 10。如果尚未安装,请访问 .NET 10 下载页面 进行下载和安装。


1. MCP 客户端配置

在您的 MCP 客户端配置中添加以下配置:


使用全局安装的工具(推荐)

{

    "mcpServers": {

     "Windows-MCP.Net": {

      "type": "stdio",

      "command": "dnx",

      "args": ["Windows-MCP.Net@", "--yes"],

      "env": {}

    }

    }

}

使用项目源码直接运行(开发模式)

方式一:工作区配置

在项目根目录创建 .vscode/mcp.json 文件:


{

  "mcpServers": {

    "Windows-MCP.Net-Dev": {

      "type": "stdio",

      "command": "dotnet",

      "args": ["run", "--project", "src/Windows-MCP.Net.csproj"],

      "cwd": "${workspaceFolder}",

      "env": {}

    }

  }

}

方式二:用户配置

通过VS Code命令面板运行 MCP: Open User Configuration,添加:


{

  "mcpServers": {

    "Windows-MCP.Net-Local": {

      "type": "stdio",

      "command": "dotnet",

      "args": ["run", "--project", "src/Windows-MCP.Net.csproj"],

      "env": {}

    }

  }

}

注意: 使用项目源码方式便于开发调试,修改代码后无需重新安装即可生效。VS Code 1.102+ 版本支持MCP服务器的自动发现和管理。


2. 安装和运行

方式一:全局安装(推荐)

dotnet tool install --global WindowsMCP.Net

方式二:从源码运行

# 克隆仓库

git clone https://github.com/AIDotNet/Windows-MCP.Net.git

cd Windows-MCP.Net

# 构建项目

dotnet build

# 运行项目

dotnet run --project src/Windows-MCP.Net.csproj

3. 开始使用

配置完成后,重启您的MCP客户端,即可开始使用Windows桌面自动化功能!



Windows-MCP用于 Windows 计算机的轻量级 MCP 服务器

点击查看源码


"mcpServers": {

    "windows-mcp": {

      "command": "uv",

      "args": [

        "--directory",

        "<path to the windows-mcp directory>",

        "run",

        "main.py"

      ]

    }

  }



详细介绍NPX或UVX配置MCP服务


要使用 npx 或 uvx 配置和运行 MCP(Model Context Protocol)服务,需结合具体的 MCP 服务实现工具(通常是 Node.js 或 Python 生态的包)。以下是详细步骤和说明:

一、背景知识

npx:Node.js 生态的包执行工具,可直接运行 npm 仓库中的包(无需全局安装),适用于 Node.js 编写的 MCP 服务。

uvx:Rust 编写的快速 Python 包执行工具(类似 npx,属于 uv 包管理器的一部分),适用于 Python 编写的 MCP 服务。

MCP 服务:需遵循 MCP 协议(如可流式 HTTP 传输),提供标准化的客户端 - 服务器通信能力(如模型上下文管理、消息流式传输等)。

二、使用 npx 配置 Node.js 版 MCP 服务

若 MCP 服务基于 Node.js 实现(如 @mcp/server 等包),步骤如下:

1. 安装 Node.js 和 npm

确保环境中已安装 Node.js(含 npm),推荐版本 16+。

验证:node -v 和 npm -v 可正常输出版本号。

2. 直接运行 MCP 服务包

使用 npx 临时执行 MCP 服务包,无需提前安装。基本命令格式:


bash

npx [mcp-package-name] [配置参数]

3. 常见配置参数(以假设的 @mcp/server 为例)

MCP 服务通常支持通过命令行参数或环境变量配置核心功能,例如:


bash

# 启动 MCP 服务,指定端口、端点路径和允许的 Origin

npx @mcp/server \

  --port 8080 \                 # 服务端口(默认可能为 65432)

  --endpoint /mcp \             # MCP 端点路径(符合协议要求的单一端点)

  --allowed-origins "http://localhost:3000" \  # 允许的客户端 Origin(防 DNS 重绑定)

  --log-level info \            # 日志级别

  --session-timeout 3600        # 会话超时时间(秒)

4. 环境变量配置

部分参数可通过环境变量设置(优先级可能高于命令行):


bash

# 示例:通过环境变量配置

export MCP_PORT=8080

export MCP_ALLOWED_ORIGINS="http://localhost:3000,https://example.com"

npx @mcp/server  # 自动读取环境变量

5. 验证服务是否启动

服务启动后,可通过 curl 测试端点是否可用:


bash

curl http://localhost:8080/mcp -I  # 检查是否返回 200 或 405(允许 POST/GET)

三、使用 uvx 配置 Python 版 MCP 服务

若 MCP 服务基于 Python 实现(如 mcp-server 等包),步骤如下:

1. 安装 uv 包管理器

uvx 是 uv 的子命令,需先安装 uv:

bash

# 安装 uv(适用于 Linux/macOS,Windows 需调整)

curl -LsSf https://astral.sh/uv/install.sh | sh


验证:uv --version 和 uvx --version 可正常输出。

2. 直接运行 MCP 服务包

使用 uvx 执行 Python 版 MCP 服务,命令格式:


bash

uvx [mcp-package-name] [配置参数]

3. 常见配置参数(以假设的 mcp-server 为例)

Python 版 MCP 服务的配置参数与 Node.js 版类似,例如:


bash

# 启动 MCP 服务,配置核心参数

uvx mcp-server \

  --host 127.0.0.1 \            # 绑定本地地址(符合安全要求,避免暴露公网)

  --port 8080 \

  --endpoint /mcp \

  --cors-allowed-origins "http://localhost:3000" \  # CORS 配置(对应 Origin 验证)

  --enable-sse true             # 启用 SSE 流式传输(符合可流式 HTTP 要求)

4. 配置文件方式

复杂配置可通过 JSON 或 YAML 文件指定,例如:


bash

# 使用配置文件启动

uvx mcp-server --config ./mcp-config.json


mcp-config.json 示例:


json

{

  "port": 8080,

  "endpoint": "/mcp",

  "allowed_origins": ["http://localhost:3000"],

  "session": {

    "timeout": 3600,

    "auto_create": true

  }

}

5. 验证服务

同 Node.js 版,通过 curl 测试:


bash

curl http://localhost:8080/mcp -X POST \

  -H "Content-Type: application/json" \

  -d '{"jsonrpc": "2.0", "method": "initialize", "id": 1}'


四、核心配置项说明(符合 MCP 协议要求)

无论使用 npx 还是 uvx,MCP 服务的配置需满足协议规范:


端点路径:必须提供单一 HTTP 端点(如 /mcp),支持 POST(客户端发消息)和 GET(SSE 流)。

安全验证:

必须验证客户端的 Origin 头(通过 allowed-origins 配置)。

本地运行时绑定 127.0.0.1(而非 0.0.0.0),避免未授权访问。

会话管理:支持通过 Mcp-Session-Id 头管理会话,需配置会话超时时间。

流式传输:启用 SSE(text/event-stream)支持服务器向客户端流式发送消息。

五、总结

npx 适用于 Node.js 生态的 MCP 服务,直接运行 npm 包并通过命令行 / 环境变量配置。

uvx 适用于 Python 生态的 MCP 服务,依赖 uv 工具,支持配置文件和参数化启动。

核心是确保配置符合 MCP 协议(如端点、安全验证、流式传输),具体参数需参考所用 MCP 服务包的文档。


npx 的核心作用是 无需全局安装包,直接执行对应 MCP 工具的可执行文件。

例子 1:通用 TCP 控制型 MCP 服务(基于 mcp-server)

场景

搭建简单的 MCP 服务,实现「客户端 → MCP 服务 → 目标服务器」的 TCP 连接转发,并支持连接数限制、日志输出等基础控制。

完整 npx 命令

npx mcp-server@latest \

  --protocol tcp \

  --listen-port 8080 \

  --listen-host 0.0.0.0 \

  --target-host 192.168.1.100 \

  --target-port 9090 \

  --log-level info \

  --max-connections 50 \

  --timeout 30000


mcp-server@latest 包名 + 版本 如 @1.2.0(固定版) 指定要执行的 MCP 服务包,@latest 表示使用最新版本,生产环境建议固定版本(如 @1.2.0)。


例子 2:本地自定义 MCP 服务(基于项目依赖 mcp-cli)

场景

本地 Node.js 项目中已将 mcp-cli 作为开发依赖(npm install mcp-cli --save-dev),通过 npx 直接执行本地依赖的 MCP 工具,加载自定义配置文件启动服务(适合企业内部定制化 MCP 服务,如设备管理、数据采集)。

完整 npx 命令

bash

npx mcp-cli start \

  --config ./config/mcp-service.yaml \

  --env production \

  --port 8888 \

  --watch \

  --debug \

  --log-rotate 7


mcp-cli start包名 + 命令无默认值mcp-cli 是本地项目依赖的 MCP 管理工具,start 是其内置命令(表示 “启动 MCP 服务”)。



CherryStudio的MCP配置

点击查看官方配置MCP文档

手动配置一个STADIO MCP

一个STATIO MCP配置例子,它的源码地址

1.jpg


它的uvx的json

{

  "mcpServers": {

    "fetch": {

      "command": "uvx",

      "args": ["mcp-server-fetch"]

    }

  }

}


多参数配置,一行一个参数

1.jpg


自动安装MCP

直接给指令:

帮我安装一个 filesystem mcp server



AI辅助生成MCP例子

####【豆包】


你知道AI模型调用MCP吗??

。。。

写一个MCP的完整例子并给出模型调用这个MCP例子

。。。


Python实现
mcp_weather_server.py

import json

import socket

import threading

from typing import Dict, Any


class MCPWeatherServer:

    """MCP天气服务服务器实现"""

    

    def __init__(self, host: str = 'localhost', port: int = 8000):

        self.host = host

        self.port = port

        self.server_socket = None

        self.running = False

        

        # 模拟天气数据库

        self.weather_database = {

            "北京": {"temperature": 25, "condition": "晴朗", "wind": "微风"},

            "上海": {"temperature": 28, "condition": "多云", "wind": "东风3级"},

            "广州": {"temperature": 30, "condition": "小雨", "wind": "南风2级"},

            "深圳": {"temperature": 29, "condition": "阴天", "wind": "东南风1级"},

            "成都": {"temperature": 23, "condition": "晴转多云", "wind": "西北风2级"}

        }

    

    def start(self):

        """启动MCP服务器"""

        self.running = True

        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        self.server_socket.bind((self.host, self.port))

        self.server_socket.listen(5)

        print(f"MCP天气服务已启动,监听 {self.host}:{self.port}")

        

        try:

            while self.running:

                client_socket, client_address = self.server_socket.accept()

                print(f"接收到来自 {client_address} 的连接")

                client_handler = threading.Thread(

                    target=self.handle_client,

                    args=(client_socket,)

                )

                client_handler.start()

        except KeyboardInterrupt:

            print("服务器正在关闭...")

        finally:

            self.stop()

    

    def stop(self):

        """停止MCP服务器"""

        self.running = False

        if self.server_socket:

            self.server_socket.close()

        print("MCP天气服务已关闭")

    

    def handle_client(self, client_socket: socket.socket):

        """处理客户端请求"""

        try:

            # 接收请求数据

            data = client_socket.recv(1024).decode('utf-8')

            if not data:

                return

                

            print(f"收到请求: {data}")

            

            # 解析MCP请求

            request = json.loads(data)

            

            # 处理请求

            response = self.process_request(request)

            

            # 发送响应

            client_socket.sendall(json.dumps(response).encode('utf-8'))

            print(f"发送响应: {response}")

            

        except Exception as e:

            error_response = {

                "jsonrpc": "2.0",

                "error": {

                    "code": -1,

                    "message": f"处理请求时出错: {str(e)}"

                },

                "id": request.get("id") if 'request' in locals() else None

            }

            client_socket.sendall(json.dumps(error_response).encode('utf-8'))

        finally:

            client_socket.close()

    

    def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:

        """处理MCP请求"""

        # 验证请求格式

        if "jsonrpc" not in request or request["jsonrpc"] != "2.0":

            return {

                "jsonrpc": "2.0",

                "error": {"code": -32600, "message": "无效的请求"},

                "id": request.get("id")

            }

            

        if "method" not in request:

            return {

                "jsonrpc": "2.0",

                "error": {"code": -32601, "message": "方法不存在"},

                "id": request.get("id")

            }

        

        # 处理天气查询方法

        if request["method"] == "get_weather":

            params = request.get("params", {})

            city = params.get("city")

            

            if not city:

                return {

                    "jsonrpc": "2.0",

                    "error": {"code": -32602, "message": "缺少城市参数"},

                    "id": request.get("id")

                }

            

            # 查询天气数据

            weather_data = self.weather_database.get(city)

            if weather_data:

                return {

                    "jsonrpc": "2.0",

                    "result": {

                        "city": city,

                        "weather": weather_data,

                        "source": "MCP Weather Service"

                    },

                    "id": request.get("id")

                }

            else:

                return {

                    "jsonrpc": "2.0",

                    "error": {"code": 404, "message": f"未找到城市 {city} 的天气数据"},

                    "id": request.get("id")

                }

        

        # 处理服务描述方法

        elif request["method"] == "describe":

            return {

                "jsonrpc": "2.0",

                "result": {

                    "name": "WeatherService",

                    "description": "提供城市天气查询服务",

                    "methods": {

                        "get_weather": {

                            "description": "获取指定城市的天气信息",

                            "parameters": {

                                "city": {

                                    "type": "string",

                                    "description": "城市名称"

                                }

                            },

                            "returns": {

                                "type": "object",

                                "description": "包含温度、天气状况和风力的天气信息"

                            }

                        }

                    }

                },

                "id": request.get("id")

            }

        

        # 未知方法

        else:

            return {

                "jsonrpc": "2.0",

                "error": {"code": -32601, "message": f"方法 {request['method']} 不存在"},

                "id": request.get("id")

            }


if __name__ == "__main__":

    # 创建并启动MCP服务器

    server = MCPWeatherServer(host='localhost', port=8000)

    server.start()


mcp_client.py

import json

import socket

from typing import Dict, Any, Optional


class MCPClient:

    """MCP客户端,用于与MCP服务器通信"""

    

    def __init__(self, host: str = 'localhost', port: int = 8000):

        self.host = host

        self.port = port

        self.request_id = 0

    

    def _get_next_id(self) -> int:

        """获取下一个请求ID"""

        self.request_id += 1

        return self.request_id

    

    def send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:

        """发送MCP请求到服务器"""

        # 创建请求

        request = {

            "jsonrpc": "2.0",

            "method": method,

            "id": self._get_next_id()

        }

        

        if params is not None:

            request["params"] = params

        

        # 建立连接并发送请求

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

            s.connect((self.host, self.port))

            s.sendall(json.dumps(request).encode('utf-8'))

            

            # 接收响应

            response_data = s.recv(1024).decode('utf-8')

            if not response_data:

                return {"error": "未收到响应"}

                

            return json.loads(response_data)

    

    def describe_service(self) -> Dict[str, Any]:

        """获取服务描述信息"""

        return self.send_request("describe")

    

    def get_weather(self, city: str) -> Dict[str, Any]:

        """查询指定城市的天气"""

        return self.send_request("get_weather", {"city": city})


# 模型调用MCP服务的示例

def model_use_mcp_service():

    """模拟AI模型使用MCP服务的过程"""

    print("=== AI模型开始运行 ===")

    

    # 1. 创建MCP客户端

    print("创建MCP客户端,准备连接天气服务...")

    client = MCPClient(host='localhost', port=8000)

    

    # 2. 先查询服务描述,了解可用功能

    print("\n查询MCP服务描述...")

    description = client.describe_service()

    print(f"服务描述: {json.dumps(description, ensure_ascii=False, indent=2)}")

    

    # 3. 根据服务描述,调用天气查询功能

    target_city = "北京"

    print(f"\n调用MCP服务查询 {target_city} 的天气...")

    weather_result = client.get_weather(target_city)

    

    # 4. 处理返回结果

    print(f"天气查询结果: {json.dumps(weather_result, ensure_ascii=False, indent=2)}")

    

    # 5. 根据结果生成回答

    if "result" in weather_result:

        weather_data = weather_result["result"]["weather"]

        response = (f"{target_city}当前的天气状况为:{weather_data['condition']},"

                   f"气温{weather_data['temperature']}°C,{weather_data['wind']}。")

    else:

        response = f"抱歉,无法获取{target_city}的天气信息:{weather_result.get('error', {}).get('message', '未知错误')}"

    

    print(f"\n=== AI模型生成回答 ===")

    print(response)


if __name__ == "__main__":

    # 模拟模型调用MCP服务

    model_use_mcp_service()


C#实现

Models.cs  

using System;

using System.Collections.Generic;

using System.Text.Json.Serialization;


namespace McpCommon

{

    /// <summary>

    /// MCP请求模型

    /// </summary>

    public class McpRequest

    {

        [JsonPropertyName("jsonrpc")]

        public string JsonRpc { get; set; } = "2.0";

        

        [JsonPropertyName("method")]

        public string Method { get; set; }

        

        [JsonPropertyName("params")]

        public Dictionary<string, object> Parameters { get; set; }

        

        [JsonPropertyName("id")]

        public int Id { get; set; }

    }

    

    /// <summary>

    /// MCP响应模型

    /// </summary>

    public class McpResponse

    {

        [JsonPropertyName("jsonrpc")]

        public string JsonRpc { get; set; } = "2.0";

        

        [JsonPropertyName("result")]

        public object Result { get; set; }

        

        [JsonPropertyName("error")]

        public McpError Error { get; set; }

        

        [JsonPropertyName("id")]

        public int Id { get; set; }

    }

    

    /// <summary>

    /// MCP错误模型

    /// </summary>

    public class McpError

    {

        [JsonPropertyName("code")]

        public int Code { get; set; }

        

        [JsonPropertyName("message")]

        public string Message { get; set; }

    }

    

    /// <summary>

    /// 天气数据模型

    /// </summary>

    public class WeatherData

    {

        public string City { get; set; }

        public string Condition { get; set; }

        public int Temperature { get; set; }

        public string Wind { get; set; }

        public DateTime UpdateTime { get; set; }

    }

    

    /// <summary>

    /// 服务描述模型

    /// </summary>

    public class ServiceDescription

    {

        public string Name { get; set; }

        public string Description { get; set; }

        public string Version { get; set; }

        public List<ServiceMethod> Methods { get; set; } = new List<ServiceMethod>();

    }

    

    /// <summary>

    /// 服务方法描述

    /// </summary>

    public class ServiceMethod

    {

        public string Name { get; set; }

        public string Description { get; set; }

        public List<MethodParameter> Parameters { get; set; } = new List<MethodParameter>();

    }

    

    /// <summary>

    /// 方法参数描述

    /// </summary>

    public class MethodParameter

    {

        public string Name { get; set; }

        public string Type { get; set; }

        public string Description { get; set; }

        public bool Required { get; set; }

    }

}

    

MCPServer.cs

using System;

using System.Collections.Generic;

using System.Net;

using System.Net.Sockets;

using System.Text;

using System.Text.Json;

using System.Threading;

using McpCommon;


namespace McpServer

{

    class Program

    {

        private static TcpListener _server;

        private static bool _isRunning;

        private static readonly Dictionary<string, WeatherData> _mockWeatherData = new Dictionary<string, WeatherData>

        {

            { "北京", new WeatherData { City = "北京", Condition = "晴朗", Temperature = 25, Wind = "微风", UpdateTime = DateTime.Now } },

            { "上海", new WeatherData { City = "上海", Condition = "多云", Temperature = 28, Wind = "东风3级", UpdateTime = DateTime.Now } },

            { "广州", new WeatherData { City = "广州", Condition = "小雨", Temperature = 30, Wind = "南风2级", UpdateTime = DateTime.Now } },

            { "深圳", new WeatherData { City = "深圳", Condition = "阴天", Temperature = 29, Wind = "东北风1级", UpdateTime = DateTime.Now } }

        };


        static void Main(string[] args)

        {

            StartServer();

            

            Console.WriteLine("MCP天气服务已启动,按任意键停止...");

            Console.ReadKey();

            

            StopServer();

        }


        private static void StartServer()

        {

            int port = 8000;

            _server = new TcpListener(IPAddress.Any, port);

            _server.Start();

            _isRunning = true;

            

            Console.WriteLine($"MCP服务器已启动,监听端口 {port}...");

            

            // 启动一个线程处理客户端连接

            new Thread(ListenForClients) { IsBackground = true }.Start();

        }


        private static void StopServer()

        {

            _isRunning = false;

            _server.Stop();

            Console.WriteLine("MCP服务器已停止");

        }


        private static void ListenForClients()

        {

            while (_isRunning)

            {

                try

                {

                    // 等待客户端连接

                    TcpClient client = _server.AcceptTcpClient();

                    Console.WriteLine("新客户端连接");

                    

                    // 为每个客户端创建一个线程处理

                    new Thread(HandleClient) { IsBackground = true }.Start(client);

                }

                catch (SocketException ex)

                {

                    if (_isRunning)

                    {

                        Console.WriteLine($"Socket错误: {ex.Message}");

                    }

                }

                catch (Exception ex)

                {

                    Console.WriteLine($"错误: {ex.Message}");

                }

            }

        }


        private static void HandleClient(object obj)

        {

            using (TcpClient client = (TcpClient)obj)

            using (NetworkStream stream = client.GetStream())

            {

                byte[] buffer = new byte[1024];

                int bytesRead;

                

                try

                {

                    // 读取客户端发送的数据

                    while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)

                    {

                        string requestString = Encoding.UTF8.GetString(buffer, 0, bytesRead);

                        Console.WriteLine($"收到请求: {requestString}");

                        

                        // 解析请求

                        McpRequest request = JsonSerializer.Deserialize<McpRequest>(requestString);

                        McpResponse response = ProcessRequest(request);

                        

                        // 发送响应

                        string responseString = JsonSerializer.Serialize(response);

                        byte[] responseBytes = Encoding.UTF8.GetBytes(responseString);

                        stream.Write(responseBytes, 0, responseBytes.Length);

                        

                        Console.WriteLine($"发送响应: {responseString}");

                    }

                }

                catch (Exception ex)

                {

                    Console.WriteLine($"处理客户端时出错: {ex.Message}");

                }

                

                Console.WriteLine("客户端连接已关闭");

            }

        }


        private static McpResponse ProcessRequest(McpRequest request)

        {

            var response = new McpResponse { Id = request.Id };

            

            try

            {

                switch (request.Method)

                {

                    case "describe":

                        response.Result = GetServiceDescription();

                        break;

                        

                    case "get_weather":

                        if (request.Parameters != null && request.Parameters.ContainsKey("city"))

                        {

                            string city = request.Parameters["city"].ToString();

                            response.Result = new { weather = GetWeatherData(city) };

                        }

                        else

                        {

                            response.Error = new McpError { Code = -1, Message = "缺少参数: city" };

                        }

                        break;

                        

                    default:

                        response.Error = new McpError { Code = -2, Message = $"未知方法: {request.Method}" };

                        break;

                }

            }

            catch (Exception ex)

            {

                response.Error = new McpError { Code = -999, Message = ex.Message };

            }

            

            return response;

        }


        private static ServiceDescription GetServiceDescription()

        {

            return new ServiceDescription

            {

                Name = "WeatherService",

                Description = "提供城市天气查询服务",

                Version = "1.0.0",

                Methods = new List<ServiceMethod>

                {

                    new ServiceMethod

                    {

                        Name = "describe",

                        Description = "获取服务描述信息",

                        Parameters = new List<MethodParameter>()

                    },

                    new ServiceMethod

                    {

                        Name = "get_weather",

                        Description = "查询指定城市的天气信息",

                        Parameters = new List<MethodParameter>

                        {

                            new MethodParameter

                            {

                                Name = "city",

                                Type = "string",

                                Description = "城市名称",

                                Required = true

                            }

                        }

                    }

                }

            };

        }


        private static WeatherData GetWeatherData(string city)

        {

            if (_mockWeatherData.TryGetValue(city, out var weatherData))

            {

                // 更新时间为当前时间

                weatherData.UpdateTime = DateTime.Now;

                return weatherData;

            }

            

            // 如果没有该城市的数据,返回一个默认值

            return new WeatherData

            {

                City = city,

                Condition = "未知",

                Temperature = 0,

                Wind = "未知",

                UpdateTime = DateTime.Now

            };

        }

    }

}

    

MCPClient.cs

using System;

using System.Collections.Generic;

using System.Net.Sockets;

using System.Text;

using System.Text.Json;

using McpCommon;


namespace McpClient

{

    class McpClient

    {

        private readonly string _host;

        private readonly int _port;

        private int _requestId;


        public McpClient(string host = "localhost", int port = 8000)

        {

            _host = host;

            _port = port;

            _requestId = 0;

        }


        private int GetNextRequestId()

        {

            return ++_requestId;

        }


        public McpResponse SendRequest(string method, Dictionary<string, object> parameters = null)

        {

            var request = new McpRequest

            {

                Method = method,

                Parameters = parameters,

                Id = GetNextRequestId()

            };


            try

            {

                using (var client = new TcpClient(_host, _port))

                using (var stream = client.GetStream())

                {

                    // 发送请求

                    string requestString = JsonSerializer.Serialize(request);

                    byte[] requestBytes = Encoding.UTF8.GetBytes(requestString);

                    stream.Write(requestBytes, 0, requestBytes.Length);

                    

                    // 接收响应

                    byte[] buffer = new byte[1024];

                    int bytesRead = stream.Read(buffer, 0, buffer.Length);

                    string responseString = Encoding.UTF8.GetString(buffer, 0, bytesRead);

                    

                    return JsonSerializer.Deserialize<McpResponse>(responseString);

                }

            }

            catch (Exception ex)

            {

                return new McpResponse

                {

                    Id = request.Id,

                    Error = new McpError { Code = -999, Message = $"通信错误: {ex.Message}" }

                };

            }

        }


        public ServiceDescription DescribeService()

        {

            var response = SendRequest("describe");

            if (response.Error != null)

            {

                throw new Exception($"获取服务描述失败: {response.Error.Message}");

            }

            

            return JsonSerializer.Deserialize<ServiceDescription>(JsonSerializer.Serialize(response.Result));

        }


        public WeatherData GetWeather(string city)

        {

            var parameters = new Dictionary<string, object>

            {

                { "city", city }

            };

            

            var response = SendRequest("get_weather", parameters);

            if (response.Error != null)

            {

                throw new Exception($"获取天气失败: {response.Error.Message}");

            }

            

            // 解析嵌套的结果

            var resultDict = JsonSerializer.Deserialize<Dictionary<string, object>>(JsonSerializer.Serialize(response.Result));

            return JsonSerializer.Deserialize<WeatherData>(JsonSerializer.Serialize(resultDict["weather"]));

        }

    }


    class Program

    {

        static void Main(string[] args)

        {

            try

            {

                ModelUseMcpService();

            }

            catch (Exception ex)

            {

                Console.WriteLine($"发生错误: {ex.Message}");

            }

            

            Console.WriteLine("按任意键退出...");

            Console.ReadKey();

        }


        static void ModelUseMcpService()

        {

            Console.WriteLine("=== AI模型开始运行 ===");

            

            // 1. 创建MCP客户端

            Console.WriteLine("创建MCP客户端,准备连接天气服务...");

            var client = new McpClient("localhost", 8000);

            

            // 2. 查询服务描述,了解可用功能

            Console.WriteLine("\n查询MCP服务描述...");

            var description = client.DescribeService();

            Console.WriteLine("服务描述:");

            Console.WriteLine($"名称: {description.Name} v{description.Version}");

            Console.WriteLine($"描述: {description.Description}");

            Console.WriteLine("可用方法:");

            foreach (var method in description.Methods)

            {

                Console.WriteLine($"- {method.Name}: {method.Description}");

                foreach (var param in method.Parameters)

                {

                    Console.WriteLine($"  * {param.Name} ({param.Type}): {param.Description} {(param.Required ? "(必填)" : "")}");

                }

            }

            

            // 3. 调用天气查询功能

            string targetCity = "北京";

            Console.WriteLine($"\n调用MCP服务查询 {targetCity} 的天气...");

            var weatherData = client.GetWeather(targetCity);

            

            // 4. 处理返回结果

            Console.WriteLine("天气查询结果:");

            Console.WriteLine($"城市: {weatherData.City}");

            Console.WriteLine($"天气状况: {weatherData.Condition}");

            Console.WriteLine($"气温: {weatherData.Temperature}°C");

            Console.WriteLine($"风向风力: {weatherData.Wind}");

            Console.WriteLine($"更新时间: {weatherData.UpdateTime:yyyy-MM-dd HH:mm:ss}");

            

            // 5. 根据结果生成回答

            string response = $"{targetCity}当前的天气状况为:{weatherData.Condition},气温{weatherData.Temperature}°C,{weatherData.Wind}。";

            

            Console.WriteLine("\n=== AI模型生成回答 ===");

            Console.WriteLine(response);

        }

    }

}

    


Python编写MCP一般使用FastMCP,支持发布为studio sse streamable_http

FastMCP介绍

FastMCP官方文档

该excel-mcp-server例子的源码地址:https://github.com/haris-musa/excel-mcp-server

源码:

excel-mcp-server-main.zip


import typer

from .server import run_sse, run_stdio, run_streamable_http

app = typer.Typer(help="Excel MCP Server")


@app.command()

def sse():

    """Start Excel MCP Server in SSE mode"""

    try:

        run_sse()

    except KeyboardInterrupt:

        print("\nShutting down server...")

    except Exception as e:

        print(f"\nError: {e}")

        import traceback

        traceback.print_exc()

    finally:

        print("Service stopped.")


if __name__ == "__main__":

    app() 


这段 Python 代码是一个使用 Typer 库创建的命令行应用程序,用于启动 Excel MCP Server 的 SSE 模式服务。下面是详细讲解:

导入依赖

import typer

from .server import run_sse, run_stdio, run_streamable_http


导入了typer库,这是一个用于构建命令行界面的工具

从当前包的server模块导入了三个函数,分别对应不同的服务运行模式

创建 Typer 应用实例

app = typer.Typer(help="Excel MCP Server")


创建了一个 Typer 应用实例,help参数设置了应用的描述信息 "Excel MCP Server"

这个app对象将用于定义各种命令

定义 SSE 模式命令

@app.command()

def sse():

    """Start Excel MCP Server in SSE mode"""

    # 函数实现...



使用@app.command()装饰器定义了一个名为sse的命令

函数文档字符串描述了该命令的功能:"以 SSE 模式启动 Excel MCP 服务器"

SSE 指的是 Server-Sent Events,是一种服务器向客户端推送实时数据的技术

命令实现逻辑

try:

    run_sse()  # 尝试运行SSE模式服务

except KeyboardInterrupt:

    print("\nShutting down server...")  # 处理用户中断(如Ctrl+C)

except Exception as e:

    print(f"\nError: {e}")  # 捕获并显示其他错误

    import traceback

    traceback.print_exc()  # 打印详细的错误堆栈信息

finally:

    print("Service stopped.")  # 无论是否发生异常,最终都显示服务已停止



MCP实现部分

mcp = FastMCP(

    "excel-mcp",

    host=os.environ.get("FASTMCP_HOST", "0.0.0.0"),

    port=int(os.environ.get("FASTMCP_PORT", "8017")),

    instructions="Excel MCP Server for manipulating Excel files"

)

@mcp.tool()

def format_range(

    filepath: str,

    sheet_name: str,

    start_cell: str,

    end_cell: Optional[str] = None,

    bold: bool = False,

    italic: bool = False,

    underline: bool = False,

    font_size: Optional[int] = None,

    font_color: Optional[str] = None,

    bg_color: Optional[str] = None,

    border_style: Optional[str] = None,

    border_color: Optional[str] = None,

    number_format: Optional[str] = None,

    alignment: Optional[str] = None,

    wrap_text: bool = False,

    merge_cells: bool = False,

    protection: Optional[Dict[str, Any]] = None,

    conditional_format: Optional[Dict[str, Any]] = None

) -> str:

    """Apply formatting to a range of cells."""

    try:

        full_path = get_excel_path(filepath)

        from excel_mcp.formatting import format_range as format_range_func

        

        # Convert None values to appropriate defaults for the underlying function

        format_range_func(

            filepath=full_path,

            sheet_name=sheet_name,

            start_cell=start_cell,

            end_cell=end_cell,  # This can be None

            bold=bold,

            italic=italic,

            underline=underline,

            font_size=font_size,  # This can be None

            font_color=font_color,  # This can be None

            bg_color=bg_color,  # This can be None

            border_style=border_style,  # This can be None

            border_color=border_color,  # This can be None

            number_format=number_format,  # This can be None

            alignment=alignment,  # This can be None

            wrap_text=wrap_text,

            merge_cells=merge_cells,

            protection=protection,  # This can be None

            conditional_format=conditional_format  # This can be None

        )

        return "Range formatted successfully"

    except (ValidationError, FormattingError) as e:

        return f"Error: {str(e)}"

    except Exception as e:

        logger.error(f"Error formatting range: {e}")

        raise


1. 装饰器与函数定义

python

运行

@mcp.tool()

def format_range(...):

    """Apply formatting to a range of cells."""

    # 函数实现

@mcp.tool():这是一个自定义装饰器,推测其作用是将该函数注册为 MCP(可能是某种管理控制协议)工具集中的一个可用工具

函数名format_range清晰表明其功能:对单元格范围应用格式化

文档字符串说明了函数的核心功能:"对单元格范围应用格式化"

2. 参数解析

函数定义了多个参数,涵盖了 Excel 单元格格式化的各种常见需求:

基础定位参数(必填):

filepath: str:Excel 文件路径

sheet_name: str:工作表名称

start_cell: str:起始单元格(如 "A1")

范围扩展参数(可选):

end_cell: Optional[str] = None:结束单元格,None 表示仅格式化单个单元格

字体格式参数:

bold: bool = False:是否加粗

italic: bool = False:是否斜体

underline: bool = False:是否下划线

font_size: Optional[int] = None:字体大小

font_color: Optional[str] = None:字体颜色

单元格样式参数:

bg_color: Optional[str] = None:背景颜色

border_style: Optional[str] = None:边框样式

border_color: Optional[str] = None:边框颜色

number_format: Optional[str] = None:数字格式(如日期、货币等)

alignment: Optional[str] = None:对齐方式

wrap_text: bool = False:是否自动换行

merge_cells: bool = False:是否合并单元格

高级功能参数:

protection: Optional[Dict[str, Any]] = None:单元格保护设置

conditional_format: Optional[Dict[str, Any]] = None:条件格式设置

所有可选参数都使用了Optional类型注解并设置了默认值None,表示这些参数是可选的。

3. 函数逻辑解析

python

运行

try:

    # 获取完整的Excel文件路径

    full_path = get_excel_path(filepath)

    # 导入实际执行格式化的函数

    from excel_mcp.formatting import format_range as format_range_func

    

    # 调用实际的格式化函数,传递所有参数

    format_range_func(...)

    return "Range formatted successfully"

except (ValidationError, FormattingError) as e:

    return f"Error: {str(e)}"

except Exception as e:

    logger.error(f"Error formatting range: {e}")

    raise

函数执行流程:

首先通过get_excel_path(filepath)获取 Excel 文件的完整路径

导入实际执行格式化操作的核心函数format_range_func

调用核心函数并传递所有参数

成功时返回 "Range formatted successfully"

4. 异常处理机制

捕获ValidationError和FormattingError(特定领域异常):

这两种异常可能是格式化过程中常见的验证错误和格式错误

捕获后返回友好的错误信息

捕获通用Exception:

记录错误日志(logger.error)

重新抛出异常(raise),让上层调用者处理未预料到的错误

5. 设计特点

职责分离:该函数主要作为接口层,实际格式化逻辑在format_range_func中实现

参数透明传递:将接收的所有参数原封不动地传递给核心函数

清晰的错误处理:区分已知异常和未知异常,分别处理

灵活性:通过大量可选参数支持各种格式化需求,用户可根据需要指定

6. 使用场景推测

这个函数很可能用于 Excel 文件的自动化处理工具中,允许用户通过代码或命令行:

批量设置单元格格式

标准化 Excel 文件的样式

实现复杂的条件格式设置

集成到工作流中自动格式化报表

通过这种设计,用户可以灵活地指定需要的格式化选项,而不必关心底层 Excel 文件操作的细节。



Chrome MCP官方

{

"mcpServers":{

"chrome-devtools":{

"command":"npx",

"args":{

"chrome-devtools-mcp@latest"

}


}

}

}

源码地址: https://github.com/ChromeDevTools/chrome-devtools-mcp

AI编程学习:Chrome DevTools MCP 到底有多强?

解决登录问题(必须搞!)

因为chrome devtools 每次启动都要重新登录 。 再次感谢群里@二米 大佬的方法,不然每次登录得烦死!


我们可以执行下面的提示词,Agent 就会自动安装程序


# Chrome MCP 简单配置


## 🎯 目标


让Chrome MCP保持登录状态,不用每次重新登录。


## 🔧 三步搞定


### 1. 启动专用Chrome


```bash

# macOS

/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --user-data-dir="$HOME/.chrome-mcp" --remote-debugging-port=9222


# Windows

"C:\Program Files\Google\Chrome\Application\chrome.exe" --user-data-dir="%USERPROFILE%\.chrome-mcp" --remote-debugging-port=9222


```


### 2. 手动登录网站

在这个Chrome里登录小红书、微信等,设置会自动保存。


### 3. 正常使用MCP

在Claude Code里照常使用Chrome MCP功能,登录状态会保持。


## ✅ 完成

就这么简单!以后每次启动这个Chrome,登录状态都在。


**核心原理**:`--user-data-dir` 让Chrome把数据保存在专门目录,不会丢失。



大模型实战 | Dify+Chrome MCP打造网页自动化AI助手

点击查看原文

必备软件安装

1. 安装Node.js

# Windows(使用Chocolatey)

choco install nodejs


# macOS(使用Homebrew)

brew install node


# Ubuntu/Debian

curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash -

sudo apt-get install -y nodejs


# CentOS/RHEL

curl -fsSL https://rpm.nodesource.com/setup_18.x | sudo bash -

sudo yum install -y nodejs


2. 验证安装

node --version  # 应显示v18.0.0或更高版本 npm --version   # 应显示npm版本信息


本地部署Chrome MCP

1. 创建项目目录


# 创建项目目录

mkdir chrome-mcp-local

cd chrome-mcp-local


# 初始化npm项目

npm init -y


2. 下载Chrome扩展程序


首先下载Chrome MCP扩展程序:


# 创建扩展程序目录

mkdir chrome-extension

cd chrome-extension


# 下载最新版本的Chrome扩展程序

# 访问 https://github.com/hangwin/mcp-chrome/releases

# 下载最新的扩展程序压缩包并解压到此目录


3. 安装Chrome MCP桥接工具


需全局安装桥接包,推荐使用npm或pnpm:


npm install -g mcp-chrome-bridge

# 或

pnpm install -g mcp-chrome-bridge

安装完成后注册Native Messaging host:


# 注册Native Messaging host

mcp-chrome-bridge register


4. 安装和配置Chrome浏览器


验证Chrome浏览器安装


which google-chrome || which chromium-browser

如果上述命令返回空或提示"not found",说明Chrome浏览器未安装,请按以下方式安装:


macOS系统:


# 使用Homebrew安装

brew install --cask google-chrome


Ubuntu/Debian系统:


# 下载并安装Chrome

wget -q -O - https://dl.google.com/linux/linux_signing_key.pub | sudo apt-key add -

echo"deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" | sudotee /etc/apt/sources.list.d/google-chrome.list

sudo apt update

sudo apt install google-chrome-stable


CentOS/RHEL系统:


# 添加Chrome仓库并安装

sudo yum install -y wget

wget https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm

sudo yum localinstall google-chrome-stable_current_x86_64.rpm

加载Chrome扩展程序


打开Chrome浏览器,访问 chrome://extensions/

启用"开发者模式"

点击"加载已解压的扩展程序"

选择之前下载并解压的扩展程序文件夹

点击扩展程序图标,然后点击"连接"按钮启动MCP服务


验证Chrome MCP服务

1. 检查服务是否启动


当Chrome扩展程序连接成功后,MCP服务会在本地启动HTTP服务器(默认端口12306):


curl http://127.0.0.1:12306/mcp

# 如果服务正常,应该返回MCP协议相关信息

# 如果显示"Could not connect to server",说明Chrome扩展程序未连接

2. 通过API测试MCP工具列表


curl -X POST http://127.0.0.1:12306/mcp \

  -H "Content-Type: application/json" \

  -d '{"method": "tools/list"}'


常见问题排除

如果curl命令返回"Could not connect to server"错误:


1.检查Chrome扩展程序状态:

打开Chrome浏览器,访问 chrome://extensions/

确认Chrome MCP扩展程序已启用

点击扩展程序图标,查看连接状态


2. 重新连接扩展程序:

点击扩展程序图标

如果显示"未连接",点击"连接"按钮

等待几秒钟,直到显示"已连接"状态


3. 检查Native Messaging注册:

# 重新注册Native Messaging host

mcp-chrome-bridge register

# 注册后需要重启Chrome浏览器


4. 验证端口状态:

# 检查端口12306是否被占用

lsof -i :12306

# 如果没有输出,说明端口未被使用,需要启动扩展程序


Dify集成Chrome MCP

注意:如果你的Dify是通过Docker部署的,MCP地址不能填写127.0.0.1或localhost。应填写宿主机的实际IP地址,或使用Docker网络中的可访问地址(如host.docker.internal),否则Dify容器无法访问本地MCP服务。

1. 登录Dify控制台

访问你的Dify实例(本地)

进入「设置」→「工具」


2. 添加Chrome MCP工具

点击「添加工具」

选择「MCP工具」类型

填写配置信息:

{

"name":"Chrome MCP",

"description":"Chrome浏览器自动化工具,支持网页截图、表单填写、数据抓取等功能",

"connection":{

"type":"http",

"url":"http://host.docker.internal:12306/mcp"

},

"timeout":30000

}

1.jpg

3. 测试连接

点击「测试连接」按钮

确认显示"连接成功"状态

保存配置并启用工具

1.jpg

实战案例:自动化网页操作

案例1:网页截图助手

创建应用:

新建「Agent」应用

系统提示词:

你是一个网页截图专家,能够帮用户截取任何网页的截图。 当用户提供网址时,你需要: 1. 访问指定网页 2. 等待页面完全加载 3. 截取全屏截图 4. 返回截图结果

测试对话

1.jpg


案例2:表单自动填写

应用场景:自动填写注册表单

系统提示词:

你是一个网页截图专家,能够帮用户截取任何网页的截图。

当用户提供网址时,你需要:

1. 访问指定网页

2. 等待页面完全加载

3. 截取全屏截图

4. 返回截图结果

实际操作

1.jpg


案例3:数据抓取助手

功能描述:抓取电商网站商品信息

系统提示词:

你是数据抓取专家,能够从网页中提取结构化数据。

当用户需要抓取数据时:

1. 访问目标网页

2. 定位数据元素

3. 提取文本内容

4. 整理成结构化格式返回

抓取示例:

用户:帮我抓取京东某商品的价格和评价信息

AI:正在为您抓取商品信息...

商品名称:iPhone 15 Pro Max

当前价格:¥9999

原价:¥10999

折扣:9折

评价数量:2.5万+

好评率:98%


数据抓取完成!


高级配置与优化

Chrome扩展程序设置

1. 扩展程序选项页面配置

右键点击Chrome MCP扩展程序图标

选择"选项"或"扩展程序选项"

可配置以下设置:

HTTP服务器端口(默认12306)

请求超时时间

调试模式开关

允许的域名列表


2. Chrome浏览器优化

# 启动Chrome时的推荐参数

open -a "Google Chrome" --args \

  --remote-debugging-port=9222 \

  --disable-web-security \

  --user-data-dir=/tmp/chrome-debug


性能优化建议

1. 浏览器资源管理

定期清理浏览器缓存和Cookie

关闭不必要的标签页

禁用不需要的Chrome扩展程序


2. 系统资源监控

# 监控Chrome进程资源使用

top -p $(pgrep -f "Google Chrome")


# 查看端口占用情况

lsof -i :12306




























Top