基本能力
产品定位
Apache Beam MCP Server 是一个用于管理 Apache Beam 数据管道的标准化服务,支持多种运行器,并提供 AI 集成能力。
核心功能
- 多运行器支持:支持 Flink、Spark、Dataflow 和 Direct 运行器。
- MCP 兼容:遵循 Model Context Protocol,支持 AI 集成。
- 管道管理:提供创建、监控和控制数据管道的功能。
- 易于扩展:支持添加新的运行器或自定义功能。
- 生产就绪:包括 Docker/Kubernetes 部署、监控和扩展功能。
适用场景
- 数据工程师:通过一致的 API 管理不同运行器的管道。
- AI/LLM 开发者:通过 MCP 标准实现 AI 控制的数据管道。
- DevOps 团队:简化管道操作和监控。
工具列表
- /tools 端点:管理 AI 代理和模型,用于管道处理。
- /resources 端点:管理数据集和其他管道资源。
- /contexts 端点:定义管道的执行环境。
常见问题解答
- 详细问题解答请参考 Troubleshooting Guide。
使用教程
使用依赖
```bash
Clone the repository
git clone https://github.com/yourusername/beam-mcp-server.git
cd beam-mcp-server
Create a virtual environment
python -m venv beam-mcp-venv
source beam-mcp-venv/bin/activate # On Windows: beam-mcp-venv\Scripts\activate
Install dependencies
pip install -r requirements.txt
```
安装教程
```bash
With the Direct runner (no external dependencies)
python main.py --debug --port 8888
With Flink runner (if you have Flink installed)
CONFIG_PATH=config/flink_config.yaml python main.py --debug --port 8888
```
调试方式
```bash
Create test input
echo "This is a test file for Apache Beam WordCount example" > /tmp/input.txt
Submit a job using curl
curl -X POST http://localhost:8888/api/v1/jobs \
-H "Content-Type: application/json" \
-d '{
"job_name": "test-wordcount",
"runner_type": "direct",
"job_type": "BATCH",
"code_path": "examples/pipelines/wordcount.py",
"pipeline_options": {
"input_file": "/tmp/input.txt",
"output_path": "/tmp/output"
}
}'
```