diff --git a/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs b/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs index 6d8dadbac..eda1400a1 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs @@ -9,6 +9,7 @@ using System.Runtime.InteropServices; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; +using Microsoft.Spark.Services; using Microsoft.Spark.UnitTest.TestUtils; using Xunit; @@ -21,6 +22,10 @@ namespace Microsoft.Spark.E2ETest /// public sealed class SparkFixture : IDisposable { + + private readonly ILoggerService _logger = + LoggerServiceFactory.GetLogger(typeof(SparkFixture)); + /// /// The names of environment variables used by the SparkFixture. /// @@ -112,11 +117,14 @@ public SparkFixture() Spark.SparkContext.SetLogLevel(DefaultLogLevel); Jvm = ((IJvmObjectReferenceProvider)Spark).Reference.Jvm; + + _logger.LogInfo("SparkSession created."); } public void Dispose() { Spark.Dispose(); + _logger.LogInfo("SparkSession disposed."); // CSparkRunner will exit upon receiving newline from // the standard input stream. diff --git a/src/csharp/Microsoft.Spark.UnitTest/Utils/JVMBridgeHelperTests.cs b/src/csharp/Microsoft.Spark.UnitTest/Utils/JVMBridgeHelperTests.cs new file mode 100644 index 000000000..a2b6ad2ca --- /dev/null +++ b/src/csharp/Microsoft.Spark.UnitTest/Utils/JVMBridgeHelperTests.cs @@ -0,0 +1,86 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. 
+ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net.NetworkInformation; +using System.Net; +using Microsoft.Spark.Interop.Ipc; +using Microsoft.Spark.Network; +using Microsoft.Spark.Sql; +using Microsoft.Spark.Sql.Types; +using Microsoft.Spark.UnitTest.TestUtils; +using Microsoft.Spark.Utils; +using Moq; +using Xunit; + +namespace Microsoft.Spark.UnitTest +{ + public class JVMBridgeHelperTests + { + [Theory] + [InlineData("1,2,3", 0, false)] + [InlineData("5567,5568,5569", 0, true)] + [InlineData("1,2,3", 5567, false)] + [InlineData("5567,5568,5569", 5567, true)] + [InlineData("1234,5678", 1234, true)] + public void IsDotnetBackendPortUsingTest(string testListeningPorts, int backendport, bool expect) + { + var envkey = "DOTNETBACKEND_PORT"; + var oldenvValue = Environment.GetEnvironmentVariable(envkey); + try + { + if (backendport == 0) + { + Environment.SetEnvironmentVariable(envkey, null); + } + else + { + Environment.SetEnvironmentVariable(envkey, backendport.ToString()); + } + + var listeningEndpoints = testListeningPorts + .Split(",").Select(x => new IPEndPoint(0, Convert.ToInt32(x))).ToArray(); + + var ipinfo = new Mock(); + ipinfo.Setup(m => m.GetActiveTcpListeners()) + .Returns(listeningEndpoints); + + var ret = JVMBridgeHelper.IsDotnetBackendPortUsing(ipinfo.Object); + Assert.Equal(ret, expect); + } + finally + { + Environment.SetEnvironmentVariable(envkey, oldenvValue); + } + } + + /// + /// The main test case of JVM bridge helper, + /// is simply run it and close it. + /// + [Fact] + public void JVMBridgeHelperMainPathTest() + { + using (var helper = new JVMBridgeHelper()) + { + // now we should be able to connect to JVM bridge + // or if system environment is not good, we should not failed. 
+ } + } + + [Fact] + public void JVMBridgeHelperWithoutSpark() + { + var oldhome = Environment.GetEnvironmentVariable("SPARK_HOME"); + Environment.SetEnvironmentVariable("SPARK_HOME", null); + using (var helper = new JVMBridgeHelper()) + { + } + Environment.SetEnvironmentVariable("SPARK_HOME", oldhome); + } + } +} \ No newline at end of file diff --git a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs index f0eab693f..6e9008ff5 100644 --- a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs +++ b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs @@ -11,6 +11,7 @@ using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql.Streaming; using Microsoft.Spark.Sql.Types; +using Microsoft.Spark.Utils; namespace Microsoft.Spark.Sql { @@ -27,6 +28,12 @@ public sealed class SparkSession : IDisposable, IJvmObjectReferenceProvider private static readonly string s_sparkSessionClassName = "org.apache.spark.sql.SparkSession"; + /// + /// The created jvm process + /// + /// The created jvm bridge process helper. + private static JVMBridgeHelper s_jvmbridge = null; + /// /// Constructor for SparkSession. /// @@ -59,7 +66,17 @@ internal SparkSession(JvmObjectReference jvmObject) /// Creates a Builder object for SparkSession. /// /// Builder object - public static Builder Builder() => new Builder(); + public static Builder Builder() + { + // We could try to detect if we don't have jvm bridge, + // call the helper to launch jvm bridge process. + if ((s_jvmbridge == null) && + (JVMBridgeHelper.IsDotnetBackendPortUsing() == false)) + { + s_jvmbridge = new JVMBridgeHelper(); + } + return new Builder(); + } /// Note that *ActiveSession() APIs are not exposed because these APIs work with a /// thread-local variable, which stores the session variable. 
Since the Netty server diff --git a/src/csharp/Microsoft.Spark/Utils/JVMBridgeHelper.cs b/src/csharp/Microsoft.Spark/Utils/JVMBridgeHelper.cs new file mode 100644 index 000000000..cb541115e --- /dev/null +++ b/src/csharp/Microsoft.Spark/Utils/JVMBridgeHelper.cs @@ -0,0 +1,163 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.IO; +using System.Linq; +using System.Net.NetworkInformation; +using System.Diagnostics; +using System.Threading.Tasks; +using System.Runtime.InteropServices; +using Microsoft.Spark.Services; +using Microsoft.Spark.Interop; + +namespace Microsoft.Spark.Utils +{ + /// + /// An helper to launch dotnet jvm if needed + /// + internal class JVMBridgeHelper : IDisposable + { + /// + /// Customization for JVM Bridge jar file + /// If not exists, the helper will find out the jar in $DOTNET_WORKER_DIR folder. + /// + internal static string JVMBridgeJarEnvName = "DOTNET_BRIDGE_JAR"; + + /// + /// Generate spark settings for the running system + /// + internal static SparkSettings sparkSettings = new SparkSettings(); + + /// + /// DotnetRunner classname + /// + private static string RunnerClassname = + "org.apache.spark.deploy.dotnet.DotnetRunner"; + + private static string RunnerReadyMsg = + ".NET Backend running debug mode. Press enter to exit"; + + private static string RunnerAddressInUseMsg = + "java.net.BindException: Address already in use"; + + + private static int maxWaitTimeoutMS = 60000; + + private readonly ILoggerService _logger = + LoggerServiceFactory.GetLogger(typeof(JVMBridgeHelper)); + + /// + /// The running jvm bridge process , null means no such process + /// + private Process jvmBridge; + + /// + /// Detect if we already have the runner by checking backend port is using or not. 
+ /// + /// custom IPGlobalProperties, null for System.Net.NetworkInformation + /// True means backend port is occupied by the runner. + internal static bool IsDotnetBackendPortUsing( + IPGlobalProperties customIPGlobalProperties = null) + { + var backendport = SparkEnvironment.ConfigurationService.GetBackendPortNumber(); + var listeningEndpoints = + (customIPGlobalProperties ?? IPGlobalProperties.GetIPGlobalProperties()) + .GetActiveTcpListeners(); + return listeningEndpoints.Any(p => p.Port == backendport); + } + + internal JVMBridgeHelper() + { + var jarpath = locateBridgeJar(); + if (string.IsNullOrWhiteSpace(jarpath) || + string.IsNullOrWhiteSpace(sparkSettings.SPARK_SUBMIT)) + { + // Cannot find correct launch informations, give up. + return; + } + var arguments = $"--class {RunnerClassname} {jarpath} debug"; + var startupinfo = new ProcessStartInfo + { + FileName = sparkSettings.SPARK_SUBMIT, + Arguments = arguments, + RedirectStandardOutput = true, + RedirectStandardInput = true, + UseShellExecute = false, + CreateNoWindow = true, + }; + + jvmBridge = new Process() { StartInfo = startupinfo }; + _logger.LogInfo($"Launch JVM Bridge : {sparkSettings.SPARK_SUBMIT} {arguments}"); + jvmBridge.Start(); + + // wait until we see .net backend started + Task message; + while ((message = jvmBridge.StandardOutput.ReadLineAsync()) != null) + { + if (message.Wait(maxWaitTimeoutMS) == false) + { + // wait timeout , giveup + break; + } + + if (message.Result.Contains(RunnerReadyMsg)) + { + // launched successfully! + jvmBridge.StandardOutput.ReadToEndAsync(); + _logger.LogInfo($"Launch JVM Bridge ready"); + return; + } + if (message.Result.Contains(RunnerAddressInUseMsg)) + { + // failed to start for port is using, give up. + jvmBridge.StandardOutput.ReadToEndAsync(); + break; + } + } + // wait timeout , or failed to startup + // give up. 
+ jvmBridge.Close(); + jvmBridge = null; + } + + /// + /// Locate + /// + private string locateBridgeJar() + { + var jarpath = Environment.GetEnvironmentVariable(JVMBridgeJarEnvName); + if (string.IsNullOrWhiteSpace(jarpath) == false) + { + return jarpath; + } + + var workdir = Environment.GetEnvironmentVariable( + ConfigurationService.WorkerDirEnvVarName); + if ((workdir != null) && Directory.Exists(workdir)) + { + // let's find the approicate jar in the work dirctory. + var jarfile = new DirectoryInfo(workdir) + .GetFiles("microsoft-spark-*.jar") + .FirstOrDefault(); + if (jarfile != null) + { + return Path.Combine(jarfile.DirectoryName, jarfile.Name); + } + } + + return string.Empty; + } + + public void Dispose() + { + if (jvmBridge != null) + { + jvmBridge.StandardInput.WriteLine("\n"); + jvmBridge.WaitForExit(maxWaitTimeoutMS); + _logger.LogInfo($"JVM Bridge disposed."); + } + } + } +} \ No newline at end of file diff --git a/src/csharp/Microsoft.Spark/Utils/SparkSettings.cs b/src/csharp/Microsoft.Spark/Utils/SparkSettings.cs new file mode 100644 index 000000000..aa5a61e4d --- /dev/null +++ b/src/csharp/Microsoft.Spark/Utils/SparkSettings.cs @@ -0,0 +1,150 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.IO; +using System.Linq; +using System.Collections.Generic; +using System.Runtime.InteropServices; + +namespace Microsoft.Spark.Utils +{ + /// + /// Discover the spark settings + /// + internal class SparkSettings + { + private static string SPARK_HOME_ENV_KEY = "SPARK_HOME"; + + /// + /// The spark home + /// This would be empty string when failed to findout. 
+ /// </summary> + public string SPARK_HOME {get; private set;} + + /// <summary> + /// The spark submit file + /// </summary> + public string SPARK_SUBMIT {get;private set;} + + + public Version Version { get; private set; } + + private static string sparksubmitcmd = + RuntimeInformation.IsOSPlatform(OSPlatform.Windows) + ? "spark-submit.cmd" : "spark-submit"; + + /// <summary> + /// Init the spark settings + /// </summary> + public SparkSettings() + { + find_spark_home(); + this.SPARK_SUBMIT = locateSparkSubmit(SPARK_HOME); + InitVersion(); + } + + /// <summary> + /// Locate SPARK_HOME path + /// </summary> + /// <remarks>Leaves SPARK_HOME as the empty string when it cannot be found.</remarks> + private void find_spark_home() + { + SPARK_HOME = Environment.GetEnvironmentVariable(SPARK_HOME_ENV_KEY); + if (string.IsNullOrWhiteSpace(SPARK_HOME) == false) + { + return; + } + SPARK_HOME = string.Empty; + foreach (var possiblehome in possible_spark_home()) + { + if (isSparkHome(possiblehome)) + { + SPARK_HOME = possiblehome; + Environment.SetEnvironmentVariable(SPARK_HOME_ENV_KEY, SPARK_HOME); + return; + } + } + } + + private IEnumerable<string> possible_spark_home() + { + var pathSeperator = ':'; + var envPath = "PATH"; + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + pathSeperator = ';'; + envPath = "Path"; + } + // try env PATH entries to see if we can locate spark + foreach (var path in Environment.GetEnvironmentVariable(envPath).Split(pathSeperator)) + { + yield return path; + yield return Path.GetFullPath(Path.Combine(path, "..")); + } + } + + /// <summary> + /// find out the existing spark-submit file + /// </summary> + private string locateSparkSubmit(string sparkhome) + { + var fn = Path.Combine(sparkhome ?? string.Empty, "bin", sparksubmitcmd); + return File.Exists(fn) + ? 
fn : string.Empty; + } + + private string locateJarFolder(string sparkhome) + { + var possible = new string[] { "jars", "assembly" }; + foreach (var tryfolder in possible) + { + var folder = Path.Combine(sparkhome, tryfolder); + if (Directory.Exists(folder)) + { + return folder; + } + } + return string.Empty; + } + + /// <summary> + /// Check if it is a reasonable SPARK_HOME + /// </summary> + private bool isSparkHome(string path) + { + return (locateSparkSubmit(path) != string.Empty) && + (locateJarFolder(path) != string.Empty); + } + + /// <summary> + /// After init spark home, try to find out spark version + /// </summary> + private void InitVersion() + { + var releasefile = $"{SPARK_HOME}{Path.DirectorySeparatorChar}RELEASE"; + var sparkversionstr = string.Empty; + if (File.Exists(releasefile)) + { + // First line of the RELEASE file under SPARK_HOME will be something similar to: + // Spark 2.3.2 built for Hadoop 2.7.3 + var firstLine = File.ReadLines(releasefile).FirstOrDefault(); + var columns = firstLine?.Split(' '); + if (columns?.Length >= 2) + { + sparkversionstr = columns[1]; + } + } + if (string.IsNullOrWhiteSpace(sparkversionstr)) + { + this.Version = new Version(); + } + else + { + this.Version = new Version(sparkversionstr); + } + } + + } +} \ No newline at end of file