summaryrefslogtreecommitdiff
path: root/t/40channel.t
diff options
context:
space:
mode:
Diffstat (limited to 't/40channel.t')
-rw-r--r--t/40channel.t263
1 files changed, 263 insertions, 0 deletions
diff --git a/t/40channel.t b/t/40channel.t
new file mode 100644
index 0000000..930129b
--- /dev/null
+++ b/t/40channel.t
@@ -0,0 +1,263 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use IO::Async::Test;
+
+use Test::More;
+use Test::Identity;
+
+use IO::Async::Channel;
+
+use IO::Async::OS;
+
+use IO::Async::Loop;
+use Storable qw( freeze );
+
+my $loop = IO::Async::Loop->new_builtin;
+
+testing_loop( $loop );
+
+# sync->sync - mostly doesn't involve IO::Async
+{
+ my ( $pipe_rd, $pipe_wr ) = IO::Async::OS->pipepair;
+
+ my $channel_rd = IO::Async::Channel->new;
+ $channel_rd->setup_sync_mode( $pipe_rd );
+
+ my $channel_wr = IO::Async::Channel->new;
+ $channel_wr->setup_sync_mode( $pipe_wr );
+
+ $channel_wr->send( [ structure => "here" ] );
+
+ is_deeply( $channel_rd->recv, [ structure => "here" ], 'Sync mode channels can send/recv structures' );
+
+ $channel_wr->send_frozen( freeze [ prefrozen => "data" ] );
+
+ is_deeply( $channel_rd->recv, [ prefrozen => "data" ], 'Sync mode channels can send_frozen' );
+
+ $channel_wr->close;
+
+ is( $channel_rd->recv, undef, 'Sync mode can be closed' );
+}
+
+# async->sync
+{
+ my ( $pipe_rd, $pipe_wr ) = IO::Async::OS->pipepair;
+
+ my $channel_rd = IO::Async::Channel->new;
+ $channel_rd->setup_sync_mode( $pipe_rd );
+
+ my $channel_wr = IO::Async::Channel->new;
+ $channel_wr->setup_async_mode( write_handle => $pipe_wr );
+
+ $loop->add( $channel_wr );
+
+ $channel_wr->send( [ data => "by async" ] );
+
+ # Cheat for semi-sync
+ my $flushed;
+ $channel_wr->{stream}->write( "", on_flush => sub { $flushed++ } );
+ wait_for { $flushed };
+
+ is_deeply( $channel_rd->recv, [ data => "by async" ], 'Async mode channel can send' );
+
+ $channel_wr->close;
+
+ is( $channel_rd->recv, undef, 'Sync mode can be closed' );
+}
+
+# sync->async configured on_recv
+{
+ my ( $pipe_rd, $pipe_wr ) = IO::Async::OS->pipepair;
+
+ my @recv_queue;
+ my $recv_eof;
+
+ my $channel_rd = IO::Async::Channel->new;
+ $channel_rd->setup_async_mode( read_handle => $pipe_rd );
+
+ $loop->add( $channel_rd );
+
+ $channel_rd->configure(
+ on_recv => sub {
+ identical( $_[0], $channel_rd, 'Channel passed to on_recv' );
+ push @recv_queue, $_[1];
+ },
+ on_eof => sub {
+ $recv_eof++;
+ },
+ );
+
+ my $channel_wr = IO::Async::Channel->new;
+ $channel_wr->setup_sync_mode( $pipe_wr );
+
+ $channel_wr->send( [ data => "by sync" ] );
+
+ wait_for { @recv_queue };
+
+ is_deeply( shift @recv_queue, [ data => "by sync" ], 'Async mode channel can on_recv' );
+
+ $channel_wr->close;
+
+ wait_for { $recv_eof };
+ is( $recv_eof, 1, 'Async mode channel can on_eof' );
+}
+
+# sync->async oneshot ->recv with future
+{
+ my ( $pipe_rd, $pipe_wr ) = IO::Async::OS->pipepair;
+
+ my $channel_rd = IO::Async::Channel->new;
+ $channel_rd->setup_async_mode( read_handle => $pipe_rd );
+
+ $loop->add( $channel_rd );
+
+ my $channel_wr = IO::Async::Channel->new;
+ $channel_wr->setup_sync_mode( $pipe_wr );
+
+ $channel_wr->send( [ data => "by sync" ] );
+
+ my $recv_f = $channel_rd->recv;
+
+ wait_for { $recv_f->is_ready };
+
+ is_deeply( scalar $recv_f->get, [ data => "by sync" ], 'Async mode future can receive data' );
+
+ $channel_wr->close;
+
+ my $eof_f = $channel_rd->recv;
+
+ wait_for { $eof_f->is_ready };
+
+ is( ( $eof_f->failure )[1], "eof", 'Async mode future can receive EOF' );
+}
+
+# sync->async oneshot ->recv with callbacks
+{
+ my ( $pipe_rd, $pipe_wr ) = IO::Async::OS->pipepair;
+
+ my $channel_rd = IO::Async::Channel->new;
+ $channel_rd->setup_async_mode( read_handle => $pipe_rd );
+
+ $loop->add( $channel_rd );
+
+ my $channel_wr = IO::Async::Channel->new;
+ $channel_wr->setup_sync_mode( $pipe_wr );
+
+ $channel_wr->send( [ data => "by sync" ] );
+
+ my $recved;
+ $channel_rd->recv(
+ on_recv => sub {
+ identical( $_[0], $channel_rd, 'Channel passed to ->recv on_recv' );
+ $recved = $_[1];
+ },
+ on_eof => sub { die "Test failed early" },
+ );
+
+ wait_for { $recved };
+
+ is_deeply( $recved, [ data => "by sync" ], 'Async mode channel can ->recv on_recv' );
+
+ $channel_wr->close;
+
+ my $recv_eof;
+ $channel_rd->recv(
+ on_recv => sub { die "Channel recv'ed when not expecting" },
+ on_eof => sub { $recv_eof++ },
+ );
+
+ wait_for { $recv_eof };
+ is( $recv_eof, 1, 'Async mode channel can ->recv on_eof' );
+}
+
+# sync->async write once then close
+{
+ my ( $pipe_rd, $pipe_wr ) = IO::Async::OS->pipepair;
+
+ my $channel_rd = IO::Async::Channel->new;
+ $channel_rd->setup_async_mode( read_handle => $pipe_rd );
+
+ $loop->add( $channel_rd );
+
+ my $channel_wr = IO::Async::Channel->new;
+ $channel_wr->setup_sync_mode( $pipe_wr );
+
+ $channel_wr->send( [ "One value here" ] );
+ $channel_wr->close;
+ undef $channel_wr;
+
+ my $recved;
+ $channel_rd->recv(
+ on_recv => sub {
+ $recved = $_[1];
+ },
+ on_eof => sub { die "Test failed early" },
+ );
+
+ wait_for { $recved };
+
+ is( $recved->[0], "One value here", 'Async mode channel can ->recv buffer at EOF' );
+
+ $loop->remove( $channel_rd );
+}
+
+# Async ->recv cancellation
+{
+ my ( $pipe_rd, $pipe_wr ) = IO::Async::OS->pipepair;
+
+ my $channel_rd = IO::Async::Channel->new;
+ $channel_rd->setup_async_mode( read_handle => $pipe_rd );
+
+ $loop->add( $channel_rd );
+
+ my $channel_wr = IO::Async::Channel->new;
+ $channel_wr->setup_sync_mode( $pipe_wr );
+
+ $channel_wr->send( [ "first" ] );
+ $channel_wr->send( [ "second" ] );
+
+ my $r1_f = $channel_rd->recv;
+ my $r2_f = $channel_rd->recv;
+
+ $r1_f->cancel;
+
+ wait_for { $r2_f->is_ready };
+
+ is_deeply( scalar $r2_f->get, [ "second" ], 'Async recv result after cancellation' );
+
+ $loop->remove( $channel_rd );
+}
+
+# Sereal encoder
+SKIP: {
+ skip "Sereal is not available", 1 unless eval { require Sereal::Encoder; require Sereal::Decoder; };
+
+ my ( $pipe_rd, $pipe_wr ) = IO::Async::OS->pipepair;
+
+ my $channel_rd = IO::Async::Channel->new(
+ codec => "Sereal"
+ );
+ $channel_rd->setup_async_mode( read_handle => $pipe_rd );
+
+ $loop->add( $channel_rd );
+
+ my $channel_wr = IO::Async::Channel->new(
+ codec => "Sereal",
+ );
+ $channel_wr->setup_sync_mode( $pipe_wr );
+
+ $channel_wr->send( [ data => "by sync" ] );
+
+ my $recv_f = $channel_rd->recv;
+
+ wait_for { $recv_f->is_ready };
+
+ is_deeply( scalar $recv_f->get, [ data => "by sync" ], 'Channel can use Sereal as codec' );
+
+ $loop->remove( $channel_rd );
+}
+
+done_testing;