# jam-cloud/websocket-gateway/lib/jam_websockets/router.rb
#
# 852 lines
# 32 KiB
# Ruby

require 'set'
require 'amqp'
require 'thread'
require 'json'
require 'eventmachine'
include Jampb
# Re-opens the websocket connection class to carry the per-client routing
# fields used by JamWebsockets::Router and a flag recording whether the
# socket is currently open.
module EventMachine
  module WebSocket
    class Connection < EventMachine::Connection
      # client_id is the uuid we give to each client to track them as we like
      attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context

      # http://stackoverflow.com/questions/11150147/how-to-check-if-eventmachineconnection-is-open
      attr_accessor :connected

      # Marks the connection open, then defers to the inherited callback.
      # `self.` is required: a bare `connected = true` only creates a
      # method-local variable and leaves the attribute untouched.
      #
      # NOTE(review): EventMachine appears to invoke connection_completed only
      # for outbound connections; confirm this fires for accepted websocket
      # connections, otherwise connected? stays false for them.
      def connection_completed
        self.connected = true
        super
      end

      # True only once the connection has been flagged open and not yet unbound.
      def connected?
        !!connected
      end

      # Marks the connection closed, then defers to the inherited callback.
      #
      # NOTE(review): because this re-opens the class rather than subclassing
      # it, `super` resolves to EventMachine::Connection#unbind; verify that any
      # unbind defined by the websocket library itself is still run.
      def unbind
        self.connected = false
        super
      end
    end
  end
end
module JamWebsockets
class Router
attr_accessor :user_context_lookup, :heartbeat_interval_client, :connect_time_expire_client, :connect_time_stale_client,
:heartbeat_interval_browser, :connect_time_expire_browser, :connect_time_stale_browser
# Builds a router with empty lookup tables and no AMQP wiring.
# The AMQP connection, exchanges, queues and timing values stay nil until
# #start (and, through it, #register_topics) fills them in.
def initialize
  # logging
  @log = Logging.logger[self]
  @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]

  # in-memory registries
  @clients = {}             # clients that have logged in
  @user_context_lookup = {} # lookup a set of client_contexts by user_id
  @client_lookup = {}       # lookup a client by client_id

  # collaborators
  @message_factory = JamRuby::MessageFactory.new
  @semaphore = Mutex.new
  @thread_pool = nil

  # messaging handles
  @amqp_connection_manager = @users_exchange = nil
  @user_topic = @client_topic = nil

  # heartbeat / staleness / expiry settings, per client type
  @heartbeat_interval_client = @connect_time_expire_client = @connect_time_stale_client = nil
  @heartbeat_interval_browser = @connect_time_expire_browser = @connect_time_stale_browser = nil
end
# Records the stale/expire timings, connects to AMQP and registers the
# messaging topics.
#
# connect_time_stale_*  - stale time per client type; the heartbeat interval
#                         is set to half of this value
# connect_time_expire_* - expire time per client type
# options               - AMQP :host / :port; any key left out falls back to
#                         "localhost" / 5672 instead of being passed as nil
# block                 - optional; called once the AMQP channel is available
#                         and the topics are registered
#
# Any error during setup is logged, #cleanup is run, and the error is re-raised.
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672}, &block)
  @log.info "startup"

  # a caller passing only :host (or only :port) must still get the other default
  amqp_options = {:host => "localhost", :port => 5672}.merge(options || {})

  @heartbeat_interval_client = connect_time_stale_client / 2
  @connect_time_stale_client = connect_time_stale_client
  @connect_time_expire_client = connect_time_expire_client
  @heartbeat_interval_browser = connect_time_stale_browser / 2
  @connect_time_stale_browser = connect_time_stale_browser
  @connect_time_expire_browser = connect_time_expire_browser

  begin
    @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => amqp_options[:host], :port => amqp_options[:port])
    @amqp_connection_manager.connect do |channel|
      register_topics(channel)
      # the block is optional; calling nil here would abort startup
      block.call if block
    end
  rescue => e
    @log.error "unable to initialize #{e.to_s}"
    cleanup
    raise e
  end

  @log.info "started"
