* VRFS-3973 - websocket gateway to allow jamblasters to connect with no connection info
This commit is contained in:
parent
8c805f0378
commit
0a3d1016a1
|
|
@ -338,3 +338,4 @@ jamblaster_v2.sql
|
|||
acapella_rename.sql
|
||||
jamblaster_pairing_active.sql
|
||||
email_blacklist.sql
|
||||
jamblaster_connection.sql
|
||||
|
|
@ -0,0 +1 @@
|
|||
ALTER TABLE connections ADD COLUMN is_jamblaster BOOLEAN DEFAULT FALSE;
|
||||
|
|
@ -13,6 +13,7 @@ message ClientMessage {
|
|||
LOGIN_ACK = 105;
|
||||
LOGOUT = 106;
|
||||
LOGOUT_ACK = 107;
|
||||
CONNECT_ACK = 108;
|
||||
LOGIN_MUSIC_SESSION = 110;
|
||||
LOGIN_MUSIC_SESSION_ACK = 115;
|
||||
LEAVE_MUSIC_SESSION = 120;
|
||||
|
|
@ -130,6 +131,7 @@ message ClientMessage {
|
|||
optional LoginAck login_ack = 105; // from server
|
||||
optional Logout logout = 106; // to server
|
||||
optional LogoutAck logout_ack = 107; // from server
|
||||
optional ConnectAck connect_ack = 108; // from server
|
||||
optional LoginMusicSession login_music_session = 110; // to server
|
||||
optional LoginMusicSessionAck login_music_session_ack = 115; // from server
|
||||
optional LeaveMusicSession leave_music_session = 120;
|
||||
|
|
@ -267,6 +269,13 @@ message LoginAck {
|
|||
optional ClientUpdate client_update = 9;
|
||||
}
|
||||
|
||||
message ConnectAck {
|
||||
optional string public_ip = 1;
|
||||
optional string client_id = 2; // a new client_id if none is supplied in Login, or just the original client_id echoed back
|
||||
optional int32 heartbeat_interval = 3; // set your heartbeat interval to this value
|
||||
optional int32 connection_expire_time = 4; // this is how long the server gives you before killing your connection entirely after missing heartbeats
|
||||
optional ClientUpdate client_update = 5;
|
||||
}
|
||||
|
||||
// route_to: server
|
||||
// a logout ack is always sent
|
||||
|
|
|
|||
|
|
@ -175,7 +175,7 @@ SQL
|
|||
# this number is used by notification logic elsewhere to know
|
||||
# 'oh the user joined for the 1st time, so send a friend update', or
|
||||
# 'don't bother because the user has connected somewhere else already'
|
||||
def create_connection(user_id, client_id, channel_id, ip_address, client_type, connection_stale_time, connection_expire_time, udp_reachable, gateway, &blk)
|
||||
def create_connection(user_id, client_id, channel_id, ip_address, client_type, connection_stale_time, connection_expire_time, udp_reachable, gateway, is_jamblaster, &blk)
|
||||
|
||||
# validate client_type
|
||||
raise "invalid client_type: #{client_type}" if client_type != 'client' && client_type != 'browser'
|
||||
|
|
@ -214,15 +214,18 @@ SQL
|
|||
|
||||
lock_connections(conn)
|
||||
|
||||
conn.exec("INSERT INTO connections (user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, aasm_state, stale_time, expire_time, udp_reachable, gateway) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
|
||||
[user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, Connection::CONNECT_STATE.to_s, connection_stale_time, connection_expire_time, udp_reachable, gateway]).clear
|
||||
conn.exec("INSERT INTO connections (user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, aasm_state, stale_time, expire_time, udp_reachable, gateway, is_jamblaster) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
|
||||
[user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, Connection::CONNECT_STATE.to_s, connection_stale_time, connection_expire_time, udp_reachable, gateway, is_jamblaster]).clear
|
||||
|
||||
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
|
||||
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
|
||||
count = result.getvalue(0, 0).to_i
|
||||
# we're passing all this stuff so that the user record might be updated as well...
|
||||
blk.call(conn, count) unless blk.nil?
|
||||
if user_id
|
||||
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
|
||||
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
|
||||
count = result.getvalue(0, 0).to_i
|
||||
# we're passing all this stuff so that the user record might be updated as well...
|
||||
blk.call(conn, count) unless blk.nil?
|
||||
end
|
||||
end
|
||||
|
||||
return count
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -96,6 +96,43 @@ module JamRuby
|
|||
)
|
||||
end
|
||||
|
||||
# create a login ack (login was successful)
|
||||
def logout_ack()
|
||||
|
||||
logout_ack = Jampb::LogoutAck.new()
|
||||
|
||||
Jampb::ClientMessage.new(
|
||||
:type => ClientMessage::Type::LOGOUT_ACK,
|
||||
:route_to => CLIENT_TARGET,
|
||||
:logout_ack => logout_ack
|
||||
)
|
||||
end
|
||||
|
||||
|
||||
# create a login ack (login was successful)
|
||||
def connect_ack(public_ip, client_id,heartbeat_interval, connection_expire_time, client_update_data = nil)
|
||||
client_update = Jampb::ClientUpdate.new(
|
||||
product: client_update_data[:product],
|
||||
version: client_update_data[:version],
|
||||
uri: client_update_data[:uri],
|
||||
size: client_update_data[:size]
|
||||
) if client_update_data
|
||||
|
||||
connect_ack = Jampb::ConnectAck.new(
|
||||
:public_ip => public_ip,
|
||||
:client_id => client_id,
|
||||
:heartbeat_interval => heartbeat_interval,
|
||||
:connection_expire_time => connection_expire_time,
|
||||
:client_update => client_update
|
||||
)
|
||||
|
||||
Jampb::ClientMessage.new(
|
||||
:type => ClientMessage::Type::CONNECT_ACK,
|
||||
:route_to => CLIENT_TARGET,
|
||||
:connect_ack => connect_ack
|
||||
)
|
||||
end
|
||||
|
||||
def download_available
|
||||
download_available = Jampb::DownloadAvailable.new
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
module JamRuby
|
||||
class Jamblaster < ActiveRecord::Base
|
||||
|
||||
attr_accessible :user_id, :serial_no, :client_id, :vtoken, :user_ids, as: :admin
|
||||
attr_accessible :user_id, :serial_no, :client_id, :user_ids, as: :admin
|
||||
|
||||
|
||||
belongs_to :user, class_name: 'JamRuby::User'
|
||||
|
|
@ -17,7 +17,6 @@ module JamRuby
|
|||
before_save :sanitize_active_admin
|
||||
|
||||
def sanitize_active_admin
|
||||
self.vtoken = nil if self.vtoken == ''
|
||||
self.client_id = nil if self.client_id == ''
|
||||
self.serial_no = nil if self.serial_no == ''
|
||||
end
|
||||
|
|
|
|||
|
|
@ -50,8 +50,8 @@ describe ConnectionManager, no_transaction: true do
|
|||
user.save!
|
||||
user = nil
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
expect { @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) }.to raise_error(PG::Error)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
expect { @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false) }.to raise_error(PG::Error)
|
||||
end
|
||||
|
||||
it "create connection then delete it" do
|
||||
|
|
@ -60,7 +60,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
#user_id = create_user("test", "user2", "user2@jamkazam.com")
|
||||
user = FactoryGirl.create(:user)
|
||||
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
|
||||
count.should == 1
|
||||
|
||||
|
|
@ -90,7 +90,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
#user_id = create_user("test", "user2", "user2@jamkazam.com")
|
||||
user = FactoryGirl.create(:user)
|
||||
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
|
||||
count.should == 1
|
||||
|
||||
|
|
@ -107,7 +107,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
cc.locidispid.should == 17192000002
|
||||
cc.udp_reachable.should == true
|
||||
|
||||
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, false, GATEWAY)
|
||||
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, false, GATEWAY, false)
|
||||
|
||||
cc = Connection.find_by_client_id!(client_id)
|
||||
cc.connected?.should be_true
|
||||
|
|
@ -130,7 +130,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
#user_id = create_user("test", "user2", "user2@jamkazam.com")
|
||||
user = FactoryGirl.create(:user)
|
||||
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, false, GATEWAY)
|
||||
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, false, GATEWAY, false)
|
||||
|
||||
count.should == 1
|
||||
|
||||
|
|
@ -147,7 +147,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
cc.locidispid.should == 17192000002
|
||||
cc.udp_reachable.should == false
|
||||
|
||||
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, nil, GATEWAY) # heartbeat passes nil in for udp_reachable
|
||||
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, nil, GATEWAY, false) # heartbeat passes nil in for udp_reachable
|
||||
|
||||
cc = Connection.find_by_client_id!(client_id)
|
||||
cc.connected?.should be_true
|
||||
|
|
@ -261,7 +261,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
it "flag stale connection" do
|
||||
client_id = "client_id8"
|
||||
user_id = create_user("test", "user8", "user8@jamkazam.com")
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
|
||||
num = JamRuby::Connection.count(:conditions => ['aasm_state = ?','connected'])
|
||||
num.should == 1
|
||||
|
|
@ -302,7 +302,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
it "expires stale connection" do
|
||||
client_id = "client_id8"
|
||||
user_id = create_user("test", "user8", "user8@jamkazam.com")
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
|
||||
conn = Connection.find_by_client_id(client_id)
|
||||
set_updated_at(conn, Time.now - STALE_BUT_NOT_EXPIRED)
|
||||
|
|
@ -328,7 +328,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
music_session_id = music_session.id
|
||||
user = User.find(user_id)
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
connection = @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
|
||||
|
||||
connection.errors.any?.should be_false
|
||||
|
|
@ -364,8 +364,8 @@ describe ConnectionManager, no_transaction: true do
|
|||
client_id2 = "client_id10.12"
|
||||
user_id = create_user("test", "user10.11", "user10.11@jamkazam.com", :musician => true)
|
||||
user_id2 = create_user("test", "user10.12", "user10.12@jamkazam.com", :musician => false)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id2, client_id2, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
@connman.create_connection(user_id2, client_id2, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
|
||||
music_session = FactoryGirl.create(:active_music_session, user_id: user_id)
|
||||
music_session_id = music_session.id
|
||||
|
|
@ -384,7 +384,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
client_id = "client_id10.2"
|
||||
|
||||
user_id = create_user("test", "user10.2", "user10.2@jamkazam.com")
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
|
||||
music_session = FactoryGirl.create(:active_music_session, user_id: user_id)
|
||||
user = User.find(user_id)
|
||||
|
|
@ -400,8 +400,8 @@ describe ConnectionManager, no_transaction: true do
|
|||
fan_client_id = "client_id10.4"
|
||||
musician_id = create_user("test", "user10.3", "user10.3@jamkazam.com")
|
||||
fan_id = create_user("test", "user10.4", "user10.4@jamkazam.com", :musician => false)
|
||||
@connman.create_connection(musician_id, musician_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(fan_id, fan_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(musician_id, musician_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
@connman.create_connection(fan_id, fan_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
|
||||
music_session = FactoryGirl.create(:active_music_session, :fan_access => false, user_id: musician_id)
|
||||
music_session_id = music_session.id
|
||||
|
|
@ -425,7 +425,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
music_session_id = music_session.id
|
||||
user = User.find(user_id2)
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
# specify real user id, but not associated with this session
|
||||
expect { @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) } .to raise_error(ActiveRecord::RecordNotFound)
|
||||
end
|
||||
|
|
@ -437,7 +437,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
user = User.find(user_id)
|
||||
music_session = ActiveMusicSession.new
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
connection = @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
|
||||
connection.errors.size.should == 1
|
||||
connection.errors.get(:music_session).should == [ValidationMessages::MUSIC_SESSION_MUST_BE_SPECIFIED]
|
||||
|
|
@ -451,7 +451,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
music_session_id = music_session.id
|
||||
user = User.find(user_id2)
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
# specify real user id, but not associated with this session
|
||||
expect { @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) } .to raise_error(ActiveRecord::RecordNotFound)
|
||||
end
|
||||
|
|
@ -465,7 +465,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
user = User.find(user_id)
|
||||
dummy_music_session = ActiveMusicSession.new
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
|
||||
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
|
||||
end
|
||||
|
|
@ -480,7 +480,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
|
||||
dummy_music_session = ActiveMusicSession.new
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
@connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
|
||||
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
|
||||
end
|
||||
|
|
@ -493,7 +493,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
music_session_id = music_session.id
|
||||
user = User.find(user_id)
|
||||
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
@connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
|
||||
|
||||
assert_session_exists(music_session_id, true)
|
||||
|
|
@ -536,7 +536,7 @@ describe ConnectionManager, no_transaction: true do
|
|||
user = User.find(user_id)
|
||||
|
||||
client_id1 = Faker::Number.number(20)
|
||||
@connman.create_connection(user_id, client_id1, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
|
||||
@connman.create_connection(user_id, client_id1, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY, false)
|
||||
music_session1 = FactoryGirl.create(:active_music_session, :user_id => user_id)
|
||||
connection1 = @connman.join_music_session(user, client_id1, music_session1, true, TRACKS, 10)
|
||||
connection1.errors.size.should == 0
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ include Jampb
|
|||
module EventMachine
|
||||
module WebSocket
|
||||
class Connection < EventMachine::Connection
|
||||
attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted, :subscriptions # client_id is uuid we give to each client to track them as we like
|
||||
attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted, :subscriptions, :x_forwarded_for, :query, :is_jamblaster # client_id is uuid we give to each client to track them as we like
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
@ -36,7 +36,7 @@ module JamWebsockets
|
|||
:profile_it_sums,
|
||||
:highest_drift,
|
||||
:heartbeat_tracker
|
||||
:temp_ban
|
||||
:temp_ban
|
||||
|
||||
|
||||
def initialize()
|
||||
|
|
@ -86,17 +86,17 @@ module JamWebsockets
|
|||
|
||||
@log.info "startup"
|
||||
|
||||
@heartbeat_interval_client = connect_time_stale_client / 2
|
||||
@connect_time_stale_client = connect_time_stale_client
|
||||
@connect_time_expire_client = connect_time_expire_client
|
||||
@heartbeat_interval_browser = connect_time_stale_browser / 2
|
||||
@connect_time_stale_browser = connect_time_stale_browser
|
||||
@connect_time_expire_browser = connect_time_expire_browser
|
||||
@max_connections_per_user = options[:max_connections_per_user]
|
||||
@gateway_name = options[:gateway]
|
||||
@allow_dynamic_registration = options[:allow_dynamic_registration]
|
||||
@chat_enabled = options[:chat_enabled]
|
||||
@chat_blast = options[:chat_blast]
|
||||
@heartbeat_interval_client = connect_time_stale_client / 2
|
||||
@connect_time_stale_client = connect_time_stale_client
|
||||
@connect_time_expire_client = connect_time_expire_client
|
||||
@heartbeat_interval_browser = connect_time_stale_browser / 2
|
||||
@connect_time_stale_browser = connect_time_stale_browser
|
||||
@connect_time_expire_browser = connect_time_expire_browser
|
||||
@max_connections_per_user = options[:max_connections_per_user]
|
||||
@gateway_name = options[:gateway]
|
||||
@allow_dynamic_registration = options[:allow_dynamic_registration]
|
||||
@chat_enabled = options[:chat_enabled]
|
||||
@chat_blast = options[:chat_blast]
|
||||
|
||||
# determine the maximum amount of heartbeats we should get per user
|
||||
@maximum_minutely_heartbeat_rate_client = ((@heartbeat_interval_client / 60.0) * 2).ceil + 3
|
||||
|
|
@ -129,7 +129,7 @@ module JamWebsockets
|
|||
|
||||
if deleted.nil?
|
||||
@log.warn "unable to delete #{client_id} from client_lookup because it's already gone"
|
||||
else
|
||||
else
|
||||
@log.debug "cleaned up @client_lookup for #{client_id}"
|
||||
end
|
||||
|
||||
|
|
@ -146,17 +146,19 @@ module JamWebsockets
|
|||
end
|
||||
|
||||
def remove_user(client_context)
|
||||
user_contexts = @user_context_lookup[client_context.user.id]
|
||||
if client_context.user
|
||||
user_contexts = @user_context_lookup[client_context.user.id]
|
||||
|
||||
if user_contexts.nil?
|
||||
@log.warn "user can not be removed #{client_context}"
|
||||
else
|
||||
# delete the context from set of user contexts
|
||||
user_contexts.delete(client_context.client)
|
||||
if user_contexts.nil?
|
||||
@log.warn "user can not be removed #{client_context}"
|
||||
else
|
||||
# delete the context from set of user contexts
|
||||
user_contexts.delete(client_context.client)
|
||||
|
||||
# if last user context, delete entire set (memory leak concern)
|
||||
if user_contexts.length == 0
|
||||
@user_context_lookup.delete(client_context.user.id)
|
||||
# if last user context, delete entire set (memory leak concern)
|
||||
if user_contexts.length == 0
|
||||
@user_context_lookup.delete(client_context.user.id)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
@ -176,34 +178,34 @@ module JamWebsockets
|
|||
# subscribe for any messages to users
|
||||
@user_topic.subscribe(:ack => false) do |headers, msg|
|
||||
time_it('user_topic') {
|
||||
begin
|
||||
routing_key = headers.routing_key
|
||||
user_id = routing_key["user.".length..-1]
|
||||
begin
|
||||
routing_key = headers.routing_key
|
||||
user_id = routing_key["user.".length..-1]
|
||||
|
||||
@semaphore.synchronize do
|
||||
contexts = @user_context_lookup[user_id]
|
||||
@semaphore.synchronize do
|
||||
contexts = @user_context_lookup[user_id]
|
||||
|
||||
if !contexts.nil?
|
||||
if !contexts.nil?
|
||||
|
||||
@log.debug "received user-directed message for user: #{user_id}"
|
||||
@log.debug "received user-directed message for user: #{user_id}"
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
contexts.each do |client_id, context|
|
||||
EM.schedule do
|
||||
@log.debug "sending user message to #{context}"
|
||||
send_to_client(context.client, msg)
|
||||
contexts.each do |client_id, context|
|
||||
EM.schedule do
|
||||
@log.debug "sending user message to #{context}"
|
||||
send_to_client(context.client, msg)
|
||||
end
|
||||
end
|
||||
else
|
||||
#@log.debug "Can't route message: no user connected with id #{user_id}" # too chatty
|
||||
end
|
||||
else
|
||||
#@log.debug "Can't route message: no user connected with id #{user_id}" # too chatty
|
||||
end
|
||||
end
|
||||
|
||||
rescue => e
|
||||
@log.error "unhandled error in messaging to client"
|
||||
@log.error e
|
||||
end
|
||||
rescue => e
|
||||
@log.error "unhandled error in messaging to client"
|
||||
@log.error e
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
|
|
@ -220,74 +222,74 @@ module JamWebsockets
|
|||
# subscribe for any p2p messages to a client
|
||||
@client_topic.subscribe(:ack => false) do |headers, msg|
|
||||
time_it('p2p_topic') {
|
||||
begin
|
||||
routing_key = headers.routing_key
|
||||
client_id = routing_key["client.".length..-1]
|
||||
@semaphore.synchronize do
|
||||
begin
|
||||
routing_key = headers.routing_key
|
||||
client_id = routing_key["client.".length..-1]
|
||||
@semaphore.synchronize do
|
||||
|
||||
if client_id == MessageFactory::ALL_NATIVE_CLIENTS
|
||||
if client_id == MessageFactory::ALL_NATIVE_CLIENTS
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
@log.debug "client-directed message received from #{msg.from} to all clients"
|
||||
@client_lookup.each do |client_id, client_context|
|
||||
if client_context.client_type == JamRuby::Connection::TYPE_CLIENT
|
||||
client = client_context.client
|
||||
|
||||
if client
|
||||
EM.schedule do
|
||||
@log.debug "sending client-directed down websocket to #{client_id}"
|
||||
send_to_client(client, msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
elsif client_id == MessageFactory::ALL_ACTIVE_CLIENTS
|
||||
if @chat_enabled
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
@log.debug "client-directed message received from #{msg.from} to all chat clients"
|
||||
@log.debug "client-directed message received from #{msg.from} to all clients"
|
||||
@client_lookup.each do |client_id, client_context|
|
||||
if @chat_blast || client_context.active
|
||||
if client_context.client_type == JamRuby::Connection::TYPE_CLIENT
|
||||
client = client_context.client
|
||||
|
||||
if client
|
||||
EM.schedule do
|
||||
#@log.debug "sending client-directed down websocket to #{client_id}"
|
||||
@log.debug "sending client-directed down websocket to #{client_id}"
|
||||
send_to_client(client, msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
else
|
||||
client_context = @client_lookup[client_id]
|
||||
elsif client_id == MessageFactory::ALL_ACTIVE_CLIENTS
|
||||
if @chat_enabled
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
@log.debug "client-directed message received from #{msg.from} to all chat clients"
|
||||
@client_lookup.each do |client_id, client_context|
|
||||
if @chat_blast || client_context.active
|
||||
client = client_context.client
|
||||
|
||||
if client_context
|
||||
|
||||
client = client_context.client
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
@log.debug "client-directed message received from #{msg.from} to client #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE
|
||||
|
||||
unless client.nil?
|
||||
|
||||
EM.schedule do
|
||||
@log.debug "sending client-directed down websocket to #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE
|
||||
send_to_client(client, msg)
|
||||
if client
|
||||
EM.schedule do
|
||||
#@log.debug "sending client-directed down websocket to #{client_id}"
|
||||
send_to_client(client, msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
else
|
||||
@log.debug "client-directed message unroutable to disconnected client #{client_id}"
|
||||
end
|
||||
else
|
||||
#@log.debug "Can't route message: no client connected with id #{client_id}" this happens all the time in multi-websocket scenarios
|
||||
end
|
||||
end
|
||||
client_context = @client_lookup[client_id]
|
||||
|
||||
if client_context
|
||||
|
||||
client = client_context.client
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
@log.debug "client-directed message received from #{msg.from} to client #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE
|
||||
|
||||
unless client.nil?
|
||||
|
||||
EM.schedule do
|
||||
@log.debug "sending client-directed down websocket to #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE
|
||||
send_to_client(client, msg)
|
||||
end
|
||||
else
|
||||
@log.debug "client-directed message unroutable to disconnected client #{client_id}"
|
||||
end
|
||||
else
|
||||
#@log.debug "Can't route message: no client connected with id #{client_id}" this happens all the time in multi-websocket scenarios
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
rescue => e
|
||||
@log.error "unhandled error in messaging to client"
|
||||
@log.error e
|
||||
end
|
||||
rescue => e
|
||||
@log.error "unhandled error in messaging to client"
|
||||
@log.error e
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
|
|
@ -302,30 +304,30 @@ module JamWebsockets
|
|||
# subscribe for any p2p messages to a client
|
||||
@subscription_topic.subscribe(:ack => false) do |headers, msg|
|
||||
time_it('subscribe_topic') {
|
||||
begin
|
||||
routing_key = headers.routing_key
|
||||
type_and_id = routing_key["subscription.".length..-1]
|
||||
#type, id = type_and_id.split('.')
|
||||
begin
|
||||
routing_key = headers.routing_key
|
||||
type_and_id = routing_key["subscription.".length..-1]
|
||||
#type, id = type_and_id.split('.')
|
||||
|
||||
@semaphore.synchronize do
|
||||
@semaphore.synchronize do
|
||||
|
||||
clients = @subscription_lookup[type_and_id]
|
||||
clients = @subscription_lookup[type_and_id]
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
if clients
|
||||
EM.schedule do
|
||||
clients.each do |client|
|
||||
@log.debug "subscription msg to client #{client.client_id}"
|
||||
send_to_client(client, msg)
|
||||
if clients
|
||||
EM.schedule do
|
||||
clients.each do |client|
|
||||
@log.debug "subscription msg to client #{client.client_id}"
|
||||
send_to_client(client, msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
rescue => e
|
||||
@log.error "unhandled error in messaging to client for mount"
|
||||
@log.error e
|
||||
end
|
||||
rescue => e
|
||||
@log.error "unhandled error in messaging to client for mount"
|
||||
@log.error e
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
|
|
@ -335,7 +337,7 @@ module JamWebsockets
|
|||
# listens on a subscription topic on the behalf of a client
|
||||
def register_subscription(client, type, id)
|
||||
# track subscriptions that this client has made, for disconnect scenarios
|
||||
client.subscriptions.add({type:type, id: id})
|
||||
client.subscriptions.add({type: type, id: id})
|
||||
|
||||
key = "#{type}.#{id}"
|
||||
|
||||
|
|
@ -363,7 +365,7 @@ module JamWebsockets
|
|||
# called automatically when a clean disconnects, to keep state clean.
|
||||
def unregister_subscription(client, type, id)
|
||||
# remove subscription from this client's list of subscriptions
|
||||
client.subscriptions.delete({type:type, id:id})
|
||||
client.subscriptions.delete({type: type, id: id})
|
||||
|
||||
key = "#{type}.#{id}"
|
||||
# for a given mount_id in @subscription_lookup, remove from list of clients listening
|
||||
|
|
@ -444,7 +446,9 @@ module JamWebsockets
|
|||
end
|
||||
|
||||
websocket_comm(client, nil) do
|
||||
handle_login(client, handshake.query, handshake.headers["X-Forwarded-For"])
|
||||
client.x_forwarded_for = handshake.headers["X-Forwarded-For"]
|
||||
client.query = handshake.query
|
||||
handle_login(client, client.query, client.x_forwarded_for)
|
||||
end
|
||||
}
|
||||
}
|
||||
|
|
@ -554,9 +558,16 @@ module JamWebsockets
|
|||
raise SessionError, 'client_msg.route_to is null'
|
||||
end
|
||||
|
||||
if !client.user_id and client_msg.type != ClientMessage::Type::LOGIN
|
||||
if !client.user_id && (client_msg.type != ClientMessage::Type::LOGIN && client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::LOGOUT)
|
||||
# this client has not logged in and is trying to send a non-login message
|
||||
raise SessionError, "must 'Login' first"
|
||||
|
||||
if client.is_jamblaster
|
||||
# send message back to client intsead of doing nothing?
|
||||
@log.debug("jamblaster sent message #{message_type} at wrong time")
|
||||
else
|
||||
raise SessionError, "must 'Login' first"
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
if @message_factory.server_directed? client_msg
|
||||
|
|
@ -584,9 +595,14 @@ module JamWebsockets
|
|||
# @log.info("*** handle_server_directed(#{client_msg.inspect}, #{client})")
|
||||
|
||||
if client_msg.type == ClientMessage::Type::LOGIN
|
||||
|
||||
time_it('login') { handle_login(client_msg.login, client) }
|
||||
|
||||
# this is curently only a jamblaster path
|
||||
client.query["token"] = client_msg.login.token
|
||||
client.query["username"] = client_msg.login.username
|
||||
client.query["password"] = client_msg.login.password
|
||||
time_it('login') { handle_login(client, client.query, client.x_forwarded_for, false) }
|
||||
elsif client_msg.type == ClientMessage::Type::LOGOUT
|
||||
# this is currently only a jamblaster path
|
||||
time_it('login') { handle_logout(client) }
|
||||
elsif client_msg.type == ClientMessage::Type::HEARTBEAT
|
||||
time_it('heartbeat') { sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) } }
|
||||
elsif client_msg.type == ClientMessage::Type::USER_STATUS
|
||||
|
|
@ -607,21 +623,21 @@ module JamWebsockets
|
|||
|
||||
if client_type == Connection::TYPE_BROWSER
|
||||
default_heartbeat = @heartbeat_interval_browser
|
||||
default_stale = @connect_time_stale_browser
|
||||
default_expire = @connect_time_expire_browser
|
||||
default_stale = @connect_time_stale_browser
|
||||
default_expire = @connect_time_expire_browser
|
||||
else
|
||||
default_heartbeat = @heartbeat_interval_client
|
||||
default_stale = @connect_time_stale_client
|
||||
default_expire = @connect_time_expire_client
|
||||
default_stale = @connect_time_stale_client
|
||||
default_expire = @connect_time_expire_client
|
||||
end
|
||||
|
||||
heartbeat_interval = (user && user.heartbeat_interval_client) || default_heartbeat
|
||||
heartbeat_interval = heartbeat_interval.to_i
|
||||
heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config
|
||||
connection_expire_time = (user && user.connection_expire_time_client) || default_expire
|
||||
connection_expire_time = connection_expire_time.to_i
|
||||
connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config
|
||||
connection_stale_time = default_stale # no user override exists for this; not a very meaningful time right now
|
||||
heartbeat_interval = (user && user.heartbeat_interval_client) || default_heartbeat
|
||||
heartbeat_interval = heartbeat_interval.to_i
|
||||
heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config
|
||||
connection_expire_time = (user && user.connection_expire_time_client) || default_expire
|
||||
connection_expire_time = connection_expire_time.to_i
|
||||
connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config
|
||||
connection_stale_time = default_stale # no user override exists for this; not a very meaningful time right now
|
||||
|
||||
if heartbeat_interval >= connection_stale_time
|
||||
raise SessionError, "misconfiguration! heartbeat_interval (#{heartbeat_interval}) should be less than stale time (#{connection_stale_time})"
|
||||
|
|
@ -635,10 +651,16 @@ module JamWebsockets
|
|||
|
||||
def add_tracker(user, client, client_type, client_id)
|
||||
# add a tracker for this user
|
||||
context = ClientContext.new(user, client, client_type)
|
||||
@clients[client] = context
|
||||
add_user(context)
|
||||
add_client(client_id, context)
|
||||
context = @clients[client]
|
||||
if context
|
||||
context.user = user
|
||||
add_user(context) if user
|
||||
else
|
||||
context = ClientContext.new(user, client, client_type)
|
||||
@clients[client] = context
|
||||
add_user(context) if user
|
||||
add_client(client_id, context)
|
||||
end
|
||||
context
|
||||
end
|
||||
|
||||
|
|
@ -683,7 +705,27 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
def handle_login(client, options, override_ip = nil)
|
||||
def handle_logout(client)
|
||||
connection = Connection.find_by_client_id(client.client_id)
|
||||
|
||||
if connection
|
||||
connection.delete
|
||||
end
|
||||
|
||||
client.user_id = nil
|
||||
context = client.context
|
||||
if context
|
||||
@log.debug("will remove context with user: #{context.user}")
|
||||
remove_user(context)
|
||||
context.user = nil
|
||||
end
|
||||
|
||||
logout_ack = @message_factory.logout_ack()
|
||||
|
||||
send_to_client(client, logout_ack)
|
||||
end
|
||||
|
||||
def handle_login(client, options, override_ip = nil, connecting = true)
|
||||
username = options["username"]
|
||||
password = options["password"]
|
||||
token = options["token"]
|
||||
|
|
@ -691,10 +733,15 @@ module JamWebsockets
|
|||
reconnect_music_session_id = options["music_session_id"]
|
||||
client_type = options["client_type"]
|
||||
os = options["os"]
|
||||
udp_reachable = options["udp_reachable"].nil? ? true : options["udp_reachable"] == 'true'
|
||||
udp_reachable = options["udp_reachable"].nil? ? true : options["udp_reachable"] == 'true'
|
||||
jamblaster_serial_no = options["jamblaster_serial_no"]
|
||||
|
||||
client.subscriptions = Set.new# list of subscriptions that this client is watching in real-time
|
||||
# TESTING
|
||||
#if jamblaster_serial_no.nil?
|
||||
# jamblaster_serial_no = 'hi'
|
||||
#end
|
||||
|
||||
client.subscriptions = Set.new # list of subscriptions that this client is watching in real-time
|
||||
|
||||
@log.info("handle_login: client_type=#{client_type} token=#{token} client_id=#{client_id} channel_id=#{client.channel_id} udp_reachable=#{udp_reachable}")
|
||||
|
||||
|
|
@ -705,22 +752,32 @@ module JamWebsockets
|
|||
|
||||
reconnected = false
|
||||
|
||||
# you don't have to supply client_id in login--if you don't, we'll generate one
|
||||
if client_id.nil? || client_id.empty?
|
||||
# give a unique ID to this client.
|
||||
client_id = UUIDTools::UUID.random_create.to_s
|
||||
if connecting
|
||||
# you don't have to supply client_id in login--if you don't, we'll generate one
|
||||
if client_id.nil? || client_id.empty?
|
||||
# give a unique ID to this client.
|
||||
client_id = UUIDTools::UUID.random_create.to_s
|
||||
end
|
||||
else
|
||||
# client_id's don't change per websocket connection; so use the one from memeory
|
||||
client_id = client.client_id
|
||||
end
|
||||
|
||||
|
||||
|
||||
# we have to deal with jamblaster before login
|
||||
if jamblaster_serial_no && jamblaster_serial_no != ''
|
||||
jamblaster = Jamblaster.find_by_serial_no(jamblaster_serial_no)
|
||||
if jamblaster
|
||||
client.is_jamblaster = true
|
||||
end
|
||||
if jamblaster && connecting
|
||||
jamblaster.client_id = client_id
|
||||
jamblaster.save
|
||||
end
|
||||
end
|
||||
|
||||
user = valid_login(username, password, token, client_id)
|
||||
user = valid_login(username, password, token, client_id, jamblaster)
|
||||
|
||||
# protect against this user swamping the server
|
||||
if user && Connection.where(user_id: user.id).count >= @max_connections_per_user
|
||||
|
|
@ -729,27 +786,45 @@ module JamWebsockets
|
|||
raise SessionError, 'max_user_connections', 'max_user_connections'
|
||||
end
|
||||
|
||||
# XXX This logic needs to instead be handled by a broadcast out to all websockets indicating dup
|
||||
# kill any websocket connections that have this same client_id, which can happen in race conditions
|
||||
# this code must happen here, before we go any further, so that there is only one websocket connection per client_id
|
||||
existing_context = @client_lookup[client_id]
|
||||
if existing_context
|
||||
# in some reconnect scenarios, we may have in memory a websocket client still.
|
||||
# let's whack it, and tell the other client, if still connected, that this is a duplicate login attempt
|
||||
@log.info "duplicate client: #{existing_context}"
|
||||
Diagnostic.duplicate_client(existing_context.user, existing_context)
|
||||
error_msg = @message_factory.server_duplicate_client_error
|
||||
send_to_client(existing_context.client, error_msg)
|
||||
cleanup_client(existing_context.client)
|
||||
if connecting
|
||||
# XXX This logic needs to instead be handled by a broadcast out to all websockets indicating dup
|
||||
# kill any websocket connections that have this same client_id, which can happen in race conditions
|
||||
# this code must happen here, before we go any further, so that there is only one websocket connection per client_id
|
||||
existing_context = @client_lookup[client_id]
|
||||
if existing_context
|
||||
# in some reconnect scenarios, we may have in memory a websocket client still.
|
||||
# let's whack it, and tell the other client, if still connected, that this is a duplicate login attempt
|
||||
@log.info "duplicate client: #{existing_context}"
|
||||
Diagnostic.duplicate_client(existing_context.user, existing_context)
|
||||
error_msg = @message_factory.server_duplicate_client_error
|
||||
send_to_client(existing_context.client, error_msg)
|
||||
cleanup_client(existing_context.client)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
connection = Connection.find_by_client_id(client_id)
|
||||
# if this connection is reused by a different user (possible in logout/login scenarios), then whack the connection
|
||||
# because it will recreate a new connection lower down
|
||||
if connection && user && connection.user != user
|
||||
@log.debug("user #{user.email} took client_id #{client_id} from user #{connection.user.email}")
|
||||
if connection && connection.user != user
|
||||
keep = false
|
||||
if user
|
||||
if connection.user.nil?
|
||||
keep = true
|
||||
@log.debug("user #{user.email} logged into #{client_id}")
|
||||
connection.user = user
|
||||
connection.save
|
||||
else
|
||||
@log.debug("user #{user.email} took client_id #{client_id} from user #{connection.user.email}")
|
||||
end
|
||||
else
|
||||
@log.debug("user-less connection #{client_id} took from user #{connection.user.email}")
|
||||
end
|
||||
|
||||
if !keep
|
||||
connection.delete
|
||||
connection = nil
|
||||
end
|
||||
end
|
||||
|
||||
client.client_id = client_id
|
||||
|
|
@ -763,7 +838,7 @@ module JamWebsockets
|
|||
@log.debug "logged in #{user} with client_id: #{client_id}"
|
||||
|
||||
# check if there's a connection for the client... if it's stale, reconnect it
|
||||
unless connection.nil?
|
||||
if !connection.nil? && connecting
|
||||
# FIXME: I think connection table needs to updated within connection_manager
|
||||
# otherwise this would be 1 line of code (connection.connect!)
|
||||
|
||||
|
|
@ -831,42 +906,78 @@ module JamWebsockets
|
|||
end
|
||||
|
||||
# respond with LOGIN_ACK to let client know it was successful
|
||||
@semaphore.synchronize do
|
||||
# add a tracker for this user
|
||||
context = add_tracker(user, client, client_type, client_id)
|
||||
|
||||
# add a tracker for this user
|
||||
context = add_tracker(user, client, client_type, client_id)
|
||||
@log.debug "logged in context created: #{context}"
|
||||
|
||||
@log.debug "logged in context created: #{context}"
|
||||
|
||||
unless connection
|
||||
# log this connection in the database
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name) do |conn, count|
|
||||
user.update_addr_loc(Connection.find_by_client_id(client.client_id), User::JAM_REASON_LOGIN)
|
||||
if count == 1
|
||||
Notification.send_friend_update(user.id, true, conn)
|
||||
end
|
||||
if !connection
|
||||
# log this connection in the database
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name, jamblaster ? true : false) do |conn, count|
|
||||
user.update_addr_loc(Connection.find_by_client_id(client.client_id), User::JAM_REASON_LOGIN)
|
||||
if count == 1
|
||||
Notification.send_friend_update(user.id, true, conn)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# if we have OS data, try to grab client update data and let the client have it
|
||||
update = ArtifactUpdate.find_client_by_os(os) if client_type == Connection::TYPE_CLIENT && os
|
||||
|
||||
client_update = update.update_data if update
|
||||
|
||||
login_ack = @message_factory.login_ack(remote_ip,
|
||||
client_id,
|
||||
user.remember_token,
|
||||
heartbeat_interval,
|
||||
connection.try(:music_session_id),
|
||||
reconnected,
|
||||
user.id,
|
||||
connection_expire_time,
|
||||
client_update)
|
||||
stats_logged_in
|
||||
send_to_client(client, login_ack)
|
||||
end
|
||||
|
||||
# if we have OS data, try to grab client update data and let the client have it
|
||||
update = ArtifactUpdate.find_client_by_os(os) if client_type == Connection::TYPE_CLIENT && os
|
||||
|
||||
client_update = update.update_data if update
|
||||
|
||||
login_ack = @message_factory.login_ack(remote_ip,
|
||||
client_id,
|
||||
user.remember_token,
|
||||
heartbeat_interval,
|
||||
connection.try(:music_session_id),
|
||||
reconnected,
|
||||
user.id,
|
||||
connection_expire_time,
|
||||
client_update)
|
||||
stats_logged_in
|
||||
send_to_client(client, login_ack)
|
||||
|
||||
elsif jamblaster
|
||||
# if no user, but we have a jamblaster, we can allow this session to go through
|
||||
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(nil, client_type)
|
||||
|
||||
@log.debug "logged in jb::#{jamblaster.serial_no} with client_id: #{client_id}"
|
||||
|
||||
# check if there's a connection for the client... if it's stale, reconnect it
|
||||
if !connection.nil?
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name)
|
||||
end
|
||||
end
|
||||
|
||||
# add a tracker for this user
|
||||
context = add_tracker(user, client, client_type, client_id)
|
||||
|
||||
@log.debug "logged in context created: #{context}"
|
||||
|
||||
if !connection
|
||||
# log this connection in the database
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.create_connection(nil, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name, jamblaster ? true : false) do |conn, count|
|
||||
# this blk is not call
|
||||
end
|
||||
end
|
||||
end
|
||||
# if we have OS data, try to grab client update data and let the client have it
|
||||
update = ArtifactUpdate.find_client_by_os(os) if client_type == Connection::TYPE_CLIENT && os
|
||||
|
||||
client_update = update.update_data if update
|
||||
|
||||
connect_ack = @message_factory.connect_ack(remote_ip,
|
||||
client_id,
|
||||
heartbeat_interval,
|
||||
connection_expire_time,
|
||||
client_update)
|
||||
stats_logged_in
|
||||
send_to_client(client, connect_ack)
|
||||
else
|
||||
stats_logged_in_failed
|
||||
raise SessionError.new('invalid login', 'invalid_login')
|
||||
|
|
@ -959,27 +1070,27 @@ module JamWebsockets
|
|||
}
|
||||
else
|
||||
#profile_it('heartbeat_transaction') {
|
||||
#Connection.transaction do
|
||||
# send back track_changes_counter if in a session
|
||||
profile_it('heartbeat_session') {
|
||||
if connection.music_session_id
|
||||
music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
|
||||
track_changes_counter = music_session.track_changes_counter if music_session
|
||||
end
|
||||
}
|
||||
#Connection.transaction do
|
||||
# send back track_changes_counter if in a session
|
||||
profile_it('heartbeat_session') {
|
||||
if connection.music_session_id
|
||||
music_session = ActiveMusicSession.select(:track_changes_counter).find_by_id(connection.music_session_id)
|
||||
track_changes_counter = music_session.track_changes_counter if music_session
|
||||
end
|
||||
}
|
||||
|
||||
profile_it('heartbeat_touch') {
|
||||
# update connection updated_at and if the user is active
|
||||
Connection.where(id: connection.id).update_all(user_active: heartbeat.active, updated_at: Time.now)
|
||||
}
|
||||
profile_it('heartbeat_touch') {
|
||||
# update connection updated_at and if the user is active
|
||||
Connection.where(id: connection.id).update_all(user_active: heartbeat.active, updated_at: Time.now)
|
||||
}
|
||||
|
||||
profile_it('heartbeat_notification') {
|
||||
# update user's notification_seen_at field if the heartbeat indicates it saw one
|
||||
# first we try to use the notification id, which should usually exist.
|
||||
# if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
|
||||
update_notification_seen_at(connection, context, heartbeat) if client.context.client_type != Connection::TYPE_LATENCY_TESTER
|
||||
}
|
||||
#end
|
||||
profile_it('heartbeat_notification') {
|
||||
# update user's notification_seen_at field if the heartbeat indicates it saw one
|
||||
# first we try to use the notification id, which should usually exist.
|
||||
# if not, then fallback to notification_seen_at, which is approximately the last time we saw a notification
|
||||
update_notification_seen_at(connection, context, heartbeat) if client.context.client_type != Connection::TYPE_LATENCY_TESTER
|
||||
}
|
||||
#end
|
||||
#}
|
||||
|
||||
profile_it('heartbeat_stale') {
|
||||
|
|
@ -1038,7 +1149,7 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
def valid_login(username, password, token, client_id)
|
||||
def valid_login(username, password, token, client_id, jamblaster)
|
||||
|
||||
if !token.nil? && token != ''
|
||||
@log.debug "logging in via token"
|
||||
|
|
@ -1079,6 +1190,9 @@ module JamWebsockets
|
|||
@log.debug "#{username} login failure"
|
||||
return nil
|
||||
end
|
||||
elsif jamblaster
|
||||
# if there is a jamblaster in context, then we will allow no login.
|
||||
return nil
|
||||
else
|
||||
raise SessionError.new('no login data was found in Login message', 'empty_login')
|
||||
end
|
||||
|
|
@ -1102,7 +1216,7 @@ module JamWebsockets
|
|||
# client = the current client
|
||||
def access_p2p(client_id, user, msg)
|
||||
|
||||
return nil
|
||||
return nil
|
||||
|
||||
# ping_request and ping_ack messages are special in that they are simply allowed
|
||||
if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK
|
||||
|
|
@ -1135,8 +1249,8 @@ module JamWebsockets
|
|||
client_msg.type == ClientMessage::Type::GENERIC_MESSAGE ||
|
||||
client_msg.type == ClientMessage::Type::RESTART_APPLICATION ||
|
||||
client_msg.type == ClientMessage::Type::STOP_APPLICATION
|
||||
@@log.error("malicious activity")
|
||||
raise SessionError, "not allowed"
|
||||
@@log.error("malicious activity")
|
||||
raise SessionError, "not allowed"
|
||||
end
|
||||
|
||||
if to_client_id.nil? || to_client_id == 'undefined' # javascript translates to 'undefined' in many cases
|
||||
|
|
@ -1214,11 +1328,11 @@ module JamWebsockets
|
|||
return
|
||||
end
|
||||
|
||||
client_ids = @client_lookup.map { | client_id, info | "('#{client_id}')" }.join(',')
|
||||
client_ids = @client_lookup.map { |client_id, info| "('#{client_id}')" }.join(',')
|
||||
|
||||
# find all client_id's that do not have a row in the db, and whack them
|
||||
# this style of query does the following: https://gist.github.com/sethcall/15308ccde298bff74584
|
||||
sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids})
|
||||
sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids})
|
||||
SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{@gateway_name}');
|
||||
"
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
|
|
@ -1260,7 +1374,7 @@ module JamWebsockets
|
|||
def periodical_stats_dump
|
||||
|
||||
# assume 60 seconds per status dump
|
||||
stats = @message_stats.sort_by{|k,v| -v}
|
||||
stats = @message_stats.sort_by { |k, v| -v }
|
||||
stats.map { |i| i[1] = (i[1] / 60.0).round(2) }
|
||||
|
||||
@log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', '))
|
||||
|
|
@ -1271,11 +1385,11 @@ module JamWebsockets
|
|||
end
|
||||
|
||||
total_time = 0
|
||||
time_sums = @time_it_sums.sort_by{|k,v| -v}
|
||||
time_sums = @time_it_sums.sort_by { |k, v| -v }
|
||||
|
||||
log_num = 3
|
||||
count = 0
|
||||
time_sums.each do | cat, cat_time |
|
||||
time_sums.each do |cat, cat_time|
|
||||
count += 1
|
||||
if count <= log_num
|
||||
@log.info("timed #{cat} used time: #{cat_time}")
|
||||
|
|
@ -1286,8 +1400,8 @@ module JamWebsockets
|
|||
|
||||
@log.info("total used time: #{total_time}")
|
||||
|
||||
profile_sums = @profile_it_sums.sort_by{|k,v| -v}
|
||||
profile_sums.each do | cat, cat_time |
|
||||
profile_sums = @profile_it_sums.sort_by { |k, v| -v }
|
||||
profile_sums.each do |cat, cat_time|
|
||||
@log.info("profiled #{cat} used time: #{cat_time}")
|
||||
end
|
||||
|
||||
|
|
@ -1346,7 +1460,7 @@ module JamWebsockets
|
|||
ConnectionManager.active_record_transaction do |mgr|
|
||||
mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
|
||||
user = User.find_by_id(user_id)
|
||||
return if user.nil? # this can happen if you delete a user while their connection is up
|
||||
return if user.nil? # this can happen if you delete a user while their connection is up
|
||||
@log.info "expiring stale connection client_id:#{cid}, user_id:#{user}"
|
||||
Notification.send_friend_update(user_id, false, conn) if count == 0
|
||||
music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil?
|
||||
|
|
@ -1438,7 +1552,7 @@ module JamWebsockets
|
|||
|
||||
time = Time.now - start
|
||||
|
||||
@time_it_sums[cat] = (@time_it_sums[cat] || 0 )+ time
|
||||
@time_it_sums[cat] = (@time_it_sums[cat] || 0)+ time
|
||||
|
||||
@log.warn("LONG TIME: #{cat}: #{time}") if time > 1
|
||||
end
|
||||
|
|
@ -1450,7 +1564,7 @@ module JamWebsockets
|
|||
|
||||
time = Time.now - start
|
||||
|
||||
@profile_it_sums[cat] = (@profile_it_sums[cat] || 0 )+ time
|
||||
@profile_it_sums[cat] = (@profile_it_sums[cat] || 0)+ time
|
||||
|
||||
@log.warn("LONG TIME: #{cat}: #{time}") if time > 1
|
||||
end
|
||||
|
|
|
|||
Loading…
Reference in New Issue