diff options
author | Stevan Andjelkovic <stevana@users.noreply.github.com> | 2016-10-05 16:19:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-10-05 16:19:42 +0200 |
commit | fee9a85b4385507f33bd4bae381c64fbd70b57af (patch) | |
tree | 9b55304b45fbb6e1c157c77ad651c2652c092b93 | |
parent | 4e71e3560f00ae9fe02a249c727d9e8677891e54 (diff) | |
parent | b27d04354805f54f5772802bd89f3e79f43e5641 (diff) | |
download | rvi_sota_client-fee9a85b4385507f33bd4bae381c64fbd70b57af.tar.gz |
Merge pull request #130 from advancedtelematic/bugfix/pro-1587/block-polling
Block polling until all interpreters have finished
-rw-r--r-- | Cargo.lock | 74 | ||||
-rw-r--r-- | src/interpreter.rs | 66 | ||||
-rw-r--r-- | src/main.rs | 29 |
3 files changed, 130 insertions, 39 deletions
@@ -593,3 +593,77 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[metadata] +"checksum aho-corasick 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2b3fb52b09c1710b961acb35390d514be82e4ac96a9969a8e38565a29b878dc9" +"checksum bit-set 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e6e1e6fb1c9e3d6fcdec57216a74eaa03e41f52a22f13a16438251d8e88b89da" +"checksum bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "5b97c2c8e8bbb4251754f559df8af22fb264853c7d009084a576cdf12565089d" +"checksum bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3" +"checksum bitflags 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4f67931368edf3a9a51d29886d245f1c3db2f1ef0dcc9e35ff70341b78c10d23" +"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" +"checksum bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c129aff112dcc562970abb69e2508b40850dd24c274761bb50fb8a0067ba6c27" +"checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c" +"checksum chan 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "82b22acfef7960fd8f829bc50749273be637cbd76b9d4cc20497666cc3a33329" +"checksum chan-signal 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "afbba6202dc1d10ff08c3b04e00e4d2d6cf5effee56cd9fee92928be6692379a" +"checksum cookie 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0e3d6405328b6edb412158b3b7710e2634e23f3614b9bb1c412df7952489a626" +"checksum crossbeam 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)" = "fb974f835e90390c5f9dfac00f05b06dc117299f5ea4e85fbc7bb443af4911cc" +"checksum dbus 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "aefec6d9031bc53358eb822549ca946f50c8618a85bfe8afa52816c3a978eecf" +"checksum env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "aba65b63ffcc17ffacd6cf5aa843da7c5a25e3bd4bbe0b7def8b214e411250e5" +"checksum gcc 0.3.28 (registry+https://github.com/rust-lang/crates.io-index)" = "3da3a2cbaeb01363c8e3704fd9fd0eb2ceb17c6f27abd4c1ef040fb57d20dc79" +"checksum gdi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0912515a8ff24ba900422ecda800b52f4016a56251922d397c576bf92c690518" +"checksum getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9047cfbd08a437050b363d35ef160452c5fe8ea5187ae0a624708c91581d685" +"checksum httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "46534074dbb80b070d60a5cb8ecadd8963a00a438ae1a95268850a7ef73b67ae" +"checksum hyper 0.9.4 (git+https://github.com/hyperium/hyper)" = "<none>" +"checksum idna 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1053236e00ce4f668aeca4a769a09b3bf5a682d802abd6f3cb39374f6b162c11" +"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +"checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" +"checksum lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "cf186d1a8aa5f5bee5fd662bc9c1b949e0259e1bcc379d1f006847b0080c7417" +"checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f" +"checksum libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "97def9dc7ce1d8e153e693e3a33020bc69972181adb2f871e87e888876feae49" +"checksum libressl-pnacl-sys 2.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "cbc058951ab6a3ef35ca16462d7642c4867e6403520811f28537a4e2f2db3e71" +"checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054" +"checksum matches 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "15305656809ce5a4805b1ff2946892810992197ce1270ff79baded852187942e" +"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" +"checksum mime 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf93a79c700c9df8227ec6a4f0f27a8948373c079312bac24549d944cef85f64" +"checksum mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a637d1ca14eacae06296a008fa7ad955347e34efcb5891cfd8ba05491a37907e" +"checksum miow 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4e93d633d34b8ff65a24566d67d49703e7a5c7ac2844d6139a9fc441a799e89a" +"checksum net2 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)" = "6a816012ca11cb47009693c1e0c6130e26d39e4d97ee2a13c50e868ec83e3204" +"checksum nix 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bfb3ddedaa14746434a02041940495bf11325c22f6d36125d3bdd56090d50a79" +"checksum nom 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d1b06a35295796400a1db7382054f93713bf3924e7c268af94c5357b9fbf4cb6" +"checksum openssl 0.7.13 (registry+https://github.com/rust-lang/crates.io-index)" = "81ff0208f23e726e747375d34e40c93d037a5b504de7305117dfe5ad72516d2d" +"checksum openssl-sys 0.7.13 (registry+https://github.com/rust-lang/crates.io-index)" = "618753feb53784e3ccb131811ed0b02f80640da89fb33b165d69146564b02085" +"checksum openssl-sys-extras 0.7.13 (registry+https://github.com/rust-lang/crates.io-index)" = "01838027da8e31ab4d3530fc5d6752bfd92dcc8e0ae070633e69f2b020bd0f36" +"checksum openssl-verify 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3ed86cce894f6b0ed4572e21eb34026f1dc8869cb9ee3869029131bc8c3feb2d" +"checksum pkg-config 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8cee804ecc7eaf201a4a207241472cc870e825206f6c031e3ee2a72fa425f2fa" +"checksum pnacl-build-helper 1.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "61c9231d31aea845007443d62fcbb58bb6949ab9c18081ee1e09920e0cf1118b" +"checksum quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7ac990ab4e038dd8481a5e3fd00641067fcfc674ad663f3222752ed5284e05d4" +"checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5" +"checksum regex 0.1.71 (registry+https://github.com/rust-lang/crates.io-index)" = "e58a1b7d2bfecc0746e8587c30a53d01ea7bc0e98fac54e5aaa375b94338a0cc" +"checksum regex-syntax 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "baa04823ba7be7ed0bed3d0704c7b923019d9c4e4931c5af2804c7c7a0e3d00b" +"checksum rotor 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "07a6d6ac669b5c7623d7270f657e7fe60bd1d07f37d99fd5b9ea38c273834c14" +"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a" +"checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b" +"checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084" +"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac" +"checksum sha1 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a307a40d5834140e4213a6952483b84e9ad53bdcab918b7335a6e305e505a53c" +"checksum slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d807fd58c4181bbabed77cb3b891ba9748241a552bcc5be698faaebefc54f46e" +"checksum spmc 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "93bdab61c1a413e591c4d17388ffa859eaff2df27f1e13a5ec8b716700605adf" +"checksum tempdir 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "0b62933a3f96cd559700662c34f8bab881d9e3540289fb4f368419c7f13a5aa9" +"checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03" +"checksum thread_local 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "55dd963dbaeadc08aa7266bf7f91c3154a7805e32bb94b820b769d2ef3b4744d" +"checksum time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7ec6d62a20df54e07ab3b78b9a3932972f4b7981de295563686849eb3989af" +"checksum toml 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)" = "0590d72182e50e879c4da3b11c6488dae18fccb1ae0c7a3eda18e16795844796" +"checksum traitobject 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "07eaeb7689bb7fca7ce15628319635758eda769fed481ecfe6686ddef2600616" +"checksum typeable 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1410f6f91f21d1612654e7cc69193b0334f909dcf2c790c4826254fbb86f8887" +"checksum unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "13a5906ca2b98c799f4b1ab4557b76367ebd6ae5ef14930ec841c74aed5f3764" +"checksum unicode-bidi 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c1f7ceb96afdfeedee42bade65a0d585a6a0106f681b6749c8ff4daa8df30b3f" +"checksum unicode-normalization 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "26643a2f83bac55f1976fb716c10234485f9202dcd65cfbdf9da49867b271172" +"checksum unix_socket 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6aa2700417c405c38f5e6902d699345241c28c0b7ade4abaad71e35a87eb1564" +"checksum url 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8ab4ca6f0107350f41a59a51cb0e71a04d905bc6a29181d2cb42fa4f040c65c9" +"checksum user32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4ef4711d107b21b410a3a974b1204d9accc8b10dad75d8324b5d755de1617d47" +"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f" +"checksum vecio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0795a11576d29ae80525a3fda315bf7b534f8feb9d34101e5fe63fb95bb2fd24" +"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +"checksum winapi 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "3969e500d618a5e974917ddefd0ba152e4bcaae5eb5d9b8c1fbc008e9e28c24e" +"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" +"checksum ws 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "50f888214c823b739f072b6d781df41824bd5e162a53be27d0079449d12ab0c9" +"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" diff --git a/src/interpreter.rs b/src/interpreter.rs index adac43a..4793b32 100644 --- a/src/interpreter.rs +++ b/src/interpreter.rs @@ -1,7 +1,9 @@ use chan; -use chan::{Sender, Receiver}; -use std; +use chan::{Sender, Receiver, WaitGroup}; +use std::{process, thread}; use std::borrow::Cow; +use std::time::Duration; +use time; use datatype::{AccessToken, Auth, ClientCredentials, Command, Config, Error, Event, Package, UpdateReport, UpdateRequestStatus as Status, UpdateResultCode}; @@ -18,9 +20,20 @@ use sota::Sota; pub trait Interpreter<I, O> { fn interpret(&mut self, input: I, otx: &Sender<O>); - fn run(&mut self, irx: Receiver<I>, otx: Sender<O>) { + fn run(&mut self, irx: Receiver<I>, otx: Sender<O>, wg: WaitGroup) { + let cooldown = Duration::from_millis(100); + loop { - self.interpret(irx.recv().expect("interpreter sender closed"), &otx); + let input = irx.recv().expect("interpreter sender closed"); + let started = time::precise_time_ns(); + + wg.add(1); + debug!("interpreter starting: {}", started); + self.interpret(input, &otx); + + thread::sleep(cooldown); // let any further work commence + debug!("interpreter stopping: {}", started); + wg.done(); } } } @@ -29,16 +42,17 @@ pub trait Interpreter<I, O> { /// The `EventInterpreter` listens for `Event`s and optionally responds with /// `Command`s that may be sent to the `CommandInterpreter`. pub struct EventInterpreter { - pub package_manager: PackageManager + pub pacman: PackageManager } impl Interpreter<Event, Command> for EventInterpreter { fn interpret(&mut self, event: Event, ctx: &Sender<Command>) { - info!("Event received: {}", event); + info!("EventInterpreter received: {}", event); + match event { Event::Authenticated => { - if self.package_manager != PackageManager::Off { - self.package_manager.installed_packages().map(|packages| { + if self.pacman != PackageManager::Off { + self.pacman.installed_packages().map(|packages| { ctx.send(Command::SendInstalledPackages(packages)); }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err)); } @@ -52,17 +66,16 @@ impl Interpreter<Event, Command> for EventInterpreter { Event::UpdatesReceived(requests) => { for request in requests { - match (request.status, &self.package_manager) { - (Status::Pending, _) => ctx.send(Command::StartDownload(request.requestId.clone())), - - (Status::InFlight, &PackageManager::Off) => (), - - (Status::InFlight, _) => { - if self.package_manager.is_installed(&request.packageId) { - ctx.send(Command::SendUpdateReport(UpdateReport::single( - request.requestId.clone(), UpdateResultCode::OK, "".to_string()))); + let id = request.requestId.clone(); + match request.status { + Status::Pending => ctx.send(Command::StartDownload(id)), + + Status::InFlight if self.pacman != PackageManager::Off => { + if self.pacman.is_installed(&request.packageId) { + let report = UpdateReport::single(id, UpdateResultCode::OK, "".to_string()); + ctx.send(Command::SendUpdateReport(report)); } else { - ctx.send(Command::StartDownload(request.requestId.clone())); + ctx.send(Command::StartDownload(id)); } } @@ -72,7 +85,7 @@ impl Interpreter<Event, Command> for EventInterpreter { } Event::DownloadComplete(dl) => { - if self.package_manager != PackageManager::Off { + if self.pacman != PackageManager::Off { ctx.send(Command::StartInstall(dl.update_id.clone())); } } @@ -87,8 +100,8 @@ impl Interpreter<Event, Command> for EventInterpreter { } Event::UpdateReportSent => { - if self.package_manager != PackageManager::Off { - self.package_manager.installed_packages().map(|packages| { + if self.pacman != PackageManager::Off { + self.pacman.installed_packages().map(|packages| { ctx.send(Command::SendInstalledPackages(packages)); }).unwrap_or_else(|err| error!("couldn't send a list of packages: {}", err)); } @@ -106,7 +119,7 @@ pub struct CommandInterpreter; impl Interpreter<Command, Interpret> for CommandInterpreter { fn interpret(&mut self, cmd: Command, itx: &Sender<Interpret>) { - info!("Command received: {}", cmd); + info!("CommandInterpreter received: {}", cmd); itx.send(Interpret { command: cmd, response_tx: None }); } } @@ -124,7 +137,7 @@ pub struct GlobalInterpreter<'t> { impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> { fn interpret(&mut self, interpret: Interpret, etx: &Sender<Event>) { - info!("Interpreter started: {}", interpret.command); + info!("GlobalInterpreter received: {}", interpret.command); let (multi_tx, multi_rx) = chan::async::<Event>(); let outcome = match (self.token.as_ref(), self.config.auth.is_none()) { @@ -156,7 +169,6 @@ impl<'t> Interpreter<Interpret, Event> for GlobalInterpreter<'t> { } } - info!("Interpreter finished."); let ev = response_ev.expect("no response event to send back"); interpret.response_tx.map(|tx| tx.lock().unwrap().send(ev)); } @@ -238,7 +250,7 @@ impl<'t> GlobalInterpreter<'t> { .map_err(|report| etx.send(Event::InstallFailed(report))); } - Command::Shutdown => std::process::exit(0), + Command::Shutdown => process::exit(0), } Ok(()) @@ -259,7 +271,7 @@ impl<'t> GlobalInterpreter<'t> { etx.send(Event::Authenticated); } - Command::Shutdown => std::process::exit(0), + Command::Shutdown => process::exit(0), _ => etx.send(Event::NotAuthenticated) } @@ -278,7 +290,7 @@ impl<'t> GlobalInterpreter<'t> { #[cfg(test)] mod tests { use chan; - use chan::{Sender, Receiver}; + use chan::{Sender, Receiver, WaitGroup}; use std::thread; use super::*; diff --git a/src/main.rs b/src/main.rs index 031734c..2caa411 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,12 +9,12 @@ extern crate rustc_serialize; #[macro_use] extern crate sota; extern crate time; -use chan::{Sender, Receiver}; +use chan::{Sender, Receiver, WaitGroup}; use chan_signal::Signal; use env_logger::LogBuilder; use getopts::Options; use log::{LogLevelFilter, LogRecord}; -use std::env; +use std::{env, thread}; use std::collections::HashMap; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -45,16 +45,17 @@ fn start_signal_handler(signals: Receiver<Signal>) { } } -fn start_update_poller(interval: u64, itx: Sender<Interpret>) { +fn start_update_poller(interval: u64, itx: Sender<Interpret>, wg: WaitGroup) { let (etx, erx) = chan::async::<Event>(); - let tick = chan::tick(Duration::from_secs(interval)); + let wait = Duration::from_secs(interval); loop { - let _ = tick.recv(); + wg.wait(); // wait until not busy + thread::sleep(wait); // then wait `interval` seconds itx.send(Interpret { command: Command::GetUpdateRequests, response_tx: Some(Arc::new(Mutex::new(etx.clone()))) }); - let _ = erx.recv(); + let _ = erx.recv(); // then wait for the response } } @@ -67,8 +68,9 @@ fn main() { let (etx, erx) = chan::async::<Event>(); let (ctx, crx) = chan::async::<Command>(); let (itx, irx) = chan::async::<Interpret>(); - let mut broadcast = Broadcast::new(erx); + let wg = WaitGroup::new(); + ctx.send(Command::Authenticate(None)); crossbeam::scope(|scope| { @@ -78,7 +80,8 @@ fn main() { let poll_tick = config.device.polling_interval; let poll_itx = itx.clone(); - scope.spawn(move || start_update_poller(poll_tick, poll_itx)); + let poll_wg = wg.clone(); + scope.spawn(move || start_update_poller(poll_tick, poll_itx, poll_wg)); if config.gateway.console { let cons_itx = itx.clone(); @@ -133,19 +136,21 @@ fn main() { let event_sub = broadcast.subscribe(); let event_ctx = ctx.clone(); let event_mgr = config.device.package_manager.clone(); + let event_wg = wg.clone(); scope.spawn(move || EventInterpreter { - package_manager: event_mgr - }.run(event_sub, event_ctx)); + pacman: event_mgr + }.run(event_sub, event_ctx, event_wg)); let cmd_itx = itx.clone(); - scope.spawn(move || CommandInterpreter.run(crx, cmd_itx)); + let cmd_wg = wg.clone(); + scope.spawn(move || CommandInterpreter.run(crx, cmd_itx, cmd_wg)); scope.spawn(move || GlobalInterpreter { config: config, token: None, http_client: Box::new(AuthClient::default()), rvi: rvi, - }.run(irx, etx)); + }.run(irx, etx, wg)); scope.spawn(move || broadcast.start()); }); |