Skip to content

Commit

Permalink
Vert.x Core 3.x Plugin (apache#2386)
Browse files Browse the repository at this point in the history
* vertx plugin impl
  • Loading branch information
BFergerson authored and wu-sheng committed Apr 16, 2019
1 parent 11212c4 commit ce1c7aa
Show file tree
Hide file tree
Showing 29 changed files with 1,513 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public class ComponentsDefine {

public static final OfficialComponent ZOOKEEPER = new OfficialComponent(58, "Zookeeper");

public static final OfficialComponent VERTX = new OfficialComponent(59, "Vert.x");

private static ComponentsDefine INSTANCE = new ComponentsDefine();

private String[] components;
Expand All @@ -123,7 +125,7 @@ public static ComponentsDefine getInstance() {
}

public ComponentsDefine() {
components = new String[59];
components = new String[60];
addComponent(TOMCAT);
addComponent(HTTPCLIENT);
addComponent(DUBBO);
Expand Down Expand Up @@ -167,6 +169,7 @@ public ComponentsDefine() {
addComponent(REDISSON);
addComponent(LETTUCE);
addComponent(ZOOKEEPER);
addComponent(VERTX);
}

private void addComponent(OfficialComponent component) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public interface AbstractSpan extends AsyncSpan {
AbstractSpan setComponent(Component component);

/**
* Only use this method in explicit instrumentation, like opentracing-skywalking-bridge. It it higher recommend
* don't use this for performance consideration.
* Only use this method in explicit instrumentation, like opentracing-skywalking-bridge. It is highly recommended
* not to use this method for performance reasons.
*
* @param componentName
* @return the span for chaining.
Expand Down Expand Up @@ -127,7 +127,7 @@ public interface AbstractSpan extends AsyncSpan {
*/
void ref(TraceSegmentRef ref);

AbstractSpan start(long starttime);
AbstractSpan start(long startTime);

AbstractSpan setPeer(String remotePeer);
}
1 change: 1 addition & 0 deletions apm-sniffer/apm-sdk-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
<module>canal-1.x-plugin</module>
<module>dubbo-2.7.x-plugin</module>
<module>dubbo-2.7.x-conflict-patch</module>
<module>vertx-plugins</module>
</modules>
<packaging>pom</packaging>

Expand Down
43 changes: 43 additions & 0 deletions apm-sniffer/apm-sdk-plugin/vertx-plugins/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-sdk-plugin</artifactId>
<version>6.1.0-SNAPSHOT</version>
</parent>

<artifactId>vertx-plugins</artifactId>
<modules>
<module>vertx-core-3.x-plugin</module>
</modules>
<packaging>pom</packaging>

<name>vertx-plugins</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<sdk.plugin.related.dir>/..</sdk.plugin.related.dir>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>vertx-plugins</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>apm-vertx-core-3.x-plugin</artifactId>
<packaging>jar</packaging>

<name>vertx-core-3.x-plugin</name>
<url>http://maven.apache.org</url>

<properties>
<vertx.version>3.6.3</vertx.version>
</properties>

<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.vertx3;

import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.net.impl.ServerID;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;

import java.lang.reflect.Method;

public class ClusteredEventBusSendRemoteInterceptor implements InstanceMethodsAroundInterceptor {

@Override
@SuppressWarnings("unchecked")
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextManager.getRuntimeContext().remove(VertxContext.STOP_SPAN_NECESSARY + "." + getClass().getName());

ClusteredMessage message = (ClusteredMessage) allArguments[1];
if (VertxContext.hasContext(message.address())) {
VertxContext context = VertxContext.popContext(message.address());
context.getSpan().asyncFinish();
} else {
ServerID sender = (ServerID) allArguments[0];
ContextCarrier contextCarrier = new ContextCarrier();
AbstractSpan span = ContextManager.createExitSpan(message.address(), contextCarrier, sender.toString());
span.setComponent(ComponentsDefine.VERTX);
SpanLayer.asRPCFramework(span);

CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
message.headers().add(next.getHeadKey(), next.getHeadValue());
}

if (message.replyAddress() != null) {
VertxContext.pushContext(message.replyAddress(),
new VertxContext(ContextManager.capture(), span.prepareForAsync()));
}
ContextManager.getRuntimeContext().put(VertxContext.STOP_SPAN_NECESSARY + "." + getClass().getName(), true);
}
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Boolean closeSpan = (Boolean) ContextManager.getRuntimeContext().get(
VertxContext.STOP_SPAN_NECESSARY + "." + getClass().getName());
if (Boolean.TRUE.equals(closeSpan)) {
ContextManager.stopSpan();
}
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.vertx3;

import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;

import java.lang.reflect.Method;

/**
* @author brandon.fergerson
*/
public class EventBusImplDeliverToHandlerInterceptor implements InstanceMethodsAroundInterceptor {

@Override
@SuppressWarnings("unchecked")
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextManager.getRuntimeContext().remove(VertxContext.STOP_SPAN_NECESSARY + "." + getClass().getName());

Message message = (Message) allArguments[0];
boolean isFromWire = message instanceof ClusteredMessage && ((ClusteredMessage) message).isFromWire();
if (!isFromWire && VertxContext.hasContext(message.address())) {
VertxContext context = VertxContext.popContext(message.address());
context.getSpan().asyncFinish();
} else if (!isFromWire) {
AbstractSpan span;
if (VertxContext.hasContext(message.replyAddress())) {
VertxContext context = VertxContext.peekContext(message.replyAddress());
span = ContextManager.createLocalSpan(context.getContextSnapshot().getParentOperationName());
ContextManager.continued(context.getContextSnapshot());
} else {
span = ContextManager.createLocalSpan(message.address());
}
span.setComponent(ComponentsDefine.VERTX);
SpanLayer.asRPCFramework(span);

if (message.replyAddress() != null) {
VertxContext.pushContext(message.replyAddress(),
new VertxContext(ContextManager.capture(), span.prepareForAsync()));
}
ContextManager.getRuntimeContext().put(VertxContext.STOP_SPAN_NECESSARY + "." + getClass().getName(), true);
}
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Boolean closeSpan = (Boolean) ContextManager.getRuntimeContext().get(
VertxContext.STOP_SPAN_NECESSARY + "." + getClass().getName());
if (Boolean.TRUE.equals(closeSpan)) {
ContextManager.stopSpan();
}
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
Loading

0 comments on commit ce1c7aa

Please sign in to comment.