summaryrefslogtreecommitdiff
path: root/fs/remotefs.py
diff options
context:
space:
mode:
Diffstat (limited to 'fs/remotefs.py')
-rw-r--r--fs/remotefs.py104
1 files changed, 52 insertions, 52 deletions
diff --git a/fs/remotefs.py b/fs/remotefs.py
index 77fdfc2..529a8f9 100644
--- a/fs/remotefs.py
+++ b/fs/remotefs.py
@@ -14,20 +14,20 @@ from six import b
class PacketHandler(threading.Thread):
-
+
def __init__(self, transport, prelude_callback=None):
super(PacketHandler, self).__init__()
self.transport = transport
self.encoder = packetstream.JSONFileEncoder(transport)
self.decoder = packetstream.JSONDecoder(prelude_callback=None)
-
- self.queues = defaultdict(queue.Queue)
+
+ self.queues = defaultdict(queue.Queue)
self._encoder_lock = threading.Lock()
self._queues_lock = threading.Lock()
self._call_id_lock = threading.Lock()
-
+
self.call_id = 0
-
+
def run(self):
decoder = self.decoder
read = self.transport.read
@@ -37,96 +37,96 @@ class PacketHandler(threading.Thread):
if not data:
print "No data"
break
- print "data", repr(data)
+ print "data", repr(data)
for header, payload in decoder.feed(data):
print repr(header)
print repr(payload)
on_packet(header, payload)
-
+
def _new_call_id(self):
with self._call_id_lock:
self.call_id += 1
return self.call_id
-
+
def get_thread_queue(self, queue_id=None):
if queue_id is None:
queue_id = threading.current_thread().ident
with self._queues_lock:
return self.queues[queue_id]
-
+
def send_packet(self, header, payload=''):
- call_id = self._new_call_id()
- queue_id = threading.current_thread().ident
- client_ref = "%i:%i" % (queue_id, call_id)
+ call_id = self._new_call_id()
+ queue_id = threading.current_thread().ident
+ client_ref = "%i:%i" % (queue_id, call_id)
header['client_ref'] = client_ref
-
+
with self._encoder_lock:
self.encoder.write(header, payload)
-
+
return call_id
-
+
def get_packet(self, call_id):
-
+
if call_id is not None:
- queue_id = threading.current_thread().ident
+ queue_id = threading.current_thread().ident
client_ref = "%i:%i" % (queue_id, call_id)
else:
client_ref = None
-
+
queue = self.get_thread_queue()
-
- while True:
+
+ while True:
header, payload = queue.get()
print repr(header)
print repr(payload)
if client_ref is not None and header.get('client_ref') != client_ref:
continue
- break
-
+ break
+
return header, payload
-
+
def on_packet(self, header, payload):
client_ref = header.get('client_ref', '')
queue_id, call_id = client_ref.split(':', 1)
queue_id = int(queue_id)
#queue_id = header.get('queue_id', '')
- queue = self.get_thread_queue(queue_id)
+ queue = self.get_thread_queue(queue_id)
queue.put((header, payload))
-
-
+
+
class _SocketFile(object):
def __init__(self, socket):
self.socket = socket
-
+
def read(self, size):
try:
return self.socket.recv(size)
- except:
- return b('')
-
+ except:
+ return b('')
+
def write(self, data):
self.socket.sendall(data)
-
+
def close(self):
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
-
+
class _RemoteFile(object):
-
+
def __init__(self, path, connection):
self.path = path
- self.connection = connection
+ self.connection = connection
class RemoteFS(FS):
-
+
_meta = { 'thead_safe' : True,
'network' : True,
'virtual' : False,
'read_only' : False,
'unicode_paths' : True,
}
-
+
def __init__(self, addr='', port=3000, username=None, password=None, resource=None, transport=None):
self.addr = addr
self.port = port
@@ -136,56 +136,56 @@ class RemoteFS(FS):
self.transport = transport
if self.transport is None:
self.transport = self._open_connection()
- self.packet_handler = PacketHandler(self.transport)
- self.packet_handler.start()
-
+ self.packet_handler = PacketHandler(self.transport)
+ self.packet_handler.start()
+
self._remote_call('auth',
username=username,
password=password,
resource=resource)
-
+
def _open_connection(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.addr, self.port))
socket_file = _SocketFile(sock)
socket_file.write(b('pyfs/0.1\n'))
return socket_file
-
+
def _make_call(self, method_name, *args, **kwargs):
call = dict(type='rpc',
method=method_name,
args=args,
kwargs=kwargs)
- return call
-
+ return call
+
def _remote_call(self, method_name, *args, **kwargs):
call = self._make_call(method_name, *args, **kwargs)
call_id = self.packet_handler.send_packet(call)
header, payload = self.packet_handler.get_packet(call_id)
return header, payload
-
+
def ping(self, msg):
call_id = self.packet_handler.send_packet({'type':'rpc', 'method':'ping'}, msg)
header, payload = self.packet_handler.get_packet(call_id)
print "PING"
print header
print payload
-
+
def close(self):
self.transport.close()
self.packet_handler.join()
-
+
def open(self, path, mode="r", **kwargs):
pass
-
+
def exists(self, path):
remote = self._remote_call('exists', path)
return remote.get('response')
-
-
+
+
if __name__ == "__main__":
-
- rfs = RemoteFS()
+
+ rfs = RemoteFS()
rfs.close()
-
+