websocket optimization

This commit is contained in:
Seth Call 2020-12-27 17:58:31 -06:00
parent 8393506f52
commit 2d805516ff
9 changed files with 567 additions and 150 deletions

View File

@ -27,20 +27,21 @@ module JamRuby
# create a transaction, and pass the current connection to ConnectionManager. # create a transaction, and pass the current connection to ConnectionManager.
# this lets the entire operation work with the same transaction, # this lets the entire operation work with the same transaction,
# across Rails ActiveRecord and the pg-gem based code in ConnectionManager. # across Rails ActiveRecord and the pg-gem based code in ConnectionManager.
manager.pg_conn = connection.instance_variable_get("@connection") conn = connection.instance_variable_get("@connection")
manager.pg_conn = conn
if @@in_websocket_gateway if @@in_websocket_gateway
# it only necessary to catch exceptions in websocket-gateway, which has only one AR connection and does not clean it up like a Rails context does # it only necessary to catch exceptions in websocket-gateway, which has only one AR connection and does not clean it up like a Rails context does
begin begin
connection.transaction do connection.transaction do
yield manager yield manager, conn
end end
rescue Exception => e rescue Exception => e
ActiveRecord::Base.connection.execute('ROLLBACK') ActiveRecord::Base.connection.execute('ROLLBACK')
end end
else else
connection.transaction do connection.transaction do
yield manager yield manager, conn
end end
end end
end end

View File

@ -138,8 +138,7 @@ SQL
# flag connections as stale # flag connections as stale
def flag_stale_connections(gateway_name) def flag_stale_connections(gateway_name)
ConnectionManager.active_record_transaction do |connection_manager| ConnectionManager.active_record_transaction do |connection_manager, conn|
conn = connection_manager.pg_conn
sql = "UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}' WHERE updated_at < (NOW() - (interval '1 second' * stale_time)) AND aasm_state = '#{Connection::CONNECT_STATE.to_s}' AND gateway = '#{gateway_name}'" sql = "UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}' WHERE updated_at < (NOW() - (interval '1 second' * stale_time)) AND aasm_state = '#{Connection::CONNECT_STATE.to_s}' AND gateway = '#{gateway_name}'"
conn.exec(sql) conn.exec(sql)
end end
@ -151,11 +150,23 @@ SQL
self.stale_connection_client_ids(gateway_name).each { |client| self.delete_connection(client[:client_id]) } self.stale_connection_client_ids(gateway_name).each { |client| self.delete_connection(client[:client_id]) }
end end
def connection_client_ids_for_gateway(gateway_name)
clients = []
ConnectionManager.active_record_transaction do |connection_manager, conn|
sql = "SELECT client_id FROM connections WHERE gateway = '#{gateway_name}'"
conn.exec(sql) do |result|
result.each { |row|
client_id = row['client_id']
clients << client_id
}
end
end
clients
end
# expiring connections in stale state, which deletes them # expiring connections in stale state, which deletes them
def stale_connection_client_ids(gateway_name) def stale_connection_client_ids(gateway_name)
clients = [] clients = []
ConnectionManager.active_record_transaction do |connection_manager| ConnectionManager.active_record_transaction do |connection_manager, conn|
conn = connection_manager.pg_conn
sql = "SELECT client_id, music_session_id, user_id, client_type FROM connections WHERE updated_at < (NOW() - (interval '1 second' * expire_time)) AND gateway = '#{gateway_name}'" sql = "SELECT client_id, music_session_id, user_id, client_type FROM connections WHERE updated_at < (NOW() - (interval '1 second' * expire_time)) AND gateway = '#{gateway_name}'"
conn.exec(sql) do |result| conn.exec(sql) do |result|
result.each { |row| result.each { |row|
@ -181,8 +192,7 @@ SQL
raise "invalid client_type: #{client_type}" if client_type != 'client' && client_type != 'browser' raise "invalid client_type: #{client_type}" if client_type != 'client' && client_type != 'browser'
count = 0 count = 0
ConnectionManager.active_record_transaction do |connection_manager| ConnectionManager.active_record_transaction do |connection_manager, conn|
conn = connection_manager.pg_conn
# turn ip_address string into a number, then fetch the isp and block records # turn ip_address string into a number, then fetch the isp and block records
@ -242,8 +252,7 @@ SQL
# and this connection was in a session # and this connection was in a session
def delete_connection(client_id, &blk) def delete_connection(client_id, &blk)
ConnectionManager.active_record_transaction do |connection_manager| ConnectionManager.active_record_transaction do |connection_manager, conn|
conn = connection_manager.pg_conn
count = 0 count = 0
user_id = nil user_id = nil
music_session_id = nil music_session_id = nil
@ -426,8 +435,7 @@ SQL
connection = nil connection = nil
tracks_changed = false tracks_changed = false
ConnectionManager.active_record_transaction do |connection_manager| ConnectionManager.active_record_transaction do |connection_manager, db_conn|
db_conn = connection_manager.pg_conn
connection = Connection.find_by_client_id(client_id) connection = Connection.find_by_client_id(client_id)
@ -469,9 +477,7 @@ SQL
# within the connection table lock # within the connection table lock
def leave_music_session(user, connection, music_session, &blk) def leave_music_session(user, connection, music_session, &blk)
send_tracks_changed = false send_tracks_changed = false
ConnectionManager.active_record_transaction do |connection_manager| ConnectionManager.active_record_transaction do |connection_manager, conn|
conn = connection_manager.pg_conn
lock_connections(conn) lock_connections(conn)

View File

@ -651,7 +651,7 @@ module JamRuby
ms.description = options[:description] ms.description = options[:description]
ms.genre_id = (options[:genres].length > 0 ? options[:genres][0] : nil) if options[:genres] ms.genre_id = (options[:genres].length > 0 ? options[:genres][0] : nil) if options[:genres]
ms.musician_access = options[:musician_access] ms.musician_access = options[:musician_access]
ms.friends_can_join = options[:friends_can_join] ms.friends_can_join = options[:friends_can_join] || false
ms.approval_required = options[:approval_required] ms.approval_required = options[:approval_required]
ms.fan_access = options[:fan_access] ms.fan_access = options[:fan_access]
ms.fan_chat = options[:fan_chat] ms.fan_chat = options[:fan_chat]

View File

@ -694,8 +694,8 @@ ConfigureTracksActions = @ConfigureTracksActions
) )
) )
) )
.fail(() => .fail((e) =>
logger.error("unable to fetch session history") logger.error("unable to fetch session history", e)
) )
waitForSessionPageEnterDone: () -> waitForSessionPageEnterDone: () ->

