# Uses the Alibaba Cloud Bailian (DashScope) API by default; you can of course switch to another provider.
import base64
import re
import time
import shlex
import ast
from openai import OpenAI
import pyautogui
from PIL import Image
import io
import json
import subprocess
import platform
import os
from typing import Dict, Any, Tuple, Optional, List, Callable
class VLMAgent:
    """Vision-language-action agent that drives the desktop from model replies.

    Each step of :meth:`run_task` sends a screenshot to an OpenAI-compatible
    chat endpoint, extracts ``<|tool_call|>name(args)<|tool_call|>`` blocks
    from the reply and dispatches them to the GUI tools registered in
    ``self.tools``.  Tool coordinates are ratios in ``[0, 1]`` of the screen
    size, and every tool reports its outcome as a status string.
    """

    # Wheel steps sent by a single scroll_window call (override as needed).
    _SCROLL_CLICKS = 1400
    # Seconds to wait after a focus click (pyautogui.PAUSE applies on top).
    _FOCUS_DELAY = 0.5
    # Maximum number of attempts for one model request.
    _MAX_RETRIES = 3
    # Token budget for each model reply.
    _MAX_TOKENS = 1024
    # Only this many of the most recent screenshots stay in the chat history;
    # older ones are replaced by a text placeholder to bound the payload size.
    _MAX_HISTORY_IMAGES = 3
    # Argument names that carry ratio coordinates and are validated up front.
    _COORD_KEYS = ("x", "y", "start_x", "start_y", "end_x", "end_y")
    # Spellings accepted from the model, mapped to the key name that is sent.
    _HOTKEY_ALIASES = {"cmd": "command"}
    # Key names that press_hotkey is willing to send.
    _VALID_HOTKEY_KEYS = frozenset(
        [
            "ctrl", "shift", "alt", "win", "command", "enter", "tab", "esc",
            "escape", "up", "down", "left", "right", "home", "end", "pageup",
            "pagedown", "delete", "backspace", "space",
        ]
        + [f"f{i}" for i in range(1, 13)]
        + list("abcdefghijklmnopqrstuvwxyz")
        + list("0123456789")
    )
    # One tool call: <|tool_call|>name(arg string)<|tool_call|>
    _TOOL_BLOCK = re.compile(
        r"<\|tool_call\|>\s*([A-Za-z_]\w*)\s*\((.*?)\)\s*<\|tool_call\|>",
        re.DOTALL
    )
    # A keyword argument must START with an identifier followed by a single
    # "="; this keeps quoted positional values such as "a=b" intact.
    _KEYWORD_ARG = re.compile(r"([A-Za-z_]\w*)\s*=(?!=)\s*(.*)", re.DOTALL)

    def __init__(self, api_key: Optional[str] = None, model_name: str = "qwen3-vl-plus"):
        """Create the API client, detect the screen size and register the tools.

        Args:
            api_key: API key; when omitted, ``QWEN_API_KEY`` and then
                ``OPENAI_API_KEY`` are read from the environment.
            model_name: Model identifier sent with every chat request.

        Raises:
            RuntimeError: If no API key is available.
        """
        # Prefer environment variables so the key is not hard-coded.
        api_key = api_key or os.getenv("QWEN_API_KEY") or os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise RuntimeError("未提供 API Key。请通过参数或环境变量 QWEN_API_KEY/OPENAI_API_KEY 提供。")
        self.client = OpenAI(
            api_key=api_key,
            base_url=os.getenv("QWEN_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")
        )
        self.model_name = model_name
        self.messages: List[Dict[str, Any]] = []
        self._step_counter = 0
        # pyautogui.size() first, tkinter as a fallback.
        self.screen_width, self.screen_height = self.get_screen_resolution()
        print(f"屏幕分辨率: {self.screen_width} x {self.screen_height}")
        # Safety and throttling.
        pyautogui.FAILSAFE = True
        pyautogui.PAUSE = 0.2
        # Tool registry: names are what the model writes in its tool calls.
        self.tools: Dict[str, Callable[..., str]] = {
            "mouse_click": self.mouse_click,
            "type_text": self.type_text,
            "scroll_window": self.scroll_window,
            "close_window": self.close_window,
            "press_windows_key": self.press_windows_key,
            "press_enter": self.press_enter,
            "delete_text": self.delete_text,
            "mouse_drag": self.mouse_drag,
            "wait": self.wait,
            "open_terminal": self.open_terminal,
            "press_hotkey": self.press_hotkey,
        }
        # Sizes of the most recent screenshot before and after scaling.
        self._last_scaled: Optional[Tuple[int, int]] = None
        self._last_original: Optional[Tuple[int, int]] = None

    # ---------- Shared helpers ----------
    @staticmethod
    def _clamp_ratio(v: float) -> float:
        """Convert *v* to float and clamp it into ``[0, 1]``.

        Raises:
            ValueError: If *v* cannot be converted to a float or is NaN.
        """
        try:
            f = float(v)
        except (TypeError, ValueError, OverflowError):
            raise ValueError(f"坐标必须是数值比例(0~1),收到: {v!r}") from None
        if f != f:  # NaN is the only value that differs from itself
            raise ValueError("坐标值是 NaN")
        return min(1.0, max(0.0, f))

    def _ratio_to_abs(self, x: float, y: float) -> Tuple[int, int]:
        """Map ratio coordinates (clamped to 0~1) to absolute pixel coordinates."""
        rx = self._clamp_ratio(x)
        ry = self._clamp_ratio(y)
        # Scale by (size - 1) so that ratio 1.0 lands on the last valid pixel.
        ax = int(round(rx * (self.screen_width - 1)))
        ay = int(round(ry * (self.screen_height - 1)))
        return ax, ay

    def _validate_abs(self, ax: int, ay: int) -> Optional[str]:
        """Return an error message if the pixel lies off-screen, else ``None``."""
        if not (0 <= ax < self.screen_width and 0 <= ay < self.screen_height):
            return f"坐标 ({ax}, {ay}) 超出屏幕范围 (0-{self.screen_width-1}, 0-{self.screen_height-1})"
        return None

    def _click_focus(self, ax: int, ay: int) -> Optional[str]:
        """Click a pixel to focus it, then pause; return an error message or ``None``."""
        err = self._validate_abs(ax, ay)
        if err:
            return err
        pyautogui.click(ax, ay)
        time.sleep(self._FOCUS_DELAY)
        return None

    @classmethod
    def _parse_hotkey(cls, hotkey) -> List[str]:
        """Split a hotkey such as ``"ctrl+shift+1"`` into validated key names.

        Tokens are separated by ``+``, ``-`` or whitespace and lower-cased.

        Raises:
            ValueError: If no token is found or any token is not supported.
                Rejecting the whole combination avoids sending a different
                shortcut than the one requested.
        """
        tokens = [tok for tok in re.split(r"[+\-\s]+", str(hotkey).lower()) if tok]
        if not tokens:
            raise ValueError(f"未解析到有效的快捷键: {hotkey!r}")
        keys = [cls._HOTKEY_ALIASES.get(tok, tok) for tok in tokens]
        unknown = [key for key in keys if key not in cls._VALID_HOTKEY_KEYS]
        if unknown:
            raise ValueError(f"快捷键包含不支持的按键 {unknown!r}: {hotkey!r}")
        return keys

    # ---------- Tool implementations (signatures are part of the prompt) ----------
    def mouse_drag(self, start_x, start_y, end_x, end_y, duration=0.5):
        """Drag from one ratio coordinate to another; return a status string."""
        try:
            sx, sy = self._ratio_to_abs(start_x, start_y)
            ex, ey = self._ratio_to_abs(end_x, end_y)
            if (err := self._validate_abs(sx, sy)) or (err := self._validate_abs(ex, ey)):
                return err
            # Convert before formatting: numeric strings pass validation but
            # would break the ".3f" format spec.
            print(
                f"拖拽: ({float(start_x):.3f},{float(start_y):.3f})->"
                f"({float(end_x):.3f},{float(end_y):.3f}) => ({sx},{sy})->({ex},{ey})"
            )
            pyautogui.moveTo(sx, sy)
            pyautogui.dragTo(ex, ey, duration=float(duration))
            return f"成功从 ({sx}, {sy}) 拖拽到 ({ex}, {ey})"
        except Exception as e:
            return f"拖拽操作失败: {e}"

    def wait(self, seconds):
        """Sleep for a positive number of seconds; return a status string."""
        try:
            wt = float(seconds)
            if wt <= 0:
                return "等待时间必须是正数"
            print(f"等待 {wt} 秒…")
            time.sleep(wt)
            return f"成功等待 {wt} 秒"
        except Exception as e:
            return f"等待操作失败: {e}"

    def open_terminal(self, command=""):
        """Open a new terminal window, optionally running *command* in it.

        The terminal is launched without waiting for it to exit.  Returns a
        status string.
        """
        try:
            import shutil  # only this tool needs it

            command = "" if command is None else str(command)
            system = platform.system()
            if system == "Windows":
                # "start" does not block; /k keeps the window open.  "start"
                # is a cmd built-in, hence shell=True.
                cmd = f"start cmd /k {command}" if command else "start cmd"
                subprocess.run(cmd, shell=True)
            elif system == "Darwin":
                if command:
                    # Escape for the AppleScript string literal, otherwise a
                    # quote inside the command breaks the script.
                    escaped = command.replace("\\", "\\\\").replace('"', '\\"')
                    subprocess.run(['osascript', '-e', f'tell app "Terminal" to do script "{escaped}"'])
                    subprocess.run(['osascript', '-e', 'tell app "Terminal" to activate'])
                else:
                    subprocess.run(['open', '-a', 'Terminal'])
            else:
                # Linux: try common terminal emulators in order.
                for term in ('gnome-terminal', 'konsole', 'xterm'):
                    if shutil.which(term) is None:
                        continue
                    if command:
                        # gnome-terminal separates its own options with "--".
                        sep = '--' if term == 'gnome-terminal' else '-e'
                        argv = [term, sep, 'bash', '-c', f'{command}; exec bash']
                    else:
                        argv = [term]
                    # Popen instead of run(): run() would block until the
                    # terminal process exits.
                    subprocess.Popen(argv)
                    break
                else:
                    return "未找到支持的终端程序(gnome-terminal/konsole/xterm)"
            return f"成功在新终端中执行命令: {command}" if command else "成功打开新终端窗口"
        except Exception as e:
            return f"打开终端失败: {e}"

    def press_hotkey(self, x, y, hotkey):
        """Focus the given point and press a key or key combination.

        The hotkey is validated before anything is clicked, so an invalid
        request has no side effects.  Returns a status string.
        """
        try:
            try:
                keys = self._parse_hotkey(hotkey)
            except ValueError as exc:
                return str(exc)
            ax, ay = self._ratio_to_abs(x, y)
            if (err := self._click_focus(ax, ay)):
                return err
            if len(keys) == 1:
                pyautogui.press(keys[0])
            else:
                pyautogui.hotkey(*keys)
            return f"成功在 ({ax}, {ay}) 执行快捷键: {'+'.join(keys)}"
        except Exception as e:
            return f"执行快捷键失败: {e}"

    def close_window(self, x, y):
        """Focus the window at the given point and send the close shortcut."""
        try:
            ax, ay = self._ratio_to_abs(x, y)
            if (err := self._click_focus(ax, ay)):
                return err
            # Platform-specific close shortcut.
            if platform.system() == "Darwin":
                pyautogui.hotkey('command', 'w')
            else:
                pyautogui.hotkey('alt', 'f4')
            return f"成功点击窗口 ({ax}, {ay}) 并发送关闭指令"
        except Exception as e:
            return f"关闭窗口失败: {e}"

    def press_windows_key(self):
        """Press the Windows key (reported as unavailable on macOS)."""
        try:
            if platform.system() == "Darwin":
                return "当前为 macOS,无 Windows 键"
            pyautogui.press('win')
            return "成功按下Windows键"
        except Exception as e:
            return f"按下Windows键失败: {e}"

    def press_enter(self):
        """Press the Enter key; return a status string."""
        try:
            pyautogui.press('enter')
            return "成功按下回车键"
        except Exception as e:
            return f"按下回车键失败: {e}"

    def delete_text(self, x, y, count=1):
        """Focus the given point and press Backspace *count* times."""
        try:
            # Validate the count first so a bad request does not click.
            n = int(count)
            if n <= 0:
                return "删除数量必须是正整数"
            ax, ay = self._ratio_to_abs(x, y)
            if (err := self._click_focus(ax, ay)):
                return err
            for _ in range(n):
                pyautogui.press('backspace')
                time.sleep(0.01)
            return f"成功在 ({ax}, {ay}) 删除 {n} 个字符"
        except Exception as e:
            return f"删除文本失败: {e}"

    def get_screen_resolution(self) -> Tuple[int, int]:
        """Return ``(width, height)`` of the screen.

        Tries ``pyautogui.size()`` first, then tkinter, and finally falls
        back to 2560x1600 so that start-up is not interrupted.
        """
        try:
            size = pyautogui.size()
            if size and size.width and size.height:
                return size.width, size.height
        except Exception:
            pass  # deliberate best effort: fall through to tkinter
        try:
            import tkinter as tk
            root = tk.Tk()
            width = root.winfo_screenwidth()
            height = root.winfo_screenheight()
            root.destroy()
            return width, height
        except Exception:
            # Last resort: a common resolution.
            return 2560, 1600

    def capture_screenshot(self, max_size: int = 1024) -> io.BytesIO:
        """Take a screenshot, shrink it so the longest edge is <= *max_size*.

        Records the original and scaled sizes in ``_last_original`` and
        ``_last_scaled``.

        Returns:
            A PNG-encoded buffer positioned at offset 0.
        """
        screenshot = pyautogui.screenshot()
        self._last_original = screenshot.size  # (w, h)
        ow, oh = self._last_original
        # max(1, ...) keeps the short edge from collapsing to 0 pixels.
        if ow > oh:
            nw = min(max_size, ow)
            nh = max(1, int(oh * nw / ow))
        else:
            nh = min(max_size, oh)
            nw = max(1, int(ow * nh / oh))
        self._last_scaled = (nw, nh)
        print(f"原始截图: {ow}x{oh} -> 缩放后: {nw}x{nh}")
        if (nw, nh) != (ow, oh):
            screenshot = screenshot.resize((nw, nh))
        buf = io.BytesIO()
        screenshot.save(buf, format='PNG')
        buf.seek(0)
        return buf

    def encode_image_to_base64(self, image_buffer: io.BytesIO) -> str:
        """Return the buffer's content as a base64 string."""
        # Make sure reading starts at the beginning.
        try:
            image_buffer.seek(0)
        except Exception:
            pass  # non-seekable stream: read from the current position
        return base64.b64encode(image_buffer.read()).decode('utf-8')

    def mouse_click(self, x, y, button="left", clicks=1):
        """Click at a ratio coordinate; return a status string.

        Args:
            x, y: Ratio coordinates (0~1).
            button: ``"left"``, ``"right"`` or ``"middle"``.
            clicks: Positive number of clicks.
        """
        try:
            ax, ay = self._ratio_to_abs(x, y)
            if (err := self._validate_abs(ax, ay)):
                return err
            btn = str(button).lower()
            button_names = {"left": "左键", "right": "右键", "middle": "中键"}
            if btn not in button_names:
                return f"非法的按钮参数: {button!r},仅支持 left/right/middle"
            n = int(clicks)
            if n <= 0:
                return "clicks 必须是正整数"
            pyautogui.click(ax, ay, button=btn, clicks=n)
            click_desc = "单击" if n == 1 else f"{n}击"
            return f"成功在 ({ax}, {ay}) {button_names[btn]}{click_desc}"
        except Exception as e:
            return f"点击失败: {e}"

    def scroll_window(self, x, y, direction="up"):
        """Scroll ``_SCROLL_CLICKS`` steps up or down at a ratio coordinate."""
        try:
            ax, ay = self._ratio_to_abs(x, y)
            if (err := self._validate_abs(ax, ay)):
                return err
            dir_norm = str(direction).strip().lower()
            if dir_norm not in {"up", "down"}:
                return f"非法的滚动方向: {direction!r},仅支持 'up'/'down'"
            clicks = self._SCROLL_CLICKS if dir_norm == "up" else -self._SCROLL_CLICKS
            pyautogui.scroll(clicks, x=ax, y=ay)
            return f"成功在 ({ax}, {ay}) {'向上' if clicks>0 else '向下'}滚动 {abs(self._SCROLL_CLICKS)} 步"
        except Exception as e:
            return f"滚动窗口失败: {e}"

    def type_text(self, x, y, text):
        """Focus the given point and enter *text*.

        Pastes through the clipboard when pyperclip works; otherwise falls
        back to typing character by character.  Returns a status string.
        """
        try:
            ax, ay = self._ratio_to_abs(x, y)
            if (err := self._click_focus(ax, ay)):
                return err
            content = str(text)
            try:
                import pyperclip
                pyperclip.copy(content)
                time.sleep(0.1)
                # macOS pastes with Command+V, other platforms with Ctrl+V.
                paste_modifier = "command" if platform.system() == "Darwin" else "ctrl"
                pyautogui.hotkey(paste_modifier, 'v')
            except Exception:
                # Deliberate best effort: clipboard unavailable, type instead.
                pyautogui.write(content, interval=0.04)
            return f"成功在 ({ax}, {ay}) 输入文本"
        except Exception as e:
            return f"输入文本失败: {e}"

    # ---------- Parsing / executing model tool calls ----------
    def _coerce_value(self, raw: str) -> Any:
        """Convert an argument string to a Python value.

        ``ast.literal_eval`` is tried first; on failure the text is checked
        for ``true``/``false``, then a number, then surrounding quotes, and
        is finally returned as the stripped string.
        """
        s = raw.strip()
        try:
            return ast.literal_eval(s)
        except Exception:
            pass  # not a Python literal: use the manual fallbacks below
        ls = s.lower()
        if ls in ("true", "false"):
            return ls == "true"
        try:
            if "." in s:
                return float(s)
            return int(s)
        except Exception:
            pass  # not a number either
        if (s.startswith('"') and s.endswith('"')) or (s.startswith("'") and s.endswith("'")):
            return s[1:-1]
        return s

    def _split_args(self, arg_str: str) -> List[str]:
        """Split an argument string on top-level commas.

        Commas inside quotes or brackets are ignored.  Empty parts are
        dropped and each part is stripped.
        """
        parts = []
        buf = []
        depth = 0
        in_str: Optional[str] = None  # the open quote character, if any
        escape = False
        for ch in arg_str:
            if in_str:
                buf.append(ch)
                if escape:
                    escape = False
                elif ch == "\\":
                    escape = True
                elif ch == in_str:
                    in_str = None
                continue
            if ch in ("'", '"'):
                in_str = ch
                buf.append(ch)
                continue
            if ch in "([{":
                depth += 1
                buf.append(ch)
                continue
            if ch in ")]}":
                depth = max(0, depth - 1)
                buf.append(ch)
                continue
            if ch == "," and depth == 0:
                part = "".join(buf).strip()
                if part:
                    parts.append(part)
                buf = []
            else:
                buf.append(ch)
        tail = "".join(buf).strip()
        if tail:
            parts.append(tail)
        return parts

    def parse_tool_calls(self, response_text: str) -> List[Dict[str, Any]]:
        """Extract tool calls from a model reply.

        Returns:
            A list of ``{"name": str, "arguments": dict}`` entries in the
            order they appear.  Positional arguments are stored under ``x``,
            then ``y``, then ``argN``.
        """
        calls = []
        for fname, arg_str in self._TOOL_BLOCK.findall(response_text or ""):
            args: Dict[str, Any] = {}
            for part in self._split_args(arg_str.strip()):
                keyword = self._KEYWORD_ARG.fullmatch(part)
                if keyword:
                    args[keyword.group(1)] = self._coerce_value(keyword.group(2))
                    continue
                # Positional argument (rare): map to x, y or a generic argN.
                if "x" not in args:
                    key = "x"
                elif "y" not in args:
                    key = "y"
                else:
                    key = f"arg{len(args)+1}"
                args[key] = self._coerce_value(part)
            calls.append({"name": fname, "arguments": args})
        return calls

    def execute_tool_calls(self, tool_calls: List[Dict[str, Any]]) -> str:
        """Run parsed tool calls in order and return one result line per call.

        Unknown tools, invalid coordinate arguments and argument mismatches
        are reported in the result text; later calls still run.
        """
        results = []
        for call in tool_calls:
            name = call.get("name")
            args = call.get("arguments", {}) or {}
            func = self.tools.get(name)
            if not func:
                results.append(f"未知工具: {name}")
                continue
            # Hard validation of coordinate arguments before dispatching.
            coord_error = None
            for key in self._COORD_KEYS:
                if key not in args:
                    continue
                try:
                    self._clamp_ratio(args[key])
                except Exception as e:
                    coord_error = f"工具 {name} 参数错误: {key} -> {e}"
                    break
            if coord_error:
                results.append(coord_error)
                continue
            try:
                results.append(f"工具 {name} 执行结果: {func(**args)}")
            except TypeError as te:
                results.append(f"执行工具 {name} 时参数不匹配: {te}")
            except Exception as e:
                results.append(f"执行工具 {name} 时出错: {e}")
        return "\n".join(results)

    # ---------- Main task loop ----------
    def _build_system_prompt(self) -> str:
        """Return the system prompt describing the tools and the call format."""
        return f"""
你是一个用户助理,同时拥有操控电脑的能力,你现在面对看到的图像是电脑的用户界面,请分析屏幕内容(屏幕大小是{self.screen_width}*{self.screen_height}),如果需要操作电脑,请按以下格式调用工具:
<|tool_call|>函数名(参数1=值1, 参数2=值2)<|tool_call|>
可用的工具包括:
1. mouse_click(x=比例x, y=比例y, button="left", clicks=1) …
2. type_text(x=比例x, y=比例y, text="…")
3. scroll_window(x=比例x, y=比例y, direction="up"|"down")
4. close_window(x=比例x, y=比例y)
5. press_windows_key()
6. press_enter()
7. delete_text(x=比例x, y=比例y, count=1)
8. mouse_drag(start_x=…, start_y=…, end_x=…, end_y=…, duration=0.5)
9. wait(seconds=…)
10. open_terminal(command="")
11. press_hotkey(x=比例x, y=比例y, hotkey="ctrl+c")
请在每一步操作后给出简要说明,然后使用工具调用格式指定下一步操作。
坐标系统使用比例值(0~1)。参数类型必须正确(尤其 clicks 为整数),坐标参数向左上方偏移1至2个像素点。
不要关闭你所在的终端。
可以优先使用快捷键,终端可完成的操作优先终端。
""".strip()

    def _build_user_image_content(self, text: str, base64_image: str):
        """Build a user message content list with a text part and a PNG data URL."""
        return [
            {"type": "text", "text": text},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
        ]

    def _prune_old_images(self, keep: Optional[int] = None) -> None:
        """Drop screenshots from all but the newest *keep* user messages.

        Pruned messages keep their text parts plus a placeholder line.
        *keep* defaults to ``_MAX_HISTORY_IMAGES`` and is never below 1, so
        the current screenshot always survives.
        """
        limit = max(1, self._MAX_HISTORY_IMAGES if keep is None else int(keep))
        with_image = 0
        for msg in reversed(self.messages):
            content = msg.get("content")
            if msg.get("role") != "user" or not isinstance(content, list):
                continue
            if not any(isinstance(p, dict) and p.get("type") == "image_url" for p in content):
                continue
            with_image += 1
            if with_image > limit:
                texts = [
                    p.get("text", "")
                    for p in content
                    if isinstance(p, dict) and p.get("type") == "text"
                ]
                msg["content"] = "\n".join(texts + ["(历史截图已省略)"])

    def _request_with_retries(self, *, messages: List[Dict[str, Any]]):
        """Call the chat completion endpoint, retrying on any exception.

        Waits ``0.8 * attempt`` seconds between attempts.

        Raises:
            RuntimeError: After ``_MAX_RETRIES`` failed attempts, chained to
                the last error.
        """
        last_err = None
        for attempt in range(1, self._MAX_RETRIES + 1):
            try:
                return self.client.chat.completions.create(
                    model=self.model_name,
                    messages=messages,
                    temperature=0.0,  # favour precision
                    max_tokens=self._MAX_TOKENS,
                )
            except Exception as e:
                last_err = e
                if attempt == self._MAX_RETRIES:
                    break  # no point sleeping when nothing follows
                sleep_s = 0.8 * attempt
                print(f"[重试 {attempt}/{self._MAX_RETRIES}] 模型请求失败: {e}; {sleep_s:.1f}s 后重试…")
                time.sleep(sleep_s)
        raise RuntimeError(f"模型请求失败(已重试 {self._MAX_RETRIES} 次): {last_err}") from last_err

    def _is_done(self, text: str) -> bool:
        """Return True when *text* has no tool call and contains a completion word."""
        if not text or self._TOOL_BLOCK.search(text):
            return False
        lowered = text.lower()
        return any(kw in lowered for kw in ("完成", "done", "finished"))

    def run_task(self, task_description: str, max_steps: int = 50):
        """Run the screenshot -> model -> tools loop for one task.

        The loop ends when the model request fails, the reply is empty or
        contains no tool call, or *max_steps* is reached.
        """
        print(f"开始执行任务: {task_description}")
        print(f"屏幕分辨率: {self.screen_width} x {self.screen_height}")
        self._step_counter = 0
        self.messages = [{"role": "system", "content": self._build_system_prompt()}]
        for step in range(1, max_steps + 1):
            self._step_counter = step
            print(f"\n--- 步骤 {step} ---")
            # Screenshot + encoding.
            buf = self.capture_screenshot()
            b64 = self.encode_image_to_base64(buf)
            user_text = f"请完成以下任务: {task_description}" if step == 1 else "这是当前屏幕状态,请继续完成任务"
            self.messages.append({"role": "user", "content": self._build_user_image_content(user_text, b64)})
            # Keep the request size bounded across many steps.
            self._prune_old_images()
            # Model call (with retries).
            try:
                resp = self._request_with_retries(messages=self.messages)
            except Exception as e:
                print(f"调用模型时发生错误: {e}")
                break
            choices = getattr(resp, "choices", None)
            if not choices:
                print("模型返回了空响应,任务中止。")
                break
            resp_text = (choices[0].message.content or "").strip()
            self.messages.append({"role": "assistant", "content": resp_text})
            print("模型响应:\n" + resp_text)
            # Completion check comes first.
            if self._is_done(resp_text):
                print("检测到完成态,无需继续。")
                break
            # Parse and execute the tool calls.
            calls = self.parse_tool_calls(resp_text)
            if not calls:
                print("未检测到工具调用,可能已完成。")
                break
            print("\n检测到工具调用:")
            for c in calls:
                args_desc = ", ".join(f"{k}={v!r}" for k, v in c["arguments"].items())
                print(f"- {c['name']}({args_desc})")
            results = self.execute_tool_calls(calls)
            print("\n工具执行结果:\n" + results)
            # Feed the results back to the model.
            self.messages.append({"role": "user", "content": f"工具执行结果:\n{results}"})
            # Give the UI a moment to react.
            time.sleep(0.6)
        print("\n任务执行结束。")
