博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ranch源码阅读
阅读量:5914 次
发布时间:2019-06-19

本文共 13509 字,大约阅读时间需要 45 分钟。

ranch

整体理解

从整体上的话,ranch主要是三层的监控树

  • 第一层 ranch_sup,负责整个应用的启动,启动了ranch_server进程,它管理了整个应用的配置和连接数据
  • 第二层 ranch_listener_sup,负责连接的管理
  • 第三层 ranch_conns_sup和ranch_acceptors_sup,这两个分别用来处理新的连接和获得新的连接

    当然最底层的ranch_acceptor是应用中的重要角色,每当有新的连接,都会把控制权交给ranch_conns_sup,由它统一管理

    ranch.app

    启动模块为ranch_app,说明需要找到ranch_app.erl文件去启动应用

    ranch_app.erl

    根据参数启动测试的功能,主要启动了一个ranch_sup监控进程

    ranch_sup.erl

    新建一个名为ranch_server的ets表,同时启动并监控ranch_server进程,策略为one_for_one

    ranch_server.erl

    启动了一个进程,管理ranch_server这个ets表,提供多个接口

    set_new_listener_opts:设置监听进程的参数
    set_connections_sup:增加新的连接进程的监控进程Pid,并且对该进程进行monitor监视,把{MonitorRef, Pid}添加到#state.monitors中
    set_listener_sup:增加一个监听进程的监控进程Pid,并且对该进程进行monitor监视,并且把{MonitorRef, Pid}添加到#state.monitors中
    set_addr:在ets中记录地址
    set_max_conns:设置最大连接数量
    set_trans_opts:设置传输协议参数
    set_proto_opts:设置协议参数
    到此为止,ranch应用的准备工作已经结束,剩下的就差外部的调用了
    ***

    ranch.erl

    ranch应用的调用模块,通过start_listener/6来初始化ranch的功能模块,给它提供功能参数,其中有一个Transport参数,是ranch的协议模块名,要么是ranch_ssl,要么就是ranch_tcp,先在ranch_sup下面启动了一个ranch_listener_sup进程,该进程做了什么,接下来将详细介绍,至少在这里我们知道,ranch的正式使用由ranch_listener_sup进程启动开始。

-spec start_listener(ref(), module(), any(), module(), any())    -> supervisor:startchild_ret().start_listener(Ref, Transport, TransOpts, Protocol, ProtoOpts) ->    NumAcceptors = proplists:get_value(num_acceptors, TransOpts, 10),    start_listener(Ref, NumAcceptors, Transport, TransOpts, Protocol, ProtoOpts).-spec start_listener(ref(), non_neg_integer(), module(), any(), module(), any())    -> supervisor:startchild_ret().start_listener(Ref, NumAcceptors, Transport, TransOpts, Protocol, ProtoOpts)        when is_integer(NumAcceptors) andalso is_atom(Transport)        andalso is_atom(Protocol) ->    _ = code:ensure_loaded(Transport),    case erlang:function_exported(Transport, name, 0) of        false ->            {error, badarg};        true ->            Res = supervisor:start_child(ranch_sup, child_spec(Ref, NumAcceptors,                    Transport, TransOpts, Protocol, ProtoOpts)),            Socket = proplists:get_value(socket, TransOpts),            case Res of                {ok, Pid} when Socket =/= undefined ->                    %% Give ownership of the socket to ranch_acceptors_sup                    %% to make sure the socket stays open as long as the                    %% listener is alive. If the socket closes however there                    %% will be no way to recover because we don't know how                    %% to open it again.                    Children = supervisor:which_children(Pid),                    {_, AcceptorsSup, _, _}                        = lists:keyfind(ranch_acceptors_sup, 1, Children),                    %%% Note: the catch is here because SSL crashes when you change                    %%% the controlling process of a listen socket because of a bug.                    %%% The bug will be fixed in R16.                    catch Transport:controlling_process(Socket, AcceptorsSup);                _ ->                    ok            end,            maybe_started(Res)    end.
-spec child_spec(ref(), module(), any(), module(), any())    -> supervisor:child_spec().child_spec(Ref, Transport, TransOpts, Protocol, ProtoOpts) ->    NumAcceptors = proplists:get_value(num_acceptors, TransOpts, 10),    child_spec(Ref, NumAcceptors, Transport, TransOpts, Protocol, ProtoOpts).-spec child_spec(ref(), non_neg_integer(), module(), any(), module(), any())    -> supervisor:child_spec().child_spec(Ref, NumAcceptors, Transport, TransOpts, Protocol, ProtoOpts)        when is_integer(NumAcceptors) andalso is_atom(Transport)        andalso is_atom(Protocol) ->    {
{ranch_listener_sup, Ref}, {ranch_listener_sup, start_link, [ Ref, NumAcceptors, Transport, TransOpts, Protocol, ProtoOpts ]}, permanent, infinity, supervisor, [ranch_listener_sup]}.

