tsproxy.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
#!/usr/bin/python
"""
Copyright 2016 Google Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
  14. import asyncore
  15. import gc
  16. import logging
  17. import platform
  18. import Queue
  19. import re
  20. import signal
  21. import socket
  22. import sys
  23. import threading
  24. import time
  25. server = None
  26. in_pipe = None
  27. out_pipe = None
  28. must_exit = False
  29. options = None
  30. dest_addresses = None
  31. connections = {}
  32. dns_cache = {}
  33. port_mappings = None
  34. map_localhost = False
  35. needs_flush = False
  36. flush_pipes = False
  37. last_activity = None
  38. REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0
  39. lock = threading.Lock()
  40. background_activity_count = 0
  41. def PrintMessage(msg):
  42. # Print the message to stdout & flush to make sure that the message is not
  43. # buffered when tsproxy is run as a subprocess.
  44. print >> sys.stdout, msg
  45. sys.stdout.flush()
########################################################################################################################
#   Traffic-shaping pipe (applies the configured latency and bandwidth limit to queued messages)
########################################################################################################################
  49. class TSPipe():
  50. PIPE_IN = 0
  51. PIPE_OUT = 1
  52. def __init__(self, direction, latency, kbps):
  53. self.direction = direction
  54. self.latency = latency
  55. self.kbps = kbps
  56. self.queue = Queue.Queue()
  57. self.last_tick = time.clock()
  58. self.next_message = None
  59. self.available_bytes = .0
  60. self.peer = 'server'
  61. if self.direction == self.PIPE_IN:
  62. self.peer = 'client'
  63. def SendMessage(self, message, main_thread = True):
  64. global connections, in_pipe, out_pipe
  65. message_sent = False
  66. now = time.clock()
  67. if message['message'] == 'closed':
  68. message['time'] = now
  69. else:
  70. message['time'] = time.clock() + self.latency
  71. message['size'] = .0
  72. if 'data' in message:
  73. message['size'] = float(len(message['data']))
  74. try:
  75. connection_id = message['connection']
  76. # Send messages directly, bypassing the queues is throttling is disabled and we are on the main thread
  77. if main_thread and connection_id in connections and self.peer in connections[connection_id]and self.latency == 0 and self.kbps == .0:
  78. message_sent = self.SendPeerMessage(message)
  79. except:
  80. pass
  81. if not message_sent:
  82. try:
  83. self.queue.put(message)
  84. except:
  85. pass
  86. def SendPeerMessage(self, message):
  87. global last_activity
  88. last_activity = time.clock()
  89. message_sent = False
  90. connection_id = message['connection']
  91. if connection_id in connections:
  92. if self.peer in connections[connection_id]:
  93. try:
  94. connections[connection_id][self.peer].handle_message(message)
  95. message_sent = True
  96. except:
  97. # Clean up any disconnected connections
  98. try:
  99. connections[connection_id]['server'].close()
  100. except:
  101. pass
  102. try:
  103. connections[connection_id]['client'].close()
  104. except:
  105. pass
  106. del connections[connection_id]
  107. return message_sent
  108. def tick(self):
  109. global connections
  110. global flush_pipes
  111. processed_messages = False
  112. now = time.clock()
  113. try:
  114. if self.next_message is None:
  115. self.next_message = self.queue.get_nowait()
  116. # Accumulate bandwidth if an available packet/message was waiting since our last tick
  117. if self.next_message is not None and self.kbps > .0 and self.next_message['time'] <= now:
  118. elapsed = now - self.last_tick
  119. accumulated_bytes = elapsed * self.kbps * 1000.0 / 8.0
  120. self.available_bytes += accumulated_bytes
  121. # process messages as long as the next message is sendable (latency or available bytes)
  122. while (self.next_message is not None) and\
  123. (flush_pipes or ((self.next_message['time'] <= now) and
  124. (self.kbps <= .0 or self.next_message['size'] <= self.available_bytes))):
  125. self.queue.task_done()
  126. processed_messages = True
  127. if self.kbps > .0:
  128. self.available_bytes -= self.next_message['size']
  129. self.SendPeerMessage(self.next_message)
  130. self.next_message = None
  131. self.next_message = self.queue.get_nowait()
  132. except:
  133. pass
  134. # Only accumulate bytes while we have messages that are ready to send
  135. if self.next_message is None or self.next_message['time'] > now:
  136. self.available_bytes = .0
  137. self.last_tick = now
  138. return processed_messages
########################################################################################################################
#   Threaded DNS resolver
########################################################################################################################
  142. class AsyncDNS(threading.Thread):
  143. def __init__(self, client_id, hostname, port, result_pipe):
  144. threading.Thread.__init__(self)
  145. self.hostname = hostname
  146. self.port = port
  147. self.client_id = client_id
  148. self.result_pipe = result_pipe
  149. def run(self):
  150. global lock, background_activity_count
  151. try:
  152. logging.debug('[{0:d}] AsyncDNS - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
  153. addresses = socket.getaddrinfo(self.hostname, self.port)
  154. logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id, self.hostname, self.port))
  155. except:
  156. addresses = ()
  157. logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, self.hostname, self.port))
  158. message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses}
  159. self.result_pipe.SendMessage(message, False)
  160. lock.acquire()
  161. if background_activity_count > 0:
  162. background_activity_count -= 1
  163. lock.release()
  164. # open and close a local socket which will interrupt the long polling loop to process the message
  165. s = socket.socket()
  166. s.connect((server.ipaddr, server.port))
  167. s.close()
  168. ########################################################################################################################
  169. # TCP Client
  170. ########################################################################################################################
  171. class TCPConnection(asyncore.dispatcher):
  172. STATE_ERROR = -1
  173. STATE_IDLE = 0
  174. STATE_RESOLVING = 1
  175. STATE_CONNECTING = 2
  176. STATE_CONNECTED = 3
  177. def __init__(self, client_id):
  178. global options
  179. asyncore.dispatcher.__init__(self)
  180. self.client_id = client_id
  181. self.state = self.STATE_IDLE
  182. self.buffer = ''
  183. self.addr = None
  184. self.dns_thread = None
  185. self.hostname = None
  186. self.port = None
  187. self.needs_config = True
  188. self.needs_close = False
  189. self.is_localhost = False
  190. self.did_resolve = False
  191. def SendMessage(self, type, message):
  192. message['message'] = type
  193. message['connection'] = self.client_id
  194. in_pipe.SendMessage(message)
  195. def handle_message(self, message):
  196. if message['message'] == 'data' and 'data' in message and len(message['data']):
  197. self.buffer += message['data']
  198. if self.state == self.STATE_CONNECTED:
  199. self.handle_write()
  200. elif message['message'] == 'resolve':
  201. self.HandleResolve(message)
  202. elif message['message'] == 'connect':
  203. self.HandleConnect(message)
  204. elif message['message'] == 'closed':
  205. if len(self.buffer) == 0:
  206. self.handle_close()
  207. else:
  208. self.needs_close = True
  209. def handle_error(self):
  210. logging.warning('[{0:d}] Error'.format(self.client_id))
  211. if self.state == self.STATE_CONNECTING:
  212. self.SendMessage('connected', {'success': False, 'address': self.addr})
  213. def handle_close(self):
  214. logging.info('[{0:d}] Server Connection Closed'.format(self.client_id))
  215. self.state = self.STATE_ERROR
  216. self.close()
  217. try:
  218. if self.client_id in connections:
  219. if 'server' in connections[self.client_id]:
  220. del connections[self.client_id]['server']
  221. if 'client' in connections[self.client_id]:
  222. self.SendMessage('closed', {})
  223. else:
  224. del connections[self.client_id]
  225. except:
  226. pass
  227. def handle_connect(self):
  228. if self.state == self.STATE_CONNECTING:
  229. self.state = self.STATE_CONNECTED
  230. self.SendMessage('connected', {'success': True, 'address': self.addr})
  231. logging.info('[{0:d}] Connected'.format(self.client_id))
  232. self.handle_write()
  233. def writable(self):
  234. if self.state == self.STATE_CONNECTING:
  235. return True
  236. return len(self.buffer) > 0
  237. def handle_write(self):
  238. if self.needs_config:
  239. self.needs_config = False
  240. self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  241. if len(self.buffer) > 0:
  242. sent = self.send(self.buffer)
  243. logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
  244. self.buffer = self.buffer[sent:]
  245. if self.needs_close and len(self.buffer) == 0:
  246. self.needs_close = False
  247. self.handle_close()
  248. def handle_read(self):
  249. try:
  250. while True:
  251. data = self.recv(1460)
  252. if data:
  253. if self.state == self.STATE_CONNECTED:
  254. logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data)))
  255. self.SendMessage('data', {'data': data})
  256. else:
  257. return
  258. except:
  259. pass
  260. def HandleResolve(self, message):
  261. global in_pipe, map_localhost, lock, background_activity_count
  262. self.did_resolve = True
  263. if 'hostname' in message:
  264. self.hostname = message['hostname']
  265. self.port = 0
  266. if 'port' in message:
  267. self.port = message['port']
  268. logging.info('[{0:d}] Resolving {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
  269. if self.hostname == 'localhost':
  270. self.hostname = '127.0.0.1'
  271. if self.hostname == '127.0.0.1':
  272. logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
  273. self.is_localhost = True
  274. if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
  275. logging.info('[{0:d}] Resolving {1}:{2:d} to mapped address {3}'.format(self.client_id, self.hostname, self.port, dest_addresses))
  276. self.SendMessage('resolved', {'addresses': dest_addresses})
  277. else:
  278. lock.acquire()
  279. background_activity_count += 1
  280. lock.release()
  281. self.state = self.STATE_RESOLVING
  282. self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pipe)
  283. self.dns_thread.start()
  284. def HandleConnect(self, message):
  285. global map_localhost
  286. if 'addresses' in message and len(message['addresses']):
  287. self.state = self.STATE_CONNECTING
  288. if not self.did_resolve and message['addresses'][0] == '127.0.0.1':
  289. logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
  290. self.is_localhost = True
  291. if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
  292. self.addr = dest_addresses[0]
  293. else:
  294. self.addr = message['addresses'][0]
  295. self.create_socket(self.addr[0], socket.SOCK_STREAM)
  296. addr = self.addr[4][0]
  297. if not self.is_localhost or map_localhost:
  298. port = GetDestPort(message['port'])
  299. else:
  300. port = message['port']
  301. logging.info('[{0:d}] Connecting to {1}:{2:d}'.format(self.client_id, addr, port))
  302. self.connect((addr, port))
  303. ########################################################################################################################
  304. # Socks5 Server
  305. ########################################################################################################################
  306. class Socks5Server(asyncore.dispatcher):
  307. def __init__(self, host, port):
  308. asyncore.dispatcher.__init__(self)
  309. self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
  310. try:
  311. #self.set_reuse_addr()
  312. self.bind((host, port))
  313. self.listen(socket.SOMAXCONN)
  314. self.ipaddr, self.port = self.getsockname()
  315. self.current_client_id = 0
  316. except:
  317. PrintMessage("Unable to listen on {0}:{1}. Is the port already in use?".format(host, port))
  318. exit(1)
  319. def handle_accept(self):
  320. global connections
  321. pair = self.accept()
  322. if pair is not None:
  323. sock, addr = pair
  324. self.current_client_id += 1
  325. logging.info('[{0:d}] Incoming connection from {1}'.format(self.current_client_id, repr(addr)))
  326. connections[self.current_client_id] = {
  327. 'client' : Socks5Connection(sock, self.current_client_id),
  328. 'server' : None
  329. }
  330. # Socks5 reference: https://en.wikipedia.org/wiki/SOCKS#SOCKS5
  331. class Socks5Connection(asyncore.dispatcher):
  332. STATE_ERROR = -1
  333. STATE_WAITING_FOR_HANDSHAKE = 0
  334. STATE_WAITING_FOR_CONNECT_REQUEST = 1
  335. STATE_RESOLVING = 2
  336. STATE_CONNECTING = 3
  337. STATE_CONNECTED = 4
  338. def __init__(self, connected_socket, client_id):
  339. global options
  340. asyncore.dispatcher.__init__(self, connected_socket)
  341. self.client_id = client_id
  342. self.state = self.STATE_WAITING_FOR_HANDSHAKE
  343. self.ip = None
  344. self.addresses = None
  345. self.hostname = None
  346. self.port = None
  347. self.requested_address = None
  348. self.buffer = ''
  349. self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  350. self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1460)
  351. self.needs_close = False
  352. def SendMessage(self, type, message):
  353. message['message'] = type
  354. message['connection'] = self.client_id
  355. out_pipe.SendMessage(message)
  356. def handle_message(self, message):
  357. if message['message'] == 'data' and 'data' in message and len(message['data']) > 0:
  358. self.buffer += message['data']
  359. if self.state == self.STATE_CONNECTED:
  360. self.handle_write()
  361. elif message['message'] == 'resolved':
  362. self.HandleResolved(message)
  363. elif message['message'] == 'connected':
  364. self.HandleConnected(message)
  365. self.handle_write()
  366. elif message['message'] == 'closed':
  367. if len(self.buffer) == 0:
  368. logging.info('[{0:d}] Server connection close being processed, closing Browser connection'.format(self.client_id))
  369. self.handle_close()
  370. else:
  371. logging.info('[{0:d}] Server connection close being processed, queuing browser connection close'.format(self.client_id))
  372. self.needs_close = True
  373. def writable(self):
  374. return len(self.buffer) > 0
  375. def handle_write(self):
  376. if len(self.buffer) > 0:
  377. sent = self.send(self.buffer)
  378. logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent))
  379. self.buffer = self.buffer[sent:]
  380. if self.needs_close and len(self.buffer) == 0:
  381. logging.info('[{0:d}] queued browser connection close being processed, closing Browser connection'.format(self.client_id))
  382. self.needs_close = False
  383. self.handle_close()
  384. def handle_read(self):
  385. global connections
  386. global dns_cache
  387. try:
  388. while True:
  389. # Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames)
  390. data = self.recv(1460)
  391. if data:
  392. data_len = len(data)
  393. if self.state == self.STATE_CONNECTED:
  394. logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id, data_len))
  395. self.SendMessage('data', {'data': data})
  396. elif self.state == self.STATE_WAITING_FOR_HANDSHAKE:
  397. self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
  398. if data_len >= 2 and ord(data[0]) == 0x05:
  399. supports_no_auth = False
  400. auth_count = ord(data[1])
  401. if data_len == auth_count + 2:
  402. for i in range(auth_count):
  403. offset = i + 2
  404. if ord(data[offset]) == 0:
  405. supports_no_auth = True
  406. if supports_no_auth:
  407. # Respond with a message that "No Authentication" was agreed to
  408. logging.info('[{0:d}] New Socks5 client'.format(self.client_id))
  409. response = chr(0x05) + chr(0x00)
  410. self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST
  411. self.buffer += response
  412. self.handle_write()
  413. elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST:
  414. self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
  415. if data_len >= 10 and ord(data[0]) == 0x05 and ord(data[2]) == 0x00:
  416. if ord(data[1]) == 0x01: #TCP connection (only supported method for now)
  417. connections[self.client_id]['server'] = TCPConnection(self.client_id)
  418. self.requested_address = data[3:]
  419. port_offset = 0
  420. if ord(data[3]) == 0x01:
  421. port_offset = 8
  422. self.ip = '{0:d}.{1:d}.{2:d}.{3:d}'.format(ord(data[4]), ord(data[5]), ord(data[6]), ord(data[7]))
  423. elif ord(data[3]) == 0x03:
  424. name_len = ord(data[4])
  425. if data_len >= 6 + name_len:
  426. port_offset = 5 + name_len
  427. self.hostname = data[5:5 + name_len]
  428. elif ord(data[3]) == 0x04 and data_len >= 22:
  429. port_offset = 20
  430. self.ip = ''
  431. for i in range(16):
  432. self.ip += '{0:02x}'.format(ord(data[4 + i]))
  433. if i % 2 and i < 15:
  434. self.ip += ':'
  435. if port_offset and connections[self.client_id]['server'] is not None:
  436. self.port = 256 * ord(data[port_offset]) + ord(data[port_offset + 1])
  437. if self.port:
  438. if self.ip is None and self.hostname is not None:
  439. if self.hostname in dns_cache:
  440. self.state = self.STATE_CONNECTING
  441. self.addresses = dns_cache[self.hostname]
  442. self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
  443. else:
  444. self.state = self.STATE_RESOLVING
  445. self.SendMessage('resolve', {'hostname': self.hostname, 'port': self.port})
  446. elif self.ip is not None:
  447. self.state = self.STATE_CONNECTING
  448. logging.debug('[{0:d}] Socks Connect - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.ip, self.port))
  449. self.addresses = socket.getaddrinfo(self.ip, self.port)
  450. self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
  451. else:
  452. return
  453. except:
  454. pass
  455. def handle_close(self):
  456. logging.info('[{0:d}] Browser Connection Closed by browser'.format(self.client_id))
  457. self.state = self.STATE_ERROR
  458. self.close()
  459. try:
  460. if self.client_id in connections:
  461. if 'client' in connections[self.client_id]:
  462. del connections[self.client_id]['client']
  463. if 'server' in connections[self.client_id]:
  464. self.SendMessage('closed', {})
  465. else:
  466. del connections[self.client_id]
  467. except:
  468. pass
  469. def HandleResolved(self, message):
  470. global dns_cache
  471. if self.state == self.STATE_RESOLVING:
  472. if 'addresses' in message and len(message['addresses']):
  473. self.state = self.STATE_CONNECTING
  474. self.addresses = message['addresses']
  475. dns_cache[self.hostname] = self.addresses
  476. logging.debug('[{0:d}] Resolved {1}, Connecting'.format(self.client_id, self.hostname))
  477. self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
  478. else:
  479. # Send host unreachable error
  480. self.state = self.STATE_ERROR
  481. self.buffer += chr(0x05) + chr(0x04) + self.requested_address
  482. self.handle_write()
  483. def HandleConnected(self, message):
  484. if 'success' in message and self.state == self.STATE_CONNECTING:
  485. response = chr(0x05)
  486. if message['success']:
  487. response += chr(0x00)
  488. logging.debug('[{0:d}] Connected to {1}'.format(self.client_id, self.hostname))
  489. self.state = self.STATE_CONNECTED
  490. else:
  491. response += chr(0x04)
  492. self.state = self.STATE_ERROR
  493. response += chr(0x00)
  494. response += self.requested_address
  495. self.buffer += response
  496. self.handle_write()
########################################################################################################################
#   stdin command processor
########################################################################################################################
  500. class CommandProcessor():
  501. def __init__(self):
  502. thread = threading.Thread(target = self.run, args=())
  503. thread.daemon = True
  504. thread.start()
  505. def run(self):
  506. global must_exit
  507. while not must_exit:
  508. for line in iter(sys.stdin.readline, ''):
  509. self.ProcessCommand(line.strip())
  510. def ProcessCommand(self, input):
  511. global in_pipe
  512. global out_pipe
  513. global needs_flush
  514. global REMOVE_TCP_OVERHEAD
  515. global port_mappings
  516. global server
  517. if len(input):
  518. ok = False
  519. try:
  520. command = input.split()
  521. if len(command) and len(command[0]):
  522. if command[0].lower() == 'flush':
  523. ok = True
  524. elif command[0].lower() == 'set' and len(command) >= 3:
  525. if command[1].lower() == 'rtt' and len(command[2]):
  526. rtt = float(command[2])
  527. latency = rtt / 2000.0
  528. in_pipe.latency = latency
  529. out_pipe.latency = latency
  530. ok = True
  531. elif command[1].lower() == 'inkbps' and len(command[2]):
  532. in_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
  533. ok = True
  534. elif command[1].lower() == 'outkbps' and len(command[2]):
  535. out_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
  536. ok = True
  537. elif command[1].lower() == 'mapports' and len(command[2]):
  538. SetPortMappings(command[2])
  539. ok = True
  540. elif command[0].lower() == 'reset' and len(command) >= 2:
  541. if command[1].lower() == 'rtt' or command[1].lower() == 'all':
  542. in_pipe.latency = 0
  543. out_pipe.latency = 0
  544. ok = True
  545. if command[1].lower() == 'inkbps' or command[1].lower() == 'all':
  546. in_pipe.kbps = 0
  547. ok = True
  548. if command[1].lower() == 'outkbps' or command[1].lower() == 'all':
  549. out_pipe.kbps = 0
  550. ok = True
  551. if command[1].lower() == 'mapports' or command[1].lower() == 'all':
  552. port_mappings = {}
  553. ok = True
  554. if ok:
  555. needs_flush = True
  556. except:
  557. pass
  558. if not ok:
  559. PrintMessage('ERROR')
  560. # open and close a local socket which will interrupt the long polling loop to process the flush
  561. if needs_flush:
  562. s = socket.socket()
  563. s.connect((server.ipaddr, server.port))
  564. s.close()
  565. ########################################################################################################################
  566. # Main Entry Point
  567. ########################################################################################################################
  568. def main():
  569. global server
  570. global options
  571. global in_pipe
  572. global out_pipe
  573. global dest_addresses
  574. global port_mappings
  575. global map_localhost
  576. import argparse
  577. global REMOVE_TCP_OVERHEAD
  578. parser = argparse.ArgumentParser(description='Traffic-shaping socks5 proxy.',
  579. prog='tsproxy')
  580. parser.add_argument('-v', '--verbose', action='count', help="Increase verbosity (specify multiple times for more). -vvvv for full debug output.")
  581. parser.add_argument('-b', '--bind', default='localhost', help="Server interface address (defaults to localhost).")
  582. parser.add_argument('-p', '--port', type=int, default=1080, help="Server port (defaults to 1080, use 0 for randomly assigned).")
  583. parser.add_argument('-r', '--rtt', type=float, default=.0, help="Round Trip Time Latency (in ms).")
  584. parser.add_argument('-i', '--inkbps', type=float, default=.0, help="Download Bandwidth (in 1000 bits/s - Kbps).")
  585. parser.add_argument('-o', '--outkbps', type=float, default=.0, help="Upload Bandwidth (in 1000 bits/s - Kbps).")
  586. parser.add_argument('-w', '--window', type=int, default=10, help="Emulated TCP initial congestion window (defaults to 10).")
  587. parser.add_argument('-d', '--desthost', help="Redirect all outbound connections to the specified host.")
  588. parser.add_argument('-m', '--mapports', help="Remap outbound ports. Comma-separated list of original:new with * as a wildcard. --mapports '443:8443,*:8080'")
  589. parser.add_argument('-l', '--localhost', action='store_true', default=False,
  590. help="Include connections already destined for localhost/127.0.0.1 in the host and port remapping.")
  591. options = parser.parse_args()
  592. # Set up logging
  593. log_level = logging.CRITICAL
  594. if options.verbose == 1:
  595. log_level = logging.ERROR
  596. elif options.verbose == 2:
  597. log_level = logging.WARNING
  598. elif options.verbose == 3:
  599. log_level = logging.INFO
  600. elif options.verbose >= 4:
  601. log_level = logging.DEBUG
  602. logging.basicConfig(level=log_level, format="%(asctime)s.%(msecs)03d - %(message)s", datefmt="%H:%M:%S")
  603. # Parse any port mappings
  604. if options.mapports:
  605. SetPortMappings(options.mapports)
  606. map_localhost = options.localhost
  607. # Resolve the address for a rewrite destination host if one was specified
  608. if options.desthost:
  609. logging.debug('Startup - calling getaddrinfo for {0}:{1:d}'.format(options.desthost, GetDestPort(80)))
  610. dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80))
  611. # Set up the pipes. 1/2 of the latency gets applied in each direction (and /1000 to convert to seconds)
  612. in_pipe = TSPipe(TSPipe.PIPE_IN, options.rtt / 2000.0, options.inkbps * REMOVE_TCP_OVERHEAD)
  613. out_pipe = TSPipe(TSPipe.PIPE_OUT, options.rtt / 2000.0, options.outkbps * REMOVE_TCP_OVERHEAD)
  614. signal.signal(signal.SIGINT, signal_handler)
  615. server = Socks5Server(options.bind, options.port)
  616. command_processor = CommandProcessor()
  617. PrintMessage('Started Socks5 proxy server on {0}:{1:d}\nHit Ctrl-C to exit.'.format(server.ipaddr, server.port))
  618. run_loop()
  619. def signal_handler(signal, frame):
  620. global server
  621. global must_exit
  622. logging.error('Exiting...')
  623. must_exit = True
  624. del server
  625. # Wrapper around the asyncore loop that lets us poll the in/out pipes every 1ms
  626. def run_loop():
  627. global must_exit
  628. global in_pipe
  629. global out_pipe
  630. global needs_flush
  631. global flush_pipes
  632. global last_activity
  633. winmm = None
  634. # increase the windows timer resolution to 1ms
  635. if platform.system() == "Windows":
  636. try:
  637. import ctypes
  638. winmm = ctypes.WinDLL('winmm')
  639. winmm.timeBeginPeriod(1)
  640. except:
  641. pass
  642. last_activity = time.clock()
  643. last_check = time.clock()
  644. # disable gc to avoid pauses during traffic shaping/proxying
  645. gc.disable()
  646. while not must_exit:
  647. # Tick every 1ms if traffic-shaping is enabled and we have data or are doing background dns lookups, every 1 second otherwise
  648. lock.acquire()
  649. tick_interval = 0.001
  650. if background_activity_count == 0:
  651. if in_pipe.next_message is None and in_pipe.queue.empty() and out_pipe.next_message is None and out_pipe.queue.empty():
  652. tick_interval = 1.0
  653. elif in_pipe.kbps == .0 and in_pipe.latency == 0 and out_pipe.kbps == .0 and out_pipe.latency == 0:
  654. tick_interval = 1.0
  655. lock.release()
  656. asyncore.poll(tick_interval, asyncore.socket_map)
  657. if needs_flush:
  658. flush_pipes = True
  659. needs_flush = False
  660. out_pipe.tick()
  661. in_pipe.tick()
  662. if flush_pipes:
  663. PrintMessage('OK')
  664. flush_pipes = False
  665. # Every 500 ms check to see if it is a good time to do a gc
  666. now = time.clock()
  667. if now - last_check > 0.5:
  668. last_check = now
  669. # manually gc after 5 seconds of idle
  670. if now - last_activity >= 5:
  671. last_activity = now
  672. logging.debug("Triggering manual GC")
  673. gc.collect()
  674. if winmm is not None:
  675. winmm.timeEndPeriod(1)
  676. def GetDestPort(port):
  677. global port_mappings
  678. if port_mappings is not None:
  679. src_port = str(port)
  680. if src_port in port_mappings:
  681. return port_mappings[src_port]
  682. elif 'default' in port_mappings:
  683. return port_mappings['default']
  684. return port
  685. def SetPortMappings(map_string):
  686. global port_mappings
  687. port_mappings = {}
  688. map_string = map_string.strip('\'" \t\r\n')
  689. for pair in map_string.split(','):
  690. (src, dest) = pair.split(':')
  691. if src == '*':
  692. port_mappings['default'] = int(dest)
  693. logging.debug("Default port mapped to port {0}".format(dest))
  694. else:
  695. logging.debug("Port {0} mapped to port {1}".format(src, dest))
  696. port_mappings[src] = int(dest)
  697. if '__main__' == __name__:
  698. main()