require 'set' require 'amqp' require 'thread' require 'json' require 'eventmachine' include Jampb # add new field to client connection module EventMachine module WebSocket class Connection < EventMachine::Connection attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted, :subscriptions, :x_forwarded_for, :query, :is_jamblaster # client_id is uuid we give to each client to track them as we like end end end module JamWebsockets class Router attr_accessor :user_context_lookup, :amqp_connection_manager, :heartbeat_interval_client, :connect_time_expire_client, :connect_time_stale_client, :heartbeat_interval_browser, :connect_time_expire_browser, :connect_time_stale_browser, :maximum_minutely_heartbeat_rate_browser, :maximum_minutely_heartbeat_rate_client, :max_connections_per_user, :gateway_name, :client_lookup, :time_it_sums, :profile_it_sums, :highest_drift, :heartbeat_tracker :temp_ban def initialize() @log = Logging.logger[self] @clients = {} # clients that have logged in @user_context_lookup = {} # lookup a set of client_contexts by user_id @client_lookup = {} # lookup a client by client_id @subscription_lookup = {} @amqp_connection_manager = nil @users_exchange = nil @message_factory = JamRuby::MessageFactory.new @semaphore = Mutex.new @user_topic = nil @client_topic = nil @subscription_topic = nil @thread_pool = nil @heartbeat_interval_client = nil @connect_time_expire_client = nil @connect_time_stale_client = nil @heartbeat_interval_browser= nil @connect_time_expire_browser= nil @connect_time_stale_browser= nil @maximum_minutely_heartbeat_rate_browser = nil @maximum_minutely_heartbeat_rate_client = nil @gateway_name = nil @stored_ars = nil @stored_ars_beta = nil @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base] @message_stats = {} @time_it_sums = {} @profile_it_sums = {} @heartbeat_tracker = {} @temp_ban = {} @chat_enabled = true @chat_blast = true @login_success_count = 0 @login_fail_count = 0 @connected_count = 0 @disconnected_count = 0 @user_message_counts = {} @largest_message = nil @largest_message_user = nil @highest_drift = 0 @pending_notification_seen_ats = {} @semaphore_pnsa = Mutex.new @pending_deletions = {} @semaphore_deletions = Mutex.new end def init wipe_all_connections # this thread runs forever while WSG runs, and should do anything easily gotten out of critical message handling path @background_thread = Thread.new { client_check_count = 0 user_last_seen_count = 0 while true begin periodical_check_connections periodical_notification_seen if client_check_count == 30 periodical_check_clients client_check_count = 0 end client_check_count = client_check_count + 1 if user_last_seen_count == 120 periodical_update_user_last_seen user_last_seen_count = 0 end user_last_seen_count = user_last_seen_count + 1 rescue => e @log.error "unhandled error in thread #{e}" @log.error "stacktrace #{e.backtrace}" end sleep 1 end } end def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default', :allow_dynamic_registration => true, :chat_enabled => true, :chat_blast => true}, &block) @log.info "startup" @heartbeat_interval_client = connect_time_stale_client / 2 @connect_time_stale_client = connect_time_stale_client @connect_time_expire_client = connect_time_expire_client @heartbeat_interval_browser = connect_time_stale_browser / 2 @connect_time_stale_browser = connect_time_stale_browser @connect_time_expire_browser = connect_time_expire_browser @max_connections_per_user = options[:max_connections_per_user] @gateway_name = options[:gateway] @allow_dynamic_registration = options[:allow_dynamic_registration] @chat_enabled = options[:chat_enabled] @chat_blast = options[:chat_blast] # determine the maximum amount of heartbeats we should get per user @maximum_minutely_heartbeat_rate_client = ((@heartbeat_interval_client / 60.0) * 2).ceil + 3 @maximum_minutely_heartbeat_rate_browser = ((@heartbeat_interval_browser / 60.0) * 2).ceil + 3 @log.info("maxmium minutely timer #{maximum_minutely_heartbeat_rate_client}") begin @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port]) @amqp_connection_manager.connect do |channel| register_topics(channel) block.call end rescue => e @log.error "unable to initialize #{e.to_s}" cleanup raise e end @log.info "started" end def add_client(client_id, client_context) @log.debug "adding client #{client_id} to @client_lookup" @client_lookup[client_id] = client_context end def remove_client(client_id) deleted = @client_lookup.delete(client_id) if deleted.nil? @log.warn "unable to delete #{client_id} from client_lookup because it's already gone" else @log.debug "cleaned up @client_lookup for #{client_id}" end end def add_user(context) user_contexts = @user_context_lookup[context.user.id] if user_contexts.nil? user_contexts = Hash.new @user_context_lookup[context.user.id] = user_contexts end user_contexts[context.client] = context end def remove_user(client_context) if client_context.user user_contexts = @user_context_lookup[client_context.user.id] if user_contexts.nil? @log.warn "user can not be removed #{client_context}" else # delete the context from set of user contexts user_contexts.delete(client_context.client) # if last user context, delete entire set (memory leak concern) if user_contexts.length == 0 @user_context_lookup.delete(client_context.user.id) end end end end # register topic for user messages and session messages def register_topics(channel) ######################## USER MESSAGING ########################### # create user exchange @users_exchange = channel.topic('users') # create user messaging topic @user_topic = channel.queue("", :auto_delete => true) @user_topic.bind(@users_exchange, :routing_key => "user.#") @user_topic.purge # subscribe for any messages to users @user_topic.subscribe(:ack => false) do |headers, msg| time_it('user_topic') { begin routing_key = headers.routing_key user_id = routing_key["user.".length..-1] @semaphore.synchronize do contexts = @user_context_lookup[user_id] if !contexts.nil? @log.debug "received user-directed message for user: #{user_id}" msg = Jampb::ClientMessage.parse(msg) contexts.each do |client_id, context| EM.schedule do @log.debug "sending user message to #{context}" send_to_client(context.client, msg) end end else #@log.debug "Can't route message: no user connected with id #{user_id}" # too chatty end end rescue => e @log.error "unhandled error in messaging to client" @log.error e end } end MQRouter.user_exchange = @users_exchange ############## CLIENT MESSAGING ################### @clients_exchange = channel.topic('clients') @client_topic = channel.queue("", :auto_delete => true) @client_topic.bind(@clients_exchange, :routing_key => "client.#") @client_topic.purge # subscribe for any p2p messages to a client @client_topic.subscribe(:ack => false) do |headers, msg| time_it('p2p_topic') { begin routing_key = headers.routing_key client_id = routing_key["client.".length..-1] @semaphore.synchronize do if client_id == MessageFactory::ALL_NATIVE_CLIENTS msg = Jampb::ClientMessage.parse(msg) @log.debug "client-directed message received from #{msg.from} to all clients" @client_lookup.each do |client_id, client_context| if client_context.client_type == JamRuby::Connection::TYPE_CLIENT client = client_context.client if client EM.schedule do @log.debug "sending client-directed down websocket to #{client_id}" send_to_client(client, msg) end end end end elsif client_id == MessageFactory::ALL_ACTIVE_CLIENTS if @chat_enabled msg = Jampb::ClientMessage.parse(msg) @log.debug "client-directed message received from #{msg.from} to all chat clients" @client_lookup.each do |client_id, client_context| if @chat_blast || client_context.active client = client_context.client user = client_context.user # disable global chat for schoolers for now if client && user && user.school_id.nil? EM.schedule do #@log.debug "sending client-directed down websocket to #{client_id}" send_to_client(client, msg) end end end end end else client_context = @client_lookup[client_id] if client_context client = client_context.client msg = Jampb::ClientMessage.parse(msg) @log.debug "client-directed message received from #{msg.from} to client #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE unless client.nil? EM.schedule do @log.debug "sending client-directed down websocket to #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE send_to_client(client, msg) end else @log.debug "client-directed message unroutable to disconnected client #{client_id}" end else #@log.debug "Can't route message: no client connected with id #{client_id}" this happens all the time in multi-websocket scenarios end end end rescue => e @log.error "unhandled error in messaging to client" @log.error e end } end MQRouter.client_exchange = @clients_exchange ############## DYNAMIC SUBSCRIPTION MESSAGING ################### @subscriptions_exchange = channel.topic('subscriptions') @subscription_topic = channel.queue("subscriptions-#{@gateway_name}", :auto_delete => true) @subscription_topic.purge # subscribe for any p2p messages to a client @subscription_topic.subscribe(:ack => false) do |headers, msg| time_it('subscribe_topic') { begin routing_key = headers.routing_key type_and_id = routing_key["subscription.".length..-1] #type, id = type_and_id.split('.') @semaphore.synchronize do clients = @subscription_lookup[type_and_id] msg = Jampb::ClientMessage.parse(msg) if clients EM.schedule do clients.each do |client| @log.debug "subscription msg to client #{client.client_id}" send_to_client(client, msg) end end end end rescue => e @log.error "unhandled error in messaging to client for mount" @log.error e end } end MQRouter.subscription_exchange = @subscriptions_exchange end # listens on a subscription topic on the behalf of a client def register_subscription(client, type, id) # track subscriptions that this client has made, for disconnect scenarios client.subscriptions.add({type: type, id: id}) key = "#{type}.#{id}" # for a given type:id in @subscription_lookup, track clients listening clients = @subscription_lookup[key] if clients.nil? clients = Set.new @subscription_lookup[key] = clients end needs_subscription = clients.length == 0 clients.add(client) @log.debug("register subscription handled #{type}.#{id}") if needs_subscription routing_key = "subscription.#{type}.#{id}" @log.debug("register topic bound #{routing_key}") # if this is the 1st client to listen for this mount, then subscribe to the topic for this mount_id @subscription_topic.bind(@subscriptions_exchange, :routing_key => routing_key) end end # de-listens on a subscription topic on the behalf of a client # called automatically when a clean disconnects, to keep state clean. def unregister_subscription(client, type, id) # remove subscription from this client's list of subscriptions client.subscriptions.delete({type: type, id: id}) key = "#{type}.#{id}" # for a given mount_id in @subscription_lookup, remove from list of clients listening clients = @subscription_lookup[key] if clients deleted = clients.delete(client) if !deleted @log.error "unregister_subscription: unable to locate any client #{client.client_id} for id #{id}" end else @log.error "unregister_subscription: unable to locate any clients for id #{id}" end if clients.length == 0 # if there are no more clients listening, then unsubscribe to the topic for this mount_id routing_key = "subscription.#{type}.#{id}" @log.debug("unregister dynamic topic #{routing_key}") @subscription_topic.unbind(@subscriptions_exchange, :routing_key => routing_key) end end # this method allows you to translate exceptions into websocket channel messages and behavior safely. # pass in your block, throw an error in your logic, and have the right things happen on the websocket channel def websocket_comm(client, original_message_id, &blk) begin blk.call rescue SessionError => e @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}" begin # wrap the message up and send it down error_msg = @message_factory.server_rejection_error(e.to_s, e.error_code) send_to_client(client, error_msg) ensure cleanup_client(client) end rescue JamPermissionError => e @log.info "permission error. reason=#{e.to_s}" @log.info e # wrap the message up and send it down error_msg = @message_factory.server_permission_error(original_message_id, e.to_s) send_to_client(client, error_msg) rescue => e @log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}" @log.error e if DbUtil.bad_conn_exception?(e) # indicates connection to server is down; kill self. Will be restarted; if db is up we will be healthy @log.error "EXITING DUE TO DEAD DBCONN: #{e}" Kernel.exit!(1) end begin # wrap the message up and send it down error_msg = @message_factory.server_generic_error(e.to_s) send_to_client(client, error_msg) ensure cleanup_client(client) end end end def new_client(client, is_trusted) # default to using json instead of pb client.encode_json = true client.trusted = is_trusted client.onopen { |handshake| time_it('onopen') { stats_connected # a unique ID for this TCP connection, to aid in debugging client.channel_id = handshake.query["channel_id"] @log.debug "client connected #{client} with channel_id: #{client.channel_id} Original-IP: #{handshake.headers["X-Forwarded-For"]}" # check for '?pb' or '?pb=true' in url query parameters query_pb = handshake.query["pb"] if !query_pb.nil? && (query_pb == "" || query_pb == "true") client.encode_json = false end websocket_comm(client, nil) do client.x_forwarded_for = handshake.headers["X-Forwarded-For"] client.query = handshake.query handle_login(client, client.query, client.x_forwarded_for) end } } client.onclose { time_it('onclose') { @log.debug "connection closed. marking stale: #{client.context}" #cleanup_client(client) cleanup_client_with_id(client.client_id) } } client.onerror { |error| if error.kind_of?(EM::WebSocket::WebSocketError) @log.error "websockets error: #{error}" else @log.error "generic error: #{error} #{error.backtrace}" end } client.onmessage { |data| # TODO: set a max message size before we put it through PB? # TODO: rate limit? msg = nil if @largest_message.nil? || data.length > @largest_message.length @largest_message = data @largest_message_user = client.user_id end # extract the message safely websocket_comm(client, nil) do if client.encode_json json = JSON.parse(data) msg = Jampb::ClientMessage.json_create(json) else msg = Jampb::ClientMessage.parse(data.to_s) end end # then route it internally websocket_comm(client, msg.message_id) do self.route(msg, client) end } end def send_to_client(client, msg) @log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless msg.type == ClientMessage::Type::HEARTBEAT_ACK || msg.type == ClientMessage::Type::PEER_MESSAGE if client.encode_json client.send(msg.to_json.to_s) else # this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s) end end def cleanup() # shutdown topic listeners and mq connection unless @amqp_connection_manager.nil? @amqp_connection_manager.disconnect end # tear down each individual client @clients.each do |client, context| cleanup_client(client) end end def stop @log.info "shutdown" cleanup end # caused a client connection to be marked stale # def stale_client(client) # if client.client_id # @log.info "marking client stale: #{client.context}" # ConnectionManager.active_record_transaction do |connection_manager| # music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id) # # update the session members, letting them know this client went stale # context = @client_lookup[client.client_id] # if music_session = ActiveMusicSession.find_by_id(music_session_id) # Notification.send_musician_session_stale(music_session, client.client_id, context.user) # end unless music_session_id.nil? # end # end # end def route(client_msg, client) message_type = @message_factory.get_message_type(client_msg) if message_type.nil? Diagnostic.unknown_message_type(client.user_id, client_msg) raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil? end @message_stats[message_type] = @message_stats[message_type].to_i + 1 @log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::HEARTBEAT_ACK && client_msg.type != ClientMessage::Type::PEER_MESSAGE if client_msg.route_to.nil? Diagnostic.missing_route_to(client.user_id, client_msg) raise SessionError, 'client_msg.route_to is null' end if !client.user_id && (client_msg.type != ClientMessage::Type::LOGIN && client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::LOGOUT) # this client has not logged in and is trying to send a non-login message if client.is_jamblaster # send message back to client intsead of doing nothing? @log.debug("jamblaster sent message #{message_type} at wrong time") send_to_client(client, @message_factory.diagnostic("message type #{message_type} ignored because no log in")) else raise SessionError, "must 'Login' first" end end if @message_factory.server_directed? client_msg handle_server_directed(client_msg, client) elsif @message_factory.client_directed? client_msg to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1] time_it('client_directed') { handle_client_directed(to_client_id, client_msg, client) } elsif @message_factory.session_directed? client_msg session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1] time_it('session_directed') { handle_session_directed(session_id, client_msg, client) } elsif @message_factory.user_directed? client_msg user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1] time_it('user_directed') { handle_user_directed(user_id, client_msg, client) } else raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}" end end def handle_server_directed(client_msg, client) # @log.info("*** handle_server_directed(#{client_msg.inspect}, #{client})") if client_msg.type == ClientMessage::Type::LOGIN # this is curently only a jamblaster path client.query["token"] = client_msg.login.token client.query["username"] = client_msg.login.username client.query["password"] = client_msg.login.password time_it('login') { handle_login(client, client.query, client.x_forwarded_for, false) } elsif client_msg.type == ClientMessage::Type::LOGOUT # this is currently only a jamblaster path time_it('login') { handle_logout(client) } elsif client_msg.type == ClientMessage::Type::HEARTBEAT time_it('heartbeat') { sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) } } elsif client_msg.type == ClientMessage::Type::USER_STATUS time_it('user_status') { sane_logging { handle_user_status(client_msg.user_status, client) } } elsif client_msg.type == ClientMessage::Type::SUBSCRIBE_BULK time_it('subscribe_bulk') { sane_logging { handle_bulk_subscribe(client_msg.subscribe_bulk, client) } } elsif client_msg.type == ClientMessage::Type::SUBSCRIBE time_it('subscribe') { sane_logging { handle_subscribe(client_msg.subscribe, client) } } elsif client_msg.type == ClientMessage::Type::UNSUBSCRIBE time_it('unsubscribe') { sane_logging { handle_unsubscribe(client_msg.unsubscribe, client) } } else raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message" end end # returns heartbeat_interval, connection stale time, and connection expire time def determine_connection_times(user, client_type) if client_type == Connection::TYPE_BROWSER default_heartbeat = @heartbeat_interval_browser default_stale = @connect_time_stale_browser default_expire = @connect_time_expire_browser else default_heartbeat = @heartbeat_interval_client default_stale = @connect_time_stale_client default_expire = @connect_time_expire_client end heartbeat_interval = (user && user.heartbeat_interval_client) || default_heartbeat heartbeat_interval = heartbeat_interval.to_i heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config connection_expire_time = (user && user.connection_expire_time_client) || default_expire connection_expire_time = connection_expire_time.to_i connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config connection_stale_time = default_stale # no user override exists for this; not a very meaningful time right now if heartbeat_interval >= connection_stale_time raise SessionError, "misconfiguration! heartbeat_interval (#{heartbeat_interval}) should be less than stale time (#{connection_stale_time})" end if connection_stale_time >= connection_expire_time raise SessionError, "misconfiguration! stale time (#{connection_stale_time}) should be less than expire time (#{connection_expire_time})" end [heartbeat_interval, connection_stale_time, connection_expire_time] end def add_tracker(user, client, client_type, client_id) # add a tracker for this user context = @clients[client] if context context.user = user add_user(context) if user else context = ClientContext.new(user, client, client_type) @clients[client] = context add_user(context) if user add_client(client_id, context) end context end def handle_latency_tester_login(client_id, client_type, client, override_ip, client_id_int) # respond with LOGIN_ACK to let client know it was successful remote_ip = extract_ip(client, override_ip) heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(nil, client_type) latency_tester = LatencyTester.connect({ client_id: client_id, channel_id: client.channel_id, ip_address: remote_ip, connection_stale_time: connection_stale_time, connection_expire_time: connection_expire_time, gateway: @gateway_name }) if latency_tester.errors.any? @log.warn "unable to log in latency_tester with errors: #{latency_tester.errors.inspect}" stats_logged_in_failed raise SessionError, "invalid login: #{latency_tester.errors.inspect}" end client.client_id = client_id client.user_id = latency_tester.id if latency_tester @semaphore.synchronize do context = add_tracker(latency_tester, client, client_type, client_id) @log.debug "logged in context created: #{context}" login_ack = @message_factory.login_ack(remote_ip, client_id, nil, heartbeat_interval, nil, false, latency_tester.id, connection_expire_time, "latency_tester", client_id_int) stats_logged_in send_to_client(client, login_ack) end end def handle_logout(client) connection = Connection.find_by_client_id(client.client_id) if connection connection.delete end client.user_id = nil context = client.context if context @log.debug("will remove context with user: #{context.user}") remove_user(context) context.user = nil end logout_ack = @message_factory.logout_ack() send_to_client(client, logout_ack) end def ars_list(beta) if beta @stored_ars_beta else @stored_ars end end def handle_login(client, options, override_ip = nil, connecting = true) puts("====handle_login====", options) username = options["username"] password = options["password"] token = options["token"] client_id = options["client_id"] reconnect_music_session_id = options["music_session_id"] client_type = options["client_type"] machine_fingerprint = options["machine"] os = options["os"] product = options["product"].nil? ? JamRuby::ArtifactUpdate::CLIENT_PREFIX : options[' product'] udp_reachable = options["udp_reachable"].nil? ? true : options["udp_reachable"] == 'true' jamblaster_serial_no = options["jamblaster_serial_no"] ipv4_link_local = options["ipv4_link_local"] ipv6_link_local = options["ipv6_link_local"] client_id_int = options["client_id_int"] # it's nice to have client_ids not flap in the wind, and we can do that with jamblasters if jamblaster_serial_no && client_id.nil? client_id = jamblaster_serial_no end # TESTING #if jamblaster_serial_no.nil? # jamblaster_serial_no = 'hi' #end client.subscriptions = Set.new # list of subscriptions that this client is watching in real-time serial_no_debug = jamblaster_serial_no ? "serial_no=#{jamblaster_serial_no}" : '' @log.info("handle_login: type=#{client_type} username=#{username} password=#{password ? '*' : 'null' } token=#{token} client_id=#{client_id} client_id_int=#{client_id_int} channel_id=#{client.channel_id} udp_reachable=#{udp_reachable} #{serial_no_debug}") if client_type == Connection::TYPE_LATENCY_TESTER handle_latency_tester_login(client_id, client_type, client, override_ip, client_id_int) return end reconnected = false if connecting # you don't have to supply client_id in login--if you don't, we'll generate one if client_id.nil? || client_id.empty? # give a unique ID to this client. client_id = UUIDTools::UUID.random_create.to_s end else # client_id's don't change per websocket connection; so use the one from memeory client_id = client.client_id end # we have to deal with jamblaster before login if jamblaster_serial_no && jamblaster_serial_no != '' jamblaster = Jamblaster.bootstrap(jamblaster_serial_no) if jamblaster client.is_jamblaster = true end if jamblaster && connecting jamblaster.client_id = client_id jamblaster.ipv4_link_local = ipv4_link_local jamblaster.ipv6_link_local = ipv6_link_local jamblaster.save end end user = valid_login(username, password, token, client_id, jamblaster) # protect against this user swamping the server if user && Connection.where(user_id: user.id).count >= @max_connections_per_user @log.warn "user #{user.id}/#{user.email} unable to connect due to max_connections_per_user #{@max_connections_per_user}" stats_logged_in_failed raise SessionError, 'max_user_connections', 'max_user_connections' end if connecting # XXX This logic needs to instead be handled by a broadcast out to all websockets indicating dup # kill any websocket connections that have this same client_id, which can happen in race conditions # this code must happen here, before we go any further, so that there is only one websocket connection per client_id existing_context = @client_lookup[client_id] if existing_context # in some reconnect scenarios, we may have in memory a websocket client still. # let's whack it, and tell the other client, if still connected, that this is a duplicate login attempt @log.info "duplicate client: #{existing_context}" Diagnostic.duplicate_client(existing_context.user, existing_context) error_msg = @message_factory.server_duplicate_client_error send_to_client(existing_context.client, error_msg) cleanup_client(existing_context.client) end end connection = Connection.find_by_client_id(client_id) # if this connection is reused by a different user (possible in logout/login scenarios), then whack the connection # because it will recreate a new connection lower down if connection && connection.user != user keep = false if user if connection.user.nil? keep = true @log.debug("user #{user.email} logged into #{client_id}") connection.user = user connection.save else @log.debug("user #{user.email} took client_id #{client_id} from user #{connection.user.email}") end else @log.debug("user-less connection #{client_id} took from user #{connection.user.email}") end if !keep connection.delete connection = nil end end client.client_id = client_id client.user_id = user.id if user remote_ip = extract_ip(client, override_ip) if user heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(user, client_type) @log.debug "logged in #{user} with client_id: #{client_id} and fingerprint #{machine_fingerprint}" # track fingerprint if machine_fingerprint && user.client_fingerprint != machine_fingerprint user.update_attribute(:client_fingerprint, machine_fingerprint) end # check if there's a connection for the client... if it's stale, reconnect it if !connection.nil? && connecting # FIXME: I think connection table needs to updated within connection_manager # otherwise this would be 1 line of code (connection.connect!) music_session_upon_reentry = connection.music_session send_depart = false recording_id = nil ConnectionManager.active_record_transaction do |connection_manager| music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name) if music_session_id.nil? # if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before. # if so, then we need to tell the others in the session that this user is now departed unless music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed? if music_session_upon_reentry.backing_track_initiator == user music_session_upon_reentry.close_backing_track end # if a jamtrack is open and this user is no longer in the session, close it if music_session_upon_reentry.jam_track_initiator == user music_session_upon_reentry.close_jam_track end # if a recording is open and this user is no longer in the session, close it if music_session_upon_reentry.claimed_recording_initiator == user music_session_upon_reentry.claimed_recording_stop end # handle case that a recording was ongoing - any one leaves, we stop it recording = music_session_upon_reentry.stop_recording unless recording.nil? @log.debug "stopped recording: #{recording.id} because user #{user} reconnected" recording.discard_if_no_action(user) # throw away this users vote for the recording_id = recording.id unless recording.nil? end # if the user was in a recording during the finializing phase (after stopped, but keep/discard still required in recordingFinishedDialog) # then throw away the user's most_recent_recording = music_session_upon_reentry.most_recent_recording if most_recent_recording && most_recent_recording.users.exists?(user) @log.debug "disarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected" # if this user was in the most recent recording associated with the session they were just in, discard any tracks they had most_recent_recording.discard_if_no_action(user) # throw away this users vote for the end music_session_upon_reentry.with_lock do # VRFS-1297 music_session_upon_reentry.tick_track_changes end send_depart = true end else music_session = ActiveMusicSession.find_by_id(music_session_id) Notification.send_musician_session_fresh(music_session, client.client_id, user) end end if send_depart Notification.send_session_depart(music_session_upon_reentry, client.client_id, user, recording_id) end end # respond with LOGIN_ACK to let client know it was successful # add a tracker for this user context = add_tracker(user, client, client_type, client_id) @log.debug "logged in context created: #{context}" if !connection # log this connection in the database ConnectionManager.active_record_transaction do |connection_manager| connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name, jamblaster ? true : false) do |conn, count| connection = Connection.find_by_client_id(client.client_id) user.update_addr_loc(connection, User::JAM_REASON_LOGIN) if count == 1 Notification.send_friend_update(user.id, true, conn) end end end end puts "=========================" puts "O/S = #{os}" puts "=========================" # if we have OS data, try to grab client update data and let the client have it update = ArtifactUpdate.find_client_by_os(product, os) if client_type == Connection::TYPE_CLIENT && os client_update = update.update_data if update arses = ars_list(user.beta) login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token, heartbeat_interval, connection.try(:music_session_id), reconnected, user.id, connection_expire_time, user.name, connection.client_id_int, client_update, arses, user.subscription_rules(false), @stored_policy) stats_logged_in send_to_client(client, login_ack) elsif jamblaster # if no user, but we have a jamblaster, we can allow this session to go through heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(nil, client_type) @log.debug "logged in jb::#{jamblaster.serial_no} with client_id: #{client_id}" # check if there's a connection for the client... if it's stale, reconnect it if !connection.nil? ConnectionManager.active_record_transaction do |connection_manager| music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name) end end # add a tracker for this user context = add_tracker(user, client, client_type, client_id) @log.debug "logged in context created: #{context}" if !connection # log this connection in the database ConnectionManager.active_record_transaction do |connection_manager| connection_manager.create_connection(nil, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name, jamblaster ? true : false) do |conn, count| # this blk is not call end end end # if we have OS data, try to grab client update data and let the client have it update = ArtifactUpdate.find_client_by_os(product, os) if client_type == Connection::TYPE_CLIENT && os client_update = update.update_data if update connect_ack = @message_factory.connect_ack(remote_ip, client_id, heartbeat_interval, connection_expire_time, client_update) stats_logged_in send_to_client(client, connect_ack) else stats_logged_in_failed raise SessionError.new('invalid login', 'invalid_login') end end def handle_bulk_subscribe(subscriptions, client) subscriptions.types.each_with_index do |subscription, i| handle_subscribe(OpenStruct.new({type: subscriptions.types[i], id: subscriptions.ids[i]}), client) end end def handle_subscribe(subscribe, client) id = subscribe.id type = subscribe.type if id && id.length > 0 && type && type.length > 0 register_subscription(client, type, id) if @allow_dynamic_registration else @log.error("handle_subscribe: empty data #{subscribe}") end end def handle_unsubscribe(unsubscribe, client) id = unsubscribe.id type = unsubscribe.type if id && id.length > 0 && type && type.length > 0 unregister_subscription(client, type, id) if @allow_dynamic_registration else @log.error("handle_subscribe: empty data #{unsubscribe}") end end def add_to_ban(user, reason) user_ban = @temp_ban[user.id] if user_ban.nil? user_ban = {} @temp_ban[user.id] = user_ban end # allow user back in, after 10 minutes user_ban[:allow] = Time.now + 600 @log.info("user #{user} banned for 10 minutes. reason #{reason}") end def runaway_heartbeat(heartbeat, context) heartbeat_count = @heartbeat_tracker[context.client.client_id] || 0 heartbeat_count += 1 @heartbeat_tracker[context.client.client_id] = heartbeat_count if heartbeat_count > (context.client_type == 'browser' ? @maximum_minutely_heartbeat_rate_browser : @maximum_minutely_heartbeat_rate_client) @log.warn("user #{context.user} sending too many heartbeats: #{heartbeat_count}") if heartbeat_count % 100 == 0 add_to_ban(context.user, 'too many heartbeats') raise SessionError.new('too many heartbeats', 'empty_login') else false end end def handle_user_status(user_status, client) client.context.active = user_status.active end def handle_heartbeat(heartbeat, heartbeat_message_id, client) unless context = @clients[client] profile_it('heartbeat_context_gone') { @log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session" #Diagnostic.missing_client_state(client.user_id, client.context) raise SessionError, 'context state is gone. please reconnect.' } else if runaway_heartbeat(heartbeat, context) return end connection = nil profile_it('heartbeat_find_conn') { connection = Connection.find_by_client_id(context.client.client_id) } track_changes_counter = nil if connection.nil? profile_it('heartbeat_diag_missing') { @log.warn "*** WARNING: unable to find connection when handling heartbeat. context= #{context}; killing session" Diagnostic.missing_connection(client.user_id, client.context) raise SessionError, 'connection state is gone. please reconnect.' } else #profile_it('heartbeat_transaction') { #Connection.transaction do # send back track_changes_counter if in a session #dboptz #profile_it('heartbeat_session') { # if connection.music_session_id # music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id) # track_changes_counter = music_session.track_changes_counter if music_session # end #} #stale = context.stale?(@connect_time_stale_client) profile_it('heartbeat_touch') { # update connection updated_at and if the user is active # dboptz #Connection.where(id: connection.id).update_all(user_active: heartbeat.active, updated_at: Time.now) context.updated_at = Time.now context.active = heartbeat.active } profile_it('heartbeat_notification') { # update user's notification_seen_at field if the heartbeat indicates it saw one # first we try to use the notification id, which should usually exist. # if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification update_notification_seen_at(connection, context, heartbeat) if client.context.client_type != Connection::TYPE_LATENCY_TESTER } #end #} profile_it('heartbeat_stale') { #if stale # ConnectionManager.active_record_transaction do |connection_manager| # heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type) # connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name) # end #end } end heartbeat_ack = @message_factory.heartbeat_ack(track_changes_counter) send_to_client(client, heartbeat_ack) # send errors to clients in response to heartbeats if rabbitmq is down if !@amqp_connection_manager.connected? error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down") context.sent_bad_state_previously = true send_to_client(client, error_msg) return elsif context.sent_bad_state_previously context.sent_bad_state_previously = false recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id) send_to_client(client, recovery_msg) end end end def update_notification_seen_at(connection, context, heartbeat) notification_id_field = heartbeat.notification_seen if heartbeat.value_for_tag(1) if notification_id_field && notification_id_field != '' notification = Notification.find_by_id(notification_id_field) @semaphore_pnsa.synchronize do if notification @pending_notification_seen_ats[connection.user.id] = notification.created_at else notification_seen_at_parsed = nil notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2) begin notification_seen_at_parsed = Time.parse(notification_seen_at) if notification_seen_at && notification_seen_at.length > 0 rescue Exception => e @log.error "unable to parse notification_seen_at in heartbeat from #{context}. notification_seen_at: #{notification_seen_at}" end if notification_seen_at_parsed @pending_notification_seen_ats[connection.user.id] = notification_seen_at end end end end end def valid_login(username, password, token, client_id, jamblaster) if !token.nil? && token != '' @log.debug "logging in via token" # attempt login with token user = User.find_by_remember_token(token) if user.nil? @log.debug "no user found with token #{token}" return nil else # check against temp ban list if @temp_ban[user.id] @log.debug("user #{user} is still banned; rejecting login") raise SessionError.new('login rejected temporarily', 'empty_login') end @log.debug "#{user} login via token" return user end elsif !username.nil? and !password.nil? @log.debug "logging in via user/pass '#{username}' '#{password}'" # attempt login with username and password user = User.find_by_email(username) # check against temp ban list if !user.nil? && @temp_ban[user.id] @log.debug("user #{user} is still banned; rejecting login") raise SessionError.new('login rejected temporarily', 'empty_login') end if !user.nil? && user.valid_password?(password) @log.debug "#{user} login via password" return user else @log.debug "#{username} login failure" return nil end elsif jamblaster # if there is a jamblaster in context, then we will allow no login. return nil else raise SessionError.new('no login data was found in Login message', 'empty_login') end end def access_music_session(music_session_id, user) music_session = ActiveMusicSession.find_by_id(music_session_id) if music_session.nil? raise SessionError, 'specified session not found' end if !music_session.access? user raise SessionError, 'not allowed to join the specified session' end return music_session end # client_id = the id of the client being accessed # client = the current client def access_p2p(client_id, user, msg) return nil # ping_request and ping_ack messages are special in that they are simply allowed if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK return nil end client_connection = Connection.find_by_client_id(client_id) if client_connection.nil? raise JamPermissionError, 'specified client not found' end if !client_connection.access_p2p? user raise SessionError, 'not allowed to message this client' end end def handle_client_directed(to_client_id, client_msg, client) context = @clients[client] # by not catching any exception here, a PermissionError will be thrown if this isn't valid # if for some reason the client is trying to send to a client that it doesn't # belong to #access_p2p(to_client_id, context.user, client_msg) # quick and dirty safegaurds against the most dangerous operational messages from being sent by malicious clients if client_msg.type == ClientMessage::Type::RELOAD || client_msg.type == ClientMessage::Type::CLIENT_UPDATE || client_msg.type == ClientMessage::Type::GENERIC_MESSAGE || client_msg.type == ClientMessage::Type::RESTART_APPLICATION || client_msg.type == ClientMessage::Type::STOP_APPLICATION @@log.error("malicious activity") raise SessionError, "not allowed" end if to_client_id.nil? || to_client_id == 'undefined' # javascript translates to 'undefined' in many cases raise SessionError, "empty client_id specified in peer-to-peer message" end # populate routing data client_msg.from = client.client_id @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}" unless client_msg.type == ClientMessage::Type::PEER_MESSAGE # put it on the topic exchange for clients @clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}}) end def handle_user_directed(user_id, client_msg, client) @log.debug "publishing to user #{user_id} from client_id #{client.client_id}" # put it on the topic exchange for users @users_exchange.publish(client_msg.to_s, :routing_key => "user.#{user_id}") end def handle_session_directed(session_id, client_msg, client) context = @clients[client] user_publish_to_session(session_id, context.user, client_msg, :client_id => client.client_id) end # sends a message to a session on behalf of a user # if this is originating in the context of a client, it should be specified as :client_id => "value" # client_msg should be a well-structure message (jam-pb message) def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""}) music_session = access_music_session(music_session_id, user) # gather up client_ids in the session client_ids = music_session.music_session_clients.map { |client| client.client_id }.reject { |client_id| client_id == sender[:client_id] } publish_to_session(music_session.id, client_ids, client_msg.to_s, sender) end # sends a message to a session with no checking of permissions # this method deliberately has no database interactivity/active_record objects def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""}) EM.schedule do sender_client_id = sender[:client_id] # iterate over each person in the session, and send a p2p message client_ids.each do |client_id| @@log.debug "publishing to session:#{music_session_id} client:#{client_id} from client:#{sender_client_id}" # put it on the topic exchange3 for clients self.class.client_exchange.publish(client_msg, :routing_key => "client.#{client_id}") end end end def extract_ip(client, override_ip) override_ip || Socket.unpack_sockaddr_in(client.get_peername)[1] end def periodical_flag_connections # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds") #pgoptz #ConnectionManager.active_record_transaction do |connection_manager| # connection_manager.flag_stale_connections(@gateway_name) #end end def periodical_check_clients # it's possible that a client will not be represented in the database anymore, due to hard to trace/guess scenario # usually involve reconnects. Double-check that all clients in memory are actually in the database. if not, delete them from memory stored_ars_raw = Ars.active_arses(true) stored_ars_beta_raw = Ars.active_arses(false) @stored_policy = GenericState.singleton.connection_policy stored_ars = [] stored_ars_beta = [] stored_ars_raw.each do |ars| stored_ars << @message_factory.ars_body(ars) end stored_ars_beta_raw.each do |ars| stored_ars_beta << @message_factory.ars_body(ars) end @stored_ars = stored_ars @stored_ars_beta = stored_ars_beta if @client_lookup.length == 0 return end client_ids = @client_lookup.map { |client_id, info| "('#{client_id}')" }.join(',') # find all client_id's that do not have a row in the db, and whack them # this style of query does the following: https://gist.github.com/sethcall/15308ccde298bff74584 sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids}) SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{@gateway_name}'); " ConnectionManager.active_record_transaction do |connection_manager, conn| conn.exec(sql) do |result| result.each { |row| client_id = row['client_id'] context = @client_lookup[client_id] if context @log.debug("cleaning up missing client #{client_id}, #{context.user}") #cleanup_client_with_id(client_id) cleanup_client(context.client) else @log.error("could not clean up missing client #{client_id}") end } end end end def wipe_all_connections # meant to be called only on startup; delete all connections fo myself ConnectionManager.active_record_transaction do |connection_manager, conn| clients = connection_manager.connection_client_ids_for_gateway(@gateway_name) @log.info "Cleaning up #{clients.length} clients on startup" clients.each do |client_id| cleanup_client_with_id(client_id) end end end def periodical_notification_seen pending = nil @semaphore_pnsa.synchronize do return if @pending_notification_seen_ats.count == 0 pending = @pending_notification_seen_ats.clone @pending_notification_seen_ats.clear end bulk_values = pending.map{|k,v| "('#{k}', '#{v}'::timestamp)"}.join(',') sql = %{ update users as u set notification_seen_at = c.notification_seen_at from (values #{bulk_values} ) as c(user_id, notification_seen_at) where c.user_id = u.id; } @log.info("SQL #{sql}") ConnectionManager.active_record_transaction do |connection_manager, conn| conn.exec(sql) end end def periodical_check_connections # this method is designed to be called periodically (every few seconds) # in which this gateway instance will check only its own clients for their health # since each gateway checks only the clients it knows about, this allows us to deploy # n gateways that don't know much about each other. # each gateway marks each connection row with it's gateway ID (so each gateway needs it's own ID or bad things happen) # we also have a global resque job that checks for connections that appear to be not controlled by any gateway # to make sure that we have stale connections cleaned up, even in the case of gateways that have crashed or are buggy clients = [] # dboptz #ConnectionManager.active_record_transaction do |connection_manager| # clients = connection_manager.stale_connection_client_ids(@gateway_name) # #end @client_lookup.each do |client_id, client_context| if Time.now - client_context.updated_at > @connect_time_expire_client cleanup_client_with_id(client_id) end end end def periodical_update_user_last_seen active_users_ids = @client_lookup.map { |client_id, client_context| client_context.active ? client_context.user.id : nil }.compact.uniq if active_users_ids.any? sql = %{ update users set last_jam_updated_at = now(), last_jam_updated_reason='#{User::JAM_REASON_PRESENT}' where users.id in (#{active_users_ids.map{|id| "'#{id}'"}.join(',')}); } @log.info("SQL #{sql}") ConnectionManager.active_record_transaction do |connection_manager, conn| conn.exec(sql) end end end def periodical_update_user_last_seen active_users_ids = @client_lookup.map { |client_id, client_context| client_context.active ? client_context.user.id : nil }.compact.uniq if active_users_ids.any? sql = %{ update users set last_jam_updated_at = now(), last_jam_updated_reason='#{User::JAM_REASON_PRESENT}' where users.id in (#{active_users_ids.map{|id| "'#{id}'"}.join(',')}); } @log.info("SQL #{sql}") ConnectionManager.active_record_transaction do |connection_manager, conn| conn.exec(sql) end end end def periodical_stats_dump # assume 60 seconds per status dump stats = @message_stats.sort_by { |k, v| -v } stats.map { |i| i[1] = (i[1] / 60.0).round(2) } @semaphore.synchronize do @log.info("num clients: #{@client_lookup.count}") end @log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', ')) @log.info("largest msg from #{@largest_message_user}: #{@largest_message ? @largest_message.length : 0}b") if @highest_drift > 1 @log.info("highest drift: #{@highest_drift - 2}") end total_time = 0 time_sums = @time_it_sums.sort_by { |k, v| -v } log_num = 3 count = 0 time_sums.each do |cat, cat_time| count += 1 if count <= log_num @log.info("timed #{cat} used time: #{cat_time}") end total_time += cat_time end @log.info("total used time: #{total_time}") profile_sums = @profile_it_sums.sort_by { |k, v| -v } profile_sums.each do |cat, cat_time| @log.info("profiled #{cat} used time: #{cat_time}") end @temp_ban.each do |user_id, data| if Time.now > data[:allow] @log.info("user #{user_id} allowed back in") @temp_ban.delete(user_id) end end # stuff in extra stats into the @message_stats and send it all off @message_stats['gateway_name'] = @gateway_name @message_stats['login'] = @login_success_count @message_stats['login_fail'] = @login_fail_count @message_stats['connected'] = @connected_count @message_stats['disconnected'] = @disconnected_count @message_stats['largest_msg'] = @largest_message ? @largest_message.length : 0 @message_stats['highest_drift'] = @highest_drift - 2 # 2 comes from the server's 2 second timer for the drift check @message_stats['total_time'] = total_time @message_stats['banned_users'] = @temp_ban.length #Stats.write('gateway.stats', @message_stats) # clear out stats @message_stats.clear @login_success_count = 0 @login_fail_count = 0 @connected_count = 0 @disconnected_count = 0 @user_message_counts = {} @largest_message = nil @largest_message_user = nil @time_it_sums = {} @highest_drift = 0 @heartbeat_tracker = {} end def cleanup_client_with_id(cid) client_context = @client_lookup[cid] if client_context #Diagnostic.expired_stale_connection(client_context.user.id, client_context) cleanup_client(client_context.client) end music_session = nil recording_id = nil user = nil # remove this connection from the database ConnectionManager.active_record_transaction do |mgr| mgr.delete_connection(cid) { |conn, count, music_session_id, user_id| user = User.find_by_id(user_id) return if user.nil? # this can happen if you delete a user while their connection is up @log.info "expiring stale connection client_id:#{cid}, user_id:#{user}" Notification.send_friend_update(user_id, false, conn) if count == 0 music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil? user = User.find_by_id(user_id) unless user_id.nil? if music_session msuh = MusicSessionUserHistory.where(client_id: cid).order('created_at DESC').first if msuh msuh.session_removed_at = Time.now if msuh.session_removed_at.nil? msuh.save(validate: false) end recording = music_session.stop_recording unless recording.nil? @log.debug "cleanup_client: stopped recording: #{recording.id} because user #{user} reconnected" recording.discard_if_no_action(user) # throw away this users vote for the @log.debug "cleanup_client: discard complete" recording_id = recording.id unless recording.nil? end # if the user was in a recording during the finializing phase (after stopped, but keep/discard still required in recordingFinishedDialog) # then throw away the user's most_recent_recording = music_session.most_recent_recording if most_recent_recording && most_recent_recording.users.exists?(user) @log.debug "cleanup_client: discarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected" # if this user was in the most recent recording associated with the session they were just in, discard any tracks they had most_recent_recording.discard_if_no_action(user) # throw away this users vote for the else @log.debug "cleanup_client: no recent recording to clean up" end music_session.with_lock do # VRFS-1297 @log.debug("cleanup_client: tick track changes") music_session.tick_track_changes @log.debug("cleanup_client: tick track changes done") end end } end if user && music_session @log.info("cleanup_client: Send session depart message for #{user} to #{music_session.id}") Notification.send_session_depart(music_session, cid, user, recording_id) @log.info("cleanup_client: Sent session depart") end end # removes all resources associated with a client def cleanup_client(client) @semaphore.synchronize do client.close # unregister any subscriptions client.subscriptions.each do |subscription| unregister_subscription(client, subscription[:type], subscription[:id]) end pending = client.context.nil? # presence of context implies this connection has been logged into if pending @log.debug "cleaned up not-logged-in client #{client}" stats_disconnected else @log.debug "cleanup up logged-in client #{client}" context = @clients.delete(client) if context remove_client(client.client_id) remove_user(context) stats_disconnected else @log.warn "skipping duplicate cleanup attempt of logged-in client" end end end end def stats_logged_in @login_success_count = @login_success_count + 1 end def stats_logged_in_failed @login_fail_count = @login_fail_count + 1 end def stats_connected @connected_count = @connected_count + 1 end def stats_disconnected @disconnected_count = @disconnected_count + 1 end private def time_it(cat, &blk) start = Time.now blk.call time = Time.now - start @time_it_sums[cat] = (@time_it_sums[cat] || 0)+ time @log.warn("LONG TIME: #{cat}: #{time}") if time > 1 end def profile_it(cat, &blk) start = Time.now blk.call time = Time.now - start @profile_it_sums[cat] = (@profile_it_sums[cat] || 0)+ time @log.warn("LONG TIME: #{cat}: #{time}") if time > 1 end def sane_logging(&blk) # used around repeated transactions that cause too much ActiveRecord::Base logging begin if @ar_base_logger original_level = @ar_base_logger.level @ar_base_logger.level = :info end blk.call ensure if @ar_base_logger @ar_base_logger.level = original_level end end end end end