| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- #!jython
- #
- # Python Serial Port Extension for Win32, Linux, BSD, Jython
- # module for serial IO for Jython and JavaComm
- # see __init__.py
- #
- # (C) 2002-2008 Chris Liechti <cliechti@gmx.net>
- # this is distributed under a free software license, see license.txt
- from serial.serialutil import *
def my_import(name):
    """Import the module given by dotted path *name* and return it.

    ``__import__('a.b.c')`` hands back the top-level package ``a``, so the
    remaining path components are resolved one attribute at a time to reach
    the module that was actually asked for.
    """
    target = __import__(name)
    # skip the first component: it is the package __import__ just returned
    for attribute in name.split('.')[1:]:
        target = getattr(target, attribute)
    return target
def detect_java_comm(names):
    """Return the first module from *names* that imports and has SerialPort.

    Candidates are tried in the given order. A candidate that cannot be
    imported, or that imports but lacks a ``SerialPort`` attribute, is
    skipped. ImportError is raised when no candidate qualifies.
    """
    for candidate in names:
        try:
            module = my_import(candidate)
            # attribute access is only a probe; the value itself is unused
            module.SerialPort
        except (ImportError, AttributeError):
            continue
        return module
    raise ImportError("No Java Communications API implementation found")
# Java Communications API implementations, probed in the order listed; the
# first package that imports and provides SerialPort is bound to `comm`.
# http://mho.republika.pl/java/comm/
comm = detect_java_comm(
    [
        'javax.comm',   # Sun/IBM
        'gnu.io',       # RXTX
    ]
)
def device(portnumber):
    """Turn a port number into a device name.

    Walks every port identifier reported by the Java Communications API,
    keeps the serial ones in enumeration order, and returns the name of the
    entry at index *portnumber*.
    """
    identifiers = comm.CommPortIdentifier.getPortIdentifiers()
    serial_type = comm.CommPortIdentifier.PORT_SERIAL
    serial_ports = []
    while identifiers.hasMoreElements():
        candidate = identifiers.nextElement()
        if candidate.getPortType() != serial_type:
            continue    # not a serial port, ignore it
        serial_ports.append(candidate)
    return serial_ports[portnumber].getName()
