Skip to content

Commit 03f3c9b

Browse files
committed
refactor: expose runtime idle status
1 parent 8599539 commit 03f3c9b

7 files changed

Lines changed: 312 additions & 85 deletions

File tree

feapder/buffer/item_buffer.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import feapder.utils.tools as tools
1515
from feapder import setting
1616
from feapder.db.redisdb import RedisDB
17+
from feapder.core.runtime_state import RuntimeState
1718
from feapder.dedup import Dedup
1819
from feapder.network.item import Item, UpdateItem
1920
from feapder.pipelines import BasePipeline
@@ -31,6 +32,7 @@ class ItemBuffer(threading.Thread):
3132
def __init__(self, redis_key, task_table=None):
3233
if not hasattr(self, "_table_item"):
3334
super(ItemBuffer, self).__init__()
35+
self._state = RuntimeState()
3436

3537
self._thread_stop = False
3638
self._is_adding_to_db = False
@@ -106,14 +108,16 @@ def mysql_pipeline(self):
106108

107109
def run(self):
108110
self._thread_stop = False
109-
while not self._thread_stop:
111+
self._state = RuntimeState()
112+
while not self._state.is_stop_requested:
110113
self.flush()
111114
tools.delay_time(setting.ITEM_UPLOAD_INTERVAL)
112115

113116
self.close()
114117

115118
def stop(self):
116119
self._thread_stop = True
120+
self._state.request_stop()
117121
self._started.clear()
118122

119123
def put_item(self, item):
@@ -174,6 +178,15 @@ def flush(self):
174178
def get_items_count(self):
175179
return self._items_queue.qsize()
176180

181+
def pending_count(self):
182+
return self.get_items_count()
183+
184+
def is_idle(self):
185+
return self.pending_count() == 0 and not self.is_adding_to_db()
186+
187+
def is_stopped(self):
188+
return self._state.is_stop_requested
189+
177190
def is_adding_to_db(self):
178191
return self._is_adding_to_db
179192

feapder/buffer/request_buffer.py

Lines changed: 67 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import feapder.utils.tools as tools
1616
from feapder.db.memorydb import MemoryDB
1717
from feapder.db.redisdb import RedisDB
18+
from feapder.core.runtime_state import RuntimeState
1819
from feapder.dedup import Dedup
1920
from feapder.utils.log import log
2021

@@ -60,6 +61,7 @@ class RequestBuffer(AirSpiderRequestBuffer, threading.Thread):
6061
def __init__(self, redis_key):
6162
AirSpiderRequestBuffer.__init__(self, db=RedisDB(), dedup_name=redis_key)
6263
threading.Thread.__init__(self)
64+
self._state = RuntimeState()
6365

6466
self._thread_stop = False
6567
self._is_adding_to_db = False
@@ -74,7 +76,8 @@ def __init__(self, redis_key):
7476

7577
def run(self):
7678
self._thread_stop = False
77-
while not self._thread_stop:
79+
self._state = RuntimeState()
80+
while not self._state.is_stop_requested:
7881
try:
7982
self.__add_request_to_db()
8083
except Exception as e:
@@ -84,6 +87,7 @@ def run(self):
8487

8588
def stop(self):
8689
self._thread_stop = True
90+
self._state.request_stop()
8791
self._started.clear()
8892

8993
def put_request(self, request):
@@ -113,62 +117,74 @@ def flush(self):
113117
def get_requests_count(self):
114118
return len(self._requests_deque)
115119

120+
def get_delete_requests_count(self):
121+
return len(self._del_requests_deque)
122+
123+
def pending_count(self):
124+
return self.get_requests_count() + self.get_delete_requests_count()
125+
126+
def is_idle(self):
127+
return self.pending_count() == 0 and not self.is_adding_to_db()
128+
129+
def is_stopped(self):
130+
return self._state.is_stop_requested
131+
116132
def is_adding_to_db(self):
117133
return self._is_adding_to_db
118134

