certutils.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. # Copyright 2014 Google Inc. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Routines to generate root and server certificates.
  15. Certificate Naming Conventions:
  16. ca_cert: crypto.X509 for the certificate authority (w/ both the pub &
  17. priv keys)
  18. cert: a crypto.X509 certificate (w/ just the pub key)
  19. cert_str: a certificate string (w/ just the pub cert)
  20. key: a private crypto.PKey (from ca or pem)
  21. ca_cert_str: a certificae authority string (w/ both the pub & priv certs)
  22. """
  23. import logging
  24. import os
  25. import platform
  26. import socket
  27. import subprocess
  28. import time
  29. openssl_import_error = None
  30. Error = None
  31. SSL_METHOD = None
  32. SysCallError = None
  33. VERIFY_PEER = None
  34. ZeroReturnError = None
  35. FILETYPE_PEM = None
  36. try:
  37. from OpenSSL import crypto, SSL
  38. Error = SSL.Error
  39. SSL_METHOD = SSL.SSLv23_METHOD
  40. SysCallError = SSL.SysCallError
  41. VERIFY_PEER = SSL.VERIFY_PEER
  42. ZeroReturnError = SSL.ZeroReturnError
  43. FILETYPE_PEM = crypto.FILETYPE_PEM
  44. except ImportError, e:
  45. openssl_import_error = e
  46. def get_ssl_context(method=SSL_METHOD):
  47. # One of: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or TLSv1_METHOD
  48. if openssl_import_error:
  49. raise openssl_import_error # pylint: disable=raising-bad-type
  50. return SSL.Context(method)
  51. class WrappedConnection(object):
  52. def __init__(self, obj):
  53. self._wrapped_obj = obj
  54. def __getattr__(self, attr):
  55. if attr in self.__dict__:
  56. return getattr(self, attr)
  57. return getattr(self._wrapped_obj, attr)
  58. def recv(self, buflen=1024, flags=0):
  59. try:
  60. return self._wrapped_obj.recv(buflen, flags)
  61. except SSL.SysCallError, e:
  62. if e.args[1] == 'Unexpected EOF':
  63. return ''
  64. raise
  65. except SSL.ZeroReturnError:
  66. return ''
  67. def get_ssl_connection(context, connection):
  68. return WrappedConnection(SSL.Connection(context, connection))
  69. def load_privatekey(key, filetype=FILETYPE_PEM):
  70. """Loads obj private key object from string."""
  71. return crypto.load_privatekey(filetype, key)
  72. def load_cert(cert_str, filetype=FILETYPE_PEM):
  73. """Loads obj cert object from string."""
  74. return crypto.load_certificate(filetype, cert_str)
  75. def _dump_privatekey(key, filetype=FILETYPE_PEM):
  76. """Dumps obj private key object to string."""
  77. return crypto.dump_privatekey(filetype, key)
  78. def _dump_cert(cert, filetype=FILETYPE_PEM):
  79. """Dumps obj cert object to string."""
  80. return crypto.dump_certificate(filetype, cert)
  81. def generate_dummy_ca_cert(subject='_WebPageReplayCert'):
  82. """Generates dummy certificate authority.
  83. Args:
  84. subject: a string representing the desired root cert issuer
  85. Returns:
  86. A tuple of the public key and the private key strings for the root
  87. certificate
  88. """
  89. if openssl_import_error:
  90. raise openssl_import_error # pylint: disable=raising-bad-type
  91. key = crypto.PKey()
  92. key.generate_key(crypto.TYPE_RSA, 1024)
  93. ca_cert = crypto.X509()
  94. ca_cert.set_serial_number(int(time.time()*10000))
  95. ca_cert.set_version(2)
  96. ca_cert.get_subject().CN = subject
  97. ca_cert.get_subject().O = subject
  98. ca_cert.gmtime_adj_notBefore(-60 * 60 * 24 * 365 * 2)
  99. ca_cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 2)
  100. ca_cert.set_issuer(ca_cert.get_subject())
  101. ca_cert.set_pubkey(key)
  102. ca_cert.add_extensions([
  103. crypto.X509Extension('basicConstraints', True, 'CA:TRUE'),
  104. crypto.X509Extension('nsCertType', True, 'sslCA'),
  105. crypto.X509Extension('extendedKeyUsage', True,
  106. ('serverAuth,clientAuth,emailProtection,'
  107. 'timeStamping,msCodeInd,msCodeCom,msCTLSign,'
  108. 'msSGC,msEFS,nsSGC')),
  109. crypto.X509Extension('keyUsage', False, 'keyCertSign, cRLSign'),
  110. crypto.X509Extension('subjectKeyIdentifier', False, 'hash',
  111. subject=ca_cert),
  112. ])
  113. ca_cert.sign(key, 'sha256')
  114. key_str = _dump_privatekey(key)
  115. ca_cert_str = _dump_cert(ca_cert)
  116. return ca_cert_str, key_str
  117. def get_host_cert(host, port=443):
  118. """Contacts the host and returns its certificate."""
  119. host_certs = []
  120. def verify_cb(conn, cert, errnum, depth, ok):
  121. host_certs.append(cert)
  122. # Return True to indicates that the certificate was ok.
  123. return True
  124. context = SSL.Context(SSL.SSLv23_METHOD)
  125. context.set_verify(SSL.VERIFY_PEER, verify_cb) # Demand a certificate
  126. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  127. connection = SSL.Connection(context, s)
  128. try:
  129. connection.connect((host, port))
  130. connection.send('')
  131. except SSL.SysCallError:
  132. pass
  133. except socket.gaierror:
  134. logging.debug('Host name is not valid')
  135. finally:
  136. connection.shutdown()
  137. connection.close()
  138. if not host_certs:
  139. logging.warning('Unable to get host certificate from %s:%s', host, port)
  140. return ''
  141. return _dump_cert(host_certs[-1])
  142. def write_dummy_ca_cert(ca_cert_str, key_str, cert_path):
  143. """Writes four certificate files.
  144. For example, if cert_path is "mycert.pem":
  145. mycert.pem - CA plus private key
  146. mycert-cert.pem - CA in PEM format
  147. mycert-cert.cer - CA for Android
  148. mycert-cert.p12 - CA in PKCS12 format for Windows devices
  149. Args:
  150. cert_path: path string such as "mycert.pem"
  151. ca_cert_str: certificate string
  152. key_str: private key string
  153. """
  154. dirname = os.path.dirname(cert_path)
  155. if dirname and not os.path.exists(dirname):
  156. os.makedirs(dirname)
  157. root_path = os.path.splitext(cert_path)[0]
  158. ca_cert_path = root_path + '-cert.pem'
  159. android_cer_path = root_path + '-cert.cer'
  160. windows_p12_path = root_path + '-cert.p12'
  161. # Dump the CA plus private key
  162. with open(cert_path, 'w') as f:
  163. f.write(key_str)
  164. f.write(ca_cert_str)
  165. # Dump the certificate in PEM format
  166. with open(ca_cert_path, 'w') as f:
  167. f.write(ca_cert_str)
  168. # Create a .cer file with the same contents for Android
  169. with open(android_cer_path, 'w') as f:
  170. f.write(ca_cert_str)
  171. ca_cert = load_cert(ca_cert_str)
  172. key = load_privatekey(key_str)
  173. # Dump the certificate in PKCS12 format for Windows devices
  174. with open(windows_p12_path, 'w') as f:
  175. p12 = crypto.PKCS12()
  176. p12.set_certificate(ca_cert)
  177. p12.set_privatekey(key)
  178. f.write(p12.export())
  179. def generate_cert(root_ca_cert_str, server_cert_str, server_host):
  180. """Generates a cert_str with the sni field in server_cert_str signed by the
  181. root_ca_cert_str.
  182. Args:
  183. root_ca_cert_str: PEM formatted string representing the root cert
  184. server_cert_str: PEM formatted string representing cert
  185. server_host: host name to use if there is no server_cert_str
  186. Returns:
  187. a PEM formatted certificate string
  188. """
  189. EXTENSION_WHITELIST = set(['subjectAltName'])
  190. if openssl_import_error:
  191. raise openssl_import_error # pylint: disable=raising-bad-type
  192. common_name = server_host
  193. reused_extensions = []
  194. if server_cert_str:
  195. original_cert = load_cert(server_cert_str)
  196. common_name = original_cert.get_subject().commonName
  197. for i in xrange(original_cert.get_extension_count()):
  198. original_cert_extension = original_cert.get_extension(i)
  199. if original_cert_extension.get_short_name() in EXTENSION_WHITELIST:
  200. reused_extensions.append(original_cert_extension)
  201. ca_cert = load_cert(root_ca_cert_str)
  202. ca_key = load_privatekey(root_ca_cert_str)
  203. cert = crypto.X509()
  204. cert.get_subject().CN = common_name
  205. cert.gmtime_adj_notBefore(-60 * 60)
  206. cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
  207. cert.set_issuer(ca_cert.get_subject())
  208. cert.set_serial_number(int(time.time()*10000))
  209. cert.set_pubkey(ca_key)
  210. cert.add_extensions(reused_extensions)
  211. cert.sign(ca_key, 'sha256')
  212. return _dump_cert(cert)
  213. def install_cert_in_nssdb(home_directory_path, certificate_path):
  214. """Installs a certificate into the ~/.pki/nssdb database.
  215. Args:
  216. home_directory_path: Path of the home directory where to install
  217. certificate_path: Path of a CA in PEM format
  218. """
  219. assert os.path.isdir(home_directory_path)
  220. assert platform.system() == 'Linux', \
  221. 'SSL certification authority has only been tested for linux.'
  222. if (os.path.abspath(home_directory_path) ==
  223. os.path.abspath(os.environ['HOME'])):
  224. raise Exception('Modifying $HOME/.pki/nssdb compromises your machine.')
  225. cert_database_path = os.path.join(home_directory_path, '.pki', 'nssdb')
  226. def certutil(args):
  227. cmd = ['certutil', '--empty-password', '-d', 'sql:' + cert_database_path]
  228. cmd.extend(args)
  229. logging.info(subprocess.list2cmdline(cmd))
  230. subprocess.check_call(cmd)
  231. if not os.path.isdir(cert_database_path):
  232. os.makedirs(cert_database_path)
  233. certutil(['-N'])
  234. certutil(['-A', '-t', 'PC,,', '-n', certificate_path, '-i', certificate_path])