View File

@ -1,7 +1,7 @@
module JamWebsockets module JamWebsockets
class ClientContext class ClientContext
attr_accessor :user, :client, :msg_count, :session, :client_type, :sent_bad_state_previously, :active attr_accessor :user, :client, :msg_count, :session, :client_type, :sent_bad_state_previously, :active, :updated_at
def initialize(user, client, client_type) def initialize(user, client, client_type)
@user = user @user = user
@ -12,9 +12,14 @@
@session = nil @session = nil
@sent_bad_state_previously = false @sent_bad_state_previously = false
@active = true @active = true
@updated_at = Time.now
client.context = self client.context = self
end end
def stale?(stale_time)
return Time.now - @updated_at > stale_time
end
def to_s def to_s
return "Client[user:#{@user} client:#{@client.client_id} msgs:#{@msg_count} session:#{@session} client_type:#{@client_type} channel_id: #{@client.channel_id}]" return "Client[user:#{@user} client:#{@client.client_id} msgs:#{@msg_count} session:#{@session} client_type:#{@client_type} channel_id: #{@client.channel_id}]"
end end

View File

@ -82,6 +82,40 @@ module JamWebsockets
@largest_message_user = nil @largest_message_user = nil
@highest_drift = 0 @highest_drift = 0
@pending_notification_seen_ats = {}
@semaphore_pnsa = Mutex.new
@pending_deletions = {}
@semaphore_deletions = Mutex.new
end
def init
wipe_all_connections
# this thread runs forever while WSG runs, and should do anything easily gotten out of critical message handling path
@background_thread = Thread.new {
count = 0
while true
begin
periodical_check_connections
periodical_notification_seen
if count == 30
periodical_check_clients
count = 0
end
count = count + 1
rescue => e
@log.error "unhandled error in thread #{e}"
@log.error "stacktrace #{e.backtrace}"
end
sleep 1
end
}
end end
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default', :allow_dynamic_registration => true, :chat_enabled => true, :chat_blast => true}, &block) def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default', :allow_dynamic_registration => true, :chat_enabled => true, :chat_blast => true}, &block)
@ -466,7 +500,8 @@ module JamWebsockets
client.onclose { client.onclose {
time_it('onclose') { time_it('onclose') {
@log.debug "connection closed. marking stale: #{client.context}" @log.debug "connection closed. marking stale: #{client.context}"
cleanup_client(client) #cleanup_client(client)
cleanup_client_with_id(client.client_id)
} }
} }
@ -538,19 +573,19 @@ module JamWebsockets
end end
# caused a client connection to be marked stale # caused a client connection to be marked stale
def stale_client(client) # def stale_client(client)
if client.client_id # if client.client_id
@log.info "marking client stale: #{client.context}" # @log.info "marking client stale: #{client.context}"
ConnectionManager.active_record_transaction do |connection_manager| # ConnectionManager.active_record_transaction do |connection_manager|
music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id) # music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id)
# update the session members, letting them know this client went stale # # update the session members, letting them know this client went stale
context = @client_lookup[client.client_id] # context = @client_lookup[client.client_id]
if music_session = ActiveMusicSession.find_by_id(music_session_id) # if music_session = ActiveMusicSession.find_by_id(music_session_id)
Notification.send_musician_session_stale(music_session, client.client_id, context.user) # Notification.send_musician_session_stale(music_session, client.client_id, context.user)
end unless music_session_id.nil? # end unless music_session_id.nil?
end # end
end # end
end # end
def route(client_msg, client) def route(client_msg, client)
message_type = @message_factory.get_message_type(client_msg) message_type = @message_factory.get_message_type(client_msg)
@ -713,7 +748,7 @@ module JamWebsockets
latency_tester.id, latency_tester.id,
connection_expire_time, connection_expire_time,
"latency_tester", "latency_tester",
client_id_int) client_id_int)
stats_logged_in stats_logged_in
send_to_client(client, login_ack) send_to_client(client, login_ack)
end end
@ -794,7 +829,6 @@ module JamWebsockets
end end
# we have to deal with jamblaster before login # we have to deal with jamblaster before login
if jamblaster_serial_no && jamblaster_serial_no != '' if jamblaster_serial_no && jamblaster_serial_no != ''
jamblaster = Jamblaster.bootstrap(jamblaster_serial_no) jamblaster = Jamblaster.bootstrap(jamblaster_serial_no)
@ -1117,16 +1151,24 @@ module JamWebsockets
#profile_it('heartbeat_transaction') { #profile_it('heartbeat_transaction') {
#Connection.transaction do #Connection.transaction do
# send back track_changes_counter if in a session # send back track_changes_counter if in a session
profile_it('heartbeat_session') {
if connection.music_session_id #dboptz
music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
track_changes_counter = music_session.track_changes_counter if music_session #profile_it('heartbeat_session') {
end # if connection.music_session_id
} # music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
# track_changes_counter = music_session.track_changes_counter if music_session
# end
#}
#stale = context.stale?(@connect_time_stale_client)
profile_it('heartbeat_touch') { profile_it('heartbeat_touch') {
# update connection updated_at and if the user is active # update connection updated_at and if the user is active
Connection.where(id: connection.id).update_all(user_active: heartbeat.active, updated_at: Time.now) # dboptz
#Connection.where(id: connection.id).update_all(user_active: heartbeat.active, updated_at: Time.now)
context.updated_at = Time.now
context.active = heartbeat.active
} }
profile_it('heartbeat_notification') { profile_it('heartbeat_notification') {
@ -1139,12 +1181,12 @@ module JamWebsockets
#} #}
profile_it('heartbeat_stale') { profile_it('heartbeat_stale') {
if connection.stale? #if stale
ConnectionManager.active_record_transaction do |connection_manager| # ConnectionManager.active_record_transaction do |connection_manager|
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type) # heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name) # connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name)
end # end
end #end
} }
end end
@ -1170,24 +1212,20 @@ module JamWebsockets
notification_id_field = heartbeat.notification_seen if heartbeat.value_for_tag(1) notification_id_field = heartbeat.notification_seen if heartbeat.value_for_tag(1)
if notification_id_field && notification_id_field != '' if notification_id_field && notification_id_field != ''
notification = Notification.find_by_id(notification_id_field) notification = Notification.find_by_id(notification_id_field)
if notification @semaphore_pnsa.synchronize do
connection.user.notification_seen_at = notification.created_at if notification
unless connection.user.save(validate: false) @pending_notification_seen_ats[connection.user.id] = notification.created_at
@log.error "unable to update notification_seen_at via id field for client #{context}. errors: #{connection.user.errors.inspect}" else
end notification_seen_at_parsed = nil
else notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2)
notification_seen_at_parsed = nil begin
notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2) notification_seen_at_parsed = Time.parse(notification_seen_at) if notification_seen_at && notification_seen_at.length > 0
begin rescue Exception => e
notification_seen_at_parsed = Time.parse(notification_seen_at) if notification_seen_at && notification_seen_at.length > 0 @log.error "unable to parse notification_seen_at in heartbeat from #{context}. notification_seen_at: #{notification_seen_at}"
rescue Exception => e end
@log.error "unable to parse notification_seen_at in heartbeat from #{context}. notification_seen_at: #{notification_seen_at}"
end
if notification_seen_at_parsed if notification_seen_at_parsed
connection.user.notification_seen_at = notification_seen_at @pending_notification_seen_ats[connection.user.id] = notification_seen_at
unless connection.user.save(validate: false)
@log.error "unable to update notification_seen_at via time field for client #{context}. errors: #{connection.user.errors.inspect}"
end end
end end
end end
@ -1361,9 +1399,11 @@ module JamWebsockets
def periodical_flag_connections def periodical_flag_connections
# @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds") # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
ConnectionManager.active_record_transaction do |connection_manager| #pgoptz
connection_manager.flag_stale_connections(@gateway_name) #ConnectionManager.active_record_transaction do |connection_manager|
end # connection_manager.flag_stale_connections(@gateway_name)
#end
end end
def periodical_check_clients def periodical_check_clients
@ -1396,14 +1436,14 @@ module JamWebsockets
sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids}) sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids})
SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{@gateway_name}'); SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{@gateway_name}');
" "
ConnectionManager.active_record_transaction do |connection_manager| ConnectionManager.active_record_transaction do |connection_manager, conn|
conn = connection_manager.pg_conn
conn.exec(sql) do |result| conn.exec(sql) do |result|
result.each { |row| result.each { |row|
client_id = row['client_id'] client_id = row['client_id']
context = @client_lookup[client_id] context = @client_lookup[client_id]
if context if context
@log.debug("cleaning up missing client #{client_id}, #{context.user}") @log.debug("cleaning up missing client #{client_id}, #{context.user}")
#cleanup_client_with_id(client_id)
cleanup_client(context.client) cleanup_client(context.client)
else else
@log.error("could not clean up missing client #{client_id}") @log.error("could not clean up missing client #{client_id}")
@ -1413,6 +1453,39 @@ module JamWebsockets
end end
end end
def wipe_all_connections
# meant to be called only on startup; delete all connections fo myself
ConnectionManager.active_record_transaction do |connection_manager|
clients = connection_manager.connection_client_ids_for_gateway(@gateway_name)
clients.each do |client_id|
cleanup_client_with_id(client_id)
end
end
end
def periodical_notification_seen
pending = nil
@semaphore_pnsa.synchronize do
return if @pending_notification_seen_ats.count == 0
pending = @pending_notification_seen_ats.clone
@pending_notification_seen_ats.clear
end
bulk_values = pending.map{|k,v| "('#{k}', '#{v}'::timestamp)"}.join(',')
sql = %{
update users as u
set notification_seen_at = c.notification_seen_at
from (values
#{bulk_values}
) as c(user_id, notification_seen_at)
where c.user_id = u.id;
}
@log.info("SQL #{sql}")
ConnectionManager.active_record_transaction do |connection_manager, conn|
conn.exec(sql)
end
end
def periodical_check_connections def periodical_check_connections
# this method is designed to be called periodically (every few seconds) # this method is designed to be called periodically (every few seconds)
# in which this gateway instance will check only its own clients for their health # in which this gateway instance will check only its own clients for their health
@ -1426,18 +1499,31 @@ module JamWebsockets
# to make sure that we have stale connections cleaned up, even in the case of gateways that have crashed or are buggy # to make sure that we have stale connections cleaned up, even in the case of gateways that have crashed or are buggy
clients = [] clients = []
ConnectionManager.active_record_transaction do |connection_manager| # dboptz
clients = connection_manager.stale_connection_client_ids(@gateway_name) #ConnectionManager.active_record_transaction do |connection_manager|
# clients = connection_manager.stale_connection_client_ids(@gateway_name)
#
#end
puts "periodical_check_connections"
@client_lookup.each do |client_id, client_context|
if Time.now - client_context.updated_at > @connect_time_expire_client
cleanup_client_with_id(client_id)
end
end end
cleanup_clients_with_ids(clients)
end end
def periodical_stats_dump def periodical_stats_dump
# assume 60 seconds per status dump # assume 60 seconds per status dump
stats = @message_stats.sort_by { |k, v| -v } stats = @message_stats.sort_by { |k, v| -v }
stats.map { |i| i[1] = (i[1] / 60.0).round(2) } stats.map { |i| i[1] = (i[1] / 60.0).round(2) }
@semaphore.synchronize do
@log.info("num clients: #{@client_lookup.count}")
end
@log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', ')) @log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', '))
@log.info("largest msg from #{@largest_message_user}: #{@largest_message ? @largest_message.length : 0}b") @log.info("largest msg from #{@largest_message_user}: #{@largest_message ? @largest_message.length : 0}b")
@ -1485,7 +1571,7 @@ module JamWebsockets
@message_stats['total_time'] = total_time @message_stats['total_time'] = total_time
@message_stats['banned_users'] = @temp_ban.length @message_stats['banned_users'] = @temp_ban.length
Stats.write('gateway.stats', @message_stats) #Stats.write('gateway.stats', @message_stats)
# clear out stats # clear out stats
@message_stats.clear @message_stats.clear
@ -1501,87 +1587,84 @@ module JamWebsockets
@heartbeat_tracker = {} @heartbeat_tracker = {}
end end
def cleanup_clients_with_ids(expired_connections) def cleanup_client_with_id(cid)
expired_connections.each do |expired_connection|
cid = expired_connection[:client_id]
client_context = @client_lookup[cid] client_context = @client_lookup[cid]
if client_context if client_context
#Diagnostic.expired_stale_connection(client_context.user.id, client_context) #Diagnostic.expired_stale_connection(client_context.user.id, client_context)
cleanup_client(client_context.client) cleanup_client(client_context.client)
end end
music_session = nil music_session = nil
recording_id = nil recording_id = nil
user = nil user = nil
# remove this connection from the database # remove this connection from the database
ConnectionManager.active_record_transaction do |mgr| ConnectionManager.active_record_transaction do |mgr|
mgr.delete_connection(cid) { |conn, count, music_session_id, user_id| mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
user = User.find_by_id(user_id) user = User.find_by_id(user_id)
return if user.nil? # this can happen if you delete a user while their connection is up return if user.nil? # this can happen if you delete a user while their connection is up
@log.info "expiring stale connection client_id:#{cid}, user_id:#{user}" @log.info "expiring stale connection client_id:#{cid}, user_id:#{user}"
Notification.send_friend_update(user_id, false, conn) if count == 0 Notification.send_friend_update(user_id, false, conn) if count == 0
music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil? music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil?
user = User.find_by_id(user_id) unless user_id.nil? user = User.find_by_id(user_id) unless user_id.nil?
if music_session if music_session
msuh = MusicSessionUserHistory.where(client_id: cid).order('created_at DESC').first
msuh = MusicSessionUserHistory.where(client_id: cid).order('created_at DESC').first if msuh
if msuh msuh.session_removed_at = Time.now if msuh.session_removed_at.nil?
msuh.session_removed_at = Time.now if msuh.session_removed_at.nil? msuh.save(validate: false)
msuh.save(validate:false)
end
recording = music_session.stop_recording
unless recording.nil?
@log.debug "cleanup_client: stopped recording: #{recording.id} because user #{user} reconnected"
recording.discard_if_no_action(user) # throw away this users vote for the
@log.debug "cleanup_client: discard complete"
recording_id = recording.id unless recording.nil?
end
# if the user was in a recording during the finializing phase (after stopped, but keep/discard still required in recordingFinishedDialog)
# then throw away the user's
most_recent_recording = music_session.most_recent_recording
if most_recent_recording && most_recent_recording.users.exists?(user)
@log.debug "cleanup_client: discarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected"
# if this user was in the most recent recording associated with the session they were just in, discard any tracks they had
most_recent_recording.discard_if_no_action(user) # throw away this users vote for the
else
@log.debug "cleanup_client: no recent recording to clean up"
end
music_session.with_lock do # VRFS-1297
@log.debug("cleanup_client: tick track changes")
music_session.tick_track_changes
@log.debug("cleanup_client: tick track changes done")
end
end end
}
end
if user && music_session
@log.info("cleanup_client: Send session depart message for #{user} to #{music_session.id}") recording = music_session.stop_recording
Notification.send_session_depart(music_session, cid, user, recording_id) unless recording.nil?
@log.info("cleanup_client: Sent session depart") @log.debug "cleanup_client: stopped recording: #{recording.id} because user #{user} reconnected"
end recording.discard_if_no_action(user) # throw away this users vote for the
@log.debug "cleanup_client: discard complete"
recording_id = recording.id unless recording.nil?
end
# if the user was in a recording during the finializing phase (after stopped, but keep/discard still required in recordingFinishedDialog)
# then throw away the user's
most_recent_recording = music_session.most_recent_recording
if most_recent_recording && most_recent_recording.users.exists?(user)
@log.debug "cleanup_client: discarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected"
# if this user was in the most recent recording associated with the session they were just in, discard any tracks they had
most_recent_recording.discard_if_no_action(user) # throw away this users vote for the
else
@log.debug "cleanup_client: no recent recording to clean up"
end
music_session.with_lock do # VRFS-1297
@log.debug("cleanup_client: tick track changes")
music_session.tick_track_changes
@log.debug("cleanup_client: tick track changes done")
end
end
}
end
if user && music_session
@log.info("cleanup_client: Send session depart message for #{user} to #{music_session.id}")
Notification.send_session_depart(music_session, cid, user, recording_id)
@log.info("cleanup_client: Sent session depart")
end end
end end
# removes all resources associated with a client # removes all resources associated with a client
def cleanup_client(client) def cleanup_client(client)
client.close
# unregister any subscriptions
client.subscriptions.each do |subscription|
unregister_subscription(client, subscription[:type], subscription[:id])
end
@semaphore.synchronize do @semaphore.synchronize do
client.close
# unregister any subscriptions
client.subscriptions.each do |subscription|
unregister_subscription(client, subscription[:type], subscription[:id])
end
pending = client.context.nil? # presence of context implies this connection has been logged into pending = client.context.nil? # presence of context implies this connection has been logged into
if pending if pending