end
# Indexes a client context under its client_id in @client_lookup.
# Returns the stored context.
def add_client(client_id, client_context)
  @log.debug "adding client #{client_id} to @client_lookup"
  @client_lookup.store(client_id, client_context)
end
# Drops client_id from @client_lookup, logging a warning when the entry
# was already gone.
def remove_client(client_id)
  if @client_lookup.delete(client_id).nil?
    @log.warn "unable to delete #{client_id} from client_lookup because it's already gone"
  else
    @log.debug "cleaned up @client_lookup for #{client_id}"
  end
end
# Registers a client context under its user's id. Each user id maps to a
# hash of that user's contexts keyed by the context's client.
# Returns the context that was added.
def add_user(context)
  contexts_for_user = (@user_context_lookup[context.user.id] ||= Hash.new)
  contexts_for_user[context.client] = context
end
# Removes one client context from its user's set of contexts, and removes
# the user's entry altogether once no contexts remain (memory leak concern).
# Logs a warning when the user has no registered contexts.
def remove_user(client_context)
  user_id = client_context.user.id
  contexts_for_user = @user_context_lookup[user_id]
  return @log.warn("user can not be removed #{client_context}") if contexts_for_user.nil?

  contexts_for_user.delete(client_context.client)
  @user_context_lookup.delete(user_id) if contexts_for_user.length == 0
end
# Registers the topics for user messages and client (p2p) messages.
#
# For each of the 'users' and 'clients' topic exchanges this declares an
# auto-delete queue, binds it with a wildcard routing key, purges it, and
# subscribes a handler that looks up the connected recipient(s) under
# @semaphore and schedules delivery on the EventMachine reactor.
# Both exchanges are also handed to MQRouter.
def register_topics(channel)
  ######################## USER MESSAGING ###########################
  @users_exchange = channel.topic('users')

  @user_topic = channel.queue("", :auto_delete => true)
  @user_topic.bind(@users_exchange, :routing_key => "user.#")
  @user_topic.purge

  # subscribe for any messages to users
  @user_topic.subscribe(:ack => false) do |headers, payload|
    begin
      # the routing key is "user.<user_id>"
      user_id = headers.routing_key["user.".length..-1]
      @semaphore.synchronize do
        contexts = @user_context_lookup[user_id]
        if contexts.nil?
          @log.debug "Can't route message: no user connected with id #{user_id}"
        else
          @log.debug "received user-directed message for user: #{user_id}"
          parsed = Jampb::ClientMessage.parse(payload)
          # fan out to every context registered for this user
          contexts.each do |_client, context|
            EM.schedule do
              @log.debug "sending user message to #{context}"
              send_to_client(context.client, parsed)
            end
          end
        end
      end
    rescue => e
      @log.error "unhandled error in messaging to client"
      @log.error e
    end
  end

  MQRouter.user_exchange = @users_exchange

  ############## CLIENT MESSAGING ###################
  @clients_exchange = channel.topic('clients')

  @client_topic = channel.queue("", :auto_delete => true)
  @client_topic.bind(@clients_exchange, :routing_key => "client.#")
  @client_topic.purge

  # subscribe for any p2p messages to a client
  @client_topic.subscribe(:ack => false) do |headers, payload|
    begin
      # the routing key is "client.<client_id>"
      client_id = headers.routing_key["client.".length..-1]
      @semaphore.synchronize do
        client_context = @client_lookup[client_id]
        if client_context.nil?
          @log.debug "Can't route message: no client connected with id #{client_id}"
        else
          client = client_context.client
          parsed = Jampb::ClientMessage.parse(payload)
          @log.debug "client-directed message received from #{parsed.from} to client #{client_id}"
          if client.nil?
            @log.debug "client-directed message unroutable to disconnected client #{client_id}"
          else
            EM.schedule do
              @log.debug "sending client-directed down websocket to #{client_id}"
              send_to_client(client, parsed)
            end
          end
        end
      end
    rescue => e
      @log.error "unhandled error in messaging to client"
      @log.error e
    end
  end

  MQRouter.client_exchange = @clients_exchange
