forked from CJackHwang/AIstudioProxyAPI
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
2147 lines (1940 loc) · 121 KB
/
Copy pathserver.py
File metadata and controls
2147 lines (1940 loc) · 121 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# server.py
import asyncio
import random
import time
import json
from typing import List, Optional, Dict, Any, Union, AsyncGenerator, Tuple, Callable # Add Tuple, Callable
import os
import traceback
from contextlib import asynccontextmanager
import sys
import platform
# --- 新增: 日志相关导入 ---
import logging
import logging.handlers
# -----------------------
from asyncio import Queue, Lock, Future, Task, Event # Add Queue, Lock, Future, Task, Event
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
# --- 新增: WebSocket 导入 ---
from fastapi import WebSocket, WebSocketDisconnect
# -------------------------
from pydantic import BaseModel, Field
from playwright.async_api import Page as AsyncPage, Browser as AsyncBrowser, Playwright as AsyncPlaywright, Error as PlaywrightAsyncError, expect as expect_async, BrowserContext as AsyncBrowserContext, Locator
from playwright.async_api import async_playwright
from urllib.parse import urljoin, urlparse # << Add urlparse
import uuid
import datetime
# --- Global log-verbosity switches (read from the environment once, at import time) ---
# A flag is on when its variable is 'true', '1' or 'yes' (case-insensitive); unset or anything else is off.
DEBUG_LOGS_ENABLED = os.environ.get('DEBUG_LOGS_ENABLED', 'false').lower() in ('true', '1', 'yes')
TRACE_LOGS_ENABLED = os.environ.get('TRACE_LOGS_ENABLED', 'false').lower() in ('true', '1', 'yes')
# int()/float() raise ValueError at import time if these variables hold non-numeric text.
# NOTE(review): the consumers of the two interval settings are outside this view; presumably they
# throttle periodic progress logging by count and by seconds — confirm.
LOG_INTERVAL = int(os.environ.get('LOG_INTERVAL', '20'))
LOG_TIME_INTERVAL = float(os.environ.get('LOG_TIME_INTERVAL', '3.0'))
# --- Configuration: target site, timeouts and polling ---
# Host fragment used to build "https://<pattern>..." URLs and URL globs during page initialization.
AI_STUDIO_URL_PATTERN = 'aistudio.google.com/'
RESPONSE_COMPLETION_TIMEOUT = 300000 # 5 minutes total timeout (in ms)
POLLING_INTERVAL = 300 # ms
POLLING_INTERVAL_STREAM = 180 # ms
SILENCE_TIMEOUT_MS = 10000 # ms
# The *_MS names below are millisecond values by their suffix; their consumers are outside this view.
POST_SPINNER_CHECK_DELAY_MS = 500
FINAL_STATE_CHECK_TIMEOUT_MS = 1500
SPINNER_CHECK_TIMEOUT_MS = 1000
POST_COMPLETION_BUFFER = 700 # NOTE(review): presumably ms like its neighbours — confirm
CLEAR_CHAT_VERIFY_TIMEOUT_MS = 5000
CLEAR_CHAT_VERIFY_INTERVAL_MS = 400
CLICK_TIMEOUT_MS = 5000
CLIPBOARD_READ_TIMEOUT_MS = 5000
PSEUDO_STREAM_DELAY = 0.001 # NOTE(review): presumably seconds between pseudo-stream chunks — confirm
# CSS selectors for the edit controls of the last chat turn.
EDIT_MESSAGE_BUTTON_SELECTOR = 'ms-chat-turn:last-child .actions-container button.toggle-edit-button'
MESSAGE_TEXTAREA_SELECTOR = 'ms-chat-turn:last-child ms-text-chunk ms-autosize-textarea'
FINISH_EDIT_BUTTON_SELECTOR = 'ms-chat-turn:last-child .actions-container button.toggle-edit-button[aria-label="Stop editing"]'
# --- Configuration: authentication profile directories (resolved next to this file) ---
AUTH_PROFILES_DIR = os.path.join(os.path.dirname(__file__), 'auth_profiles')
# 'active' holds the profile loaded in headless mode; both directories are listed in debug mode.
ACTIVE_AUTH_DIR = os.path.join(AUTH_PROFILES_DIR, 'active')
# 'saved' is where a storage state captured after a manual login is written.
SAVED_AUTH_DIR = os.path.join(AUTH_PROFILES_DIR, 'saved')
# --- Log file location (resolved next to this file) ---
LOG_DIR = os.path.join(os.path.dirname(__file__), 'logs')
LOG_FILE_PATH = os.path.join(LOG_DIR, 'app.log')
# -----------------------
# --- Constants ---
# Default model name for requests that do not specify one.
MODEL_NAME = 'AI-Studio_Camoufox-Proxy'
# Prefix of the "id" field in generated completion chunks.
CHAT_COMPLETION_ID_PREFIX = 'chatcmpl-'
# --- Selectors (CSS / Playwright selectors for the AI Studio chat UI) ---
INPUT_SELECTOR = 'ms-prompt-input-wrapper textarea'
SUBMIT_BUTTON_SELECTOR = 'button[aria-label="Run"]'
RESPONSE_CONTAINER_SELECTOR = 'ms-chat-turn .chat-turn-container.model'
RESPONSE_TEXT_SELECTOR = 'ms-cmark-node.cmark-node'
LOADING_SPINNER_SELECTOR = 'button[aria-label="Run"] svg .stoppable-spinner'
ERROR_TOAST_SELECTOR = 'div.toast.warning, div.toast.error'
CLEAR_CHAT_BUTTON_SELECTOR = 'button[aria-label="Clear chat"][data-test-clear="outside"]:has(span.material-symbols-outlined:has-text("refresh"))'
CLEAR_CHAT_CONFIRM_BUTTON_SELECTOR = 'button.mdc-button:has-text("Continue")'
MORE_OPTIONS_BUTTON_SELECTOR = 'div.actions-container div ms-chat-turn-options div > button'
# Positional selector (4th menu entry) with a text-based alternative right below it.
COPY_MARKDOWN_BUTTON_SELECTOR = 'div[class*="mat-menu"] div > button:nth-child(4)'
COPY_MARKDOWN_BUTTON_SELECTOR_ALT = 'div[role="menu"] button:has-text("Copy Markdown")'
# --- Global State ---
# Playwright driver, browser connection and the single AI Studio page; all start as None.
playwright_manager: Optional[AsyncPlaywright] = None
browser_instance: Optional[AsyncBrowser] = None
page_instance: Optional[AsyncPage] = None
# Readiness flags; all start False.
is_playwright_ready = False
is_browser_connected = False
is_page_ready = False
is_initializing = False
# Queue and lock are created at import time.
# NOTE(review): presumably requests are queued here and handled one at a time by worker_task
# while holding processing_lock — the worker is outside this view, confirm.
request_queue: Queue = Queue()
processing_lock: Lock = Lock()
worker_task: Optional[Task] = None
# --- 新增: WebSocket 连接管理器 ---
class WebSocketConnectionManager:
    """Registry of connected WebSocket clients with a best-effort text broadcast.

    Connections are kept in ``active_connections``, a dict mapping a client id
    to its WebSocket object.
    """

    def __init__(self):
        self.active_connections = {}  # client_id -> WebSocket

    async def connect(self, client_id, websocket):
        """Store *websocket* under *client_id* and log the connection.

        Only the bookkeeping happens here; the socket is stored as given.
        """
        self.active_connections[client_id] = websocket
        logger.info(f"WebSocket 客户端已连接: {client_id}")

    def disconnect(self, client_id):
        """Drop *client_id* from the registry; unknown ids are ignored silently."""
        if client_id not in self.active_connections:
            return
        del self.active_connections[client_id]
        logger.info(f"WebSocket 客户端已断开: {client_id}")

    async def broadcast(self, message):
        """Send *message* to every registered client.

        Any client whose send raises is removed from the registry once the
        whole round has finished; the exception itself is only logged.
        """
        stale_ids = []
        # Iterate over a snapshot so the registry may change while we await.
        for client_id, connection in tuple(self.active_connections.items()):
            try:
                await connection.send_text(message)
            except WebSocketDisconnect:
                logger.info(f"[WS Broadcast] Client {client_id} disconnected during broadcast.")
            except RuntimeError as e:
                # Sending on an already-closed connection surfaces as a RuntimeError.
                if "Connection is closed" in str(e):
                    logger.info(f"[WS Broadcast] Client {client_id} connection already closed.")
                else:
                    logger.error(f"广播到 WebSocket {client_id} 时出错 (RuntimeError): {e}")
            except Exception as e:
                logger.error(f"广播到 WebSocket {client_id} 时出错 (Exception): {e}")
            else:
                continue  # delivered, keep this client
            # Every failure path above ends here: schedule the client for removal.
            stale_ids.append(client_id)

        if stale_ids:
            logger.info(f"[WS Broadcast] Cleaning up disconnected clients: {stale_ids}")
            for client_id in stale_ids:
                self.disconnect(client_id)
