diff --git a/websocket-gateway/lib/jam_websockets/router.rb b/websocket-gateway/lib/jam_websockets/router.rb index 1a0bf2e32..dc1c20f66 100644 --- a/websocket-gateway/lib/jam_websockets/router.rb +++ b/websocket-gateway/lib/jam_websockets/router.rb @@ -11,6 +11,22 @@ module EventMachine module WebSocket class Connection < EventMachine::Connection attr_accessor :encode_json, :client_id # client_id is uuid we give to each client to track them as we like + + # http://stackoverflow.com/questions/11150147/how-to-check-if-eventmachineconnection-is-open + attr_accessor :connected + def connection_completed + connected = true + super + end + + def connected? + !!connected + end + + def unbind + connected = false + super + end end end end @@ -35,7 +51,7 @@ module JamWebsockets @client_topic = nil @thread_pool = nil @heartbeat_interval = nil - + @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base] end def start(connect_time_stale, options={:host => "localhost", :port => 5672}, &block) @@ -230,12 +246,15 @@ module JamWebsockets @log.error "generic error: #{error} #{error.backtrace}" end - cleanup_client(client) - client.close_websocket + unless error.to_s.include? "Close handshake un-acked" + cleanup_client(client) + else + @log.info "skipping cleanup because error is for dead connection: https://github.com/igrigorik/em-websocket/issues/122" + end + } client.onmessage { |msg| - @log.debug("msg received") # TODO: set a max message size before we put it through PB? # TODO: rate limit? @@ -259,7 +278,7 @@ module JamWebsockets error_msg = @message_factory.server_rejection_error(e.to_s) send_to_client(client, error_msg) ensure - client.close_websocket + client.close cleanup_client(client) end rescue PermissionError => e @@ -278,7 +297,7 @@ module JamWebsockets error_msg = @message_factory.server_generic_error(e.to_s) send_to_client(client, error_msg) ensure - client.close_websocket + client.close cleanup_client(client) end end @@ -287,7 +306,7 @@ module JamWebsockets def send_to_client(client, msg) - @log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" + @log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless msg.type == ClientMessage::Type::HEARTBEAT_ACK if client.encode_json client.send(msg.to_json.to_s) else @@ -360,25 +379,23 @@ module JamWebsockets # removes all resources associated with a client def cleanup_client(client) @semaphore.synchronize do + client.close if client.connected? + # @log.debug("*** cleanup_clients: client = #{client}") pending = @pending_clients.delete?(client) - if !pending.nil? - @log.debug "cleaning up not-logged-in client #{client}" + if pending + @log.debug "cleaned up not-logged-in client #{client}" else - @log.debug "cleanup up logged-in client #{client}" - - remove_client(client.client_id) - context = @clients.delete(client) - if !context.nil? + if context + remove_client(client.client_id) remove_user(context) else @log.debug "skipping duplicate cleanup attempt of logged-in client" end - end end end @@ -388,7 +405,7 @@ module JamWebsockets raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil? - @log.debug("msg received #{message_type}") + @log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT raise SessionError, 'client_msg.route_to is null' if client_msg.route_to.nil? @@ -426,9 +443,7 @@ module JamWebsockets handle_login(client_msg.login, client) elsif client_msg.type == ClientMessage::Type::HEARTBEAT - - handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) - + sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) } else raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message" end @@ -458,7 +473,7 @@ module JamWebsockets existing_client = @client_lookup[client_id] if existing_client remove_client(client_id) - existing_client.client.close_websocket + existing_client.client.close end connection = JamRuby::Connection.find_by_client_id(client_id) @@ -767,5 +782,18 @@ module JamWebsockets def extract_ip(client) return Socket.unpack_sockaddr_in(client.get_peername)[1] end + + private + + def sane_logging(&blk) + # used around repeated transactions that cause too much ActiveRecord::Base logging + begin + original_level = @ar_base_logger.level + @ar_base_logger.level = :info if @ar_base_logger + blk.call + ensure + @ar_base_logger.level = original_level if @ar_base_logger + end + end end end diff --git a/websocket-gateway/lib/jam_websockets/server.rb b/websocket-gateway/lib/jam_websockets/server.rb index 4bf384396..1277936cb 100644 --- a/websocket-gateway/lib/jam_websockets/server.rb +++ b/websocket-gateway/lib/jam_websockets/server.rb @@ -6,9 +6,11 @@ module JamWebsockets class Server def initialize(options={}) + EM::WebSocket.close_timeout = 10 # the default of 60 is pretty intense @log = Logging.logger[self] @count=0 @router = Router.new + @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base] end def run(options={}) @@ -64,7 +66,7 @@ module JamWebsockets expire_stale_connections(stale_max_time) EventMachine::PeriodicTimer.new(stale_max_time) do - expire_stale_connections(stale_max_time) + sane_logging { expire_stale_connections(stale_max_time) } end end @@ -83,7 +85,7 @@ module JamWebsockets flag_stale_connections(flag_max_time) EventMachine::PeriodicTimer.new(flag_max_time/2) do - flag_stale_connections(flag_max_time) + sane_logging { flag_stale_connections(flag_max_time) } end end @@ -94,6 +96,17 @@ module JamWebsockets end end + def sane_logging(&blk) + # used around repeated transactions that cause too much ActiveRecord::Base logging + begin + original_level = @ar_base_logger.level + @ar_base_logger.level = :info if @ar_base_logger + blk.call + ensure + @ar_base_logger.level = original_level if @ar_base_logger + end + end + end end