end
# Wires the open/close/error/message callbacks onto a new websocket
# connection.
#
# Messages are decoded as JSON unless the handshake URL carries '?pb' or
# '?pb=true', in which case they are decoded as protocol buffers. Decoded
# messages go to #route. A SessionError or any other unexpected error sends
# an error message down and tears the client down; a PermissionError only
# sends an error message down.
def new_client(client)
  # default to using json instead of pb
  client.encode_json = true

  client.onopen do |handshake|
    # a unique ID for this TCP connection, to aid in debugging
    client.channel_id = handshake.query["channel_id"]
    @log.debug "client connected #{client} with channel_id: #{client.channel_id}"

    # check for '?pb' or '?pb=true' in url query parameters
    pb_flag = handshake.query["pb"]
    client.encode_json = false if pb_flag == "" || pb_flag == "true"
  end

  client.onclose do
    @log.debug "connection closed. marking stale: #{client.context}"
    cleanup_client(client)
  end

  client.onerror do |error|
    if error.kind_of?(EM::WebSocket::WebSocketError)
      @log.error "websockets error: #{error}"
    else
      @log.error "generic error: #{error} #{error.backtrace}"
    end
  end

  client.onmessage do |msg|
    # TODO: set a max message size before we put it through PB?
    # TODO: rate limit?
    pb_msg = nil
    begin
      pb_msg =
        if client.encode_json
          # example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}}
          Jampb::ClientMessage.json_create(JSON.parse(msg))
        else
          Jampb::ClientMessage.parse(msg.to_s)
        end
      route(pb_msg, client)
    rescue SessionError => e
      @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}"
      begin
        # wrap the message up and send it down
        send_to_client(client, @message_factory.server_rejection_error(e.to_s))
      ensure
        cleanup_client(client)
      end
    rescue PermissionError => e
      @log.info "permission error. reason=#{e.to_s}"
      @log.info e
      # wrap the message up and send it down; the session stays open
      send_to_client(client, @message_factory.server_permission_error(pb_msg.message_id, e.to_s))
    rescue => e
      @log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}"
      @log.error e
      begin
        # wrap the message up and send it down
        send_to_client(client, @message_factory.server_generic_error(e.to_s))
      ensure
        cleanup_client(client)
      end
    end
  end
end
# Sends a message down a client's websocket, as JSON text or as a binary
# frame depending on client.encode_json. Every message type except
# HEARTBEAT_ACK is logged at debug level.
def send_to_client(client, msg)
  unless msg.type == ClientMessage::Type::HEARTBEAT_ACK
    @log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})"
  end

  return client.send(msg.to_json.to_s) if client.encode_json

  # this is so odd that this is necessary from an API perspective. but searching through the source code...
  # it's all I could find in em-websocket for allowing a binary message to be sent
  handler = client.instance_variable_get(:@handler)
  handler.send_frame(:binary, msg.to_s)
end
# Shuts down the topic listeners / mq connection (when one exists) and then
# tears down each individual logged-in client.
def cleanup
  @amqp_connection_manager.disconnect unless @amqp_connection_manager.nil?

  # NOTE(review): cleanup_client deletes entries from @clients while this
  # iteration is running; confirm that is intended.
  @clients.each_key { |client| cleanup_client(client) }
end
# Logs the shutdown and delegates the actual teardown to #cleanup.
def stop
  @log.info("shutdown")
  cleanup
end
# Causes a client connection to be marked stale.
# Does nothing for a client that has no client_id. Otherwise the connection
# is flagged stale in the database and, when it was in a music session that
# still exists, that session is notified.
def stale_client(client)
  return unless client.client_id

  @log.info "marking client stale: #{client.context}"
  ConnectionManager.active_record_transaction do |connection_manager|
    music_session_id = connection_manager.flag_connection_stale_with_client_id(client.client_id)

    # update the session members, letting them know this client went stale
    # NOTE(review): context is nil when the client is no longer in
    # @client_lookup, which would make context.user raise below; confirm
    # callers guarantee the client is still registered.
    context = @client_lookup[client.client_id]
    next if music_session_id.nil?

    music_session = MusicSession.find_by_id(music_session_id)
    Notification.send_musician_session_stale(music_session, client.client_id, context.user) if music_session
  end
