Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions DSharpPlus.Voice/Transport/DiscordTransportService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

using DSharpPlus.Voice.Transport.Models;

using Microsoft.Extensions.Logging;

namespace DSharpPlus.Voice.Transport;

public class DiscordTransportService : ITransportService
{
private readonly ILogger<DiscordTransportService> logger;
private readonly ConcurrentDictionary<int, List<Func<string, DiscordTransportService, Task>>> jsonHandlers;
private readonly ConcurrentDictionary<int, List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>> binaryHandlers;
private readonly TransportService transportService;

internal DiscordTransportService(Uri uri,
ConcurrentDictionary<int, List<Func<string, DiscordTransportService, Task>>> jsonHandlers,
ConcurrentDictionary<int, List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>> binaryHandlers,
ILogger<DiscordTransportService> logger,
Action<ClientWebSocketOptions>? configureOptions = null)
{
this.transportService = new TransportService(uri, OnBaseTextAsync, OnBaseBinaryAsync, configureOptions);
this.jsonHandlers = jsonHandlers;
this.binaryHandlers = binaryHandlers;
this.logger = logger;
}

/// <inheritdoc/>
public async Task ConnectAsync(CancellationToken? cancellationToken = null) => await this.transportService.ConnectAsync(cancellationToken);

/// <inheritdoc/>
private async Task OnBaseTextAsync(string messageText)
{
BaseDiscordGatewayMessage? message = JsonSerializer.Deserialize<BaseDiscordGatewayMessage>(messageText);
List<Task> handlerTasks = [];

if (message == null)
{
this.logger.LogWarning("Invalid string message was received!");
this.logger.LogDebug("Invalid string message was received: {messageText}", messageText);

return;
}

if (this.jsonHandlers.TryGetValue(message.OpCode, out List<Func<string, DiscordTransportService, Task>>? handler))
{
handlerTasks = [.. handler.Select(async x => await x.Invoke(messageText, this))];
}

await Task.WhenAll(handlerTasks);
}

/// <inheritdoc/>
private async Task OnBaseBinaryAsync(ReadOnlyMemory<byte> binaryResponse)
{
List<Task> handlerTasks = [];
if (binaryResponse.Length < 3)
{
// log invalid binary message was received
this.logger.LogWarning("Invalid binary message was received!");

return;
}

int opCode = binaryResponse.Span[2];
if (this.binaryHandlers.TryGetValue(opCode, out List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>? handler))
{
handlerTasks = [.. handler.Select(async (x, y) => await x.Invoke(binaryResponse, this))];
}

await Task.WhenAll(handlerTasks);
}

/// <inheritdoc/>
public async Task SendAsync(ReadOnlyMemory<byte> data, CancellationToken? token = null) => await this.transportService.SendAsync(data, token);
/// <inheritdoc/>
public async Task SendAsync<T>(T data, CancellationToken? token = null) => await this.transportService.SendAsync(data, token);
}

public class DiscordTransportServiceBuilder : IDiscordTransportServiceBuilder
{
private readonly ILogger<DiscordTransportService> logger;
private readonly ConcurrentDictionary<int, List<Func<string, DiscordTransportService, Task>>> jsonHandlers = [];
private readonly ConcurrentDictionary<int, List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>> binaryHandlers = [];
private Action<ClientWebSocketOptions>? configureOptions = null;

public DiscordTransportServiceBuilder(ILogger<DiscordTransportService> logger) => this.logger = logger;

/// <inheritdoc/>
public void AddJsonHandler<T>(int opCode, Func<T, DiscordTransportService, Task> handler)
{
if (!this.jsonHandlers.TryGetValue(opCode, out List<Func<string, DiscordTransportService, Task>>? handlerTasks))
{
handlerTasks = [];
}

handlerTasks.Add(async (x, y) =>
{
T? deserializedObject = default;

try
{
deserializedObject = JsonSerializer.Deserialize<T>(x);

}
catch
{
}

if (deserializedObject is null)
{
this.logger.LogWarning("Json handler for OpCode: {opCode} failed to deserialize to {type}", opCode, x.GetType().Name);
return;
}

await handler.Invoke(deserializedObject, y);
});

this.jsonHandlers[opCode] = handlerTasks;
}

/// <inheritdoc/>
public void AddBinaryHandler(int opCode, Func<ReadOnlyMemory<byte>, DiscordTransportService, Task> handler)
{
if (!this.binaryHandlers.TryGetValue(opCode, out List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>? handlerTasks))
{
handlerTasks = [];
}

handlerTasks.Add(handler);

this.binaryHandlers[opCode] = handlerTasks;
}

/// <inheritdoc/>
public void ConfigureWebSocketOptions(Action<ClientWebSocketOptions> configureOptions) => this.configureOptions = configureOptions;

/// <inheritdoc/>
public ITransportService Build(Uri uri)
{
if (this.jsonHandlers.IsEmpty && this.binaryHandlers.IsEmpty)
{
this.logger.LogWarning("Discord Transport Service was built with no handlers! No data will be handled.");
}

return new DiscordTransportService(uri, this.jsonHandlers, this.binaryHandlers, this.logger, this.configureOptions);
}
}
37 changes: 37 additions & 0 deletions DSharpPlus.Voice/Transport/IDiscordTransportServiceBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;

namespace DSharpPlus.Voice.Transport;

