1785 lines
68 KiB
Ruby
1785 lines
68 KiB
Ruby
require 'set'
|
|
require 'amqp'
|
|
require 'thread'
|
|
require 'json'
|
|
require 'eventmachine'
|
|
|
|
include Jampb
|
|
|
|
# add new field to client connection
|
|
# Reopen em-websocket's Connection class to carry our per-connection state.
# client_id is a uuid we give to each client to track them as we like.
module EventMachine
  module WebSocket
    class Connection < EventMachine::Connection
      # wire-format flag (JSON vs protobuf)
      attr_accessor :encode_json
      # identifiers for this TCP channel / client / logged-in user
      attr_accessor :channel_id, :client_id, :user_id
      # session context, trust flag and live subscriptions
      attr_accessor :context, :trusted, :subscriptions
      # request metadata captured at handshake time
      attr_accessor :x_forwarded_for, :query, :is_jamblaster
    end
  end
end
|
|
|
|
module JamWebsockets
|
|
|
|
class Router
|
|
|
|
# Public accessors for router configuration and tracking state.
attr_accessor :user_context_lookup,
              :amqp_connection_manager,
              :heartbeat_interval_client,
              :connect_time_expire_client,
              :connect_time_stale_client,
              :heartbeat_interval_browser,
              :connect_time_expire_browser,
              :connect_time_stale_browser,
              :maximum_minutely_heartbeat_rate_browser,
              :maximum_minutely_heartbeat_rate_client,
              :max_connections_per_user,
              :gateway_name,
              :client_lookup,
              :time_it_sums,
              :profile_it_sums,
              :highest_drift,
              :heartbeat_tracker,
              # BUG FIX: the list previously ended at :heartbeat_tracker with no
              # trailing comma, leaving `:temp_ban` on the next line as a bare,
              # no-op symbol literal — so no temp_ban accessor was ever defined
              # even though @temp_ban is initialized in #initialize.
              :temp_ban
|
|
|
|
|
|
# Sets up all in-memory state for a Router. No I/O happens here; AMQP wiring
# is done later in #start / #register_topics, and the housekeeping thread in #init.
def initialize()
  @log = Logging.logger[self]
  @clients = {} # clients that have logged in
  @user_context_lookup = {} # lookup a set of client_contexts by user_id
  @client_lookup = {} # lookup a client by client_id
  @subscription_lookup = {} # "type.id" => Set of clients listening (see #register_subscription)
  @amqp_connection_manager = nil # created in #start
  @users_exchange = nil
  @message_factory = JamRuby::MessageFactory.new
  @semaphore = Mutex.new # guards the lookup hashes between EM and AMQP consumer callbacks
  @user_topic = nil
  @client_topic = nil
  @subscription_topic = nil
  @thread_pool = nil
  # connection timing knobs; real values are assigned in #start
  @heartbeat_interval_client = nil
  @connect_time_expire_client = nil
  @connect_time_stale_client = nil
  @heartbeat_interval_browser= nil
  @connect_time_expire_browser= nil
  @connect_time_stale_browser= nil
  @maximum_minutely_heartbeat_rate_browser = nil
  @maximum_minutely_heartbeat_rate_client = nil
  @gateway_name = nil
  @stored_ars = nil # cached artifact lists; see #ars_list
  @stored_ars_beta = nil
  @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]
  @message_stats = {} # per-message-type counters, incremented in #route
  @time_it_sums = {}
  @profile_it_sums = {}
  @heartbeat_tracker = {}
  @temp_ban = {}
  @chat_enabled = true
  @chat_blast = true

  # lifetime diagnostic counters
  @login_success_count = 0
  @login_fail_count = 0
  @connected_count = 0
  @disconnected_count = 0
  @user_message_counts = {}
  @largest_message = nil # largest raw payload seen so far, and who sent it
  @largest_message_user = nil
  @highest_drift = 0

  # batched notification seen-ats (presumably flushed by
  # periodical_notification_seen in the background thread — verify)
  @pending_notification_seen_ats = {}
  @semaphore_pnsa = Mutex.new

  @pending_deletions = {}
  @semaphore_deletions = Mutex.new

end
|
|
|
|
# Starts the long-lived housekeeping thread: every second it checks
# connections and flushes pending notification seen-ats; roughly every 30
# ticks it also runs the heavier per-client sweep.
def init
  wipe_all_connections

  # this thread runs forever while WSG runs, and should do anything easily
  # gotten out of critical message handling path
  @background_thread = Thread.new do
    ticks = 0

    loop do
      begin
        periodical_check_connections
        periodical_notification_seen

        # heavier sweep every ~30 seconds
        if ticks == 30
          periodical_check_clients
          ticks = 0
        end
        ticks = ticks + 1
      rescue => e
        @log.error "unhandled error in thread #{e}"
        @log.error "stacktrace #{e.backtrace}"
      end
      sleep 1
    end
  end
end
|
|
|
|
# Boots the router: records timing/config options, connects to AMQP, binds
# the message topics, and invokes the caller's block once the channel is up.
#
# NOTE(review): passing any `options` hash replaces the ENTIRE default hash,
# so omitted keys (e.g. :chat_enabled) become nil rather than their defaults —
# confirm callers always pass the full set.
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default', :allow_dynamic_registration => true, :chat_enabled => true, :chat_blast => true}, &block)

  @log.info "startup"

  # heartbeats are expected at twice the stale-window rate
  @heartbeat_interval_client = connect_time_stale_client / 2
  @connect_time_stale_client = connect_time_stale_client
  @connect_time_expire_client = connect_time_expire_client
  @heartbeat_interval_browser = connect_time_stale_browser / 2
  @connect_time_stale_browser = connect_time_stale_browser
  @connect_time_expire_browser = connect_time_expire_browser
  @max_connections_per_user = options[:max_connections_per_user]
  @gateway_name = options[:gateway]
  @allow_dynamic_registration = options[:allow_dynamic_registration]
  @chat_enabled = options[:chat_enabled]
  @chat_blast = options[:chat_blast]

  # determine the maximum amount of heartbeats we should get per user
  # (2x the nominal per-minute rate, plus a slack of 3)
  @maximum_minutely_heartbeat_rate_client = ((@heartbeat_interval_client / 60.0) * 2).ceil + 3
  @maximum_minutely_heartbeat_rate_browser = ((@heartbeat_interval_browser / 60.0) * 2).ceil + 3

  @log.info("maxmium minutely timer #{maximum_minutely_heartbeat_rate_client}")
  begin
    @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
    @amqp_connection_manager.connect do |channel|
      register_topics(channel)
      # hand control back to the caller only after topics are bound
      block.call
    end

  rescue => e
    @log.error "unable to initialize #{e.to_s}"
    cleanup
    raise e
  end

  @log.info "started"
end
|
|
|
|
# Registers a logged-in client context under its client_id for O(1) lookup.
def add_client(client_id, client_context)
  @log.debug "adding client #{client_id} to @client_lookup"
  @client_lookup.store(client_id, client_context)
end
|
|
|
|
# Drops a client from @client_lookup; warns when it was already gone
# (e.g. a double-cleanup race).
def remove_client(client_id)
  if @client_lookup.delete(client_id)
    @log.debug "cleaned up @client_lookup for #{client_id}"
  else
    @log.warn "unable to delete #{client_id} from client_lookup because it's already gone"
  end
end
|
|
|
|
# Indexes a client context under its user's id so user-directed messages can
# fan out to every connection that user currently has open.
def add_user(context)
  contexts_for_user = (@user_context_lookup[context.user.id] ||= Hash.new)
  contexts_for_user[context.client] = context
end
|
|
|
|
# Removes a context from the per-user index; once the user's last connection
# is gone, their whole entry is dropped (memory leak concern).
def remove_user(client_context)
  return unless client_context.user

  user_id = client_context.user.id
  user_contexts = @user_context_lookup[user_id]

  if user_contexts.nil?
    @log.warn "user can not be removed #{client_context}"
    return
  end

  # delete the context from set of user contexts
  user_contexts.delete(client_context.client)

  # if last user context, delete entire set (memory leak concern)
  @user_context_lookup.delete(user_id) if user_contexts.empty?