# Module-level connection manager; setup_logging() hands it to WebSocketLogHandler so that
# log records are broadcast to the clients registered here.
log_ws_manager = WebSocketConnectionManager()
# ------------------------------------
# --- 新增: StreamToLogger 类,用于重定向 print ---
class StreamToLogger:
    """
    Minimal file-like object that forwards written text to a logger.

    Text is buffered until a line ending arrives; each complete line is logged
    (with trailing whitespace stripped) at ``log_level``. A trailing partial
    line stays in ``linebuf`` until more text completes it or ``flush()`` is
    called.

    Args:
        logger_instance: object exposing ``log(level, message)``.
        log_level: level passed to every ``log`` call (default ``logging.INFO``).
    """

    def __init__(self, logger_instance, log_level=logging.INFO):
        self.logger = logger_instance
        self.log_level = log_level
        self.linebuf = ''

    def write(self, buf):
        """Buffer *buf* and log every line that is now complete."""
        try:
            pending = self.linebuf + buf
            self.linebuf = ''
            for line in pending.splitlines(True):
                # Fix: compare against real line-ending characters. The previous
                # check used a doubled backslash, i.e. the two-character text
                # backslash + letter, which never matched, so nothing was logged
                # before flush().
                if line.endswith(('\n', '\r')):
                    self.logger.log(self.log_level, line.rstrip())
                else:
                    self.linebuf += line  # keep the incomplete line for later
        except Exception as e:
            # If logging itself fails, fall back to the original stderr.
            print(f"StreamToLogger 错误: {e}", file=sys.__stderr__)

    def flush(self):
        """Log whatever partial line is still buffered, then empty the buffer."""
        try:
            if self.linebuf != '':
                self.logger.log(self.log_level, self.linebuf.rstrip())
            self.linebuf = ''
        except Exception as e:
            print(f"StreamToLogger Flush 错误: {e}", file=sys.__stderr__)

    def isatty(self):
        """Report a non-interactive stream; some libraries probe for this."""
        return False
# --- 新增: WebSocketLogHandler 类 ---
class WebSocketLogHandler(logging.Handler):
    """
    Logging handler that pushes each formatted record to the WebSocket clients
    held by a ``WebSocketConnectionManager``.
    """

    def __init__(self, manager: WebSocketConnectionManager):
        super().__init__()
        self.manager = manager
        # Compact format for the WebSocket stream.
        self.formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    def emit(self, record: logging.LogRecord):
        """Schedule a broadcast of *record* on the running event loop, if any."""
        # Nothing to do while nobody is listening.
        if not self.manager.active_connections:
            return
        try:
            text = self.format(record)
            try:
                # Fire-and-forget: the broadcast runs as a task on the current loop.
                running_loop = asyncio.get_running_loop()
                running_loop.create_task(self.manager.broadcast(text))
            except RuntimeError:
                # No running event loop (e.g. during shutdown): drop the record.
                pass
        except Exception as e:
            # Report on the original stderr in case the logging system itself is broken.
            print(f"WebSocketLogHandler 错误: 广播日志失败 - {e}", file=sys.__stderr__)
# --- 新增: 日志设置函数 ---
def setup_logging(log_level=logging.INFO, redirect_print=False):
    """Configure process-wide logging.

    Installs three handlers on the root logger: a rotating file handler
    (detailed format, 5 MiB x 5 backups, UTF-8), a console handler writing bare
    messages to the original stderr, and a WebSocket handler (INFO and above)
    bound to ``log_ws_manager``. The log and auth-profile directories are
    created if missing.

    Args:
        log_level: level applied to the root logger.
        redirect_print: when true, ``sys.stdout`` / ``sys.stderr`` are replaced
            by ``StreamToLogger`` objects so printed text goes to the log file
            (stdout at INFO, stderr at ERROR) instead of the terminal.
    """
    os.makedirs(LOG_DIR, exist_ok=True)
    os.makedirs(ACTIVE_AUTH_DIR, exist_ok=True)
    os.makedirs(SAVED_AUTH_DIR, exist_ok=True)

    # Detailed format for the file, message-only format for the console.
    file_log_formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - [%(name)s:%(funcName)s:%(lineno)d] - %(message)s'
    )
    console_log_formatter = logging.Formatter('%(message)s')

    root_logger = logging.getLogger()
    # Remove AND close handlers left by an earlier configuration; merely clearing
    # the list would leave a previously opened log file handle dangling.
    for old_handler in list(root_logger.handlers):
        root_logger.removeHandler(old_handler)
        old_handler.close()
    root_logger.setLevel(log_level)

    # 1. Rotating file handler (detailed format).
    file_handler = logging.handlers.RotatingFileHandler(
        LOG_FILE_PATH, maxBytes=5*1024*1024, backupCount=5, encoding='utf-8'
    )
    file_handler.setFormatter(file_log_formatter)
    root_logger.addHandler(file_handler)

    # 2. Console handler bound to the original stderr (unaffected by any redirect below).
    stream_handler = logging.StreamHandler(sys.__stderr__)
    stream_handler.setFormatter(console_log_formatter)
    root_logger.addHandler(stream_handler)

    # 3. WebSocket handler; it carries its own formatter and its own level.
    ws_handler = WebSocketLogHandler(log_ws_manager)
    ws_handler.setLevel(logging.INFO)
    root_logger.addHandler(ws_handler)

    if redirect_print:
        print("--- 注意:正在重定向 print 输出到日志系统 ---", file=sys.__stderr__)
        # stdout -> 'stdout' logger -> log file only (propagate is off so the
        # text is not echoed again by the root console handler).
        stdout_logger = logging.getLogger('stdout')
        stdout_logger.propagate = False
        # Start from a clean handler list so repeated calls do not stack file
        # handlers and write every printed line several times.
        stdout_logger.handlers.clear()
        stdout_logger.addHandler(file_handler)
        stdout_logger.setLevel(logging.INFO)
        sys.stdout = StreamToLogger(stdout_logger, logging.INFO)

        # stderr -> 'stderr' logger, same arrangement at ERROR level.
        stderr_logger = logging.getLogger('stderr')
        stderr_logger.propagate = False
        stderr_logger.handlers.clear()
        stderr_logger.addHandler(file_handler)
        stderr_logger.setLevel(logging.ERROR)
        sys.stderr = StreamToLogger(stderr_logger, logging.ERROR)
    # Otherwise print() keeps writing straight to the terminal.

    # Levels for third-party loggers.
    logging.getLogger("uvicorn").setLevel(logging.INFO)
    logging.getLogger("uvicorn.error").setLevel(logging.INFO)
    logging.getLogger("uvicorn.access").setLevel(logging.INFO)
    logging.getLogger("websockets").setLevel(logging.INFO)
    logging.getLogger("playwright").setLevel(logging.INFO)

    root_logger.info("=" * 30 + " 日志系统已初始化 " + "=" * 30)
    root_logger.info(f"日志级别: {logging.getLevelName(log_level)}")
    root_logger.info(f"日志文件: {LOG_FILE_PATH}")
    root_logger.info(f"重定向 print: {'启用' if redirect_print else '禁用'}")
