Skip to content

Commit

Permalink
fix: fix race condition in BuiltinMetricsTracer (#1320)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea.
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
mutianf committed Jul 26, 2022
1 parent 5282589 commit 644454a
Showing 1 changed file with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class BuiltinMetricsTracer extends BigtableTracer {
// Stopwatch is not thread safe so this is a workaround to check if the stopwatch changes are
// flushed to memory.
private final Stopwatch serverLatencyTimer = Stopwatch.createUnstarted();
private final AtomicBoolean serverLatencyTimerIsRunning = new AtomicBoolean();
private boolean serverLatencyTimerIsRunning = false;
private final Object timerLock = new Object();

private boolean flowControlIsDisabled = false;

Expand Down Expand Up @@ -117,8 +118,11 @@ public void attemptStarted(Object request, int attemptNumber) {
this.tableId = Util.extractTableId(request);
}
if (!flowControlIsDisabled) {
if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
serverLatencyTimer.start();
synchronized (timerLock) {
if (!serverLatencyTimerIsRunning) {
serverLatencyTimer.start();
serverLatencyTimerIsRunning = true;
}
}
}
}
Expand All @@ -144,8 +148,11 @@ public void onRequest(int requestCount) {
if (flowControlIsDisabled) {
// onRequest is only called when auto flow control is disabled. When auto flow control is
// disabled, server latency is measured between onRequest and onResponse.
if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
serverLatencyTimer.start();
synchronized (timerLock) {
if (!serverLatencyTimerIsRunning) {
serverLatencyTimer.start();
serverLatencyTimerIsRunning = true;
}
}
}
}
Expand All @@ -159,9 +166,12 @@ public void responseReceived() {
// When auto flow control is disabled and application requested multiple responses, server
// latency is measured between afterResponse and responseReceived.
// In all the cases, we want to stop the serverLatencyTimer here.
if (serverLatencyTimerIsRunning.compareAndSet(true, false)) {
totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
serverLatencyTimer.reset();
synchronized (timerLock) {
if (serverLatencyTimerIsRunning) {
totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
serverLatencyTimer.reset();
serverLatencyTimerIsRunning = false;
}
}
}

Expand All @@ -172,8 +182,11 @@ public void afterResponse(long applicationLatency) {
// measured between after the last response is processed and before the next response is
// received. If flow control is disabled but requestLeft is greater than 0,
// also start the timer to count the time between afterResponse and responseReceived.
if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
serverLatencyTimer.start();
synchronized (timerLock) {
if (!serverLatencyTimerIsRunning) {
serverLatencyTimer.start();
serverLatencyTimerIsRunning = true;
}
}
}
}
Expand Down Expand Up @@ -235,10 +248,13 @@ private void recordOperationCompletion(@Nullable Throwable status) {
private void recordAttemptCompletion(@Nullable Throwable status) {
// If the attempt failed, the time spent in retry should be counted in application latency.
// Stop the stopwatch and decrement requestLeft.
if (serverLatencyTimerIsRunning.compareAndSet(true, false)) {
requestLeft.decrementAndGet();
totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
serverLatencyTimer.reset();
synchronized (timerLock) {
if (serverLatencyTimerIsRunning) {
requestLeft.decrementAndGet();
totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
serverLatencyTimer.reset();
serverLatencyTimerIsRunning = false;
}
}
recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS));
recorder.record(Util.extractStatus(status), tableId, zone, cluster);
Expand Down

0 comments on commit 644454a

Please sign in to comment.