end
|
|
|
|
# register topic for user messages and session messages
|
|
# register topic for user messages and session messages.
# Wires three exchange/queue pairs on the given AMQP channel — users, clients,
# subscriptions — and installs the consumer callbacks that fan messages out to
# connected websockets. Also publishes the exchanges onto MQRouter so other
# code can send through them.
#
# NOTE(review): each consumer takes @semaphore while iterating the lookup
# hashes, then defers the actual socket write onto the EM reactor.
def register_topics(channel)

  ######################## USER MESSAGING ###########################

  # create user exchange
  @users_exchange = channel.topic('users')
  # create user messaging topic
  @user_topic = channel.queue("", :auto_delete => true)
  @user_topic.bind(@users_exchange, :routing_key => "user.#")
  @user_topic.purge

  # subscribe for any messages to users
  @user_topic.subscribe(:ack => false) do |headers, msg|
    time_it('user_topic') {
      begin
        routing_key = headers.routing_key
        # routing key is "user.<user_id>"; strip the prefix
        user_id = routing_key["user.".length..-1]

        @semaphore.synchronize do
          contexts = @user_context_lookup[user_id]

          if !contexts.nil?

            @log.debug "received user-directed message for user: #{user_id}"

            # NOTE: reassigns the block param — raw bytes parsed into a ClientMessage
            msg = Jampb::ClientMessage.parse(msg)

            # deliver to every connection this user has open
            contexts.each do |client_id, context|
              EM.schedule do
                @log.debug "sending user message to #{context}"
                send_to_client(context.client, msg)
              end
            end
          else
            #@log.debug "Can't route message: no user connected with id #{user_id}" # too chatty
          end
        end

      rescue => e
        @log.error "unhandled error in messaging to client"
        @log.error e
      end
    }
  end

  MQRouter.user_exchange = @users_exchange

  ############## CLIENT MESSAGING ###################

  @clients_exchange = channel.topic('clients')

  @client_topic = channel.queue("", :auto_delete => true)
  @client_topic.bind(@clients_exchange, :routing_key => "client.#")
  @client_topic.purge

  # subscribe for any p2p messages to a client
  @client_topic.subscribe(:ack => false) do |headers, msg|
    time_it('p2p_topic') {
      begin
        routing_key = headers.routing_key
        # routing key is "client.<client_id>" (or a broadcast sentinel)
        client_id = routing_key["client.".length..-1]
        @semaphore.synchronize do

          if client_id == MessageFactory::ALL_NATIVE_CLIENTS

            # broadcast to every native (non-browser) client
            msg = Jampb::ClientMessage.parse(msg)
            @log.debug "client-directed message received from #{msg.from} to all clients"
            # NOTE(review): the block param shadows the outer client_id local
            @client_lookup.each do |client_id, client_context|
              if client_context.client_type == JamRuby::Connection::TYPE_CLIENT
                client = client_context.client

                if client
                  EM.schedule do
                    @log.debug "sending client-directed down websocket to #{client_id}"
                    send_to_client(client, msg)
                  end
                end
              end
            end
          elsif client_id == MessageFactory::ALL_ACTIVE_CLIENTS
            # chat-style broadcast, gated by the chat feature flags
            if @chat_enabled
              msg = Jampb::ClientMessage.parse(msg)
              @log.debug "client-directed message received from #{msg.from} to all chat clients"
              @client_lookup.each do |client_id, client_context|
                if @chat_blast || client_context.active
                  client = client_context.client
                  user = client_context.user

                  # disable global chat for schoolers for now
                  if client && user && user.school_id.nil?
                    EM.schedule do
                      #@log.debug "sending client-directed down websocket to #{client_id}"
                      send_to_client(client, msg)
                    end
                  end
                end
              end
            end
          else
            # point-to-point: deliver to the single named client, if connected here
            client_context = @client_lookup[client_id]

            if client_context

              client = client_context.client

              msg = Jampb::ClientMessage.parse(msg)

              @log.debug "client-directed message received from #{msg.from} to client #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE

              unless client.nil?

                EM.schedule do
                  @log.debug "sending client-directed down websocket to #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE
                  send_to_client(client, msg)
                end
              else
                @log.debug "client-directed message unroutable to disconnected client #{client_id}"
              end
            else
              #@log.debug "Can't route message: no client connected with id #{client_id}" this happens all the time in multi-websocket scenarios
            end
          end

        end
      rescue => e
        @log.error "unhandled error in messaging to client"
        @log.error e
      end
    }
  end

  MQRouter.client_exchange = @clients_exchange

  ############## DYNAMIC SUBSCRIPTION MESSAGING ###################

  # unlike the queues above, this one is named per-gateway and its routing
  # keys are bound/unbound dynamically in register/unregister_subscription
  @subscriptions_exchange = channel.topic('subscriptions')
  @subscription_topic = channel.queue("subscriptions-#{@gateway_name}", :auto_delete => true)
  @subscription_topic.purge

  # subscribe for any p2p messages to a client
  @subscription_topic.subscribe(:ack => false) do |headers, msg|
    time_it('subscribe_topic') {
      begin
        routing_key = headers.routing_key
        # routing key is "subscription.<type>.<id>"
        type_and_id = routing_key["subscription.".length..-1]
        #type, id = type_and_id.split('.')

        @semaphore.synchronize do

          clients = @subscription_lookup[type_and_id]

          msg = Jampb::ClientMessage.parse(msg)

          if clients
            EM.schedule do
              clients.each do |client|
                @log.debug "subscription msg to client #{client.client_id}"
                send_to_client(client, msg)
              end
            end
          end
        end
      rescue => e
        @log.error "unhandled error in messaging to client for mount"
        @log.error e
      end
    }
  end

  MQRouter.subscription_exchange = @subscriptions_exchange
end
|
|
|
|
# listens on a subscription topic on the behalf of a client
|
|
# Listens on a subscription topic on the behalf of a client.
# Records the subscription both on the client (for disconnect cleanup) and in
# @subscription_lookup; the first listener for a given type/id also binds the
# AMQP routing key so messages start flowing to this gateway.
def register_subscription(client, type, id)
  # track subscriptions that this client has made, for disconnect scenarios
  client.subscriptions.add({type: type, id: id})

  key = "#{type}.#{id}"

  # for a given type:id in @subscription_lookup, track clients listening
  listeners = (@subscription_lookup[key] ||= Set.new)
  first_listener = listeners.empty?
  listeners.add(client)

  @log.debug("register subscription handled #{type}.#{id}")

  return unless first_listener

  # if this is the 1st client to listen for this mount, then subscribe to the
  # topic for this mount_id
  routing_key = "subscription.#{type}.#{id}"
  @log.debug("register topic bound #{routing_key}")
  @subscription_topic.bind(@subscriptions_exchange, :routing_key => routing_key)
end
|
|
|
|
# de-listens on a subscription topic on the behalf of a client
|
|
# called automatically when a clean disconnects, to keep state clean.
|
|
def unregister_subscription(client, type, id)
|
|
# remove subscription from this client's list of subscriptions
|
|
client.subscriptions.delete({type: type, id: id})
|
|
|
|
key = "#{type}.#{id}"
|
|
# for a given mount_id in @subscription_lookup, remove from list of clients listening
|
|
clients = @subscription_lookup[key]
|
|
if clients
|
|
deleted = clients.delete(client)
|
|
if !deleted
|
|
@log.error "unregister_subscription: unable to locate any client #{client.client_id} for id #{id}"
|
|
end
|
|
else
|
|
@log.error "unregister_subscription: unable to locate any clients for id #{id}"
|
|
end
|
|
|
|
if clients.length == 0
|
|
# if there are no more clients listening, then unsubscribe to the topic for this mount_id
|
|
routing_key = "subscription.#{type}.#{id}"
|
|
@log.debug("unregister dynamic topic #{routing_key}")
|
|
@subscription_topic.unbind(@subscriptions_exchange, :routing_key => routing_key)
|
|
end
|
|
end
|
|
|
|
# this method allows you to translate exceptions into websocket channel messages and behavior safely.
|
|
# pass in your block, throw an error in your logic, and have the right things happen on the websocket channel
|
|
# this method allows you to translate exceptions into websocket channel messages and behavior safely.
# pass in your block, throw an error in your logic, and have the right things happen on the websocket channel:
#   - SessionError        => rejection frame sent, then the client is torn down
#   - JamPermissionError  => permission-error frame sent, session continues
#   - anything else       => generic error frame sent, client torn down;
#                            dead-DB exceptions hard-exit the process instead
def websocket_comm(client, original_message_id, &blk)
  begin
    blk.call
  rescue SessionError => e
    @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}"
    begin
      # wrap the message up and send it down
      error_msg = @message_factory.server_rejection_error(e.to_s, e.error_code)
      send_to_client(client, error_msg)
    ensure
      # teardown happens even if the error frame itself fails to send
      cleanup_client(client)
    end
  rescue JamPermissionError => e
    @log.info "permission error. reason=#{e.to_s}"
    @log.info e

    # wrap the message up and send it down; the session stays alive
    error_msg = @message_factory.server_permission_error(original_message_id, e.to_s)
    send_to_client(client, error_msg)
  rescue => e
    @log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}"
    @log.error e

    if DbUtil.bad_conn_exception?(e)
      # indicates connection to server is down; kill self. Will be restarted; if db is up we will be healthy
      @log.error "EXITING DUE TO DEAD DBCONN: #{e}"
      Kernel.exit!(1)
    end

    begin
      # wrap the message up and send it down
      error_msg = @message_factory.server_generic_error(e.to_s)
      send_to_client(client, error_msg)
    ensure
      cleanup_client(client)
    end
  end