end
# Expires a batch of connections. For each entry (read via [:client_id]):
# tears down any in-memory client, deletes the connection record, stops any
# ongoing recording and ticks track changes for the session it was in, and
# finally notifies the session that the user departed.
def cleanup_clients_with_ids(expired_connections)
  expired_connections.each do |expired_connection|
    cid = expired_connection[:client_id]

    if (client_context = @client_lookup[cid])
      Diagnostic.expired_stale_connection(client_context.user.id, client_context)
      cleanup_client(client_context.client)
    end

    # filled in by the delete_connection callback below
    music_session = user = recording_id = nil

    # remove this connection from the database
    ConnectionManager.active_record_transaction do |mgr|
      mgr.delete_connection(cid) do |conn, count, music_session_id, user_id|
        @log.info "expiring stale connection client_id:#{cid}, user_id:#{user_id}"
        Notification.send_friend_update(user_id, false, conn) if count == 0

        music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil?
        user = User.find_by_id(user_id) unless user_id.nil?

        unless music_session.nil?
          # stop any ongoing recording, if there is one
          recording = music_session.stop_recording
          recording_id = recording.id unless recording.nil?
          music_session.with_lock do # VRFS-1297
            music_session.tick_track_changes
          end
        end
      end
    end

    Notification.send_session_depart(music_session, cid, user, recording_id) if user && music_session
  end
end
# Removes all resources associated with a client, under @semaphore.
# The socket is closed when it reports itself connected. A client without a
# context has never logged in, so there is nothing more to release; for a
# logged-in client the @clients, client and user registrations are removed.
def cleanup_client(client)
  @semaphore.synchronize do
    client.close if client.connected?

    # presence of context implies this connection has been logged into
    if client.context.nil?
      @log.debug "cleaned up not-logged-in client #{client}"
    else
      @log.debug "cleanup up logged-in client #{client}"
      if (context = @clients.delete(client))
        remove_client(client.client_id)
        remove_user(context)
      else
        @log.warn "skipping duplicate cleanup attempt of logged-in client"
      end
    end
  end
end
# Validates an incoming client message and dispatches it to the handler for
# its destination (server, client, session or user).
#
# The destination is always read from client_msg.route_to, the same field
# that is nil-checked below and that the error messages refer to.
#
# Raises SessionError when the message type is unknown, route_to is missing,
# the client has not logged in yet (and this is not a LOGIN), or route_to
# does not match any known destination kind.
def route(client_msg, client)
  message_type = @message_factory.get_message_type(client_msg)
  if message_type.nil?
    Diagnostic.unknown_message_type(client.user_id, client_msg)
    raise SessionError, "unknown message type received: #{client_msg.type}"
  end

  @log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT

  route_to = client_msg.route_to
  if route_to.nil?
    Diagnostic.missing_route_to(client.user_id, client_msg)
    raise SessionError, 'client_msg.route_to is null'
  end

  if !client.user_id && client_msg.type != ClientMessage::Type::LOGIN
    # this client has not logged in and is trying to send a non-login message
    raise SessionError, "must 'Login' first"
  end

  # each non-server destination is "<prefix><id>"; strip the prefix to get the id
  if @message_factory.server_directed? client_msg
    handle_server_directed(client_msg, client)
  elsif @message_factory.client_directed? client_msg
    to_client_id = route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
    handle_client_directed(to_client_id, client_msg, client)
  elsif @message_factory.session_directed? client_msg
    session_id = route_to[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
    handle_session_directed(session_id, client_msg, client)
  elsif @message_factory.user_directed? client_msg
    user_id = route_to[MessageFactory::USER_PREFIX_TARGET.length..-1]
    handle_user_directed(user_id, client_msg, client)
  else
    raise SessionError, "client_msg.route_to is unknown type: #{route_to}"
  end
end
# Handles a message addressed to the server itself: LOGIN goes to
# #handle_login, HEARTBEAT goes to #handle_heartbeat (wrapped in
# #sane_logging). Any other type raises SessionError.
def handle_server_directed(client_msg, client)
  # @log.info("*** handle_server_directed(#{client_msg.inspect}, #{client})")
  type = client_msg.type

  return handle_login(client_msg.login, client) if type == ClientMessage::Type::LOGIN

  if type == ClientMessage::Type::HEARTBEAT
    return sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) }
  end

  raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message"
