require 'set'
require 'amqp'
require 'thread'
require 'json'
require 'eventmachine'

include Jampb

# Adds per-connection bookkeeping fields to the em-websocket connection class.
#
# NOTE(review): this reopens EventMachine::WebSocket::Connection; if em-websocket
# has already defined #unbind / #connection_completed when this file loads, the
# definitions below replace them and `super` resolves to EventMachine::Connection.
# Confirm the load order is the intended one.
module EventMachine
  module WebSocket
    class Connection < EventMachine::Connection
      # client_id is the uuid we give to each client to track them as we like
      attr_accessor :encode_json, :client_id, :user_id, :context

      # http://stackoverflow.com/questions/11150147/how-to-check-if-eventmachineconnection-is-open
      attr_accessor :connected

      def connection_completed
        # `self.` is required: a bare `connected = true` only creates a local variable
        # and never reaches the attr_accessor.
        self.connected = true
        super
      end

      # @return [Boolean] true only once the connection has been flagged as connected
      def connected?
        !!connected
      end

      def unbind
        self.connected = false
        super
      end
    end
  end
end

module JamWebsockets
  # Routes messages between websocket clients and the AMQP 'users' / 'clients'
  # topic exchanges, and tracks which clients/users are currently logged in.
  class Router
    attr_accessor :user_context_lookup

    def initialize
      @log = Logging.logger[self]
      @clients = {} # clients that have logged in (client => context)
      @user_context_lookup = {} # lookup a set of client_contexts by user_id
      @client_lookup = {} # lookup a client context by client_id
      @amqp_connection_manager = nil
      @users_exchange = nil
      @clients_exchange = nil
      @message_factory = JamRuby::MessageFactory.new
      @semaphore = Mutex.new
      @user_topic = nil
      @client_topic = nil
      @thread_pool = nil
      @heartbeat_interval_client = nil
      @connect_time_expire_client = nil
      @heartbeat_interval_browser = nil
      @connect_time_expire_browser = nil
      @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]
    end

    # Connects to AMQP, registers the user/client topics, then invokes the block.
    #
    # @param connect_time_stale_client halved to obtain the client heartbeat interval
    # @param connect_time_expire_client stored as the client connection expiry
    # @param connect_time_stale_browser halved to obtain the browser heartbeat interval
    # @param connect_time_expire_browser stored as the browser connection expiry
    # @param options [Hash] :host and :port of the AMQP broker; missing keys fall
    #   back to localhost:5672
    # @raise re-raises whatever the connection setup raised, after calling #cleanup
    def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672}, &block)
      @log.info "startup"

      # a caller passing only :host previously lost the :port default
      options = {:host => "localhost", :port => 5672}.merge(options)

      @heartbeat_interval_client = connect_time_stale_client / 2
      @connect_time_expire_client = connect_time_expire_client
      # the browser timings were accepted but silently dropped before
      @heartbeat_interval_browser = connect_time_stale_browser / 2 unless connect_time_stale_browser.nil?
      @connect_time_expire_browser = connect_time_expire_browser

      begin
        @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
        @amqp_connection_manager.connect do |channel|
          register_topics(channel)
          block.call if block
        end
      rescue => e
        @log.error "unable to initialize #{e.to_s}"
        cleanup
        raise
      end

      @log.info "started"
    end

    # Registers a context under its client_id.
    def add_client(client_id, client_context)
      @log.debug "adding client #{client_id} to @client_lookup"
      @client_lookup[client_id] = client_context
    end

    # Forgets the context registered under client_id (warns if already gone).
    def remove_client(client_id)
      deleted = @client_lookup.delete(client_id)

      if deleted.nil?
        @log.warn "unable to delete #{client_id} from client_lookup because it's already gone"
      else
        @log.debug "cleaned up @client_lookup for #{client_id}"
      end
    end

    # Adds the context to the per-user table, keyed by its client connection.
    def add_user(context)
      user_contexts = (@user_context_lookup[context.user.id] ||= {})
      user_contexts[context.client] = context
    end

    # Removes the context from the per-user table and clears context.user.
    def remove_user(client_context)
      user_contexts = @user_context_lookup[client_context.user.id]

      if user_contexts.nil?
        @log.warn "user can not be removed #{client_context}"
        return
      end

      # delete the context from set of user contexts
      user_contexts.delete(client_context.client)

      # if last user context, delete entire set (memory leak concern)
      @user_context_lookup.delete(client_context.user.id) if user_contexts.empty?

      client_context.user = nil
    end

    # register topic for user messages and session messages
    def register_topics(channel)

      ######################## USER MESSAGING ###########################

      # create user exchange
      @users_exchange = channel.topic('users')
      # create user messaging topic
      @user_topic = channel.queue("", :auto_delete => true)
      @user_topic.bind(@users_exchange, :routing_key => "user.#")
      @user_topic.purge

      # subscribe for any messages to users
      @user_topic.subscribe(:ack => false) do |headers, msg|
        begin
          routing_key = headers.routing_key
          user_id = routing_key["user.".length..-1]

          @semaphore.synchronize do
            contexts = @user_context_lookup[user_id]

            if !contexts.nil?
              @log.debug "received user-directed message for user: #{user_id}"

              msg = Jampb::ClientMessage.parse(msg)
              # the table is keyed by client connection (see add_user); only the context is needed
              contexts.each do |_client, context|
                EM.schedule do
                  @log.debug "sending user message to #{context}"
                  send_to_client(context.client, msg)
                end
              end
            else
              @log.debug "Can't route message: no user connected with id #{user_id}"
            end
          end
        rescue => e
          @log.error "unhandled error in messaging to client"
          @log.error e
        end
      end

      MQRouter.user_exchange = @users_exchange

      ############## CLIENT MESSAGING ###################

      @clients_exchange = channel.topic('clients')

      @client_topic = channel.queue("", :auto_delete => true)
      @client_topic.bind(@clients_exchange, :routing_key => "client.#")
      @client_topic.purge

      # subscribe for any p2p messages to a client
      @client_topic.subscribe(:ack => false) do |headers, msg|
        begin
          routing_key = headers.routing_key
          client_id = routing_key["client.".length..-1]

          @semaphore.synchronize do
            client_context = @client_lookup[client_id]

            if !client_context.nil?
              client = client_context.client

              msg = Jampb::ClientMessage.parse(msg)

              @log.debug "client-directed message received from #{msg.from} to client #{client_id}"

              unless client.nil?
                EM.schedule do
                  @log.debug "sending client-directed down websocket to #{client_id}"
                  send_to_client(client, msg)
                end
              else
                @log.debug "client-directed message unroutable to disconnected client #{client_id}"
              end
            else
              @log.debug "Can't route message: no client connected with id #{client_id}"
            end
          end
        rescue => e
          @log.error "unhandled error in messaging to client"
          @log.error e
        end
      end

      MQRouter.client_exchange = @clients_exchange
    end

    # Wires the websocket callbacks (open/close/error/message) for a new connection.
    def new_client(client)
      # default to using json instead of pb
      client.encode_json = true

      client.onopen { |handshake|
        #binding.pry
        @log.debug "client connected #{client}"

        # check for '?pb' or '?pb=true' in url query parameters
        query_pb = handshake.query["pb"]

        if !query_pb.nil? && (query_pb == "" || query_pb == "true")
          client.encode_json = false
        end
      }

      client.onclose {
        @log.debug "Connection closed"
        stale_client(client)
        cleanup_client(client)
      }

      client.onerror { |error|
        if error.kind_of?(EM::WebSocket::WebSocketError)
          @log.error "websockets error: #{error}"
        else
          @log.error "generic error: #{error} #{error.backtrace}"
        end
      }

      client.onmessage { |msg|
        # TODO: set a max message size before we put it through PB?
        # TODO: rate limit?

        pb_msg = nil

        begin
          if client.encode_json
            #example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}}
            parse = JSON.parse(msg)
            pb_msg = Jampb::ClientMessage.json_create(parse)
            self.route(pb_msg, client)
          else
            pb_msg = Jampb::ClientMessage.parse(msg.to_s)
            self.route(pb_msg, client)
          end
        rescue SessionError => e
          @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}"
          begin
            # wrap the message up and send it down
            error_msg = @message_factory.server_rejection_error(e.to_s)
            send_to_client(client, error_msg)
          ensure
            cleanup_client(client)
          end
        rescue PermissionError => e
          @log.info "permission error. reason=#{e.to_s}"
          @log.info e

          # wrap the message up and send it down
          error_msg = @message_factory.server_permission_error(pb_msg.message_id, e.to_s)
          send_to_client(client, error_msg)
        rescue => e
          @log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}"
          @log.error e

          begin
            # wrap the message up and send it down
            error_msg = @message_factory.server_generic_error(e.to_s)
            send_to_client(client, error_msg)
          ensure
            cleanup_client(client)
          end
        end
      }
    end

    # Sends msg down the websocket, as JSON text or as a binary protobuf frame
    # depending on client.encode_json.
    def send_to_client(client, msg)
      @log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless msg.type == ClientMessage::Type::HEARTBEAT_ACK

      if client.encode_json
        client.send(msg.to_json.to_s)
      else
        # this is so odd that this is necessary from an API perspective. but searching through the source code...
        # it's all I could find in em-websocket for allowing a binary message to be sent
        client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
      end
    end

    # Disconnects from AMQP and cleans up every tracked client.
    def cleanup
      # shutdown topic listeners and mq connection
      unless @amqp_connection_manager.nil?
        @amqp_connection_manager.disconnect
      end

      # tear down each individual client; iterate a snapshot of the keys because
      # cleanup_client deletes entries from @clients
      @clients.keys.each do |client|
        cleanup_client(client)
      end
    end

    def stop
      @log.info "shutdown"
      cleanup
    end

    # caused a client connection to be marked stale
    def stale_client(client)
      return unless client.client_id

      ConnectionManager.active_record_transaction do |connection_manager|
        music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id)

        unless music_session_id.nil?
          music_session = MusicSession.find_by_id(music_session_id)

          if music_session
            # update the session members, letting them know this client went stale.
            # The context may already have been removed by cleanup_client; previously
            # that raised NoMethodError on nil inside this transaction block.
            context = @client_lookup[client.client_id]
            user = context.nil? ? nil : context.user
            user = User.find_by_id(client.user_id) if user.nil? && client.user_id

            if user
              Notification.send_musician_session_stale(music_session, client.client_id, user)
            else
              @log.warn "unable to notify session of stale client #{client.client_id}: no user could be resolved"
            end
          end
        end
      end
    end

    # Cleans up in-memory state and the database connection row for each expired
    # connection, notifying the affected session (if any) of the departure.
    #
    # @param expired_connections enumerable of objects answering [:client_id]
    def cleanup_clients_with_ids(expired_connections)
      expired_connections.each do |expired_connection|
        cid = expired_connection[:client_id]

        client_context = @client_lookup[cid]
        diagnostic_data = nil

        unless client_context.nil?
          diagnostic_data = client_context.to_json
          cleanup_client(client_context.client)
        end

        music_session = nil
        recording_id = nil
        user = nil

        # remove this connection from the database
        ConnectionManager.active_record_transaction do |mgr|
          mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
            Diagnostic.expired_stale_connection(user_id, diagnostic_data)
            Notification.send_friend_update(user_id, false, conn) if count == 0
            music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil?
            user = User.find_by_id(user_id) unless user_id.nil?

            if music_session
              recording = music_session.stop_recording # stop any ongoing recording, if there is one
              recording_id = recording.id unless recording.nil?
              music_session.with_lock do # VRFS-1297
                music_session.tick_track_changes
              end
            end
          }
        end

        Notification.send_session_depart(music_session, cid, user, recording_id) unless music_session.nil? || user.nil?
      end
    end

    # removes all resources associated with a client
    def cleanup_client(client)
      @semaphore.synchronize do
        client.close if client.connected?

        pending = client.context.nil? # presence of context implies this connection has been logged into

        if pending
          @log.debug "cleaned up not-logged-in client #{client}"
        else
          @log.debug "cleanup up logged-in client #{client}"

          context = @clients.delete(client)

          if context
            remove_client(client.client_id)
            remove_user(context)
          else
            @log.warn "skipping duplicate cleanup attempt of logged-in client"
          end
        end
      end
    end

    # Validates an incoming message and dispatches it by its routing target.
    #
    # @raise [SessionError] on unknown type, missing route_to, not-logged-in sender,
    #   or unknown routing target
    def route(client_msg, client)
      message_type = @message_factory.get_message_type(client_msg)

      if message_type.nil?
        Diagnostic.unknown_message_type(client.user_id, client_msg)
        raise SessionError, "unknown message type received: #{client_msg.type}"
      end

      @log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT

      if client_msg.route_to.nil?
        Diagnostic.missing_route_to(client.user_id, client_msg)
        raise SessionError, 'client_msg.route_to is null'
      end

      if !client.user_id && client_msg.type != ClientMessage::Type::LOGIN
        # this client has not logged in and is trying to send a non-login message
        raise SessionError, "must 'Login' first"
      end

      # NOTE(review): the client branch slices client_msg.route_to while the session
      # and user branches slice client_msg.target -- confirm both fields are intended.
      if @message_factory.server_directed? client_msg
        handle_server_directed(client_msg, client)
      elsif @message_factory.client_directed? client_msg
        to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
        handle_client_directed(to_client_id, client_msg, client)
      elsif @message_factory.session_directed? client_msg
        session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
        handle_session_directed(session_id, client_msg, client)
      elsif @message_factory.user_directed? client_msg
        user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1]
        handle_user_directed(user_id, client_msg, client)
      else
        raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}"
      end
    end

    # Handles LOGIN and HEARTBEAT; anything else raises SessionError.
    def handle_server_directed(client_msg, client)
      # @log.info("*** handle_server_directed(#{client_msg.inspect}, #{client})")

      if client_msg.type == ClientMessage::Type::LOGIN
        handle_login(client_msg.login, client)
      elsif client_msg.type == ClientMessage::Type::HEARTBEAT
        sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) }
      else
        raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message"
      end
    end

    # Authenticates the client, reclaims or creates its database connection,
    # registers it in the in-memory tables and replies with LOGIN_ACK.
    #
    # @raise [SessionError] when the credentials are missing or invalid
    def handle_login(login, client)
      username = login.username if login.value_for_tag(1)
      password = login.password if login.value_for_tag(2)
      token = login.token if login.value_for_tag(3)
      client_id = login.client_id if login.value_for_tag(4)
      reconnect_music_session_id = login.reconnect_music_session_id if login.value_for_tag(5)
      client_type = login.client_type if login.value_for_tag(6)

      @log.info("*** handle_login: token=#{token}; client_id=#{client_id}, client_type=#{client_type}")
      reconnected = false

      # you don't have to supply client_id in login--if you don't, we'll generate one
      if client_id.nil? || client_id.empty?
        # give a unique ID to this client. This is used to prevent session messages
        # from echoing back to the sender, for instance.
        client_id = UUIDTools::UUID.random_create.to_s
      end

      user = valid_login(username, password, token, client_id)

      # kill any websocket connections that have this same client_id, which can happen in race conditions
      # this code must happen here, before we go any further, so that there is only one websocket connection per client_id
      existing_context = @client_lookup[client_id]
      if existing_context
        # in reconnect scenarios, we may have in memory a client still
        Diagnostic.duplicate_client(existing_context.user, existing_context) if existing_context.client.connected
        cleanup_client(existing_context.client)
      end

      connection = JamRuby::Connection.find_by_client_id(client_id)
      # if this connection is reused by a different user, then whack the connection
      # because it will recreate a new connection lower down
      if connection && user && connection.user != user
        @log.debug("user #{user.email} took client_id #{client_id} from user #{connection.user.email}")
        connection.delete
        connection = nil
      end

      client.client_id = client_id
      client.user_id = user.id if user
      remote_ip = extract_ip(client)

      raise SessionError, 'invalid login' unless user

      @log.debug "user #{user} logged in with client_id #{client_id}"

      # check if there's a connection for the client... if it's stale, reconnect it
      unless connection.nil?
        # FIXME: I think connection table needs to updated within connection_manager
        # otherwise this would be 1 line of code (connection.connect!)

        music_session_upon_reentry = connection.music_session

        send_depart = false
        recording_id = nil

        ConnectionManager.active_record_transaction do |connection_manager|
          music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id, remote_ip)

          if music_session_id.nil?
            # if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
            # if so, then we need to tell the others in the session that this user is now departed
            unless music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed?
              recording = music_session_upon_reentry.stop_recording
              recording_id = recording.id unless recording.nil?
              music_session_upon_reentry.with_lock do # VRFS-1297
                music_session_upon_reentry.tick_track_changes
              end
              send_depart = true
            end
          else
            music_session = MusicSession.find_by_id(music_session_id)
            Notification.send_musician_session_fresh(music_session, client.client_id, user)
          end
        end

        if send_depart
          Notification.send_session_depart(music_session_upon_reentry, client.client_id, user, recording_id)
        end
      end

      # respond with LOGIN_ACK to let client know it was successful
      @semaphore.synchronize do
        # add a tracker for this user
        context = ClientContext.new(user, client, client_type)
        @clients[client] = context
        add_user(context)
        add_client(client_id, context)

        unless connection
          # log this connection in the database
          ConnectionManager.active_record_transaction do |connection_manager|
            connection_manager.create_connection(user.id, client.client_id, remote_ip, client_type) do |conn, count|
              if count == 1
                Notification.send_friend_update(user.id, true, conn)
              end
            end
          end
        end

        # per-user overrides win; nil or 0 falls back to the values given to #start.
        # (to_i turns nil into 0, so the zero check covers the unset case.)
        heartbeat_interval = user.heartbeat_interval_client.to_i
        heartbeat_interval = @heartbeat_interval_client if heartbeat_interval == 0 # protect against bad config

        # the fallback used to read @connection_expire_time, which is never assigned
        connection_expire_time = user.connection_expire_time || @connect_time_expire_client
        connection_expire_time = @connect_time_expire_client if connection_expire_time == 0 # protect against bad config

        # send the computed values; previously they were calculated and then ignored
        login_ack = @message_factory.login_ack(remote_ip,
                                               client_id,
                                               user.remember_token,
                                               heartbeat_interval,
                                               connection.try(:music_session_id),
                                               reconnected,
                                               user.id,
                                               connection_expire_time)
        send_to_client(client, login_ack)
      end
    end

    # Touches the client's connection row, records notification progress, replies
    # with HEARTBEAT_ACK, and reports messaging-system outages/recoveries.
    #
    # @raise [SessionError] when the in-memory context or the database connection is gone
    def handle_heartbeat(heartbeat, heartbeat_message_id, client)
      context = @clients[client]

      unless context
        @log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session"
        Diagnostic.missing_client_state(client.user_id, client.context)
        raise SessionError, 'context state is gone.  please reconnect.'
      end

      connection = Connection.find_by_user_id_and_client_id(context.user.id, context.client.client_id)

      if connection.nil?
        @log.warn "*** WARNING: unable to find connection when handling heartbeat. context= #{context}; killing session"
        Diagnostic.missing_connection(client.user_id, client.context)
        raise SessionError, 'connection state is gone.  please reconnect.'
      end

      # declared outside the block so the assignment inside it is visible afterwards
      track_changes_counter = nil

      Connection.transaction do
        # send back track_changes_counter if in a session
        if connection.music_session_id
          music_session = MusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
          track_changes_counter = music_session.track_changes_counter if music_session
        end

        # update connection updated_at
        connection.touch

        # update user's notification_seen_at field if the heartbeat indicates it saw one
        # first we try to use the notification id, which should usually exist.
        # if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
        update_notification_seen_at(connection, context, heartbeat)
      end

      if connection.stale?
        ConnectionManager.active_record_transaction do |connection_manager|
          connection_manager.reconnect(connection, connection.music_session_id, nil)
        end
      end

      heartbeat_ack = @message_factory.heartbeat_ack(track_changes_counter)
      send_to_client(client, heartbeat_ack)

      # send errors to clients in response to heartbeats if rabbitmq is down
      if !@amqp_connection_manager.connected?
        error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down")
        context.sent_bad_state_previously = true
        send_to_client(client, error_msg)
      elsif context.sent_bad_state_previously
        context.sent_bad_state_previously = false
        recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id)
        send_to_client(client, recovery_msg)
      end
    end

    # Updates connection.user.notification_seen_at from the heartbeat.
    #
    # Only acts when the heartbeat carries a notification id (tag 1). If that id
    # resolves to a Notification its created_at is used; otherwise the heartbeat's
    # notification_seen_at timestamp (tag 2) is used, provided it parses.
    # NOTE(review): a heartbeat with no notification id never reaches the timestamp
    # fallback -- confirm that is intended.
    def update_notification_seen_at(connection, context, heartbeat)
      notification_id_field = heartbeat.notification_seen if heartbeat.value_for_tag(1)
      return unless notification_id_field

      notification = Notification.find_by_id(notification_id_field)

      if notification
        connection.user.notification_seen_at = notification.created_at
        unless connection.user.save(validate: false)
          @log.error "unable to update notification_seen_at via id field for client #{context}.  errors: #{connection.user.errors.inspect}"
        end
        return
      end

      notification_seen_at_parsed = nil
      notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2)

      begin
        notification_seen_at_parsed = Time.parse(notification_seen_at) if notification_seen_at && notification_seen_at.length > 0
      rescue StandardError
        # narrowed from `rescue Exception`, which would also swallow signals/exit
        @log.error "unable to parse notification_seen_at in heartbeat from #{context}.  notification_seen_at: #{notification_seen_at}"
      end

      if notification_seen_at_parsed
        # the raw string is stored (as before); parsing above only validates it
        connection.user.notification_seen_at = notification_seen_at
        unless connection.user.save(validate: false)
          @log.error "unable to update notification_seen_at via time field for client #{context}.  errors: #{connection.user.errors.inspect}"
        end
      end
    end

    # Looks up the user by remember token, or by email + password.
    #
    # @return the user, or nil when the credentials do not match
    # @raise [SessionError] when neither a token nor a username/password pair is given
    def valid_login(username, password, token, client_id)
      if !token.nil? && token != ''
        @log.debug "logging in via token"
        # attempt login with token
        user = JamRuby::User.find_by_remember_token(token)

        if user.nil?
          @log.debug "no user found with token #{token}"
          return nil
        else
          @log.debug "#{user} login via token"
          return user
        end
      elsif !username.nil? && !password.nil?
        # never write the plaintext password to the log
        @log.debug "logging in via user/pass '#{username}' '[FILTERED]'"
        # attempt login with username and password
        user = User.find_by_email(username)

        if !user.nil? && user.valid_password?(password)
          @log.debug "#{user} login via password"
          return user
        else
          @log.debug "#{username} login failure"
          return nil
        end
      else
        raise SessionError, 'no login data was found in Login message'
      end
    end

    # @return the music session
    # @raise [SessionError] when the session is missing or the user may not access it
    def access_music_session(music_session_id, user)
      music_session = MusicSession.find_by_id(music_session_id)

      if music_session.nil?
        raise SessionError, 'specified session not found'
      end

      unless music_session.access?(user)
        raise SessionError, 'not allowed to join the specified session'
      end

      music_session
    end

    # client_id = the id of the client being accessed
    # client = the current client
    #
    # NOTE(review): the unconditional `return nil` below disables every check in this
    # method, so all peer-to-peer messages are currently allowed. It is preserved
    # as-is because re-enabling the checks would change runtime policy; confirm
    # whether it is still intended.
    def access_p2p(client_id, user, msg)
      return nil

      # ping_request and ping_ack messages are special in that they are simply allowed
      if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK
        return nil
      end

      client_connection = Connection.find_by_client_id(client_id)

      if client_connection.nil?
        raise PermissionError, 'specified client not found'
      end

      if !client_connection.access_p2p? user
        raise SessionError, 'not allowed to message this client'
      end
    end

    # Publishes a peer-to-peer message onto the clients exchange.
    #
    # @raise [SessionError] when to_client_id is nil or the string 'undefined'
    def handle_client_directed(to_client_id, client_msg, client)
      context = @clients[client]

      # by not catching any exception here, a PermissionError will be thrown if this isn't valid
      # if for some reason the client is trying to send to a client that it doesn't
      # belong to
      access_p2p(to_client_id, context.user, client_msg)

      if to_client_id.nil? || to_client_id == 'undefined' # javascript translates to 'undefined' in many cases
        raise SessionError, "empty client_id specified in peer-to-peer message"
      end

      # populate routing data
      client_msg.from = client.client_id

      @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}"

      # put it on the topic exchange for clients
      @clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}})
    end

    # Publishes a message onto the users exchange for the given user id.
    def handle_user_directed(user_id, client_msg, client)
      @log.debug "publishing to user #{user_id} from client_id #{client.client_id}"

      # put it on the topic exchange for users
      @users_exchange.publish(client_msg.to_s, :routing_key => "user.#{user_id}")
    end

    # Publishes a message to everyone else in the session on behalf of this client.
    def handle_session_directed(session_id, client_msg, client)
      context = @clients[client]

      user_publish_to_session(session_id, context.user, client_msg, :client_id => client.client_id)
    end

    # sends a message to a session on behalf of a user
    # if this is originating in the context of a client, it should be specified as :client_id => "value"
    # client_msg should be a well-structure message (jam-pb message)
    def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""})
      music_session = access_music_session(music_session_id, user)

      # gather up client_ids in the session, excluding the sender
      sender_client_id = sender[:client_id]
      client_ids = music_session.music_session_clients
                                .map { |session_client| session_client.client_id }
                                .reject { |client_id| client_id == sender_client_id }

      publish_to_session(music_session.id, client_ids, client_msg.to_s, sender)
    end

    # sends a message to a session with no checking of permissions
    # this method deliberately has no database interactivity/active_record objects
    def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""})
      EM.schedule do
        sender_client_id = sender[:client_id]

        # iterate over each person in the session, and send a p2p message
        client_ids.each do |client_id|
          # uses this instance's logger and exchange: the previous @@log and
          # self.class.client_exchange are not defined anywhere on Router
          @log.debug "publishing to session:#{music_session_id} client:#{client_id} from client:#{sender_client_id}"

          # put it on the topic exchange for clients
          @clients_exchange.publish(client_msg, :routing_key => "client.#{client_id}")
        end
      end
    end

    # @return the second element of the unpacked peer socket address
    def extract_ip(client)
      Socket.unpack_sockaddr_in(client.get_peername)[1]
    end

    private

    # used around repeated transactions that cause too much ActiveRecord::Base logging:
    # runs the block with the ActiveRecord logger level set to :info and restores the
    # previous level afterwards. Without a logger, the block just runs.
    def sane_logging(&blk)
      return blk.call unless @ar_base_logger

      original_level = @ar_base_logger.level
      begin
        @ar_base_logger.level = :info
        blk.call
      ensure
        @ar_base_logger.level = original_level
      end
    end
  end
end