end
|
|
|
|
# Wires up the lifecycle callbacks (open/close/error/message) for a freshly
# accepted websocket connection.
#
# client     - an EM::WebSocket::Connection (extended with our accessors)
# is_trusted - whether the connection arrived via a trusted listener
def new_client(client, is_trusted)
  # default to using json instead of pb
  client.encode_json = true
  client.trusted = is_trusted

  client.onopen { |handshake|

    time_it('onopen') {
      stats_connected

      # a unique ID for this TCP connection, to aid in debugging
      client.channel_id = handshake.query["channel_id"]

      @log.debug "client connected #{client} with channel_id: #{client.channel_id} Original-IP: #{handshake.headers["X-Forwarded-For"]}"

      # check for '?pb' or '?pb=true' in url query parameters
      query_pb = handshake.query["pb"]

      if !query_pb.nil? && (query_pb == "" || query_pb == "true")
        client.encode_json = false
      end

      websocket_comm(client, nil) do
        client.x_forwarded_for = handshake.headers["X-Forwarded-For"]
        client.query = handshake.query
        handle_login(client, client.query, client.x_forwarded_for)
      end
    }
  }

  client.onclose {
    time_it('onclose') {
      @log.debug "connection closed. marking stale: #{client.context}"
      #cleanup_client(client)
      cleanup_client_with_id(client.client_id)
    }
  }

  client.onerror { |error|
    if error.kind_of?(EM::WebSocket::WebSocketError)
      @log.error "websockets error: #{error}"
    else
      @log.error "generic error: #{error} #{error.backtrace}"
    end
  }

  client.onmessage { |data|

    # TODO: set a max message size before we put it through PB?
    # TODO: rate limit?

    msg = nil

    # track the largest payload seen so far, for diagnostics
    if @largest_message.nil? || data.length > @largest_message.length
      @largest_message = data
      @largest_message_user = client.user_id
    end

    # extract the message safely
    websocket_comm(client, nil) do
      if client.encode_json
        json = JSON.parse(data)
        msg = Jampb::ClientMessage.json_create(json)
      else
        msg = Jampb::ClientMessage.parse(data.to_s)
      end
    end

    # then route it internally.
    # BUG FIX: if parsing failed, websocket_comm handled the error (error frame
    # sent / client cleaned up) but msg stayed nil, and we still dereferenced
    # msg.message_id here, raising NoMethodError on nil. Guard with `unless`
    # (not `return` — this block is invoked long after new_client has returned,
    # so `return` would raise LocalJumpError).
    unless msg.nil?
      websocket_comm(client, msg.message_id) do
        self.route(msg, client)
      end
    end
  }
end
|
|
|
|
|
|
# Serializes and writes one message to a websocket client, honoring the
# client's negotiated encoding (JSON text frame vs protobuf binary frame).
def send_to_client(client, msg)
  quiet = msg.type == ClientMessage::Type::HEARTBEAT_ACK || msg.type == ClientMessage::Type::PEER_MESSAGE
  @log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless quiet

  if client.encode_json
    client.send(msg.to_json.to_s)
  else
    # this is so odd that this is necessary from an API perspective. but
    # searching through the source code... it's all I could find in
    # em-websocket for allowing a binary message to be sent
    client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
  end
end
|
|
|
|
# Tears down the router: drops the AMQP connection, then disconnects every
# tracked client.
def cleanup()
  # shutdown topic listeners and mq connection
  unless @amqp_connection_manager.nil?
    @amqp_connection_manager.disconnect
  end

  # tear down each individual client.
  # BUG FIX: iterate over a snapshot — cleanup_client is expected to remove
  # entries from @clients (add_tracker inserts there), and mutating a Hash
  # while iterating it is unsafe.
  @clients.to_a.each do |client, _context|
    cleanup_client(client)
  end
end
|
|
|
|
# Orderly shutdown entry point: log, then delegate all teardown to #cleanup.
def stop
  @log.info("shutdown")
  cleanup
end
|
|
|
|
# caused a client connection to be marked stale
|
|
# def stale_client(client)
|
|
# if client.client_id
|
|
# @log.info "marking client stale: #{client.context}"
|
|
# ConnectionManager.active_record_transaction do |connection_manager|
|
|
# music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id)
|
|
# # update the session members, letting them know this client went stale
|
|
# context = @client_lookup[client.client_id]
|
|
# if music_session = ActiveMusicSession.find_by_id(music_session_id)
|
|
# Notification.send_musician_session_stale(music_session, client.client_id, context.user)
|
|
# end unless music_session_id.nil?
|
|
# end
|
|
# end
|
|
# end
|
|
|
|
# Central dispatch for every message read off a websocket.
# Validates the message type and route_to, enforces login, then hands off to
# the server/client/session/user-directed handler.
#
# client_msg - a parsed Jampb::ClientMessage
# client     - the websocket connection the message arrived on
#
# Raises SessionError for malformed or unauthorized traffic (websocket_comm
# translates that into an error frame plus disconnect).
def route(client_msg, client)
  message_type = @message_factory.get_message_type(client_msg)
  if message_type.nil?
    Diagnostic.unknown_message_type(client.user_id, client_msg)
    # (dropped the redundant trailing `if message_type.nil?` — we are already
    # inside that branch)
    raise SessionError, "unknown message type received: #{client_msg.type}"
  end

  @message_stats[message_type] = @message_stats[message_type].to_i + 1

  @log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::HEARTBEAT_ACK && client_msg.type != ClientMessage::Type::PEER_MESSAGE

  if client_msg.route_to.nil?
    Diagnostic.missing_route_to(client.user_id, client_msg)
    raise SessionError, 'client_msg.route_to is null'
  end

  if !client.user_id && (client_msg.type != ClientMessage::Type::LOGIN && client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::LOGOUT)
    # this client has not logged in and is trying to send a non-login message

    if client.is_jamblaster
      # send message back to client instead of doing nothing?
      @log.debug("jamblaster sent message #{message_type} at wrong time")
      send_to_client(client, @message_factory.diagnostic("message type #{message_type} ignored because no log in"))
      # BUG FIX: previously fell through and routed the message anyway, even
      # though we just told the jamblaster it was being ignored. Stop here.
      return
    else
      raise SessionError, "must 'Login' first"
    end

  end

  if @message_factory.server_directed? client_msg
    handle_server_directed(client_msg, client)

  elsif @message_factory.client_directed? client_msg
    to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
    time_it('client_directed') { handle_client_directed(to_client_id, client_msg, client) }

  elsif @message_factory.session_directed? client_msg
    session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
    time_it('session_directed') { handle_session_directed(session_id, client_msg, client) }

  elsif @message_factory.user_directed? client_msg
    user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1]
    time_it('user_directed') { handle_user_directed(user_id, client_msg, client) }

  else
    raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}"
  end

end
|
|
|
|
# Dispatches a server-directed message to the handler matching its type,
# recording each under its own time_it bucket.
def handle_server_directed(client_msg, client)
  # @log.info("*** handle_server_directed(#{client_msg.inspect}, #{client})")

  if client_msg.type == ClientMessage::Type::LOGIN
    # this is currently only a jamblaster path
    client.query["token"] = client_msg.login.token
    client.query["username"] = client_msg.login.username
    client.query["password"] = client_msg.login.password
    time_it('login') { handle_login(client, client.query, client.x_forwarded_for, false) }
  elsif client_msg.type == ClientMessage::Type::LOGOUT
    # this is currently only a jamblaster path
    # BUG FIX: was recorded under the 'login' timer key, skewing both the
    # login and logout stats
    time_it('logout') { handle_logout(client) }
  elsif client_msg.type == ClientMessage::Type::HEARTBEAT
    time_it('heartbeat') { sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) } }
  elsif client_msg.type == ClientMessage::Type::USER_STATUS
    time_it('user_status') { sane_logging { handle_user_status(client_msg.user_status, client) } }
  elsif client_msg.type == ClientMessage::Type::SUBSCRIBE_BULK
    time_it('subscribe_bulk') { sane_logging { handle_bulk_subscribe(client_msg.subscribe_bulk, client) } }
  elsif client_msg.type == ClientMessage::Type::SUBSCRIBE
    time_it('subscribe') { sane_logging { handle_subscribe(client_msg.subscribe, client) } }
  elsif client_msg.type == ClientMessage::Type::UNSUBSCRIBE
    time_it('unsubscribe') { sane_logging { handle_unsubscribe(client_msg.unsubscribe, client) } }
  else
    raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message"
  end
end
|
|
|
|
# returns heartbeat_interval, connection stale time, and connection expire time
|
|
# returns heartbeat_interval, connection stale time, and connection expire time
# for a connection, applying per-user overrides on top of the per-client-type
# defaults. Raises SessionError on nonsensical configuration.
def determine_connection_times(user, client_type)

  browser = client_type == Connection::TYPE_BROWSER
  default_heartbeat = browser ? @heartbeat_interval_browser : @heartbeat_interval_client
  default_stale     = browser ? @connect_time_stale_browser : @connect_time_stale_client
  default_expire    = browser ? @connect_time_expire_browser : @connect_time_expire_client

  # NOTE(review): the *_client override fields on the user are applied to
  # browser connections as well — confirm that is intended
  heartbeat_interval = ((user && user.heartbeat_interval_client) || default_heartbeat).to_i
  heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config

  connection_expire_time = ((user && user.connection_expire_time_client) || default_expire).to_i
  connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config

  connection_stale_time = default_stale # no user override exists for this; not a very meaningful time right now

  if heartbeat_interval >= connection_stale_time
    raise SessionError, "misconfiguration! heartbeat_interval (#{heartbeat_interval}) should be less than stale time (#{connection_stale_time})"
  end
  if connection_stale_time >= connection_expire_time
    raise SessionError, "misconfiguration! stale time (#{connection_stale_time}) should be less than expire time (#{connection_expire_time})"
  end

  [heartbeat_interval, connection_stale_time, connection_expire_time]