ranch_listener_sup.erl

该监控进程启动时,主动调用ranch_server:set_listener_sup/2,将自己的信息记录在ets中并且被ranch_server监控,它下面还顺序启动了ranch_conns_sup和ranch_acceptors_sup,策略是rest_for_one,因为ranch_conns_sup是负责监控连接的进程,而ranch_acceptors_sup是监控消息的进程,ranch_conns_sup死掉之后,说明连接都断开了,ranch_acceptors_sup下面的进程也就无法运行,必须等ranch_conns_sup重启成功后才能正常工作。

ranch_conns_sup.erl

该模块并不是supervisor行为,不过作者手动写了一个类似supervisor的东西,启动时主动调用ranch_server:set_connections_sup/2记录自身的信息,同时通过ranch_server获取相应的一些连接参数,其中用到了proc_lib:init_ack/2用于响应proc_lib:start_link/3,实现同步启动进程,做到和gen_server一样的效果,接着开始一个循环函数loop/4,用来处理消息,下面列出主要的消息处理

{?MODULE, start_protocol, T, Socket}:参数中To为ranch_acceptor模块的进程pid,而Socket是ranch_acceptor接收到的客户端socket,启动一个调用Protocol:start_link/4启动一个进程,这个Protocol是用户实现的回调模块,通常是socket消息的接收处理进程,就像例子中的echo_protocol.erl或者reverse_protocol.erl这两个部分,如果启动成功,将会调用shoot/8来修改回调模块的Socket的控制进程,即socket的消息将发送到哪个进程在这里决定,修改之后,将回复回调部分进程一个{shoot, Ref, Transport, Socket, AckTimeout}消息,接着检查当前连接数量是否达到配置中的MaxConns,如果达到了最大连接数的话则把连接加入到等待连接列表中,同时增加子连接数量,继续循环loop/4
{?MODULE, active_connections, To, Tag}:To连接进程获取当前连接列表
{remove_connection, Ref, Pid}:移除某个连接进程

