diff options
Diffstat (limited to 'fs/remotefs.py')
-rw-r--r-- | fs/remotefs.py | 104 |
1 files changed, 52 insertions, 52 deletions
diff --git a/fs/remotefs.py b/fs/remotefs.py index 77fdfc2..529a8f9 100644 --- a/fs/remotefs.py +++ b/fs/remotefs.py @@ -14,20 +14,20 @@ from six import b class PacketHandler(threading.Thread): - + def __init__(self, transport, prelude_callback=None): super(PacketHandler, self).__init__() self.transport = transport self.encoder = packetstream.JSONFileEncoder(transport) self.decoder = packetstream.JSONDecoder(prelude_callback=None) - - self.queues = defaultdict(queue.Queue) + + self.queues = defaultdict(queue.Queue) self._encoder_lock = threading.Lock() self._queues_lock = threading.Lock() self._call_id_lock = threading.Lock() - + self.call_id = 0 - + def run(self): decoder = self.decoder read = self.transport.read @@ -37,96 +37,96 @@ class PacketHandler(threading.Thread): if not data: print "No data" break - print "data", repr(data) + print "data", repr(data) for header, payload in decoder.feed(data): print repr(header) print repr(payload) on_packet(header, payload) - + def _new_call_id(self): with self._call_id_lock: self.call_id += 1 return self.call_id - + def get_thread_queue(self, queue_id=None): if queue_id is None: queue_id = threading.current_thread().ident with self._queues_lock: return self.queues[queue_id] - + def send_packet(self, header, payload=''): - call_id = self._new_call_id() - queue_id = threading.current_thread().ident - client_ref = "%i:%i" % (queue_id, call_id) + call_id = self._new_call_id() + queue_id = threading.current_thread().ident + client_ref = "%i:%i" % (queue_id, call_id) header['client_ref'] = client_ref - + with self._encoder_lock: self.encoder.write(header, payload) - + return call_id - + def get_packet(self, call_id): - + if call_id is not None: - queue_id = threading.current_thread().ident + queue_id = threading.current_thread().ident client_ref = "%i:%i" % (queue_id, call_id) else: client_ref = None - + queue = self.get_thread_queue() - - while True: + + while True: header, payload = queue.get() print repr(header) print repr(payload) if client_ref is not None and header.get('client_ref') != client_ref: continue - break - + break + return header, payload - + def on_packet(self, header, payload): client_ref = header.get('client_ref', '') queue_id, call_id = client_ref.split(':', 1) queue_id = int(queue_id) #queue_id = header.get('queue_id', '') - queue = self.get_thread_queue(queue_id) + queue = self.get_thread_queue(queue_id) queue.put((header, payload)) - - + + class _SocketFile(object): def __init__(self, socket): self.socket = socket - + def read(self, size): try: return self.socket.recv(size) - except: - return b('') - + except: + return b('') + def write(self, data): self.socket.sendall(data) - + def close(self): self.socket.shutdown(socket.SHUT_RDWR) self.socket.close() - + class _RemoteFile(object): - + def __init__(self, path, connection): self.path = path - self.connection = connection + self.connection = connection class RemoteFS(FS): - + _meta = { 'thead_safe' : True, 'network' : True, 'virtual' : False, 'read_only' : False, 'unicode_paths' : True, } - + def __init__(self, addr='', port=3000, username=None, password=None, resource=None, transport=None): self.addr = addr self.port = port @@ -136,56 +136,56 @@ class RemoteFS(FS): self.transport = transport if self.transport is None: self.transport = self._open_connection() - self.packet_handler = PacketHandler(self.transport) - self.packet_handler.start() - + self.packet_handler = PacketHandler(self.transport) + self.packet_handler.start() + self._remote_call('auth', username=username, password=password, resource=resource) - + def _open_connection(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((self.addr, self.port)) socket_file = _SocketFile(sock) socket_file.write(b('pyfs/0.1\n')) return socket_file - + def _make_call(self, method_name, *args, **kwargs): call = dict(type='rpc', method=method_name, args=args, kwargs=kwargs) - return call - + return call + def _remote_call(self, method_name, *args, **kwargs): call = self._make_call(method_name, *args, **kwargs) call_id = self.packet_handler.send_packet(call) header, payload = self.packet_handler.get_packet(call_id) return header, payload - + def ping(self, msg): call_id = self.packet_handler.send_packet({'type':'rpc', 'method':'ping'}, msg) header, payload = self.packet_handler.get_packet(call_id) print "PING" print header print payload - + def close(self): self.transport.close() self.packet_handler.join() - + def open(self, path, mode="r", **kwargs): pass - + def exists(self, path): remote = self._remote_call('exists', path) return remote.get('response') - - + + if __name__ == "__main__": - - rfs = RemoteFS() + + rfs = RemoteFS() rfs.close() - + |