summaryrefslogtreecommitdiff
path: root/lib/erl/src/thrift_disk_log_transport.erl
blob: 761fa3097d702ed1edb675c35d8488bb2a76c81c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
%%
%% Licensed to the Apache Software Foundation (ASF) under one
%% or more contributor license agreements. See the NOTICE file
%% distributed with this work for additional information
%% regarding copyright ownership. The ASF licenses this file
%% to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance
%% with the License. You may obtain a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%

%%% Todo: this might be better off as a gen_server type of transport
%%%       that handles stuff like group commit, similar to TFileTransport
%%%       in cpp land
-module(thrift_disk_log_transport).

-behaviour(thrift_transport).

%% API
-export([new/2, new_transport_factory/2, new_transport_factory/3]).

%% thrift_transport callbacks
-export([read/2, write/2, force_flush/1, flush/1, close/1]).

%% state
-record(dl_transport, {log,                    % name of the disk_log this transport writes to
                       close_on_close = false, % when true, close/1 also closes the disk_log
                       sync_every = infinity,  % interval handed to timer:apply_interval/4 in new/2; infinity = no timer
                       sync_tref}).            % timer reference returned by timer:apply_interval/4, when one was started


%% Create a transport attached to an already open disk_log.
%%
%% LogName is the (atom) name of the open log. Opts is a list of options:
%%   {close_on_close, boolean()} - when true, the log is closed with
%%       disk_log:lclose/1 when the transport is closed (default: false).
%%   {sync_every, pos_integer()} - interval, in milliseconds, at which a
%%       timer invokes force_flush/1 on this transport (default: infinity,
%%       in which case no timer is started).
%%
%% Returns the result of thrift_transport:new/2 for this module and state.
new(LogName, Opts) when is_atom(LogName), is_list(Opts) ->
    State = parse_opts(Opts, #dl_transport{log = LogName}),

    State2 =
        case State#dl_transport.sync_every of
            N when is_integer(N), N > 0 ->
                %% timer:apply_interval/4 expects the *argument list* of the
                %% function to apply, so the state has to be wrapped in a
                %% list; passing the bare record never calls force_flush/1.
                {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]),
                State#dl_transport{sync_tref = TRef};
            _ -> State
        end,

    thrift_transport:new(?MODULE, State2).


%% Fold the option list into the transport state, one option at a time.
%% Recognised options are {close_on_close, boolean()} and
%% {sync_every, pos_integer()}; when an option is repeated the last
%% occurrence wins. Any other element fails with function_clause.
parse_opts([], State) ->
    State;
parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) ->
    %% Keep going: the remaining options must be applied as well.
    parse_opts(Rest, State#dl_transport{close_on_close = Bool});
parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 ->
    parse_opts(Rest, State#dl_transport{sync_every = Int}).


%%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% disk_log_transport is write-only: reading is not supported.
%% Both arguments are ignored; always returns
%% {error, no_read_from_disk_log}.
read(_State, _Len) ->
    {error, no_read_from_disk_log}.

%% Write callback: flatten the iolist Data into a single binary and hand
%% it to disk_log:balog/2 for the transport's log. Returns the result of
%% disk_log:balog/2.
write(#dl_transport{log = LogName}, Data) ->
    Payload = erlang:iolist_to_binary(Data),
    disk_log:balog(LogName, Payload).

%% Unconditionally sync the transport's log: emits an info message via
%% error_logger and then returns the result of disk_log:sync/1.
%% This is the function new/2 schedules with timer:apply_interval/4.
force_flush(State = #dl_transport{}) ->
    error_logger:info_msg("~p syncing~n", [?MODULE]),
    disk_log:sync(State#dl_transport.log).

%% Flush callback.
%% When no time-based sync is configured, sync_every holds the record
%% default `infinity` and the log is synced right away, returning the
%% result of disk_log:sync/1. When an interval is configured, syncing is
%% left to the timer set up in new/2 and this is a no-op returning ok.
flush(#dl_transport{log = Log, sync_every = infinity}) ->
    disk_log:sync(Log);
flush(#dl_transport{}) ->
    ok.


%% On close, stop the periodic sync timer (if new/2 started one) and
%% close the underlying log if we're configured to do so.
%% Returns ok when the log is left open, otherwise the result of
%% disk_log:lclose/1.
close(#dl_transport{log = Log, close_on_close = CloseLog, sync_tref = TRef}) ->
    %% Without this the interval timer would keep calling force_flush/1
    %% for a transport that no longer exists. The result of the cancel is
    %% not interesting here, so it is deliberately not matched.
    case TRef of
        undefined -> ok;
        _ -> timer:cancel(TRef)
    end,
    case CloseLog of
        false -> ok;
        _ -> disk_log:lclose(Log)
    end.


%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Build a transport factory using the default transport options:
%% close the log when the transport is closed, and sync_every = 500.
%% Delegates to new_transport_factory/3.
new_transport_factory(Name, ExtraLogOpts) ->
    DefaultTransportOpts = [{close_on_close, true}, {sync_every, 500}],
    new_transport_factory(Name, ExtraLogOpts, DefaultTransportOpts).

%% Build a transport factory: returns {ok, Fun} where Fun is a
%% zero-argument fun that opens the log and creates a transport for it
%% (see factory_impl/3) each time it is called.
new_transport_factory(Name, ExtraLogOpts, TransportOpts) ->
    {ok, fun() -> factory_impl(Name, ExtraLogOpts, TransportOpts) end}.

%% Open the disk_log called Name as an external-format wrap log, with
%% ExtraLogOpts appended to the option list, and create a transport on it
%% with TransportOpts. When disk_log reports that the log was repaired, an
%% info message is logged first. Any other result of disk_log:open/1
%% (e.g. an error tuple) is not handled and fails with case_clause.
factory_impl(Name, ExtraLogOpts, TransportOpts) ->
    OpenOpts = [{name, Name}, {format, external}, {type, wrap}] ++ ExtraLogOpts,
    case disk_log:open(OpenOpts) of
        {ok, Opened} ->
            new(Opened, TransportOpts);
        {repaired, Repaired, RecoveredInfo, BadBytesInfo} ->
            error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n",
                                  [Repaired, RecoveredInfo, BadBytesInfo]),
            new(Repaired, TransportOpts)
    end.