# Tor control port filter proxy, only white-listing SIGNAL NEWNYM.
# This filter proxy should allow Torbutton to request a
# new Tor circuit, without exposing dangerous control requests
# like "GETINFO address" to applications running as a local user.
# If something goes wrong, an error code is returned, and
# Torbutton will display a warning dialog that New Identity failed.
# Only one application can talk through this filter proxy
# simultaneously. A malicious application that is running as a
# local user could use this to prevent other applications from
# doing NEWNYM. But it could just as well rewrite the
# TOR_CONTROL_PORT environment varible to itself or do something else.
import socket
import binascii
class UnexpectedAnswer(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return "[UnexpectedAnswer] " + self.msg
def do_newnym_real():
# Read authentication cookie
with open("/var/run/tor/control.authcookie", "rb") as f:
rawcookie =
hexcookie = binascii.hexlify(rawcookie)
# Connect to the real control port
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
readh = sock.makefile("r")
writeh = sock.makefile("w")
# Authenticate
writeh.write("AUTHENTICATE " + hexcookie + "\n")
answer = readh.readline(128)
if answer.find("250") != 0:
raise UnexpectedAnswer("AUTHENTICATE failed")
# Send the newnym signal
writeh.write("SIGNAL NEWNYM\n")
answer = readh.readline(128)
if answer.find("250") != 0:
raise UnexpectedAnswer("SIGNAL NEWNYM failed")
# Close the connection
answer = readh.readline(128)
if answer.find("250") != 0:
raise UnexpectedAnswer("QUIT failed")
def do_newnym():
# Catch innocent exceptions, will report error instead
print "Newnym went fine"
return True
except (IOError, UnexpectedAnswer) as e:
print "Warning: Couldn't perform newnym!"
print e
return False
def handle_connection(sock):
# Create file handles for the socket
readh = sock.makefile('r')
writeh = sock.makefile('w')
# Keep accepting commands
while True:
# Read in a newline terminated line (max 128 chars)
line = readh.readline(128)
if not line: break
# Check what it is
if line.find('AUTHENTICATE') == 0:
# Don't check authentication, since only
# safe commands are allowed
writeh.write('250 OK\n')
elif line.find('SIGNAL NEWNYM') == 0:
# Perform a real SIGNAL NEWNYM (new Tor circuit)
if do_newnym():
writeh.write('250 OK\n')
writeh.write('510 Newnym signal failed\n')
elif line.find('QUIT') == 0:
# Quit session
writeh.write('250 Closing connection\n')
# Everything else we ignore/block
writeh.write('510 Command filtered\n')
# Ensure the answer was written
# Ensure all data was written
def main():
# Listen on port 9052 (we cannot use 9051 as Tor uses that one)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("", 9052))
print "Tor control port filter started, listening on 9052"
# Accept and handle connections one after one,
# sessions are short enough that the added complexity of
# simultaneous connections are unnecessary (in absence of attacks)
while True:
clisock, cliaddr = server.accept()
print "Accepted a connection"
print "Connection closed"
except IOError:
print "Connection closed (IOError)"
if __name__ == "__main__":
