merge develop

This commit is contained in:
Brian Smith 2014-09-25 09:09:44 -04:00
commit da771acdaf
23 changed files with 381 additions and 177 deletions

View File

@ -36,6 +36,7 @@ FactoryGirl.define do
addr 0
locidispid 0
client_type 'client'
gateway 'gateway1'
scoring_timeout Time.now
sequence(:channel_id) { |n| "Channel#{n}"}
association :user, factory: :user

View File

@ -208,4 +208,5 @@ undirected_scores.sql
discard_scores.sql
new_genres.sql
get_work_faster.sql
fix_find_session_sorting_2216.sql
fix_find_session_sorting_2216.sql
multiple_gateways.sql

View File

@ -0,0 +1,2 @@
-- allow multiple websockegateways to open
ALTER TABLE connections ADD COLUMN gateway VARCHAR NOT NULL DEFAULT 'default-1';

View File

@ -6,3 +6,5 @@ Create development database 'jam_ruby'
Once you've created your database, migrate it:
`bundle exec jam_ruby up`

View File

@ -44,7 +44,7 @@ module JamRuby
end
# reclaim the existing connection, if ip_address is not nil then perhaps a new address as well
def reconnect(conn, channel_id, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time, udp_reachable)
def reconnect(conn, channel_id, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time, udp_reachable, gateway)
music_session_id = nil
reconnected = false
@ -89,8 +89,8 @@ module JamRuby
udp_reachable_value = udp_reachable.nil? ? 'udp_reachable' : udp_reachable
sql =<<SQL
UPDATE connections SET (channel_id, aasm_state, updated_at, music_session_id, joined_session_at, stale_time, expire_time, udp_reachable) = ('#{channel_id}', '#{Connection::CONNECT_STATE.to_s}', NOW(), #{music_session_id_expression}, #{joined_session_at_expression}, #{connection_stale_time}, #{connection_expire_time}, #{udp_reachable_value})
WHERE
UPDATE connections SET (channel_id, aasm_state, updated_at, music_session_id, joined_session_at, stale_time, expire_time, udp_reachable, gateway) = ('#{channel_id}', '#{Connection::CONNECT_STATE.to_s}', NOW(), #{music_session_id_expression}, #{joined_session_at_expression}, #{connection_stale_time}, #{connection_expire_time}, #{udp_reachable_value}, '#{gateway}')
WHERE
client_id = '#{conn.client_id}'
RETURNING music_session_id
SQL
@ -129,46 +129,26 @@ SQL
end
# flag connections as stale
def flag_stale_connections()
def flag_stale_connections(gateway_name)
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
sql =<<SQL
SELECT count(user_id) FROM connections
WHERE
updated_at < (NOW() - (interval '1 second' * stale_time))AND
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
SQL
conn.exec(sql) do |result|
count = result.getvalue(0, 0)
if 0 < count.to_i
sql =<<SQL
UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}'
WHERE
updated_at < (NOW() - (interval '1 second' * stale_time)) AND
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
SQL
conn.exec(sql)
end
end
sql = "UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}' WHERE updated_at < (NOW() - (interval '1 second' * stale_time)) AND aasm_state = '#{Connection::CONNECT_STATE.to_s}' AND gateway = '#{gateway_name}'"
conn.exec(sql)
end
end
# NOTE this is only used for testing purposes;
# actual deletes will be processed in the websocket context which cleans up dependencies
def expire_stale_connections()
self.stale_connection_client_ids.each { |client| self.delete_connection(client[:client_id]) }
def expire_stale_connections(gateway_name)
self.stale_connection_client_ids(gateway_name).each { |client| self.delete_connection(client[:client_id]) }
end
# expiring connections in stale state, which deletes them
def stale_connection_client_ids
def stale_connection_client_ids(gateway_name)
clients = []
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
sql =<<SQL
SELECT client_id, music_session_id, user_id, client_type FROM connections
WHERE
updated_at < (NOW() - (interval '1 second' * expire_time))
SQL
sql = "SELECT client_id, music_session_id, user_id, client_type FROM connections WHERE updated_at < (NOW() - (interval '1 second' * expire_time)) AND gateway = '#{gateway_name}'"
conn.exec(sql) do |result|
result.each { |row|
client_id = row['client_id']
@ -187,7 +167,7 @@ SQL
# this number is used by notification logic elsewhere to know
# 'oh the user joined for the 1st time, so send a friend update', or
# 'don't bother because the user has connected somewhere else already'
def create_connection(user_id, client_id, channel_id, ip_address, client_type, connection_stale_time, connection_expire_time, udp_reachable, &blk)
def create_connection(user_id, client_id, channel_id, ip_address, client_type, connection_stale_time, connection_expire_time, udp_reachable, gateway, &blk)
# validate client_type
raise "invalid client_type: #{client_type}" if client_type != 'client' && client_type != 'browser'
@ -218,8 +198,8 @@ SQL
lock_connections(conn)
conn.exec("INSERT INTO connections (user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, aasm_state, stale_time, expire_time, udp_reachable) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
[user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, Connection::CONNECT_STATE.to_s, connection_stale_time, connection_expire_time, udp_reachable]).clear
conn.exec("INSERT INTO connections (user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, aasm_state, stale_time, expire_time, udp_reachable, gateway) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
[user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, Connection::CONNECT_STATE.to_s, connection_stale_time, connection_expire_time, udp_reachable, gateway]).clear
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|

View File

@ -23,6 +23,8 @@ module JamRuby
ip_address = options[:ip_address]
connection_stale_time = options[:connection_stale_time]
connection_expire_time = options[:connection_expire_time]
gateway = options[:gateway]
# first try to find a LatencyTester with that client_id
latency_tester = LatencyTester.find_by_client_id(client_id)
@ -71,11 +73,16 @@ module JamRuby
connection.as_musician = false
connection.channel_id = channel_id
connection.scoring_timeout = Time.now
connection.gateway = gateway
unless connection.save
return connection
end
return latency_tester
end
def to_s
client_id
end
end
end

View File

@ -184,6 +184,7 @@ FactoryGirl.define do
addr 0
locidispid 0
client_type 'client'
gateway 'gateway1'
last_jam_audio_latency { user.last_jam_audio_latency if user }
sequence(:channel_id) { |n| "Channel#{n}"}
association :user, factory: :user

View File

@ -8,6 +8,7 @@ describe ConnectionManager, no_transaction: true do
EXPIRE_TIME = 60
STALE_BUT_NOT_EXPIRED = 50
DEFINITELY_EXPIRED = 70
GATEWAY = 'gateway1'
REACHABLE = true
let(:channel_id) {'1'}
@ -49,8 +50,8 @@ describe ConnectionManager, no_transaction: true do
user.save!
user = nil
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
expect { @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) }.to raise_error(PG::Error)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
expect { @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) }.to raise_error(PG::Error)
end
it "create connection then delete it" do
@ -59,7 +60,7 @@ describe ConnectionManager, no_transaction: true do
#user_id = create_user("test", "user2", "user2@jamkazam.com")
user = FactoryGirl.create(:user)
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
count.should == 1
@ -89,7 +90,7 @@ describe ConnectionManager, no_transaction: true do
#user_id = create_user("test", "user2", "user2@jamkazam.com")
user = FactoryGirl.create(:user)
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
count.should == 1
@ -106,7 +107,7 @@ describe ConnectionManager, no_transaction: true do
cc.locidispid.should == 17192000002
cc.udp_reachable.should == true
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, false)
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, false, GATEWAY)
cc = Connection.find_by_client_id!(client_id)
cc.connected?.should be_true
@ -129,7 +130,7 @@ describe ConnectionManager, no_transaction: true do
#user_id = create_user("test", "user2", "user2@jamkazam.com")
user = FactoryGirl.create(:user)
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, false)
count = @connman.create_connection(user.id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, false, GATEWAY)
count.should == 1
@ -146,7 +147,7 @@ describe ConnectionManager, no_transaction: true do
cc.locidispid.should == 17192000002
cc.udp_reachable.should == false
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, nil) # heartbeat passes nil in for udp_reachable
@connman.reconnect(cc, channel_id, nil, "33.1.2.3", STALE_TIME, EXPIRE_TIME, nil, GATEWAY) # heartbeat passes nil in for udp_reachable
cc = Connection.find_by_client_id!(client_id)
cc.connected?.should be_true
@ -260,12 +261,12 @@ describe ConnectionManager, no_transaction: true do
it "flag stale connection" do
client_id = "client_id8"
user_id = create_user("test", "user8", "user8@jamkazam.com")
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
num = JamRuby::Connection.count(:conditions => ['aasm_state = ?','connected'])
num.should == 1
assert_num_connections(client_id, num)
@connman.flag_stale_connections()
@connman.flag_stale_connections(GATEWAY)
assert_num_connections(client_id, num)
conn = Connection.find_by_client_id(client_id)
@ -274,7 +275,7 @@ describe ConnectionManager, no_transaction: true do
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"])
num.should == 1
# this should change the aasm_state to stale
@connman.flag_stale_connections()
@connman.flag_stale_connections(GATEWAY)
num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"])
num.should == 0
@ -286,7 +287,7 @@ describe ConnectionManager, no_transaction: true do
conn = Connection.find_by_client_id(client_id)
set_updated_at(conn, Time.now - DEFINITELY_EXPIRED)
cids = @connman.stale_connection_client_ids()
cids = @connman.stale_connection_client_ids(GATEWAY)
cids.size.should == 1
cids[0][:client_id].should == client_id
cids[0][:client_type].should == Connection::TYPE_CLIENT
@ -301,21 +302,21 @@ describe ConnectionManager, no_transaction: true do
it "expires stale connection" do
client_id = "client_id8"
user_id = create_user("test", "user8", "user8@jamkazam.com")
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
conn = Connection.find_by_client_id(client_id)
set_updated_at(conn, Time.now - STALE_BUT_NOT_EXPIRED)
@connman.flag_stale_connections
@connman.flag_stale_connections(GATEWAY)
assert_num_connections(client_id, 1)
# assert_num_connections(client_id, JamRuby::Connection.count(:conditions => ['aasm_state = ?','stale']))
@connman.expire_stale_connections
@connman.expire_stale_connections(GATEWAY)
assert_num_connections(client_id, 1)
set_updated_at(conn, Time.now - DEFINITELY_EXPIRED)
# this should delete the stale connection
@connman.expire_stale_connections
@connman.expire_stale_connections(GATEWAY)
assert_num_connections(client_id, 0)
end
@ -327,7 +328,7 @@ describe ConnectionManager, no_transaction: true do
music_session_id = music_session.id
user = User.find(user_id)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
connection = @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
connection.errors.any?.should be_false
@ -363,8 +364,8 @@ describe ConnectionManager, no_transaction: true do
client_id2 = "client_id10.12"
user_id = create_user("test", "user10.11", "user10.11@jamkazam.com", :musician => true)
user_id2 = create_user("test", "user10.12", "user10.12@jamkazam.com", :musician => false)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id2, client_id2, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
@connman.create_connection(user_id2, client_id2, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
music_session = FactoryGirl.create(:active_music_session, user_id: user_id)
music_session_id = music_session.id
@ -383,7 +384,7 @@ describe ConnectionManager, no_transaction: true do
client_id = "client_id10.2"
user_id = create_user("test", "user10.2", "user10.2@jamkazam.com")
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
music_session = FactoryGirl.create(:active_music_session, user_id: user_id)
user = User.find(user_id)
@ -399,8 +400,8 @@ describe ConnectionManager, no_transaction: true do
fan_client_id = "client_id10.4"
musician_id = create_user("test", "user10.3", "user10.3@jamkazam.com")
fan_id = create_user("test", "user10.4", "user10.4@jamkazam.com", :musician => false)
@connman.create_connection(musician_id, musician_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(fan_id, fan_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(musician_id, musician_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
@connman.create_connection(fan_id, fan_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
music_session = FactoryGirl.create(:active_music_session, :fan_access => false, user_id: musician_id)
music_session_id = music_session.id
@ -424,7 +425,7 @@ describe ConnectionManager, no_transaction: true do
music_session_id = music_session.id
user = User.find(user_id2)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
# specify real user id, but not associated with this session
expect { @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) } .to raise_error(ActiveRecord::RecordNotFound)
end
@ -436,7 +437,7 @@ describe ConnectionManager, no_transaction: true do
user = User.find(user_id)
music_session = ActiveMusicSession.new
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
connection = @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
connection.errors.size.should == 1
connection.errors.get(:music_session).should == [ValidationMessages::MUSIC_SESSION_MUST_BE_SPECIFIED]
@ -450,7 +451,7 @@ describe ConnectionManager, no_transaction: true do
music_session_id = music_session.id
user = User.find(user_id2)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
# specify real user id, but not associated with this session
expect { @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) } .to raise_error(ActiveRecord::RecordNotFound)
end
@ -464,7 +465,7 @@ describe ConnectionManager, no_transaction: true do
user = User.find(user_id)
dummy_music_session = ActiveMusicSession.new
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
end
@ -479,7 +480,7 @@ describe ConnectionManager, no_transaction: true do
dummy_music_session = ActiveMusicSession.new
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
@connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
end
@ -492,7 +493,7 @@ describe ConnectionManager, no_transaction: true do
music_session_id = music_session.id
user = User.find(user_id)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
@connman.join_music_session(user, client_id, music_session, true, TRACKS, 10)
assert_session_exists(music_session_id, true)
@ -535,7 +536,7 @@ describe ConnectionManager, no_transaction: true do
user = User.find(user_id)
client_id1 = Faker::Number.number(20)
@connman.create_connection(user_id, client_id1, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE)
@connman.create_connection(user_id, client_id1, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY)
music_session1 = FactoryGirl.create(:active_music_session, :user_id => user_id)
connection1 = @connman.join_music_session(user, client_id1, music_session1, true, TRACKS, 10)
connection1.errors.size.should == 0

View File

@ -2,7 +2,7 @@ require 'spec_helper'
describe LatencyTester do
let(:params) {{client_id: 'abc', ip_address: '10.1.1.1', connection_stale_time:40, connection_expire_time:60, channel_id: '1'} }
let(:params) {{client_id: 'abc', ip_address: '10.1.1.1', connection_stale_time:40, connection_expire_time:60, channel_id: '1', gateway: 'gateway1'} }
it "success" do
latency_tester = FactoryGirl.create(:latency_tester)

View File

@ -904,7 +904,7 @@
$('.disabled-track-overlay', $track).show();
$('.track-connection', $track).removeClass('red yellow green').addClass('red');
}
var participant = (sessionModel.getParticipant(clientId) || {name:'unknown'}).name;
var participant = (sessionModel.getParticipant(clientId) || { user: {name: 'unknown'}}).user.name;
logger.debug("still looking for mixer for participant=" + participant + ", clientId=" + clientId)
}
}

