| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768 |
- #!/usr/bin/python
- """
- Copyright 2016 Google Inc. All Rights Reserved.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- """
- import asyncore
- import gc
- import logging
- import platform
- import Queue
- import re
- import signal
- import socket
- import sys
- import threading
- import time
- server = None
- in_pipe = None
- out_pipe = None
- must_exit = False
- options = None
- dest_addresses = None
- connections = {}
- dns_cache = {}
- port_mappings = None
- map_localhost = False
- needs_flush = False
- flush_pipes = False
- last_activity = None
- REMOVE_TCP_OVERHEAD = 1460.0 / 1500.0
- lock = threading.Lock()
- background_activity_count = 0
- def PrintMessage(msg):
- # Print the message to stdout & flush to make sure that the message is not
- # buffered when tsproxy is run as a subprocess.
- print >> sys.stdout, msg
- sys.stdout.flush()
- ########################################################################################################################
- # Traffic-shaping pipe (just passthrough for now)
- ########################################################################################################################
- class TSPipe():
- PIPE_IN = 0
- PIPE_OUT = 1
- def __init__(self, direction, latency, kbps):
- self.direction = direction
- self.latency = latency
- self.kbps = kbps
- self.queue = Queue.Queue()
- self.last_tick = time.clock()
- self.next_message = None
- self.available_bytes = .0
- self.peer = 'server'
- if self.direction == self.PIPE_IN:
- self.peer = 'client'
- def SendMessage(self, message, main_thread = True):
- global connections, in_pipe, out_pipe
- message_sent = False
- now = time.clock()
- if message['message'] == 'closed':
- message['time'] = now
- else:
- message['time'] = time.clock() + self.latency
- message['size'] = .0
- if 'data' in message:
- message['size'] = float(len(message['data']))
- try:
- connection_id = message['connection']
- # Send messages directly, bypassing the queues is throttling is disabled and we are on the main thread
- if main_thread and connection_id in connections and self.peer in connections[connection_id]and self.latency == 0 and self.kbps == .0:
- message_sent = self.SendPeerMessage(message)
- except:
- pass
- if not message_sent:
- try:
- self.queue.put(message)
- except:
- pass
- def SendPeerMessage(self, message):
- global last_activity
- last_activity = time.clock()
- message_sent = False
- connection_id = message['connection']
- if connection_id in connections:
- if self.peer in connections[connection_id]:
- try:
- connections[connection_id][self.peer].handle_message(message)
- message_sent = True
- except:
- # Clean up any disconnected connections
- try:
- connections[connection_id]['server'].close()
- except:
- pass
- try:
- connections[connection_id]['client'].close()
- except:
- pass
- del connections[connection_id]
- return message_sent
- def tick(self):
- global connections
- global flush_pipes
- processed_messages = False
- now = time.clock()
- try:
- if self.next_message is None:
- self.next_message = self.queue.get_nowait()
- # Accumulate bandwidth if an available packet/message was waiting since our last tick
- if self.next_message is not None and self.kbps > .0 and self.next_message['time'] <= now:
- elapsed = now - self.last_tick
- accumulated_bytes = elapsed * self.kbps * 1000.0 / 8.0
- self.available_bytes += accumulated_bytes
- # process messages as long as the next message is sendable (latency or available bytes)
- while (self.next_message is not None) and\
- (flush_pipes or ((self.next_message['time'] <= now) and
- (self.kbps <= .0 or self.next_message['size'] <= self.available_bytes))):
- self.queue.task_done()
- processed_messages = True
- if self.kbps > .0:
- self.available_bytes -= self.next_message['size']
- self.SendPeerMessage(self.next_message)
- self.next_message = None
- self.next_message = self.queue.get_nowait()
- except:
- pass
- # Only accumulate bytes while we have messages that are ready to send
- if self.next_message is None or self.next_message['time'] > now:
- self.available_bytes = .0
- self.last_tick = now
- return processed_messages
- ########################################################################################################################
- # Threaded DNS resolver
- ########################################################################################################################
- class AsyncDNS(threading.Thread):
- def __init__(self, client_id, hostname, port, result_pipe):
- threading.Thread.__init__(self)
- self.hostname = hostname
- self.port = port
- self.client_id = client_id
- self.result_pipe = result_pipe
- def run(self):
- global lock, background_activity_count
- try:
- logging.debug('[{0:d}] AsyncDNS - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
- addresses = socket.getaddrinfo(self.hostname, self.port)
- logging.info('[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id, self.hostname, self.port))
- except:
- addresses = ()
- logging.info('[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, self.hostname, self.port))
- message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses}
- self.result_pipe.SendMessage(message, False)
- lock.acquire()
- if background_activity_count > 0:
- background_activity_count -= 1
- lock.release()
- # open and close a local socket which will interrupt the long polling loop to process the message
- s = socket.socket()
- s.connect((server.ipaddr, server.port))
- s.close()
- ########################################################################################################################
- # TCP Client
- ########################################################################################################################
- class TCPConnection(asyncore.dispatcher):
- STATE_ERROR = -1
- STATE_IDLE = 0
- STATE_RESOLVING = 1
- STATE_CONNECTING = 2
- STATE_CONNECTED = 3
- def __init__(self, client_id):
- global options
- asyncore.dispatcher.__init__(self)
- self.client_id = client_id
- self.state = self.STATE_IDLE
- self.buffer = ''
- self.addr = None
- self.dns_thread = None
- self.hostname = None
- self.port = None
- self.needs_config = True
- self.needs_close = False
- self.is_localhost = False
- self.did_resolve = False
- def SendMessage(self, type, message):
- message['message'] = type
- message['connection'] = self.client_id
- in_pipe.SendMessage(message)
- def handle_message(self, message):
- if message['message'] == 'data' and 'data' in message and len(message['data']):
- self.buffer += message['data']
- if self.state == self.STATE_CONNECTED:
- self.handle_write()
- elif message['message'] == 'resolve':
- self.HandleResolve(message)
- elif message['message'] == 'connect':
- self.HandleConnect(message)
- elif message['message'] == 'closed':
- if len(self.buffer) == 0:
- self.handle_close()
- else:
- self.needs_close = True
- def handle_error(self):
- logging.warning('[{0:d}] Error'.format(self.client_id))
- if self.state == self.STATE_CONNECTING:
- self.SendMessage('connected', {'success': False, 'address': self.addr})
- def handle_close(self):
- logging.info('[{0:d}] Server Connection Closed'.format(self.client_id))
- self.state = self.STATE_ERROR
- self.close()
- try:
- if self.client_id in connections:
- if 'server' in connections[self.client_id]:
- del connections[self.client_id]['server']
- if 'client' in connections[self.client_id]:
- self.SendMessage('closed', {})
- else:
- del connections[self.client_id]
- except:
- pass
- def handle_connect(self):
- if self.state == self.STATE_CONNECTING:
- self.state = self.STATE_CONNECTED
- self.SendMessage('connected', {'success': True, 'address': self.addr})
- logging.info('[{0:d}] Connected'.format(self.client_id))
- self.handle_write()
- def writable(self):
- if self.state == self.STATE_CONNECTING:
- return True
- return len(self.buffer) > 0
- def handle_write(self):
- if self.needs_config:
- self.needs_config = False
- self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- if len(self.buffer) > 0:
- sent = self.send(self.buffer)
- logging.debug('[{0:d}] TCP => {1:d} byte(s)'.format(self.client_id, sent))
- self.buffer = self.buffer[sent:]
- if self.needs_close and len(self.buffer) == 0:
- self.needs_close = False
- self.handle_close()
- def handle_read(self):
- try:
- while True:
- data = self.recv(1460)
- if data:
- if self.state == self.STATE_CONNECTED:
- logging.debug('[{0:d}] TCP <= {1:d} byte(s)'.format(self.client_id, len(data)))
- self.SendMessage('data', {'data': data})
- else:
- return
- except:
- pass
- def HandleResolve(self, message):
- global in_pipe, map_localhost, lock, background_activity_count
- self.did_resolve = True
- if 'hostname' in message:
- self.hostname = message['hostname']
- self.port = 0
- if 'port' in message:
- self.port = message['port']
- logging.info('[{0:d}] Resolving {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
- if self.hostname == 'localhost':
- self.hostname = '127.0.0.1'
- if self.hostname == '127.0.0.1':
- logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
- self.is_localhost = True
- if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
- logging.info('[{0:d}] Resolving {1}:{2:d} to mapped address {3}'.format(self.client_id, self.hostname, self.port, dest_addresses))
- self.SendMessage('resolved', {'addresses': dest_addresses})
- else:
- lock.acquire()
- background_activity_count += 1
- lock.release()
- self.state = self.STATE_RESOLVING
- self.dns_thread = AsyncDNS(self.client_id, self.hostname, self.port, in_pipe)
- self.dns_thread.start()
- def HandleConnect(self, message):
- global map_localhost
- if 'addresses' in message and len(message['addresses']):
- self.state = self.STATE_CONNECTING
- if not self.did_resolve and message['addresses'][0] == '127.0.0.1':
- logging.info('[{0:d}] Connection to localhost detected'.format(self.client_id))
- self.is_localhost = True
- if (dest_addresses is not None) and (not self.is_localhost or map_localhost):
- self.addr = dest_addresses[0]
- else:
- self.addr = message['addresses'][0]
- self.create_socket(self.addr[0], socket.SOCK_STREAM)
- addr = self.addr[4][0]
- if not self.is_localhost or map_localhost:
- port = GetDestPort(message['port'])
- else:
- port = message['port']
- logging.info('[{0:d}] Connecting to {1}:{2:d}'.format(self.client_id, addr, port))
- self.connect((addr, port))
- ########################################################################################################################
- # Socks5 Server
- ########################################################################################################################
- class Socks5Server(asyncore.dispatcher):
- def __init__(self, host, port):
- asyncore.dispatcher.__init__(self)
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- #self.set_reuse_addr()
- self.bind((host, port))
- self.listen(socket.SOMAXCONN)
- self.ipaddr, self.port = self.getsockname()
- self.current_client_id = 0
- except:
- PrintMessage("Unable to listen on {0}:{1}. Is the port already in use?".format(host, port))
- exit(1)
- def handle_accept(self):
- global connections
- pair = self.accept()
- if pair is not None:
- sock, addr = pair
- self.current_client_id += 1
- logging.info('[{0:d}] Incoming connection from {1}'.format(self.current_client_id, repr(addr)))
- connections[self.current_client_id] = {
- 'client' : Socks5Connection(sock, self.current_client_id),
- 'server' : None
- }
- # Socks5 reference: https://en.wikipedia.org/wiki/SOCKS#SOCKS5
- class Socks5Connection(asyncore.dispatcher):
- STATE_ERROR = -1
- STATE_WAITING_FOR_HANDSHAKE = 0
- STATE_WAITING_FOR_CONNECT_REQUEST = 1
- STATE_RESOLVING = 2
- STATE_CONNECTING = 3
- STATE_CONNECTED = 4
- def __init__(self, connected_socket, client_id):
- global options
- asyncore.dispatcher.__init__(self, connected_socket)
- self.client_id = client_id
- self.state = self.STATE_WAITING_FOR_HANDSHAKE
- self.ip = None
- self.addresses = None
- self.hostname = None
- self.port = None
- self.requested_address = None
- self.buffer = ''
- self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1460)
- self.needs_close = False
- def SendMessage(self, type, message):
- message['message'] = type
- message['connection'] = self.client_id
- out_pipe.SendMessage(message)
- def handle_message(self, message):
- if message['message'] == 'data' and 'data' in message and len(message['data']) > 0:
- self.buffer += message['data']
- if self.state == self.STATE_CONNECTED:
- self.handle_write()
- elif message['message'] == 'resolved':
- self.HandleResolved(message)
- elif message['message'] == 'connected':
- self.HandleConnected(message)
- self.handle_write()
- elif message['message'] == 'closed':
- if len(self.buffer) == 0:
- logging.info('[{0:d}] Server connection close being processed, closing Browser connection'.format(self.client_id))
- self.handle_close()
- else:
- logging.info('[{0:d}] Server connection close being processed, queuing browser connection close'.format(self.client_id))
- self.needs_close = True
- def writable(self):
- return len(self.buffer) > 0
- def handle_write(self):
- if len(self.buffer) > 0:
- sent = self.send(self.buffer)
- logging.debug('[{0:d}] SOCKS <= {1:d} byte(s)'.format(self.client_id, sent))
- self.buffer = self.buffer[sent:]
- if self.needs_close and len(self.buffer) == 0:
- logging.info('[{0:d}] queued browser connection close being processed, closing Browser connection'.format(self.client_id))
- self.needs_close = False
- self.handle_close()
- def handle_read(self):
- global connections
- global dns_cache
- try:
- while True:
- # Consume in up-to packet-sized chunks (TCP packet payload as 1460 bytes from 1500 byte ethernet frames)
- data = self.recv(1460)
- if data:
- data_len = len(data)
- if self.state == self.STATE_CONNECTED:
- logging.debug('[{0:d}] SOCKS => {1:d} byte(s)'.format(self.client_id, data_len))
- self.SendMessage('data', {'data': data})
- elif self.state == self.STATE_WAITING_FOR_HANDSHAKE:
- self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
- if data_len >= 2 and ord(data[0]) == 0x05:
- supports_no_auth = False
- auth_count = ord(data[1])
- if data_len == auth_count + 2:
- for i in range(auth_count):
- offset = i + 2
- if ord(data[offset]) == 0:
- supports_no_auth = True
- if supports_no_auth:
- # Respond with a message that "No Authentication" was agreed to
- logging.info('[{0:d}] New Socks5 client'.format(self.client_id))
- response = chr(0x05) + chr(0x00)
- self.state = self.STATE_WAITING_FOR_CONNECT_REQUEST
- self.buffer += response
- self.handle_write()
- elif self.state == self.STATE_WAITING_FOR_CONNECT_REQUEST:
- self.state = self.STATE_ERROR #default to an error state, set correctly if things work out
- if data_len >= 10 and ord(data[0]) == 0x05 and ord(data[2]) == 0x00:
- if ord(data[1]) == 0x01: #TCP connection (only supported method for now)
- connections[self.client_id]['server'] = TCPConnection(self.client_id)
- self.requested_address = data[3:]
- port_offset = 0
- if ord(data[3]) == 0x01:
- port_offset = 8
- self.ip = '{0:d}.{1:d}.{2:d}.{3:d}'.format(ord(data[4]), ord(data[5]), ord(data[6]), ord(data[7]))
- elif ord(data[3]) == 0x03:
- name_len = ord(data[4])
- if data_len >= 6 + name_len:
- port_offset = 5 + name_len
- self.hostname = data[5:5 + name_len]
- elif ord(data[3]) == 0x04 and data_len >= 22:
- port_offset = 20
- self.ip = ''
- for i in range(16):
- self.ip += '{0:02x}'.format(ord(data[4 + i]))
- if i % 2 and i < 15:
- self.ip += ':'
- if port_offset and connections[self.client_id]['server'] is not None:
- self.port = 256 * ord(data[port_offset]) + ord(data[port_offset + 1])
- if self.port:
- if self.ip is None and self.hostname is not None:
- if self.hostname in dns_cache:
- self.state = self.STATE_CONNECTING
- self.addresses = dns_cache[self.hostname]
- self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
- else:
- self.state = self.STATE_RESOLVING
- self.SendMessage('resolve', {'hostname': self.hostname, 'port': self.port})
- elif self.ip is not None:
- self.state = self.STATE_CONNECTING
- logging.debug('[{0:d}] Socks Connect - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.ip, self.port))
- self.addresses = socket.getaddrinfo(self.ip, self.port)
- self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
- else:
- return
- except:
- pass
- def handle_close(self):
- logging.info('[{0:d}] Browser Connection Closed by browser'.format(self.client_id))
- self.state = self.STATE_ERROR
- self.close()
- try:
- if self.client_id in connections:
- if 'client' in connections[self.client_id]:
- del connections[self.client_id]['client']
- if 'server' in connections[self.client_id]:
- self.SendMessage('closed', {})
- else:
- del connections[self.client_id]
- except:
- pass
- def HandleResolved(self, message):
- global dns_cache
- if self.state == self.STATE_RESOLVING:
- if 'addresses' in message and len(message['addresses']):
- self.state = self.STATE_CONNECTING
- self.addresses = message['addresses']
- dns_cache[self.hostname] = self.addresses
- logging.debug('[{0:d}] Resolved {1}, Connecting'.format(self.client_id, self.hostname))
- self.SendMessage('connect', {'addresses': self.addresses, 'port': self.port})
- else:
- # Send host unreachable error
- self.state = self.STATE_ERROR
- self.buffer += chr(0x05) + chr(0x04) + self.requested_address
- self.handle_write()
- def HandleConnected(self, message):
- if 'success' in message and self.state == self.STATE_CONNECTING:
- response = chr(0x05)
- if message['success']:
- response += chr(0x00)
- logging.debug('[{0:d}] Connected to {1}'.format(self.client_id, self.hostname))
- self.state = self.STATE_CONNECTED
- else:
- response += chr(0x04)
- self.state = self.STATE_ERROR
- response += chr(0x00)
- response += self.requested_address
- self.buffer += response
- self.handle_write()
- ########################################################################################################################
- # stdin command processor
- ########################################################################################################################
- class CommandProcessor():
- def __init__(self):
- thread = threading.Thread(target = self.run, args=())
- thread.daemon = True
- thread.start()
- def run(self):
- global must_exit
- while not must_exit:
- for line in iter(sys.stdin.readline, ''):
- self.ProcessCommand(line.strip())
- def ProcessCommand(self, input):
- global in_pipe
- global out_pipe
- global needs_flush
- global REMOVE_TCP_OVERHEAD
- global port_mappings
- global server
- if len(input):
- ok = False
- try:
- command = input.split()
- if len(command) and len(command[0]):
- if command[0].lower() == 'flush':
- ok = True
- elif command[0].lower() == 'set' and len(command) >= 3:
- if command[1].lower() == 'rtt' and len(command[2]):
- rtt = float(command[2])
- latency = rtt / 2000.0
- in_pipe.latency = latency
- out_pipe.latency = latency
- ok = True
- elif command[1].lower() == 'inkbps' and len(command[2]):
- in_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
- ok = True
- elif command[1].lower() == 'outkbps' and len(command[2]):
- out_pipe.kbps = float(command[2]) * REMOVE_TCP_OVERHEAD
- ok = True
- elif command[1].lower() == 'mapports' and len(command[2]):
- SetPortMappings(command[2])
- ok = True
- elif command[0].lower() == 'reset' and len(command) >= 2:
- if command[1].lower() == 'rtt' or command[1].lower() == 'all':
- in_pipe.latency = 0
- out_pipe.latency = 0
- ok = True
- if command[1].lower() == 'inkbps' or command[1].lower() == 'all':
- in_pipe.kbps = 0
- ok = True
- if command[1].lower() == 'outkbps' or command[1].lower() == 'all':
- out_pipe.kbps = 0
- ok = True
- if command[1].lower() == 'mapports' or command[1].lower() == 'all':
- port_mappings = {}
- ok = True
- if ok:
- needs_flush = True
- except:
- pass
- if not ok:
- PrintMessage('ERROR')
- # open and close a local socket which will interrupt the long polling loop to process the flush
- if needs_flush:
- s = socket.socket()
- s.connect((server.ipaddr, server.port))
- s.close()
- ########################################################################################################################
- # Main Entry Point
- ########################################################################################################################
- def main():
- global server
- global options
- global in_pipe
- global out_pipe
- global dest_addresses
- global port_mappings
- global map_localhost
- import argparse
- global REMOVE_TCP_OVERHEAD
- parser = argparse.ArgumentParser(description='Traffic-shaping socks5 proxy.',
- prog='tsproxy')
- parser.add_argument('-v', '--verbose', action='count', help="Increase verbosity (specify multiple times for more). -vvvv for full debug output.")
- parser.add_argument('-b', '--bind', default='localhost', help="Server interface address (defaults to localhost).")
- parser.add_argument('-p', '--port', type=int, default=1080, help="Server port (defaults to 1080, use 0 for randomly assigned).")
- parser.add_argument('-r', '--rtt', type=float, default=.0, help="Round Trip Time Latency (in ms).")
- parser.add_argument('-i', '--inkbps', type=float, default=.0, help="Download Bandwidth (in 1000 bits/s - Kbps).")
- parser.add_argument('-o', '--outkbps', type=float, default=.0, help="Upload Bandwidth (in 1000 bits/s - Kbps).")
- parser.add_argument('-w', '--window', type=int, default=10, help="Emulated TCP initial congestion window (defaults to 10).")
- parser.add_argument('-d', '--desthost', help="Redirect all outbound connections to the specified host.")
- parser.add_argument('-m', '--mapports', help="Remap outbound ports. Comma-separated list of original:new with * as a wildcard. --mapports '443:8443,*:8080'")
- parser.add_argument('-l', '--localhost', action='store_true', default=False,
- help="Include connections already destined for localhost/127.0.0.1 in the host and port remapping.")
- options = parser.parse_args()
- # Set up logging
- log_level = logging.CRITICAL
- if options.verbose == 1:
- log_level = logging.ERROR
- elif options.verbose == 2:
- log_level = logging.WARNING
- elif options.verbose == 3:
- log_level = logging.INFO
- elif options.verbose >= 4:
- log_level = logging.DEBUG
- logging.basicConfig(level=log_level, format="%(asctime)s.%(msecs)03d - %(message)s", datefmt="%H:%M:%S")
- # Parse any port mappings
- if options.mapports:
- SetPortMappings(options.mapports)
- map_localhost = options.localhost
- # Resolve the address for a rewrite destination host if one was specified
- if options.desthost:
- logging.debug('Startup - calling getaddrinfo for {0}:{1:d}'.format(options.desthost, GetDestPort(80)))
- dest_addresses = socket.getaddrinfo(options.desthost, GetDestPort(80))
- # Set up the pipes. 1/2 of the latency gets applied in each direction (and /1000 to convert to seconds)
- in_pipe = TSPipe(TSPipe.PIPE_IN, options.rtt / 2000.0, options.inkbps * REMOVE_TCP_OVERHEAD)
- out_pipe = TSPipe(TSPipe.PIPE_OUT, options.rtt / 2000.0, options.outkbps * REMOVE_TCP_OVERHEAD)
- signal.signal(signal.SIGINT, signal_handler)
- server = Socks5Server(options.bind, options.port)
- command_processor = CommandProcessor()
- PrintMessage('Started Socks5 proxy server on {0}:{1:d}\nHit Ctrl-C to exit.'.format(server.ipaddr, server.port))
- run_loop()
- def signal_handler(signal, frame):
- global server
- global must_exit
- logging.error('Exiting...')
- must_exit = True
- del server
- # Wrapper around the asyncore loop that lets us poll the in/out pipes every 1ms
- def run_loop():
- global must_exit
- global in_pipe
- global out_pipe
- global needs_flush
- global flush_pipes
- global last_activity
- winmm = None
- # increase the windows timer resolution to 1ms
- if platform.system() == "Windows":
- try:
- import ctypes
- winmm = ctypes.WinDLL('winmm')
- winmm.timeBeginPeriod(1)
- except:
- pass
- last_activity = time.clock()
- last_check = time.clock()
- # disable gc to avoid pauses during traffic shaping/proxying
- gc.disable()
- while not must_exit:
- # Tick every 1ms if traffic-shaping is enabled and we have data or are doing background dns lookups, every 1 second otherwise
- lock.acquire()
- tick_interval = 0.001
- if background_activity_count == 0:
- if in_pipe.next_message is None and in_pipe.queue.empty() and out_pipe.next_message is None and out_pipe.queue.empty():
- tick_interval = 1.0
- elif in_pipe.kbps == .0 and in_pipe.latency == 0 and out_pipe.kbps == .0 and out_pipe.latency == 0:
- tick_interval = 1.0
- lock.release()
- asyncore.poll(tick_interval, asyncore.socket_map)
- if needs_flush:
- flush_pipes = True
- needs_flush = False
- out_pipe.tick()
- in_pipe.tick()
- if flush_pipes:
- PrintMessage('OK')
- flush_pipes = False
- # Every 500 ms check to see if it is a good time to do a gc
- now = time.clock()
- if now - last_check > 0.5:
- last_check = now
- # manually gc after 5 seconds of idle
- if now - last_activity >= 5:
- last_activity = now
- logging.debug("Triggering manual GC")
- gc.collect()
- if winmm is not None:
- winmm.timeEndPeriod(1)
- def GetDestPort(port):
- global port_mappings
- if port_mappings is not None:
- src_port = str(port)
- if src_port in port_mappings:
- return port_mappings[src_port]
- elif 'default' in port_mappings:
- return port_mappings['default']
- return port
- def SetPortMappings(map_string):
- global port_mappings
- port_mappings = {}
- map_string = map_string.strip('\'" \t\r\n')
- for pair in map_string.split(','):
- (src, dest) = pair.split(':')
- if src == '*':
- port_mappings['default'] = int(dest)
- logging.debug("Default port mapped to port {0}".format(dest))
- else:
- logging.debug("Port {0} mapped to port {1}".format(src, dest))
- port_mappings[src] = int(dest)
- if '__main__' == __name__:
- main()
|