end
# Returns [heartbeat_interval, connection stale time, connection expire time].
#
# Defaults come from the browser or client settings depending on client_type.
# A per-user heartbeat interval and expire time take precedence when they
# convert to a non-zero integer. Raises SessionError unless
# heartbeat < stale < expire.
def determine_connection_times(user, client_type)
  default_heartbeat, default_stale, default_expire =
    if client_type == Connection::TYPE_BROWSER
      [@heartbeat_interval_browser, @connect_time_stale_browser, @connect_time_expire_browser]
    else
      [@heartbeat_interval_client, @connect_time_stale_client, @connect_time_expire_client]
    end

  heartbeat_interval = (user.heartbeat_interval_client || default_heartbeat).to_i
  heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config

  connection_expire_time = (user.connection_expire_time_client || default_expire).to_i
  connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config

  # no user override exists for this; not a very meaningful time right now
  connection_stale_time = default_stale

  if heartbeat_interval >= connection_stale_time
    raise SessionError, "misconfiguration! heartbeat_interval (#{heartbeat_interval}) should be less than stale time (#{connection_stale_time})"
  end
  if connection_stale_time >= connection_expire_time
    raise SessionError, "misconfiguration! stale time (#{connection_stale_time}) should be less than expire time (#{connection_expire_time})"
  end

  [heartbeat_interval, connection_stale_time, connection_expire_time]
