-
-
Notifications
You must be signed in to change notification settings - Fork 318
Initial commit for transport service. #2364
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
akiraveliara
merged 3 commits into
DSharpPlus:voice-rewrite
from
RoniRaad:voice-rewrite-features/transport-service
Aug 18, 2025
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,155 @@ | ||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Net.WebSockets; | ||
| using System.Text.Json; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| using DSharpPlus.Voice.Transport.Models; | ||
|
|
||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| namespace DSharpPlus.Voice.Transport; | ||
|
|
||
| public class DiscordTransportService : ITransportService | ||
| { | ||
| private readonly ILogger<DiscordTransportService> logger; | ||
| private readonly ConcurrentDictionary<int, List<Func<string, DiscordTransportService, Task>>> jsonHandlers; | ||
| private readonly ConcurrentDictionary<int, List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>> binaryHandlers; | ||
| private readonly TransportService transportService; | ||
|
|
||
| internal DiscordTransportService(Uri uri, | ||
| ConcurrentDictionary<int, List<Func<string, DiscordTransportService, Task>>> jsonHandlers, | ||
| ConcurrentDictionary<int, List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>> binaryHandlers, | ||
| ILogger<DiscordTransportService> logger, | ||
| Action<ClientWebSocketOptions>? configureOptions = null) | ||
| { | ||
| this.transportService = new TransportService(uri, OnBaseTextAsync, OnBaseBinaryAsync, configureOptions); | ||
| this.jsonHandlers = jsonHandlers; | ||
| this.binaryHandlers = binaryHandlers; | ||
| this.logger = logger; | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public async Task ConnectAsync(CancellationToken? cancellationToken = null) => await this.transportService.ConnectAsync(cancellationToken); | ||
|
|
||
| /// <inheritdoc/> | ||
| private async Task OnBaseTextAsync(string messageText) | ||
| { | ||
| BaseDiscordGatewayMessage? message = JsonSerializer.Deserialize<BaseDiscordGatewayMessage>(messageText); | ||
| List<Task> handlerTasks = []; | ||
|
|
||
| if (message == null) | ||
| { | ||
| this.logger.LogWarning("Invalid string message was received!"); | ||
| this.logger.LogDebug("Invalid string message was received: {messageText}", messageText); | ||
|
|
||
| return; | ||
| } | ||
|
|
||
| if (this.jsonHandlers.TryGetValue(message.OpCode, out List<Func<string, DiscordTransportService, Task>>? handler)) | ||
| { | ||
| handlerTasks = [.. handler.Select(async x => await x.Invoke(messageText, this))]; | ||
| } | ||
|
|
||
| await Task.WhenAll(handlerTasks); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| private async Task OnBaseBinaryAsync(ReadOnlyMemory<byte> binaryResponse) | ||
| { | ||
| List<Task> handlerTasks = []; | ||
| if (binaryResponse.Length < 3) | ||
| { | ||
| // log invalid binary message was received | ||
| this.logger.LogWarning("Invalid binary message was received!"); | ||
|
|
||
| return; | ||
| } | ||
|
|
||
| int opCode = binaryResponse.Span[2]; | ||
| if (this.binaryHandlers.TryGetValue(opCode, out List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>? handler)) | ||
| { | ||
| handlerTasks = [.. handler.Select(async (x, y) => await x.Invoke(binaryResponse, this))]; | ||
| } | ||
|
|
||
| await Task.WhenAll(handlerTasks); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public async Task SendAsync(ReadOnlyMemory<byte> data, CancellationToken? token = null) => await this.transportService.SendAsync(data, token); | ||
| /// <inheritdoc/> | ||
| public async Task SendAsync<T>(T data, CancellationToken? token = null) => await this.transportService.SendAsync(data, token); | ||
| } | ||
|
|
||
| public class DiscordTransportServiceBuilder : IDiscordTransportServiceBuilder | ||
| { | ||
| private readonly ILogger<DiscordTransportService> logger; | ||
| private readonly ConcurrentDictionary<int, List<Func<string, DiscordTransportService, Task>>> jsonHandlers = []; | ||
| private readonly ConcurrentDictionary<int, List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>> binaryHandlers = []; | ||
| private Action<ClientWebSocketOptions>? configureOptions = null; | ||
|
|
||
| public DiscordTransportServiceBuilder(ILogger<DiscordTransportService> logger) => this.logger = logger; | ||
|
|
||
| /// <inheritdoc/> | ||
| public void AddJsonHandler<T>(int opCode, Func<T, DiscordTransportService, Task> handler) | ||
| { | ||
| if (!this.jsonHandlers.TryGetValue(opCode, out List<Func<string, DiscordTransportService, Task>>? handlerTasks)) | ||
| { | ||
| handlerTasks = []; | ||
| } | ||
|
|
||
| handlerTasks.Add(async (x, y) => | ||
| { | ||
| T? deserializedObject = default; | ||
|
|
||
| try | ||
| { | ||
| deserializedObject = JsonSerializer.Deserialize<T>(x); | ||
|
|
||
| } | ||
| catch | ||
| { | ||
| } | ||
|
|
||
| if (deserializedObject is null) | ||
| { | ||
| this.logger.LogWarning("Json handler for OpCode: {opCode} failed to deserialize to {type}", opCode, x.GetType().Name); | ||
| return; | ||
| } | ||
|
|
||
| await handler.Invoke(deserializedObject, y); | ||
| }); | ||
|
|
||
| this.jsonHandlers[opCode] = handlerTasks; | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public void AddBinaryHandler(int opCode, Func<ReadOnlyMemory<byte>, DiscordTransportService, Task> handler) | ||
| { | ||
| if (!this.binaryHandlers.TryGetValue(opCode, out List<Func<ReadOnlyMemory<byte>, DiscordTransportService, Task>>? handlerTasks)) | ||
| { | ||
| handlerTasks = []; | ||
| } | ||
|
|
||
| handlerTasks.Add(handler); | ||
|
|
||
| this.binaryHandlers[opCode] = handlerTasks; | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public void ConfigureWebSocketOptions(Action<ClientWebSocketOptions> configureOptions) => this.configureOptions = configureOptions; | ||
|
|
||
| /// <inheritdoc/> | ||
| public ITransportService Build(Uri uri) | ||
| { | ||
| if (this.jsonHandlers.IsEmpty && this.binaryHandlers.IsEmpty) | ||
| { | ||
| this.logger.LogWarning("Discord Transport Service was built with no handlers! No data will be handled."); | ||
| } | ||
|
|
||
| return new DiscordTransportService(uri, this.jsonHandlers, this.binaryHandlers, this.logger, this.configureOptions); | ||
| } | ||
| } |
37 changes: 37 additions & 0 deletions
37
DSharpPlus.Voice/Transport/IDiscordTransportServiceBuilder.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| using System; | ||
| using System.Net.WebSockets; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace DSharpPlus.Voice.Transport; | ||
|
|
||
| public interface IDiscordTransportServiceBuilder | ||
| { | ||
| /// <summary> | ||
| /// Adds a callback method to run when binary data is received from discord with the specified opCode | ||
| /// </summary> | ||
| /// <param name="opCode"></param> | ||
| /// <param name="handler"></param> | ||
| public void AddBinaryHandler(int opCode, Func<ReadOnlyMemory<byte>, DiscordTransportService, Task> handler); | ||
| /// <summary> | ||
| /// Adds a callback method to run when json data is received from discord with the specified opCode. | ||
| /// It will attempt to deserialze the data to type T | ||
| /// </summary> | ||
| /// <typeparam name="T"></typeparam> | ||
| /// <param name="opCode"></param> | ||
| /// <param name="handler"></param> | ||
| public void AddJsonHandler<T>(int opCode, Func<T, DiscordTransportService, Task> handler); | ||
|
|
||
| /// <summary> | ||
| /// Adds a callback method that will be called with the ClientWebSocketOptions when the websocket client is created | ||
| /// </summary> | ||
| /// <param name="configureOptions"></param> | ||
| public void ConfigureWebSocketOptions(Action<ClientWebSocketOptions> configureOptions); | ||
|
|
||
|
|
||
| /// <summary> | ||
| /// Returns the created DiscordTransportService with the configured handlers, WebSocket Options, and Uri | ||
| /// </summary> | ||
| /// <param name="uri"></param> | ||
| /// <returns></returns> | ||
| public ITransportService Build(Uri uri); | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| using System; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace DSharpPlus.Voice.Transport; | ||
| public interface ITransportService | ||
| { | ||
| /// <summary> | ||
| /// Connects the client to the configured endpoint | ||
| /// </summary> | ||
| /// <param name="cancellationToken"></param> | ||
| /// <returns></returns> | ||
| public Task ConnectAsync(CancellationToken? cancellationToken = null); | ||
|
|
||
| /// <summary> | ||
| /// Sends bytes data to the active connection | ||
| /// </summary> | ||
| /// <param name="data"></param> | ||
| /// <param name="token"></param> | ||
| /// <returns></returns> | ||
| public Task SendAsync(ReadOnlyMemory<byte> data, CancellationToken? token = null); | ||
|
|
||
| /// <summary> | ||
| /// Converts T data to json and sends it as UTF8 bytes to the active connection | ||
| /// </summary> | ||
| /// <typeparam name="T"></typeparam> | ||
| /// <param name="data"></param> | ||
| /// <param name="token"></param> | ||
| /// <returns></returns> | ||
| public Task SendAsync<T>(T data, CancellationToken? token = null); | ||
| } |
6 changes: 6 additions & 0 deletions
6
DSharpPlus.Voice/Transport/Models/BaseDiscordGatewayMessage.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| namespace DSharpPlus.Voice.Transport.Models; | ||
|
|
||
| public class BaseDiscordGatewayMessage() | ||
| { | ||
| public int OpCode { get; set; } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,164 @@ | ||
| using System; | ||
| using System.Buffers; | ||
| using System.Net.WebSockets; | ||
| using System.Text; | ||
| using System.Text.Json; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using CommunityToolkit.HighPerformance.Buffers; | ||
|
|
||
| namespace DSharpPlus.Voice.Transport; | ||
|
|
||
| /// <summary> | ||
| /// This service creates a websocket connection and has a callback for binary and text received. | ||
| /// It also allows for sending messages to the remote connection. | ||
| /// </summary> | ||
| public class TransportService : IDisposable, ITransportService | ||
| { | ||
| private readonly Uri uri; | ||
| private readonly Func<string, Task> onTextAsync; | ||
| private readonly Func<ReadOnlyMemory<byte>, Task> onBinaryAsync; | ||
| private readonly ClientWebSocket webSocketClient; | ||
| private readonly SemaphoreSlim receiveSemaphore = new(1); | ||
| private readonly SemaphoreSlim sendSemaphore = new(1); | ||
| private const int ReceiveLoopTimeout = 5000; | ||
|
|
||
| public TransportService(Uri uri, | ||
| Func<string, Task> onTextAsync, | ||
| Func<ReadOnlyMemory<byte>, Task> onBinaryAsync, | ||
| Action<ClientWebSocketOptions>? configureOptions = null) | ||
| { | ||
| this.onTextAsync = onTextAsync; | ||
| this.onBinaryAsync = onBinaryAsync; | ||
| this.uri = uri; | ||
| this.webSocketClient = new(); | ||
|
|
||
| configureOptions?.Invoke(this.webSocketClient.Options); | ||
| } | ||
|
|
||
|
|
||
| /// <inheritdoc/> | ||
| public async Task ConnectAsync(CancellationToken? cancellationToken = null) | ||
| { | ||
| await this.webSocketClient.ConnectAsync(this.uri, cancellationToken ?? CancellationToken.None); | ||
|
|
||
| // Start the receive loop | ||
| _ = Task.Run(async () => await ReceiveLoopAsync()); | ||
| } | ||
|
|
||
|
|
||
| /// <inheritdoc/> | ||
| public async Task SendAsync(ReadOnlyMemory<byte> data, CancellationToken? token = null) | ||
| { | ||
| await this.sendSemaphore.WaitAsync(); | ||
| token ??= CancellationToken.None; | ||
|
|
||
| try | ||
| { | ||
| await this.webSocketClient.SendAsync(data, WebSocketMessageType.Binary, true, token.Value); | ||
| } | ||
| finally | ||
| { | ||
| this.sendSemaphore.Release(); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| /// <inheritdoc/> | ||
| public async Task SendAsync<T>(T data, CancellationToken? token = null) | ||
| { | ||
| await this.sendSemaphore.WaitAsync(); | ||
| token ??= CancellationToken.None; | ||
|
|
||
| string jsonString = JsonSerializer.Serialize(data); | ||
| byte[] jsonBinary = Encoding.UTF8.GetBytes(jsonString); | ||
|
|
||
| try | ||
| { | ||
| await this.webSocketClient.SendAsync(jsonBinary, WebSocketMessageType.Text, true, token.Value); | ||
| } | ||
| finally | ||
| { | ||
| this.sendSemaphore.Release(); | ||
| } | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public void Dispose() | ||
| { | ||
| this.webSocketClient?.Dispose(); | ||
| this.sendSemaphore.Dispose(); | ||
| this.receiveSemaphore?.Dispose(); | ||
| } | ||
|
|
||
| private async Task ReceiveLoopAsync(CancellationToken? token = null) | ||
| { | ||
| byte[] buffer = ArrayPool<byte>.Shared.Rent(8192); | ||
| token ??= CancellationToken.None; | ||
|
|
||
| try | ||
| { | ||
| ArrayPoolBufferWriter<byte> writer = new(); | ||
| while (!token.Value.IsCancellationRequested) | ||
| { | ||
| await this.receiveSemaphore.WaitAsync(token.Value); | ||
|
|
||
| try | ||
| { | ||
| if (this.webSocketClient is null || this.webSocketClient.State != WebSocketState.Open) | ||
| { | ||
| break; | ||
| } | ||
|
|
||
| ValueWebSocketReceiveResult result; | ||
|
|
||
| try | ||
| { | ||
| result = await this.webSocketClient.ReceiveAsync(buffer.AsMemory(0, buffer.Length), token.Value); | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| break; | ||
| } | ||
|
|
||
| if (result.MessageType == WebSocketMessageType.Close) | ||
| { | ||
| await this.webSocketClient.CloseAsync(WebSocketCloseStatus.NormalClosure, "ack", token.Value); | ||
| break; | ||
| } | ||
|
|
||
| writer.Write(buffer); | ||
|
|
||
| if (result.EndOfMessage) | ||
| { | ||
| if (result.MessageType == WebSocketMessageType.Text) | ||
| { | ||
| string text = Encoding.UTF8.GetString(writer.WrittenSpan); | ||
|
|
||
| await this.onTextAsync(text); | ||
| } | ||
| else if (result.MessageType == WebSocketMessageType.Binary) | ||
| { | ||
| await this.onBinaryAsync(writer.WrittenMemory); | ||
| } | ||
|
|
||
| writer.Clear(); | ||
| } | ||
|
|
||
| if (result.Count == 0) | ||
| { | ||
| await Task.Delay(ReceiveLoopTimeout, token.Value); | ||
| } | ||
| } | ||
| finally | ||
| { | ||
| this.receiveSemaphore.Release(); | ||
| } | ||
| } | ||
| } | ||
| finally | ||
| { | ||
| ArrayPool<byte>.Shared.Return(buffer); | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.