View File

@ -60,7 +60,7 @@
| To play more music, tap into our growing
community to connect with other musicians. Watch this video for tips on how to do this.
.action-button
a.button-orange rel="external" href="https://www.youtube.com/watch?v=xWponSJo-GU" WATCH VIDEO
a.button-orange rel="external" href="https://www.youtube.com/watch?v=4KWklSZZxRc" WATCH VIDEO
br clear="both"
.row.full.learn-more
.column

View File

@ -17,7 +17,8 @@ unless $rails_rake_task
:rabbitmq_host => APP_CONFIG.rabbitmq_host,
:rabbitmq_port => APP_CONFIG.rabbitmq_port,
:calling_thread => current,
:cidr => APP_CONFIG.websocket_gateway_cidr)
:cidr => APP_CONFIG.websocket_gateway_cidr,
:gateway_name => "default-#{ENV["JAM_INSTANCE"] || 1}")
end
Thread.stop
end

View File

@ -207,6 +207,7 @@ FactoryGirl.define do
addr {JamIsp.ip_to_num(ip_address)}
locidispid 0
client_type 'client'
gateway 'gateway1'
scoring_timeout Time.now
sequence(:channel_id) { |n| "Channel#{n}"}
end

View File

@ -86,7 +86,8 @@ Thread.new do
:rabbitmq_host => 'localhost',
:rabbitmq_port => 5672,
:calling_thread => current,
:cidr => ['0.0.0.0/0'])
:cidr => ['0.0.0.0/0'],
:gateway_name => 'default')
rescue Exception => e
puts "websocket-gateway failed: #{e}"
end

