File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1+ # -*- coding: utf-8 -*-
2+ """
3+ Small runtime state helper for background runtime threads.
4+ """
5+ from contextlib import contextmanager
6+ from threading import RLock
7+
8+
9+ class RuntimeState :
10+ def __init__ (self ):
11+ self ._stop_requested = False
12+ self ._busy_count = 0
13+ self ._lock = RLock ()
14+
15+ def request_stop (self ):
16+ with self ._lock :
17+ self ._stop_requested = True
18+
19+ @property
20+ def is_stop_requested (self ):
21+ with self ._lock :
22+ return self ._stop_requested
23+
24+ def mark_busy (self ):
25+ with self ._lock :
26+ self ._busy_count += 1
27+
28+ def mark_idle (self ):
29+ with self ._lock :
30+ if self ._busy_count > 0 :
31+ self ._busy_count -= 1
32+
33+ @property
34+ def busy_count (self ):
35+ with self ._lock :
36+ return self ._busy_count
37+
38+ @property
39+ def is_idle (self ):
40+ return self .busy_count == 0
41+
42+ @contextmanager
43+ def busy (self ):
44+ self .mark_busy ()
45+ try :
46+ yield
47+ finally :
48+ self .mark_idle ()
Original file line number Diff line number Diff line change 1+ from feapder .core .runtime_state import RuntimeState
2+
3+
4+ def test_runtime_state_starts_idle_and_running ():
5+ state = RuntimeState ()
6+
7+ assert state .is_idle is True
8+ assert state .is_stop_requested is False
9+ assert state .busy_count == 0
10+
11+
12+ def test_runtime_state_tracks_busy_count ():
13+ state = RuntimeState ()
14+
15+ state .mark_busy ()
16+ state .mark_busy ()
17+
18+ assert state .is_idle is False
19+ assert state .busy_count == 2
20+
21+ state .mark_idle ()
22+ state .mark_idle ()
23+
24+ assert state .is_idle is True
25+ assert state .busy_count == 0
26+
27+
28+ def test_runtime_state_does_not_go_negative ():
29+ state = RuntimeState ()
30+
31+ state .mark_idle ()
32+
33+ assert state .busy_count == 0
34+ assert state .is_idle is True
35+
36+
37+ def test_runtime_state_busy_context_resets_on_exception ():
38+ state = RuntimeState ()
39+
40+ try :
41+ with state .busy ():
42+ assert state .is_idle is False
43+ raise RuntimeError ("boom" )
44+ except RuntimeError :
45+ pass
46+
47+ assert state .is_idle is True
48+
49+
50+ def test_runtime_state_stop_request_is_sticky ():
51+ state = RuntimeState ()
52+
53+ state .request_stop ()
54+ state .request_stop ()
55+
56+ assert state .is_stop_requested is True
You can’t perform that action at this time.
0 commit comments