Skip to content

Commit 16f4de4

Browse files
authored
core/stats: report message count metrics to Census. (grpc#3312)
1 parent 02cb718 commit 16f4de4

3 files changed

Lines changed: 54 additions & 0 deletions

File tree

core/src/main/java/io/grpc/internal/CensusStatsModule.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ ClientInterceptor getClientInterceptor() {
133133
}
134134

135135
private static final class ClientTracer extends ClientStreamTracer {
136+
final AtomicLong outboundMessageCount = new AtomicLong();
137+
final AtomicLong inboundMessageCount = new AtomicLong();
136138
final AtomicLong outboundWireSize = new AtomicLong();
137139
final AtomicLong inboundWireSize = new AtomicLong();
138140
final AtomicLong outboundUncompressedSize = new AtomicLong();
@@ -157,6 +159,16 @@ public void outboundUncompressedSize(long bytes) {
157159
public void inboundUncompressedSize(long bytes) {
158160
inboundUncompressedSize.addAndGet(bytes);
159161
}
162+
163+
@Override
164+
public void inboundMessage() {
165+
inboundMessageCount.incrementAndGet();
166+
}
167+
168+
@Override
169+
public void outboundMessage() {
170+
outboundMessageCount.incrementAndGet();
171+
}
160172
}
161173

162174
@VisibleForTesting
@@ -209,6 +221,8 @@ void callEnded(Status status) {
209221
MeasurementMap.Builder builder = MeasurementMap.builder()
210222
// The metrics are in double
211223
.put(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI)
224+
.put(RpcConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount.get())
225+
.put(RpcConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount.get())
212226
.put(RpcConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize.get())
213227
.put(RpcConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize.get())
214228
.put(
@@ -234,6 +248,8 @@ private final class ServerTracer extends ServerStreamTracer {
234248
private final StatsContext parentCtx;
235249
private final AtomicBoolean streamClosed = new AtomicBoolean(false);
236250
private final Stopwatch stopwatch;
251+
private final AtomicLong outboundMessageCount = new AtomicLong();
252+
private final AtomicLong inboundMessageCount = new AtomicLong();
237253
private final AtomicLong outboundWireSize = new AtomicLong();
238254
private final AtomicLong inboundWireSize = new AtomicLong();
239255
private final AtomicLong outboundUncompressedSize = new AtomicLong();
@@ -265,6 +281,16 @@ public void inboundUncompressedSize(long bytes) {
265281
inboundUncompressedSize.addAndGet(bytes);
266282
}
267283

284+
@Override
285+
public void inboundMessage() {
286+
inboundMessageCount.incrementAndGet();
287+
}
288+
289+
@Override
290+
public void outboundMessage() {
291+
outboundMessageCount.incrementAndGet();
292+
}
293+
268294
/**
269295
* Record a finished stream and mark the current time as the end time.
270296
*
@@ -281,6 +307,8 @@ public void streamClosed(Status status) {
281307
MeasurementMap.Builder builder = MeasurementMap.builder()
282308
// The metrics are in double
283309
.put(RpcConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI)
310+
.put(RpcConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount.get())
311+
.put(RpcConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount.get())
284312
.put(RpcConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize.get())
285313
.put(RpcConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize.get())
286314
.put(

core/src/test/java/io/grpc/internal/CensusModulesTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,16 +322,20 @@ public void clientBasicStatsDefaultContext() {
322322
tracer.outboundHeaders();
323323

324324
fakeClock.forwardTime(100, MILLISECONDS);
325+
tracer.outboundMessage();
325326
tracer.outboundWireSize(1028);
326327
tracer.outboundUncompressedSize(1128);
327328

328329
fakeClock.forwardTime(16, MILLISECONDS);
330+
tracer.inboundMessage();
329331
tracer.inboundWireSize(33);
330332
tracer.inboundUncompressedSize(67);
333+
tracer.outboundMessage();
331334
tracer.outboundWireSize(99);
332335
tracer.outboundUncompressedSize(865);
333336

334337
fakeClock.forwardTime(24, MILLISECONDS);
338+
tracer.inboundMessage();
335339
tracer.inboundWireSize(154);
336340
tracer.inboundUncompressedSize(552);
337341
tracer.streamClosed(Status.OK);
@@ -345,9 +349,11 @@ public void clientBasicStatsDefaultContext() {
345349
TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
346350
assertEquals(Status.Code.OK.toString(), statusTag.toString());
347351
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
352+
assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT));
348353
assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
349354
assertEquals(1128 + 865,
350355
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
356+
assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT));
351357
assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
352358
assertEquals(67 + 552,
353359
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
@@ -391,9 +397,11 @@ public void clientStreamNeverCreatedStillRecordStats() {
391397
TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
392398
assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString());
393399
assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ERROR_COUNT));
400+
assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT));
394401
assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
395402
assertEquals(0,
396403
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
404+
assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT));
397405
assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
398406
assertEquals(0,
399407
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
@@ -579,16 +587,20 @@ public void serverBasicStatsNoHeaders() {
579587
Context filteredContext = tracer.filterContext(Context.ROOT);
580588
assertNull(STATS_CONTEXT_KEY.get(filteredContext));
581589

590+
tracer.inboundMessage();
582591
tracer.inboundWireSize(34);
583592
tracer.inboundUncompressedSize(67);
584593

585594
fakeClock.forwardTime(100, MILLISECONDS);
595+
tracer.outboundMessage();
586596
tracer.outboundWireSize(1028);
587597
tracer.outboundUncompressedSize(1128);
588598

589599
fakeClock.forwardTime(16, MILLISECONDS);
600+
tracer.inboundMessage();
590601
tracer.inboundWireSize(154);
591602
tracer.inboundUncompressedSize(552);
603+
tracer.outboundMessage();
592604
tracer.outboundWireSize(99);
593605
tracer.outboundUncompressedSize(865);
594606

@@ -604,9 +616,11 @@ public void serverBasicStatsNoHeaders() {
604616
TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
605617
assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString());
606618
assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_ERROR_COUNT));
619+
assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_COUNT));
607620
assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
608621
assertEquals(1128 + 865,
609622
record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
623+
assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_COUNT));
610624
assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES));
611625
assertEquals(67 + 552,
612626
record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
@@ -659,6 +673,8 @@ public void convertToTracingStatus() {
659673

660674
private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) {
661675
assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
676+
assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_COUNT));
677+
assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_COUNT));
662678
assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES));
663679
assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
664680
assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME));
@@ -669,6 +685,8 @@ private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) {
669685

670686
private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) {
671687
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
688+
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_COUNT));
689+
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_COUNT));
672690
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
673691
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
674692
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));

interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1816,6 +1816,10 @@ private static void checkCensusMetrics(MetricsRecord record, boolean server,
18161816
uncompressedResponsesSize += response.getSerializedSize();
18171817
}
18181818
if (server) {
1819+
assertEquals(
1820+
requests.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_COUNT));
1821+
assertEquals(
1822+
responses.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_COUNT));
18191823
assertEquals(uncompressedRequestsSize,
18201824
record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
18211825
assertEquals(uncompressedResponsesSize,
@@ -1826,6 +1830,10 @@ private static void checkCensusMetrics(MetricsRecord record, boolean server,
18261830
assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES));
18271831
assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
18281832
} else {
1833+
assertEquals(
1834+
requests.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT));
1835+
assertEquals(
1836+
responses.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT));
18291837
assertEquals(uncompressedRequestsSize,
18301838
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
18311839
assertEquals(uncompressedResponsesSize,

0 commit comments

Comments
 (0)