View File

@ -14,6 +14,13 @@ db_config = YAML::load(File.open(db_config_file))[jamenv]
ActiveRecord::Base.establish_connection(db_config)
jam_instance = ENV['JAM_INSTANCE'] || 1
jam_instance = jam_instance.to_i
if jam_instance == 0
puts "JAM INSTANCE MUST BE > 0"
exit 1
end
# now bring in the Jam code
require 'jam_websockets'
@ -33,11 +40,11 @@ require "#{Dir.pwd}/config/application.rb"
if jamenv == "production"
ENV['NEW_RELIC_LOG'] = '/var/log/websocket-gateway/newrelic_agent.log'
ENV['NEW_RELIC_LOG'] = "/var/log/websocket-gateway/newrelic_agent-#{jam_instance}.log"
one_meg = 1024 * 1024
Logging.logger.root.appenders = Logging.appenders.rolling_file("log/#{jamenv}.log", :truncate=>true, :age=>'daily', :size=>one_meg, :keep=>20)
Logging.logger.root.appenders = Logging.appenders.rolling_file("/var/log/websocket-gateway/#{jamenv}-#{jam_instance}.log", :truncate=>true, :age=>'daily', :size=>one_meg, :keep=>20, :layout => Logging.layouts.pattern(:pattern => '[%d] %-5l: %m\n'))
else
ENV['NEW_RELIC_LOG'] = "#{Dir.pwd}/log/newrelic_agent.log"
ENV['NEW_RELIC_LOG'] = "#{Dir.pwd}/log/newrelic_agent-#{jam_instance}.log"
Logging.logger.root.appenders = Logging.appenders.stdout
end
@ -47,7 +54,11 @@ require 'newrelic_rpm'
Object.send(:remove_const, :Rails) # this is to 'fool' new relic into not thinking this is a Rails app.
::NewRelic::Agent.manual_start
Server.new.run(:port => config["port"],
# determine gateway_name
gateway_name = ENV['GATEWAY_NAME'] || 'default'
gateway_name = "#{gateway_name}-#{jam_instance}"
Server.new.run(:port => config["port"] + (jam_instance-1 ) * 2,
:emwebsocket_debug => config["emwebsocket_debug"],
:connect_time_stale_client => config["connect_time_stale_client"],
:connect_time_expire_client => config["connect_time_expire_client"],
@ -56,4 +67,5 @@ Server.new.run(:port => config["port"],
:max_connections_per_user => config["max_connections_per_user"],
:rabbitmq_host => config['rabbitmq_host'],
:rabbitmq_port => config['rabbitmq_port'],
:cidr => config['cidr'])
:cidr => config['cidr'],
:gateway_name => gateway_name)

View File

@ -0,0 +1,57 @@
global
maxconn 4096
pidfile ~/tmp/haproxy-queue.pid
defaults
log global
log 127.0.0.1 local0
log 127.0.0.1 local1 notice
mode tcp
option httplog
option http-server-close
#option dontlognull
option redispatch
option contstats
retries 3
backlog 10000
timeout client 25s
timeout connect 5s
timeout server 25s
# timeout tunnel available in ALOHA 5.5 or HAProxy 1.5-dev10 and higher
timeout tunnel 3600s
timeout http-keep-alive 1s
timeout http-request 15s
timeout queue 30s
timeout tarpit 60s
default-server inter 3s rise 2 fall 3
option forwardfor
frontend gateways
bind *:6767
default_backend bk_ws
backend bk_ws
balance leastconn
## websocket protocol validation
# acl hdr_connection_upgrade hdr(Connection) -i upgrade
# acl hdr_upgrade_websocket hdr(Upgrade) -i websocket
# acl hdr_websocket_key hdr_cnt(Sec-WebSocket-Key) eq 1
# acl hdr_websocket_version hdr_cnt(Sec-WebSocket-Version) eq 1
# acl hdr_host hdr_cnt(Sec-WebSocket-Version) eq 1
# http-request deny if ! hdr_connection_upgrade ! hdr_upgrade_websocket ! hdr_w
#ebsocket_key ! hdr_websocket_version ! hdr_host
## ensure our application protocol name is valid
## (don't forget to update the list each time you publish new applications)
acl ws_valid_protocol hdr(Sec-WebSocket-Protocol) echo-protocol
http-request deny if ! ws_valid_protocol
## websocket health checking
#option httpchk GET / HTTP/1.1\r\nHost:\ ws.domain.com\r\nConnection:\ Upgrade
#\r\nUpgrade:\ websocket\r\nSec-WebSocket-Key:\ haproxy\r\nSec-WebSocket-Version
# <span class="wp-smiley emoji emoji-uneasy" title=":\">:\</span> 13\r\nSec-WebSocket-Protocol:\ echo-protocol
# http-check expect status 101
server websrv1 127.0.0.1:6769 maxconn 1000 weight 10 cookie gateway1 check
server websrv2 127.0.0.1:6771 maxconn 1000 weight 10 cookie gateway2 check

View File

@ -27,7 +27,9 @@ module JamWebsockets
:heartbeat_interval_browser,
:connect_time_expire_browser,
:connect_time_stale_browser,
:max_connections_per_user
:max_connections_per_user,
:gateway_name,
:client_lookup
def initialize()
@log = Logging.logger[self]
@ -47,10 +49,12 @@ module JamWebsockets
@heartbeat_interval_browser= nil
@connect_time_expire_browser= nil
@connect_time_stale_browser= nil
@gateway_name = nil
@ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]
@message_stats = {}
end
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10}, &block)
def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default'}, &block)
@log.info "startup"
@ -61,6 +65,7 @@ module JamWebsockets
@connect_time_stale_browser = connect_time_stale_browser
@connect_time_expire_browser = connect_time_expire_browser
@max_connections_per_user = options[:max_connections_per_user]
@gateway_name = options[:gateway]
begin
@amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
@ -154,7 +159,7 @@ module JamWebsockets
end
end
else
@log.debug "Can't route message: no user connected with id #{user_id}"
#@log.debug "Can't route message: no user connected with id #{user_id}" # too chatty
end
end
@ -205,19 +210,19 @@ module JamWebsockets
msg = Jampb::ClientMessage.parse(msg)
@log.debug "client-directed message received from #{msg.from} to client #{client_id}"
@log.debug "client-directed message received from #{msg.from} to client #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE
unless client.nil?
EM.schedule do
@log.debug "sending client-directed down websocket to #{client_id}"
@log.debug "sending client-directed down websocket to #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE
send_to_client(client, msg)
end
else
@log.debug "client-directed message unroutable to disconnected client #{client_id}"
end
else
@log.debug "Can't route message: no client connected with id #{client_id}"
#@log.debug "Can't route message: no client connected with id #{client_id}" this happens all the time in multi-websocket scenarios
end
end
@ -329,7 +334,7 @@ module JamWebsockets
def send_to_client(client, msg)
@log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless msg.type == ClientMessage::Type::HEARTBEAT_ACK
@log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless msg.type == ClientMessage::Type::HEARTBEAT_ACK || msg.type == ClientMessage::Type::PEER_MESSAGE
if client.encode_json
client.send(msg.to_json.to_s)
else
@ -371,66 +376,6 @@ module JamWebsockets
end
end
def cleanup_clients_with_ids(expired_connections)
expired_connections.each do |expired_connection|
cid = expired_connection[:client_id]
client_context = @client_lookup[cid]
if client_context
Diagnostic.expired_stale_connection(client_context.user.id, client_context)
cleanup_client(client_context.client)
end
music_session = nil
recording_id = nil
user = nil
# remove this connection from the database
ConnectionManager.active_record_transaction do |mgr|
mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
@log.info "expiring stale connection client_id:#{cid}, user_id:#{user_id}"
Notification.send_friend_update(user_id, false, conn) if count == 0
music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil?
user = User.find_by_id(user_id) unless user_id.nil?
recording = music_session.stop_recording unless music_session.nil? # stop any ongoing recording, if there is one
recording_id = recording.id unless recording.nil?
music_session.with_lock do # VRFS-1297
music_session.tick_track_changes
end if music_session
}
end
if user && music_session
Notification.send_session_depart(music_session, cid, user, recording_id)
end
end
end
# removes all resources associated with a client
def cleanup_client(client)
client.close
@semaphore.synchronize do
pending = client.context.nil? # presence of context implies this connection has been logged into
if pending
@log.debug "cleaned up not-logged-in client #{client}"
else
@log.debug "cleanup up logged-in client #{client}"
context = @clients.delete(client)
if context
remove_client(client.client_id)
remove_user(context)
else
@log.warn "skipping duplicate cleanup attempt of logged-in client"
end
end
end
end
def route(client_msg, client)
message_type = @message_factory.get_message_type(client_msg)
if message_type.nil?
@ -438,7 +383,9 @@ module JamWebsockets
raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil?
end
@log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT
@message_stats[message_type] = @message_stats[message_type].to_i + 1
@log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::HEARTBEAT_ACK && client_msg.type != ClientMessage::Type::PEER_MESSAGE
if client_msg.route_to.nil?
Diagnostic.missing_route_to(client.user_id, client_msg)
@ -535,7 +482,9 @@ module JamWebsockets
channel_id: client.channel_id,
ip_address: remote_ip,
connection_stale_time: connection_stale_time,
connection_expire_time: connection_expire_time})
connection_expire_time: connection_expire_time,
gateway: @gateway_name
})
if latency_tester.errors.any?
@log.warn "unable to log in latency_tester with errors: #{latency_tester.errors.inspect}"
raise SessionError, "invalid login: #{latency_tester.errors.inspect}"
@ -639,7 +588,7 @@ module JamWebsockets
recording_id = nil
ConnectionManager.active_record_transaction do |connection_manager|
music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable)
music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name)
if music_session_id.nil?
# if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
@ -675,7 +624,7 @@ module JamWebsockets
unless connection
# log this connection in the database
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable) do |conn, count|
connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name) do |conn, count|
user.update_addr_loc(Connection.find_by_client_id(client.client_id), User::JAM_REASON_LOGIN)
if count == 1
Notification.send_friend_update(user.id, true, conn)
@ -737,7 +686,7 @@ module JamWebsockets
if connection.stale?
ConnectionManager.active_record_transaction do |connection_manager|
heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type)
connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil)
connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name)
end
end
end
@ -883,7 +832,7 @@ module JamWebsockets
# populate routing data
client_msg.from = client.client_id
@log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}"
@log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}" unless client_msg.type == ClientMessage::Type::PEER_MESSAGE
# put it on the topic exchange for clients
@clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}})
@ -937,6 +886,135 @@ module JamWebsockets
Socket.unpack_sockaddr_in(client.get_peername)[1]
end
def periodical_flag_connections
# @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.flag_stale_connections(@gateway_name)
end
end
def periodical_check_clients
# it's possible that a client will not be represented in the database anymore, due to hard to trace/guess scenario
# usually involve reconnects. Double-check that all clients in memory are actually in the database. if not, delete them from memory
if @client_lookup.length == 0
return
end
client_ids = @client_lookup.map { | client_id, info | "('#{client_id}')" }.join(',')
# find all client_id's that do not have a row in the db, and whack them
# this style of query does the following: https://gist.github.com/sethcall/15308ccde298bff74584
sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids})
SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{@gateway_name}');
"
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
conn.exec(sql) do |result|
result.each { |row|
client_id = row['client_id']
context = @client_lookup[client_id]
if context
@log.debug("cleaning up missing client #{client_id}, #{context.user}")
cleanup_client(context.client)
else
@log.error("could not clean up missing client #{client_id}")
end
}
end
end
end
def periodical_check_connections
# this method is designed to be called periodically (every few seconds)
# in which this gateway instance will check only its own clients for their health
# since each gateway checks only the clients it knows about, this allows us to deploy
# n gateways that don't know much about each other.
# each gateway marks each connection row with it's gateway ID (so each gateway needs it's own ID or bad things happen)
# we also have a global resque job that checks for connections that appear to be not controlled by any gateway
# to make sure that we have stale connections cleaned up, even in the case of gateways that have crashed or are buggy
clients = []
ConnectionManager.active_record_transaction do |connection_manager|
clients = connection_manager.stale_connection_client_ids(@gateway_name)
end
cleanup_clients_with_ids(clients)
end
def periodical_stats_dump
# assume 60 seconds per status dump
stats = @message_stats.sort_by{|k,v| -v}
stats.map { |i| i[1] = (i[1] / 60.0).round(2) }
@log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', '));
@message_stats.clear
end
def cleanup_clients_with_ids(expired_connections)
expired_connections.each do |expired_connection|
cid = expired_connection[:client_id]
client_context = @client_lookup[cid]
if client_context
Diagnostic.expired_stale_connection(client_context.user.id, client_context)
cleanup_client(client_context.client)
end
music_session = nil
recording_id = nil
user = nil
# remove this connection from the database
ConnectionManager.active_record_transaction do |mgr|
mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
@log.info "expiring stale connection client_id:#{cid}, user_id:#{user_id}"
Notification.send_friend_update(user_id, false, conn) if count == 0
music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil?
user = User.find_by_id(user_id) unless user_id.nil?
recording = music_session.stop_recording unless music_session.nil? # stop any ongoing recording, if there is one
recording_id = recording.id unless recording.nil?
if music_session
music_session.with_lock do # VRFS-1297
music_session.tick_track_changes
end
end
}
end
if user && music_session
Notification.send_session_depart(music_session, cid, user, recording_id)
end
end
end
# removes all resources associated with a client
def cleanup_client(client)
client.close
@semaphore.synchronize do
pending = client.context.nil? # presence of context implies this connection has been logged into
if pending
@log.debug "cleaned up not-logged-in client #{client}"
else
@log.debug "cleanup up logged-in client #{client}"
context = @clients.delete(client)
if context
remove_client(client.client_id)
remove_user(context)
else
@log.warn "skipping duplicate cleanup attempt of logged-in client"
end
end
end
end
private
def sane_logging(&blk)

