summaryrefslogtreecommitdiff
path: root/pyserial/serial/serialwin32.py
diff options
context:
space:
mode:
authorcliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a>2002-02-12 23:24:41 +0000
committercliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a>2002-02-12 23:24:41 +0000
commit5b6169d84afc45c1f979e4fb27df86fc1eb1d994 (patch)
treede176c7572b8114191ba7d1f80307af5b9bcd7f3 /pyserial/serial/serialwin32.py
downloadpyserial-avendor.tar.gz
initial split from pybsl, jython supportavendor
git-svn-id: http://svn.code.sf.net/p/pyserial/code/branches/avendor@3 f19166aa-fa4f-0410-85c2-fa1106f25c8a
Diffstat (limited to 'pyserial/serial/serialwin32.py')
-rw-r--r--pyserial/serial/serialwin32.py304
1 files changed, 304 insertions, 0 deletions
diff --git a/pyserial/serial/serialwin32.py b/pyserial/serial/serialwin32.py
new file mode 100644
index 0000000..c780069
--- /dev/null
+++ b/pyserial/serial/serialwin32.py
@@ -0,0 +1,304 @@
+#! python
+#serial driver for win32
+#see serial.py
+#
+#(C) 2001 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+import win32file # The base COM port and file IO functions.
+import win32event # We use events and the WaitFor[Single|Multiple]Objects functions.
+import win32con # constants.
+import sys, string
+
+VERSION = string.split("$Revision: 1.1.1.1 $")[1] #extract CVS version
+
+PARITY_NONE, PARITY_EVEN, PARITY_ODD = range(3)
+STOPBITS_ONE, STOPBITS_TWO = (1, 2)
+FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5,6,7,8)
+
+portNotOpenError = ValueError('port not open')
+
+class Serial:
+ def __init__(self,
+ port, #number of device, numbering starts at
+ #zero. if everything fails, the user
+ #can specify a device string, note
+ #that this isn't portable anymore
+ baudrate=9600, #baudrate
+ bytesize=EIGHTBITS, #number of databits
+ parity=PARITY_NONE, #enable parity checking
+ stopbits=STOPBITS_ONE, #number of stopbits
+ timeout=None, #set a timeout value, None for waiting forever
+ xonxoff=0, #enable software flow control
+ rtscts=0, #enable RTS/CTS flow control
+ ):
+ """initialize comm port"""
+
+ self.timeout = timeout
+
+ if type(port) == type(''): #strings are taken directly
+ self.portstr = port
+ else:
+ self.portstr = 'COM%d' % (port+1) #numbers are transformed to a string
+ #self.portstr = '\\\\.\\COM%d' % (port+1) #WIN NT format??
+
+ self.hComPort = win32file.CreateFile(self.portstr,
+ win32con.GENERIC_READ | win32con.GENERIC_WRITE,
+ 0, # exclusive access
+ None, # no security
+ win32con.OPEN_EXISTING,
+ win32con.FILE_ATTRIBUTE_NORMAL | win32con.FILE_FLAG_OVERLAPPED,
+ None)
+ # Setup a 4k buffer
+ win32file.SetupComm(self.hComPort, 4096, 4096)
+
+ #Save original timeout values:
+ self.orgTimeouts = win32file.GetCommTimeouts(self.hComPort)
+
+ #Set Windows timeout values
+ #timeouts is a tuple with the following items:
+ #(ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
+ # ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
+ # WriteTotalTimeoutConstant)
+ if timeout:
+ timeouts = (timeout*1000, 0, timeout*1000, 0, 0)
+ else:
+ #timeouts = (win32con.MAXDWORD, 1, 0, 1, 0)
+ timeouts = (win32con.MAXDWORD, 0, 0, 0, 1000)
+ win32file.SetCommTimeouts(self.hComPort, timeouts)
+
+ #win32file.SetCommMask(self.hComPort, win32file.EV_RXCHAR | win32file.EV_TXEMPTY |
+ # win32file.EV_RXFLAG | win32file.EV_ERR)
+ win32file.SetCommMask(self.hComPort,
+ win32file.EV_RXCHAR | win32file.EV_RXFLAG | win32file.EV_ERR)
+ #win32file.SetCommMask(self.hComPort, win32file.EV_ERR)
+
+ # Setup the connection info.
+ # Get state and modify it:
+ comDCB = win32file.GetCommState(self.hComPort)
+ comDCB.BaudRate = baudrate
+
+ if bytesize == FIVEBITS:
+ comDCB.ByteSize = 5
+ elif bytesize == SIXBITS:
+ comDCB.ByteSize = 6
+ elif bytesize == SEVENBITS:
+ comDCB.ByteSize = 7
+ elif bytesize == EIGHTBITS:
+ comDCB.ByteSize = 8
+
+ if parity == PARITY_NONE:
+ comDCB.Parity = win32file.NOPARITY
+ comDCB.fParity = 0 # Dis/Enable Parity Check
+ elif parity == PARITY_EVEN:
+ comDCB.Parity = win32file.EVENPARITY
+ comDCB.fParity = 1 # Dis/Enable Parity Check
+ elif parity == PARITY_ODD:
+ comDCB.Parity = win32file.ODDPARITY
+ comDCB.fParity = 1 # Dis/Enable Parity Check
+
+ if stopbits == STOPBITS_ONE:
+ comDCB.StopBits = win32file.ONESTOPBIT
+ elif stopbits == STOPBITS_TWO:
+ comDCB.StopBits = win32file.TWOSTOPBITS
+ comDCB.fBinary = 1 # Enable Binary Transmission
+ # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
+ if rtscts:
+ comDCB.fRtsControl = win32file.RTS_CONTROL_HANDSHAKE
+ comDCB.fDtrControl = win32file.DTR_CONTROL_HANDSHAKE
+ else:
+ comDCB.fRtsControl = win32file.RTS_CONTROL_ENABLE
+ comDCB.fDtrControl = win32file.DTR_CONTROL_ENABLE
+ comDCB.fOutxCtsFlow = rtscts
+ comDCB.fOutxDsrFlow = rtscts
+ comDCB.fOutX = xonxoff
+ comDCB.fInX = xonxoff
+ comDCB.fNull = 0
+ comDCB.fErrorChar = 0
+ comDCB.fAbortOnError = 0
+
+ win32file.SetCommState(self.hComPort, comDCB)
+
+ # Clear buffers:
+ # Remove anything that was there
+ win32file.PurgeComm(self.hComPort,
+ win32file.PURGE_TXCLEAR | win32file.PURGE_TXABORT |
+ win32file.PURGE_RXCLEAR | win32file.PURGE_RXABORT)
+
+ #print win32file.ClearCommError(self.hComPort) #flags, comState =
+
+ self.overlapped = win32file.OVERLAPPED()
+ self.overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
+
+ def __del__(self):
+ self.close()
+
+ def close(self):
+ """close port"""
+ if self.hComPort:
+ #Wait until data is transmitted, but not too long... (Timeout-Time)
+ #while 1:
+ # flags, comState = win32file.ClearCommError(hComPort)
+ # if comState.cbOutQue <= 0 or calcTimeout(startTime) > timeout:
+ # break
+
+ self.setRTS(0)
+ self.setDTR(0)
+ #Clear buffers:
+ win32file.PurgeComm(self.hComPort,
+ win32file.PURGE_TXCLEAR | win32file.PURGE_TXABORT |
+ win32file.PURGE_RXCLEAR | win32file.PURGE_RXABORT)
+ #Restore original timeout values:
+ win32file.SetCommTimeouts(self.hComPort, self.orgTimeouts)
+ #Close COM-Port:
+ win32file.CloseHandle(self.hComPort)
+ self.hComPort = None
+
+ def inWaiting(self):
+ """returns the number of bytes waiting to be read"""
+ flags, comstat = win32file.ClearCommError(self.hComPort)
+ return comstat.cbInQue
+
+ def _read(self, size=1):
+ flags, comstat = win32file.ClearCommError( self.hComPort )
+ #print "1:",comstat.cbInQue,
+ if comstat.cbInQue < size:
+ rc, mask = win32file.WaitCommEvent(self.hComPort, self.overlapped)
+ if rc == 0: # Character already ready!
+ win32event.SetEvent(self.overlapped.hEvent)
+
+ rc = win32event.WaitForSingleObject(self.overlapped.hEvent, win32event.INFINITE)
+ flags, comstat = win32file.ClearCommError( self.hComPort )
+ #print "2:",comstat.cbInQue,
+
+ rc, data = win32file.ReadFile(self.hComPort, size, self.overlapped)
+ win32event.WaitForSingleObject(self.overlapped.hEvent, win32event.INFINITE)
+ print "read %r" % str(data)
+ return str(data)
+
+ def work_read(self,num=1):
+ "read num bytes from serial port"
+ if not self.hComPort: raise portNotOpenError
+ flags, comstat = win32file.ClearCommError( self.hComPort )
+ #print "1:",comstat.cbInQue,
+ if comstat.cbInQue < num:
+ rc, mask = win32file.WaitCommEvent(self.hComPort, self.overlapped)
+ if rc == 0: # Character already ready!
+ win32event.SetEvent(self.overlapped.hEvent)
+
+ if self.timeout:
+ rc = win32event.WaitForSingleObject(self.overlapped.hEvent, self.timeout*1000)
+ else:
+ rc = win32event.WaitForSingleObject(self.overlapped.hEvent, win32event.INFINITE)
+ flags, comstat = win32file.ClearCommError( self.hComPort )
+ #print "2:",comstat.cbInQue,
+
+ #rc, data = win32file.ReadFile(self.hComPort, comstat.cbInQue, self.overlapped)
+ rc, data = win32file.ReadFile(self.hComPort, num, self.overlapped)
+ if self.timeout:
+ win32event.WaitForSingleObject(self.overlapped.hEvent, self.timeout*1000)
+ else:
+ win32event.WaitForSingleObject(self.overlapped.hEvent, win32event.INFINITE)
+
+ #old: hr, data = win32file.ReadFile(self.hComPort, num)
+ #print '.%s.' % str(data)
+ #print "read() soll %d ist %d" % (num, len(str(data))), repr(data)
+ return str(data)
+
+ def read(self, size=1):
+ "read num bytes from serial port"
+ if not self.hComPort: raise portNotOpenError
+ #print "read %d" %size ####debug
+ read = ''
+ if size > 0:
+ while len(read) < size:
+ flags, comstat = win32file.ClearCommError( self.hComPort )
+ #print "1:",comstat.cbInQue,
+ #self.overlapped = win32file.OVERLAPPED()
+ #self.overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
+ #win32event.ResetEvent(self.overlapped.hEvent)
+## if comstat.cbInQue < size:
+## #print "read <size" ####debug
+## rc, mask = win32file.WaitCommEvent(self.hComPort, self.overlapped)
+## if rc == 0: # Character already ready!
+## win32event.SetEvent(self.overlapped.hEvent)
+##
+## if self.timeout:
+## rc = win32event.WaitForSingleObject(self.overlapped.hEvent, self.timeout*1000)
+## if rc == win32event.WAIT_TIMEOUT: break
+## else:
+## rc = win32event.WaitForSingleObject(self.overlapped.hEvent, win32event.INFINITE)
+## flags, comstat = win32file.ClearCommError( self.hComPort )
+## #print "2:",comstat.cbInQue,
+
+ rc, buf = win32file.ReadFile(self.hComPort, size-len(read), self.overlapped)
+ if self.timeout:
+ rc = win32event.WaitForSingleObject(self.overlapped.hEvent, self.timeout*1000)
+ if rc == win32event.WAIT_TIMEOUT: break
+ #TODO: if reading more than 1 byte data can be lost when a timeout occours!!!!
+ else:
+ win32event.WaitForSingleObject(self.overlapped.hEvent, win32event.INFINITE)
+ read = read + str(buf)
+ #print "read() soll %d ist %d" % (num, len(str(data))), repr(data)
+ #print "read %r" % str(read)
+ return str(read)
+
+ def write(self, s):
+ "write string to serial port"
+ if not self.hComPort: raise portNotOpenError
+ #print repr(s),
+ overlapped = win32file.OVERLAPPED()
+ overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
+ win32file.WriteFile(self.hComPort, s, overlapped)
+ # Wait for the write to complete.
+ win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
+ #old: win32file.WriteFile(self.hComPort, s) #, 1, NULL)
+ #print "ok"
+
+ def flushInput(self):
+ if not self.hComPort: raise portNotOpenError
+ win32file.PurgeComm(self.hComPort, win32file.PURGE_RXCLEAR | win32file.PURGE_RXABORT)
+
+ def flushOutput(self):
+ if not self.hComPort: raise portNotOpenError
+ win32file.PurgeComm(self.hComPort, win32file.PURGE_TXCLEAR | win32file.PURGE_TXABORT)
+
+ def sendBreak(self):
+ if not self.hComPort: raise portNotOpenError
+ raise "not implemented"
+
+ def setRTS(self,level=1):
+ if not self.hComPort: raise portNotOpenError
+ comDCB = win32file.GetCommState(self.hComPort)
+ if level:
+ comDCB.fRtsControl = win32file.RTS_CONTROL_ENABLE;
+ else:
+ comDCB.fRtsControl = win32file.RTS_CONTROL_DISABLE;
+ win32file.SetCommState(self.hComPort, comDCB)
+
+ def setDTR(self,level=1):
+ if not self.hComPort: raise portNotOpenError
+ comDCB = win32file.GetCommState(self.hComPort)
+ if level:
+ comDCB.fDtrControl = win32file.DTR_CONTROL_ENABLE;
+ else:
+ comDCB.fDtrControl = win32file.DTR_CONTROL_DISABLE;
+ win32file.SetCommState(self.hComPort, comDCB)
+
+ def getCTS(self):
+ if not self.hComPort: raise portNotOpenError
+ comDCB = win32file.GetCommState(self.hComPort)
+ return comDCB.fOutxCtsFlow
+
+ def getDSR(self):
+ if not self.hComPort: raise portNotOpenError
+ comDCB = win32file.GetCommState(self.hComPort)
+ return comDCB.fOutxDsrFlow
+
+
+
+#Nur Testfunktion!!
+if __name__ == '__main__':
+ print __name__
+ s = Serial(0)
+