# --- Module logger ---
# Named logger used by the helpers in this file; no handlers are attached to it here.
logger = logging.getLogger("AIStudioProxyServer")
# ----------------------
# --- Pydantic Models ---
class MessageContentItem(BaseModel):
    # One element of a list-style message content.
    type: str  # part kind; prepare_combined_prompt() only uses parts whose type is 'text'
    text: Optional[str] = None  # text payload; may be absent
class Message(BaseModel):
    # One chat message of a completion request.
    role: str  # 'system' is treated specially when the prompt is assembled
    content: Union[str, List[MessageContentItem]]  # plain text, or a list of typed parts
class ChatCompletionRequest(BaseModel):
    # Request model holding the conversation and two optional settings.
    messages: List[Message]
    model: Optional[str] = MODEL_NAME  # defaults to the module-level MODEL_NAME constant
    # NOTE(review): presumably selects a streamed (SSE) response over a single JSON body;
    # the endpoint that reads it is outside this view — confirm.
    stream: Optional[bool] = False
# --- Custom Exception ---
class ClientDisconnectedError(Exception):
    # Custom exception type with no extra behavior.
    # NOTE(review): raise sites are outside this view; by its name it presumably signals that
    # the HTTP client went away while a request was being processed — confirm.
    pass
# --- Helper Functions ---
# V4: Combined prompt preparation logic - REPLACED with logic from server未重构.py to include history
def prepare_combined_prompt(messages: List[Message], req_id: str) -> str:
    """Flatten a chat history into one prompt string for pasting into AI Studio.

    Layout of the result (every break below is a real newline character):

    * an optional ``System Instructions:`` header line followed by the first
      non-empty string system message, then a blank line if further messages
      follow;
    * each remaining non-system, non-empty message as a ``<Role>:`` line
      followed by its text, with a line containing ``---`` between turns;
    * one trailing newline when the prompt is not empty.

    Only the first system message is considered; later ones are skipped. For
    list-style content only parts of type ``text`` are used, joined by
    newlines.

    Args:
        messages: full conversation, oldest first.
        req_id: request id used as a log prefix.

    Returns:
        The combined prompt, or ``""`` when nothing usable was found.
    """
    print(f"[{req_id}] (Prepare Prompt) Preparing combined prompt from {len(messages)} messages (including history).")
    combined_parts = []
    system_prompt_content = None
    processed_indices = set()  # indices already consumed (or deliberately ignored)

    # 1. Look at the first system message only.
    for i, msg in enumerate(messages):
        if msg.role == 'system':
            if isinstance(msg.content, str) and msg.content.strip():
                system_prompt_content = msg.content.strip()
                processed_indices.add(i)
                logger.info(f"[{req_id}] (Prepare Prompt) Found system prompt at index {i}: '{system_prompt_content[:80]}...'")
            else:
                logger.warning(f"[{req_id}] (Prepare Prompt) Ignoring non-string or empty system message at index {i}.")
                processed_indices.add(i)  # mark as handled even though it is ignored
            break  # later system messages are skipped in step 3

    # 2. System preamble. These used to be double-escaped literals, which put the
    #    two-character text backslash + n into the prompt instead of line breaks.
    if system_prompt_content:
        # A blank line follows only if other messages come after the system prompt.
        separator = "\n\n" if any(idx not in processed_indices for idx in range(len(messages))) else ""
        system_instr_prefix = "System Instructions:\n"
        combined_parts.append(f"{system_instr_prefix}{system_prompt_content}{separator}")
    else:
        logger.info(f"[{req_id}] (Prepare Prompt) 未找到有效的系统提示,继续处理其他消息。")

    # 3. Remaining turns (user / assistant in practice).
    turn_separator = "\n---\n"
    is_first_turn_after_system = True
    for i, msg in enumerate(messages):
        if i in processed_indices:
            continue
        role = msg.role.capitalize()
        if role == 'System':
            logger.info(f"[{req_id}] (Prepare Prompt) Skipping subsequent system message at index {i}.")
            continue

        content = ""
        if isinstance(msg.content, str):
            content = msg.content
        elif isinstance(msg.content, list):
            text_parts = []
            for item_model in msg.content:
                if isinstance(item_model, MessageContentItem):
                    if item_model.type == 'text' and isinstance(item_model.text, str):
                        text_parts.append(item_model.text)
                    else:
                        logger.warning(f"[{req_id}] (Prepare Prompt) Ignoring non-text part in message at index {i}: type={item_model.type}")
                else:
                    # Not a model instance: try to read it as a mapping.
                    item_dict = dict(item_model)
                    if item_dict.get('type') == 'text' and isinstance(item_dict.get('text'), str):
                        text_parts.append(item_dict['text'])
                    else:
                        logger.warning(f"[{req_id}] (Prepare Prompt) Unexpected item format in message list at index {i}. Item: {item_model}")
            content = "\n".join(text_parts)
        else:
            logger.warning(f"[{req_id}] (Prepare Prompt) Unexpected content type ({type(msg.content)}) for role {role} at index {i}. Converting to string.")
            content = str(msg.content)

        content = content.strip()
        if content:
            # The separator goes before every turn except the first one emitted.
            if not is_first_turn_after_system:
                combined_parts.append(turn_separator)
            role_prefix = f"{role}:\n"
            combined_parts.append(f"{role_prefix}{content}")
            is_first_turn_after_system = False
        else:
            logger.info(f"[{req_id}] (Prepare Prompt) Skipping empty message for role {role} at index {i}.")

    final_prompt = "".join(combined_parts)
    # Keep the log entry on one line by showing newlines in escaped form.
    preview_text = final_prompt[:200].replace('\n', '\\n')
    logger.info(f"[{req_id}] (Prepare Prompt) Combined prompt length: {len(final_prompt)}. Preview: '{preview_text}...'")
    # A final newline helps the UI sometimes; an empty prompt stays empty.
    final_newline = "\n"
    return final_prompt + final_newline if final_prompt else ""