class JavaSerial(SerialBase):
    """Serial port class, implemented with Java Communications API and
    thus usable with jython and the appropriate java extension."""

    # Java port handle. It is only assigned by open()/close(); the class-level
    # default lets the "if not self.sPort" guards below raise
    # portNotOpenError (instead of AttributeError) on a never-opened instance.
    sPort = None

    def open(self):
        """Open port with current settings.

        Raises SerialException if no port is configured, if the port is
        already open, or if the Java layer cannot open it.
        """
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self._isOpen:
            raise SerialException("Port is already open.")
        if isinstance(self._port, str):
            # strings are taken directly
            portId = comm.CommPortIdentifier.getPortIdentifier(self._port)
        else:
            # numbers are transformed to a comport id obj
            portId = comm.CommPortIdentifier.getPortIdentifier(device(self._port))
        try:
            self.sPort = portId.open("python serial module", 10)
        except Exception as msg:
            self.sPort = None
            raise SerialException("Could not open port: %s" % msg)
        self._reconfigurePort()
        self._instream = self.sPort.getInputStream()
        self._outstream = self.sPort.getOutputStream()
        self._isOpen = True

    def _reconfigurePort(self):
        """Set communication parameters on opened port.

        Translates the bytesize, stopbits, parity, flow control and timeout
        settings stored on this object into the Java API constants and
        applies them. Raises SerialException without a port handle and
        ValueError for a setting that has no Java equivalent here.
        """
        if not self.sPort:
            raise SerialException("Can only operate on a valid port handle")

        self.sPort.enableReceiveTimeout(30)

        if self._bytesize == FIVEBITS:
            jdatabits = comm.SerialPort.DATABITS_5
        elif self._bytesize == SIXBITS:
            jdatabits = comm.SerialPort.DATABITS_6
        elif self._bytesize == SEVENBITS:
            jdatabits = comm.SerialPort.DATABITS_7
        elif self._bytesize == EIGHTBITS:
            jdatabits = comm.SerialPort.DATABITS_8
        else:
            raise ValueError("unsupported bytesize: %r" % self._bytesize)

        if self._stopbits == STOPBITS_ONE:
            jstopbits = comm.SerialPort.STOPBITS_1
        elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
            # must compare self._stopbits and bind the local jstopbits, which
            # is what setSerialPortParams() below receives
            jstopbits = comm.SerialPort.STOPBITS_1_5
        elif self._stopbits == STOPBITS_TWO:
            jstopbits = comm.SerialPort.STOPBITS_2
        else:
            raise ValueError("unsupported number of stopbits: %r" % self._stopbits)

        if self._parity == PARITY_NONE:
            jparity = comm.SerialPort.PARITY_NONE
        elif self._parity == PARITY_EVEN:
            jparity = comm.SerialPort.PARITY_EVEN
        elif self._parity == PARITY_ODD:
            jparity = comm.SerialPort.PARITY_ODD
        elif self._parity == PARITY_MARK:
            jparity = comm.SerialPort.PARITY_MARK
        elif self._parity == PARITY_SPACE:
            jparity = comm.SerialPort.PARITY_SPACE
        else:
            raise ValueError("unsupported parity type: %r" % self._parity)

        jflowin = jflowout = 0
        if self._rtscts:
            jflowin |= comm.SerialPort.FLOWCONTROL_RTSCTS_IN
            jflowout |= comm.SerialPort.FLOWCONTROL_RTSCTS_OUT
        if self._xonxoff:
            jflowin |= comm.SerialPort.FLOWCONTROL_XONXOFF_IN
            jflowout |= comm.SerialPort.FLOWCONTROL_XONXOFF_OUT

        self.sPort.setSerialPortParams(self._baudrate, jdatabits, jstopbits, jparity)
        self.sPort.setFlowControlMode(jflowin | jflowout)

        # A timeout of None means "block forever". The Java call takes whole
        # milliseconds as an int, so fractional seconds are converted here.
        if self._timeout is not None and self._timeout >= 0:
            self.sPort.enableReceiveTimeout(int(self._timeout * 1000))
        else:
            self.sPort.disableReceiveTimeout()

    def close(self):
        """Close port: both streams first, then the port handle itself."""
        if self._isOpen:
            if self.sPort:
                self._instream.close()
                self._outstream.close()
                self.sPort.close()
                self.sPort = None
            self._isOpen = False

    def makeDeviceName(self, port):
        """Return the device name for port number *port* (see device())."""
        return device(port)

    # - - - - - - - - - - - - - - - - - - - - - - - -

    def inWaiting(self):
        """Return the number of characters currently in the input buffer."""
        if not self.sPort:
            raise portNotOpenError
        return self._instream.available()

    def read(self, size=1):
        """Read size bytes from the serial port. If a timeout is set it may
        return less characters as requested. With no timeout it will block
        until the requested number of bytes is read."""
        if not self.sPort:
            raise portNotOpenError
        received = bytearray()
        if size > 0:
            while len(received) < size:
                x = self._instream.read()
                if x == -1:
                    # the stream returned no byte: give up only when a
                    # timeout is configured, otherwise keep waiting
                    if self.timeout is not None and self.timeout >= 0:
                        break
                else:
                    received.append(x)
        return bytes(received)

    def write(self, data):
        """Output the given bytes over the serial port.

        Returns the number of bytes handed to the output stream. Raises
        TypeError when *data* is neither bytes nor bytearray.
        """
        if not self.sPort:
            raise portNotOpenError
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
        self._outstream.write(data)
        return len(data)

    def flushInput(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.sPort:
            raise portNotOpenError
        self._instream.skip(self._instream.available())

    def flushOutput(self):
        """Flush the output stream of the port.

        NOTE(review): this method is meant to abort the current output and
        discard buffered data, but it calls flush() on the Java output
        stream, which by the java.io.OutputStream contract writes buffered
        bytes out rather than dropping them -- confirm the intended
        behaviour.
        """
        if not self.sPort:
            raise portNotOpenError
        self._outstream.flush()

    def sendBreak(self, duration=0.25):
        """Send break condition. Timed, returns to idle state after given
        duration (in seconds)."""
        if not self.sPort:
            raise portNotOpenError
        # the Java API expects an int number of milliseconds
        self.sPort.sendBreak(int(duration * 1000.0))

    def setBreak(self, level=1):
        """Set break: Controls TXD. When active, no transmitting is possible.

        Not available through the Java API: always raises SerialException
        on an open port.
        """
        if not self.sPort:
            raise portNotOpenError
        raise SerialException("The setBreak function is not implemented in java.")

    def setRTS(self, level=1):
        """Set terminal status line: Request To Send"""
        if not self.sPort:
            raise portNotOpenError
        self.sPort.setRTS(level)

    def setDTR(self, level=1):
        """Set terminal status line: Data Terminal Ready"""
        if not self.sPort:
            raise portNotOpenError
        self.sPort.setDTR(level)

    def getCTS(self):
        """Read terminal status line: Clear To Send"""
        if not self.sPort:
            raise portNotOpenError
        return self.sPort.isCTS()

    def getDSR(self):
        """Read terminal status line: Data Set Ready"""
        if not self.sPort:
            raise portNotOpenError
        return self.sPort.isDSR()

    def getRI(self):
        """Read terminal status line: Ring Indicator"""
        if not self.sPort:
            raise portNotOpenError
        return self.sPort.isRI()

    def getCD(self):
        """Read terminal status line: Carrier Detect"""
        if not self.sPort:
            raise portNotOpenError
        return self.sPort.isCD()
# Assemble the Serial class from the platform specific implementation and a
# base that provides the file-like behavior. Python 2.6 and newer ship the
# new I/O library, so io.RawIOBase is used there; without it the class falls
# back to our own file-like emulation.
try:
    import io
except ImportError:
    # classic version with our own file-like emulation
    _file_api = FileLike
else:
    # io library present
    _file_api = io.RawIOBase


class Serial(JavaSerial, _file_api):
    pass
if __name__ == '__main__':
    # Manual smoke test: open the first serial port, raise RTS/DTR, clear
    # both buffers, send a short string and print whatever comes back.
    # Imported here because nothing else in this module binds `sys` itself.
    import sys

    s = Serial(0,
        baudrate=19200,         # baudrate
        bytesize=EIGHTBITS,     # number of databits
        parity=PARITY_EVEN,     # enable parity checking
        stopbits=STOPBITS_ONE,  # number of stopbits
        timeout=3,              # set a timeout value, None for waiting forever
        xonxoff=0,              # enable software flow control
        rtscts=0,               # enable RTS/CTS flow control
    )
    s.setRTS(1)
    s.setDTR(1)
    s.flushInput()
    s.flushOutput()
    s.write('hello')
    # sys has no attribute "stdio"; the standard output stream is sys.stdout
    sys.stdout.write('%r\n' % s.read(5))
    sys.stdout.write('%s\n' % s.inWaiting())
    del s
|