end
|
|
|
|
# Finds or creates the ClientContext for a websocket connection, indexes it in
# the user/client lookups, and returns it.
def add_tracker(user, client, client_type, client_id)
  # add a tracker for this user
  existing = @clients[client]

  if existing
    existing.user = user
    add_user(existing) if user
    return existing
  end

  fresh = ClientContext.new(user, client, client_type)
  @clients[client] = fresh
  add_user(fresh) if user
  add_client(client_id, fresh)
  fresh
end
|
|
|
|
# Special-cased login path for latency-tester connections: registers a
# LatencyTester record, tracks it in the in-memory structures (standing in
# where a user normally would), and acks the login.
#
# Raises SessionError when LatencyTester.connect reports validation errors.
def handle_latency_tester_login(client_id, client_type, client, override_ip, client_id_int)

  # respond with LOGIN_ACK to let client know it was successful
  remote_ip = extract_ip(client, override_ip)
  heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(nil, client_type)
  latency_tester = LatencyTester.connect({
    client_id: client_id,
    channel_id: client.channel_id,
    ip_address: remote_ip,
    connection_stale_time: connection_stale_time,
    connection_expire_time: connection_expire_time,
    gateway: @gateway_name
  })
  if latency_tester.errors.any?
    @log.warn "unable to log in latency_tester with errors: #{latency_tester.errors.inspect}"
    stats_logged_in_failed
    raise SessionError, "invalid login: #{latency_tester.errors.inspect}"
  end

  client.client_id = client_id
  client.user_id = latency_tester.id if latency_tester

  @semaphore.synchronize do

    # the latency tester takes the user's slot in the tracking structures
    context = add_tracker(latency_tester, client, client_type, client_id)

    @log.debug "logged in context created: #{context}"

    # positional args: ip, client_id, token(nil), heartbeat, session(nil),
    # reconnected(false), id, expire, kind label, numeric client id —
    # NOTE(review): inferred from the call shape; verify against
    # MessageFactory#login_ack's signature
    login_ack = @message_factory.login_ack(remote_ip,
      client_id,
      nil,
      heartbeat_interval,
      nil,
      false,
      latency_tester.id,
      connection_expire_time,
      "latency_tester",
      client_id_int)
    stats_logged_in
    send_to_client(client, login_ack)
  end
end
|
|
|
|
# Logs a client out in place: deletes its Connection row (if any), detaches
# the user from the in-memory context, and acks the logout down the websocket.
# The websocket itself stays open.
def handle_logout(client)
  connection = Connection.find_by_client_id(client.client_id)
  connection.delete if connection

  client.user_id = nil

  if (context = client.context)
    @log.debug("will remove context with user: #{context.user}")
    remove_user(context)
    context.user = nil
  end

  send_to_client(client, @message_factory.logout_ack())
end
|
|
|
|
# Returns the cached artifact list for the requested channel:
# the beta list when `beta` is truthy, otherwise the stable one.
def ars_list(beta)
  beta ? @stored_ars_beta : @stored_ars
end
|
|
|
|
def handle_login(client, options, override_ip = nil, connecting = true)
|
|
puts("====handle_login====", options)
|
|
username = options["username"]
|
|
password = options["password"]
|
|
token = options["token"]
|
|
client_id = options["client_id"]
|
|
reconnect_music_session_id = options["music_session_id"]
|
|
client_type = options["client_type"]
|
|
machine_fingerprint = options["machine"]
|
|
os = options["os"]
|
|
product = options["product"].nil? ? JamRuby::ArtifactUpdate::CLIENT_PREFIX : options['
|
|
product']
|
|
udp_reachable = options["udp_reachable"].nil? ? true : options["udp_reachable"] == 'true'
|
|
jamblaster_serial_no = options["jamblaster_serial_no"]
|
|
ipv4_link_local = options["ipv4_link_local"]
|
|
ipv6_link_local = options["ipv6_link_local"]
|
|
client_id_int = options["client_id_int"]
|
|
|
|
# it's nice to have client_ids not flap in the wind, and we can do that with jamblasters
|
|
if jamblaster_serial_no && client_id.nil?
|
|
client_id = jamblaster_serial_no
|
|
end
|
|
# TESTING
|
|
#if jamblaster_serial_no.nil?
|
|
# jamblaster_serial_no = 'hi'
|
|
#end
|
|
|
|
client.subscriptions = Set.new # list of subscriptions that this client is watching in real-time
|
|
|
|
serial_no_debug = jamblaster_serial_no ? "serial_no=#{jamblaster_serial_no}" : ''
|
|
@log.info("handle_login: type=#{client_type} username=#{username} password=#{password ? '*' : 'null' } token=#{token} client_id=#{client_id} client_id_int=#{client_id_int} channel_id=#{client.channel_id} udp_reachable=#{udp_reachable} #{serial_no_debug}")
|
|
if client_type == Connection::TYPE_LATENCY_TESTER
|
|
handle_latency_tester_login(client_id, client_type, client, override_ip, client_id_int)
|
|
return
|
|
end
|
|
|
|
reconnected = false
|
|
|
|
if connecting
|
|
# you don't have to supply client_id in login--if you don't, we'll generate one
|
|
if client_id.nil? || client_id.empty?
|
|
# give a unique ID to this client.
|
|
client_id = UUIDTools::UUID.random_create.to_s
|
|
end
|
|
else
|
|
# client_id's don't change per websocket connection; so use the one from memeory
|
|
client_id = client.client_id
|
|
end
|
|
|
|
|
|
# we have to deal with jamblaster before login
|
|
if jamblaster_serial_no && jamblaster_serial_no != ''
|
|
jamblaster = Jamblaster.bootstrap(jamblaster_serial_no)
|
|
if jamblaster
|
|
client.is_jamblaster = true
|
|
end
|
|
if jamblaster && connecting
|
|
jamblaster.client_id = client_id
|
|
jamblaster.ipv4_link_local = ipv4_link_local
|
|
jamblaster.ipv6_link_local = ipv6_link_local
|
|
jamblaster.save
|
|
end
|
|
end
|
|
|
|
user = valid_login(username, password, token, client_id, jamblaster)
|
|
|
|
# protect against this user swamping the server
|
|
if user && Connection.where(user_id: user.id).count >= @max_connections_per_user
|
|
@log.warn "user #{user.id}/#{user.email} unable to connect due to max_connections_per_user #{@max_connections_per_user}"
|
|
stats_logged_in_failed
|
|
raise SessionError, 'max_user_connections', 'max_user_connections'
|
|
end
|
|
|
|
if connecting
|
|
# XXX This logic needs to instead be handled by a broadcast out to all websockets indicating dup
|
|
# kill any websocket connections that have this same client_id, which can happen in race conditions
|
|
# this code must happen here, before we go any further, so that there is only one websocket connection per client_id
|
|
existing_context = @client_lookup[client_id]
|
|
if existing_context
|
|
# in some reconnect scenarios, we may have in memory a websocket client still.
|
|
# let's whack it, and tell the other client, if still connected, that this is a duplicate login attempt
|
|
@log.info "duplicate client: #{existing_context}"
|
|
Diagnostic.duplicate_client(existing_context.user, existing_context)
|
|
error_msg = @message_factory.server_duplicate_client_error
|
|
send_to_client(existing_context.client, error_msg)
|
|
cleanup_client(existing_context.client)
|
|
end
|
|
end
|
|
|
|
|
|
connection = Connection.find_by_client_id(client_id)
|
|
# if this connection is reused by a different user (possible in logout/login scenarios), then whack the connection
|
|
# because it will recreate a new connection lower down
|
|
if connection && connection.user != user
|
|
keep = false
|
|
if user
|
|
if connection.user.nil?
|
|
keep = true
|
|
@log.debug("user #{user.email} logged into #{client_id}")
|
|
connection.user = user
|
|
connection.save
|
|
else
|
|
@log.debug("user #{user.email} took client_id #{client_id} from user #{connection.user.email}")
|
|
end
|
|
else
|
|
@log.debug("user-less connection #{client_id} took from user #{connection.user.email}")
|
|
end
|
|
|
|
if !keep
|
|
connection.delete
|
|
connection = nil
|
|
end
|
|
end
|
|
|
|
client.client_id = client_id
|
|
client.user_id = user.id if user
|
|
remote_ip = extract_ip(client, override_ip)
|
|
|
|
if user
|
|
|
|
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(user, client_type)
|
|
|
|
@log.debug "logged in #{user} with client_id: #{client_id} and fingerprint #{machine_fingerprint}"
|
|
|
|
# track fingerprint
|
|
if machine_fingerprint && user.client_fingerprint != machine_fingerprint
|
|
user.update_attribute(:client_fingerprint, machine_fingerprint)
|
|
end
|
|
|
|
# check if there's a connection for the client... if it's stale, reconnect it
|
|
if !connection.nil? && connecting
|
|
# FIXME: I think connection table needs to updated within connection_manager
|
|
# otherwise this would be 1 line of code (connection.connect!)
|
|
|
|
music_session_upon_reentry = connection.music_session
|
|
|
|
send_depart = false
|
|
recording_id = nil
|
|
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name)
|
|
|
|
if music_session_id.nil?
|
|
# if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
|
|
# if so, then we need to tell the others in the session that this user is now departed
|
|
|
|
|
|
unless music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed?
|
|
|
|
if music_session_upon_reentry.backing_track_initiator == user
|
|
music_session_upon_reentry.close_backing_track
|
|
end
|
|
|
|
# if a jamtrack is open and this user is no longer in the session, close it
|
|
if music_session_upon_reentry.jam_track_initiator == user
|
|
music_session_upon_reentry.close_jam_track
|
|
end
|
|
|
|
# if a recording is open and this user is no longer in the session, close it
|
|
if music_session_upon_reentry.claimed_recording_initiator == user
|
|
music_session_upon_reentry.claimed_recording_stop
|
|
end
|
|
|
|
# handle case that a recording was ongoing - any one leaves, we stop it
|
|
recording = music_session_upon_reentry.stop_recording
|
|
unless recording.nil?
|
|
@log.debug "stopped recording: #{recording.id} because user #{user} reconnected"
|
|
recording.discard_if_no_action(user) # throw away this users vote for the
|
|
recording_id = recording.id unless recording.nil?
|
|
end
|
|
|
|
# if the user was in a recording during the finializing phase (after stopped, but keep/discard still required in recordingFinishedDialog)
|
|
# then throw away the user's
|
|
most_recent_recording = music_session_upon_reentry.most_recent_recording
|
|
if most_recent_recording && most_recent_recording.users.exists?(user)
|
|
@log.debug "disarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected"
|
|
# if this user was in the most recent recording associated with the session they were just in, discard any tracks they had
|
|
most_recent_recording.discard_if_no_action(user) # throw away this users vote for the
|
|
end
|
|
|
|
music_session_upon_reentry.with_lock do # VRFS-1297
|
|
music_session_upon_reentry.tick_track_changes
|
|
end
|
|
send_depart = true
|
|
end
|
|
else
|
|
music_session = ActiveMusicSession.find_by_id(music_session_id)
|
|
Notification.send_musician_session_fresh(music_session, client.client_id, user)
|
|
end
|
|
|
|
end
|
|
|
|
if send_depart
|
|
Notification.send_session_depart(music_session_upon_reentry, client.client_id, user, recording_id)
|
|
end
|
|
end
|
|
|
|
# respond with LOGIN_ACK to let client know it was successful
|
|
# add a tracker for this user
|
|
context = add_tracker(user, client, client_type, client_id)
|
|
|
|
@log.debug "logged in context created: #{context}"
|
|
|
|
if !connection
|
|
# log this connection in the database
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name, jamblaster ? true : false) do |conn, count|
|
|
connection = Connection.find_by_client_id(client.client_id)
|
|
user.update_addr_loc(connection, User::JAM_REASON_LOGIN)
|
|
if count == 1
|
|
Notification.send_friend_update(user.id, true, conn)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
puts "========================="
|
|
puts "O/S = #{os}"
|
|
puts "========================="
|
|
# if we have OS data, try to grab client update data and let the client have it
|
|
update = ArtifactUpdate.find_client_by_os(product, os) if client_type == Connection:
|
|
:TYPE_CLIENT && os
|
|
|
|
client_update = update.update_data if update
|
|
|
|
arses = ars_list(user.beta)
|
|
|
|
login_ack = @message_factory.login_ack(remote_ip,
|
|
client_id,
|
|
user.remember_token,
|
|
heartbeat_interval,
|
|
connection.try(:music_session_id),
|
|
reconnected,
|
|
user.id,
|
|
connection_expire_time,
|
|
user.name,
|
|
connection.client_id_int,
|
|
client_update,
|
|
arses,
|
|
user.subscription_rules(false),
|
|
@stored_policy)
|
|
stats_logged_in
|
|
send_to_client(client, login_ack)
|
|
|
|
elsif jamblaster
|
|
# if no user, but we have a jamblaster, we can allow this session to go through
|
|
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(nil, client_type)
|
|
|
|
@log.debug "logged in jb::#{jamblaster.serial_no} with client_id: #{client_id}"
|
|
|
|
# check if there's a connection for the client... if it's stale, reconnect it
|
|
if !connection.nil?
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name)
|
|
end
|
|
end
|
|
|
|
# add a tracker for this user
|
|
context = add_tracker(user, client, client_type, client_id)
|
|
|
|
@log.debug "logged in context created: #{context}"
|
|
|
|
if !connection
|
|
# log this connection in the database
|
|
ConnectionManager.active_record_transaction do |connection_manager|
|
|
connection_manager.create_connection(nil, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name, jamblaster ? true : false) do |conn, count|
|
|
# this blk is not call
|
|
end
|
|
end
|
|
end
|
|
# if we have OS data, try to grab client update data and let the client have it
|
|
update = ArtifactUpdate.find_client_by_os(product, os) if client_type == Connection:
|
|
:TYPE_CLIENT && os
|
|
|
|
client_update = update.update_data if update
|
|
|
|
connect_ack = @message_factory.connect_ack(remote_ip,
|
|
client_id,
|
|
heartbeat_interval,
|
|
connection_expire_time,
|
|
client_update)
|
|
stats_logged_in
|
|
send_to_client(client, connect_ack)
|
|
else
|
|
stats_logged_in_failed
|
|
raise SessionError.new('invalid login', 'invalid_login')
|
|
end
|
|
end
|
|
|
|
# Fans a bulk subscription message out to handle_subscribe, one call per
# (type, id) pair. The bulk message carries parallel arrays `types`/`ids`.
def handle_bulk_subscribe(subscriptions, client)
  subscriptions.types.each_index do |idx|
    single = OpenStruct.new(type: subscriptions.types[idx], id: subscriptions.ids[idx])
    handle_subscribe(single, client)
  end
