Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5901c4f
Implement graceful shutdown for Garnet server
yuseok-kim-edushare Nov 26, 2025
9ea9339
Update hosting/Windows/Garnet.worker/Program.cs
yuseok-kim-edushare Nov 26, 2025
7731df6
Update libs/host/GarnetServer.cs
yuseok-kim-edushare Nov 26, 2025
597ec57
Update main/GarnetServer/Program.cs
yuseok-kim-edushare Nov 26, 2025
9e168f4
Update main/GarnetServer/Program.cs
yuseok-kim-edushare Nov 26, 2025
67a7f2e
Update main/GarnetServer/Program.cs
yuseok-kim-edushare Nov 26, 2025
5b25407
Update libs/host/GarnetServer.cs
yuseok-kim-edushare Nov 26, 2025
476d629
🐛 Resolve Race Condition risk in "StopListening" impl at GarnetServer…
yuseok-kim-edushare Nov 26, 2025
9bf52de
✅ add test for gracefulshutdown about main/garnetserver
yuseok-kim-edushare Nov 26, 2025
11d115b
🐛 Fix risk of shutdown handler remaining
yuseok-kim-edushare Nov 26, 2025
3b3df07
✏️ fix by dotnet format
yuseok-kim-edushare Nov 26, 2025
2489995
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Dec 3, 2025
9d5017a
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Dec 6, 2025
5eb025c
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Dec 13, 2025
ff37e9b
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Dec 22, 2025
c542789
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Jan 7, 2026
efd4c75
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Jan 14, 2026
0b113a7
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Jan 19, 2026
19b1a13
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Jan 25, 2026
d77bd6b
✅🔀 Fix Test with Allure related Requirements (reflect #1457)
yuseok-kim-edushare Jan 25, 2026
f14fb2c
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Jan 25, 2026
c8da690
Merge branch 'main' into feat/graceful_shutdown
yuseok-kim-edushare Jan 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions hosting/Windows/Garnet.worker/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using Garnet;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand All @@ -12,6 +13,13 @@ static void Main(string[] args)
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService(_ => new Worker(args));

// Configure Host shutdown timeout
builder.Services.Configure<HostOptions>(options =>
{
// Set graceful shutdown timeout to 5 seconds
options.ShutdownTimeout = TimeSpan.FromSeconds(5);
});

builder.Services.AddWindowsService(options =>
{
options.ServiceName = "Microsoft Garnet Server";
Expand Down
21 changes: 19 additions & 2 deletions hosting/Windows/Garnet.worker/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,23 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
/// <param name="cancellationToken">Indicates that the shutdown process should no longer be graceful.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
Dispose();
await base.StopAsync(cancellationToken).ConfigureAwait(false);
try
{
if (server != null)
{
// Perform graceful shutdown with AOF commit and checkpoint
await server.ShutdownAsync(timeout: TimeSpan.FromSeconds(5), token: cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
// Force shutdown requested - proceed to dispose
}
finally
{
await base.StopAsync(cancellationToken).ConfigureAwait(false);
Dispose();
}
}

public override void Dispose()
Expand All @@ -55,6 +70,8 @@ public override void Dispose()
}
server?.Dispose();
_isDisposed = true;
base.Dispose();
GC.SuppressFinalize(this);
}
}
}
172 changes: 172 additions & 0 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Garnet.cluster;
using Garnet.common;
using Garnet.networking;
Expand Down Expand Up @@ -422,6 +423,177 @@ public void Start()
Console.WriteLine("* Ready to accept connections");
}