end
# Handles a LOGIN message: authenticates the sender, claims the client_id
# for this websocket, reconnects or creates the connection record, registers
# the client in the in-memory lookups and replies with LOGIN_ACK.
#
# login  - login payload; each optional field is read only when
#          value_for_tag reports it as set
# client - the websocket connection the message arrived on
#
# Raises SessionError ('invalid login') when the credentials do not resolve
# to a user. This is checked before any existing connection is touched, so a
# failed login cannot evict the connection that currently holds the supplied
# client_id.
def handle_login(login, client)
  username = login.username if login.value_for_tag(1)
  password = login.password if login.value_for_tag(2)
  token = login.token if login.value_for_tag(3)
  client_id = login.client_id if login.value_for_tag(4)
  reconnect_music_session_id = login.reconnect_music_session_id if login.value_for_tag(5)
  client_type = login.client_type if login.value_for_tag(6)

  # NOTE(review): this writes the login token to the info log; confirm that is acceptable
  @log.info("*** handle_login: token=#{token}; client_id=#{client_id}, client_type=#{client_type}")

  reconnected = false

  # you don't have to supply client_id in login--if you don't, we'll generate one
  if client_id.nil? || client_id.empty?
    # give a unique ID to this client.
    client_id = UUIDTools::UUID.random_create.to_s
  end

  user = valid_login(username, password, token, client_id)
  raise SessionError, 'invalid login' unless user

  # kill any websocket connections that have this same client_id, which can happen in race conditions
  # this code must happen here, before we go any further, so that there is only one websocket connection per client_id
  existing_context = @client_lookup[client_id]
  if existing_context
    # in some reconnect scenarios, we may have in memory a websocket client still.
    @log.info "duplicate client: #{existing_context}"
    Diagnostic.duplicate_client(existing_context.user, existing_context) if existing_context.client.connected
    cleanup_client(existing_context.client)
  end

  connection = JamRuby::Connection.find_by_client_id(client_id)

  # if this connection is reused by a different user (possible in logout/login scenarios), then whack the connection
  # because it will recreate a new connection lower down
  if connection && connection.user != user
    @log.debug("user #{user.email} took client_id #{client_id} from user #{connection.user.email}")
    connection.delete
    connection = nil
  end

  client.client_id = client_id
  client.user_id = user.id
  remote_ip = extract_ip(client)

  heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(user, client_type)

  @log.debug "logged in #{user} with client_id: #{client_id}"

  # check if there's a connection for the client... if it's stale, reconnect it
  unless connection.nil?
    # FIXME: I think connection table needs to updated within connection_manager
    # otherwise this would be 1 line of code (connection.connect!)
    music_session_upon_reentry = connection.music_session

    send_depart = false
    recording_id = nil

    ConnectionManager.active_record_transaction do |connection_manager|
      music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time)

      if music_session_id.nil?
        # if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
        # if so, then we need to tell the others in the session that this user is now departed
        unless music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed?
          recording = music_session_upon_reentry.stop_recording
          recording_id = recording.id unless recording.nil?
          music_session_upon_reentry.with_lock do # VRFS-1297
            music_session_upon_reentry.tick_track_changes
          end
          send_depart = true
        end
      else
        # the session may no longer exist by the time it is looked up;
        # only notify when it does (same guard as stale_client)
        music_session = MusicSession.find_by_id(music_session_id)
        if music_session
          Notification.send_musician_session_fresh(music_session, client.client_id, user)
        else
          @log.warn "music session #{music_session_id} not found; skipping session-fresh notification for client #{client.client_id}"
        end
      end
    end

    if send_depart
      Notification.send_session_depart(music_session_upon_reentry, client.client_id, user, recording_id)
    end
  end

  # respond with LOGIN_ACK to let client know it was successful
  @semaphore.synchronize do
    # add a tracker for this user
    context = ClientContext.new(user, client, client_type)
    @clients[client] = context
    add_user(context)
    add_client(client_id, context)

    @log.debug "logged in context created: #{context}"

    unless connection
      # log this connection in the database
      ConnectionManager.active_record_transaction do |connection_manager|
        connection_manager.create_connection(user.id, client.client_id, remote_ip, client_type, connection_stale_time, connection_expire_time) do |conn, count|
          if count == 1
            Notification.send_friend_update(user.id, true, conn)
          end
        end
      end
    end

    login_ack = @message_factory.login_ack(remote_ip,
                                           client_id,
                                           user.remember_token,
                                           heartbeat_interval,
                                           connection.try(:music_session_id),
                                           reconnected,
                                           user.id,
                                           connection_expire_time)
    send_to_client(client, login_ack)
  end
end
# Handles a HEARTBEAT from a logged-in client: touches the connection
# record, records notification-seen state, reconnects a stale connection,
# and replies with HEARTBEAT_ACK. While the messaging system is down a
# bad-state error is also sent; a recovery message follows once it is back.
#
# Raises SessionError when either the in-memory context or the connection
# record for this client is missing.
def handle_heartbeat(heartbeat, heartbeat_message_id, client)
  context = @clients[client]
  unless context
    @log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session"
    Diagnostic.missing_client_state(client.user_id, client.context)
    raise SessionError, 'context state is gone. please reconnect.'
  end

  connection = Connection.find_by_user_id_and_client_id(context.user.id, context.client.client_id)
  track_changes_counter = nil

  if connection.nil?
    @log.warn "*** WARNING: unable to find connection when handling heartbeat. context= #{context}; killing session"
    Diagnostic.missing_connection(client.user_id, client.context)
    raise SessionError, 'connection state is gone. please reconnect.'
  end

  Connection.transaction do
    # send back track_changes_counter if in a session
    if connection.music_session_id
      music_session = MusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
      track_changes_counter = music_session.track_changes_counter if music_session
    end

    # update connection updated_at
    connection.touch

    # update user's notification_seen_at field if the heartbeat indicates it saw one
    # first we try to use the notification id, which should usually exist.
    # if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
    update_notification_seen_at(connection, context, heartbeat)
  end

  if connection.stale?
    ConnectionManager.active_record_transaction do |connection_manager|
      _interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
      connection_manager.reconnect(connection, connection.music_session_id, nil, connection_stale_time, connection_expire_time)
    end
  end

  send_to_client(client, @message_factory.heartbeat_ack(track_changes_counter))

  # send errors to clients in response to heartbeats if rabbitmq is down
  if !@amqp_connection_manager.connected?
    error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down")
    context.sent_bad_state_previously = true
    send_to_client(client, error_msg)
    return
  elsif context.sent_bad_state_previously
    context.sent_bad_state_previously = false
    recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id)
    send_to_client(client, recovery_msg)
  end