end
|
|
|
|
# Registers a dynamic subscription (type, id) for this client.
# Requests with an empty id or type are logged and ignored; registration
# only happens when dynamic registration is enabled for this gateway.
def handle_subscribe(subscribe, client)
  id = subscribe.id
  type = subscribe.type

  if id.nil? || id.empty? || type.nil? || type.empty?
    @log.error("handle_subscribe: empty data #{subscribe}")
  else
    register_subscription(client, type, id) if @allow_dynamic_registration
  end
end
|
|
|
|
# Removes a dynamic subscription (type, id) for this client.
# Requests with an empty id or type are logged and ignored.
def handle_unsubscribe(unsubscribe, client)
  id = unsubscribe.id
  type = unsubscribe.type
  if id && id.length > 0 && type && type.length > 0
    unregister_subscription(client, type, id) if @allow_dynamic_registration
  else
    # BUGFIX: this log message previously said "handle_subscribe" (copy/paste),
    # which made unsubscribe failures impossible to distinguish in the logs.
    @log.error("handle_unsubscribe: empty data #{unsubscribe}")
  end
end
|
|
|
|
# Places (or refreshes) a temporary ban for the user: they are let back in
# 10 minutes from now (the stats-dump timer expires entries past :allow).
def add_to_ban(user, reason)
  entry = (@temp_ban[user.id] ||= {})

  # allow user back in, after 10 minutes
  entry[:allow] = Time.now + 600

  @log.info("user #{user} banned for 10 minutes. reason #{reason}")
end
|
|
|
|
# Rate-limits heartbeats per client. Each call increments the per-client_id
# counter in @heartbeat_tracker (reset each stats-dump interval). Under the
# limit it returns false; over the limit it temp-bans the user and raises
# SessionError (so it never returns true).
def runaway_heartbeat(heartbeat, context)
  cid = context.client.client_id
  heartbeat_count = (@heartbeat_tracker[cid] || 0) + 1
  @heartbeat_tracker[cid] = heartbeat_count

  limit = context.client_type == 'browser' ? @maximum_minutely_heartbeat_rate_browser : @maximum_minutely_heartbeat_rate_client
  return false if heartbeat_count <= limit

  # over the limit: log (sampled), ban, and kill the session
  @log.warn("user #{context.user} sending too many heartbeats: #{heartbeat_count}") if heartbeat_count % 100 == 0
  add_to_ban(context.user, 'too many heartbeats')
  raise SessionError.new('too many heartbeats', 'empty_login')
end
|
|
|
|
# Records the client's self-reported activity flag on its context.
def handle_user_status(user_status, client)
  active_flag = user_status.active
  client.context.active = active_flag