# --- END V4 Combined Prompt Logic ---
def validate_chat_request(messages: List[Message], req_id: str) -> Dict[str, Optional[str]]:
    """Run basic sanity checks on the message list of a chat request.

    Validation only; prompt assembly is done by ``prepare_combined_prompt``.

    Args:
        messages: messages of the incoming request.
        req_id: request id used as message/log prefix.

    Returns:
        Always an empty dict.

    Raises:
        ValueError: if *messages* is empty/None, or contains only messages
            whose role is ``'system'``.
    """
    if not messages:
        raise ValueError(f"[{req_id}] Invalid request: 'messages' array is missing or empty.")

    # At least one message must come from a non-system role.
    only_system_roles = all(msg.role == 'system' for msg in messages)
    if only_system_roles:
        raise ValueError(f"[{req_id}] Invalid request: No user or assistant messages found.")

    logger.info(f"[{req_id}] (Validation) Basic validation passed for {len(messages)} messages.")
    return {}
async def get_raw_text_content(response_element: Locator, previous_text: str, req_id: str) -> str:
    """Read the current text of a response element, cleaned of known UI residue.

    The last ``<pre>`` inside the element is preferred when it becomes visible
    quickly; otherwise (or if reading it fails) the element's own inner text is
    used. Whenever reading fails, *previous_text* is returned instead.

    Args:
        response_element: locator of the response container.
        previous_text: value to fall back to on any failure.
        req_id: request id used as a log prefix.

    Returns:
        The extracted text. If any known UI marker was removed, the text is
        also re-joined from its stripped, non-empty lines.
    """
    # Strings injected by the UI around code blocks; removed in this order.
    ui_residue = (
        "IGNORE_WHEN_COPYING_START", "content_copy", "download",
        "Use code with caution.", "IGNORE_WHEN_COPYING_END",
    )
    text = previous_text
    try:
        await response_element.wait_for(state='attached', timeout=1000)
        last_pre = response_element.locator('pre').last

        # Short probe: is there a visible <pre> to read from?
        try:
            await last_pre.wait_for(state='visible', timeout=250)
        except PlaywrightAsyncError:
            pre_is_visible = False
        else:
            pre_is_visible = True

        if not pre_is_visible:
            try:
                text = await response_element.inner_text(timeout=1500)
            except PlaywrightAsyncError as e_parent:
                if DEBUG_LOGS_ENABLED:
                    logger.warning(f"[{req_id}] getRawTextContent (inner_text) failed on parent (no pre): {e_parent}. Returning previous.")
                text = previous_text
        else:
            try:
                text = await last_pre.inner_text(timeout=500)
            except PlaywrightAsyncError as pre_err:
                if DEBUG_LOGS_ENABLED:
                    error_message_first_line = pre_err.message.split('\n')[0]
                    logger.warning(f"[{req_id}] Failed to get innerText from visible <pre>: {error_message_first_line}")
                # The <pre> could not be read: fall back to the whole element.
                try:
                    text = await response_element.inner_text(timeout=1000)
                except PlaywrightAsyncError as e_parent:
                    if DEBUG_LOGS_ENABLED:
                        logger.warning(f"[{req_id}] getRawTextContent (inner_text) failed on parent after <pre> fail: {e_parent}. Returning previous.")
                    text = previous_text

        if text and isinstance(text, str):
            scrubbed = text
            removed_any = False
            # Sequential on purpose: each marker is searched in the text as
            # already cleaned of the markers before it.
            for marker in ui_residue:
                if marker in scrubbed:
                    scrubbed = scrubbed.replace(marker, "")
                    removed_any = True
            if removed_any:
                kept_lines = [line.strip() for line in scrubbed.splitlines() if line.strip()]
                scrubbed = "\n".join(kept_lines)
                if DEBUG_LOGS_ENABLED:
                    logger.debug(f"[{req_id}] (清理) 已移除响应文本中的已知UI元素。")
            text = scrubbed
        return text
    except PlaywrightAsyncError:
        return previous_text
    except Exception as e_general:
        logger.warning(f"[{req_id}] getRawTextContent unexpected error: {e_general}. Returning previous.")
        return previous_text
def generate_sse_chunk(delta: str, req_id: str, model: str) -> str:
    """Build one ``chat.completion.chunk`` server-sent event carrying *delta*.

    Args:
        delta: text fragment placed in ``choices[0].delta.content``.
        req_id: request id embedded in the chunk id.
        model: model name reported in the chunk.

    Returns:
        ``"data: <json>"`` followed by a blank line.
    """
    # id = prefix + request id + current epoch second + random 3-digit number.
    chunk_id = f"{CHAT_COMPLETION_ID_PREFIX}{req_id}-{int(time.time())}-{random.randint(100, 999)}"
    choice = {"index": 0, "delta": {"content": delta}, "finish_reason": None}
    payload = {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [choice],
    }
    return "data: " + json.dumps(payload) + "\n\n"
def generate_sse_stop_chunk(req_id: str, model: str, reason: str = "stop") -> str:
    """Build the terminating ``chat.completion.chunk`` server-sent event.

    Args:
        req_id: request id embedded in the chunk id.
        model: model name reported in the chunk.
        reason: value of ``finish_reason`` (default ``"stop"``).

    Returns:
        ``"data: <json>"`` followed by a blank line; the delta is empty.
    """
    # id = prefix + request id + current epoch second + random 3-digit number.
    chunk_id = f"{CHAT_COMPLETION_ID_PREFIX}{req_id}-{int(time.time())}-{random.randint(100, 999)}"
    final_choice = {"index": 0, "delta": {}, "finish_reason": reason}
    payload = {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [final_choice],
    }
    return "data: " + json.dumps(payload) + "\n\n"
def generate_sse_error_chunk(message: str, req_id: str, error_type: str = "server_error") -> str:
    """Build a server-sent event that reports an error.

    Args:
        message: human-readable error text; it is prefixed with ``[req_id]``.
        req_id: request id used for the prefix.
        error_type: value of the ``type`` field (default ``"server_error"``).

    Returns:
        ``"data: <json>"`` followed by a blank line, where the JSON is
        ``{"error": {"message": ..., "type": ...}}``.
    """
    error_body = {"message": f"[{req_id}] {message}", "type": error_type}
    serialized = json.dumps({"error": error_body})
    return "data: " + serialized + "\n\n"
# --- Dependency Check ---
def check_dependencies():
    """Verify that the server's required packages can be imported.

    Prints one status line per package. If any import fails, prints the
    ``pip install`` command for the missing packages and exits the process
    with status 1; otherwise prints a success banner and returns ``None``.
    """
    print("--- 步骤 1: 检查服务器依赖项 ---")
    # (importable module name, name to pass to pip)
    wanted = (
        ("fastapi", "fastapi"),
        ("uvicorn", "uvicorn[standard]"),
        ("playwright", "playwright"),
    )
    not_installed = []
    for module_name, pip_name in wanted:
        print(f" - 检查 {module_name}... ", end="")
        try:
            __import__(module_name)
        except ImportError:
            print("❌ 未找到")
            not_installed.append(pip_name)
        else:
            print("✓ 已找到")

    if not_installed:
        print("\n❌ 错误: 缺少必要的 Python 库!")
        print(f" 请运行以下命令安装:\n pip install {' '.join(not_installed)}")
        sys.exit(1)

    print("✅ 服务器依赖检查通过.")
    print("---\n")