119135
def __add_request_to_db(self):
136+
self._is_adding_to_db = True
120137
request_list = []
121138
prioritys = []
122139
callbacks = []
123-
124-
while self._requests_deque:
125-
request = self._requests_deque.popleft()
126-
self._is_adding_to_db = True
127-
128-
if callable(request):
129-
# 函数
130-
# 注意:应该考虑闭包情况。闭包情况可写成
131-
# def test(xxx = xxx):
132-
# # TODO 业务逻辑 使用 xxx
133-
# 这么写不会导致xxx为循环结束后的最后一个值
134-
callbacks.append(request)
135-
continue
136-
137-
priority = request.priority
138-
139-
# 如果需要去重并且库中已重复 则continue
140-
if self.is_exist_request(request):
141-
continue
142-
else:
143-
request_list.append(str(request.to_dict))
144-
prioritys.append(priority)
145-
146-
if len(request_list) > MAX_URL_COUNT:
140+
try:
141+
while self._requests_deque:
142+
request = self._requests_deque.popleft()
143+
144+
if callable(request):
145+
# 函数
146+
# 注意:应该考虑闭包情况。闭包情况可写成
147+
# def test(xxx = xxx):
148+
# # TODO 业务逻辑 使用 xxx
149+
# 这么写不会导致xxx为循环结束后的最后一个值
150+
callbacks.append(request)
151+
continue
152+
153+
priority = request.priority
154+
155+
# 如果需要去重并且库中已重复 则continue
156+
if self.is_exist_request(request):
157+
continue
158+
else:
159+
request_list.append(str(request.to_dict))
160+
prioritys.append(priority)
161+
162+
if len(request_list) > MAX_URL_COUNT:
163+
self._db.zadd(self._table_request, request_list, prioritys)
164+
request_list = []
165+
prioritys = []
166+
167+
# 入库
168+
if request_list:
147169
self._db.zadd(self._table_request, request_list, prioritys)
148-
request_list = []
149-
prioritys = []
150-
151-
# 入库
152-
if request_list:
153-
self._db.zadd(self._table_request, request_list, prioritys)
154-
155-
# 执行回调
156-
for callback in callbacks:
157-
try:
158-
callback()
159-
except Exception as e:
160-
log.exception(e)
161-
162-
# 删除已做任务
163-
if self._del_requests_deque:
164-
request_done_list = []
165-
while self._del_requests_deque:
166-
request_done_list.append(self._del_requests_deque.popleft())
167-
168-
# 去掉request_list中的requests, 否则可能会将刚添加的request删除
169-
request_done_list = list(set(request_done_list) - set(request_list))
170170

171-
if request_done_list:
172-
self._db.zrem(self._table_request, request_done_list)
173-
174-
self._is_adding_to_db = False
171+
# 执行回调
172+
for callback in callbacks:
173+
try:
174+
callback()
175+
except Exception as e:
176+
log.exception(e)
177+
178+
# 删除已做任务
179+
if self._del_requests_deque:
180+
request_done_list = []
181+
while self._del_requests_deque:
182+
request_done_list.append(self._del_requests_deque.popleft())
183+
184+
# 去掉request_list中的requests, 否则可能会将刚添加的request删除
185+
request_done_list = list(set(request_done_list) - set(request_list))
186+
187+
if request_done_list:
188+
self._db.zrem(self._table_request, request_done_list)
189+
finally:
190+
self._is_adding_to_db = False

feapder/core/collector.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import feapder.setting as setting
1616
import feapder.utils.tools as tools
1717
from feapder.db.redisdb import RedisDB
18+
from feapder.core.runtime_state import RuntimeState
1819
from feapder.network.request import Request
1920
from feapder.utils.log import log
2021

