diff options
Diffstat (limited to 'lib/cosNotification/src/PusherConsumer_impl.erl')
-rw-r--r-- | lib/cosNotification/src/PusherConsumer_impl.erl | 729 |
1 files changed, 729 insertions, 0 deletions
diff --git a/lib/cosNotification/src/PusherConsumer_impl.erl b/lib/cosNotification/src/PusherConsumer_impl.erl new file mode 100644 index 0000000000..195e81ec58 --- /dev/null +++ b/lib/cosNotification/src/PusherConsumer_impl.erl @@ -0,0 +1,729 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1999-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : PusherConsumer_impl.erl +%% Purpose : +%%---------------------------------------------------------------------- + +-module('PusherConsumer_impl'). + +%%--------------- INCLUDES ----------------------------------- +-include_lib("orber/include/corba.hrl"). +-include_lib("orber/include/ifr_types.hrl"). +%% cosEvent files. +-include_lib("cosEvent/include/CosEventChannelAdmin.hrl"). +-include_lib("cosEvent/include/CosEventComm.hrl"). +%% Application files +-include("CosNotification.hrl"). +-include("CosNotifyChannelAdmin.hrl"). +-include("CosNotifyComm.hrl"). +-include("CosNotifyFilter.hrl"). + +-include("CosNotification_Definitions.hrl"). + +%%--------------- EXPORTS ------------------------------------ +%%--------------- External ----------------------------------- +%%----- CosNotifyChannelAdmin::ProxyPushConsumer ------------- +-export([connect_any_push_supplier/4]). + +%%----- CosNotifyChannelAdmin::SequenceProxyPushConsumer ----- +-export([connect_sequence_push_supplier/4]). + +%%----- CosNotifyChannelAdmin::StructuredProxyPushConsumer --- +-export([connect_structured_push_supplier/4]). + +%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer ---- +-export([obtain_subscription_types/4, + validate_event_qos/4]). + +%%----- Inherit from CosNotification::QoSAdmin --------------- +-export([get_qos/3, + set_qos/4, + validate_qos/4]). + +%%----- Inherit from CosNotifyComm::NotifyPublish ------------ +-export([offer_change/5]). + +%%----- Inherit from CosNotifyFilter::FilterAdmin ------------ +-export([add_filter/4, + remove_filter/4, + get_filter/4, + get_all_filters/3, + remove_all_filters/3]). + +%%----- Inherit from CosEventComm::PushConsumer ------------- +-export([push/4, + disconnect_push_consumer/3]). + +%%----- Inherit from CosNotifyComm::SequencePushConsumer ---- +-export([push_structured_events/4, + disconnect_sequence_push_consumer/3]). + +%%----- Inherit from CosNotifyComm::StructuredPushConsumer -- +-export([push_structured_event/4, + disconnect_structured_push_consumer/3]). + +%%----- Inherit from CosEventChannelAdmin::ProxyPushConsumer +-export([connect_push_supplier/4]). + +%% Attributes (external) CosNotifyChannelAdmin::ProxySupplier +-export(['_get_MyType'/3, + '_get_MyAdmin'/3]). + +%%--------------- gen_server specific exports ---------------- +-export([handle_info/2, code_change/3]). +-export([init/1, terminate/2]). + +%%--------------- LOCAL DEFINITIONS -------------------------- +%% Data structures +-record(state, {myType, + myAdmin, + myAdminPid, + myChannel, + myFilters = [], + myOperator, + idCounter = 0, + client, + qosGlobal, + qosLocal, + publishType = false, + etsR, + eventDB}). + +%% Data structures constructors +-define(get_InitState(_MyT, _MyA, _MyAP, _QS, _LQS, _Ch, _EDB, _MyOP), + #state{myType = _MyT, + myAdmin = _MyA, + myAdminPid= _MyAP, + myChannel = _Ch, + myOperator= _MyOP, + qosGlobal = _QS, + qosLocal = _LQS, + etsR = ets:new(oe_ets, [set, protected]), + eventDB = _EDB}). + +%%-------------- Data structures selectors ----------------- +%% Attributes +-define(get_MyType(S), S#state.myType). +-define(get_MyAdmin(S), S#state.myAdmin). +-define(get_MyAdminPid(S), S#state.myAdminPid). +-define(get_MyChannel(S), S#state.myChannel). +-define(get_MyOperator(S), S#state.myOperator). +%% Client Object +-define(get_Client(S), S#state.client). +%% QoS +-define(get_GlobalQoS(S), S#state.qosGlobal). +-define(get_LocalQoS(S), S#state.qosLocal). +-define(get_BothQoS(S), {S#state.qosGlobal, S#state.qosLocal}). +%% Filters +-define(get_Filter(S, I), find_obj(lists:keysearch(I, 1, S#state.myFilters))). +-define(get_AllFilter(S), S#state.myFilters). +-define(get_AllFilterID(S), find_ids(S#state.myFilters)). +%% Publish +-define(get_AllPublish(S), lists:flatten(ets:match(S#state.etsR, + {'$1',publish}))). +-define(get_PublishType(S), S#state.publishType). +%% ID +-define(get_IdCounter(S), S#state.idCounter). +%% Event +-define(get_Event(S), cosNotification_eventDB:get_event(S#state.eventDB)). +-define(get_Events(S,M), cosNotification_eventDB:get_events(S#state.eventDB, M)). + +-define(get_EventCounter(S), S#state.eventCounter). +%% Admin +-define(get_BatchLimit(S), ?not_GetMaximumBatchSize((S#state.qosLocal))). + +%%-------------- Data structures modifiers ----------------- +%% Client Object +-define(set_Client(S,D), S#state{client=D}). +-define(del_Client(S), S#state{client=undefined}). +-define(set_Unconnected(S), S#state{client=undefined}). +%% QoS +-define(set_LocalQoS(S,D), S#state{qosLocal=D}). +-define(set_GlobalQoS(S,D), S#state{qosGlobal=D}). +-define(set_BothQoS(S,GD,LD), S#state{qosGlobal=GD, qosLocal=LD}). +%% Filters +-define(add_Filter(S,I,O), S#state{myFilters=[{I,O}|S#state.myFilters]}). +-define(del_Filter(S,I), S#state{myFilters= + delete_obj(lists:keydelete(I, 1, S#state.myFilters), + S#state.myFilters)}). +-define(del_AllFilter(S), S#state{myFilters=[]}). +%% Publish +-define(add_Publish(S,E), ets:insert(S#state.etsR, {E, publish})). +-define(del_Publish(S,E), ets:delete(S#state.etsR, E)). +-define(set_PublishType(S,T), S#state{publishType=T}). +%% ID +-define(set_IdCounter(S,V), S#state{idCounter=V}). +-define(new_Id(S), 'CosNotification_Common':create_id(S#state.idCounter)). +%% Event +-define(add_Event(S,E), cosNotification_eventDB:add_event(S#state.eventDB, E)). +-define(update_EventDB(S,Q), S#state{eventDB= + cosNotification_eventDB:update(S#state.eventDB, Q)}). + +-define(set_EventCounter(S,V), S#state{eventCounter=V}). +-define(add_to_EventCounter(S,V),S#state{eventCounter=S#state.eventCounter+V}). +-define(reset_EventCounter(S), S#state{eventCounter=0}). +-define(increase_EventCounter(S),S#state{eventCounter=(S#state.eventCounter+1)}). +-define(decrease_EventCounter(S),S#state{eventCounter=S#state.eventCounter-1}). +-define(add_ToEventCounter(S,A), S#state{eventCounter=(S#state.eventCounter+A)}). +-define(sub_FromEventCounter(S,_A), S#state{eventCounter=(S#state.eventCounter-_A)}). +-define(set_EventCounterTo(S,V), S#state{eventCounter=V}). + +%% MISC +-define(is_ANY(S), S#state.myType == 'PUSH_ANY'). +-define(is_STRUCTURED(S), S#state.myType == 'PUSH_STRUCTURED'). +-define(is_SEQUENCE(S), S#state.myType == 'PUSH_SEQUENCE'). +-define(is_ANDOP(S), S#state.myOperator == 'AND_OP'). +-define(is_UnConnected(S), S#state.client == undefined). +-define(is_Connected(S), S#state.client =/= undefined). +-define(is_PersistentConnection(S), + ?not_GetConnectionReliability((S#state.qosLocal)) == ?not_Persistent). +-define(is_PersistentEvent(S), + ?not_GetEventReliability((S#state.qosLocal)) == ?not_Persistent). +-define(is_BatchLimitReached(S), S#state.eventCounter >= + ?not_GetMaximumBatchSize((S#state.qosLocal))). + + +%%----------------------------------------------------------% +%% function : handle_info, code_change +%% Arguments: +%% Returns : +%% Effect : Functions demanded by the gen_server module. +%%----------------------------------------------------------- + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(Info, State) -> + ?DBG("INFO: ~p~n", [Info]), + case Info of + {'EXIT', Pid, Reason} when ?get_MyAdminPid(State)==Pid-> + ?DBG("PARENT ADMIN: ~p TERMINATED.~n",[Reason]), + {stop, Reason, State}; + {'EXIT', _Pid, _Reason} -> + ?DBG("PROXYPUSHCONSUMER: ~p TERMINATED.~n",[_Reason]), + {noreply, State}; + _ -> + {noreply, State} + end. + +%%----------------------------------------------------------% +%% function : init, terminate +%% Arguments: +%%----------------------------------------------------------- + +init(['PUSH_SEQUENCE', MyAdmin, MyAdminPid, InitQoS, LQS, + MyChannel, Options, Operator]) -> + process_flag(trap_exit, true), + %% Only if MyType is 'PUSH_SEQUENCE' we need an ets to store events in. + %% Otherwise we'll forward them at once. Why? We don't know when the next event + %% is due. + GCTime = 'CosNotification_Common':get_option(gcTime, Options, + ?not_DEFAULT_SETTINGS), + GCLimit = 'CosNotification_Common':get_option(gcLimit, Options, + ?not_DEFAULT_SETTINGS), + TimeRef = 'CosNotification_Common':get_option(timeService, Options, + ?not_DEFAULT_SETTINGS), + {ok, ?get_InitState('PUSH_SEQUENCE', MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, + cosNotification_eventDB:create_db(LQS, GCTime, GCLimit, TimeRef), + Operator)}; +init([MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, _Options, Operator]) -> + process_flag(trap_exit, true), + {ok, ?get_InitState(MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, + undefined, Operator)}. + +terminate(_Reason, State) when ?is_UnConnected(State) -> + ok; +terminate(_Reason, State) -> + Client = ?get_Client(State), + case catch corba_object:is_nil(Client) of + false when ?is_ANY(State) -> + 'CosNotification_Common':disconnect('CosEventComm_PushSupplier', + disconnect_push_supplier, + Client); + false when ?is_SEQUENCE(State) -> + 'CosNotification_Common':disconnect('CosNotifyComm_SequencePushSupplier', + disconnect_sequence_push_supplier, + Client); + false when ?is_STRUCTURED(State) -> + 'CosNotification_Common':disconnect('CosNotifyComm_StructuredPushSupplier', + disconnect_structured_push_supplier, + Client); + _ -> + ok + end. + +%%----------------------------------------------------------- +%%----- CosNotifyChannelAdmin_ProxyConsumer attributes ------ +%%----------------------------------------------------------- +%%----------------------------------------------------------% +%% Attribute: '_get_MyType' +%% Type : readonly +%% Returns : +%%----------------------------------------------------------- +'_get_MyType'(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_MyType(State), State}. + +%%----------------------------------------------------------% +%% Attribute: '_get_MyAdmin' +%% Type : readonly +%% Returns : +%%----------------------------------------------------------- +'_get_MyAdmin'(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_MyAdmin(State), State}. + +%%----------------------------------------------------------- +%%------- Exported external functions ----------------------- +%%----------------------------------------------------------- +%%----- CosEventChannelAdmin::ProxyPushConsumer ------------- +%%----------------------------------------------------------% +%% function : connect_push_supplier +%% Arguments: Client - CosEventComm::PushSupplier +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} +%% Both exceptions from CosEventChannelAdmin!!! +%%----------------------------------------------------------- +connect_push_supplier(OE_THIS, OE_FROM, State, Client) -> + connect_any_push_supplier(OE_THIS, OE_FROM, State, Client). + +%%----- CosNotifyChannelAdmin::ProxyPushConsumer ------------ +%%----------------------------------------------------------% +%% function : connect_any_push_supplier +%% Arguments: Client - CosEventComm::PushSupplier +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} +%% Both exceptions from CosEventChannelAdmin!!! +%%----------------------------------------------------------- +connect_any_push_supplier(_OE_THIS, _OE_FROM, State, Client) when ?is_ANY(State) -> + ?not_TypeCheck(Client, 'CosEventComm_PushSupplier'), + if + ?is_Connected(State) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); + true -> + {reply, ok, ?set_Client(State, Client)} + end; +connect_any_push_supplier(_, _, _, _) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- CosNotifyChannelAdmin::SequenceProxyPushConsumer ---- +%%----------------------------------------------------------% +%% function : connect_sequence_push_supplier +%% Arguments: Client - CosNotifyComm::SequencePushSupplier +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} +%%----------------------------------------------------------- +connect_sequence_push_supplier(_OE_THIS, _OE_FROM, State, Client) when ?is_SEQUENCE(State) -> + ?not_TypeCheck(Client, 'CosNotifyComm_SequencePushSupplier'), + if + ?is_Connected(State) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); + true -> + {reply, ok, ?set_Client(State, Client)} + end; +connect_sequence_push_supplier(_, _, _, _) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- CosNotifyChannelAdmin::StructuredProxyPushConsumer -- +%%----------------------------------------------------------% +%% function : connect_structured_push_supplier +%% Arguments: Client - CosNotifyComm::StructuredPushSupplier +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} +%%----------------------------------------------------------- +connect_structured_push_supplier(_OE_THIS, _OE_FROM, State, Client) when ?is_STRUCTURED(State) -> + ?not_TypeCheck(Client, 'CosNotifyComm_StructuredPushSupplier'), + if + ?is_Connected(State) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); + true -> + {reply, ok, ?set_Client(State, Client)} + end; +connect_structured_push_supplier(_, _, _, _) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer --- +%%----------------------------------------------------------% +%% function : obtain_subscription_types +%% Arguments: Mode - enum 'ObtainInfoMode' (CosNotifyChannelAdmin) +%% Returns : CosNotification::EventTypeSeq +%%----------------------------------------------------------- +obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_OFF') -> + {reply, ?get_AllPublish(State), ?set_PublishType(State, false)}; +obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_ON') -> + {reply, ?get_AllPublish(State), ?set_PublishType(State, true)}; +obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_OFF') -> + {reply, [], ?set_PublishType(State, false)}; +obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_ON') -> + {reply, [], ?set_PublishType(State, true)}; +obtain_subscription_types(_,_,_,What) -> + orber:dbg("[~p] PusherConsumer:obtain_subscription_types(~p);~n" + "Incorrect enumerant", [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%----------------------------------------------------------% +%% function : validate_event_qos +%% Arguments: RequiredQoS - CosNotification::QoSProperties +%% Returns : ok | {'EXCEPTION', #'UnsupportedQoS'{}} +%% AvilableQoS - CosNotification::NamedPropertyRangeSeq (out) +%%----------------------------------------------------------- +validate_event_qos(_OE_THIS, _OE_FROM, State, RequiredQoS) -> + AvilableQoS = 'CosNotification_Common':validate_event_qos(RequiredQoS, + ?get_LocalQoS(State)), + {reply, {ok, AvilableQoS}, State}. + +%%----- Inherit from CosNotification::QoSAdmin -------------- +%%----------------------------------------------------------% +%% function : get_qos +%% Arguments: +%% Returns : +%%----------------------------------------------------------- +get_qos(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_GlobalQoS(State), State}. + +%%----------------------------------------------------------% +%% function : set_qos +%% Arguments: QoS - CosNotification::QoSProperties, i.e., +%% [#'Property'{name, value}, ...] where name eq. string() +%% and value eq. any(). +%% Returns : ok | {'EXCEPTION', CosNotification::UnsupportedQoS} +%%----------------------------------------------------------- +set_qos(_OE_THIS, _OE_FROM, State, QoS) -> + {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, ?get_BothQoS(State), + proxy, ?get_MyAdmin(State), + false), + NewState = ?update_EventDB(State, LQS), + {reply, ok, ?set_BothQoS(NewState, NewQoS, LQS)}. + +%%----------------------------------------------------------% +%% function : validate_qos +%% Arguments: Required_qos - CosNotification::QoSProperties +%% [#'Property'{name, value}, ...] where name eq. string() +%% and value eq. any(). +%% Returns : {'EXCEPTION', CosNotification::UnsupportedQoS} +%% {ok, CosNotification::NamedPropertyRangeSeq} +%%----------------------------------------------------------- +validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) -> + QoS = 'CosNotification_Common':validate_qos(Required_qos, ?get_BothQoS(State), + proxy, ?get_MyAdmin(State), + false), + {reply, {ok, QoS}, State}. + +%%----- Inherit from CosNotifyComm::NotifyPublish ----------- +%%----------------------------------------------------------% +%% function : offer_change +%% Arguments: Added - #'CosNotification_EventType'{} +%% Removed - #'CosNotification_EventType'{} +%% Returns : ok | +%% {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{}} +%%----------------------------------------------------------- +offer_change(_OE_THIS, _OE_FROM, State, Added, Removed) -> + cosNotification_Filter:validate_types(Added), + cosNotification_Filter:validate_types(Removed), + %% On this "side" we don't really care about which + %% type of events the client will supply. + %% Perhaps, later on, if we want to check this against Filters + %% associated with this object we may change this approach, i.e., if + %% the filter will not allow passing certain event types. But the + %% user should see to that that situation never occurs. It would add + %% extra overhead. Also see PusherSupplier- and PullerSuppler- + %% 'subscription_change'. + update_publish(add, State, Added), + update_publish(remove, State, Removed), + case ?get_PublishType(State) of + true -> + %% Perhaps we should handle exception here?! + %% Probably not. Better to stay "on-line". + catch 'CosNotifyComm_NotifySubscribe': + subscription_change(?get_Client(State), Added, Removed), + ok; + _-> + ok + end, + {reply, ok, State}. + +update_publish(_, _, [])-> + ok; +update_publish(add, State, [H|T]) -> + ?add_Publish(State, H), + update_publish(add, State, T); +update_publish(remove, State, [H|T]) -> + ?del_Publish(State, H), + update_publish(remove, State, T). + +%%----- Inherit from CosNotifyFilter::FilterAdmin ----------- +%%----------------------------------------------------------% +%% function : add_filter +%% Arguments: Filter - CosNotifyFilter::Filter +%% Returns : FilterID - long +%%----------------------------------------------------------- +add_filter(_OE_THIS, _OE_FROM, State, Filter) -> + 'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'), + FilterID = ?new_Id(State), + NewState = ?set_IdCounter(State, FilterID), + {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}. + +%%----------------------------------------------------------% +%% function : remove_filter +%% Arguments: FilterID - long +%% Returns : ok +%%----------------------------------------------------------- +remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) -> + {reply, ok, ?del_Filter(State, FilterID)}; +remove_filter(_,_,_,What) -> + orber:dbg("[~p] PusherConsumer:remove_filter(~p); Not an integer", + [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%----------------------------------------------------------% +%% function : get_filter +%% Arguments: FilterID - long +%% Returns : Filter - CosNotifyFilter::Filter | +%% {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}} +%%----------------------------------------------------------- +get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) -> + {reply, ?get_Filter(State, FilterID), State}; +get_filter(_,_,_,What) -> + orber:dbg("[~p] PusherConsumer:get_filter(~p); Not an integer", + [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%----------------------------------------------------------% +%% function : get_all_filters +%% Arguments: - +%% Returns : Filter - CosNotifyFilter::FilterIDSeq +%%----------------------------------------------------------- +get_all_filters(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_AllFilterID(State), State}. + +%%----------------------------------------------------------% +%% function : remove_all_filters +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +remove_all_filters(_OE_THIS, _OE_FROM, State) -> + {reply, ok, ?del_AllFilter(State)}. + +%%----- Inherit from CosEventComm::PushConsumer ------------- +%%----------------------------------------------------------% +%% function : disconnect_push_consumer +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +disconnect_push_consumer(_OE_THIS, _OE_FROM, State) -> + {stop, normal, ok, ?set_Unconnected(State)}. + +%%----------------------------------------------------------% +%% function : push +%% Arguments: AnyEvent +%% Returns : ok | +%%----------------------------------------------------------- +push(OE_THIS, OE_FROM, State, Event) when ?is_ANY(State) -> + corba:reply(OE_FROM, ok), + case {?not_isConvertedStructured(Event), + cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))} of + {_, {[],_}} when ?is_ANDOP(State) -> + {noreply, State}; + {true, {[],[_]}} -> + %% Is OR and converted, change back and forward to Admin + forward(seq, ?get_MyAdmin(State), State, [any:get_value(Event)], + 'MATCH', OE_THIS); + {_, {[],[_]}} -> + %% Is OR and not converted, forward to Admin + forward(any, ?get_MyAdmin(State), State, Event, 'MATCH', OE_THIS); + {true, {[_],_}} when ?is_ANDOP(State) -> + %% Is AND and converted, change back and forward to Admin + forward(seq, ?get_MyAdmin(State), State, [any:get_value(Event)], + 'MATCH', OE_THIS); + {true, {[_],_}} -> + %% Is OR and converted, change back and forward to Channel + forward(seq, ?get_MyChannel(State), State, [any:get_value(Event)], + 'MATCHED', OE_THIS); + {_, {[_],_}} when ?is_ANDOP(State) -> + %% Is AND and not converted, forward to Admin + forward(any, ?get_MyAdmin(State), State, Event, 'MATCH', OE_THIS); + _ -> + %% Is OR and not converted, forward to Channel + forward(any, ?get_MyChannel(State), State, Event, 'MATCHED', OE_THIS) + end; +push(_, _, _, _) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- Inherit from CosNotifyComm::SequencePushConsumer ---- +%%----------------------------------------------------------% +%% function : disconnect_sequence_push_consumer +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +disconnect_sequence_push_consumer(_OE_THIS, _OE_FROM, State) -> + {stop, normal, ok, ?set_Unconnected(State)}. + +%%----------------------------------------------------------% +%% function : push_structured_events +%% Arguments: CosNotification::EventBatch +%% Returns : ok | +%%----------------------------------------------------------- +push_structured_events(OE_THIS, OE_FROM, State, Events) when ?is_SEQUENCE(State) -> + corba:reply(OE_FROM, ok), + %% We cannot convert parts of the sequence to any, event though they + %% are converted from any to structured. Would be 'impossible' to send. + case cosNotification_eventDB:filter_events(Events, ?get_AllFilter(State)) of + {[],_} when ?is_ANDOP(State) -> + {noreply, State}; + {[],Failed} -> + forward(seq, ?get_MyAdmin(State), State, Failed, 'MATCH', OE_THIS); + {Passed, _} when ?is_ANDOP(State) -> + forward(seq, ?get_MyAdmin(State), State, Passed, 'MATCH', OE_THIS); + {Passed, []} -> + forward(seq, ?get_MyChannel(State), State, Passed, 'MATCHED', OE_THIS); + {Passed, Failed} -> + %% Is OR, send Passed to channel and Failed to Admin. + forward(seq, ?get_MyChannel(State), State, Passed, 'MATCHED', OE_THIS), + forward(seq, ?get_MyAdmin(State), State, Failed, 'MATCH', OE_THIS) + end; +push_structured_events(_,_,_,_) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- Inherit from CosNotifyComm::StructuredPushConsumer -- +%%----------------------------------------------------------% +%% function : disconnect_structured_push_consumer +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +disconnect_structured_push_consumer(_OE_THIS, _OE_FROM, State) -> + {stop, normal, ok, ?set_Unconnected(State)}. + +%%----------------------------------------------------------% +%% function : push_structured_event +%% Arguments: CosNotification::StructuredEvent +%% Returns : ok | +%%----------------------------------------------------------- +push_structured_event(OE_THIS, OE_FROM, State, Event) when ?is_STRUCTURED(State) -> + corba:reply(OE_FROM, ok), + case {?not_isConvertedAny(Event), + cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))} of + {_, {[],_}} when ?is_ANDOP(State) -> + {noreply, State}; + {true, {[],[_]}} -> + %% Is OR and converted, change back and forward to Admin + forward(any, ?get_MyAdmin(State), State, + Event#'CosNotification_StructuredEvent'.remainder_of_body, + 'MATCH', OE_THIS); + {_, {[],[_]}} -> + %% Is OR and not converted, forward to Admin + forward(seq, ?get_MyAdmin(State), State, [Event], 'MATCH', OE_THIS); + {true, {[_],_}} when ?is_ANDOP(State) -> + %% Is AND and converted, change back and forward to Admin + forward(any, ?get_MyAdmin(State), State, + Event#'CosNotification_StructuredEvent'.remainder_of_body, + 'MATCH', OE_THIS); + {true, {[_],_}} -> + %% Is OR and converted, change back and forward to Channel + forward(any, ?get_MyChannel(State), State, + Event#'CosNotification_StructuredEvent'.remainder_of_body, + 'MATCHED', OE_THIS); + {_, {[_],_}} when ?is_ANDOP(State) -> + %% Is AND and not converted, forward to Admin + forward(seq, ?get_MyAdmin(State), State, [Event], 'MATCH', OE_THIS); + _ -> + %% Is OR and not converted, forward to Channel + forward(seq, ?get_MyChannel(State), State, [Event], 'MATCHED', OE_THIS) + end; +push_structured_event(_,_,_,_) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%--------------- LOCAL FUNCTIONS ---------------------------- +find_obj({value, {_, Obj}}) -> Obj; +find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}. + +find_ids(List) -> find_ids(List, []). +find_ids([], Acc) -> Acc; +find_ids([{I,_}|T], Acc) -> find_ids(T, [I|Acc]); +find_ids(What, _) -> + orber:dbg("[~p] PusherConsumer:find_ids();~n" + "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}). + +%% Delete a single object. +%% The list don not differ, i.e., no filter removed, raise exception. +delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{}); +delete_obj(List,_) -> List. + +%% Forward events +forward(any, SendTo, State, Event, Status, OE_THIS) -> + case catch oe_CosNotificationComm_Event:callAny(SendTo, Event, Status) of + ok -> + ?DBG("PROXY FORWARD ANY: ~p~n",[Event]), + {noreply, State}; + {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse + is_record(E, 'NO_PERMISSION') orelse + is_record(E, 'CosEventComm_Disconnected') -> + orber:dbg("[~p] PusherConsumer:forward();~n" + "Admin/Channel no longer exists; terminating and dropping: ~p", + [?LINE, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, OE_THIS}, + {client, ?get_Client(State)}, + {reason, {'EXCEPTION', E}}]), + {stop, normal, State}; + R when ?is_PersistentConnection(State) -> + orber:dbg("[~p] PusherConsumer:forward();~n" + "Admin/Channel respond incorrect: ~p~n" + "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), + {noreply, State}; + R -> + orber:dbg("[~p] PusherConsumer:forward();~n" + "Admin/Channel respond incorrect: ~p~n" + "Terminating and dropping: ~p", + [?LINE, R, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, OE_THIS}, + {client, ?get_Client(State)}, + {reason, R}]), + {stop, normal, State} + end; +forward(seq, SendTo, State, Event, Status, OE_THIS) -> + case catch oe_CosNotificationComm_Event:callSeq(SendTo, Event, Status) of + ok -> + {noreply, State}; + {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse + is_record(E, 'NO_PERMISSION') orelse + is_record(E, 'CosEventComm_Disconnected') -> + ?DBG("ADMIN NO LONGER EXIST; DROPPING: ~p~n", [Event]), + 'CosNotification_Common':notify([{proxy, OE_THIS}, + {client, ?get_Client(State)}, + {reason, {'EXCEPTION', E}}]), + {stop, normal, State}; + R when ?is_PersistentConnection(State) -> + orber:dbg("[~p] PusherConsumer:forward();~n" + "Admin/Channel respond incorrect: ~p~n" + "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), + {noreply, State}; + R -> + orber:dbg("[~p] PusherConsumer:forward();~n" + "Admin/Channel respond incorrect: ~p~n" + "Terminating and dropping: ~p", + [?LINE, R, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, OE_THIS}, + {client, ?get_Client(State)}, + {reason, R}]), + {stop, normal, State} + end. + +%%--------------- MISC FUNCTIONS, E.G. DEBUGGING ------------- +%%--------------- END OF MODULE ------------------------------ |