summaryrefslogtreecommitdiff
path: root/src/gallium/frontends/rusticl/core/queue.rs
blob: b06c7f45ae28ae3ab7bb6cab37751315448afad0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crate::api::icd::*;
use crate::core::context::*;
use crate::core::device::*;
use crate::core::event::*;
use crate::impl_cl_type_trait;

use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;

#[repr(C)]
pub struct Queue {
    pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
    pub context: Arc<Context>,
    pub device: Arc<Device>,
    pub props: cl_command_queue_properties,
    pub props_v2: Option<Properties<cl_queue_properties>>,
    pending: Mutex<Vec<Arc<Event>>>,
    _thrd: Option<JoinHandle<()>>,
    chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}

impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);

impl Queue {
    pub fn new(
        context: Arc<Context>,
        device: Arc<Device>,
        props: cl_command_queue_properties,
        props_v2: Option<Properties<cl_queue_properties>>,
    ) -> CLResult<Arc<Queue>> {
        // we assume that memory allocation is the only possible failure. Any other failure reason
        // should be detected earlier (e.g.: checking for CAPs).
        let pipe = device.screen().create_context().unwrap();
        let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
        Ok(Arc::new(Self {
            base: CLObjectBase::new(),
            context: context,
            device: device,
            props: props,
            props_v2: props_v2,
            pending: Mutex::new(Vec::new()),
            _thrd: Some(
                thread::Builder::new()
                    .name("rusticl queue thread".into())
                    .spawn(move || loop {
                        let r = rx_t.recv();
                        if r.is_err() {
                            break;
                        }
                        let new_events = r.unwrap();
                        for e in &new_events {
                            // all events should be processed, but we might have to wait on user
                            // events to happen
                            let err = e.deps.iter().map(|e| e.wait()).find(|s| *s < 0);
                            if let Some(err) = err {
                                // if a dependency failed, fail this event as well
                                e.set_user_status(err);
                            } else {
                                e.call(&pipe);
                            }
                        }
                        for e in new_events {
                            e.wait();
                        }
                    })
                    .unwrap(),
            ),
            chan_in: tx_q,
        }))
    }

    pub fn queue(&self, e: Arc<Event>) {
        self.pending.lock().unwrap().push(e);
    }

    pub fn flush(&self, wait: bool) -> CLResult<()> {
        let mut p = self.pending.lock().unwrap();
        let last = p.last().cloned();
        // This should never ever error, but if it does return an error
        self.chan_in
            .send((*p).drain(0..).collect())
            .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
        if wait {
            if let Some(last) = last {
                last.wait();
            }
        }
        Ok(())
    }
}

impl Drop for Queue {
    fn drop(&mut self) {
        // when deleting the application side object, we have to flush
        // From the OpenCL spec:
        // clReleaseCommandQueue performs an implicit flush to issue any previously queued OpenCL
        // commands in command_queue.
        // TODO: maybe we have to do it on every release?
        let _ = self.flush(true);
    }
}