View File

@ -21,6 +21,7 @@ module JamWebsockets
connect_time_stale_browser = options[:connect_time_stale_browser].to_i
connect_time_expire_browser = options[:connect_time_expire_browser].to_i
max_connections_per_user = options[:max_connections_per_user].to_i
gateway_name = options[:gateway_name]
rabbitmq_host = options[:rabbitmq_host]
rabbitmq_port = options[:rabbitmq_port].to_i
calling_thread = options[:calling_thread]
@ -34,9 +35,11 @@ module JamWebsockets
}
EventMachine.run do
@router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port, max_connections_per_user: max_connections_per_user) do
@router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port, max_connections_per_user: max_connections_per_user, gateway: gateway_name) do
start_connection_expiration
start_client_expiration
start_connection_flagger
start_stats_dump
start_websocket_listener(host, port, trust_port, trust_check, options[:emwebsocket_debug])
calling_thread.wakeup if calling_thread
end
@ -55,7 +58,7 @@ module JamWebsockets
def start_websocket_listener(listen_ip, port, trust_port, trust_check, emwebsocket_debug)
EventMachine::WebSocket.run(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws|
@log.info "new client #{ws}"
#@log.info "new client #{ws}"
@router.new_client(ws, false)
end
EventMachine::WebSocket.run(:host => listen_ip, :port => trust_port, :debug => emwebsocket_debug) do |ws|
@ -74,35 +77,34 @@ module JamWebsockets
def start_connection_expiration
# one cleanup on startup
expire_stale_connections
@router.periodical_check_connections
EventMachine::PeriodicTimer.new(2) do
sane_logging { expire_stale_connections }
sane_logging { @router.periodical_check_connections }
end
end
def expire_stale_connections
clients = []
ConnectionManager.active_record_transaction do |connection_manager|
clients = connection_manager.stale_connection_client_ids
def start_client_expiration
# one cleanup on startup
@router.periodical_check_clients
EventMachine::PeriodicTimer.new(30) do
sane_logging { @router.periodical_check_clients }
end
@router.cleanup_clients_with_ids(clients)
end
def start_connection_flagger
# one cleanup on startup
flag_stale_connections
@router.periodical_flag_connections
EventMachine::PeriodicTimer.new(2) do
sane_logging { flag_stale_connections }
sane_logging { @router.periodical_flag_connections }
end
end
def flag_stale_connections()
# @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.flag_stale_connections
def start_stats_dump
EventMachine::PeriodicTimer.new(60) do
@router.periodical_stats_dump
end
end

View File

@ -11,5 +11,7 @@ GROUP="$NAME"
cp /var/lib/$NAME/script/package/$NAME.conf /etc/init/$NAME.conf
mkdir -p /var/lib/$NAME/log
mkdir -p /var/log/$NAME
chown -R $USER:$GROUP /var/lib/$NAME
chown -R $USER:$GROUP /var/log/$NAME

View File

@ -1,17 +1,31 @@
#!/bin/bash -l
# default config values
BUILD_NUMBER=`cat /var/lib/websocket-gateway/BUILD_NUMBER`
JAM_INSTANCE="$1"
CONFIG_FILE="/etc/websocket-gateway/upstart.conf"
if [ -e "$CONFIG_FILE" ]; then
. "$CONFIG_FILE"
fi
usage()
{
echo "pass one numerical argument representing the instance of this websocket-gateway"
exit 0
}
# I don't like doing this, but the next command (bundle exec) retouches/generates
# the gemfile. This unfortunately means the next debian update doesn't update this file.
# Ultimately this means an old Gemfile.lock is left behind for a new package,
# and bundle won't run because it thinks it has the wrong versions of gems
rm -f Gemfile.lock
main()
{
# default config values
BUILD_NUMBER=`cat /var/lib/websocket-gateway/BUILD_NUMBER`
CONFIG_FILE="/etc/websocket-gateway/upstart.conf"
if [ -e "$CONFIG_FILE" ]; then
. "$CONFIG_FILE"
fi
# I don't like doing this, but the next command (bundle exec) retouches/generates
# the gemfile. This unfortunately means the next debian update doesn't update this file.
# Ultimately this means an old Gemfile.lock is left behind for a new package,
# and bundle won't run because it thinks it has the wrong versions of gems
rm -f Gemfile.lock
JAM_INSTANCE=$JAM_INSTANCE BUILD_NUMBER=$BUILD_NUMBER JAMENV=production exec bundle exec ruby -Ilib bin/websocket_gateway
}
[ "$#" -ne 1 ] && ( usage && exit 1 ) || main
BUILD_NUMBER=$BUILD_NUMBER JAMENV=production exec bundle exec ruby -Ilib bin/websocket_gateway

View File

@ -1,18 +1,22 @@
description "websocket-gateway"
start on startup
start on runlevel [2345]
stop on runlevel [016]
#start on startup
#start on runlevel [2345]
# stop on runlevel [016]
stop on stopping gateways
limit nofile 20000 20000
limit core unlimited unlimited
respawn
respawn limit 10 5
instance $N
pre-start script
set -e
mkdir -p /var/run/websocket-gateway
chown websocket-gateway:websocket-gateway /var/run/websocket-gateway
end script
exec start-stop-daemon --start --chuid websocket-gateway:websocket-gateway --chdir /var/lib/websocket-gateway --exec /var/lib/websocket-gateway/script/package/upstart-run.sh
exec start-stop-daemon --start --chuid websocket-gateway:websocket-gateway --chdir /var/lib/websocket-gateway --exec /var/lib/websocket-gateway/script/package/upstart-run.sh $N

View File

@ -91,6 +91,7 @@ FactoryGirl.define do
ip_address '1.1.1.1'
as_musician true
client_type 'client'
gateway 'gateway1'
scoring_timeout Time.now
sequence(:channel_id) { |n| "Channel#{n}"}
end

View File

@ -114,6 +114,7 @@ describe Router do
@router.max_connections_per_user = 10
@router.heartbeat_interval_browser = @router.connect_time_stale_browser / 2
@router.amqp_connection_manager = AmqpConnectionManager.new(true, 4, host: 'localhost', port: 5672)
@router.gateway_name = 'gateway1'
end
subject { @router }
@ -122,6 +123,41 @@ describe Router do
end
describe "periodical_check_clients" do
let(:user) { FactoryGirl.create(:user) }
it "with no data" do
@router.client_lookup.length.should == 0
@router.periodical_check_clients
done
end
it "with one OK client" do
client = double("client")
client.should_receive(:context=).any_number_of_times
conn1 = FactoryGirl.create(:connection, :user => user, :client_id => "pc1")
@router.add_tracker(user, client, 'client', conn1.client_id)
@router.client_lookup[conn1.client_id].should_not be_nil
@router.periodical_check_clients
@router.client_lookup[conn1.client_id].should_not be_nil
done
end
it "with one missing client" do
client = double("client")
client.should_receive(:context=).any_number_of_times
context = ClientContext.new(user, client, "client")
client.should_receive(:context).any_number_of_times.and_return(context)
client.should_receive(:close)
conn1 = FactoryGirl.create(:connection, :user => user, :client_id => "pc1")
client.should_receive(:client_id).and_return(conn1.client_id)
@router.add_tracker(user, client, 'client', conn1.client_id)
conn1.delete
@router.client_lookup[conn1.client_id].should_not be_nil
@router.periodical_check_clients
@router.client_lookup[conn1.client_id].should be_nil
done
end
end
describe "serviceability" do
it "should start and stop", :mq => true do