From 2d805516ff15c82eb83ff81c972023f4c91b1f33 Mon Sep 17 00:00:00 2001 From: Seth Call Date: Sun, 27 Dec 2020 17:58:31 -0600 Subject: [PATCH] websocket optimization --- ruby/lib/jam_ruby/base_manager.rb | 7 +- ruby/lib/jam_ruby/connection_manager.rb | 32 +- ruby/lib/jam_ruby/models/music_session.rb | 2 +- .../stores/SessionStore.js.coffee | 4 +- .../lib/jam_websockets/client_context.rb | 7 +- .../lib/jam_websockets/router.rb | 331 +++++++++++------- .../lib/jam_websockets/server.rb | 18 +- websocket-gateway/spec/factories.rb | 12 + .../spec/jam_websockets/stress_spec.rb | 304 ++++++++++++++++ 9 files changed, 567 insertions(+), 150 deletions(-) create mode 100644 websocket-gateway/spec/jam_websockets/stress_spec.rb diff --git a/ruby/lib/jam_ruby/base_manager.rb b/ruby/lib/jam_ruby/base_manager.rb index bf5dab694..5a4f45991 100644 --- a/ruby/lib/jam_ruby/base_manager.rb +++ b/ruby/lib/jam_ruby/base_manager.rb @@ -27,20 +27,21 @@ module JamRuby # create a transaction, and pass the current connection to ConnectionManager. # this lets the entire operation work with the same transaction, # across Rails ActiveRecord and the pg-gem based code in ConnectionManager. - manager.pg_conn = connection.instance_variable_get("@connection") + conn = connection.instance_variable_get("@connection") + manager.pg_conn = conn if @@in_websocket_gateway # it only necessary to catch exceptions in websocket-gateway, which has only one AR connection and does not clean it up like a Rails context does begin connection.transaction do - yield manager + yield manager, conn end rescue Exception => e ActiveRecord::Base.connection.execute('ROLLBACK') end else connection.transaction do - yield manager + yield manager, conn end end end diff --git a/ruby/lib/jam_ruby/connection_manager.rb b/ruby/lib/jam_ruby/connection_manager.rb index 4d79477be..183f4a5ff 100644 --- a/ruby/lib/jam_ruby/connection_manager.rb +++ b/ruby/lib/jam_ruby/connection_manager.rb @@ -138,8 +138,7 @@ SQL # flag connections as stale def flag_stale_connections(gateway_name) - ConnectionManager.active_record_transaction do |connection_manager| - conn = connection_manager.pg_conn + ConnectionManager.active_record_transaction do |connection_manager, conn| sql = "UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}' WHERE updated_at < (NOW() - (interval '1 second' * stale_time)) AND aasm_state = '#{Connection::CONNECT_STATE.to_s}' AND gateway = '#{gateway_name}'" conn.exec(sql) end @@ -151,11 +150,23 @@ SQL self.stale_connection_client_ids(gateway_name).each { |client| self.delete_connection(client[:client_id]) } end + def connection_client_ids_for_gateway(gateway_name) + clients = [] + ConnectionManager.active_record_transaction do |connection_manager, conn| + sql = "SELECT client_id FROM connections WHERE gateway = '#{gateway_name}'" + conn.exec(sql) do |result| + result.each { |row| + client_id = row['client_id'] + clients << client_id + } + end + end + clients + end # expiring connections in stale state, which deletes them def stale_connection_client_ids(gateway_name) clients = [] - ConnectionManager.active_record_transaction do |connection_manager| - conn = connection_manager.pg_conn + ConnectionManager.active_record_transaction do |connection_manager, conn| sql = "SELECT client_id, music_session_id, user_id, client_type FROM connections WHERE updated_at < (NOW() - (interval '1 second' * expire_time)) AND gateway = '#{gateway_name}'" conn.exec(sql) do |result| result.each { |row| @@ -181,8 +192,7 @@ SQL raise "invalid client_type: #{client_type}" if client_type != 'client' && client_type != 'browser' count = 0 - ConnectionManager.active_record_transaction do |connection_manager| - conn = connection_manager.pg_conn + ConnectionManager.active_record_transaction do |connection_manager, conn| # turn ip_address string into a number, then fetch the isp and block records @@ -242,8 +252,7 @@ SQL # and this connection was in a session def delete_connection(client_id, &blk) - ConnectionManager.active_record_transaction do |connection_manager| - conn = connection_manager.pg_conn + ConnectionManager.active_record_transaction do |connection_manager, conn| count = 0 user_id = nil music_session_id = nil @@ -426,8 +435,7 @@ SQL connection = nil tracks_changed = false - ConnectionManager.active_record_transaction do |connection_manager| - db_conn = connection_manager.pg_conn + ConnectionManager.active_record_transaction do |connection_manager, db_conn| connection = Connection.find_by_client_id(client_id) @@ -469,9 +477,7 @@ SQL # within the connection table lock def leave_music_session(user, connection, music_session, &blk) send_tracks_changed = false - ConnectionManager.active_record_transaction do |connection_manager| - - conn = connection_manager.pg_conn + ConnectionManager.active_record_transaction do |connection_manager, conn| lock_connections(conn) diff --git a/ruby/lib/jam_ruby/models/music_session.rb b/ruby/lib/jam_ruby/models/music_session.rb index 96f8ca7a5..af5841bca 100644 --- a/ruby/lib/jam_ruby/models/music_session.rb +++ b/ruby/lib/jam_ruby/models/music_session.rb @@ -651,7 +651,7 @@ module JamRuby ms.description = options[:description] ms.genre_id = (options[:genres].length > 0 ? options[:genres][0] : nil) if options[:genres] ms.musician_access = options[:musician_access] - ms.friends_can_join = options[:friends_can_join] + ms.friends_can_join = options[:friends_can_join] || false ms.approval_required = options[:approval_required] ms.fan_access = options[:fan_access] ms.fan_chat = options[:fan_chat] diff --git a/web/app/assets/javascripts/react-components/stores/SessionStore.js.coffee b/web/app/assets/javascripts/react-components/stores/SessionStore.js.coffee index b7b2f977e..645b3a83c 100644 --- a/web/app/assets/javascripts/react-components/stores/SessionStore.js.coffee +++ b/web/app/assets/javascripts/react-components/stores/SessionStore.js.coffee @@ -694,8 +694,8 @@ ConfigureTracksActions = @ConfigureTracksActions ) ) ) - .fail(() => - logger.error("unable to fetch session history") + .fail((e) => + logger.error("unable to fetch session history", e) ) waitForSessionPageEnterDone: () -> diff --git a/websocket-gateway/lib/jam_websockets/client_context.rb b/websocket-gateway/lib/jam_websockets/client_context.rb index 971dc9cc0..81deefab4 100644 --- a/websocket-gateway/lib/jam_websockets/client_context.rb +++ b/websocket-gateway/lib/jam_websockets/client_context.rb @@ -1,7 +1,7 @@ module JamWebsockets class ClientContext - attr_accessor :user, :client, :msg_count, :session, :client_type, :sent_bad_state_previously, :active + attr_accessor :user, :client, :msg_count, :session, :client_type, :sent_bad_state_previously, :active, :updated_at def initialize(user, client, client_type) @user = user @@ -12,9 +12,14 @@ @session = nil @sent_bad_state_previously = false @active = true + @updated_at = Time.now client.context = self end + def stale?(stale_time) + return Time.now - @updated_at > stale_time + end + def to_s return "Client[user:#{@user} client:#{@client.client_id} msgs:#{@msg_count} session:#{@session} client_type:#{@client_type} channel_id: #{@client.channel_id}]" end diff --git a/websocket-gateway/lib/jam_websockets/router.rb b/websocket-gateway/lib/jam_websockets/router.rb index e219874f0..635ea6313 100644 --- a/websocket-gateway/lib/jam_websockets/router.rb +++ b/websocket-gateway/lib/jam_websockets/router.rb @@ -82,6 +82,40 @@ module JamWebsockets @largest_message_user = nil @highest_drift = 0 + @pending_notification_seen_ats = {} + @semaphore_pnsa = Mutex.new + + @pending_deletions = {} + @semaphore_deletions = Mutex.new + + end + + def init + wipe_all_connections + + # this thread runs forever while WSG runs, and should do anything easily gotten out of critical message handling path + @background_thread = Thread.new { + count = 0 + + while true + + begin + periodical_check_connections + periodical_notification_seen + + if count == 30 + periodical_check_clients + count = 0 + end + count = count + 1 + + rescue => e + @log.error "unhandled error in thread #{e}" + @log.error "stacktrace #{e.backtrace}" + end + sleep 1 + end + } end def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default', :allow_dynamic_registration => true, :chat_enabled => true, :chat_blast => true}, &block) @@ -466,7 +500,8 @@ module JamWebsockets client.onclose { time_it('onclose') { @log.debug "connection closed. marking stale: #{client.context}" - cleanup_client(client) + #cleanup_client(client) + cleanup_client_with_id(client.client_id) } } @@ -538,19 +573,19 @@ module JamWebsockets end # caused a client connection to be marked stale - def stale_client(client) - if client.client_id - @log.info "marking client stale: #{client.context}" - ConnectionManager.active_record_transaction do |connection_manager| - music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id) - # update the session members, letting them know this client went stale - context = @client_lookup[client.client_id] - if music_session = ActiveMusicSession.find_by_id(music_session_id) - Notification.send_musician_session_stale(music_session, client.client_id, context.user) - end unless music_session_id.nil? - end - end - end + # def stale_client(client) + # if client.client_id + # @log.info "marking client stale: #{client.context}" + # ConnectionManager.active_record_transaction do |connection_manager| + # music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id) + # # update the session members, letting them know this client went stale + # context = @client_lookup[client.client_id] + # if music_session = ActiveMusicSession.find_by_id(music_session_id) + # Notification.send_musician_session_stale(music_session, client.client_id, context.user) + # end unless music_session_id.nil? + # end + # end + # end def route(client_msg, client) message_type = @message_factory.get_message_type(client_msg) @@ -713,7 +748,7 @@ module JamWebsockets latency_tester.id, connection_expire_time, "latency_tester", - client_id_int) + client_id_int) stats_logged_in send_to_client(client, login_ack) end @@ -794,7 +829,6 @@ module JamWebsockets end - # we have to deal with jamblaster before login if jamblaster_serial_no && jamblaster_serial_no != '' jamblaster = Jamblaster.bootstrap(jamblaster_serial_no) @@ -1117,16 +1151,24 @@ module JamWebsockets #profile_it('heartbeat_transaction') { #Connection.transaction do # send back track_changes_counter if in a session - profile_it('heartbeat_session') { - if connection.music_session_id - music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id) - track_changes_counter = music_session.track_changes_counter if music_session - end - } + + #dboptz + + #profile_it('heartbeat_session') { + # if connection.music_session_id + # music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id) + # track_changes_counter = music_session.track_changes_counter if music_session + # end + #} + + #stale = context.stale?(@connect_time_stale_client) profile_it('heartbeat_touch') { # update connection updated_at and if the user is active - Connection.where(id: connection.id).update_all(user_active: heartbeat.active, updated_at: Time.now) + # dboptz + #Connection.where(id: connection.id).update_all(user_active: heartbeat.active, updated_at: Time.now) + context.updated_at = Time.now + context.active = heartbeat.active } profile_it('heartbeat_notification') { @@ -1139,12 +1181,12 @@ module JamWebsockets #} profile_it('heartbeat_stale') { - if connection.stale? - ConnectionManager.active_record_transaction do |connection_manager| - heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type) - connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name) - end - end + #if stale + # ConnectionManager.active_record_transaction do |connection_manager| + # heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type) + # connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name) + # end + #end } end @@ -1170,24 +1212,20 @@ module JamWebsockets notification_id_field = heartbeat.notification_seen if heartbeat.value_for_tag(1) if notification_id_field && notification_id_field != '' notification = Notification.find_by_id(notification_id_field) - if notification - connection.user.notification_seen_at = notification.created_at - unless connection.user.save(validate: false) - @log.error "unable to update notification_seen_at via id field for client #{context}. errors: #{connection.user.errors.inspect}" - end - else - notification_seen_at_parsed = nil - notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2) - begin - notification_seen_at_parsed = Time.parse(notification_seen_at) if notification_seen_at && notification_seen_at.length > 0 - rescue Exception => e - @log.error "unable to parse notification_seen_at in heartbeat from #{context}. notification_seen_at: #{notification_seen_at}" - end + @semaphore_pnsa.synchronize do + if notification + @pending_notification_seen_ats[connection.user.id] = notification.created_at + else + notification_seen_at_parsed = nil + notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2) + begin + notification_seen_at_parsed = Time.parse(notification_seen_at) if notification_seen_at && notification_seen_at.length > 0 + rescue Exception => e + @log.error "unable to parse notification_seen_at in heartbeat from #{context}. notification_seen_at: #{notification_seen_at}" + end - if notification_seen_at_parsed - connection.user.notification_seen_at = notification_seen_at - unless connection.user.save(validate: false) - @log.error "unable to update notification_seen_at via time field for client #{context}. errors: #{connection.user.errors.inspect}" + if notification_seen_at_parsed + @pending_notification_seen_ats[connection.user.id] = notification_seen_at end end end @@ -1361,9 +1399,11 @@ module JamWebsockets def periodical_flag_connections # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds") - ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.flag_stale_connections(@gateway_name) - end + #pgoptz + #ConnectionManager.active_record_transaction do |connection_manager| + # connection_manager.flag_stale_connections(@gateway_name) + #end + end def periodical_check_clients @@ -1396,14 +1436,14 @@ module JamWebsockets sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids}) SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{@gateway_name}'); " - ConnectionManager.active_record_transaction do |connection_manager| - conn = connection_manager.pg_conn + ConnectionManager.active_record_transaction do |connection_manager, conn| conn.exec(sql) do |result| result.each { |row| client_id = row['client_id'] context = @client_lookup[client_id] if context @log.debug("cleaning up missing client #{client_id}, #{context.user}") + #cleanup_client_with_id(client_id) cleanup_client(context.client) else @log.error("could not clean up missing client #{client_id}") @@ -1413,6 +1453,39 @@ module JamWebsockets end end + def wipe_all_connections + # meant to be called only on startup; delete all connections fo myself + ConnectionManager.active_record_transaction do |connection_manager| + clients = connection_manager.connection_client_ids_for_gateway(@gateway_name) + clients.each do |client_id| + cleanup_client_with_id(client_id) + end + end + end + + def periodical_notification_seen + pending = nil + @semaphore_pnsa.synchronize do + return if @pending_notification_seen_ats.count == 0 + pending = @pending_notification_seen_ats.clone + @pending_notification_seen_ats.clear + end + + bulk_values = pending.map{|k,v| "('#{k}', '#{v}'::timestamp)"}.join(',') + sql = %{ + update users as u + set notification_seen_at = c.notification_seen_at + from (values + #{bulk_values} + ) as c(user_id, notification_seen_at) + where c.user_id = u.id; + } + @log.info("SQL #{sql}") + ConnectionManager.active_record_transaction do |connection_manager, conn| + conn.exec(sql) + end + end + def periodical_check_connections # this method is designed to be called periodically (every few seconds) # in which this gateway instance will check only its own clients for their health @@ -1426,18 +1499,31 @@ module JamWebsockets # to make sure that we have stale connections cleaned up, even in the case of gateways that have crashed or are buggy clients = [] - ConnectionManager.active_record_transaction do |connection_manager| - clients = connection_manager.stale_connection_client_ids(@gateway_name) + # dboptz + #ConnectionManager.active_record_transaction do |connection_manager| + # clients = connection_manager.stale_connection_client_ids(@gateway_name) + # + #end + + puts "periodical_check_connections" + @client_lookup.each do |client_id, client_context| + if Time.now - client_context.updated_at > @connect_time_expire_client + cleanup_client_with_id(client_id) + end end - cleanup_clients_with_ids(clients) + end - + def periodical_stats_dump # assume 60 seconds per status dump stats = @message_stats.sort_by { |k, v| -v } stats.map { |i| i[1] = (i[1] / 60.0).round(2) } + @semaphore.synchronize do + @log.info("num clients: #{@client_lookup.count}") + end + @log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', ')) @log.info("largest msg from #{@largest_message_user}: #{@largest_message ? @largest_message.length : 0}b") @@ -1485,7 +1571,7 @@ module JamWebsockets @message_stats['total_time'] = total_time @message_stats['banned_users'] = @temp_ban.length - Stats.write('gateway.stats', @message_stats) + #Stats.write('gateway.stats', @message_stats) # clear out stats @message_stats.clear @@ -1501,87 +1587,84 @@ module JamWebsockets @heartbeat_tracker = {} end - def cleanup_clients_with_ids(expired_connections) - expired_connections.each do |expired_connection| - cid = expired_connection[:client_id] + def cleanup_client_with_id(cid) - client_context = @client_lookup[cid] + client_context = @client_lookup[cid] - if client_context - #Diagnostic.expired_stale_connection(client_context.user.id, client_context) - cleanup_client(client_context.client) - end + if client_context + #Diagnostic.expired_stale_connection(client_context.user.id, client_context) + cleanup_client(client_context.client) + end - music_session = nil - recording_id = nil - user = nil + music_session = nil + recording_id = nil + user = nil - # remove this connection from the database - ConnectionManager.active_record_transaction do |mgr| - mgr.delete_connection(cid) { |conn, count, music_session_id, user_id| - user = User.find_by_id(user_id) - return if user.nil? # this can happen if you delete a user while their connection is up - @log.info "expiring stale connection client_id:#{cid}, user_id:#{user}" - Notification.send_friend_update(user_id, false, conn) if count == 0 - music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil? - user = User.find_by_id(user_id) unless user_id.nil? + # remove this connection from the database + ConnectionManager.active_record_transaction do |mgr| + mgr.delete_connection(cid) { |conn, count, music_session_id, user_id| + user = User.find_by_id(user_id) + return if user.nil? # this can happen if you delete a user while their connection is up + @log.info "expiring stale connection client_id:#{cid}, user_id:#{user}" + Notification.send_friend_update(user_id, false, conn) if count == 0 + music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil? + user = User.find_by_id(user_id) unless user_id.nil? - if music_session - - msuh = MusicSessionUserHistory.where(client_id: cid).order('created_at DESC').first - if msuh - msuh.session_removed_at = Time.now if msuh.session_removed_at.nil? - msuh.save(validate:false) - end - - - recording = music_session.stop_recording - unless recording.nil? - @log.debug "cleanup_client: stopped recording: #{recording.id} because user #{user} reconnected" - recording.discard_if_no_action(user) # throw away this users vote for the - @log.debug "cleanup_client: discard complete" - recording_id = recording.id unless recording.nil? - end - - # if the user was in a recording during the finializing phase (after stopped, but keep/discard still required in recordingFinishedDialog) - # then throw away the user's - most_recent_recording = music_session.most_recent_recording - if most_recent_recording && most_recent_recording.users.exists?(user) - @log.debug "cleanup_client: discarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected" - # if this user was in the most recent recording associated with the session they were just in, discard any tracks they had - most_recent_recording.discard_if_no_action(user) # throw away this users vote for the - else - @log.debug "cleanup_client: no recent recording to clean up" - end - - - music_session.with_lock do # VRFS-1297 - @log.debug("cleanup_client: tick track changes") - music_session.tick_track_changes - @log.debug("cleanup_client: tick track changes done") - end + if music_session + msuh = MusicSessionUserHistory.where(client_id: cid).order('created_at DESC').first + if msuh + msuh.session_removed_at = Time.now if msuh.session_removed_at.nil? + msuh.save(validate: false) end - } - end - if user && music_session - @log.info("cleanup_client: Send session depart message for #{user} to #{music_session.id}") - Notification.send_session_depart(music_session, cid, user, recording_id) - @log.info("cleanup_client: Sent session depart") - end + + recording = music_session.stop_recording + unless recording.nil? + @log.debug "cleanup_client: stopped recording: #{recording.id} because user #{user} reconnected" + recording.discard_if_no_action(user) # throw away this users vote for the + @log.debug "cleanup_client: discard complete" + recording_id = recording.id unless recording.nil? + end + + # if the user was in a recording during the finializing phase (after stopped, but keep/discard still required in recordingFinishedDialog) + # then throw away the user's + most_recent_recording = music_session.most_recent_recording + if most_recent_recording && most_recent_recording.users.exists?(user) + @log.debug "cleanup_client: discarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected" + # if this user was in the most recent recording associated with the session they were just in, discard any tracks they had + most_recent_recording.discard_if_no_action(user) # throw away this users vote for the + else + @log.debug "cleanup_client: no recent recording to clean up" + end + + + music_session.with_lock do # VRFS-1297 + @log.debug("cleanup_client: tick track changes") + music_session.tick_track_changes + @log.debug("cleanup_client: tick track changes done") + end + end + } + end + + if user && music_session + @log.info("cleanup_client: Send session depart message for #{user} to #{music_session.id}") + Notification.send_session_depart(music_session, cid, user, recording_id) + @log.info("cleanup_client: Sent session depart") end end # removes all resources associated with a client def cleanup_client(client) - client.close - - # unregister any subscriptions - client.subscriptions.each do |subscription| - unregister_subscription(client, subscription[:type], subscription[:id]) - end - @semaphore.synchronize do + + client.close + + # unregister any subscriptions + client.subscriptions.each do |subscription| + unregister_subscription(client, subscription[:type], subscription[:id]) + end + pending = client.context.nil? # presence of context implies this connection has been logged into if pending diff --git a/websocket-gateway/lib/jam_websockets/server.rb b/websocket-gateway/lib/jam_websockets/server.rb index 0e068ba9c..a1228315a 100644 --- a/websocket-gateway/lib/jam_websockets/server.rb +++ b/websocket-gateway/lib/jam_websockets/server.rb @@ -1,7 +1,7 @@ require 'em-websocket' require 'bugsnag' module JamWebsockets - + class Server def initialize(options={}) @@ -38,17 +38,22 @@ module JamWebsockets @log.info "starting server #{host}:#{port} staleness_time=#{connect_time_stale_client}; reconnect time = #{connect_time_expire_client}, rabbitmq=#{rabbitmq_host}:#{rabbitmq_port} gateway_name=#{gateway_name}" + @router.init + EventMachine.error_handler{|e| puts "unhandled error #{e}" + puts "unhandled error #{e.backtrace}" @log.error "unhandled error #{e}" + @log.error "unhandled error #{e.backtrace}" #Bugsnag.notify(e) } EventMachine.run do @router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port, max_connections_per_user: max_connections_per_user, gateway: gateway_name, allow_dynamic_registration: allow_dynamic_registration, chat_enabled: chat_enabled, chat_blast: chat_blast) do - start_connection_expiration - start_client_expiration - start_connection_flagger + + #start_connection_expiration + #start_client_expiration + #start_connection_flagger start_stats_dump start_websocket_listener(host, port, trust_port, trust_check, options[:emwebsocket_debug]) calling_thread.wakeup if calling_thread @@ -100,7 +105,8 @@ module JamWebsockets def start_connection_expiration # one cleanup on startup - @router.periodical_check_connections + @router.wipe_all_connections + #@router.periodical_check_connections @last_conn_check = Time.now timer = 2 @@ -129,7 +135,7 @@ module JamWebsockets end def start_stats_dump - EventMachine::PeriodicTimer.new(60) do + EventMachine::PeriodicTimer.new(6) do time_it('stats_dump') { safety_net { @router.periodical_stats_dump } } end end diff --git a/websocket-gateway/spec/factories.rb b/websocket-gateway/spec/factories.rb index bb250272d..dc83cd128 100644 --- a/websocket-gateway/spec/factories.rb +++ b/websocket-gateway/spec/factories.rb @@ -171,4 +171,16 @@ FactoryGirl.define do association :jam_track, factory: :jam_track association :user, factory: :user end + + factory :notification, :class => JamRuby::Notification do + + factory :notification_text_message do + description 'TEXT_MESSAGE' + message "chocolate" + end + end + + factory :friendship, :class => JamRuby::Friendship do + + end end diff --git a/websocket-gateway/spec/jam_websockets/stress_spec.rb b/websocket-gateway/spec/jam_websockets/stress_spec.rb new file mode 100644 index 000000000..e299afc43 --- /dev/null +++ b/websocket-gateway/spec/jam_websockets/stress_spec.rb @@ -0,0 +1,304 @@ +require 'spec_helper' +require 'thread' + +def time_it(cat, &blk) + start = Time.now + + blk.call + + time = Time.now - start + + puts("TIME: #{cat}: #{time}") +end +def safety_net(&blk) + begin + blk.call + rescue => e + #Bugsnag.notify(e) + @log.error("unhandled exception in EM Timer #{e}") + puts "Error during processing: #{$!}" + puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}" + + end +end + +LoginClient = Class.new do + attr_accessor :onmsgblock, :onopenblock, :oncloseblock, :onerrorblock, :encode_json, :channel_id, :client_id, :user_id, :context, :trusted, :subscriptions + + + def initialize(user) + @subscriptions = Set.new + @channel_id = user.id + @encode_json = true + end + + def connected? + true + end + + def onopen(&block) + @onopenblock = Proc.new { |handshake| block } + end + + def onmessage(&block) + @onmsgblock= Proc.new { |data| block.call data } + end + + def onclose(&block) + @oncloseblock = Proc.new { || block.call } + end + + def onerror(&block) + @onerrorblock = Proc.new { |err| block.call err } + end + + def close() + + end + + def send(msg) + puts msg + end + + def get_peername + return "\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" # 37643, "localhost" + end + +end + +class TestClient + + def initialize(user, router) + @user = user + @client = LoginClient.new(@user) + @router = router + end + + def connect + @router.new_client(@client, false) + end + + def login + login_message = login_options(@user) + @router.handle_login(@client, login_message) + end + + def heartbeat(notification_id = nil, notification_seen_at = nil) + heartbeat = Jampb::Heartbeat.new + if notification_id + heartbeat.notification_seen = notification_id + else + heartbeat.notification_seen = 'junk' + end + + if notification_seen_at + heartbeat.notification_seen_at = notification_seen_at.to_s + end + + @router.handle_heartbeat(heartbeat, '1', @client) + end + + def disconnect + @client.oncloseblock.call + end + + private + def login_options(user) + options = {} + options["token"] = user.remember_token + options["client_id"] = user.id + options["client_type"] = "client" + options["client_id_int"] = 1 + options + end + +end + +describe Router, :spec_timeout => 15 do + + def database_env (env) + singleton = GenericState.new + singleton.id = 'default' + singleton.env = env + singleton.save! + end + + def rails_env (env) + JamRuby::Environment.should_receive(:mode).any_number_of_times.and_return(env) + end + + + include EventedSpec::EMSpec + default_timeout(1000000) + + em_before do + puts "EM BEFORE ROUTER NEW" + + end + + subject { @router } + + em_after do + puts "EM AFTER" + end + + def sequence_1(user, special_heartbeat) + + client1 = TestClient.new(user, @router) + client1.connect + client1.login + @router.client_lookup.count.should be 1 + client1.heartbeat + update_notification_at = Time.now.utc + #client1.heartbeat(special_heartbeat.id.to_s, nil) + client1.heartbeat(nil, update_notification_at) + + client1.disconnect + + sleep 6 + + user.reload + user = User.find(user.id) + @router.client_lookup.count.should be 0 + last_notification_seen_at = user.notification_seen_at + last_notification_seen_at.should_not be_nil + last_notification_seen_at.to_i.should be update_notification_at.to_i + end + + def sequence_2(user) + + before_count = Connection.count + client1 = TestClient.new(user, @router) + client1.connect + client1.login + @router.client_lookup.count.should be 1 + after_count = Connection.count + before_count.should be (after_count - 1) + + sleep 25 + + @router.client_lookup.count.should be 0 + disc_count = Connection.count + before_count.should be disc_count + end + + def sequence_in_session(user) + before_count = Connection.count + client1 = TestClient.new(user, @router) + client1.connect + client1.login + @router.client_lookup.count.should be 1 + + music_session = FactoryGirl.create(:active_music_session, :creator => user) + connection = Connection.find_by_user_id(user.id) + connection.music_session = music_session + connection.save! + + + client1.disconnect + @router.client_lookup.count.should be 0 + disc_count = Connection.count + before_count.should be disc_count + end + def sequence_in_two_session(user1, user2) + + # create friendship between the two, to increase notifications + FactoryGirl.create(:friendship, :user => user1, :friend => user2) + FactoryGirl.create(:friendship, :user => user2, :friend => user1) + + before_count = Connection.count + client1 = TestClient.new(user1, @router) + client1.connect + client1.login + @router.client_lookup.count.should be 1 + + client1 = TestClient.new(user2, @router) + client1.connect + client1.login + @router.client_lookup.count.should be 2 + + music_session = FactoryGirl.create(:active_music_session, :creator => user) + connection = Connection.find_by_user_id(user1.id) + connection.music_session = music_session + connection.save! + + connection2 = Connection.find_by_user_id(user2.id) + connection2.music_session = music_session + connection2.save! + + + client1.disconnect + @router.client_lookup.count.should be 1 + disc_count = Connection.count + before_count.should be (disc_count - 1) + end + + def delete_conn_from_under(user) + + before_count = Connection.count + client1 = TestClient.new(user, @router) + client1.connect + client1.login + @router.client_lookup.count.should be 1 + after_count = Connection.count + before_count.should be (after_count - 1) + + Connection.first.delete + + @router.periodical_check_clients + + @router.client_lookup.count.should be 0 + disc_count = Connection.count + before_count.should be disc_count + end + + + describe "stress" do + before { + database_env('production') + rails_env('production') + stub_const("ENV", {'BUILD_NUMBER' => 1}) + } + let(:user1) { FactoryGirl.create(:user) } + let(:user2) { FactoryGirl.create(:user) } + let(:user3) { FactoryGirl.create(:user) } + let(:user4) { FactoryGirl.create(:user) } + let(:user5) { FactoryGirl.create(:user) } + let!(:other) { FactoryGirl.create(:user, last_jam_locidispid: 1) } + let!(:msg1) {FactoryGirl.create(:notification_text_message, source_user: other, target_user: user1) } + + it "the test" do + + @router = Router.new() + @router.connect_time_expire_client = 20 + @router.connect_time_stale_client = 14 + @router.heartbeat_interval_client = @router.connect_time_stale_client / 2 + @router.connect_time_expire_browser = 20 + @router.connect_time_stale_browser = 14 + @router.max_connections_per_user = 10 + @router.heartbeat_interval_browser = @router.connect_time_stale_browser / 2 + @router.maximum_minutely_heartbeat_rate_browser = 10 + @router.maximum_minutely_heartbeat_rate_client = 10 + @router.amqp_connection_manager = AmqpConnectionManager.new(true, 4, host: 'localhost', port: 5672) + @router.gateway_name = 'gateway1' + + @router.init + + em do + EventMachine::PeriodicTimer.new(5) do + time_it('stats_dump') { safety_net { @router.periodical_stats_dump } } + end + + sequence_in_session(user3) + sequence_in_two_session(user4, user5) + delete_conn_from_under(user3) + sequence_1(user1, msg1) + sequence_1(user1, msg1) + sequence_2(user2) + + done + + end + end + end +end \ No newline at end of file