22 #if !defined(RXCPP_OPERATORS_RX_TIMEOUT_HPP) 23 #define RXCPP_OPERATORS_RX_TIMEOUT_HPP 25 #include "../rx-includes.hpp" 33 std::runtime_error(msg)
42 struct timeout_invalid_arguments {};
46 using type =
observable<timeout_invalid_arguments<
AN...>, timeout_invalid<
AN...>>;
49 using timeout_invalid_t =
typename timeout_invalid<
AN...>::type;
51 template<
class T,
class Duration,
class Coordination>
56 typedef typename coordination_type::coordinator_type coordinator_type;
61 timeout_values(duration_type p, coordination_type c)
68 coordination_type coordination;
70 timeout_values initial;
72 timeout(duration_type period, coordination_type coordination)
73 : initial(period, coordination)
77 template<
class Subscriber>
78 struct timeout_observer
80 typedef timeout_observer<Subscriber> this_type;
85 struct timeout_subscriber_values :
public timeout_values
101 mutable std::size_t index;
103 typedef std::shared_ptr<timeout_subscriber_values> state_type;
107 : state(std::make_shared<timeout_subscriber_values>(timeout_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
109 auto localState = state;
112 localState->cs.unsubscribe();
113 localState->dest.unsubscribe();
114 localState->worker.unsubscribe();
117 [&](){
return localState->coordinator.act(disposer); },
119 if (selectedDisposer.empty()) {
123 localState->dest.add([=](){
124 localState->worker.schedule(selectedDisposer.get());
126 localState->cs.add([=](){
127 localState->worker.schedule(selectedDisposer.get());
131 auto new_id = ++localState->index;
132 auto produce_time = localState->worker.now() + localState->period;
134 localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
137 [&](){
return localState->coordinator.act(work);},
139 if (selectedWork.empty()) {
142 localState->worker.schedule(selectedWork.get());
145 static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t
id, state_type state) {
147 if(
id != state->index)
154 [&](){
return state->coordinator.act(produce); },
156 if (selectedProduce.empty()) {
157 return std::function<void(const rxsc::schedulable&)>();
160 return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
163 void on_next(T v)
const {
164 auto localState = state;
166 auto new_id = ++localState->index;
167 auto produce_time = localState->worker.now() + localState->period;
169 localState->dest.on_next(v);
170 localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
173 [&](){
return localState->coordinator.act(work);},
175 if (selectedWork.empty()) {
178 localState->worker.schedule(selectedWork.get());
181 void on_error(std::exception_ptr e)
const {
182 auto localState = state;
184 localState->dest.on_error(e);
187 [&](){
return localState->coordinator.act(work); },
189 if (selectedWork.empty()) {
192 localState->worker.schedule(selectedWork.get());
195 void on_completed()
const {
196 auto localState = state;
198 localState->dest.on_completed();
201 [&](){
return localState->coordinator.act(work); },
203 if (selectedWork.empty()) {
206 localState->worker.schedule(selectedWork.get());
211 auto coordinator = v.coordination.create_coordinator();
213 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(
coordinator))));
217 template<
class Subscriber>
218 auto operator()(Subscriber dest)
const 219 -> decltype(timeout_observer<Subscriber>::make(std::move(dest), initial)) {
220 return timeout_observer<Subscriber>::make(std::move(dest), initial);
228 template<
class...
AN>
231 return operator_factory<timeout_tag,
AN...>(std::make_tuple(std::forward<AN>(an)...));
239 template<
class Observable,
class Duration,
244 class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>,
identity_one_worker>>
245 static auto member(Observable&& o, Duration&& d)
250 template<
class Observable,
class Coordination,
class Duration,
252 is_observable<Observable>,
254 rxu::is_duration<Duration>>,
257 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
258 -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
259 return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
262 template<
class Observable,
class Coordination,
class Duration,
264 is_observable<Observable>,
265 is_coordination<Coordination>,
266 rxu::is_duration<Duration>>,
269 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
270 -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
271 return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
274 template<
class...
AN>
275 static operators::detail::timeout_invalid_t<
AN...>
member(
const AN&...) {
278 static_assert(
sizeof...(
AN) == 10000,
"timeout takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
Definition: rx-util.hpp:791
rxsc::worker get_worker() const
Definition: rx-coordination.hpp:85
timeout_error(const std::string &msg)
Definition: rx-timeout.hpp:32
Definition: rx-all.hpp:26
typename std::decay< T >::type::value_type value_type_t
Definition: rx-util.hpp:35
controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
Definition: rx-subscription.hpp:364
Definition: rx-operators.hpp:69
auto AN
Definition: rx-finally.hpp:105
typename std::decay< T >::type decay_t
Definition: rx-util.hpp:36
Definition: rx-operators.hpp:47
static auto member(Observable &&o, Coordination &&cn, Duration &&d) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-timeout.hpp:257
typename std::enable_if< all_true_type< BN... >::value >::type enable_if_all_true_type_t
Definition: rx-util.hpp:114
a source of values. subscribe or use one of the operator methods that return a new observable...
Definition: rx-observable.hpp:510
Definition: rx-timeout.hpp:29
auto timeout(AN &&... an) -> operator_factory< timeout_tag, AN... >
Return an observable that terminates with timeout_error if a particular timespan has passed without e...
Definition: rx-timeout.hpp:229
static auto member(Observable &&o, Duration &&d, Coordination &&cn) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), std::forward< Coordination >(cn))))
Definition: rx-timeout.hpp:269
Definition: rx-operators.hpp:465
consumes values from an observable using State that may implement on_next, on_error and on_completed ...
Definition: rx-observer.hpp:179
Definition: rx-operators.hpp:16
auto on_exception(const F &f, const OnError &c) -> typename std::enable_if< detail::is_on_error< OnError >::value, typename detail::maybe_from_result< F >::type >::type
Definition: rx-observer.hpp:640
static operators::detail::timeout_invalid_t< AN... > member(const AN &...)
Definition: rx-timeout.hpp:275
Definition: rx-coordination.hpp:114
binds an observer that consumes values with a composite_subscription that controls lifetime...
Definition: rx-subscriber.hpp:25
Definition: rx-scheduler.hpp:426
identity_one_worker identity_current_thread()
Definition: rx-coordination.hpp:175
Definition: rx-predef.hpp:177
Definition: rx-coordination.hpp:37
Definition: rx-scheduler.hpp:200
static auto member(Observable &&o, Duration &&d) -> decltype(o.template lift< SourceValue >(Timeout(std::forward< Duration >(d), identity_current_thread())))
Definition: rx-timeout.hpp:245
Definition: rx-coordination.hpp:45