# --- Page Initialization --- (Simplified)
async def _initialize_page_logic(browser: AsyncBrowser):
    """Create a browser context on an already connected browser and open the AI Studio chat page.

    Steps, in order:
      1. Pick a storage-state (auth) file according to the LAUNCH_MODE environment variable
         ('headless': taken from ACTIVE_AUTH_JSON_PATH; 'debug': chosen interactively on stdin).
      2. Create a new context (460x800 viewport) and find or open a ".../prompts/..." page.
      3. If the page landed on the Google login host, wait for a manual login (debug mode only)
         and optionally save the resulting storage state.
      4. Wait until the prompt input area is visible.

    Args:
        browser: the connected browser instance.

    Returns:
        tuple: (page, True) once the input area is visible.

    Raises:
        RuntimeError: for missing/invalid headless auth configuration, and for any failure
            inside the main try block (those are re-wrapped by the outer handler).
    """
    print("--- 初始化页面逻辑 (连接到现有浏览器) ---")
    temp_context = None
    storage_state_path_to_use = None
    launch_mode = os.environ.get('LAUNCH_MODE', 'debug')
    # NOTE(review): this value is not read again in this function; the headless branch
    # re-reads the same environment variable into auth_filename.
    active_auth_json_path = os.environ.get('ACTIVE_AUTH_JSON_PATH')
    print(f" 检测到启动模式: {launch_mode}")
    # Used to run blocking input() calls in the default executor.
    loop = asyncio.get_running_loop()
    # Determine the storage state path based on launch_mode
    if launch_mode == 'headless':
        # Headless: the file named by ACTIVE_AUTH_JSON_PATH must exist under ACTIVE_AUTH_DIR.
        auth_filename = os.environ.get('ACTIVE_AUTH_JSON_PATH')
        if auth_filename:
            constructed_path = os.path.join(ACTIVE_AUTH_DIR, auth_filename)
            if os.path.exists(constructed_path):
                storage_state_path_to_use = constructed_path
                print(f" 无头模式将使用的认证文件: {constructed_path}")
            else:
                raise RuntimeError(f"无头模式认证文件无效: '{constructed_path}'")
        else:
            raise RuntimeError("无头模式需要设置 ACTIVE_AUTH_JSON_PATH 环境变量。")
    elif launch_mode == 'debug':
        # Debug: list every *.json under the active and saved directories and let the user pick one.
        print(f" 调试模式: 检查可用的认证文件...")
        available_profiles = []
        for profile_dir in [ACTIVE_AUTH_DIR, SAVED_AUTH_DIR]:
            if os.path.exists(profile_dir):
                try:
                    for filename in os.listdir(profile_dir):
                        if filename.endswith(".json"):
                            full_path = os.path.join(profile_dir, filename)
                            relative_dir = os.path.basename(profile_dir)
                            available_profiles.append({"name": f"{relative_dir}/{filename}", "path": full_path})
                except OSError as e: print(f" ⚠️ 警告: 无法读取目录 '{profile_dir}': {e}")
        if available_profiles:
            print('-'*60 + "\n 找到以下可用的认证文件:")
            for i, profile in enumerate(available_profiles): print(f" {i+1}: {profile['name']}")
            print(" N: 不加载任何文件 (使用浏览器当前状态)\n" + '-'*60)
            choice = await loop.run_in_executor(None, input, " 请选择要加载的认证文件编号 (输入 N 或直接回车则不加载): ")
            # 'n'/'N' or an empty answer means "load nothing"; otherwise a 1-based index is expected.
            if choice.lower() != 'n' and choice:
                try:
                    choice_index = int(choice) - 1
                    if 0 <= choice_index < len(available_profiles):
                        selected_profile = available_profiles[choice_index]
                        storage_state_path_to_use = selected_profile["path"]
                        print(f" 已选择加载: {selected_profile['name']}")
                    else: print(" 无效的选择编号。将不加载认证文件。")
                except ValueError: print(" 无效的输入。将不加载认证文件。")
            else: print(" 好的,不加载认证文件。")
            print('-'*60)
        else: print(" 未找到认证文件。将使用浏览器当前状态。")
    else: print(f" ⚠️ 警告: 未知的启动模式 '{launch_mode}'。不加载 storage_state。")
    try:
        print("创建新的浏览器上下文...")
        context_options = {'viewport': {'width': 460, 'height': 800}}
        if storage_state_path_to_use:
            context_options['storage_state'] = storage_state_path_to_use
            print(f" (使用 storage_state='{os.path.basename(storage_state_path_to_use)}')")
        else: print(" (不使用 storage_state)")
        temp_context = await browser.new_context(**context_options)
        found_page = None
        pages = temp_context.pages
        target_url_base = f"https://{AI_STUDIO_URL_PATTERN}"
        target_full_url = f"{target_url_base}prompts/new_chat"
        login_url_pattern = 'accounts.google.com'
        current_url = ""
        # Reuse an open page of this context that is already on an AI Studio prompt URL, if any.
        for p in pages:
            try:
                page_url_check = p.url
                if not p.is_closed() and target_url_base in page_url_check and "/prompts/" in page_url_check:
                    found_page = p; current_url = page_url_check; break
                # Add logic to navigate existing non-chat pages if needed
            except PlaywrightAsyncError as pw_err:
                print(f" 警告: 检查页面 URL 时出现Playwright错误: {pw_err}")
            except AttributeError as attr_err:
                print(f" 警告: 检查页面 URL 时出现属性错误: {attr_err}")
            except Exception as e:
                print(f" 警告: 检查页面 URL 时出现其他未预期错误: {e}")
                print(f" 错误类型: {type(e).__name__}")
        if not found_page:
            print(f"-> 未找到合适的现有页面,正在打开新页面并导航到 {target_full_url}...")
            found_page = await temp_context.new_page()
            try:
                await found_page.goto(target_full_url, wait_until="domcontentloaded", timeout=90000)
                current_url = found_page.url
                print(f"-> 新页面导航尝试完成。当前 URL: {current_url}")
            except Exception as new_page_nav_err:
                # NOTE(review): save_error_snapshot is defined outside this view; presumably it
                # stores diagnostics under the given label — confirm.
                await save_error_snapshot(f"init_new_page_nav_fail")
                # Print troubleshooting hints for one specific network error before re-raising.
                error_str = str(new_page_nav_err)
                if "NS_ERROR_NET_INTERRUPT" in error_str:
                    print("\n" + "="*30 + " 网络导航错误提示 " + "="*30)
                    print(f"❌ 导航到 '{target_full_url}' 失败,出现网络中断错误 (NS_ERROR_NET_INTERRUPT)。")
                    print(" 这通常表示浏览器在尝试加载页面时连接被意外断开。")
                    print(" 可能的原因及排查建议:")
                    print(" 1. 网络连接: 请检查你的本地网络连接是否稳定,并尝试在普通浏览器中访问目标网址。")
                    print(" 2. AI Studio 服务: 确认 aistudio.google.com 服务本身是否可用。")
                    print(" 3. 防火墙/代理/VPN: 检查本地防火墙、杀毒软件、代理或 VPN 设置,确保它们没有阻止 Python 或浏览器的网络访问。")
                    print(" 4. Camoufox 服务: 确认 launch_camoufox.py 脚本是否正常运行,并且没有相关错误。")
                    print(" 5. 资源问题: 确保系统有足够的内存和 CPU 资源。")
                    print(" 请根据上述建议排查后重试。")
                    print("="*74 + "\n")
                raise RuntimeError(f"导航新页面失败: {new_page_nav_err}") from new_page_nav_err
        # Handle a redirect to the Google login host.
        if login_url_pattern in current_url:
            if launch_mode == 'headless':
                # No interactive login is possible in headless mode.
                raise RuntimeError("无头模式认证失败,需要更新认证文件。")
            else: # Debug mode
                print(f"\n{'='*20} 需要操作 {'='*20}")
                print(f" 请在浏览器窗口中完成 Google 登录,然后按 Enter 键继续...")
                await loop.run_in_executor(None, input)
                print(" 感谢操作!正在检查登录状态...")
                try:
                    # Wait up to 3 minutes for the page to reach an AI Studio URL.
                    await found_page.wait_for_url(f"**/{AI_STUDIO_URL_PATTERN}**", timeout=180000)
                    current_url = found_page.url
                    if login_url_pattern in current_url:
                        # NOTE(review): this error is caught by the handler below and re-raised
                        # with a different message.
                        raise RuntimeError("手动登录尝试后仍在登录页面。")
                    print(" ✅ 登录成功!请不要操作窗口,等待保存认证状态选择器启动。")
                    # Offer to persist the fresh login as a storage-state file.
                    save_prompt = " 是否要将当前的浏览器认证状态保存到文件? (y/N): "
                    should_save = await loop.run_in_executor(None, input, save_prompt)
                    if should_save.lower() == 'y':
                        os.makedirs(SAVED_AUTH_DIR, exist_ok=True)
                        default_filename = f"auth_state_{int(time.time())}.json"
                        filename_prompt = f" 请输入保存的文件名 (默认为: {default_filename}): "
                        # An empty answer falls back to the timestamped default name.
                        save_filename = await loop.run_in_executor(None, input, filename_prompt) or default_filename
                        if not save_filename.endswith(".json"): save_filename += ".json"
                        save_path = os.path.join(SAVED_AUTH_DIR, save_filename)
                        try:
                            await temp_context.storage_state(path=save_path)
                            print(f" ✅ 认证状态已成功保存到: {save_path}")
                        except Exception as save_err: print(f" ❌ 保存认证状态失败: {save_err}")
                    else: print(" 好的,不保存认证状态。")
                except Exception as wait_err:
                    await save_error_snapshot(f"init_login_wait_fail")
                    raise RuntimeError(f"登录提示后未能检测到 AI Studio URL: {wait_err}")
        elif target_url_base not in current_url or "/prompts/" not in current_url:
            # Neither the login host nor an AI Studio prompt URL: give up.
            await save_error_snapshot(f"init_unexpected_page")
            raise RuntimeError(f"初始导航后出现意外页面: {current_url}。")
        print(f"-> 确认当前位于 AI Studio 对话页面: {current_url}")
        await found_page.bring_to_front()
        try:
            # The page counts as ready once the prompt wrapper and its textarea are visible.
            input_wrapper_locator = found_page.locator('ms-prompt-input-wrapper')
            await expect_async(input_wrapper_locator).to_be_visible(timeout=35000)
            await expect_async(found_page.locator(INPUT_SELECTOR)).to_be_visible(timeout=10000)
            print("-> ✅ 核心输入区域可见。")
            result_page = found_page
            result_ready = True
            print(f"✅ 页面逻辑初始化成功。")
            return result_page, result_ready
        except Exception as input_visible_err:
            await save_error_snapshot(f"init_fail_input_timeout")
            raise RuntimeError(f"页面初始化失败:核心输入区域未在预期时间内变为可见。最后的 URL 是 {found_page.url}") from input_visible_err
    except Exception as e:
        # Catches everything raised above, including the RuntimeErrors this function raised
        # itself, so callers always see the wrapped message below. The context is closed best-effort.
        print(f"❌ 页面逻辑初始化期间发生意外错误: {e}")
        if temp_context:
            try: await temp_context.close()
            except: pass
        await save_error_snapshot(f"init_unexpected_error")
        raise RuntimeError(f"页面初始化意外错误: {e}") from e
    # Note: temp_context is intentionally not closed on success, result_page belongs to it.
    # The context will be closed when the browser connection closes during shutdown.