View File

@ -1,7 +1,7 @@
require 'em-websocket' require 'em-websocket'
require 'bugsnag' require 'bugsnag'
module JamWebsockets module JamWebsockets
class Server class Server
def initialize(options={}) def initialize(options={})
@ -38,17 +38,22 @@ module JamWebsockets
@log.info "starting server #{host}:#{port} staleness_time=#{connect_time_stale_client}; reconnect time = #{connect_time_expire_client}, rabbitmq=#{rabbitmq_host}:#{rabbitmq_port} gateway_name=#{gateway_name}" @log.info "starting server #{host}:#{port} staleness_time=#{connect_time_stale_client}; reconnect time = #{connect_time_expire_client}, rabbitmq=#{rabbitmq_host}:#{rabbitmq_port} gateway_name=#{gateway_name}"
@router.init
EventMachine.error_handler{|e| EventMachine.error_handler{|e|
puts "unhandled error #{e}" puts "unhandled error #{e}"
puts "unhandled error #{e.backtrace}"
@log.error "unhandled error #{e}" @log.error "unhandled error #{e}"
@log.error "unhandled error #{e.backtrace}"
#Bugsnag.notify(e) #Bugsnag.notify(e)
} }
EventMachine.run do EventMachine.run do
@router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port, max_connections_per_user: max_connections_per_user, gateway: gateway_name, allow_dynamic_registration: allow_dynamic_registration, chat_enabled: chat_enabled, chat_blast: chat_blast) do @router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port, max_connections_per_user: max_connections_per_user, gateway: gateway_name, allow_dynamic_registration: allow_dynamic_registration, chat_enabled: chat_enabled, chat_blast: chat_blast) do
start_connection_expiration
start_client_expiration #start_connection_expiration
start_connection_flagger #start_client_expiration
#start_connection_flagger
start_stats_dump start_stats_dump
start_websocket_listener(host, port, trust_port, trust_check, options[:emwebsocket_debug]) start_websocket_listener(host, port, trust_port, trust_check, options[:emwebsocket_debug])
calling_thread.wakeup if calling_thread calling_thread.wakeup if calling_thread
@ -100,7 +105,8 @@ module JamWebsockets
def start_connection_expiration def start_connection_expiration
# one cleanup on startup # one cleanup on startup
@router.periodical_check_connections @router.wipe_all_connections
#@router.periodical_check_connections
@last_conn_check = Time.now @last_conn_check = Time.now
timer = 2 timer = 2
@ -129,7 +135,7 @@ module JamWebsockets
end end
def start_stats_dump def start_stats_dump
EventMachine::PeriodicTimer.new(60) do EventMachine::PeriodicTimer.new(6) do
time_it('stats_dump') { safety_net { @router.periodical_stats_dump } } time_it('stats_dump') { safety_net { @router.periodical_stats_dump } }
end end
end end