@@ -30,6 +31,7 @@ def __init__(self, redis_key):
3031
"""
3132

3233
super(Collector, self).__init__()
34+
self._state = RuntimeState()
3335
self._db = RedisDB()
3436

3537
self._thread_stop = False
@@ -40,9 +42,11 @@ def __init__(self, redis_key):
4042

4143
def run(self):
4244
self._thread_stop = False
43-
while not self._thread_stop:
45+
self._state = RuntimeState()
46+
while not self._state.is_stop_requested:
4447
try:
45-
self.__input_data()
48+
with self._state.busy():
49+
self.__input_data()
4650
except Exception as e:
4751
log.exception(e)
4852
time.sleep(0.1)
@@ -51,6 +55,7 @@ def run(self):
5155

5256
def stop(self):
5357
self._thread_stop = True
58+
self._state.request_stop()
5459
self._started.clear()
5560

5661
def __input_data(self):
@@ -112,5 +117,14 @@ def get_requests_count(self):
112117
self._todo_requests.qsize() or self._db.zget_count(self._tab_requests) or 0
113118
)
114119

120+
def pending_count(self):
121+
return self.get_requests_count()
122+
123+
def is_idle(self):
124+
return not self.is_collector_task() and self.pending_count() == 0
125+
126+
def is_stopped(self):
127+
return self._state.is_stop_requested
128+
115129
def is_collector_task(self):
116130
return self._is_collector_task

feapder/core/parser_control.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from feapder.buffer.item_buffer import ItemBuffer
1919
from feapder.buffer.request_buffer import AirSpiderRequestBuffer
2020
from feapder.core.base_parser import BaseParser
21+
from feapder.core.runtime_state import RuntimeState
2122
from feapder.db.memorydb import MemoryDB
2223
from feapder.network.item import Item
2324
from feapder.network.request import Request
@@ -42,6 +43,7 @@ class ParserControl(threading.Thread):
4243

4344
def __init__(self, collector, redis_key, request_buffer, item_buffer):
4445
super(ParserControl, self).__init__()
46+
self._state = RuntimeState()
4547
self._parsers = []
4648
self._collector = collector
4749
self._redis_key = redis_key
@@ -52,7 +54,8 @@ def __init__(self, collector, redis_key, request_buffer, item_buffer):
5254

5355
def run(self):
5456
self._thread_stop = False
55-
while not self._thread_stop:
57+
self._state = RuntimeState()
58+
while not self._state.is_stop_requested:
5659
try:
5760
request = self._collector.get_request()
5861
if not request:
@@ -62,14 +65,21 @@ def run(self):
6265
continue
6366

6467
self.is_show_tip = False
65-
self.deal_request(request)
68+
with self._state.busy():
69+
self.deal_request(request)
6670

6771
except Exception as e:
6872
log.exception(e)
6973

7074
def is_not_task(self):
7175
return self.is_show_tip
7276

77+
def is_idle(self):
78+
return self.is_not_task() and self._state.is_idle
79+
80+
def is_stopped(self):
81+
return self._state.is_stop_requested
82+
7383
@classmethod
7484
def get_task_status_count(cls):
7585
return cls._failed_task_count, cls._success_task_count, cls._total_task_count
@@ -431,6 +441,7 @@ def record_download_status(self, status, spider):
431441

432442
def stop(self):
433443
self._thread_stop = True
444+
self._state.request_stop()
434445
self._started.clear()
435446

436447
def add_parser(self, parser: BaseParser):
@@ -467,14 +478,17 @@ def __init__(
467478
item_buffer: ItemBuffer,
468479
):
469480
super(ParserControl, self).__init__()
481+
self._state = RuntimeState()
470482
self._parsers = []
471483
self._memory_db = memory_db
472484
self._thread_stop = False
473485
self._request_buffer = request_buffer
474486
self._item_buffer = item_buffer
475487

476488
def run(self):
477-
while not self._thread_stop:
489+
self._thread_stop = False
490+
self._state = RuntimeState()
491+
while not self._state.is_stop_requested:
478492
try:
479493
request = self._memory_db.get()
480494
if not request:
@@ -484,7 +498,8 @@ def run(self):
484498
continue
485499

486500
self.is_show_tip = False
487-
self.deal_request(request)
501+
with self._state.busy():
502+
self.deal_request(request)
488503

489504
except Exception as e:
490505
log.exception(e)

feapder/core/scheduler.py

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -286,32 +286,19 @@ def _start(self):
286286
self.__add_task()
287287

288288
def all_thread_is_done(self):
289-
# 降低偶然性, 因为各个环节不是并发的,很有可能当时状态为假,但检测下一条时该状态为真。一次检测很有可能遇到这种偶然性
289+
# Check three times to avoid transient idle states between runtime stages.
290290
for i in range(3):
291-
# 检测 collector 状态
292-
if (
293-
self._collector.is_collector_task()
294-
or self._collector.get_requests_count() > 0
295-
):
291+
if not self._collector.is_idle():
296292
return False
297293

298-
# 检测 parser_control 状态
299294
for parser_control in self._parser_controls:
300-
if not parser_control.is_not_task():
295+
if not parser_control.is_idle():
301296
return False
302297

303-
# 检测 item_buffer 状态
304-
if (
305-
self._item_buffer.get_items_count() > 0
306-
or self._item_buffer.is_adding_to_db()
307-
):
298+
if not self._item_buffer.is_idle():
308299
return False
309300

310-
# 检测 request_buffer 状态
311-
if (
312-
self._request_buffer.get_requests_count() > 0
313-
or self._request_buffer.is_adding_to_db()
314-
):
301+
if not self._request_buffer.is_idle():
315302
return False
316303

317304
tools.delay_time(1)

0 commit comments

Comments
 (0)