# --- Page Shutdown --- (Simplified)
async def _close_page_logic():
    """Close the shared page if it is still open and reset the page state.

    Any error raised while closing is printed and swallowed so that the
    caller's shutdown sequence can continue. The ``page_instance`` and
    ``is_page_ready`` globals are cleared regardless of the outcome.

    Returns:
        tuple: ``(page, is_ready)`` - always ``(None, False)``.
    """
    global page_instance, is_page_ready
    print("--- 运行页面逻辑关闭 --- ")
    open_page = page_instance
    if open_page and not open_page.is_closed():
        try:
            await open_page.close()
            print(" ✅ 页面已关闭")
        except PlaywrightAsyncError as playwright_exc:
            print(f" ⚠️ 关闭页面时出现Playwright错误: {playwright_exc}")
        except asyncio.TimeoutError as timeout_exc:
            print(f" ⚠️ 关闭页面时超时: {timeout_exc}")
        except Exception as unexpected_exc:
            # Report the concrete exception class as well, to help diagnose
            # failures that are neither Playwright errors nor timeouts.
            print(f" ⚠️ 关闭页面时出现意外错误: {unexpected_exc}")
            print(f" 错误类型: {type(unexpected_exc).__name__}")
    cleared_state = (None, False)
    page_instance, is_page_ready = cleared_state
    print("页面逻辑状态已重置。")
    return cleared_state
# --- Camoufox Shutdown Signal --- (Simplified)
async def signal_camoufox_shutdown():
    """Best-effort shutdown notification for the Camoufox server.

    The notification is skipped when ``CAMOUFOX_WS_ENDPOINT`` is unset or when
    the browser instance is missing or disconnected. No real signal is sent:
    a short sleep stands in for it. Any exception is printed and swallowed.
    """
    try:
        print(" 尝试发送关闭信号到 Camoufox 服务器...")
        if not os.environ.get('CAMOUFOX_WS_ENDPOINT'):
            print(" ⚠️ 无法发送关闭信号:未找到 CAMOUFOX_WS_ENDPOINT")
            return
        browser_is_live = browser_instance and browser_instance.is_connected()
        if not browser_is_live:
            print(" ⚠️ 浏览器实例已断开,跳过关闭信号发送")
            return
        # Simulate signaling, since no direct API is used here.
        await asyncio.sleep(0.2)
        print(" ✅ 关闭信号已处理")
    except Exception as signal_err:
        print(f" ⚠️ 发送关闭信号过程中捕获异常: {signal_err}")