end
|
|
|
|
# Handles a client heartbeat:
# - kills the session (raises SessionError) if the in-memory context or the
#   DB connection row for this client is gone
# - rate-limits runaway senders via runaway_heartbeat (which may raise)
# - touches the context's updated_at/active and forwards notification-seen
#   data (skipped for latency-tester clients)
# - replies with a heartbeat_ack, plus a bad-state error (AMQP down) or a
#   one-shot recovery message once AMQP comes back
# Commented-out profile_it bodies are prior DB-heavy logic disabled for
# performance ("dboptz"); they are kept for reference.
def handle_heartbeat(heartbeat, heartbeat_message_id, client)
  unless context = @clients[client]
    profile_it('heartbeat_context_gone') {
      @log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session"
      #Diagnostic.missing_client_state(client.user_id, client.context)
      raise SessionError, 'context state is gone. please reconnect.'
    }
  else
    # runaway_heartbeat returns false under the limit; over the limit it
    # raises, so this early return appears unreachable as written — kept as-is.
    if runaway_heartbeat(heartbeat, context)
      return
    end

    connection = nil
    profile_it('heartbeat_find_conn') {
      connection = Connection.find_by_client_id(context.client.client_id)
    }
    track_changes_counter = nil
    if connection.nil?
      profile_it('heartbeat_diag_missing') {
        @log.warn "*** WARNING: unable to find connection when handling heartbeat. context= #{context}; killing session"
        Diagnostic.missing_connection(client.user_id, client.context)
        raise SessionError, 'connection state is gone. please reconnect.'
      }
    else
      #profile_it('heartbeat_transaction') {
      #Connection.transaction do
      # send back track_changes_counter if in a session

      #dboptz

      #profile_it('heartbeat_session') {
      # if connection.music_session_id
      # music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
      # track_changes_counter = music_session.track_changes_counter if music_session
      # end
      #}

      #stale = context.stale?(@connect_time_stale_client)

      profile_it('heartbeat_touch') {
        # update connection updated_at and if the user is active
        # dboptz
        #Connection.where(id: connection.id).update_all(user_active: heartbeat.active, updated_at: Time.now)
        context.updated_at = Time.now
        context.active = heartbeat.active
      }

      profile_it('heartbeat_notification') {
        # update user's notification_seen_at field if the heartbeat indicates it saw one
        # first we try to use the notification id, which should usually exist.
        # if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
        update_notification_seen_at(connection, context, heartbeat) if client.context.client_type != Connection::TYPE_LATENCY_TESTER
      }
      #end
      #}

      profile_it('heartbeat_stale') {
        #if stale
        # ConnectionManager.active_record_transaction do |connection_manager|
        # heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
        # connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name)
        # end
        #end
      }
    end

    # track_changes_counter is currently always nil (the lookup above is
    # disabled) but is still part of the ack payload.
    heartbeat_ack = @message_factory.heartbeat_ack(track_changes_counter)

    send_to_client(client, heartbeat_ack)

    # send errors to clients in response to heartbeats if rabbitmq is down
    if !@amqp_connection_manager.connected?
      error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down")
      context.sent_bad_state_previously = true
      send_to_client(client, error_msg)
      return
    elsif context.sent_bad_state_previously
      # AMQP recovered since the last heartbeat: tell the client once
      context.sent_bad_state_previously = false
      recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id)
      send_to_client(client, recovery_msg)
    end
  end
end
|
|
|
|
# Queues the "last notification seen" timestamp for the connection's user
# into @pending_notification_seen_ats (flushed in bulk by
# periodical_notification_seen). Prefers the notification id from the
# heartbeat; if that notification can't be found, falls back to the
# client-supplied approximate timestamp string, which must parse.
def update_notification_seen_at(connection, context, heartbeat)
  notification_id_field = heartbeat.notification_seen if heartbeat.value_for_tag(1)

  if notification_id_field && notification_id_field != ''
    notification = Notification.find_by_id(notification_id_field)

    @semaphore_pnsa.synchronize do
      if notification
        @pending_notification_seen_ats[connection.user.id] = notification.created_at
      else
        # fallback: use the client's approximate "last seen" string, if valid
        notification_seen_at_parsed = nil
        notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2)
        begin
          notification_seen_at_parsed = Time.parse(notification_seen_at) if notification_seen_at && notification_seen_at.length > 0
        rescue ArgumentError, TypeError => e
          # was `rescue Exception`, which would also swallow signals/exit
          @log.error "unable to parse notification_seen_at in heartbeat from #{context}. notification_seen_at: #{notification_seen_at}"
        end

        if notification_seen_at_parsed
          # BUGFIX: store the parsed Time. Previously the RAW client string was
          # stored here, which defeated the validation above and fed
          # unsanitized client text into the SQL string built in
          # periodical_notification_seen.
          @pending_notification_seen_ats[connection.user.id] = notification_seen_at_parsed
        end
      end
    end
  end
end
|
|
|
|
# Authenticates a login request. Resolution order:
#   1. remember token, if present
#   2. username/password
#   3. neither present, but a jamblaster is -> anonymous session (nil user)
# Returns the User on success, nil on failure or anonymous jamblaster.
# Raises SessionError when the user is temp-banned or no credentials exist.
def valid_login(username, password, token, client_id, jamblaster)

  if !token.nil? && token != ''
    @log.debug "logging in via token"
    # attempt login with token
    user = User.find_by_remember_token(token)

    if user.nil?
      @log.debug "no user found with token #{token}"
      return nil
    else
      # check against temp ban list
      if @temp_ban[user.id]
        @log.debug("user #{user} is still banned; rejecting login")
        raise SessionError.new('login rejected temporarily', 'empty_login')
      end

      @log.debug "#{user} login via token"
      return user
    end

  elsif !username.nil? && !password.nil?

    # SECURITY FIX: the plaintext password was previously interpolated into
    # this debug message, leaking credentials into the logs.
    @log.debug "logging in via user/pass '#{username}'"
    # attempt login with username and password
    user = User.find_by_email(username)

    # check against temp ban list
    if !user.nil? && @temp_ban[user.id]
      @log.debug("user #{user} is still banned; rejecting login")
      raise SessionError.new('login rejected temporarily', 'empty_login')
    end

    if !user.nil? && user.valid_password?(password)
      @log.debug "#{user} login via password"
      return user
    else
      @log.debug "#{username} login failure"
      return nil
    end
  elsif jamblaster
    # if there is a jamblaster in context, then we will allow no login.
    return nil
  else
    raise SessionError.new('no login data was found in Login message', 'empty_login')
  end
end
|
|
|
|
# Looks up an active music session by id and verifies the user may join it.
# Returns the session; raises SessionError when the session is missing or
# the user is denied access.
def access_music_session(music_session_id, user)
  session = ActiveMusicSession.find_by_id(music_session_id)

  raise SessionError, 'specified session not found' if session.nil?
  raise SessionError, 'not allowed to join the specified session' unless session.access?(user)

  session
end
|
|
|
|
# client_id = the id of the client being accessed
|
|
# client = the current client
|
|
def access_p2p(client_id, user, msg)

  # NOTE(review): this unconditional return short-circuits the method, so all
  # permission checking below is dead code — p2p access checks are currently
  # disabled (the only call site, in handle_client_directed, is also
  # commented out). Confirm intent before re-enabling.
  return nil

  # ping_request and ping_ack messages are special in that they are simply allowed
  if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK
    return nil
  end

  client_connection = Connection.find_by_client_id(client_id)

  if client_connection.nil?
    raise JamPermissionError, 'specified client not found'
  end

  if !client_connection.access_p2p? user
    raise SessionError, 'not allowed to message this client'
  end
end
|
|
|
|
|
|
# Routes a peer-to-peer message from `client` to the client identified by
# to_client_id via the clients topic exchange. Raises SessionError for
# forbidden operational message types or a missing target id.
# NOTE: the access_p2p permission check is currently disabled (see access_p2p).
def handle_client_directed(to_client_id, client_msg, client)
  context = @clients[client]

  # by not catching any exception here, a PermissionError will be thrown if this isn't valid
  # if for some reason the client is trying to send to a client that it doesn't
  # belong to
  #access_p2p(to_client_id, context.user, client_msg)

  # quick and dirty safeguards against the most dangerous operational messages from being sent by malicious clients
  if client_msg.type == ClientMessage::Type::RELOAD ||
     client_msg.type == ClientMessage::Type::CLIENT_UPDATE ||
     client_msg.type == ClientMessage::Type::GENERIC_MESSAGE ||
     client_msg.type == ClientMessage::Type::RESTART_APPLICATION ||
     client_msg.type == ClientMessage::Type::STOP_APPLICATION
    # BUGFIX: was `@@log` (a class variable not used anywhere else in these
    # instance methods, which use @log) — that would raise NameError here
    # instead of logging before the SessionError.
    @log.error("malicious activity")
    raise SessionError, "not allowed"
  end

  if to_client_id.nil? || to_client_id == 'undefined' # javascript translates to 'undefined' in many cases
    raise SessionError, "empty client_id specified in peer-to-peer message"
  end

  # populate routing data
  client_msg.from = client.client_id

  @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}" unless client_msg.type == ClientMessage::Type::PEER_MESSAGE

  # put it on the topic exchange for clients
  @clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}})
end
|
|
|
|
# Publishes a message addressed to a user (all of their clients) onto the
# users topic exchange.
def handle_user_directed(user_id, client_msg, client)
  @log.debug "publishing to user #{user_id} from client_id #{client.client_id}"

  routing_key = "user.#{user_id}"
  @users_exchange.publish(client_msg.to_s, :routing_key => routing_key)
end
|
|
|
|
# Publishes a message to a music session on behalf of the sending client.
def handle_session_directed(session_id, client_msg, client)
  sender_context = @clients[client]
  user_publish_to_session(session_id, sender_context.user, client_msg, :client_id => client.client_id)
