ejabberd_receiver 分析

ejabberd_receiver是ejabberd 中 网关层的数据receive模块,客户端发送的数据通过ejabberd_receiver 接收并通过xml port解析后发送给 ejabberd_c2s的实例处理,至于它的加密、压缩、解压之类的就不说了。

主要说一下这个shaper(字母翻译:脉冲整形器,个人理解,流量控制)机制,什么意思呢?

原来 ejabberd_receiver会根据本次socket接收到的包的大小,判断是否需要缓冲一会再接收下一个socket包,这里用到了socket参数{active, once}

算法如下:
根据本次收到的包的大小 和 maxrate 算出应该缓冲多少s,再算出上一次 到 本次接收数据包的时间间隔来决定是否需要缓冲。

shaper:update/2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
update(none, _Size) -> {none, 0};
update(#maxrate{} = State, Size) ->
MinInterv = 1000 * Size /(2 * State#maxrate.maxrate - State#maxrate.lastrate),
Interv = (now_to_usec(now()) - State#maxrate.lasttime) /1000,
?DEBUG("State: ~p, Size=~p~nM=~p, I=~p~n",
[State, Size, MinInterv, Interv]),
Pause = if
MinInterv > Interv ->
1 + trunc(MinInterv - Interv);
true -> 0
end,
NextNow = now_to_usec(now()) + Pause * 1000,
{State#maxrate{lastrate =(State#maxrate.lastrate + 1000000 * Size / (NextNow - State#maxrate.lasttime)) / 2, lasttime = NextNow},
Pause}.

ejabberd_receive:process_date/2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
process_data(Data,#state{xml_stream_state = XMLStreamState,shaper_state = ShaperState, c2s_pid = C2SPid} =State) ->
?DEBUG("Received XML on stream = ~p", [(Data)]),
XMLStreamState1 = xml_stream:parse(XMLStreamState, Data),
lager:info("XMLStreamState1 ~p",[XMLStreamState1]),
{NewShaperState, Pause} = shaper:update(ShaperState, byte_size(Data)),
lager:info("pause :~p \n pid :~p",[Pause,C2SPid]),
if
C2SPid == undefined ->
ok;
Pause > 0 ->
erlang:start_timer(Pause, self(), activate);
true ->
activate_socket(State)
end,
State#state{xml_stream_state = XMLStreamState1,shaper_state = NewShaperState}.