/// <summary>
/// Performs graceful shutdown of the server.
/// Stops accepting new connections, waits for active connections to complete, commits AOF, and takes checkpoint if needed.
/// </summary>
/// <param name="timeout">Timeout for waiting on active connections (default: 30 seconds)</param>
/// <param name="token">Cancellation token</param>
/// <returns>Task representing the async shutdown operation</returns>
public async Task ShutdownAsync(TimeSpan? timeout = null, CancellationToken token = default)
{
var shutdownTimeout = timeout ?? TimeSpan.FromSeconds(30);

try
{
// Stop accepting new connections first
StopListening();

// Wait for existing connections to complete
await WaitForActiveConnectionsAsync(shutdownTimeout, token).ConfigureAwait(false);

// Commit AOF and take checkpoint if needed
await FinalizeDataAsync(token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Force shutdown requested
}
catch (Exception ex)
{
logger?.LogError(ex, "Error during graceful shutdown");
}
}

/// <summary>
/// Stop all servers from accepting new connections.
/// </summary>
private void StopListening()
{
if (servers == null) return;

logger?.LogInformation("Stopping listeners to prevent new connections...");
foreach (var server in servers)
{
try
{
server?.StopListening();
}
catch (Exception ex)
{
logger?.LogWarning(ex, "Error stopping listener");
}
}
}

/// <summary>
/// Waits for active connections to complete within the specified timeout.
/// </summary>
private async Task WaitForActiveConnectionsAsync(TimeSpan timeout, CancellationToken token)
{
if (servers == null) return;

var stopwatch = Stopwatch.StartNew();
var delays = new[] { 50, 300, 1000 };
var delayIndex = 0;

while (stopwatch.Elapsed < timeout && !token.IsCancellationRequested)
{
try
{
var activeConnections = GetActiveConnectionCount();

if (activeConnections == 0)
{
logger?.LogInformation("All connections have been closed gracefully.");
return;
}

logger?.LogInformation("Waiting for {ActiveConnections} active connections to complete...", activeConnections);

var currentDelay = delays[delayIndex];
if (delayIndex < delays.Length - 1) delayIndex++;

await Task.Delay(currentDelay, token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
logger?.LogWarning(ex, "Error checking active connections");
delayIndex = 0;
await Task.Delay(500, token).ConfigureAwait(false);
}
}

if (stopwatch.Elapsed >= timeout)
{
logger?.LogWarning("Timeout reached after {TimeoutSeconds} seconds. Some connections may still be active.",
timeout.TotalSeconds);
}
}

/// <summary>
/// Gets the current number of active connections directly from server instances.
/// </summary>
private int GetActiveConnectionCount()
{
int count = 0;
if (servers != null)
{
foreach (var garnetServerBase in servers.OfType<GarnetServerBase>())
{
count += (int)garnetServerBase.get_conn_active();
}
}
return count;
}

/// <summary>
/// Commits AOF and takes checkpoint for data durability during shutdown.
/// </summary>
private async Task FinalizeDataAsync(CancellationToken token)
{
var enableAOF = opts.EnableAOF;
var enableStorageTier = opts.EnableStorageTier;

// Commit AOF before checkpoint/shutdown
if (enableAOF)
{
logger?.LogInformation("Committing AOF before shutdown...");
try
{
var commitSuccess = await Store.CommitAOFAsync(token).ConfigureAwait(false);
if (commitSuccess)
{
logger?.LogInformation("AOF committed successfully.");
}
else
{
logger?.LogInformation("AOF commit skipped (another commit in progress or replica mode).");
}
}
catch (Exception ex)
{
logger?.LogError(ex, "Error committing AOF during shutdown");
}
}

// Take checkpoint for tiered storage
if (enableStorageTier)
{
logger?.LogInformation("Taking checkpoint for tiered storage...");
try
{
var checkpointSuccess = Store.TakeCheckpoint(background: false, token);
if (checkpointSuccess)
{
logger?.LogInformation("Checkpoint completed successfully.");
}
else
{
logger?.LogInformation("Checkpoint skipped (another checkpoint in progress or replica mode).");
}
}
catch (Exception ex)
{
logger?.LogError(ex, "Error taking checkpoint during shutdown");
}
}
}

/// <summary>
/// Dispose store (including log and checkpoint directory)
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions libs/server/Servers/GarnetServerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ public bool AddSession(WireFormat protocol, ref ISessionProvider provider, INetw
/// <inheritdoc />
public abstract void Start();

/// <inheritdoc />
public virtual void StopListening()
{
// Base implementation does nothing; derived classes should override
}

/// <inheritdoc />
public virtual void Dispose()
{
Expand Down
27 changes: 26 additions & 1 deletion libs/server/Servers/GarnetServerTcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class GarnetServerTcp : GarnetServerBase, IServerHook
readonly int networkConnectionLimit;
readonly string unixSocketPath;
readonly UnixFileMode unixSocketPermission;
volatile bool isListening;

/// <inheritdoc/>
public override IEnumerable<IMessageConsumer> ActiveConsumers()
Expand Down Expand Up @@ -117,19 +118,43 @@ public override void Start()
}

listenSocket.Listen(512);
isListening = true;
if (!listenSocket.AcceptAsync(acceptEventArg))
AcceptEventArg_Completed(null, acceptEventArg);
}

/// <inheritdoc />
public override void StopListening()
{
if (!isListening)
return;

isListening = false;
try
{
// Close the listen socket to stop accepting new connections
// This will cause any pending AcceptAsync to complete with an error
listenSocket.Close();
logger?.LogInformation("Stopped accepting new connections on {endpoint}", EndPoint);
}
catch (Exception ex)
{
logger?.LogWarning(ex, "Error closing listen socket on {endpoint}", EndPoint);
}
}

private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
try
{
do
{
// Check isListening flag before processing and before calling AcceptAsync again
if (!isListening) break;

if (!HandleNewConnection(e)) break;
e.AcceptSocket = null;
} while (!listenSocket.AcceptAsync(e));
} while (isListening && !listenSocket.AcceptAsync(e));
}
// socket disposed
catch (ObjectDisposedException) { }
Expand Down
6 changes: 6 additions & 0 deletions libs/server/Servers/IGarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,11 @@ public interface IGarnetServer : IDisposable
/// Start server
/// </summary>
public void Start();

/// <summary>
/// Stop accepting new connections (for graceful shutdown).
/// Existing connections remain active until they complete or are disposed.
/// </summary>
public void StopListening();
}
}
29 changes: 29 additions & 0 deletions libs/server/Servers/StoreApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,35 @@ public bool FlushDB(int dbId = 0, bool unsafeTruncateLog = false)
}
}

/// <summary>
/// Take checkpoint for all active databases
/// </summary>
/// <param name="background">True if method can return before checkpoint is taken</param>
/// <param name="token">Cancellation token</param>
/// <returns>false if checkpoint was skipped due to node state or another checkpoint in progress</returns>
public bool TakeCheckpoint(bool background = false, CancellationToken token = default)
{
using (PreventRoleChange(out var acquired))
{
if (!acquired || IsReplica)
{
return false;
}

return storeWrapper.TakeCheckpoint(background, logger: null, token: token);
}
}

/// <summary>
/// Check if storage tier is enabled
/// </summary>
public bool IsStorageTierEnabled => storeWrapper.serverOptions.EnableStorageTier;

/// <summary>
/// Check if AOF is enabled
/// </summary>
public bool IsAOFEnabled => storeWrapper.serverOptions.EnableAOF;

/// <summary>
/// Helper to disable role changes during a using block.
///
Expand Down
Loading
Loading