def main():
    """Interactive loop: read tasks from stdin and run each with a VLMAgent.

    The loop ends on ``退出``/``exit``/``quit``/``q``, on EOF or on Ctrl+C.
    """
    print("=== 电脑操作工具 ===")
    # The API key comes from the QWEN_API_KEY / OPENAI_API_KEY environment
    # variables; pass api_key="..." here only if you must override them.
    # (A placeholder argument would always win over the environment.)
    try:
        agent = VLMAgent()
    except RuntimeError as exc:
        print(exc)
        return
    print("\n系统已就绪,您可以输入各种任务请求")
    print("输入'退出'、'exit'或'quit'结束程序")
    print("-" * 50)
    while True:
        try:
            task = input("\n请输入任务: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n收到退出信号。程序结束,再见!")
            break
        if task.lower() in {"退出", "exit", "quit", "q"}:
            print("程序结束,再见!")
            break
        if not task:
            print("请输入有效的任务")
            continue
        print(f"\n开始执行任务: {task}")
        agent.run_task(task)
        print(f"\n任务 '{task}' 执行完成")
if __name__ == "__main__":
    # Dependency check: report every package named in the install hint.
    # NOTE(review): the imports at the top of this file run before this
    # guard, so a missing package presumably fails there first — confirm
    # whether this check is still needed.
    try:
        import pyautogui  # noqa: F401
        import PIL  # noqa: F401
        import openai  # noqa: F401
    except ImportError as e:
        print(f"缺少必要的依赖包: {e}")
        print("请安装依赖: pip install pyautogui pillow openai")
        raise SystemExit(1)
    main()