Skip to content

Commit 78cc5ef

Browse files
committed
baseline
1 parent 5d0517d commit 78cc5ef

File tree

6 files changed

+97
-62
lines changed

6 files changed

+97
-62
lines changed

reactor-core-3.1/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ jar {
2727
}
2828

2929
verifyInstrumentation {
30-
passes 'io.projectreactor:reactor-core:[3.1.0.RELEASE,3.3.0.RELEASE)'
30+
passes 'io.projectreactor:reactor-core:[3.1.0.RELEASE,3.3.0.RELEASE]'
3131
}

reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
package com.nr.instrumentation.reactor;
22

33
import com.newrelic.agent.bridge.AgentBridge;
4-
import com.newrelic.api.agent.NewRelic;
54
import com.newrelic.api.agent.Trace;
6-
import com.newrelic.api.agent.TransportType;
75

86
public class NRRunnableWrapper implements Runnable {
97

108
private Runnable delegate = null;
119

12-
private NRReactorHeaders headers;
10+
private NRTokenHolder headers;
1311
private static boolean isTransformed = false;
1412

15-
public NRRunnableWrapper(Runnable r, NRReactorHeaders h) {
13+
public NRRunnableWrapper(Runnable r, NRTokenHolder h) {
1614
delegate = r;
1715
headers = h;
1816
if(!isTransformed) {
@@ -22,17 +20,10 @@ public NRRunnableWrapper(Runnable r, NRReactorHeaders h) {
2220
}
2321

2422
@Override
25-
@Trace(dispatcher=true)
23+
@Trace(async=true)
2624
public void run() {
27-
boolean ignore = true;
2825
if(headers != null) {
29-
if(!headers.isEmpty()) {
30-
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Other, headers);
31-
ignore = false;
32-
}
33-
}
34-
if(ignore) {
35-
NewRelic.getAgent().getTransaction().ignore();
26+
headers.linkAndExpire();
3627
}
3728
if(delegate != null) {
3829
delegate.run();

reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRSubscriberWrapper.java

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
import com.newrelic.agent.bridge.AgentBridge;
66
import com.newrelic.api.agent.NewRelic;
7+
import com.newrelic.api.agent.Token;
78
import com.newrelic.api.agent.Trace;
8-
import com.newrelic.api.agent.Transaction;
99

1010
import reactor.core.CoreSubscriber;
1111
import reactor.core.Fuseable;
@@ -20,7 +20,7 @@ public class NRSubscriberWrapper<T> implements CoreSubscriber<T>, QueueSubscript
2020

2121
private Subscription subscription = null;
2222

23-
private NRReactorHeaders headers = null;
23+
private NRTokenHolder headers = null;
2424

2525
private String name = null;
2626

@@ -32,62 +32,60 @@ public NRSubscriberWrapper(CoreSubscriber<T> sub, Scannable s) {
3232
actual = sub;
3333
name = s.name();
3434
if(name == null || name.isEmpty()) name = "Scannable";
35-
headers = new NRReactorHeaders();
36-
Transaction transaction = NewRelic.getAgent().getTransaction();
37-
if (transaction != null && ReactorUtils.activeTransaction()) {
38-
try {
39-
transaction.insertDistributedTraceHeaders(headers);
40-
} catch(Exception e) {
41-
String exceptionName = e.getClass().getSimpleName();
42-
NewRelic.incrementCounter("NRLabs/Reactor/NRSubscriberWrapper/Exception/"+exceptionName);
43-
}
35+
Token token = NewRelic.getAgent().getTransaction().getToken();
36+
if(token != null && token.isActive()) {
37+
headers = new NRTokenHolder(token);
38+
} else if(token != null) {
39+
token.expire();
40+
token = null;
4441
}
45-
4642
}
4743

4844
@Override
45+
@Trace(async = true)
4946
public void onNext(T t) {
50-
if(!ReactorUtils.activeTransaction()) {
51-
ReactorDispatcher.startOnNextTransaction(name, actual, t, headers);
52-
} else {
47+
if(headers != null) {
48+
headers.link();
49+
}
50+
if(actual != null) {
5351
actual.onNext(t);
5452
}
5553
}
5654

5755
@Override
56+
@Trace(async = true)
5857
public void onError(Throwable t) {
59-
if(!ReactorUtils.activeTransaction()) {
60-
ReactorDispatcher.startOnErrorTransaction(name, actual, headers,t);
61-
} else {
62-
ReactorUtils.deActivate();
58+
NewRelic.noticeError(t);
59+
if(headers != null) {
60+
headers.linkAndExpire();
61+
62+
}
63+
if(actual != null) {
6364
actual.onError(t);
6465
}
6566
}
6667

6768
@Override
6869
public void onComplete() {
69-
if(!ReactorUtils.activeTransaction()) {
70-
ReactorDispatcher.startOnCompleteTransaction(name, actual, headers);
71-
} else {
72-
ReactorUtils.deActivate();
70+
if(headers != null) {
71+
headers.linkAndExpire();
72+
73+
}
74+
if(actual != null) {
7375
actual.onComplete();
7476
}
77+
7578
}
7679

7780
@Override
7881
public void onSubscribe(Subscription s) {
7982
if(headers == null) {
80-
headers = new NRReactorHeaders();
81-
}
82-
if(headers.isEmpty()) {
83-
Transaction transaction = NewRelic.getAgent().getTransaction();
84-
if (transaction != null && ReactorUtils.activeTransaction()) {
85-
try {
86-
transaction.insertDistributedTraceHeaders(headers);
87-
} catch(Exception e) {
88-
String exceptionName = e.getClass().getSimpleName();
89-
NewRelic.incrementCounter("NRLabs/Reactor/NRSubscriberWrapper/Exception/"+exceptionName);
90-
}
83+
Token token = NewRelic.getAgent().getTransaction().getToken();
84+
if(token != null && token.isActive()) {
85+
headers = new NRTokenHolder(token);
86+
} else if(token != null) {
87+
token.expire();
88+
token = null;
9189
}
9290
}
9391
subscription = s;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.nr.instrumentation.reactor;
2+
3+
import com.newrelic.api.agent.Token;
4+
5+
public class NRTokenHolder {
6+
7+
private Token token = null;
8+
9+
public NRTokenHolder(Token t) {
10+
token = t;
11+
}
12+
13+
14+
public void link() {
15+
if(token != null) {
16+
token.link();
17+
}
18+
}
19+
20+
public void linkAndExpire() {
21+
if(token != null) {
22+
token.linkAndExpire();
23+
token = null;
24+
}
25+
}
26+
27+
public void expire() {
28+
if(token != null) {
29+
token.expire();
30+
token = null;
31+
}
32+
}
33+
}

reactor-core-3.1/src/main/java/reactor/core/scheduler/Scheduler.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
import java.util.concurrent.TimeUnit;
44

55
import com.newrelic.api.agent.NewRelic;
6+
import com.newrelic.api.agent.Token;
67
import com.newrelic.api.agent.Trace;
78
import com.newrelic.api.agent.weaver.MatchType;
89
import com.newrelic.api.agent.weaver.Weave;
910
import com.newrelic.api.agent.weaver.Weaver;
10-
import com.nr.instrumentation.reactor.NRReactorHeaders;
1111
import com.nr.instrumentation.reactor.NRRunnableWrapper;
12+
import com.nr.instrumentation.reactor.NRTokenHolder;
1213

1314
import reactor.core.Disposable;
1415

@@ -18,12 +19,16 @@ public abstract class Scheduler {
1819
@Trace
1920
public Disposable schedule(Runnable task) {
2021
if(!(task instanceof NRRunnableWrapper)) {
21-
NRReactorHeaders nrHeaders = new NRReactorHeaders();
22-
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(nrHeaders);
2322

24-
if(!nrHeaders.isEmpty()) {
25-
NRRunnableWrapper wrapper = new NRRunnableWrapper(task, nrHeaders);
23+
Token token = NewRelic.getAgent().getTransaction().getToken();
24+
25+
if(token != null && token.isActive()) {
26+
NRTokenHolder holder = new NRTokenHolder(token);
27+
NRRunnableWrapper wrapper = new NRRunnableWrapper(task, holder);
2628
task = wrapper;
29+
} else if(token != null) {
30+
token.expire();
31+
token = null;
2732
}
2833
}
2934
return Weaver.callOriginal();
@@ -34,25 +39,33 @@ public static class Worker {
3439

3540
public Disposable schedule(Runnable task) {
3641
if(!(task instanceof NRRunnableWrapper)) {
37-
NRReactorHeaders nrHeaders = new NRReactorHeaders();
38-
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(nrHeaders);
3942

40-
if(!nrHeaders.isEmpty()) {
41-
NRRunnableWrapper wrapper = new NRRunnableWrapper(task, nrHeaders);
43+
Token token = NewRelic.getAgent().getTransaction().getToken();
44+
45+
if(token != null && token.isActive()) {
46+
NRTokenHolder holder = new NRTokenHolder(token);
47+
NRRunnableWrapper wrapper = new NRRunnableWrapper(task, holder);
4248
task = wrapper;
49+
} else if(token != null) {
50+
token.expire();
51+
token = null;
4352
}
4453
}
4554
return Weaver.callOriginal();
4655
}
4756

4857
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
4958
if(!(task instanceof NRRunnableWrapper)) {
50-
NRReactorHeaders nrHeaders = new NRReactorHeaders();
51-
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(nrHeaders);
5259

53-
if(!nrHeaders.isEmpty()) {
54-
NRRunnableWrapper wrapper = new NRRunnableWrapper(task, nrHeaders);
60+
Token token = NewRelic.getAgent().getTransaction().getToken();
61+
62+
if(token != null && token.isActive()) {
63+
NRTokenHolder holder = new NRTokenHolder(token);
64+
NRRunnableWrapper wrapper = new NRRunnableWrapper(task, holder);
5565
task = wrapper;
66+
} else if(token != null) {
67+
token.expire();
68+
token = null;
5669
}
5770
}
5871
return Weaver.callOriginal();

reactor-core-3.3/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ jar {
2323
}
2424

2525
verifyInstrumentation {
26-
passes 'io.projectreactor:reactor-core:[3.3.0.RELEASE,)'
26+
passes 'io.projectreactor:reactor-core:[3.1.0.RELEASE,)'
2727
}

0 commit comments

Comments
 (0)