# --- Lifespan Context Manager --- (Simplified)
@asynccontextmanager
async def lifespan(app_param: FastAPI):
    """Manage startup and shutdown of the browser automation resources.

    Startup: ensures the auth directories exist, starts Playwright, connects
    to the Camoufox server named by the ``CAMOUFOX_WS_ENDPOINT`` environment
    variable, initializes the page through ``_initialize_page_logic`` and
    launches ``queue_worker`` as a background task.

    Shutdown (always runs, including after a failed startup): cancels the
    worker, closes the page, signals Camoufox, closes the browser connection
    and stops Playwright. Each step logs its own failure and does not prevent
    the following steps from running.

    Args:
        app_param: The FastAPI application; not used by the body.

    Raises:
        RuntimeError: If any startup step fails. The original exception is
            chained as ``__cause__``.
    """
    global playwright_manager, browser_instance, page_instance, worker_task
    global is_playwright_ready, is_browser_connected, is_page_ready, is_initializing
    is_initializing = True
    print("\n" + "="*60 + "\n 🚀 AI Studio Proxy Server (Python/FastAPI - Refactored) 🚀\n" + "="*60)
    print("FastAPI 生命周期: 启动中...")
    try:
        # Startup has its own try block so that an exception thrown into the
        # generator at ``yield`` (while the application is running) is not
        # misreported and re-wrapped as a startup failure.
        try:
            os.makedirs(ACTIVE_AUTH_DIR, exist_ok=True)
            os.makedirs(SAVED_AUTH_DIR, exist_ok=True)
            print(f" 确保认证目录存在: Active: {ACTIVE_AUTH_DIR}, Saved: {SAVED_AUTH_DIR}")
            print(" 启动 Playwright...")
            playwright_manager = await async_playwright().start()
            is_playwright_ready = True
            print(" ✅ Playwright 已启动。")
            ws_endpoint = os.environ.get('CAMOUFOX_WS_ENDPOINT')
            if not ws_endpoint:
                raise ValueError("未找到 CAMOUFOX_WS_ENDPOINT 环境变量。")
            print(f" 连接到 Camoufox 服务器于: {ws_endpoint}")
            try:
                browser_instance = await playwright_manager.firefox.connect(ws_endpoint, timeout=30000)
                is_browser_connected = True
                print(f" ✅ 已连接到浏览器实例: 版本 {browser_instance.version}")
            except Exception as connect_err:
                raise RuntimeError(f"未能连接到 Camoufox 服务器: {connect_err}") from connect_err
            # Take the page state from the initializer's return value instead
            # of relying on it to mutate the globals directly.
            page_instance, is_page_ready = await _initialize_page_logic(browser_instance)
            if is_page_ready and is_browser_connected:
                print(" 启动请求队列 Worker...")
                worker_task = asyncio.create_task(queue_worker())
                print(" ✅ 请求队列 Worker 已启动。")
            else:
                raise RuntimeError("页面或浏览器初始化失败,无法启动 Worker。")
            print("✅ FastAPI 生命周期: 启动完成。")
        except Exception as startup_err:
            print(f"❌ FastAPI 生命周期: 启动期间出错: {startup_err}")
            traceback.print_exc()
            # No cleanup here: the ``finally`` block below releases the worker,
            # page, browser and Playwright exactly once.
            raise RuntimeError(f"应用程序启动失败: {startup_err}") from startup_err
        is_initializing = False
        yield  # Application runs here
    finally:
        is_initializing = False
        print("\nFastAPI 生命周期: 关闭中...")
        if worker_task and not worker_task.done():
            print(" 正在取消请求队列 Worker...")
            worker_task.cancel()
            try:
                await asyncio.wait_for(worker_task, timeout=5.0)
                print(" ✅ 请求队列 Worker 已停止/取消。")
            except asyncio.TimeoutError:
                print(" ⚠️ Worker 等待超时。")
            except asyncio.CancelledError:
                print(" ✅ 请求队列 Worker 已确认取消。")
            except Exception as wt_err:
                print(f" ❌ 等待 Worker 停止时出错: {wt_err}")
        # Store the state returned by _close_page_logic in the globals.
        page_instance, is_page_ready = await _close_page_logic()
        if browser_instance and browser_instance.is_connected():
            await signal_camoufox_shutdown()
        if browser_instance:
            print(" 正在关闭与浏览器实例的连接...")
            try:
                if browser_instance.is_connected():
                    await browser_instance.close()
                    print(" ✅ 浏览器连接已关闭。")
                else:
                    print(" ℹ️ 浏览器已断开连接。")
            except Exception as close_err:
                print(f" ❌ 关闭浏览器连接时出错: {close_err}")
            finally:
                browser_instance = None
                is_browser_connected = False
        if playwright_manager:
            print(" 停止 Playwright...")
            try:
                await playwright_manager.stop()
                print(" ✅ Playwright 已停止。")
            except Exception as stop_err:
                print(f" ❌ 停止 Playwright 时出错: {stop_err}")
            finally:
                playwright_manager = None
                is_playwright_ready = False
        print("✅ FastAPI 生命周期: 关闭完成。")
# --- FastAPI App ---
# The application object; `lifespan` handles startup and shutdown of the
# Playwright / browser / page / worker resources.
app = FastAPI(
    lifespan=lifespan,
    title="AI Studio Proxy Server (Python/FastAPI/Camoufox - Refactored)",
    version="0.4.0-py-refactored",
    description="Refactored proxy server with unified request processing.",
)
# --- Static Files & API Info ---
@app.get("/", response_class=FileResponse)
async def read_index():
    """Serve ``index.html`` from the directory that contains this module.

    Returns:
        FileResponse: The contents of ``index.html``.

    Raises:
        HTTPException: 404 when ``index.html`` is missing or is not a regular
            file.
    """
    index_html_path = os.path.join(os.path.dirname(__file__), "index.html")
    # isfile rather than exists: a directory named index.html would pass an
    # existence check but cannot be served as a file, so answer 404 for it too.
    if not os.path.isfile(index_html_path):
        raise HTTPException(status_code=404, detail="index.html not found")
    return FileResponse(index_html_path)
@app.get("/api/info")
async def get_api_info(request: Request):
    """Describe how to reach this API, derived from the request headers.

    The host comes from the ``Host`` header (falling back to
    ``127.0.0.1:8000`` when it is missing or empty) and the scheme from
    ``X-Forwarded-Proto`` (falling back to ``http``).

    Returns:
        JSONResponse: Model name, the ``/v1`` API base URL, the server base
        URL, and a notice that no API key is required.
    """
    request_headers = request.headers
    host = request_headers.get('host') or "127.0.0.1:8000"
    scheme = request_headers.get('x-forwarded-proto', 'http')
    server_root = f"{scheme}://{host}"
    payload = {
        "model_name": MODEL_NAME,
        "api_base_url": f"{server_root}/v1",
        "server_base_url": server_root,
        "api_key_required": False,
        "message": "API Key is not required.",
    }
    return JSONResponse(content=payload)
# --- API Endpoints ---
@app.get("/health")
async def health_check():
    """Report the readiness of Playwright, the browser, the page and the worker.

    Returns:
        JSONResponse: Status 200 with ``"status": "OK"`` when Playwright, the
        browser and the page are all ready and the worker task is running;
        otherwise status 503 with ``"status": "Error"`` and a message listing
        the detected problems. The body always carries the individual flags
        and the queue length (``-1`` when no queue is available).
    """
    worker_alive = bool(worker_task and not worker_task.done())
    core_ready = is_playwright_ready and is_browser_connected and is_page_ready
    queue_len = request_queue.qsize() if request_queue else -1
    healthy = core_ready and worker_alive

    if healthy:
        http_code = 200
        message = f"服务运行中。队列长度: {queue_len}。"
    else:
        http_code = 503
        # (is_problem, description) pairs, in the order they are reported.
        problem_checks = (
            (not is_playwright_ready, "Playwright 未初始化"),
            (not is_browser_connected, "浏览器断开"),
            (not is_page_ready, "页面未就绪"),
            (not worker_alive, "Worker 未运行"),
            (is_initializing, "初始化进行中"),
        )
        problems = [description for is_problem, description in problem_checks if is_problem]
        problem_text = ', '.join(problems) if problems else '未知'
        message = f"服务不可用。问题: {problem_text}. 队列长度: {queue_len}."

    payload = {
        "status": "OK" if healthy else "Error",
        "message": message,
        "playwrightReady": is_playwright_ready,
        "browserConnected": is_browser_connected,
        "pageReady": is_page_ready,
        "initializing": is_initializing,
        "workerRunning": worker_alive,
        "queueLength": queue_len,
    }
    return JSONResponse(content=payload, status_code=http_code)