end
# Updates the connection's user's notification_seen_at from a heartbeat.
#
# Nothing happens unless the heartbeat carries a notification id (tag 1).
# When that id resolves to a Notification, its created_at is stored.
# Otherwise the heartbeat's notification_seen_at (tag 2) is used, provided it
# parses as a time. Save failures and unparseable timestamps are logged, not
# raised.
#
# NOTE(review): the timestamp fallback only runs when an id was sent but not
# found; a heartbeat carrying only notification_seen_at is ignored. Confirm
# that is intended.
def update_notification_seen_at(connection, context, heartbeat)
  notification_id_field = heartbeat.notification_seen if heartbeat.value_for_tag(1)
  return unless notification_id_field

  notification = Notification.find_by_id(notification_id_field)
  if notification
    connection.user.notification_seen_at = notification.created_at
    unless connection.user.save(validate: false)
      @log.error "unable to update notification_seen_at via id field for client #{context}. errors: #{connection.user.errors.inspect}"
    end
  else
    notification_seen_at_parsed = nil
    notification_seen_at = heartbeat.notification_seen_at if heartbeat.value_for_tag(2)
    begin
      notification_seen_at_parsed = Time.parse(notification_seen_at) if notification_seen_at && notification_seen_at.length > 0
    rescue StandardError
      # only ordinary errors (e.g. a malformed timestamp) are swallowed here;
      # signals and exit requests must keep propagating
      @log.error "unable to parse notification_seen_at in heartbeat from #{context}. notification_seen_at: #{notification_seen_at}"
    end

    if notification_seen_at_parsed
      # the parsed value only validates the input; the raw value is what gets stored
      connection.user.notification_seen_at = notification_seen_at
      unless connection.user.save(validate: false)
        @log.error "unable to update notification_seen_at via time field for client #{context}. errors: #{connection.user.errors.inspect}"
      end
    end
  end
end
# Resolves login credentials to a user.
#
# A non-empty token takes precedence and is looked up as a remember token;
# otherwise username (an email) and password are checked. Returns the user,
# or nil when the credentials do not match anyone.
#
# Raises SessionError when neither a token nor a username/password pair was
# supplied.
#
# The password is never written to the log.
def valid_login(username, password, token, client_id)
  if !token.nil? && token != ''
    @log.debug "logging in via token"
    # attempt login with token
    user = JamRuby::User.find_by_remember_token(token)
    if user.nil?
      @log.debug "no user found with token #{token}"
      return nil
    end
    @log.debug "#{user} login via token"
    user
  elsif !username.nil? && !password.nil?
    @log.debug "logging in via user/pass '#{username}' '[FILTERED]'"
    # attempt login with username and password
    user = User.find_by_email(username)
    if !user.nil? && user.valid_password?(password)
      @log.debug "#{user} login via password"
      user
    else
      @log.debug "#{username} login failure"
      nil
    end
  else
    raise SessionError, 'no login data was found in Login message'
  end
end
# Looks up a music session and checks that the user may access it.
# Returns the session; raises SessionError when it does not exist or
# access is denied.
def access_music_session(music_session_id, user)
  music_session = MusicSession.find_by_id(music_session_id)

  raise SessionError, 'specified session not found' if music_session.nil?
  raise SessionError, 'not allowed to join the specified session' unless music_session.access?(user)

  music_session
end
# Permission check for peer-to-peer messages.
#
# client_id = the id of the client being accessed
# user      = the user sending the message
# msg       = the message being sent
#
# FIXME: the unconditional `return nil` below short-circuits the method, so
# none of the checks after it ever run and every p2p message is allowed.
# Confirm whether the checks were disabled on purpose before removing it.
def access_p2p(client_id, user, msg)
  return nil

  # ping_request and ping_ack messages are special in that they are simply allowed
  return nil if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK

  client_connection = Connection.find_by_client_id(client_id)
  raise PermissionError, 'specified client not found' if client_connection.nil?

  unless client_connection.access_p2p?(user)
    raise SessionError, 'not allowed to message this client'
  end
