Skip to content

Commit

Permalink
Merge pull request dotnet#2835 from shyamnamboodiripad/cleanup
Browse files Browse the repository at this point in the history
Cleanup some code in StdIoKernelConnector
  • Loading branch information
shyamnamboodiripad authored Mar 15, 2023
2 parents f8f961b + d439ff4 commit 0f9c00d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 41 deletions.
2 changes: 0 additions & 2 deletions src/dotnet-interactive.Tests/StdIoKernelConnectorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public async Task it_can_create_a_proxy_kernel_to_more_than_one_remote_subkernel
using var csharpProxyKernel = await connector.CreateProxyKernelAsync(remoteInfo: csharpKernelInfo);
var expectedCSharpKernelInfo = new KernelInfo(csharpKernelInfo.LocalName)
{
DisplayName = csharpKernelInfo.DisplayName,
IsProxy = true,
IsComposite = false,
LanguageName = csharpKernelInfo.LanguageName,
Expand All @@ -112,7 +111,6 @@ public async Task it_can_create_a_proxy_kernel_to_more_than_one_remote_subkernel
using var fsharpProxyKernel = await connector.CreateProxyKernelAsync(remoteInfo: fsharpKernelInfo, localNameOverride: "fsharp2");
var expectedFSharpKernelInfo = new KernelInfo("fsharp2")
{
DisplayName = fsharpKernelInfo.DisplayName,
IsProxy = true,
IsComposite = false,
LanguageName = fsharpKernelInfo.LanguageName,
Expand Down
71 changes: 32 additions & 39 deletions src/dotnet-interactive/Connection/StdIoKernelConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
#nullable enable

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Reactive.Disposables;
Expand Down Expand Up @@ -35,12 +37,11 @@ public class StdIoKernelConnector : IKernelConnector
private readonly Uri _kernelHostUri;
private readonly DirectoryInfo _workingDirectory;

private readonly Dictionary<string, KernelInfo> _remoteKernelInfos;
private readonly ConcurrentDictionary<string, KernelInfo> _remoteKernelInfoCache;
private KernelCommandAndEventReceiver? _receiver;
private KernelCommandAndEventSender? _sender;
private Process? _process;
private RefCountDisposable? _refCountDisposable;
private KernelReady? _kernelReady;

public int? ProcessId => _process?.Id;

Expand All @@ -55,7 +56,7 @@ public StdIoKernelConnector(
_kernelHostUri = kernelHostUri;
_workingDirectory = workingDirectory ?? new DirectoryInfo(Environment.CurrentDirectory);

_remoteKernelInfos = new Dictionary<string, KernelInfo>();
_remoteKernelInfoCache = new ConcurrentDictionary<string, KernelInfo>();
}

/// <remarks>
Expand Down Expand Up @@ -130,31 +131,25 @@ public async Task<ProxyKernel> CreateRootProxyKernelAsync()
await Task.Yield();

_process.Start();

activity.Info("Process id: {0}", _process.Id);

_receiver = KernelCommandAndEventReceiver.FromObservable(stdOutObservable);
_kernelReady = null;

KernelReady? kernelReady = null;
_receiver.Select(coe => coe.Event)
.OfType<KernelReady>()
.Take(1)
.Subscribe(e =>
{
_kernelReady = e;

kernelReady = e;
UpdateRemoteKernelInfoCache(kernelReady.KernelInfos);
});

_receiver.Select(coe => coe.Event)
.OfType<KernelInfoProduced>()
.Subscribe(e =>
{
var info = e.KernelInfo;
var name = info.LocalName;

lock (_remoteKernelInfos)
{
_remoteKernelInfos[name] = info;
}
UpdateRemoteKernelInfoCache(e.KernelInfo);
});

_sender = KernelCommandAndEventSender.FromTextWriter(
Expand All @@ -179,7 +174,7 @@ public async Task<ProxyKernel> CreateRootProxyKernelAsync()
_process.BeginOutputReadLine();
_process.BeginErrorReadLine();

while (_kernelReady is null)
while (kernelReady is null)
{
await Task.Delay(20);

Expand Down Expand Up @@ -209,22 +204,38 @@ public async Task<ProxyKernel> CreateRootProxyKernelAsync()
rootProxyKernel.RegisterForDisposal(_refCountDisposable);
}

if (_kernelReady is { })
var remoteRootKernelInfo = GetCachedKernelInfoForRemoteRoot();
rootProxyKernel.UpdateKernelInfo(remoteRootKernelInfo);
return rootProxyKernel;
}

private void UpdateRemoteKernelInfoCache(IEnumerable<KernelInfo> infos)
{
foreach (var info in infos)
{
var kernelInfo = _kernelReady.KernelInfos.Single(k => k.Uri == _kernelHostUri);
rootProxyKernel.UpdateKernelInfo(kernelInfo);
UpdateRemoteKernelInfoCache(info);
}
}

return rootProxyKernel;
private void UpdateRemoteKernelInfoCache(KernelInfo info)
{
var name = info.LocalName;
_remoteKernelInfoCache[name] = info;
}

private KernelInfo GetCachedKernelInfoForRemoteRoot()
=> _remoteKernelInfoCache.Values.Single(k => k.Uri == _kernelHostUri);

private bool TryGetCachedKernelInfoByRemoteName(string remoteName, [NotNullWhen(true)] out KernelInfo? remoteInfo)
=> _remoteKernelInfoCache.TryGetValue(remoteName, out remoteInfo);

public async Task<ProxyKernel> CreateProxyKernelAsync(string remoteName, string? localNameOverride = null)
{
using var rootProxyKernel = await CreateRootProxyKernelAsync();

ProxyKernel proxyKernel;

if (_remoteKernelInfos.TryGetValue(remoteName, out var remoteInfo))
if (TryGetCachedKernelInfoByRemoteName(remoteName, out var remoteInfo))
{
proxyKernel = await CreateProxyKernelAsync(remoteInfo, localNameOverride);
}
Expand Down Expand Up @@ -272,31 +283,13 @@ public async Task<ProxyKernel> CreateProxyKernelAsync(KernelInfo remoteInfo, str
_receiver,
remoteInfo.Uri);

UpdateKernelInfo(proxyKernel, remoteInfo);
proxyKernel.UpdateKernelInfo(remoteInfo);

proxyKernel.RegisterForDisposal(_refCountDisposable!.GetDisposable());

return proxyKernel;
}

private static void UpdateKernelInfo(ProxyKernel proxyKernel, KernelInfo remoteInfo)
{
proxyKernel.KernelInfo.DisplayName = remoteInfo.DisplayName;
proxyKernel.KernelInfo.IsComposite = remoteInfo.IsComposite;
proxyKernel.KernelInfo.LanguageName = remoteInfo.LanguageName;
proxyKernel.KernelInfo.LanguageVersion = remoteInfo.LanguageVersion;

foreach (var directive in remoteInfo.SupportedDirectives)
{
proxyKernel.KernelInfo.SupportedDirectives.Add(directive);
}

foreach (var command in remoteInfo.SupportedKernelCommands)
{
proxyKernel.KernelInfo.SupportedKernelCommands.Add(command);
}
}

private void SendQuitCommand()
{
if (_sender is not null)
Expand Down

0 comments on commit 0f9c00d

Please sign in to comment.