1515import feapder .utils .tools as tools
1616from feapder .db .memorydb import MemoryDB
1717from feapder .db .redisdb import RedisDB
18+ from feapder .core .runtime_state import RuntimeState
1819from feapder .dedup import Dedup
1920from feapder .utils .log import log
2021
@@ -60,6 +61,7 @@ class RequestBuffer(AirSpiderRequestBuffer, threading.Thread):
6061 def __init__ (self , redis_key ):
6162 AirSpiderRequestBuffer .__init__ (self , db = RedisDB (), dedup_name = redis_key )
6263 threading .Thread .__init__ (self )
64+ self ._state = RuntimeState ()
6365
6466 self ._thread_stop = False
6567 self ._is_adding_to_db = False
@@ -74,7 +76,8 @@ def __init__(self, redis_key):
7476
7577 def run (self ):
7678 self ._thread_stop = False
77- while not self ._thread_stop :
79+ self ._state = RuntimeState ()
80+ while not self ._state .is_stop_requested :
7881 try :
7982 self .__add_request_to_db ()
8083 except Exception as e :
@@ -84,6 +87,7 @@ def run(self):
8487
8588 def stop (self ):
8689 self ._thread_stop = True
90+ self ._state .request_stop ()
8791 self ._started .clear ()
8892
8993 def put_request (self , request ):
@@ -113,62 +117,74 @@ def flush(self):
113117 def get_requests_count (self ):
114118 return len (self ._requests_deque )
115119
120+ def get_delete_requests_count (self ):
121+ return len (self ._del_requests_deque )
122+
123+ def pending_count (self ):
124+ return self .get_requests_count () + self .get_delete_requests_count ()
125+
126+ def is_idle (self ):
127+ return self .pending_count () == 0 and not self .is_adding_to_db ()
128+
129+ def is_stopped (self ):
130+ return self ._state .is_stop_requested
131+
116132 def is_adding_to_db (self ):
117133 return self ._is_adding_to_db
118134
119135 def __add_request_to_db (self ):
136+ self ._is_adding_to_db = True
120137 request_list = []
121138 prioritys = []
122139 callbacks = []
123-
124- while self ._requests_deque :
125- request = self ._requests_deque .popleft ()
126- self ._is_adding_to_db = True
127-
128- if callable (request ):
129- # 函数
130- # 注意:应该考虑闭包情况。闭包情况可写成
131- # def test(xxx = xxx):
132- # # TODO 业务逻辑 使用 xxx
133- # 这么写不会导致xxx为循环结束后的最后一个值
134- callbacks .append (request )
135- continue
136-
137- priority = request .priority
138-
139- # 如果需要去重并且库中已重复 则continue
140- if self .is_exist_request (request ):
141- continue
142- else :
143- request_list .append (str (request .to_dict ))
144- prioritys .append (priority )
145-
146- if len (request_list ) > MAX_URL_COUNT :
140+ try :
141+ while self ._requests_deque :
142+ request = self ._requests_deque .popleft ()
143+
144+ if callable (request ):
145+ # 函数
146+ # 注意:应该考虑闭包情况。闭包情况可写成
147+ # def test(xxx = xxx):
148+ # # TODO 业务逻辑 使用 xxx
149+ # 这么写不会导致xxx为循环结束后的最后一个值
150+ callbacks .append (request )
151+ continue
152+
153+ priority = request .priority
154+
155+ # 如果需要去重并且库中已重复 则continue
156+ if self .is_exist_request (request ):
157+ continue
158+ else :
159+ request_list .append (str (request .to_dict ))
160+ prioritys .append (priority )
161+
162+ if len (request_list ) > MAX_URL_COUNT :
163+ self ._db .zadd (self ._table_request , request_list , prioritys )
164+ request_list = []
165+ prioritys = []
166+
167+ # 入库
168+ if request_list :
147169 self ._db .zadd (self ._table_request , request_list , prioritys )
148- request_list = []
149- prioritys = []
150-
151- # 入库
152- if request_list :
153- self ._db .zadd (self ._table_request , request_list , prioritys )
154-
155- # 执行回调
156- for callback in callbacks :
157- try :
158- callback ()
159- except Exception as e :
160- log .exception (e )
161-
162- # 删除已做任务
163- if self ._del_requests_deque :
164- request_done_list = []
165- while self ._del_requests_deque :
166- request_done_list .append (self ._del_requests_deque .popleft ())
167-
168- # 去掉request_list中的requests, 否则可能会将刚添加的request删除
169- request_done_list = list (set (request_done_list ) - set (request_list ))
170170
171- if request_done_list :
172- self ._db .zrem (self ._table_request , request_done_list )
173-
174- self ._is_adding_to_db = False
171+ # 执行回调
172+ for callback in callbacks :
173+ try :
174+ callback ()
175+ except Exception as e :
176+ log .exception (e )
177+
178+ # 删除已做任务
179+ if self ._del_requests_deque :
180+ request_done_list = []
181+ while self ._del_requests_deque :
182+ request_done_list .append (self ._del_requests_deque .popleft ())
183+
184+ # 去掉request_list中的requests, 否则可能会将刚添加的request删除
185+ request_done_list = list (set (request_done_list ) - set (request_list ))
186+
187+ if request_done_list :
188+ self ._db .zrem (self ._table_request , request_done_list )
189+ finally :
190+ self ._is_adding_to_db = False
0 commit comments