How ASP.NET Core listens for HTTP requests: tracing the process through the Kestrel source

2022-10-04  Toutiao (今日头条)  大好fish

Let's start with the minimal API code. The few lines below are enough to stand up a simple ASP.NET Core web server. Concise, isn't it?

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.MapGet("/", () => "Hello World!");
app.Run();

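Open localhost:5001 in a browser and the server responds with "Hello World!". So how does the source code actually listen for HTTP requests? The listening can be done by IIS or by Kestrel; here we follow the Kestrel implementation. The figure below shows the overall flow.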

[Figure: overall execution flow]

Let's walk through the whole execution path in the source, using .NET 7 as the reference.

1. WebApplication calls Run()

/// <summary>
/// Runs an application and block the calling thread until host shutdown.
/// </summary>
/// <param name="url">The URL to listen to if the server hasn't been configured directly.</param>
public void Run(string? url = null)
{
    Listen(url);
    HostingAbstractionsHostExtensions.Run(this);
}

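2. HostingAbstractionsHostExtensions calls RunAsync()

The Run(this) extension above simply blocks on host.RunAsync().GetAwaiter().GetResult(), so the real work happens in RunAsync().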

    /// <summary>
    /// Runs an application and returns a <see cref="Task"/> that only completes when the token is triggered or shutdown is triggered.
    /// The <paramref name="host"/> instance is disposed of after running.
    /// </summary>
    /// <param name="host">The <see cref="IHost"/> to run.</param>
    /// <param name="token">The token to trigger shutdown.</param>
    /// <returns>The <see cref="Task"/> that represents the asynchronous operation.</returns>
    public static async Task RunAsync(this IHost host, CancellationToken token = default)
    {
        try
        {
            await host.StartAsync(token).ConfigureAwait(false);
            await host.WaitForShutdownAsync(token).ConfigureAwait(false);
        }
        finally
        {
            if (host is IAsyncDisposable asyncDisposable)
            {
                await asyncDisposable.DisposeAsync().ConfigureAwait(false);
            }
            else
            {
                host.Dispose();
            }
        }
    }
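
The start / wait-for-shutdown / dispose sequence above can be reproduced in isolation. The following is only a minimal sketch: ToyHost and ToyHostRunner are hypothetical stand-ins invented for illustration, not the real IHost or its extension methods.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for IHost; it only records the order of lifecycle events.
public sealed class ToyHost : IAsyncDisposable
{
    private readonly TaskCompletionSource _shutdown =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    public List<string> Events { get; } = new();

    public Task StartAsync(CancellationToken token)
    {
        Events.Add("started");
        return Task.CompletedTask;
    }

    public async Task WaitForShutdownAsync(CancellationToken token)
    {
        // Cancelling the token is treated as a shutdown request.
        using var registration = token.Register(() => _shutdown.TrySetResult());
        await _shutdown.Task.ConfigureAwait(false);
        Events.Add("stopped");
    }

    public ValueTask DisposeAsync()
    {
        Events.Add("disposed");
        return ValueTask.CompletedTask;
    }
}

public static class ToyHostRunner
{
    // Same shape as the RunAsync listing: start, wait, and always dispose.
    public static async Task RunAsync(ToyHost host, CancellationToken token = default)
    {
        try
        {
            await host.StartAsync(token).ConfigureAwait(false);
            await host.WaitForShutdownAsync(token).ConfigureAwait(false);
        }
        finally
        {
            await host.DisposeAsync().ConfigureAwait(false);
        }
    }

    // Blocking wrapper: the calling thread waits until shutdown completes.
    public static void Run(ToyHost host, CancellationToken token = default)
        => RunAsync(host, token).GetAwaiter().GetResult();
}
```

Calling ToyHostRunner.Run(host, token) blocks the caller until the token is cancelled, which is the behavior step 1 relies on.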

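3. KestrelServer calls StartAsync()

host.StartAsync() starts the registered hosted services. One of them, GenericWebHostService, calls IServer.StartAsync(); with Kestrel that server is KestrelServer, which forwards the call to its inner KestrelServerImpl.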

public Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken) where TContext : notnull
{
    return _innerKestrelServer.StartAsync(application, cancellationToken);
}

4. Now look at the inner StartAsync method, paying particular attention to the _transportManager.BindAsync call

public async Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken) where TContext : notnull
    {
        try
        {
            ValidateOptions();

            if (_hasStarted)
            {
                // The server has already started and/or has not been cleaned up yet
                throw new InvalidOperationException(CoreStrings.ServerAlreadyStarted);
            }
            _hasStarted = true;

            ServiceContext.Heartbeat?.Start();

            async Task OnBind(ListenOptions options, CancellationToken onBindCancellationToken)
            {
                var hasHttp1 = options.Protocols.HasFlag(HttpProtocols.Http1);
                var hasHttp2 = options.Protocols.HasFlag(HttpProtocols.Http2);
                var hasHttp3 = options.Protocols.HasFlag(HttpProtocols.Http3);
                var hasTls = options.IsTls;

                // Filter out invalid combinations.

                if (!hasTls)
                {
                    // Http/1 without TLS, no-op HTTP/2 and 3.
                    if (hasHttp1)
                    {
                        hasHttp2 = false;
                        hasHttp3 = false;
                    }
                    // Http/3 requires TLS. Note we only let it fall back to HTTP/1, not HTTP/2
                    else if (hasHttp3)
                    {
                        throw new InvalidOperationException("HTTP/3 requires HTTPS.");
                    }
                }

                // Quic isn't registered if it's not supported, throw if we can't fall back to 1 or 2
                if (hasHttp3 && _multiplexedTransportFactory is null && !(hasHttp1 || hasHttp2))
                {
                    throw new InvalidOperationException("This platform doesn't support QUIC or HTTP/3.");
                }

                // Disable adding alt-svc header if endpoint has configured not to or there is no
                // multiplexed transport factory, which happens if QUIC isn't supported.
                var addAltSvcHeader = !options.DisableAltSvcHeader && _multiplexedTransportFactory != null;

                var configuredEndpoint = options.EndPoint;

                // Add the HTTP middleware as the terminal connection middleware
                if (hasHttp1 || hasHttp2
                    || options.Protocols == HttpProtocols.None) // TODO a test fails because it doesn't throw an exception in the right place
                                                                // when there is no HttpProtocols in KestrelServer, can we remove/change the test?
                {
                    if (_transportFactory is null)
                    {
                        throw new InvalidOperationException($"Cannot start HTTP/1.x or HTTP/2 server if no {nameof(IConnectionListenerFactory)} is registered.");
                    }

                    options.UseHttpServer(ServiceContext, application, options.Protocols, addAltSvcHeader);
                    var connectionDelegate = options.Build();

                    // Add the connection limit middleware
                    connectionDelegate = EnforceConnectionLimit(connectionDelegate, Options.Limits.MaxConcurrentConnections, Trace);

                    options.EndPoint = await _transportManager.BindAsync(configuredEndpoint, connectionDelegate, options.EndpointConfig, onBindCancellationToken).ConfigureAwait(false);
                }

                if (hasHttp3 && _multiplexedTransportFactory is not null)
                {
                    // Check if a previous transport has changed the endpoint. If it has then the endpoint is dynamic and we can't guarantee it will work for other transports.
                    // For more details, see https://github.com/dotnet/aspnetcore/issues/42982
                    if (!configuredEndpoint.Equals(options.EndPoint))
                    {
                        Trace.LogError(CoreStrings.DynamicPortOnMultipleTransportsNotSupported);
                    }
                    else
                    {
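                        // Register the connection middleware that will handle HTTP requests later
                        // (HTTP/3 branch; UseHttpServer above plays the same role for HTTP/1.x and HTTP/2)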
                        options.UseHttp3Server(ServiceContext, application, options.Protocols, addAltSvcHeader);
                        var multiplexedConnectionDelegate = ((IMultiplexedConnectionBuilder)options).Build();

                        // Add the connection limit middleware
                        multiplexedConnectionDelegate = EnforceConnectionLimit(multiplexedConnectionDelegate, Options.Limits.MaxConcurrentConnections, Trace);
                        // Bind the endpoint
                        options.EndPoint = await _transportManager.BindAsync(configuredEndpoint, multiplexedConnectionDelegate, options, onBindCancellationToken).ConfigureAwait(false);
                    }
                }
            }

            AddressBindContext = new AddressBindContext(_serverAddresses, Options, Trace, OnBind);

            await BindAsync(cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            // Don't log the error https://github.com/dotnet/aspnetcore/issues/29801
            Dispose();
            throw;
        }

        // Register the options with the event source so it can be logged (if necessary)
        KestrelEventSource.Log.AddServerOptions(Options);
    }
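
The protocol checks inside OnBind are easier to follow when isolated. The sketch below reproduces only that decision logic under simplifying assumptions: ToyProtocols and ProtocolFilterSketch are hypothetical names, and the quicAvailable parameter stands in for "_multiplexedTransportFactory is not null".

```csharp
using System;

// Hypothetical flags enum for illustration; not the real HttpProtocols type.
[Flags]
public enum ToyProtocols { None = 0, Http1 = 1, Http2 = 2, Http3 = 4 }

public static class ProtocolFilterSketch
{
    // Returns which of the two bind branches in OnBind would run.
    public static (bool BindTcp, bool BindQuic) Decide(
        ToyProtocols requested, bool isTls, bool quicAvailable)
    {
        bool hasHttp1 = requested.HasFlag(ToyProtocols.Http1);
        bool hasHttp2 = requested.HasFlag(ToyProtocols.Http2);
        bool hasHttp3 = requested.HasFlag(ToyProtocols.Http3);

        if (!isTls)
        {
            if (hasHttp1)
            {
                // HTTP/1 without TLS: HTTP/2 and HTTP/3 become no-ops.
                hasHttp2 = false;
                hasHttp3 = false;
            }
            else if (hasHttp3)
            {
                throw new InvalidOperationException("HTTP/3 requires HTTPS.");
            }
        }

        // No QUIC transport and nothing to fall back to.
        if (hasHttp3 && !quicAvailable && !(hasHttp1 || hasHttp2))
        {
            throw new InvalidOperationException("This platform doesn't support QUIC or HTTP/3.");
        }

        bool bindTcp = hasHttp1 || hasHttp2 || requested == ToyProtocols.None;
        bool bindQuic = hasHttp3 && quicAvailable;
        return (bindTcp, bindQuic);
    }
}
```

For example, requesting all three protocols on a non-TLS endpoint yields only the TCP bind, while requesting HTTP/3 alone without TLS throws.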

5. Bind the socket to the endpoint

public async Task<EndPoint> BindAsync(EndPoint endPoint, ConnectionDelegate connectionDelegate, EndpointConfig? endpointConfig, CancellationToken cancellationToken)
{
    if (_transportFactory is null)
    {
        throw new InvalidOperationException($"Cannot bind with {nameof(ConnectionDelegate)} no {nameof(IConnectionListenerFactory)} is registered.");
    }
    var transport = await _transportFactory.BindAsync(endPoint, cancellationToken).ConfigureAwait(false);
    StartAcceptLoop(new GenericConnectionListener(transport), c => connectionDelegate(c), endpointConfig);
    return transport.EndPoint;
}
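
BindAsync returns transport.EndPoint rather than the endpoint it was given, because the bound endpoint can differ from the requested one, for example when port 0 asks the operating system to pick a free port. A minimal sketch with a raw Socket follows; BindSketch and BindLoopback are hypothetical names, and this is not how Kestrel's socket transport is actually structured.

```csharp
using System.Net;
using System.Net.Sockets;

public static class BindSketch
{
    // Binds a TCP listener on the loopback address and returns it together
    // with the endpoint that was actually bound.
    public static (Socket Listener, IPEndPoint Bound) BindLoopback(int port = 0, int backlog = 512)
    {
        var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        listener.Bind(new IPEndPoint(IPAddress.Loopback, port));
        listener.Listen(backlog);

        // After Bind, LocalEndPoint carries the real port, even if 0 was requested.
        return (listener, (IPEndPoint)listener.LocalEndPoint!);
    }
}
```

The caller owns the returned socket and should dispose it when done.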

6. Note how the accept loop is handed off to the thread pool

ThreadPool.UnsafeQueueUserWorkItem(StartAcceptingConnectionsCore, listener, preferLocal: false);
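
ThreadPool.UnsafeQueueUserWorkItem queues a callback with an explicit state argument and, unlike QueueUserWorkItem, does not flow the caller's ExecutionContext to the worker. A small sketch of the same call shape, with hypothetical names (QueueSketch, Job, SquareOnThreadPool) chosen only for illustration:

```csharp
using System.Threading;

public static class QueueSketch
{
    // State object passed to the callback, so the lambda captures nothing.
    private sealed class Job
    {
        public int Input;
        public int Output;
        public readonly ManualResetEventSlim Done = new();
    }

    public static int SquareOnThreadPool(int input)
    {
        var job = new Job { Input = input };

        // preferLocal: false puts the item on the global queue rather than
        // the current thread's local queue.
        ThreadPool.UnsafeQueueUserWorkItem(static j =>
        {
            j.Output = j.Input * j.Input;
            j.Done.Set();
        }, job, preferLocal: false);

        job.Done.Wait();   // block until the pool thread has finished
        job.Done.Dispose();
        return job.Output;
    }
}
```

Passing the state explicitly is what lets the callback be a static lambda, which avoids a closure allocation per queued item.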

7. A while loop keeps accepting socket connections

private void StartAcceptingConnectionsCore(IConnectionListener<T> listener)
{
    // REVIEW: Multiple accept loops in parallel?
    _ = AcceptConnectionsAsync();

    async Task AcceptConnectionsAsync()
    {
        try
        {
            while (true)
            {
                var connection = await listener.AcceptAsync();

                if (connection == null)
                {
                    // We're done listening
                    break;
                }

                // Add the connection to the connection manager before we queue it for execution
                var id = _transportConnectionManager.GetNewConnectionId();
                var kestrelConnection = new KestrelConnection<T>(
                    id, _serviceContext, _transportConnectionManager, _connectionDelegate, connection, Log);

                _transportConnectionManager.AddConnection(id, kestrelConnection);

                Log.ConnectionAccepted(connection.ConnectionId);
                KestrelEventSource.Log.ConnectionQueuedStart(connection);

                ThreadPool.UnsafeQueueUserWorkItem(kestrelConnection, preferLocal: false);
            }
        }
        catch (Exception ex)
        {
            // REVIEW: If the accept loop ends should this trigger a server shutdown? It will manifest as a hang
            Log.LogCritical(0, ex, "The connection listener failed to accept any new connections.");
        }
        finally
        {
            _acceptLoopTcs.TrySetResult();
        }
    }
}

8. The listener calls Socket.AcceptAsync

public async ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default)
{
    while (true)
    {
        try
        {
            Debug.Assert(_listenSocket != null, "Bind must be called first.");

            var acceptSocket = await _listenSocket.AcceptAsync(cancellationToken);

            // Only apply no delay to Tcp based endpoints
            if (acceptSocket.LocalEndPoint is IPEndPoint)
            {
                acceptSocket.NoDelay = _options.NoDelay;
            }

            return _factory.Create(acceptSocket);
        }
        catch (ObjectDisposedException)
        {
            // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
            return null;
        }
        catch (SocketException e) when (e.SocketErrorCode == SocketError.OperationAborted)
        {
            // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
            return null;
        }
        catch (SocketException)
        {
            // The connection got reset while it was in the backlog, so we try again.
            SocketsLog.ConnectionReset(_logger, connectionId: "(null)");
        }
    }
}
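
Steps 7 and 8 together form an accept loop in which a null result means "the listener is done". The sketch below shows that pattern with a plain Socket on the loopback interface. All names are hypothetical, and unlike the listing above it stops the loop through a CancellationToken rather than by unbinding the listener.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public static class AcceptLoopSketch
{
    // Returns null once the listener is disposed or the token is cancelled.
    private static async ValueTask<Socket?> AcceptOrNullAsync(Socket listener, CancellationToken ct)
    {
        while (true)
        {
            try { return await listener.AcceptAsync(ct); }
            catch (ObjectDisposedException) { return null; }
            catch (OperationCanceledException) { return null; }
            catch (SocketException e) when (e.SocketErrorCode == SocketError.OperationAborted) { return null; }
            catch (SocketException) { /* connection reset while in the backlog: try again */ }
        }
    }

    // Connects `clients` sockets to a local listener and returns how many were accepted.
    public static async Task<int> AcceptAndCountAsync(int clients)
    {
        using var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        listener.Listen(16);
        var endpoint = listener.LocalEndPoint!;

        using var stop = new CancellationTokenSource();
        var allAccepted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        if (clients == 0) allAccepted.TrySetResult();

        Task<int> loop = Task.Run(async () =>
        {
            int count = 0;
            while (true)
            {
                using Socket? connection = await AcceptOrNullAsync(listener, stop.Token);
                if (connection is null) break;          // done listening
                if (++count == clients) allAccepted.TrySetResult();
            }
            return count;
        });

        var sockets = new List<Socket>();
        try
        {
            for (int i = 0; i < clients; i++)
            {
                var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                sockets.Add(client);
                await client.ConnectAsync(endpoint);
            }
            await allAccepted.Task;
        }
        finally
        {
            stop.Cancel();                              // ends the accept loop
            foreach (var s in sockets) s.Dispose();
        }
        return await loop;
    }
}
```

A real server would hand each accepted connection to a worker instead of disposing it immediately, which is what the following steps cover.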

9. When a connection arrives, the thread pool runs KestrelConnection's Execute method, which starts ExecuteAsync()

internal async Task ExecuteAsync()
{
    var connectionContext = _transportConnection;

    try
    {
        KestrelEventSource.Log.ConnectionQueuedStop(connectionContext);

        Logger.ConnectionStart(connectionContext.ConnectionId);
        KestrelEventSource.Log.ConnectionStart(connectionContext);

        using (BeginConnectionScope(connectionContext))
        {
            try
            {
                await _connectionDelegate(connectionContext);
            }
            catch (Exception ex)
            {
                Logger.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId);
            }
        }
    }
    finally
    {
        await FireOnCompletedAsync();

        Logger.ConnectionStop(connectionContext.ConnectionId);
        KestrelEventSource.Log.ConnectionStop(connectionContext);

        // Dispose the transport connection, this needs to happen before removing it from the
        // connection manager so that we only signal completion of this connection after the transport
        // is properly torn down.
        await connectionContext.DisposeAsync();

        _transportConnectionManager.RemoveConnection(_id);
    }
}
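
The connection object can be queued directly in step 7 because it implements IThreadPoolWorkItem, and its Execute method kicks off the async work. A minimal sketch of that arrangement, assuming a hypothetical ToyConnection whose "connection delegate" just receives the connection id:

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical work item; it only mirrors the shape of the listing above.
public sealed class ToyConnection : IThreadPoolWorkItem
{
    private readonly string _id;
    private readonly Func<string, Task> _connectionDelegate;
    private readonly TaskCompletionSource _done =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    public ToyConnection(string id, Func<string, Task> connectionDelegate)
    {
        _id = id;
        _connectionDelegate = connectionDelegate;
    }

    public Task Completion => _done.Task;
    public bool Faulted { get; private set; }

    // Called by the thread pool; fire and forget the async part.
    void IThreadPoolWorkItem.Execute() => _ = ExecuteAsync();

    private async Task ExecuteAsync()
    {
        try
        {
            await _connectionDelegate(_id);
        }
        catch (Exception)
        {
            // Record the failure instead of letting it escape onto the pool thread.
            Faulted = true;
        }
        finally
        {
            _done.TrySetResult();   // always signal completion
        }
    }
}

public static class ToyDispatcher
{
    public static Task Dispatch(ToyConnection connection)
    {
        ThreadPool.UnsafeQueueUserWorkItem(connection, preferLocal: false);
        return connection.Completion;
    }
}
```

Catching the exception inside ExecuteAsync matters because nobody awaits the discarded task, so an unhandled failure would otherwise go unobserved.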

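10. Once a socket connection has been accepted, the connection delegate runs. This is the middleware that was registered ahead of time in step 4 (see the comment there); for HTTP/1.x and HTTP/2 it is the OnConnectionAsync method of HttpConnectionMiddleware, registered by options.UseHttpServer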

public Task OnConnectionAsync(ConnectionContext connectionContext)
{
    var memoryPoolFeature = connectionContext.Features.Get<IMemoryPoolFeature>();
    var protocols = connectionContext.Features.Get<HttpProtocolsFeature>()?.HttpProtocols ?? _endpointDefaultProtocols;
    var localEndPoint = connectionContext.LocalEndPoint as IPEndPoint;
    var altSvcHeader = _addAltSvcHeader && localEndPoint != null ? HttpUtilities.GetEndpointAltSvc(localEndPoint, protocols) : null;

    var httpConnectionContext = new HttpConnectionContext(
        connectionContext.ConnectionId,
        protocols,
        altSvcHeader,
        connectionContext,
        _serviceContext,
        connectionContext.Features,
        memoryPoolFeature?.MemoryPool ?? System.Buffers.MemoryPool<byte>.Shared,
        localEndPoint,
        connectionContext.RemoteEndPoint as IPEndPoint);
    httpConnectionContext.Transport = connectionContext.Transport;

    var connection = new HttpConnection(httpConnectionContext);

    return connection.ProcessRequestsAsync(_application);
}
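
The connection delegate built in step 4 (options.Build()) is a chain of middleware wrapped around a terminal handler. The sketch below shows one common way to build such a chain. ToyConnectionBuilder, ToyContext and ToyConnectionDelegate are hypothetical types, and registering the limit check as ordinary middleware is a simplification (the listing in step 4 wraps the built delegate afterwards instead).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class ToyContext
{
    public List<string> Trace { get; } = new();
}

public delegate Task ToyConnectionDelegate(ToyContext connection);

public sealed class ToyConnectionBuilder
{
    private readonly List<Func<ToyConnectionDelegate, ToyConnectionDelegate>> _components = new();

    public ToyConnectionBuilder Use(Func<ToyConnectionDelegate, ToyConnectionDelegate> middleware)
    {
        _components.Add(middleware);
        return this;
    }

    public ToyConnectionDelegate Build()
    {
        // Start from a no-op and wrap in reverse so the first registered runs first.
        ToyConnectionDelegate app = _ => Task.CompletedTask;
        for (int i = _components.Count - 1; i >= 0; i--)
        {
            app = _components[i](app);
        }
        return app;
    }
}

public static class PipelineDemo
{
    public static async Task<string> TraceAsync()
    {
        var connectionDelegate = new ToyConnectionBuilder()
            .Use(next => async ctx =>
            {
                ctx.Trace.Add("limit:enter");
                await next(ctx);
                ctx.Trace.Add("limit:exit");
            })
            // Terminal middleware: it never calls next, like the HTTP server middleware.
            .Use(next => ctx =>
            {
                ctx.Trace.Add("http");
                return Task.CompletedTask;
            })
            .Build();

        var context = new ToyContext();
        await connectionDelegate(context);
        return string.Join(",", context.Trace);
    }
}
```

Running PipelineDemo.TraceAsync() shows the outer middleware entering first and exiting last around the terminal handler.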

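11. A while loop processes HTTP requests on the connection. With keep-alive, the socket does not have to be reconnected for each request: the HTTP connection stays open and the loop keeps reading the next request from it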

private async Task ProcessRequests<TContext>(IHttpApplication<TContext> application) where TContext : notnull
    {
        while (_keepAlive)
        {
            if (_context.InitialExecutionContext is null)
            {
                // If this is a first request on a non-Http2Connection, capture a clean ExecutionContext.
                _context.InitialExecutionContext = ExecutionContext.Capture();
            }
            else
            {
                // Clear any AsyncLocals set during the request; back to a clean state ready for next request
                // And/or reset to Http2Connection's ExecutionContext giving access to the connection logging scope
                // and any other AsyncLocals set by connection middleware.
                ExecutionContext.Restore(_context.InitialExecutionContext);
            }

            BeginRequestProcessing();

            var result = default(ReadResult);
            bool endConnection;
            do
            {
                if (BeginRead(out var awaitable))
                {
                    result = await awaitable;
                }
            } while (!TryParseRequest(result, out endConnection));

            if (endConnection)
            {
                // Connection finished, stop processing requests
                return;
            }

            var messageBody = CreateMessageBody();
            if (!messageBody.RequestKeepAlive)
            {
                _keepAlive = false;
            }

            IsUpgradableRequest = messageBody.RequestUpgrade;

            InitializeBodyControl(messageBody);

            var context = application.CreateContext(this);

            try
            {
                KestrelEventSource.Log.RequestStart(this);

                // Run the application code for this request
                await application.ProcessRequestAsync(context);

                // Trigger OnStarting if it hasn't been called yet and the app hasn't
                // already failed. If an OnStarting callback throws we can go through
                // our normal error handling in ProduceEnd.
                // https://github.com/aspnet/KestrelHttpServer/issues/43
                if (!HasResponseStarted && _applicationException == null && _onStarting?.Count > 0)
                {
                    await FireOnStarting();
                }

                if (!_connectionAborted && !VerifyResponseContentLength(out var lengthException))
                {
                    ReportApplicationError(lengthException);
                }
            }
            catch (BadHttpRequestException ex)
            {
                // Capture BadHttpRequestException for further processing
                // This has to be caught here so StatusCode is set properly before disposing the HttpContext
                // (DisposeContext logs StatusCode).
                SetBadRequestState(ex);
                ReportApplicationError(ex);
            }
            catch (Exception ex)
            {
                ReportApplicationError(ex);
            }

            KestrelEventSource.Log.RequestStop(this);

            // At this point all user code that needs use to the request or response streams has completed.
            // Using these streams in the OnCompleted callback is not allowed.
            try
            {
                Debug.Assert(_bodyControl != null);
                await _bodyControl.StopAsync();
            }
            catch (Exception ex)
            {
                // BodyControl.StopAsync() can throw if the PipeWriter was completed prior to the application writing
                // enough bytes to satisfy the specified Content-Length. This risks double-logging the exception,
                // but this scenario generally indicates an app bug, so I don't want to risk not logging it.
                ReportApplicationError(ex);
            }

            // 4XX responses are written by TryProduceInvalidRequestResponse during connection tear down.
            if (_requestRejectedException == null)
            {
                if (!_connectionAborted)
                {
                    // Call ProduceEnd() before consuming the rest of the request body to prevent
                    // delaying clients waiting for the chunk terminator:
                    //
                    // https://github.com/dotnet/corefx/issues/17330#issuecomment-288248663
                    //
                    // This also prevents the 100 Continue response from being sent if the app
                    // never tried to read the body.
                    // https://github.com/aspnet/KestrelHttpServer/issues/2102
                    //
                    // ProduceEnd() must be called before _application.DisposeContext(), to ensure
                    // HttpContext.Response.StatusCode is correctly set when
                    // IHttpContextFactory.Dispose(HttpContext) is called.
                    await ProduceEnd();
                }
                else if (!HasResponseStarted)
                {
                    // If the request was aborted and no response was sent, there's no
                    // meaningful status code to log.
                    StatusCode = 0;
                }
            }

            if (_onCompleted?.Count > 0)
            {
                await FireOnCompleted();
            }

            application.DisposeContext(context, _applicationException);

            // Even for non-keep-alive requests, try to consume the entire body to avoid RSTs.
            if (!_connectionAborted && _requestRejectedException == null && !messageBody.IsEmpty)
            {
                await messageBody.ConsumeAsync();
            }

            if (HasStartedConsumingRequestBody)
            {
                await messageBody.StopAsync();
            }
        }
    }
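
The core of the loop above is "keep reading requests from the same connection until keep-alive is turned off or the peer goes away". A deliberately simplified sketch of that idea follows. It reads body-less requests from a TextReader and treats only a literal "Connection: close" header as the signal to stop; KeepAliveSketch is a hypothetical name, and real parsing in Kestrel is far more involved.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;

public static class KeepAliveSketch
{
    // Returns the request lines handled on one simulated connection.
    public static List<string> ServeConnection(TextReader connection)
    {
        var handled = new List<string>();
        bool keepAlive = true;

        while (keepAlive)
        {
            string? requestLine = connection.ReadLine();
            if (string.IsNullOrEmpty(requestLine))
            {
                break;   // peer closed the connection
            }

            // Read headers up to the blank line that ends the header block.
            while (true)
            {
                string? header = connection.ReadLine();
                if (string.IsNullOrEmpty(header))
                {
                    break;
                }
                if (header.Equals("Connection: close", StringComparison.OrdinalIgnoreCase))
                {
                    keepAlive = false;   // finish this request, then stop looping
                }
            }

            handled.Add(requestLine);    // "process" the request
        }

        return handled;
    }
}
```

Feeding it three requests where the second carries "Connection: close" handles the first two and never reads the third.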

 

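12. While handling a request, Kestrel invokes the delegates we registered in the HTTP pipeline (the application.ProcessRequestAsync(context) call above); they work with the constructed HttpContext and produce the HTTP response. The pipeline itself is not covered here; see the official documentation.

13. Finally, Http1OutputProducer writes out the end of the response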

public ValueTask<FlushResult> WriteStreamSuffixAsync()
{
    ValueTask<FlushResult> result = default;

    lock (_contextLock)
    {
        if (!_writeStreamSuffixCalled)
        {
            if (_autoChunk)
            {
                var writer = new BufferWriter<PipeWriter>(_pipeWriter);
                result = WriteAsyncInternal(ref writer, EndChunkedResponseBytes);
            }
            else if (_unflushedBytes > 0)
            {
                result = FlushAsync();
            }

            _writeStreamSuffixCalled = true;
        }
    }

    return result;
}
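
When _autoChunk is set, the suffix written above is the terminator of an HTTP/1.1 chunked body, the bytes "0\r\n\r\n". The sketch below shows chunked framing in isolation, under the assumption of a plain Stream as the output; ChunkedSketch and its methods are hypothetical names, not Kestrel code.

```csharp
using System.IO;
using System.Text;

public static class ChunkedSketch
{
    private static readonly byte[] Crlf = { (byte)'\r', (byte)'\n' };
    private static readonly byte[] EndChunkedResponse = Encoding.ASCII.GetBytes("0\r\n\r\n");

    private static void Write(Stream output, byte[] bytes) => output.Write(bytes, 0, bytes.Length);

    // One chunk: hex length, CRLF, payload, CRLF.
    public static void WriteChunk(Stream output, byte[] data)
    {
        if (data.Length == 0)
        {
            return;   // a zero-length chunk would be read as the terminator
        }
        Write(output, Encoding.ASCII.GetBytes(data.Length.ToString("x")));
        Write(output, Crlf);
        Write(output, data);
        Write(output, Crlf);
    }

    // The stream suffix: a zero-length chunk followed by an empty trailer section.
    public static void WriteSuffix(Stream output) => Write(output, EndChunkedResponse);

    public static string Encode(params string[] parts)
    {
        using var buffer = new MemoryStream();
        foreach (var part in parts)
        {
            WriteChunk(buffer, Encoding.ASCII.GetBytes(part));
        }
        WriteSuffix(buffer);
        return Encoding.ASCII.GetString(buffer.ToArray());
    }
}
```

For the minimal API at the top of the article, ChunkedSketch.Encode("Hello World!") produces "c\r\nHello World!\r\n0\r\n\r\n", since the 12-byte payload has hex length c.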

 