View File

@ -171,4 +171,16 @@ FactoryGirl.define do
association :jam_track, factory: :jam_track association :jam_track, factory: :jam_track
association :user, factory: :user association :user, factory: :user
end end
factory :notification, :class => JamRuby::Notification do
factory :notification_text_message do
description 'TEXT_MESSAGE'
message "chocolate"
end
end
factory :friendship, :class => JamRuby::Friendship do
end
end end

View File

@ -0,0 +1,304 @@
require 'spec_helper'
require 'thread'
def time_it(cat, &blk)
start = Time.now
blk.call
time = Time.now - start
puts("TIME: #{cat}: #{time}")
end
def safety_net(&blk)
begin
blk.call
rescue => e
#Bugsnag.notify(e)
@log.error("unhandled exception in EM Timer #{e}")
puts "Error during processing: #{$!}"
puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
end
end
LoginClient = Class.new do
attr_accessor :onmsgblock, :onopenblock, :oncloseblock, :onerrorblock, :encode_json, :channel_id, :client_id, :user_id, :context, :trusted, :subscriptions
def initialize(user)
@subscriptions = Set.new
@channel_id = user.id
@encode_json = true
end
def connected?
true
end
def onopen(&block)
@onopenblock = Proc.new { |handshake| block }
end
def onmessage(&block)
@onmsgblock= Proc.new { |data| block.call data }
end
def onclose(&block)
@oncloseblock = Proc.new { || block.call }
end
def onerror(&block)
@onerrorblock = Proc.new { |err| block.call err }
end
def close()
end
def send(msg)
puts msg
end
def get_peername
return "\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" # 37643, "localhost"
end
end
class TestClient
def initialize(user, router)
@user = user
@client = LoginClient.new(@user)
@router = router
end
def connect
@router.new_client(@client, false)
end
def login
login_message = login_options(@user)
@router.handle_login(@client, login_message)
end
def heartbeat(notification_id = nil, notification_seen_at = nil)
heartbeat = Jampb::Heartbeat.new
if notification_id
heartbeat.notification_seen = notification_id
else
heartbeat.notification_seen = 'junk'
end
if notification_seen_at
heartbeat.notification_seen_at = notification_seen_at.to_s
end
@router.handle_heartbeat(heartbeat, '1', @client)
end
def disconnect
@client.oncloseblock.call
end
private
def login_options(user)
options = {}
options["token"] = user.remember_token
options["client_id"] = user.id
options["client_type"] = "client"
options["client_id_int"] = 1
options
end
end
describe Router, :spec_timeout => 15 do
def database_env (env)
singleton = GenericState.new
singleton.id = 'default'
singleton.env = env
singleton.save!
end
def rails_env (env)
JamRuby::Environment.should_receive(:mode).any_number_of_times.and_return(env)
end
include EventedSpec::EMSpec
default_timeout(1000000)
em_before do
puts "EM BEFORE ROUTER NEW"
end
subject { @router }
em_after do
puts "EM AFTER"
end
def sequence_1(user, special_heartbeat)
client1 = TestClient.new(user, @router)
client1.connect
client1.login
@router.client_lookup.count.should be 1
client1.heartbeat
update_notification_at = Time.now.utc
#client1.heartbeat(special_heartbeat.id.to_s, nil)
client1.heartbeat(nil, update_notification_at)
client1.disconnect
sleep 6
user.reload
user = User.find(user.id)
@router.client_lookup.count.should be 0
last_notification_seen_at = user.notification_seen_at
last_notification_seen_at.should_not be_nil
last_notification_seen_at.to_i.should be update_notification_at.to_i
end
def sequence_2(user)
before_count = Connection.count
client1 = TestClient.new(user, @router)
client1.connect
client1.login
@router.client_lookup.count.should be 1
after_count = Connection.count
before_count.should be (after_count - 1)
sleep 25
@router.client_lookup.count.should be 0
disc_count = Connection.count
before_count.should be disc_count
end
def sequence_in_session(user)
before_count = Connection.count
client1 = TestClient.new(user, @router)
client1.connect
client1.login
@router.client_lookup.count.should be 1
music_session = FactoryGirl.create(:active_music_session, :creator => user)
connection = Connection.find_by_user_id(user.id)
connection.music_session = music_session
connection.save!
client1.disconnect
@router.client_lookup.count.should be 0
disc_count = Connection.count
before_count.should be disc_count
end
def sequence_in_two_session(user1, user2)
# create friendship between the two, to increase notifications
FactoryGirl.create(:friendship, :user => user1, :friend => user2)
FactoryGirl.create(:friendship, :user => user2, :friend => user1)
before_count = Connection.count
client1 = TestClient.new(user1, @router)
client1.connect
client1.login
@router.client_lookup.count.should be 1
client1 = TestClient.new(user2, @router)
client1.connect
client1.login
@router.client_lookup.count.should be 2
music_session = FactoryGirl.create(:active_music_session, :creator => user)
connection = Connection.find_by_user_id(user1.id)
connection.music_session = music_session
connection.save!
connection2 = Connection.find_by_user_id(user2.id)
connection2.music_session = music_session
connection2.save!
client1.disconnect
@router.client_lookup.count.should be 1
disc_count = Connection.count
before_count.should be (disc_count - 1)
end
def delete_conn_from_under(user)
before_count = Connection.count
client1 = TestClient.new(user, @router)
client1.connect
client1.login
@router.client_lookup.count.should be 1
after_count = Connection.count
before_count.should be (after_count - 1)
Connection.first.delete
@router.periodical_check_clients
@router.client_lookup.count.should be 0
disc_count = Connection.count
before_count.should be disc_count
end
describe "stress" do
before {
database_env('production')
rails_env('production')
stub_const("ENV", {'BUILD_NUMBER' => 1})
}
let(:user1) { FactoryGirl.create(:user) }
let(:user2) { FactoryGirl.create(:user) }
let(:user3) { FactoryGirl.create(:user) }
let(:user4) { FactoryGirl.create(:user) }
let(:user5) { FactoryGirl.create(:user) }
let!(:other) { FactoryGirl.create(:user, last_jam_locidispid: 1) }
let!(:msg1) {FactoryGirl.create(:notification_text_message, source_user: other, target_user: user1) }
it "the test" do
@router = Router.new()
@router.connect_time_expire_client = 20
@router.connect_time_stale_client = 14
@router.heartbeat_interval_client = @router.connect_time_stale_client / 2
@router.connect_time_expire_browser = 20
@router.connect_time_stale_browser = 14
@router.max_connections_per_user = 10
@router.heartbeat_interval_browser = @router.connect_time_stale_browser / 2
@router.maximum_minutely_heartbeat_rate_browser = 10
@router.maximum_minutely_heartbeat_rate_client = 10
@router.amqp_connection_manager = AmqpConnectionManager.new(true, 4, host: 'localhost', port: 5672)
@router.gateway_name = 'gateway1'
@router.init
em do
EventMachine::PeriodicTimer.new(5) do
time_it('stats_dump') { safety_net { @router.periodical_stats_dump } }
end
sequence_in_session(user3)
sequence_in_two_session(user4, user5)
delete_conn_from_under(user3)
sequence_1(user1, msg1)
sequence_1(user1, msg1)
sequence_2(user2)
done
end
end
end
end