@app.get("/v1/models")
async def list_models():
    """Return a model list containing the single model ``MODEL_NAME``.

    The ``created`` field is the current Unix time at the moment of the call.
    """
    print("[API] 收到 /v1/models 请求。")
    model_entry = {
        "id": MODEL_NAME,
        "object": "model",
        "created": int(time.time()),
        "owned_by": "camoufox-proxy",
    }
    return {"object": "list", "data": [model_entry]}
# --- Helper: Detect Error ---
async def detect_and_extract_page_error(page: AsyncPage, req_id: str) -> Optional[str]:
    """Look for an error toast on the page and return its message, if any.

    The last element matching ``ERROR_TOAST_SELECTOR`` is given 500 ms to
    become visible, and its ``span.content-text`` child is then read.

    Args:
        page: Page to inspect.
        req_id: Request identifier used as the log prefix.

    Returns:
        The stripped toast text; a fixed fallback message when the toast is
        visible but its text is empty; ``None`` when a Playwright error occurs
        (for example while waiting for the toast) or any other error is raised.
    """
    toast = page.locator(ERROR_TOAST_SELECTOR).last
    try:
        await toast.wait_for(state='visible', timeout=500)
        toast_text = await toast.locator('span.content-text').text_content(timeout=500)
        if not toast_text:
            logger.warning(f"[{req_id}] 检测到错误提示框,但无法提取消息。")
            return "检测到错误提示框,但无法提取特定消息。"
        logger.error(f"[{req_id}] 检测到并提取错误消息: {toast_text}")
        return toast_text.strip()
    except PlaywrightAsyncError:
        # Treated as "no error toast present"; deliberately not logged.
        return None
    except Exception as unexpected_err:
        logger.warning(f"[{req_id}] 检查页面错误时出错: {unexpected_err}")
        return None
# --- Snapshot Helper --- (Simplified)
async def save_error_snapshot(error_name: str = 'error'):
    """Save a screenshot and the HTML of the shared page for debugging.

    Files are written to an ``errors_py`` directory next to this module as
    ``<error_name>_<timestamp_ms>.png`` and ``.html``. The function is
    best-effort: every failure is logged and swallowed, and a failed
    screenshot does not prevent the HTML dump from being attempted.

    Args:
        error_name: Label for the snapshot. When it contains an underscore and
            its last ``_``-separated segment is exactly 7 characters long, that
            segment is treated as the request ID and used as the log prefix.
    """
    name_parts = error_name.split('_')
    # NOTE(review): this is a length heuristic, so an ordinary 7-character
    # trailing word (e.g. "timeout") is also taken for a request ID. Only the
    # log prefix and the name shown in log messages are affected.
    req_id = name_parts[-1] if len(name_parts) > 1 and len(name_parts[-1]) == 7 else None
    base_error_name = error_name if not req_id else '_'.join(name_parts[:-1])
    log_prefix = f"[{req_id}]" if req_id else "[无请求ID]"
    page_to_snapshot = page_instance
    if not browser_instance or not browser_instance.is_connected() or not page_to_snapshot or page_to_snapshot.is_closed():
        logger.warning(f"{log_prefix} 无法保存快照 ({base_error_name}),浏览器/页面不可用。")
        return
    logger.info(f"{log_prefix} 尝试保存错误快照 ({base_error_name})...")
    timestamp = int(time.time() * 1000)
    error_dir = os.path.join(os.path.dirname(__file__), 'errors_py')
    try:
        os.makedirs(error_dir, exist_ok=True)
        # base_error_name + "_" + req_id re-joins to error_name, so the file
        # name is the same whether or not a request ID was detected.
        filename_base = f"{error_name}_{timestamp}"
        screenshot_path = os.path.join(error_dir, f"{filename_base}.png")
        html_path = os.path.join(error_dir, f"{filename_base}.html")

        try:
            await page_to_snapshot.screenshot(path=screenshot_path, full_page=True, timeout=15000)
            logger.info(f"{log_prefix} 快照已保存到: {screenshot_path}")
        except Exception as ss_err:
            logger.error(f"{log_prefix} 保存屏幕截图失败 ({base_error_name}): {ss_err}")

        try:
            content = await page_to_snapshot.content()
        except Exception as html_err:
            logger.error(f"{log_prefix} 获取页面内容失败 ({base_error_name}): {html_err}")
        else:
            try:
                # The context manager closes the file even if the write fails.
                with open(html_path, 'w', encoding='utf-8') as html_file:
                    html_file.write(content)
                logger.info(f"{log_prefix} HTML 已保存到: {html_path}")
            except Exception as write_err:
                logger.error(f"{log_prefix} 保存 HTML 失败 ({base_error_name}): {write_err}")
    except Exception as dir_err:
        # Single outer handler. The previous code referenced `html_err` here,
        # which is not defined in this scope, followed by an unreachable
        # duplicate handler.
        logger.error(f"{log_prefix} 创建错误目录或保存快照时出错: {dir_err}")
# --- V4: New Helper - Get response via Edit Button ---
async def get_response_via_edit_button(
page: AsyncPage,
req_id: str,
check_client_disconnected: Callable
) -> Optional[str]:
"""Attempts to get the response content using the edit button.
Implementation mirrors original stream logic closely.
"""
print(f"[{req_id}] (Helper) 尝试通过编辑按钮获取响应...", flush=True)
edit_button = page.locator(EDIT_MESSAGE_BUTTON_SELECTOR)
textarea = page.locator(MESSAGE_TEXTAREA_SELECTOR)
finish_edit_button = page.locator(FINISH_EDIT_BUTTON_SELECTOR)
try:
# 1. Click the Edit button
print(f"[{req_id}] - 定位并点击编辑按钮...", flush=True)
try:
# Direct Playwright calls with timeout
await expect_async(edit_button).to_be_visible(timeout=CLICK_TIMEOUT_MS)
check_client_disconnected("编辑响应 - 编辑按钮可见后: ")
await edit_button.click(timeout=CLICK_TIMEOUT_MS)
print(f"[{req_id}] - 编辑按钮已点击。", flush=True)
except Exception as edit_btn_err:
print(f"[{req_id}] - ❌ 编辑按钮不可见或点击失败: {edit_btn_err}", flush=True)
await save_error_snapshot(f"edit_response_edit_button_failed_{req_id}")
return None
check_client_disconnected("编辑响应 - 点击编辑按钮后: ")
await asyncio.sleep(0.3) # Use asyncio.sleep
check_client_disconnected("编辑响应 - 点击编辑按钮后延时后: ")
# 2. Get content from textarea
print(f"[{req_id}] - 从文本区域获取内容...", flush=True)
response_content = None