
Tomcat Source Code Analysis: The Web Request Handling Process

恐龙弟旺仔    Published: 2018-12-06 11:09:44

Preface:

    Catalina is Tomcat's Servlet container implementation. It is responsible for handling requests from clients and producing the responses.

    A Servlet container alone, however, cannot serve clients by itself: a connector is also needed to accept client requests, parse them according to the configured protocol, and then hand them to the Servlet container for processing.

 

1.Coyote

    Coyote is the name of Tomcat's connector framework. Clients establish connections, send requests, and receive responses through Coyote.

    Coyote encapsulates the underlying network communication and exposes a uniform interface to the Catalina container, decoupling Catalina from the concrete request protocol and I/O model.

 

    1) Transport protocols supported by Coyote

    * HTTP/1.1: mainly used when Tomcat runs standalone

    * AJP: used for integration with a web server (e.g. Apache HTTP Server), for optimized static-resource handling and clustered deployments

    * HTTP/2: the next-generation HTTP protocol, supported since Tomcat 8.5 and 9.0

 

    2) Coyote also provides different implementations depending on the I/O model (a short sketch of selecting the protocol and I/O model follows this list)

    * NIO

    * NIO2

    * APR (Apache Portable Runtime)
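
    To make the protocol and I/O choice concrete, here is a small sketch using the embedded Tomcat API. It assumes tomcat-embed-core is on the classpath, and the port is illustrative, not taken from this article. Passing the fully qualified handler class name selects both the protocol and the I/O model:

import org.apache.catalina.connector.Connector;
import org.apache.catalina.startup.Tomcat;

public class EmbeddedConnectorDemo {
    public static void main(String[] args) throws Exception {
        Tomcat tomcat = new Tomcat();
        // "HTTP/1.1" or "AJP/1.3" would also be accepted and resolved to a default
        // handler class; the fully qualified name pins both protocol and I/O model.
        Connector connector = new Connector("org.apache.coyote.http11.Http11NioProtocol");
        connector.setPort(8080);                       // illustrative port
        tomcat.getService().addConnector(connector);
        tomcat.start();
        tomcat.getServer().await();
    }
}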

 

2. Main implementation classes of the Coyote framework

Let's first borrow a figure from another post to illustrate the request handling flow and the components involved (source: https://blog.csdn.net/xlgen157387/article/details/79006434 ).

    The implementation steps are listed directly here; the source-code analysis below will confirm the process.

    1) Endpoint: the concrete Socket receiving and handling class, an abstraction of the transport layer.

    2) Processor: responsible for constructing the Request and Response objects, an abstraction of the application layer.

    3) Adapter: adapts the request to the Servlet container, which does the actual processing.

 

    The figure gives an intuitive view of the steps a request goes through before it reaches the actual Container; next we analyze this flow from the source code.

 

3. Structure of the Connector

     1) Structure of the Connector class

     The main Connector source is as follows:

public class Connector extends LifecycleMBeanBase  {
    protected Service service = null;
    // The default ProtocolHandler implementation
    protected String protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
    
    protected final ProtocolHandler protocolHandler;
    protected Adapter adapter = null;
    
    @Override
    protected void startInternal() throws LifecycleException {

        // Validate settings before starting
        if (getPort() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
        }

        setState(LifecycleState.STARTING);

        try {
            // Start the protocolHandler
            protocolHandler.start();
        } catch (Exception e) {
            String errPrefix = "";
            if(this.service != null) {
                errPrefix += "service.getName(): \"" + this.service.getName() + "\"; ";
            }

            throw new LifecycleException
                (errPrefix + " " + sm.getString
                 ("coyoteConnector.protocolHandlerStartFailed"), e);
        }
    }
}

    As the code shows, the default ProtocolHandler implementation is Http11NioProtocol.
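
    The Connector constructor instantiates this handler reflectively from protocolHandlerClassName. The following standalone snippet is a simplified sketch of that idea, not the verbatim Connector code (it assumes the Tomcat jars are on the classpath):

import org.apache.coyote.ProtocolHandler;

public class ProtocolHandlerLookupDemo {
    public static void main(String[] args) throws Exception {
        // Same default class name as the protocolHandlerClassName field above.
        String protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
        // Simplified version of what the Connector constructor does:
        // load the class by name and create the ProtocolHandler instance.
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        ProtocolHandler handler = (ProtocolHandler) clazz.getConstructor().newInstance();
        System.out.println("ProtocolHandler in use: " + handler.getClass().getName());
    }
}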

    2) When the Connector is created

    When Catalina parses server.xml, that is, in the Catalina.createStartDigester() method, the following rules are registered:

digester.addRule("Server/Service/Connector",
                 new ConnectorCreateRule());
digester.addRule("Server/Service/Connector",
                 new SetAllPropertiesRule(new String[]{"executor", "sslImplementationName"}));
digester.addSetNext("Server/Service/Connector",
                    "addConnector",
                    "org.apache.catalina.connector.Connector");

   

    As you can see, the Connector is created while Catalina parses server.xml.

    3) Connector.start() starts the components the Connector contains

    The Connector is created when Catalina parses server.xml, so when are the Connector and the components it contains started?

    We know the Connector belongs to a Service and the Service belongs to the Server. After parsing, the Server is created and started, and its start() method in turn starts the Services and Connectors.

 

    From 1) we know that Connector.startInternal() calls ProtocolHandler.start(); let's look at that start method next.

 

4. Http11NioProtocol source analysis

    The class structure is as shown in the figure above.

    1) Key member fields

    They live in the AbstractProtocol class, as shown below:

public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {
    /**
     * Endpoint that provides low-level network I/O - must be matched to the
     * ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO
     * Endpoint etc.).
     */
    private final AbstractEndpoint<S> endpoint;
    
    /**
     * The adapter provides the link between the ProtocolHandler and the
     * connector.
     */
    protected Adapter adapter;
    
    private final Set<Processor> waitingProcessors =
            Collections.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>());
    
    /**
     * Create and configure a new Processor instance for the current protocol
     * implementation.
     *
     * @return A fully configured Processor instance that is ready to use
     */
    protected abstract Processor createProcessor();

    The source code confirms what the figure shows:

    the ProtocolHandler holds an Endpoint, a Processor, and an Adapter; the three components each play their own role and together parse, wrap, and pass on the request.
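
    To make this division of labour concrete, here is a deliberately simplified, hypothetical sketch. The Mini* types are invented for illustration only and do not exist in Tomcat; they merely mirror the responsibilities described above:

public class MiniCoyoteSketch {

    // Hypothetical stand-ins for the three roles; not Tomcat classes.
    interface MiniEndpoint  { byte[] receive(); }               // transport layer: raw bytes
    interface MiniProcessor { String parse(byte[] raw); }       // application protocol: bytes to request
    interface MiniAdapter   { void service(String request); }   // bridge into the Servlet container

    public static void main(String[] args) {
        MiniEndpoint  endpoint  = () -> "GET /index.html HTTP/1.1".getBytes();
        MiniProcessor processor = raw -> new String(raw).split(" ")[1];  // extract the URI
        MiniAdapter   adapter   = uri -> System.out.println("container serves " + uri);

        // Same hand-off order as in the figure: Endpoint -> Processor -> Adapter.
        adapter.service(processor.parse(endpoint.receive()));
    }
}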

 

    2) The ProtocolHandler.start() method

    Let's see what the handler does when it starts:

// The concrete implementation is in AbstractProtocol.start()
@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled())
        getLog().info(sm.getString("abstractProtocolHandler.start",
                                   getName()));
    try {
        // Mainly just starts the endpoint
        endpoint.start();
    } catch (Exception ex) {
        getLog().error(sm.getString("abstractProtocolHandler.startError",
                                    getName()), ex);
        throw ex;
    }
    ...
}

    As shown above, starting the ProtocolHandler mainly amounts to starting the endpoint.

 

5. Endpoint.start()

    Tomcat has no separate Endpoint interface; the base class is AbstractEndpoint, and its start() method is shown below.

// AbstractEndpoint.start()
public final void start() throws Exception {
    if (bindState == BindState.UNBOUND) {
        // bind() mainly creates the ServerSocket and binds it to the configured address
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    // startInternal() is abstract; the implementation lives in the subclasses
    startInternal();
}
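
    At its core, bind() opens a server socket channel and binds it to the configured address. The following is a minimal standalone java.nio sketch of roughly what that step does (the port and backlog are illustrative, not Tomcat's defaults):

import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BindSketch {
    public static void main(String[] args) throws Exception {
        // Roughly the core of NioEndpoint.bind(): open the server channel and
        // bind it to host:port with a listen backlog.
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        serverSock.bind(new InetSocketAddress(8080), 100);  // illustrative port and backlog
        serverSock.configureBlocking(true);                 // the NIO Acceptor accepts in blocking mode
        System.out.println("listening on " + serverSock.getLocalAddress());
        serverSock.close();
    }
}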

    The AbstractEndpoint implementation used here is NioEndpoint; its startInternal() method is as follows:

/**
     * Start the NIO endpoint, creating acceptor, poller threads.
     */
@Override
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                                 socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                             socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                              socketProperties.getBufferPool());

        // 1. Create the Executor (worker thread pool)
        if ( getExecutor() == null ) {
            createExecutor();
        }

        // 2. Create the latch that limits the maximum number of connections
        initializeConnectionLatch();

        // 3. Create the Pollers and start one thread per Poller
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        // 4. Create and start the Acceptor threads that accept new connections
        startAcceptorThreads();
    }
}

    The Acceptor accepts each new connection, wraps the socket as a PollerEvent, and queues it with a Poller. The Poller is a Runnable whose run() method registers the queued events with the Selector, polls it, and dispatches the ready keys. The core of NioEndpoint.Poller.run() is shown below:

// NioEndpoint.Poller.run()
    @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {
            boolean hasEvents = false;
            try {
                if (!close) {
                    // 1. Register queued PollerEvents with the Selector
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        //if we are here, means we have other stuff to do
                        //do a non blocking select
                        keyCount = selector.selectNow();
                    } else {
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                ...
            } catch (Throwable x) {
            }
            //either we timed out or we woke up, process events first
            if ( keyCount == 0 ) hasEvents = (hasEvents | events());

            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;

            // 2. A client event is ready; walk the selected keys
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
                    // 3. Finally hand the client event to processKey()
                    processKey(sk, attachment);
                }
            }//while

            //process timeouts
            timeout(keyCount,hasEvents);
        }//while

        getStopLatch().countDown();
    }
    
// NioEndpoint.Poller.processKey(sk, attachment)
// Handle a ready client event
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // 1. Handle a read event
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    // 2. Handle a write event
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        ...
}
        
// AbstractEndpoint.processSocket()
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        // 1. Bind the socketWrapper to a SocketProcessor (reusing a cached one if available)
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        // 2. Get the worker thread pool and submit the SocketProcessor task; if there is no pool, run it directly
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    ...
    return true;
}
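
    The dispatch-or-run decision above is a common pattern: submit the task to a worker pool when one exists, otherwise run it inline on the calling thread. A small standalone sketch of that pattern (the names are invented for illustration):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DispatchSketch {
    public static void main(String[] args) {
        // Stand-in for the endpoint's worker pool created by createExecutor().
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Runnable socketTask = () ->
                System.out.println("processing socket event on " + Thread.currentThread().getName());

        boolean dispatch = true;
        // Same shape as processSocket(): dispatch to the pool if possible,
        // otherwise execute the task on the calling (Poller) thread.
        if (dispatch && executor != null) {
            executor.execute(socketTask);
        } else {
            socketTask.run();
        }

        executor.shutdown();
    }
}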

    Summary: this completes the first stage. The main class involved is the Endpoint, which does the following (a standalone java.nio sketch of the same select-and-dispatch pattern follows this list):

    1) NioEndpoint starts and listens on the configured host:port

    2) NioEndpoint.Acceptor accepts client connection requests and wraps each socket as a PollerEvent queued with a Poller

    3) The PollerEvent registers the socket with the Poller's Selector for READ events

    4) The Poller mainly passes the client socket on (via a SocketProcessor task) to the Processor for handling
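
    The accept/poll/dispatch behaviour above is plain java.nio. The self-contained sketch below shows the same select-and-dispatch loop as a trivial echo server on an illustrative port. One difference: Tomcat's Acceptor accepts connections on a separate blocking thread rather than via OP_ACCEPT, but the Selector loop has the same shape as Poller.run():

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectorLoopSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));            // illustrative port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // Block until something is ready (Tomcat's Poller uses a timeout here).
            if (selector.select() == 0) {
                continue;
            }
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    // Acceptor role: accept the connection and register it for READ events.
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // Poller/processKey role: a read event is ready, handle it.
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    if (client.read(buf) == -1) {
                        client.close();
                        continue;
                    }
                    buf.flip();
                    client.write(buf);                        // echo back, stand-in for real processing
                }
            }
        }
    }
}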

 

 

9. SocketProcessor (second stage: the SocketProcessor hands the socket event over to a Processor)

    1) SocketProcessor.run()

protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

        try {
            int handshake = -1;

            try {
                if (key != null) {
                    // 1. NioChannel.isHandshakeComplete() is true by default (no TLS handshake needed)
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    }
                    ...
                }
            } 
            ...
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // If event is null, default to a read event
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    // Hand off to AbstractProtocol.ConnectionHandler for processing (the key step)
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ){
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE){
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
           ...
        } 
        ...
    }
}

   2) AbstractProtocol.ConnectionHandler.process(socketWrapper, event): this step hands the processing over to a Processor

@Override
public SocketState process(SocketWrapperBase wrapper, SocketEvent status) {
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.process",
                                    wrapper.getSocket(), status));
    }
    if (wrapper == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    S socket = wrapper.getSocket();

    // 1. Look up the Processor associated with this socket
    Processor processor = connections.get(socket);
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                                    processor, socket));
    }
    ...

    try {
        ...
        // 2. If there is no Processor yet, try other ways to obtain one; failing that, create a new one
        if (processor == null) {
            processor = recycledProcessors.pop();
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
                                            processor));
            }
        }
        if (processor == null) {
            processor = getProtocol().createProcessor();
            register(processor);
        }
        ...
        connections.put(socket, processor);

        SocketState state = SocketState.CLOSED;
        do {
            // 3. Process the client event
            // For a read event, status is SocketEvent.OPEN_READ here
            state = processor.process(wrapper, status);

            if (state == SocketState.UPGRADING) {
                // A series of steps to upgrade the protocol (e.g. to HTTP/2 or WebSocket)
                ...
            }
        } while ( state == SocketState.UPGRADING);

        // What follows handles the other states (long poll, keep-alive, etc.); we won't examine them in detail
        if (state == SocketState.LONG) {
            // In the middle of processing a request/response. Keep the
            // socket associated with the processor. Exact requirements
            // depend on type of long poll
            longPoll(wrapper, processor);
            if (processor.isAsync()) {
                getProtocol().addWaitingProcessor(processor);
            }
        } else if (state == SocketState.OPEN) {
            // In keep-alive but between requests. OK to recycle
            // processor. Continue to poll for the next request.
            connections.remove(socket);
            release(processor);
            wrapper.registerReadInterest();
        } 
        ...
        return state;
    } catch(java.net.SocketException e) {
        // SocketExceptions are normal
        getLog().debug(sm.getString(
            "abstractConnectionHandler.socketexception.debug"), e);
    } 
    ...
    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return SocketState.CLOSED;
}
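
    The "pop a recycled Processor or create a new one" logic in step 2 is a simple object-pool pattern. A standalone sketch of the same idea (the MiniProcessor type is invented for illustration; Tomcat's real pool is RecycledProcessors):

import java.util.ArrayDeque;
import java.util.Deque;

public class ProcessorPoolSketch {

    // Hypothetical stand-in for a coyote Processor.
    static class MiniProcessor { }

    public static void main(String[] args) {
        Deque<MiniProcessor> recycled = new ArrayDeque<>();  // stand-in for recycledProcessors

        // Same shape as ConnectionHandler.process(): reuse a recycled instance if
        // possible, otherwise create a new one (getProtocol().createProcessor()).
        MiniProcessor processor = recycled.poll();
        if (processor == null) {
            processor = new MiniProcessor();
        }
        System.out.println("using " + processor);

        // When the connection no longer needs it, the processor is recycled
        // so the next connection can reuse it (release(processor)).
        recycled.push(processor);
    }
}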

    3) AbstractProcessorLight.process(wrapper, status)

    @Override
    public SocketState process(SocketWrapperBase socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            ...
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
                // The read event we care about is handled right here
                // service() is abstract and implemented by subclasses
                // In this walkthrough we follow the Http11Processor implementation, analyzed below
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            ...
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }

    Summary:

    The SocketProcessor classifies the event and hands it to a Processor for handling.

    The Processor will then hand the request over to the Adapter.

 

10. Http11Processor.service(socketWrapper) (third stage: the Processor wraps the Socket received by the Endpoint into a Request)

@Override
public SocketState service(SocketWrapperBase socketWrapper)
    throws IOException {
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

    ...
        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
               sendfileState == SendfileState.DONE && !endpoint.isPaused()) {

            // Parsing the request header
            ...

            // Has an upgrade been requested?
            Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
            boolean foundUpgrade = false;
            while (connectionValues.hasMoreElements() && !foundUpgrade) {
                foundUpgrade = connectionValues.nextElement().toLowerCase(
                    Locale.ENGLISH).contains("upgrade");
            }

            ...

                // 1. The key business processing happens here
                if (!getErrorState().isError()) {
                    try {
                        rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                        // Hand the request over to the Adapter
                        getAdapter().service(request, response);
                        ...
                    } catch (InterruptedIOException e) {
                        setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                    } 
                    ...
                }

            ...
        }
11. Adapter.service(request, response) (fourth stage: the Adapter hands the request to the actual Container)

    Adapter is an interface; the concrete implementation is CoyoteAdapter. Its service() method is shown below.

@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
    throws Exception {

    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    // 1. Convert the coyote request/response into Servlet-compliant Request/Response objects
    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringEncoding(connector.getURIEncoding());
    }

    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;
    boolean postParseSuccess = false;

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        // 2. Convert request parameters and perform request mapping
        // This maps the request to a specific Wrapper
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                connector.getService().getContainer().getPipeline().isAsyncSupported());
            // 3. Get the first Valve of the Container and call its invoke() method; the valves form
            // a chain of responsibility, invoking one another in turn to complete the client request
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                request, response);
        }
        ...

    } catch (IOException e) {
        // Ignore
    } finally {
        ...
    }
}

    1) postParseRequest(req, request, res, response) performs the request mapping

    This is a very complex method with a great many details, so it is not walked through here.

    It is enough to know that the client request path is mapped to a specific, valid Wrapper, and that the mapping result is stored in MappingData (a rough, simplified sketch of the idea follows).
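
    As a rough, hypothetical illustration of what "mapping to a Wrapper" means, a longest-prefix lookup over url-patterns captures the basic idea (the table and names below are invented; the real Mapper also handles hosts, context versions, welcome files and more):

import java.util.LinkedHashMap;
import java.util.Map;

public class MappingSketch {
    public static void main(String[] args) {
        // Hypothetical url-pattern -> wrapper (servlet) table; invented names.
        Map<String, String> wrappers = new LinkedHashMap<>();
        wrappers.put("/app/api", "ApiServlet");
        wrappers.put("/app", "DefaultServlet");

        String uri = "/app/api/users";
        String matchedWrapper = null;
        int bestLength = -1;
        // Longest matching prefix wins, loosely mirroring path-mapping resolution.
        for (Map.Entry<String, String> e : wrappers.entrySet()) {
            String pattern = e.getKey();
            if (uri.startsWith(pattern) && pattern.length() > bestLength) {
                bestLength = pattern.length();
                matchedWrapper = e.getValue();
            }
        }
        System.out.println(uri + " -> " + matchedWrapper);   // /app/api/users -> ApiServlet
    }
}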

 

    2) connector.getService().getContainer().getPipeline().getFirst().invoke(request, response) fetches the first Valve of the current Engine and invokes it, completing the client request

        At this point the client request reaches a concrete Servlet, whose service() method runs; once the response is returned, this exchange is over.

        Pipeline and Valve will be covered in a separate post; a minimal chain-of-responsibility sketch of the idea follows below.
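
        Until then, here is a minimal, hypothetical sketch of the chain-of-responsibility idea behind Pipeline/Valve (the MiniValve types are invented; Tomcat's real Valve exposes invoke(Request, Response) plus getNext()/setNext()):

public class ValveChainSketch {

    // Hypothetical, simplified Valve: each valve does its own work and then
    // delegates to the next one, just like getPipeline().getFirst().invoke(...).
    interface MiniValve { void invoke(String request); }

    static class LoggingValve implements MiniValve {
        private final MiniValve next;
        LoggingValve(MiniValve next) { this.next = next; }
        public void invoke(String request) {
            System.out.println("log: " + request);
            next.invoke(request);            // pass the request down the chain
        }
    }

    // The "basic" valve at the end of the chain ultimately calls the servlet.
    static class BasicValve implements MiniValve {
        public void invoke(String request) {
            System.out.println("servlet.service() handles " + request);
        }
    }

    public static void main(String[] args) {
        MiniValve first = new LoggingValve(new BasicValve());
        first.invoke("GET /index.html");     // walk the chain starting from the first valve
    }
}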

 

Summary:

    That wraps up web request handling. The whole flow is exactly the figure borrowed in section 2: Endpoint → Processor → Adapter → Container.

    Readers can retrace the analysis above against that figure.

 

Reference: 《Tomcat架构解析》 (刘光瑞). Reference blog: https://blog.csdn.net/xlgen157387/article/details/79006434

 
