File tree Expand file tree Collapse file tree
src/main/java/io/github/aplotnikov/batch/processing/reactor Expand file tree Collapse file tree Original file line number Diff line number Diff line change 11package io .github .aplotnikov .batch .processing .reactor ;
22
3+ import lombok .RequiredArgsConstructor ;
4+ import lombok .experimental .FieldDefaults ;
5+
36import java .util .concurrent .atomic .AtomicBoolean ;
47
58import static io .github .aplotnikov .batch .processing .reactor .Response .Status .FAILED ;
69import static io .github .aplotnikov .batch .processing .reactor .Response .Status .SUCCESS ;
710import static java .util .concurrent .TimeUnit .SECONDS ;
811import static java .util .concurrent .locks .LockSupport .parkNanos ;
12+ import static lombok .AccessLevel .PRIVATE ;
913
14+ @ FieldDefaults (level = PRIVATE , makeFinal = true )
15+ @ RequiredArgsConstructor
1016class ClientProcessor {
1117
12- private final AtomicBoolean isLastResponseSuccessful = new AtomicBoolean ();
13-
14- private final int pause ;
18+ AtomicBoolean isLastResponseSuccessful = new AtomicBoolean ();
1519
16- ClientProcessor (int pause ) {
17- this .pause = pause ;
18- }
20+ int pause ;
1921
2022 Response process (Client client ) {
2123 parkNanos (SECONDS .toNanos (pause ));
Original file line number Diff line number Diff line change 11package io .github .aplotnikov .batch .processing .reactor ;
22
33import io .github .aplotnikov .batch .processing .reactor .source .XmlFileSource ;
4+ import lombok .RequiredArgsConstructor ;
5+ import lombok .experimental .FieldDefaults ;
46
5- class ReactorFileProcessor implements Runnable {
7+ import static lombok . AccessLevel . PRIVATE ;
68
7- private final XmlFileSource fileSource ;
9+ @ FieldDefaults (level = PRIVATE , makeFinal = true )
10+ @ RequiredArgsConstructor
11+ class ReactorFileProcessor implements Runnable {
812
9- private final XmlFileReader reader ;
13+ XmlFileSource fileSource ;
1014
11- private final ClientProcessor processor ;
15+ XmlFileReader reader ;
1216
13- ReactorFileProcessor (XmlFileSource fileSource , XmlFileReader reader , ClientProcessor processor ) {
14- this .fileSource = fileSource ;
15- this .reader = reader ;
16- this .processor = processor ;
17- }
17+ ClientProcessor processor ;
1818
1919 @ Override
2020 public void run () {
Original file line number Diff line number Diff line change 11package io .github .aplotnikov .batch .processing .reactor ;
22
3- import java . util . Objects ;
3+ import lombok . Value ;
44
5+ @ Value
56class Response {
67
7- private final long clientId ;
8+ long clientId ;
89
9- private final Status status ;
10-
11- Response (long clientId , Status status ) {
12- this .clientId = clientId ;
13- this .status = status ;
14- }
15-
16- long getClientId () {
17- return clientId ;
18- }
19-
20- Status getStatus () {
21- return status ;
22- }
23-
24- @ Override
25- public boolean equals (Object other ) {
26- if (this == other ) {
27- return true ;
28- }
29-
30- if (other == null || getClass () != other .getClass ()) {
31- return false ;
32- }
33-
34- Response that = (Response ) other ;
35- return clientId == that .clientId
36- && status == that .status ;
37- }
38-
39- @ Override
40- public int hashCode () {
41- return Objects .hash (clientId , status );
42- }
43-
44- @ Override
45- public String toString () {
46- return "ClientResponse{"
47- + "clientId=" + clientId
48- + ", status=" + status
49- + '}' ;
50- }
10+ Status status ;
5111
5212 enum Status {
5313 SUCCESS ,
Original file line number Diff line number Diff line change 11package io .github .aplotnikov .batch .processing .reactor .source ;
22
3+ import lombok .RequiredArgsConstructor ;
4+ import lombok .experimental .FieldDefaults ;
35import reactor .core .publisher .Flux ;
46
57import java .util .concurrent .atomic .AtomicInteger ;
68
79import static java .util .concurrent .TimeUnit .SECONDS ;
810import static java .util .concurrent .locks .LockSupport .parkNanos ;
11+ import static lombok .AccessLevel .PRIVATE ;
912
13+ @ FieldDefaults (level = PRIVATE , makeFinal = true )
14+ @ RequiredArgsConstructor
1015class Queue {
1116
12- private final int processedFileNumber ;
17+ int processedFileNumber ;
1318
14- private final int pause ;
15-
16- Queue (int processedFileNumber , int pause ) {
17- this .processedFileNumber = processedFileNumber ;
18- this .pause = pause ;
19- }
19+ int pause ;
2020
2121 Flux <String > poll () {
2222 AtomicInteger processedFiles = new AtomicInteger (0 );
Original file line number Diff line number Diff line change 11package io .github .aplotnikov .batch .processing .reactor .source ;
22
3+ import lombok .RequiredArgsConstructor ;
4+ import lombok .experimental .FieldDefaults ;
35import reactor .core .publisher .Flux ;
46
57import static java .util .concurrent .TimeUnit .SECONDS ;
68import static java .util .concurrent .locks .LockSupport .parkNanos ;
79import static java .util .stream .IntStream .range ;
10+ import static lombok .AccessLevel .PRIVATE ;
811
12+ @ FieldDefaults (level = PRIVATE , makeFinal = true )
13+ @ RequiredArgsConstructor
914class Repository {
1015
11- private final int processedFileNumber ;
16+ int processedFileNumber ;
1217
13- private final int pause ;
14-
15- Repository (int processedFileNumber , int pause ) {
16- this .processedFileNumber = processedFileNumber ;
17- this .pause = pause ;
18- }
18+ int pause ;
1919
2020 Flux <String > readAll () {
2121 return Flux .generate (
Original file line number Diff line number Diff line change 11package io .github .aplotnikov .batch .processing .reactor .source ;
22
3+ import lombok .RequiredArgsConstructor ;
4+ import lombok .experimental .FieldDefaults ;
35import reactor .core .publisher .Flux ;
46
5- public class XmlFileSource {
7+ import static lombok . AccessLevel . PRIVATE ;
68
7- private final Repository repository ;
9+ @ FieldDefaults (level = PRIVATE , makeFinal = true )
10+ @ RequiredArgsConstructor
11+ public class XmlFileSource {
812
9- private final Queue queue ;
13+ Repository repository ;
1014
11- public XmlFileSource (Repository repository , Queue queue ) {
12- this .repository = repository ;
13- this .queue = queue ;
14- }
15+ Queue queue ;
1516
1617 public Flux <String > readAll () {
1718 return Flux .merge (repository .readAll (), queue .poll ());
You can’t perform that action at this time.
0 commit comments