end
|
|
|
|
# sends a message to a session on behalf of a user
|
|
# if this is originating in the context of a client, it should be specified as :client_id => "value"
|
|
# client_msg should be a well-structure message (jam-pb message)
|
|
# Sends a message to every client in a music session on behalf of a user,
# excluding the sender's own client_id. Permission is enforced by
# access_music_session (raises on denial). `client_msg` must be a
# well-structured jam-pb message.
def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""})
  music_session = access_music_session(music_session_id, user)

  # gather up client_ids in the session, minus the sender itself
  sender_id = sender[:client_id]
  recipient_ids = music_session.music_session_clients
                               .map { |msc| msc.client_id }
                               .reject { |cid| cid == sender_id }

  publish_to_session(music_session.id, recipient_ids, client_msg.to_s, sender)
end
|
|
|
|
|
|
# sends a message to a session with no checking of permissions
|
|
# this method deliberately has no database interactivity/active_record objects
|
|
# Sends a message to the listed session clients with NO permission checks.
# Deliberately avoids database/ActiveRecord access; publishing is deferred
# onto the EventMachine reactor thread.
# NOTE(review): uses @@log and self.class.client_exchange (class-level state
# defined outside this view) — kept as-is.
def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""})
  EM.schedule do
    origin_client_id = sender[:client_id]
    exchange = self.class.client_exchange

    # one p2p publish per session member
    client_ids.each do |target_id|
      @@log.debug "publishing to session:#{music_session_id} client:#{target_id} from client:#{origin_client_id}"
      exchange.publish(client_msg, :routing_key => "client.#{target_id}")
    end
  end
end
|
|
|
|
# Returns the client's remote IP. An explicit override (e.g. from a proxy
# header) wins; otherwise the socket peer address is unpacked —
# unpack_sockaddr_in yields [port, address], so [1] is the IP string.
def extract_ip(client, override_ip)
  return override_ip if override_ip

  Socket.unpack_sockaddr_in(client.get_peername)[1]
end
|
|
|
|
# Periodic hook for flagging stale connection rows for this gateway.
# Currently a NO-OP: the DB work is commented out (tagged "pgoptz",
# presumably a Postgres-load optimization) — confirm before re-enabling.
def periodical_flag_connections
  # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
  #pgoptz
  #ConnectionManager.active_record_transaction do |connection_manager|
  # connection_manager.flag_stale_connections(@gateway_name)
  #end

end
|
|
|
|
# Periodic maintenance, two jobs in one:
# 1. Refreshes the cached ARS lists (@stored_ars / @stored_ars_beta) and the
#    cached connection policy used when building login acks.
# 2. Cross-checks every in-memory client against the connections table for
#    this gateway and tears down any client that no longer has a DB row
#    (can happen in hard-to-trace reconnect scenarios).
def periodical_check_clients
  # it's possible that a client will not be represented in the database anymore, due to hard to trace/guess scenario
  # usually involve reconnects. Double-check that all clients in memory are actually in the database. if not, delete them from memory

  stored_ars_raw = Ars.active_arses(true)
  stored_ars_beta_raw = Ars.active_arses(false)
  @stored_policy = GenericState.singleton.connection_policy

  stored_ars = []
  stored_ars_beta = []
  stored_ars_raw.each do |ars|
    stored_ars << @message_factory.ars_body(ars)
  end
  stored_ars_beta_raw.each do |ars|
    stored_ars_beta << @message_factory.ars_body(ars)
  end
  # swap in fully-built lists so readers never see a partial list
  @stored_ars = stored_ars
  @stored_ars_beta = stored_ars_beta

  if @client_lookup.length == 0
    return
  end

  # NOTE(review): client_id values and @gateway_name are interpolated
  # directly into SQL; safe only while both are server-generated — verify.
  client_ids = @client_lookup.map { |client_id, info| "('#{client_id}')" }.join(',')

  # find all client_id's that do not have a row in the db, and whack them
  # this style of query does the following: https://gist.github.com/sethcall/15308ccde298bff74584
  sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids})
  SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{@gateway_name}');
  "
  ConnectionManager.active_record_transaction do |connection_manager, conn|
    conn.exec(sql) do |result|
      result.each { |row|
        client_id = row['client_id']
        context = @client_lookup[client_id]
        if context
          @log.debug("cleaning up missing client #{client_id}, #{context.user}")
          #cleanup_client_with_id(client_id)
          cleanup_client(context.client)
        else
          @log.error("could not clean up missing client #{client_id}")
        end
      }
    end
  end
end
|
|
|
|
# Startup-only cleanup: removes every connection row owned by this gateway
# and tears down the corresponding in-memory/session state for each client.
def wipe_all_connections
  ConnectionManager.active_record_transaction do |connection_manager, conn|
    stale_client_ids = connection_manager.connection_client_ids_for_gateway(@gateway_name)

    @log.info "Cleaning up #{stale_client_ids.length} clients on startup"

    stale_client_ids.each { |cid| cleanup_client_with_id(cid) }
  end
end
|
|
|
|
# Flushes the pending "notification seen" timestamps (queued by
# update_notification_seen_at) to the users table in one bulk UPDATE.
# The pending map is swapped out under @semaphore_pnsa so heartbeat handlers
# can keep queueing while the SQL executes outside the lock.
def periodical_notification_seen
  pending = nil
  @semaphore_pnsa.synchronize do
    # nothing to flush; `return` here exits the whole method
    return if @pending_notification_seen_ats.count == 0
    pending = @pending_notification_seen_ats.clone
    @pending_notification_seen_ats.clear
  end

  # NOTE(review): keys/values are interpolated straight into SQL — safe only
  # while the producer (update_notification_seen_at) stores trusted,
  # validated timestamps; verify that nothing client-controlled lands here.
  bulk_values = pending.map{|k,v| "('#{k}', '#{v}'::timestamp)"}.join(',')
  sql = %{
  update users as u
  set notification_seen_at = c.notification_seen_at
  from (values
  #{bulk_values}
  ) as c(user_id, notification_seen_at)
  where c.user_id = u.id;
  }
  @log.info("SQL #{sql}")
  ConnectionManager.active_record_transaction do |connection_manager, conn|
    conn.exec(sql)
  end
end
|
|
|
|
# Periodic health check: each gateway instance checks only its OWN clients,
# expiring any whose context has not been touched within
# @connect_time_expire_client seconds. Each gateway tags its connection rows
# with its gateway ID, so n gateways can run independently; a global resque
# job separately reaps connections owned by crashed/buggy gateways.
def periodical_check_connections
  # dboptz: the previous stale-connection DB query is disabled; expiry is
  # now driven purely by the in-memory context timestamps.
  #ConnectionManager.active_record_transaction do |connection_manager|
  # clients = connection_manager.stale_connection_client_ids(@gateway_name)
  #end

  # BUGFIX: collect expired ids first, then clean up. The old code called
  # cleanup_client_with_id inside the @client_lookup iteration, and cleanup
  # deletes entries from @client_lookup — mutating a Hash while iterating it.
  now = Time.now
  expired_ids = []
  @client_lookup.each do |client_id, client_context|
    expired_ids << client_id if now - client_context.updated_at > @connect_time_expire_client
  end

  expired_ids.each { |cid| cleanup_client_with_id(cid) }