public interface IDiscordTransportServiceBuilder
{
/// <summary>
/// Adds a callback method to run when binary data is received from discord with the specified opCode
/// </summary>
/// <param name="opCode"></param>
/// <param name="handler"></param>
public void AddBinaryHandler(int opCode, Func<ReadOnlyMemory<byte>, DiscordTransportService, Task> handler);
/// <summary>
/// Adds a callback method to run when json data is received from discord with the specified opCode.
/// It will attempt to deserialze the data to type T
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="opCode"></param>
/// <param name="handler"></param>
public void AddJsonHandler<T>(int opCode, Func<T, DiscordTransportService, Task> handler);

/// <summary>
/// Adds a callback method that will be called with the ClientWebSocketOptions when the websocket client is created
/// </summary>
/// <param name="configureOptions"></param>
public void ConfigureWebSocketOptions(Action<ClientWebSocketOptions> configureOptions);


/// <summary>
/// Returns the created DiscordTransportService with the configured handlers, WebSocket Options, and Uri
/// </summary>
/// <param name="uri"></param>
/// <returns></returns>
public ITransportService Build(Uri uri);
}
31 changes: 31 additions & 0 deletions DSharpPlus.Voice/Transport/ITransportService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace DSharpPlus.Voice.Transport;
public interface ITransportService
{
/// <summary>
/// Connects the client to the configured endpoint
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task ConnectAsync(CancellationToken? cancellationToken = null);

/// <summary>
/// Sends bytes data to the active connection
/// </summary>
/// <param name="data"></param>
/// <param name="token"></param>
/// <returns></returns>
public Task SendAsync(ReadOnlyMemory<byte> data, CancellationToken? token = null);

/// <summary>
/// Converts T data to json and sends it as UTF8 bytes to the active connection
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="data"></param>
/// <param name="token"></param>
/// <returns></returns>
public Task SendAsync<T>(T data, CancellationToken? token = null);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace DSharpPlus.Voice.Transport.Models;

public class BaseDiscordGatewayMessage()
{
public int OpCode { get; set; }
}
164 changes: 164 additions & 0 deletions DSharpPlus.Voice/Transport/TransportService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
using System;
using System.Buffers;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using CommunityToolkit.HighPerformance.Buffers;

namespace DSharpPlus.Voice.Transport;

/// <summary>
/// This service creates a websocket connection and has a callback for binary and text received.
/// It also allows for sending messages to the remote connection.
/// </summary>
public class TransportService : IDisposable, ITransportService
{
private readonly Uri uri;
private readonly Func<string, Task> onTextAsync;
private readonly Func<ReadOnlyMemory<byte>, Task> onBinaryAsync;
private readonly ClientWebSocket webSocketClient;
private readonly SemaphoreSlim receiveSemaphore = new(1);
private readonly SemaphoreSlim sendSemaphore = new(1);
private const int ReceiveLoopTimeout = 5000;

public TransportService(Uri uri,
Func<string, Task> onTextAsync,
Func<ReadOnlyMemory<byte>, Task> onBinaryAsync,
Action<ClientWebSocketOptions>? configureOptions = null)
{
this.onTextAsync = onTextAsync;
this.onBinaryAsync = onBinaryAsync;
this.uri = uri;
this.webSocketClient = new();

configureOptions?.Invoke(this.webSocketClient.Options);
}


/// <inheritdoc/>
public async Task ConnectAsync(CancellationToken? cancellationToken = null)
{
await this.webSocketClient.ConnectAsync(this.uri, cancellationToken ?? CancellationToken.None);

// Start the receive loop
_ = Task.Run(async () => await ReceiveLoopAsync());
}


/// <inheritdoc/>
public async Task SendAsync(ReadOnlyMemory<byte> data, CancellationToken? token = null)
{
await this.sendSemaphore.WaitAsync();
token ??= CancellationToken.None;

try
{
await this.webSocketClient.SendAsync(data, WebSocketMessageType.Binary, true, token.Value);
}
finally
{
this.sendSemaphore.Release();
}
}


/// <inheritdoc/>
public async Task SendAsync<T>(T data, CancellationToken? token = null)
{
await this.sendSemaphore.WaitAsync();
token ??= CancellationToken.None;

string jsonString = JsonSerializer.Serialize(data);
byte[] jsonBinary = Encoding.UTF8.GetBytes(jsonString);

try
{
await this.webSocketClient.SendAsync(jsonBinary, WebSocketMessageType.Text, true, token.Value);
}
finally
{
this.sendSemaphore.Release();
}
}

/// <inheritdoc/>
public void Dispose()
{
this.webSocketClient?.Dispose();
this.sendSemaphore.Dispose();
this.receiveSemaphore?.Dispose();
}

private async Task ReceiveLoopAsync(CancellationToken? token = null)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(8192);
token ??= CancellationToken.None;

try
{
ArrayPoolBufferWriter<byte> writer = new();
while (!token.Value.IsCancellationRequested)
{
await this.receiveSemaphore.WaitAsync(token.Value);

try
{
if (this.webSocketClient is null || this.webSocketClient.State != WebSocketState.Open)
{
break;
}

ValueWebSocketReceiveResult result;

try
{
result = await this.webSocketClient.ReceiveAsync(buffer.AsMemory(0, buffer.Length), token.Value);
Comment thread
akiraveliara marked this conversation as resolved.
}
catch (OperationCanceledException)
{
break;
}

if (result.MessageType == WebSocketMessageType.Close)
{
await this.webSocketClient.CloseAsync(WebSocketCloseStatus.NormalClosure, "ack", token.Value);
break;
}

writer.Write(buffer);

if (result.EndOfMessage)
{
if (result.MessageType == WebSocketMessageType.Text)
{
string text = Encoding.UTF8.GetString(writer.WrittenSpan);

await this.onTextAsync(text);
}
else if (result.MessageType == WebSocketMessageType.Binary)
{
await this.onBinaryAsync(writer.WrittenMemory);
}

writer.Clear();
}

if (result.Count == 0)
{
await Task.Delay(ReceiveLoopTimeout, token.Value);
}
}
finally
{
this.receiveSemaphore.Release();
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
Loading