diff --git a/lib/jam_websockets/router.rb b/lib/jam_websockets/router.rb index c8ac46f01..8358c2669 100644 --- a/lib/jam_websockets/router.rb +++ b/lib/jam_websockets/router.rb @@ -23,25 +23,25 @@ module JamWebsockets class Router - attr_accessor :user_context_lookup, :session_context_lookup - + attr_accessor :user_context_lookup, :session_context_lookup + def initialize(options={}) @log = Logging.logger[self] @pending_clients = Set.new # clients that have connected to server, but not logged in. @clients = {} # clients that have logged in - @user_context_lookup = {} # lookup a set of client_contexts by user_id - @session_context_lookup = {} # lookup a set of client_contexts by session_id + @user_context_lookup = {} # lookup a set of client_contexts by user_id + @session_context_lookup = {} # lookup a set of client_contexts by session_id @sessions_exchange = nil @connection = nil @channel = nil @users_exchange = nil @message_factory = JamRuby::MessageFactory.new - @semaphore = Mutex.new - @user_topic = nil - @user_subscription = nil - @session_topic = nil - @session_subscription = nil - @thread_pool = nil + @semaphore = Mutex.new + @user_topic = nil + @user_subscription = nil + @session_topic = nil + @session_subscription = nil + @thread_pool = nil end def start(options = {}) @@ -49,7 +49,7 @@ module JamWebsockets @log.info "startup" begin - @thread_pool = Executors.new_fixed_thread_pool(8) + @thread_pool = Executors.new_fixed_thread_pool(8) @connection = HotBunnies.connect(:host => options[:host], :port => options[:port]) @channel = @connection.create_channel @channel.prefetch = 10 @@ -62,119 +62,119 @@ module JamWebsockets end - def add_user(context) - user_contexts = @user_context_lookup[context.user.id] - if user_contexts.nil? - user_contexts = Set.new - @user_context_lookup[context.user.id] = user_contexts - end + def add_user(context) + user_contexts = @user_context_lookup[context.user.id] + if user_contexts.nil? + user_contexts = Set.new + @user_context_lookup[context.user.id] = user_contexts + end - user_contexts.add(context) - end + user_contexts.add(context) + end - def remove_user(context) - user_contexts = @user_context_lookup[context.user.id] - if user_contexts.nil? - @log.warn "user can not be removed #{context}" - else - # delete the context from set of user contexts - user_contexts.delete(context) + def remove_user(context) + user_contexts = @user_context_lookup[context.user.id] + if user_contexts.nil? + @log.warn "user can not be removed #{context}" + else + # delete the context from set of user contexts + user_contexts.delete(context) - # if last user context, delete entire set (memory leak concern) - if user_contexts.length == 0 - @user_context_lookup.delete(context.user.id) - end - end - end + # if last user context, delete entire set (memory leak concern) + if user_contexts.length == 0 + @user_context_lookup.delete(context.user.id) + end + end + end - def add_session(context) - session_contexts = @session_context_lookup[context.session.id] - if session_contexts.nil? - session_contexts = Set.new - @session_context_lookup[context.session.id] = session_contexts - end + def add_session(context) + session_contexts = @session_context_lookup[context.session.id] + if session_contexts.nil? + session_contexts = Set.new + @session_context_lookup[context.session.id] = session_contexts + end - session_contexts.add(context) - end + session_contexts.add(context) + end - def remove_session(context) - session_contexts = @session_context_lookup[context.session.id] - if session_contexts.nil? - @log.warn "session can not be removed #{context}" - else - # delete the context from set of session contexts - session_contexts.delete(context) + def remove_session(context) + session_contexts = @session_context_lookup[context.session.id] + if session_contexts.nil? + @log.warn "session can not be removed #{context}" + else + # delete the context from set of session contexts + session_contexts.delete(context) - # if last session context, delete entire set (memory leak concern) - if session_contexts.length == 0 - @session_context_lookup.delete(context.session.id) - end + # if last session context, delete entire set (memory leak concern) + if session_contexts.length == 0 + @session_context_lookup.delete(context.session.id) + end - context.session = nil - end - end + context.session = nil + end + end - # register topic for user messages and session messages - def register_topics - @users_exchange = @channel.exchange('users', :type => :topic) + # register topic for user messages and session messages + def register_topics + @users_exchange = @channel.exchange('users', :type => :topic) @sessions_exchange = @channel.exchange('sessions', :type => :topic) - - # create user messaging topic + + # create user messaging topic @user_topic = @channel.queue("", :auto_delete => true) @user_topic.bind(@users_exchange, :routing_key => "user.#") @user_topic.purge # TODO: alert friends - # subscribe for any messages to users - - #@user_subscription = @user_topic.subscribe(:ack => false, :blocking => false, :executor => @threadpool) do |headers, msg| - @user_subscription = @user_topic.subscribe(:ack => false) - @user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| - begin - routing_key = headers.envelope.routing_key - user_id = routing_key["user.".length..-1] - @sempahore.synchronize do - contexts = @user_context_lookup[user_id] + # subscribe for any messages to users + + #@user_subscription = @user_topic.subscribe(:ack => false, :blocking => false, :executor => @threadpool) do |headers, msg| + @user_subscription = @user_topic.subscribe(:ack => false) + @user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| + begin + routing_key = headers.envelope.routing_key + user_id = routing_key["user.".length..-1] + @sempahore.synchronize do + contexts = @user_context_lookup[user_id] - unless contexts.nil? - - @log.debug "received user-directed message for session: #{user_id}" - + unless contexts.nil? + + @log.debug "received user-directed message for session: #{user_id}" + msg = Jampb::ClientMessage.parse(msg) - contexts.each do |context| - EM.schedule do - @log.debug "sending user message to #{context}" - send_to_client(context.client, msg) - end - end - end - end + contexts.each do |context| + EM.schedule do + @log.debug "sending user message to #{context}" + send_to_client(context.client, msg) + end + end + end + end - rescue => e - @log.error "unhandled error in messaging to client" - end - end + rescue => e + @log.error "unhandled error in messaging to client" + end + end - @session_topic = @channel.queue("", :auto_delete => true) + @session_topic = @channel.queue("", :auto_delete => true) @session_topic.bind(@sessions_exchange, :routing_key => "session.#") @session_topic.purge # subscribe for any messages to session #@session_subscription = @session_topic.subscribe(:ack => false, :blocking => false) do |headers, msg| @session_subscription = @session_topic.subscribe(:ack => false) - @session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| - begin - routing_key = headers.envelope.routing_key - session_id = routing_key["session.".length..-1] - @semaphore.synchronize do - contexts = @session_context_lookup[session_id] + @session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| + begin + routing_key = headers.envelope.routing_key + session_id = routing_key["session.".length..-1] + @semaphore.synchronize do + contexts = @session_context_lookup[session_id] - unless contexts.nil? - - @log.debug "received session-directed message for session: #{session_id}" - + unless contexts.nil? + + @log.debug "received session-directed message for session: #{session_id}" + msg = Jampb::ClientMessage.parse(msg) # ok, its very odd to have your own message that you sent bounce back to you. @@ -188,88 +188,88 @@ module JamWebsockets origin_client_id = origin_client_id.to_s unless origin_client_id.nil? @log.debug "message received from client #{origin_client_id}" - contexts.each do |context| + contexts.each do |context| if context.client.client_id != origin_client_id EM.schedule do @log.debug "sending session message to #{context}" send_to_client(context.client, msg) end end - end - end - end + end + end + end - rescue => e - @log.error "unhandled error in messaging to client" - end + rescue => e + @log.error "unhandled error in messaging to client" + end end - end + end - def send_to_client(client, msg) + def send_to_client(client, msg) if client.encode_json client.send(msg.to_json.to_s) else - # this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent - client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s) + # this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent + client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s) end - end + end def cleanup() - # shutdown topic listeners and mq connection - begin - if !@user_subscription.nil? && @user_subscription.active? - @log.debug "cleaning up user subscription" - @user_subscription.cancel - @user_subscription.shutdown! - end + # shutdown topic listeners and mq connection + begin + if !@user_subscription.nil? && @user_subscription.active? + @log.debug "cleaning up user subscription" + @user_subscription.cancel + @user_subscription.shutdown! + end - if !@session_subscription.nil? && @session_subscription.active? - @log.debug "cleaning up session subscription" - @session_subscription.cancel - @session_subscription.shutdown! - end + if !@session_subscription.nil? && @session_subscription.active? + @log.debug "cleaning up session subscription" + @session_subscription.cancel + @session_subscription.shutdown! + end - rescue => e - @log.debug "unable to cancel subscription on cleanup: #{e}" - end + rescue => e + @log.debug "unable to cancel subscription on cleanup: #{e}" + end - @thread_pool.shutdown + @thread_pool.shutdown - if !@channel.nil? - @channel.close - end + if !@channel.nil? + @channel.close + end - if !@connection.nil? - @connection.close - end + if !@connection.nil? + @connection.close + end - # tear down each individual client - @clients.each do |client, context| - cleanup_client(client) - end - end + # tear down each individual client + @clients.each do |client, context| + cleanup_client(client) + end + end - def stop - @log.info "shutdown" - cleanup - end + def stop + @log.info "shutdown" + cleanup + end - def new_client(client) + def new_client(client) # give a unique ID to this client. This is used to prevent session messages # from echoing back to the sender, for instance. client.client_id = UUIDTools::UUID.random_create.to_s - @semaphore.synchronize do - @pending_clients.add(client) - end + @semaphore.synchronize do + @pending_clients.add(client) + end # default to using json instead of pb - client.encode_json = true + client.encode_json = true - client.onopen { - #binding.pry - @log.debug "client connected #{client}" + client.onopen { + #binding.pry + @log.debug "client connected #{client}" # check for '?pb' or '?pb=true' in url query parameters query_pb = client.request["query"]["pb"] @@ -280,10 +280,10 @@ module JamWebsockets } - client.onclose { - @log.debug "Connection closed" + client.onclose { + @log.debug "Connection closed" - cleanup_client(client) + cleanup_client(client) } client.onerror { |error| @@ -294,7 +294,7 @@ module JamWebsockets end cleanup_client(client) - client.close_websocket + client.close_websocket } client.onmessage { |msg| @@ -303,37 +303,37 @@ module JamWebsockets # TODO: set a max message size before we put it through PB? # TODO: rate limit? - + begin - if client.encode_json + if client.encode_json #example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}} - parse = JSON.parse(msg) - pb_msg = Jampb::ClientMessage.json_create(parse) + parse = JSON.parse(msg) + pb_msg = Jampb::ClientMessage.json_create(parse) self.route(pb_msg, client) - else - pb_msg = Jampb::ClientMessage.parse(msg.to_s) - self.route(pb_msg, client) - end - rescue SessionError => e - @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}" - begin + else + pb_msg = Jampb::ClientMessage.parse(msg.to_s) + self.route(pb_msg, client) + end + rescue SessionError => e + @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}" + begin # wrap the message up and send it down error_msg = @message_factory.server_rejection_error(e.to_s) - send_to_client(client, error_msg) + send_to_client(client, error_msg) ensure - client.close_websocket + client.close_websocket cleanup_client(client) end rescue => e @log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}" - @log.error e - - begin + @log.error e + + begin # wrap the message up and send it down error_msg = @message_factory.server_generic_error(e.to_s) - send_to_client(client, error_msg) + send_to_client(client, error_msg) ensure - client.close_websocket + client.close_websocket cleanup_client(client) end end @@ -342,38 +342,38 @@ module JamWebsockets end - # removes all resources associated with a client + # removes all resources associated with a client def cleanup_client(client) - @semaphore.synchronize do - pending = @pending_clients.delete?(client) + @semaphore.synchronize do + pending = @pending_clients.delete?(client) - if !pending.nil? - @log.debug "cleaning up pending client #{client}" - else - context = @clients.delete(client) + if !pending.nil? + @log.debug "cleaning up pending client #{client}" + else + context = @clients.delete(client) - if !context.nil? - - remove_user(context) + if !context.nil? + + remove_user(context) - if !context.session.nil? - remove_session(context) - end - else - @log.debug "skipping duplicate cleanup attempt of authorized client" - end + if !context.session.nil? + remove_session(context) + end + else + @log.debug "skipping duplicate cleanup attempt of authorized client" + end - end - end + end + end end def route(client_msg, client) - message_type = @message_factory.get_message_type(client_msg) - - raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil? + message_type = @message_factory.get_message_type(client_msg) - @log.debug("msg received #{message_type}") + raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil? + + @log.debug("msg received #{message_type}") raise SessionError, 'client_msg.target is null' if client_msg.target.nil? @@ -408,9 +408,9 @@ module JamWebsockets handle_login(client_msg.login, client) - elsif client_msg.type == ClientMessage::Type::HEARTBEAT + elsif client_msg.type == ClientMessage::Type::HEARTBEAT - handle_heartbeat(client_msg.heartbeat, client) + handle_heartbeat(client_msg.heartbeat, client) elsif client_msg.type == ClientMessage::Type::LOGIN_JAM_SESSION @@ -438,10 +438,10 @@ module JamWebsockets @log.debug "user #{user.email} logged in" # respond with LOGIN_ACK to let client know it was successful - #binding.pry - remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername) - login_ack = @message_factory.login_ack(remote_ip) - send_to_client(client, login_ack) + #binding.pry + remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername) + login_ack = @message_factory.login_ack(remote_ip) + send_to_client(client, login_ack) # log this connection in the database connection = Connection.new() @@ -450,22 +450,22 @@ module JamWebsockets connection.save # remove from pending_queue - @semaphore.synchronize do - @pending_clients.delete(client) + @semaphore.synchronize do + @pending_clients.delete(client) - # add a tracker for this user - context = ClientContext.new(user, client) - @clients[client] = context - add_user(context) - end + # add a tracker for this user + context = ClientContext.new(user, client) + @clients[client] = context + add_user(context) + end else raise SessionError, 'invalid login' end end - def handle_heartbeat(heartbeat, client) - # todo: manage staleness - end + def handle_heartbeat(heartbeat, client) + # todo: manage staleness + end def handle_join_jam_session(join_jam_session, client) # verify that the current user has the rights to actually join the jam session @@ -475,106 +475,106 @@ module JamWebsockets begin session = access_jam_session?(session_id, context.user) - @log.debug "user #{context} joining new session #{session}" - @semaphore.synchronize do - old_session = context.session - if !old_session.nil? - @log.debug "#{context} is already in session. auto-logging out to join new session." - remove_session(context) - end - context.session = session - add_session(context) - end + @log.debug "user #{context} joining new session #{session}" + @semaphore.synchronize do + old_session = context.session + if !old_session.nil? + @log.debug "#{context} is already in session. auto-logging out to join new session." + remove_session(context) + end + context.session = session + add_session(context) + end rescue => e # send back a failure ack and bail - @log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}" + @log.debug "client requested non-existent session. client:#{client.request['origin']} user:#{context.user.email}" login_jam_session = @message_factory.login_jam_session_ack(true, e.to_s) - send_to_client(client, login_jam_session) + send_to_client(client, login_jam_session) return end # respond with LOGIN_JAM_SESSION_ACK to let client know it was successful login_jam_session = @message_factory.login_jam_session_ack(false, nil) - send_to_client(client, login_jam_session) - + send_to_client(client, login_jam_session) + # send 'new client' message to other members in the session handle_session_directed(session_id, - @message_factory.user_joined_jam_session(context.user.id, context.user.name), - client) + @message_factory.user_joined_jam_session(context.user.id, context.user.name), + client) end - def handle_leave_jam_session(leave_jam_session, client) + def handle_leave_jam_session(leave_jam_session, client) - context = @clients[client] + context = @clients[client] - raise SessionError, "unsupported" - end + raise SessionError, "unsupported" + end - def valid_login(username, password, token) + def valid_login(username, password, token) if !token.nil? && token != '' @log.debug "logging in via token" - # attempt login with token - user = User.find_by_remember_token(token) + # attempt login with token + user = User.find_by_remember_token(token) - if user.nil? - @log.debug "no user found with token" - return false - else - @log.debug "#{user} login via token" - return user - end + if user.nil? + @log.debug "no user found with token" + return false + else + @log.debug "#{user} login via token" + return user + end - elsif !username.nil? and !password.nil? + elsif !username.nil? and !password.nil? @log.debug "logging in via user/pass '#{username}' '#{password}'" - # attempt login with username and password - user = User.find_by_email(username) + # attempt login with username and password + user = User.find_by_email(username) - if !user.nil? && user.authenticate(password) - @log.debug "#{user} login via password" - return user - else - @log.debug "#{username} login failure" - return nil - end - else - raise SessionError, 'no login data was found in Login message' - end + if !user.nil? && user.authenticate(password) + @log.debug "#{user} login via password" + return user + else + @log.debug "#{username} login failure" + return nil + end + else + raise SessionError, 'no login data was found in Login message' + end :properties => { :headers => { "client_id" => client.client_id } } ) - end + end - def access_jam_session?(jam_session_id, user) - jam_session = JamSession.find_by_id(jam_session_id) + def access_jam_session?(jam_session_id, user) + jam_session = JamSession.find_by_id(jam_session_id) - if jam_session.nil? - raise SessionError, 'specified session not found' - end + if jam_session.nil? + raise SessionError, 'specified session not found' + end - if !jam_session.access? user - raise SessionError, 'not allowed to join the specified session' - end + if !jam_session.access? user + raise SessionError, 'not allowed to join the specified session' + end - return jam_session - end + return jam_session + end - def handle_session_directed(session_id, client_msg, client) + def handle_session_directed(session_id, client_msg, client) - context = @clients[client] + context = @clients[client] - # by not catching any exception here, this will kill the connection - # if for some reason the client is trying to send to a session that it doesn't - # belong to - session = access_jam_session?(session_id, context.user) + # by not catching any exception here, this will kill the connection + # if for some reason the client is trying to send to a session that it doesn't + # belong to + session = access_jam_session?(session_id, context.user) - @log.debug "publishing to session #{session} from client_id #{client.client_id}" - # put it on the topic exchange for sessions - @sessions_exchange.publish(client_msg.to_s, :routing_key => "session.#{session_id}", :properties => { :headers => { "client_id" => client.client_id } } ) - end + @log.debug "publishing to session #{session} from client_id #{client.client_id}" + # put it on the topic exchange for sessions + @sessions_exchange.publish(client_msg.to_s, :routing_key => "session.#{session_id}", :properties => { :headers => { "client_id" => client.client_id } } ) + end - def handle_user_directed(user, client_msg, client) + def handle_user_directed(user, client_msg, client) - raise SessionError, 'not implemented' - end - end + raise SessionError, 'not implemented' + end + end end