formatting fix

This commit is contained in:
Brian Smith 2012-10-01 17:34:29 -04:00
parent cd723ac964
commit 849c6d3b20
1 changed files with 275 additions and 275 deletions

View File

@ -23,25 +23,25 @@ module JamWebsockets
class Router
attr_accessor :user_context_lookup, :session_context_lookup
attr_accessor :user_context_lookup, :session_context_lookup
def initialize(options={})
@log = Logging.logger[self]
@pending_clients = Set.new # clients that have connected to server, but not logged in.
@clients = {} # clients that have logged in
@user_context_lookup = {} # lookup a set of client_contexts by user_id
@session_context_lookup = {} # lookup a set of client_contexts by session_id
@user_context_lookup = {} # lookup a set of client_contexts by user_id
@session_context_lookup = {} # lookup a set of client_contexts by session_id
@sessions_exchange = nil
@connection = nil
@channel = nil
@users_exchange = nil
@message_factory = JamRuby::MessageFactory.new
@semaphore = Mutex.new
@user_topic = nil
@user_subscription = nil
@session_topic = nil
@session_subscription = nil
@thread_pool = nil
@semaphore = Mutex.new
@user_topic = nil
@user_subscription = nil
@session_topic = nil
@session_subscription = nil
@thread_pool = nil
end
def start(options = {})
@ -49,7 +49,7 @@ module JamWebsockets
@log.info "startup"
begin
@thread_pool = Executors.new_fixed_thread_pool(8)
@thread_pool = Executors.new_fixed_thread_pool(8)
@connection = HotBunnies.connect(:host => options[:host], :port => options[:port])
@channel = @connection.create_channel
@channel.prefetch = 10
@ -62,119 +62,119 @@ module JamWebsockets
end
def add_user(context)
user_contexts = @user_context_lookup[context.user.id]
if user_contexts.nil?
user_contexts = Set.new
@user_context_lookup[context.user.id] = user_contexts
end
def add_user(context)
user_contexts = @user_context_lookup[context.user.id]
if user_contexts.nil?
user_contexts = Set.new
@user_context_lookup[context.user.id] = user_contexts
end
user_contexts.add(context)
end
user_contexts.add(context)
end
def remove_user(context)
user_contexts = @user_context_lookup[context.user.id]
if user_contexts.nil?
@log.warn "user can not be removed #{context}"
else
# delete the context from set of user contexts
user_contexts.delete(context)
def remove_user(context)
user_contexts = @user_context_lookup[context.user.id]
if user_contexts.nil?
@log.warn "user can not be removed #{context}"
else
# delete the context from set of user contexts
user_contexts.delete(context)
# if last user context, delete entire set (memory leak concern)
if user_contexts.length == 0
@user_context_lookup.delete(context.user.id)
end
end
end
# if last user context, delete entire set (memory leak concern)
if user_contexts.length == 0
@user_context_lookup.delete(context.user.id)
end
end
end
def add_session(context)
session_contexts = @session_context_lookup[context.session.id]
if session_contexts.nil?
session_contexts = Set.new
@session_context_lookup[context.session.id] = session_contexts
end
def add_session(context)
session_contexts = @session_context_lookup[context.session.id]
if session_contexts.nil?
session_contexts = Set.new
@session_context_lookup[context.session.id] = session_contexts
end
session_contexts.add(context)
end
session_contexts.add(context)
end
def remove_session(context)
session_contexts = @session_context_lookup[context.session.id]
if session_contexts.nil?
@log.warn "session can not be removed #{context}"
else
# delete the context from set of session contexts
session_contexts.delete(context)
def remove_session(context)
session_contexts = @session_context_lookup[context.session.id]
if session_contexts.nil?
@log.warn "session can not be removed #{context}"
else
# delete the context from set of session contexts
session_contexts.delete(context)
# if last session context, delete entire set (memory leak concern)
if session_contexts.length == 0
@session_context_lookup.delete(context.session.id)
end
# if last session context, delete entire set (memory leak concern)
if session_contexts.length == 0
@session_context_lookup.delete(context.session.id)
end
context.session = nil
end
end
context.session = nil
end
end
# register topic for user messages and session messages
def register_topics
@users_exchange = @channel.exchange('users', :type => :topic)
# register topic for user messages and session messages
def register_topics
@users_exchange = @channel.exchange('users', :type => :topic)
@sessions_exchange = @channel.exchange('sessions', :type => :topic)
# create user messaging topic
# create user messaging topic
@user_topic = @channel.queue("", :auto_delete => true)
@user_topic.bind(@users_exchange, :routing_key => "user.#")
@user_topic.purge
# TODO: alert friends
# subscribe for any messages to users
#@user_subscription = @user_topic.subscribe(:ack => false, :blocking => false, :executor => @threadpool) do |headers, msg|
@user_subscription = @user_topic.subscribe(:ack => false)
@user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
begin
routing_key = headers.envelope.routing_key
user_id = routing_key["user.".length..-1]
@sempahore.synchronize do
contexts = @user_context_lookup[user_id]
# subscribe for any messages to users
#@user_subscription = @user_topic.subscribe(:ack => false, :blocking => false, :executor => @threadpool) do |headers, msg|
@user_subscription = @user_topic.subscribe(:ack => false)
@user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
begin
routing_key = headers.envelope.routing_key
user_id = routing_key["user.".length..-1]
@sempahore.synchronize do
contexts = @user_context_lookup[user_id]
unless contexts.nil?
@log.debug "received user-directed message for session: #{user_id}"
unless contexts.nil?
@log.debug "received user-directed message for session: #{user_id}"
msg = Jampb::ClientMessage.parse(msg)
contexts.each do |context|
EM.schedule do
@log.debug "sending user message to #{context}"
send_to_client(context.client, msg)
end
end
end
end
contexts.each do |context|
EM.schedule do
@log.debug "sending user message to #{context}"
send_to_client(context.client, msg)
end
end
end
end
rescue => e
@log.error "unhandled error in messaging to client"
end
end
rescue => e
@log.error "unhandled error in messaging to client"
end
end
@session_topic = @channel.queue("", :auto_delete => true)
@session_topic = @channel.queue("", :auto_delete => true)
@session_topic.bind(@sessions_exchange, :routing_key => "session.#")
@session_topic.purge
# subscribe for any messages to session
#@session_subscription = @session_topic.subscribe(:ack => false, :blocking => false) do |headers, msg|
@session_subscription = @session_topic.subscribe(:ack => false)
@session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
begin
routing_key = headers.envelope.routing_key
session_id = routing_key["session.".length..-1]
@semaphore.synchronize do
contexts = @session_context_lookup[session_id]
@session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
begin
routing_key = headers.envelope.routing_key
session_id = routing_key["session.".length..-1]
@semaphore.synchronize do
contexts = @session_context_lookup[session_id]
unless contexts.nil?
@log.debug "received session-directed message for session: #{session_id}"
unless contexts.nil?
@log.debug "received session-directed message for session: #{session_id}"
msg = Jampb::ClientMessage.parse(msg)
# ok, its very odd to have your own message that you sent bounce back to you.
@ -188,88 +188,88 @@ module JamWebsockets
origin_client_id = origin_client_id.to_s unless origin_client_id.nil?
@log.debug "message received from client #{origin_client_id}"
contexts.each do |context|
contexts.each do |context|
if context.client.client_id != origin_client_id
EM.schedule do
@log.debug "sending session message to #{context}"
send_to_client(context.client, msg)
end
end
end
end
end
end
end
end
rescue => e
@log.error "unhandled error in messaging to client"
end
rescue => e
@log.error "unhandled error in messaging to client"
end
end
end
end
def send_to_client(client, msg)
def send_to_client(client, msg)
if client.encode_json
client.send(msg.to_json.to_s)
else
# this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent
client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
# this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent
client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
end
end
end
def cleanup()
# shutdown topic listeners and mq connection
begin
if !@user_subscription.nil? && @user_subscription.active?
@log.debug "cleaning up user subscription"
@user_subscription.cancel
@user_subscription.shutdown!
end
# shutdown topic listeners and mq connection
begin
if !@user_subscription.nil? && @user_subscription.active?
@log.debug "cleaning up user subscription"
@user_subscription.cancel
@user_subscription.shutdown!
end
if !@session_subscription.nil? && @session_subscription.active?
@log.debug "cleaning up session subscription"
@session_subscription.cancel
@session_subscription.shutdown!
end
if !@session_subscription.nil? && @session_subscription.active?
@log.debug "cleaning up session subscription"
@session_subscription.cancel
@session_subscription.shutdown!
end
rescue => e
@log.debug "unable to cancel subscription on cleanup: #{e}"
end
rescue => e
@log.debug "unable to cancel subscription on cleanup: #{e}"
end
@thread_pool.shutdown
@thread_pool.shutdown
if !@channel.nil?
@channel.close
end
if !@channel.nil?
@channel.close
end
if !@connection.nil?
@connection.close
end
if !@connection.nil?
@connection.close
end
# tear down each individual client
@clients.each do |client, context|
cleanup_client(client)
end
end
# tear down each individual client
@clients.each do |client, context|
cleanup_client(client)
end
end
def stop
@log.info "shutdown"
cleanup
end
def stop
@log.info "shutdown"
cleanup
end
def new_client(client)
def new_client(client)
# give a unique ID to this client. This is used to prevent session messages
# from echoing back to the sender, for instance.
client.client_id = UUIDTools::UUID.random_create.to_s
@semaphore.synchronize do
@pending_clients.add(client)
end
@semaphore.synchronize do
@pending_clients.add(client)
end
# default to using json instead of pb
client.encode_json = true
client.encode_json = true
client.onopen {
#binding.pry
@log.debug "client connected #{client}"
client.onopen {
#binding.pry
@log.debug "client connected #{client}"
# check for '?pb' or '?pb=true' in url query parameters
query_pb = client.request["query"]["pb"]
@ -280,10 +280,10 @@ module JamWebsockets
}
client.onclose {
@log.debug "Connection closed"
client.onclose {
@log.debug "Connection closed"
cleanup_client(client)
cleanup_client(client)
}
client.onerror { |error|
@ -294,7 +294,7 @@ module JamWebsockets
end
cleanup_client(client)
client.close_websocket
client.close_websocket
}
client.onmessage { |msg|
@ -303,37 +303,37 @@ module JamWebsockets
# TODO: set a max message size before we put it through PB?
# TODO: rate limit?
begin
if client.encode_json
if client.encode_json
#example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}}
parse = JSON.parse(msg)
pb_msg = Jampb::ClientMessage.json_create(parse)
parse = JSON.parse(msg)
pb_msg = Jampb::ClientMessage.json_create(parse)
self.route(pb_msg, client)
else
pb_msg = Jampb::ClientMessage.parse(msg.to_s)
self.route(pb_msg, client)
end
rescue SessionError => e
@log.info "ending client session deliberately due to malformed client behavior. reason=#{e}"
begin
else
pb_msg = Jampb::ClientMessage.parse(msg.to_s)
self.route(pb_msg, client)
end
rescue SessionError => e
@log.info "ending client session deliberately due to malformed client behavior. reason=#{e}"
begin
# wrap the message up and send it down
error_msg = @message_factory.server_rejection_error(e.to_s)
send_to_client(client, error_msg)
send_to_client(client, error_msg)
ensure
client.close_websocket
client.close_websocket
cleanup_client(client)
end
rescue => e
@log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}"
@log.error e
begin
@log.error e
begin
# wrap the message up and send it down
error_msg = @message_factory.server_generic_error(e.to_s)
send_to_client(client, error_msg)
send_to_client(client, error_msg)
ensure
client.close_websocket
client.close_websocket
cleanup_client(client)
end
end
@ -342,38 +342,38 @@ module JamWebsockets
end
# removes all resources associated with a client
# removes all resources associated with a client
def cleanup_client(client)
@semaphore.synchronize do
pending = @pending_clients.delete?(client)
@semaphore.synchronize do
pending = @pending_clients.delete?(client)
if !pending.nil?
@log.debug "cleaning up pending client #{client}"
else
context = @clients.delete(client)
if !pending.nil?
@log.debug "cleaning up pending client #{client}"
else
context = @clients.delete(client)
if !context.nil?
remove_user(context)
if !context.nil?
remove_user(context)
if !context.session.nil?
remove_session(context)
end
else
@log.debug "skipping duplicate cleanup attempt of authorized client"
end
if !context.session.nil?
remove_session(context)
end
else
@log.debug "skipping duplicate cleanup attempt of authorized client"
end
end
end
end
end
end
def route(client_msg, client)
message_type = @message_factory.get_message_type(client_msg)
raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil?
message_type = @message_factory.get_message_type(client_msg)
@log.debug("msg received #{message_type}")
raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil?
@log.debug("msg received #{message_type}")
raise SessionError, 'client_msg.target is null' if client_msg.target.nil?
@ -408,9 +408,9 @@ module JamWebsockets
handle_login(client_msg.login, client)
elsif client_msg.type == ClientMessage::Type::HEARTBEAT
elsif client_msg.type == ClientMessage::Type::HEARTBEAT
handle_heartbeat(client_msg.heartbeat, client)
handle_heartbeat(client_msg.heartbeat, client)
elsif client_msg.type == ClientMessage::Type::LOGIN_JAM_SESSION
@ -438,10 +438,10 @@ module JamWebsockets
@log.debug "user #{user.email} logged in"
# respond with LOGIN_ACK to let client know it was successful
#binding.pry
remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername)
login_ack = @message_factory.login_ack(remote_ip)
send_to_client(client, login_ack)
#binding.pry
remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername)
login_ack = @message_factory.login_ack(remote_ip)
send_to_client(client, login_ack)
# log this connection in the database
connection = Connection.new()
@ -450,22 +450,22 @@ module JamWebsockets
connection.save
# remove from pending_queue
@semaphore.synchronize do
@pending_clients.delete(client)
@semaphore.synchronize do
@pending_clients.delete(client)
# add a tracker for this user
context = ClientContext.new(user, client)
@clients[client] = context
add_user(context)
end
# add a tracker for this user
context = ClientContext.new(user, client)
@clients[client] = context
add_user(context)
end
else
raise SessionError, 'invalid login'
end
end
def handle_heartbeat(heartbeat, client)
# todo: manage staleness
end
def handle_heartbeat(heartbeat, client)
# todo: manage staleness
end
def handle_join_jam_session(join_jam_session, client)
# verify that the current user has the rights to actually join the jam session
@ -475,106 +475,106 @@ module JamWebsockets
begin
session = access_jam_session?(session_id, context.user)
@log.debug "user #{context} joining new session #{session}"
@semaphore.synchronize do
old_session = context.session
if !old_session.nil?
@log.debug "#{context} is already in session. auto-logging out to join new session."
remove_session(context)
end
context.session = session
add_session(context)
end
@log.debug "user #{context} joining new session #{session}"
@semaphore.synchronize do
old_session = context.session
if !old_session.nil?
@log.debug "#{context} is already in session. auto-logging out to join new session."
remove_session(context)
end
context.session = session
add_session(context)
end
rescue => e
# send back a failure ack and bail
@log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}"
@log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}"
login_jam_session = @message_factory.login_jam_session_ack(true, e.to_s)
send_to_client(client, login_jam_session)
send_to_client(client, login_jam_session)
return
end
# respond with LOGIN_JAM_SESSION_ACK to let client know it was successful
login_jam_session = @message_factory.login_jam_session_ack(false, nil)
send_to_client(client, login_jam_session)
send_to_client(client, login_jam_session)
# send 'new client' message to other members in the session
handle_session_directed(session_id,
@message_factory.user_joined_jam_session(context.user.id, context.user.name),
client)
@message_factory.user_joined_jam_session(context.user.id, context.user.name),
client)
end
def handle_leave_jam_session(leave_jam_session, client)
def handle_leave_jam_session(leave_jam_session, client)
context = @clients[client]
context = @clients[client]
raise SessionError, "unsupported"
end
raise SessionError, "unsupported"
end
def valid_login(username, password, token)
def valid_login(username, password, token)
if !token.nil? && token != ''
@log.debug "logging in via token"
# attempt login with token
user = User.find_by_remember_token(token)
# attempt login with token
user = User.find_by_remember_token(token)
if user.nil?
@log.debug "no user found with token"
return false
else
@log.debug "#{user} login via token"
return user
end
if user.nil?
@log.debug "no user found with token"
return false
else
@log.debug "#{user} login via token"
return user
end
elsif !username.nil? and !password.nil?
elsif !username.nil? and !password.nil?
@log.debug "logging in via user/pass '#{username}' '#{password}'"
# attempt login with username and password
user = User.find_by_email(username)
# attempt login with username and password
user = User.find_by_email(username)
if !user.nil? && user.authenticate(password)
@log.debug "#{user} login via password"
return user
else
@log.debug "#{username} login failure"
return nil
end
else
raise SessionError, 'no login data was found in Login message'
end
if !user.nil? && user.authenticate(password)
@log.debug "#{user} login via password"
return user
else
@log.debug "#{username} login failure"
return nil
end
else
raise SessionError, 'no login data was found in Login message'
end
:properties => { :headers => { "client_id" => client.client_id } } )
end
end
def access_jam_session?(jam_session_id, user)
jam_session = JamSession.find_by_id(jam_session_id)
def access_jam_session?(jam_session_id, user)
jam_session = JamSession.find_by_id(jam_session_id)
if jam_session.nil?
raise SessionError, 'specified session not found'
end
if jam_session.nil?
raise SessionError, 'specified session not found'
end
if !jam_session.access? user
raise SessionError, 'not allowed to join the specified session'
end
if !jam_session.access? user
raise SessionError, 'not allowed to join the specified session'
end
return jam_session
end
return jam_session
end
def handle_session_directed(session_id, client_msg, client)
def handle_session_directed(session_id, client_msg, client)
context = @clients[client]
context = @clients[client]
# by not catching any exception here, this will kill the connection
# if for some reason the client is trying to send to a session that it doesn't
# belong to
session = access_jam_session?(session_id, context.user)
# by not catching any exception here, this will kill the connection
# if for some reason the client is trying to send to a session that it doesn't
# belong to
session = access_jam_session?(session_id, context.user)
@log.debug "publishing to session #{session} from client_id #{client.client_id}"
# put it on the topic exchange for sessions
@sessions_exchange.publish(client_msg.to_s, :routing_key => "session.#{session_id}", :properties => { :headers => { "client_id" => client.client_id } } )
end
@log.debug "publishing to session #{session} from client_id #{client.client_id}"
# put it on the topic exchange for sessions
@sessions_exchange.publish(client_msg.to_s, :routing_key => "session.#{session_id}", :properties => { :headers => { "client_id" => client.client_id } } )
end
def handle_user_directed(user, client_msg, client)
def handle_user_directed(user, client_msg, client)
raise SessionError, 'not implemented'
end
end
raise SessionError, 'not implemented'
end
end
end