end
|
|
|
|
# Bulk-touches users.last_jam_updated_at for every user that currently has
# at least one active in-memory client context. Runs periodically; no-op
# when no users are active.
# NOTE(review): ids are interpolated directly into SQL — acceptable only
# while they are server-side numeric ids; verify before reusing the pattern.
def periodical_update_user_last_seen
  active_users_ids = @client_lookup.map { |client_id, client_context|
    # BUGFIX: guard against user-less contexts (jamblaster connections log in
    # with a nil user), which previously raised NoMethodError on `.id`.
    client_context.active && client_context.user ? client_context.user.id : nil
  }.compact.uniq

  if active_users_ids.any?
    sql = %{
    update users set last_jam_updated_at = now(), last_jam_updated_reason='#{User::JAM_REASON_PRESENT}' where users.id in (#{active_users_ids.map{|id| "'#{id}'"}.join(',')});
    }
    @log.info("SQL #{sql}")

    ConnectionManager.active_record_transaction do |connection_manager, conn|
      conn.exec(sql)
    end
  end
end
|
|
|
|
# FIXME: this is an exact duplicate of the periodical_update_user_last_seen
# definition above (Ruby silently lets this one win); one of the two should
# be deleted. Both carry the same nil-user guard so behavior matches
# regardless of which definition is kept.
#
# Bulk-touches users.last_jam_updated_at for every user that currently has
# at least one active in-memory client context.
def periodical_update_user_last_seen
  active_users_ids = @client_lookup.map { |client_id, client_context|
    # BUGFIX: guard against user-less contexts (jamblaster connections log in
    # with a nil user), which previously raised NoMethodError on `.id`.
    client_context.active && client_context.user ? client_context.user.id : nil
  }.compact.uniq

  if active_users_ids.any?
    sql = %{
    update users set last_jam_updated_at = now(), last_jam_updated_reason='#{User::JAM_REASON_PRESENT}' where users.id in (#{active_users_ids.map{|id| "'#{id}'"}.join(',')});
    }
    @log.info("SQL #{sql}")

    ConnectionManager.active_record_transaction do |connection_manager, conn|
      conn.exec(sql)
    end
  end
end
|
|
|
|
# Periodic (assumed once-per-minute) stats roll-up and reset:
# - logs per-message-type rates, largest message, timer drift, and the
#   time_it/profile_it accumulators
# - expires temp-ban entries whose :allow time has passed
# - stuffs summary counters into @message_stats (the Stats.write shipping
#   call is currently disabled), then zeroes all per-interval state.
def periodical_stats_dump

  # assume 60 seconds per status dump
  stats = @message_stats.sort_by { |k, v| -v }
  # convert raw counts to per-second rates, in place
  stats.map { |i| i[1] = (i[1] / 60.0).round(2) }

  @semaphore.synchronize do
    @log.info("num clients: #{@client_lookup.count}")
  end

  @log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', '))
  @log.info("largest msg from #{@largest_message_user}: #{@largest_message ? @largest_message.length : 0}b")

  if @highest_drift > 1
    # 2 is subtracted for the server's own 2-second drift-check timer
    @log.info("highest drift: #{@highest_drift - 2}")
  end

  total_time = 0
  time_sums = @time_it_sums.sort_by { |k, v| -v }

  # log only the top 3 time consumers, but total all of them
  log_num = 3
  count = 0
  time_sums.each do |cat, cat_time|
    count += 1
    if count <= log_num
      @log.info("timed #{cat} used time: #{cat_time}")
    end

    total_time += cat_time
  end

  @log.info("total used time: #{total_time}")

  profile_sums = @profile_it_sums.sort_by { |k, v| -v }
  profile_sums.each do |cat, cat_time|
    @log.info("profiled #{cat} used time: #{cat_time}")
  end

  # let banned users back in once their :allow time has passed
  # NOTE(review): deletes from @temp_ban while iterating it — Ruby tolerates
  # deleting the current key in Hash#each, but this is fragile.
  @temp_ban.each do |user_id, data|
    if Time.now > data[:allow]
      @log.info("user #{user_id} allowed back in")
      @temp_ban.delete(user_id)
    end
  end

  # stuff in extra stats into the @message_stats and send it all off
  @message_stats['gateway_name'] = @gateway_name
  @message_stats['login'] = @login_success_count
  @message_stats['login_fail'] = @login_fail_count
  @message_stats['connected'] = @connected_count
  @message_stats['disconnected'] = @disconnected_count
  @message_stats['largest_msg'] = @largest_message ? @largest_message.length : 0
  @message_stats['highest_drift'] = @highest_drift - 2 # 2 comes from the server's 2 second timer for the drift check
  @message_stats['total_time'] = total_time
  @message_stats['banned_users'] = @temp_ban.length

  #Stats.write('gateway.stats', @message_stats)

  # clear out stats
  @message_stats.clear
  @login_success_count = 0
  @login_fail_count = 0
  @connected_count = 0
  @disconnected_count = 0
  @user_message_counts = {}
  @largest_message = nil
  @largest_message_user = nil
  @time_it_sums = {}
  @highest_drift = 0
  @heartbeat_tracker = {}
end
|
|
|
|
# Fully expires the connection identified by client_id `cid`:
# - tears down in-memory websocket/client state (cleanup_client), if any
# - deletes the DB connection row; if the user was in a music session,
#   closes their session-history row, stops/discards recordings, and ticks
#   the session's track-change counter
# - finally notifies the remaining session members that the user departed
def cleanup_client_with_id(cid)

  client_context = @client_lookup[cid]

  if client_context
    #Diagnostic.expired_stale_connection(client_context.user.id, client_context)
    cleanup_client(client_context.client)
  end

  music_session = nil
  recording_id = nil
  user = nil

  # remove this connection from the database
  ConnectionManager.active_record_transaction do |mgr|
    mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
      user = User.find_by_id(user_id)
      # NOTE: a `return` inside this block exits cleanup_client_with_id
      # itself (block, not lambda), so the depart notification at the bottom
      # is also skipped for user-less connections.
      return if user.nil? # this can happen if you delete a user while their connection is up
      @log.info "expiring stale connection client_id:#{cid}, user_id:#{user}"
      # count == 0 means this was the user's last connection anywhere
      Notification.send_friend_update(user_id, false, conn) if count == 0
      music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil?
      # NOTE(review): redundant re-fetch — `user` was already loaded above
      user = User.find_by_id(user_id) unless user_id.nil?

      if music_session
        # close out the user's most recent session-membership history row
        msuh = MusicSessionUserHistory.where(client_id: cid).order('created_at DESC').first
        if msuh
          msuh.session_removed_at = Time.now if msuh.session_removed_at.nil?
          msuh.save(validate: false)
        end

        # any in-progress recording stops when anyone leaves the session
        recording = music_session.stop_recording
        unless recording.nil?
          @log.debug "cleanup_client: stopped recording: #{recording.id} because user #{user} reconnected"
          recording.discard_if_no_action(user) # throw away this users vote for the
          @log.debug "cleanup_client: discard complete"
          recording_id = recording.id unless recording.nil?
        end

        # if the user was in a recording during the finializing phase (after stopped, but keep/discard still required in recordingFinishedDialog)
        # then throw away the user's
        most_recent_recording = music_session.most_recent_recording
        if most_recent_recording && most_recent_recording.users.exists?(user)
          @log.debug "cleanup_client: discarded user's vote for recording: #{most_recent_recording.id} because user #{user} reconnected"
          # if this user was in the most recent recording associated with the session they were just in, discard any tracks they had
          most_recent_recording.discard_if_no_action(user) # throw away this users vote for the
        else
          @log.debug "cleanup_client: no recent recording to clean up"
        end

        music_session.with_lock do # VRFS-1297
          @log.debug("cleanup_client: tick track changes")
          music_session.tick_track_changes
          @log.debug("cleanup_client: tick track changes done")
        end
      end
    }
  end

  # tell the session this user is gone (only reached when the delete block
  # ran to completion with a known user and session)
  if user && music_session
    @log.info("cleanup_client: Send session depart message for #{user} to #{music_session.id}")
    Notification.send_session_depart(music_session, cid, user, recording_id)
    @log.info("cleanup_client: Sent session depart")
  end
end
|
|
|
|
# removes all resources associated with a client
|
|
# Tears down all in-memory resources for a websocket client, under
# @semaphore: closes the socket, unregisters subscriptions, and (for
# logged-in clients) removes the context from @clients and the
# client/user lookup tables. Safe to call more than once; repeat calls on a
# logged-in client are detected and skipped.
def cleanup_client(client)
  @semaphore.synchronize do

    client.close

    # unregister any subscriptions
    client.subscriptions.each do |subscription|
      unregister_subscription(client, subscription[:type], subscription[:id])
    end

    pending = client.context.nil? # presence of context implies this connection has been logged into

    if pending
      @log.debug "cleaned up not-logged-in client #{client}"
      stats_disconnected
    else
      @log.debug "cleanup up logged-in client #{client}"

      context = @clients.delete(client)

      if context
        remove_client(client.client_id)
        remove_user(context)
        stats_disconnected
      else
        # @clients entry already deleted — e.g. duplicate disconnect callbacks
        @log.warn "skipping duplicate cleanup attempt of logged-in client"
      end
    end
  end
end
|
|
|
|
# Counts a successful login for the current stats interval.
def stats_logged_in
  @login_success_count += 1
end
|
|
|
|
# Counts a failed login for the current stats interval.
def stats_logged_in_failed
  @login_fail_count += 1
end
|
|
|
|
# Counts a new websocket connection for the current stats interval.
def stats_connected
  @connected_count += 1
end
|
|
|
|
# Counts a websocket disconnect for the current stats interval.
def stats_disconnected
  @disconnected_count += 1
end
|
|
|
|
private
|
|
|
|
# Times blk and accumulates the elapsed seconds into @time_it_sums[cat]
# (dumped and reset by periodical_stats_dump); warns about any single call
# that takes longer than one second.
def time_it(cat, &blk)
  started_at = Time.now
  blk.call
  elapsed = Time.now - started_at

  @time_it_sums[cat] = (@time_it_sums[cat] || 0) + elapsed

  @log.warn("LONG TIME: #{cat}: #{elapsed}") if elapsed > 1
end
|
|
|
|
# Times blk and accumulates the elapsed seconds into @profile_it_sums[cat]
# (dumped by periodical_stats_dump); warns about any single call that takes
# longer than one second.
def profile_it(cat, &blk)
  started_at = Time.now
  blk.call
  elapsed = Time.now - started_at

  @profile_it_sums[cat] = (@profile_it_sums[cat] || 0) + elapsed

  @log.warn("LONG TIME: #{cat}: #{elapsed}") if elapsed > 1
end
|
|
|
|
# Temporarily raises the ActiveRecord base logger to :info around blk, to
# keep chatty repeated transactions out of the debug logs. The previous
# level is always restored, even when blk raises.
def sane_logging(&blk)
  # BUGFIX: capture the logger reference once. The old code re-read
  # @ar_base_logger in the ensure clause; if it was (re)assigned during
  # blk.call, a nil `original_level` could be written onto the new logger.
  logger = @ar_base_logger
  if logger
    original_level = logger.level
    logger.level = :info
  end

  begin
    blk.call
  ensure
    logger.level = original_level if logger
  end
end
|
|
end
|
|
end
|