end
# Forwards a peer-to-peer message to another client via the clients topic
# exchange, stamping the sender's client_id into the message's `from` field.
#
# Raises SessionError when the destination client_id is nil, empty, or the
# literal 'undefined'. The destination is validated first so that a
# malformed id never reaches the permission check or the exchange.
def handle_client_directed(to_client_id, client_msg, client)
  # javascript translates to 'undefined' in many cases
  if to_client_id.nil? || to_client_id.empty? || to_client_id == 'undefined'
    raise SessionError, "empty client_id specified in peer-to-peer message"
  end

  context = @clients[client]

  # by not catching any exception here, a PermissionError will be thrown if this isn't valid
  # if for some reason the client is trying to send to a client that it doesn't
  # belong to
  access_p2p(to_client_id, context.user, client_msg)

  # populate routing data
  client_msg.from = client.client_id

  @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}"

  # put it on the topic exchange for clients
  @clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}})
end
# Publishes a message on the users topic exchange under the routing key
# "user.<user_id>".
def handle_user_directed(user_id, client_msg, client)
  @log.debug "publishing to user #{user_id} from client_id #{client.client_id}"

  # put it on the topic exchange for users
  routing_key = "user.#{user_id}"
  @users_exchange.publish(client_msg.to_s, :routing_key => routing_key)
end
# Publishes a message to a music session on behalf of the sending client's
# user, passing the sender's client_id along.
def handle_session_directed(session_id, client_msg, client)
  sending_user = @clients[client].user
  user_publish_to_session(session_id, sending_user, client_msg, :client_id => client.client_id)
end
# Sends a message to a session on behalf of a user, after checking that the
# user may access the session.
# If this originates in the context of a client, pass :client_id => "value";
# that client is left out of the recipients.
# client_msg should be a well-structured message (jam-pb message).
def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""})
  music_session = access_music_session(music_session_id, user)

  # gather up client_ids in the session, minus the sender's own
  member_ids = music_session.music_session_clients.map(&:client_id)
  recipient_ids = member_ids.reject { |member_id| member_id == sender[:client_id] }

  publish_to_session(music_session.id, recipient_ids, client_msg.to_s, sender)
end
# Sends a message to a session with no checking of permissions.
# This method deliberately has no database interactivity/active_record objects.
#
# music_session_id - used for logging only
# client_ids       - recipients; one message is published per id
# client_msg       - the already-serialized message payload
# sender           - hash whose :client_id is used for logging
#
# The publishing is scheduled on the EventMachine reactor. It uses this
# router's own logger and clients exchange; the class defines neither a
# @@log class variable nor a client_exchange class method.
def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""})
  EM.schedule do
    sender_client_id = sender[:client_id]

    # iterate over each person in the session, and send a p2p message
    client_ids.each do |client_id|
      @log.debug "publishing to session:#{music_session_id} client:#{client_id} from client:#{sender_client_id}"
      # put it on the topic exchange for clients
      @clients_exchange.publish(client_msg, :routing_key => "client.#{client_id}")
    end
  end
end
# Returns the remote IP address of a connection, taken from its peer name.
def extract_ip(client)
  _port, ip_address = Socket.unpack_sockaddr_in(client.get_peername)
  ip_address
end
private
# Runs the block with the ActiveRecord::Base logger set to :info and
# restores the previous level afterwards, even when the block raises.
# Used around repeated transactions that cause too much ActiveRecord::Base
# logging. Without such a logger the block simply runs.
# Returns the block's value.
def sane_logging(&blk)
  if @ar_base_logger
    original_level = @ar_base_logger.level
    @ar_base_logger.level = :info
  end
  blk.call
ensure
  @ar_base_logger.level = original_level if @ar_base_logger
end
end
end