-spec init(pid(), ranch:ref(), module(), module()) -> no_return().init(Parent, Ref, Transport, Protocol) ->    process_flag(trap_exit, true),    ok = ranch_server:set_connections_sup(Ref, self()),    MaxConns = ranch_server:get_max_connections(Ref),    TransOpts = ranch_server:get_transport_options(Ref),    ConnType = proplists:get_value(connection_type, TransOpts, worker),    Shutdown = proplists:get_value(shutdown, TransOpts, 5000),    AckTimeout = proplists:get_value(ack_timeout, TransOpts, 5000),    ProtoOpts = ranch_server:get_protocol_options(Ref),    ok = proc_lib:init_ack(Parent, {ok, self()}),    loop(#state{parent=Parent, ref=Ref, conn_type=ConnType,        shutdown=Shutdown, transport=Transport, protocol=Protocol,        opts=ProtoOpts, ack_timeout=AckTimeout, max_conns=MaxConns}, 0, 0, []).
loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,        transport=Transport, protocol=Protocol, opts=Opts,        max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->    receive        {?MODULE, start_protocol, To, Socket} ->            try Protocol:start_link(Ref, Socket, Transport, Opts) of                {ok, Pid} ->                    shoot(State, CurConns, NbChildren, Sleepers, To, Socket, Pid, Pid);                {ok, SupPid, ProtocolPid} when ConnType =:= supervisor ->                    shoot(State, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid);                Ret ->                    To ! self(),                    error_logger:error_msg(                        "Ranch listener ~p connection process start failure; "                        "~p:start_link/4 returned: ~999999p~n",                        [Ref, Protocol, Ret]),                    Transport:close(Socket),                    loop(State, CurConns, NbChildren, Sleepers)            catch Class:Reason ->                To ! self(),                error_logger:error_msg(                    "Ranch listener ~p connection process start failure; "                    "~p:start_link/4 crashed with reason: ~p:~999999p~n",                    [Ref, Protocol, Class, Reason]),                loop(State, CurConns, NbChildren, Sleepers)            end;        {?MODULE, active_connections, To, Tag} ->            To ! {Tag, CurConns},            loop(State, CurConns, NbChildren, Sleepers);        %% Remove a connection from the count of connections.        {remove_connection, Ref, Pid} ->            case put(Pid, removed) of                active ->                    loop(State, CurConns - 1, NbChildren, Sleepers);                remove ->                    loop(State, CurConns, NbChildren, Sleepers);                undefined ->                    _ = erase(Pid),                    loop(State, CurConns, NbChildren, Sleepers)            end;        %% Upgrade the max number of connections allowed concurrently.        %% We resume all sleeping acceptors if this number increases.        {set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->            _ = [To ! self() || To <- Sleepers],            loop(State#state{max_conns=MaxConns2},                CurConns, NbChildren, []);        {set_max_conns, MaxConns2} ->            loop(State#state{max_conns=MaxConns2},                CurConns, NbChildren, Sleepers);        %% Upgrade the protocol options.        {set_opts, Opts2} ->            loop(State#state{opts=Opts2},                CurConns, NbChildren, Sleepers);        {'EXIT', Parent, Reason} ->            terminate(State, Reason, NbChildren);        {'EXIT', Pid, Reason} when Sleepers =:= [] ->            case erase(Pid) of                active ->                    report_error(Ref, Protocol, Pid, Reason),                    loop(State, CurConns - 1, NbChildren - 1, Sleepers);                removed ->                    report_error(Ref, Protocol, Pid, Reason),                    loop(State, CurConns, NbChildren - 1, Sleepers);                undefined ->                    loop(State, CurConns, NbChildren, Sleepers)            end;        %% Resume a sleeping acceptor if needed.        {'EXIT', Pid, Reason} ->            case erase(Pid) of                active when CurConns > MaxConns ->                    report_error(Ref, Protocol, Pid, Reason),                    loop(State, CurConns - 1, NbChildren - 1, Sleepers);                active ->                    report_error(Ref, Protocol, Pid, Reason),                    [To|Sleepers2] = Sleepers,                    To ! self(),                    loop(State, CurConns - 1, NbChildren - 1, Sleepers2);                removed ->                    report_error(Ref, Protocol, Pid, Reason),                    loop(State, CurConns, NbChildren - 1, Sleepers);                undefined ->                    loop(State, CurConns, NbChildren, Sleepers)            end;        {system, From, Request} ->            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],                {State, CurConns, NbChildren, Sleepers});        %% Calls from the supervisor module.        {'$gen_call', {To, Tag}, which_children} ->            Children = [{Protocol, Pid, ConnType, [Protocol]}                || {Pid, Type} <- get(),                Type =:= active orelse Type =:= removed],            To ! {Tag, Children},            loop(State, CurConns, NbChildren, Sleepers);        {'$gen_call', {To, Tag}, count_children} ->            Counts = case ConnType of                worker -> [{supervisors, 0}, {workers, NbChildren}];                supervisor -> [{supervisors, NbChildren}, {workers, 0}]            end,            Counts2 = [{specs, 1}, {active, NbChildren}|Counts],            To ! {Tag, Counts2},            loop(State, CurConns, NbChildren, Sleepers);        {'$gen_call', {To, Tag}, _} ->            To ! {Tag, {error, ?MODULE}},            loop(State, CurConns, NbChildren, Sleepers);        Msg ->            error_logger:error_msg(                "Ranch listener ~p received unexpected message ~p~n",                [Ref, Msg]),            loop(State, CurConns, NbChildren, Sleepers)    end.
shoot(State=#state{ref=Ref, transport=Transport, ack_timeout=AckTimeout, max_conns=MaxConns},        CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->    case Transport:controlling_process(Socket, ProtocolPid) of        ok ->            ProtocolPid ! {shoot, Ref, Transport, Socket, AckTimeout},            put(SupPid, active),            CurConns2 = CurConns + 1,            if CurConns2 < MaxConns ->                    To ! self(),                    loop(State, CurConns2, NbChildren + 1, Sleepers);                true ->                    loop(State, CurConns2, NbChildren + 1, [To|Sleepers])            end;        {error, _} ->            Transport:close(Socket),            %% Only kill the supervised pid, because the connection's pid,            %% when different, is supposed to be sitting under it and linked.            exit(SupPid, kill),            To ! self(),            loop(State, CurConns, NbChildren, Sleepers)    end.

ranch_acceptors_sup.erl

从ranch_server中获取ranch_conns_sup的进程,并且获取监听参数TransOpts,如果ranch_server中尚未有监听socket,则启动监听socket,接着把监听socket记录到ranch_server中,启动一个ranch_acceptor子进程。

ranch_acceptor.erl

启动一个loop/3循环,当接收到客户端的socket之后,把socket的控制进程改为连接监控进程ranch_conns_sup,连接监控进程中有对应的一些消息处理,接着调用ranch_conns_sup:start_protocol/2发送{?MODULE, start_protocol, self(), Socket},ranch_conns_sup进程自身对该消息进行处理,详情看ranch_conns_sup.erl的介绍,至此,ranch的监听端口的工作都已经准备完毕,(发现还有部分忽略了,需要实现ranch_protocol行为才能处理客户端消息的)现在就差客户端的连接进来了。

-spec loop(inet:socket(), module(), pid()) -> no_return().loop(LSocket, Transport, ConnsSup) ->    _ = case Transport:accept(LSocket, infinity) of        {ok, CSocket} ->            case Transport:controlling_process(CSocket, ConnsSup) of                ok ->                    %% This call will not return until process has been started                    %% AND we are below the maximum number of connections.                    ranch_conns_sup:start_protocol(ConnsSup, CSocket);                {error, _} ->                    Transport:close(CSocket)            end;        %% Reduce the accept rate if we run out of file descriptors.        %% We can't accept anymore anyway, so we might as well wait        %% a little for the situation to resolve itself.        {error, emfile} ->            error_logger:warning_msg("Ranch acceptor reducing accept rate: out of file descriptors~n"),            receive after 100 -> ok end;        %% We want to crash if the listening socket got closed.        {error, Reason} when Reason =/= closed ->            ok    end,    flush(),    ?MODULE:loop(LSocket, Transport, ConnsSup).

转载于:https://www.cnblogs.com/cellphoneyeah/p/9155062.html

你可能感兴趣的文章
Linux Debugging (九) 一次生产环境下的“内存泄露”
查看>>
codeforces 558D Guess Your Way Out! II 规律
查看>>
实现精灵沿着圆形轨迹运动
查看>>
python数据持久存储:pickle模块的基本使用
查看>>
玩转spring boot——国际化
查看>>
POJ 3624 Charm Bracelet(01背包裸题)
查看>>
hello Kotlin!
查看>>
Java笔试面试题001
查看>>
读书笔记-《人为什么活着》
查看>>
C# SortedDictionary&lt;TKey, TValue&gt; 类
查看>>
mysql中delimiter
查看>>
JS实现密码加密
查看>>
Nginx+Keepalived(双机热备)搭建高可用负载均衡环境(HA)
查看>>
supervisor //todo
查看>>
自定义控件详解(六):Paint 画笔MaskFilter过滤
查看>>
(98)Address already in use: make_sock: could not bind to address 0.0.0.0:80
查看>>
Vijos P1784 数字统计【模拟】
查看>>
Spring.Net学习系列一
查看>>
(转)C#开发微信门户及应用(6)--微信门户菜单的管理操作
查看>>
spring-oauth-server实践:授权方式1、2、3和授权方式4的token对象.authorities产生方式比较...
查看>>