summaryrefslogtreecommitdiff
path: root/luxio/mq.lua
blob: 4ba8c65ddeb41f17affbec0a7b735920ba9a3fbb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
-- Light Unix I/O for Lua
-- Copyright 2012 Rob Kendrick <rjek+luxio@rjek.com>
--
-- Distributed under the same terms as Lua itself (MIT).
--
-- High-level functions for using POSIX message queues.  Delicate.

local l = require "luxio"

assert(l._ABI == 0, "luxio ABI mismatch")

local sio = require "luxio.simple"
local serialise = require("luxio.serialise").serialise
local lua_version = tonumber(_VERSION:match "Lua (%d.%d)")

local function err(txt, errno)
   return nil, ("%s: %s"):format(txt, l.strerror(errno)), errno
end

local mq_check_mt

local function mq_meta_gc(o)
   if not o.closed then
      l.mq_close(o.desc)
   end
end

local mq_mt = {
   send = function(o, msg, prio)
      mq_check_mt(o)

      local r, errno = l.mq_send(o.desc, msg, prio)
      if r < 0 then
	 return err("mq_send", errno)
      end

      return true
   end,

   receive = function(o)
      mq_check_mt(o)

      local r, errno, msg, prio = l.mq_receive(o.desc)
      if r < 0 then
	 return err("mq_receive", errno)
      end

      return msg, prio
   end,

   close = function(o)
      mq_check_mt(o)
      local r, errno = l.mq_close(o.desc)
      if r < 0 then
	 return err("mq_close", errno)
      end
      o.closed = true

      return true
   end,

   push = function(o, data, prio)
      mq_check_mt(o)
      local payload = serialise(data)
      local r, errno, attr = l.mq_getattr(o.desc)
      
      if r < 0 then
	 return err("mq_getattr", errno)
      end

      if #payload > attr.mq_msgsize then
	 return nil, "payload serialisation too large"
      end

      r, errno = l.mq_send(o.desc, payload, prio or 0)
      if r < 0 then
	 return err("mq_send", errno)
      end
      
      return true
   end,

   pull = function(o)
      mq_check_mt(o)
      local r, errno, data, prio = l.mq_receive(o.desc)
      if r < 0 then
	 return err("mq_receive", errno)
      end

      local f, err = loadstring("return " .. data)
      if f == nil then
	 return nil, "payload error: " .. err
      end

      local ok, result = pcall(f)
      if not ok then
	 return nil, "payload error: ".. result
      end
      
      return result
   end,

   __tostring = function(x)
      return ("message queue: %s %s (%d)%s"):format(
	 x.rw_mode, x.path, x.desc, x.closed and " closed" or "")
   end,

   __gc = mq_meta_gc

}

mq_mt.__index = mq_mt

if lua_version == 5.1 then
   mq_wrap_mt = function(t)
      t.proxy = newproxy(true)
      getmetatable(t.proxy).__gc = function()
	 mq_meta_gc(t)
      end
      setmetatable(t, mq_mt)
   end
else
   mq_wrap_mt = function(t) 
      setmetatable(t, mq_mt)
   end
end


mq_check_mt = function(x)
   if getmetatable(x) ~= mq_mt then
      error "object passed is not an MQ object"
   end
   if x.closed then
      error "MQ passed has been closed"
   end
end

local rw_mode_map = {
   r = l.O_RDONLY,
   w = l.O_WRONLY,
   rw = l.O_RDWR
}

local function open(path, rw, mode)
   -- Opens a message queue.  rw is one of "r", "w", or "rw".
   -- If mode is specified, it will also try to create it.

   local rw_mode = rw_mode_map[rw] or error "unknown read/write mode"
   local mq, errno

   if mode then
      mode = sio.tomode(mode) 
      mq, errno = l.mq_open(path, l.bit.bor(rw_mode, l.O_CREAT), mode)
   else
      mq, errno = l.mq_open(path, rw_mode)
   end

   if mq < 0 then
      return err("mq_open", errno)
   end

   local t = { path = path, desc = mq, rw_mode = rw }
   mq_wrap_mt(t)
   return t
end

local function unlink(path)
   local r, errno = l.mq_unlink(path)
   if r < 0 then
      return err("mq_unlink", errno)
   end
   return true
end

return { open = open, unlink = unlink }