diff --git a/admin/spec/factories.rb b/admin/spec/factories.rb index cdc626283..3ba6c50aa 100644 --- a/admin/spec/factories.rb +++ b/admin/spec/factories.rb @@ -36,6 +36,7 @@ FactoryGirl.define do addr 0 locidispid 0 client_type 'client' + gateway 'gateway1' scoring_timeout Time.now sequence(:channel_id) { |n| "Channel#{n}"} association :user, factory: :user diff --git a/db/manifest b/db/manifest index c27550f84..bba0a6125 100755 --- a/db/manifest +++ b/db/manifest @@ -208,4 +208,5 @@ undirected_scores.sql discard_scores.sql new_genres.sql get_work_faster.sql -fix_find_session_sorting_2216.sql \ No newline at end of file +fix_find_session_sorting_2216.sql +multiple_gateways.sql diff --git a/db/up/multiple_gateways.sql b/db/up/multiple_gateways.sql new file mode 100644 index 000000000..d594bdcda --- /dev/null +++ b/db/up/multiple_gateways.sql @@ -0,0 +1,2 @@ +-- allow multiple websockegateways to open +ALTER TABLE connections ADD COLUMN gateway VARCHAR NOT NULL DEFAULT 'default-1'; \ No newline at end of file diff --git a/ruby/README.md b/ruby/README.md index 39a2c2e30..d831a5a40 100644 --- a/ruby/README.md +++ b/ruby/README.md @@ -6,3 +6,5 @@ Create development database 'jam_ruby' Once you've created your database, migrate it: `bundle exec jam_ruby up` + + diff --git a/ruby/lib/jam_ruby/connection_manager.rb b/ruby/lib/jam_ruby/connection_manager.rb index b9e23adf9..e8349c97d 100644 --- a/ruby/lib/jam_ruby/connection_manager.rb +++ b/ruby/lib/jam_ruby/connection_manager.rb @@ -44,7 +44,7 @@ module JamRuby end # reclaim the existing connection, if ip_address is not nil then perhaps a new address as well - def reconnect(conn, channel_id, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time, udp_reachable) + def reconnect(conn, channel_id, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time, udp_reachable, gateway) music_session_id = nil reconnected = false @@ -89,8 +89,8 @@ module JamRuby udp_reachable_value = udp_reachable.nil? ? 'udp_reachable' : udp_reachable sql =< ['aasm_state = ?','connected']) num.should == 1 assert_num_connections(client_id, num) - @connman.flag_stale_connections() + @connman.flag_stale_connections(GATEWAY) assert_num_connections(client_id, num) conn = Connection.find_by_client_id(client_id) @@ -274,7 +275,7 @@ describe ConnectionManager, no_transaction: true do num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"]) num.should == 1 # this should change the aasm_state to stale - @connman.flag_stale_connections() + @connman.flag_stale_connections(GATEWAY) num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"]) num.should == 0 @@ -286,7 +287,7 @@ describe ConnectionManager, no_transaction: true do conn = Connection.find_by_client_id(client_id) set_updated_at(conn, Time.now - DEFINITELY_EXPIRED) - cids = @connman.stale_connection_client_ids() + cids = @connman.stale_connection_client_ids(GATEWAY) cids.size.should == 1 cids[0][:client_id].should == client_id cids[0][:client_type].should == Connection::TYPE_CLIENT @@ -301,21 +302,21 @@ describe ConnectionManager, no_transaction: true do it "expires stale connection" do client_id = "client_id8" user_id = create_user("test", "user8", "user8@jamkazam.com") - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) conn = Connection.find_by_client_id(client_id) set_updated_at(conn, Time.now - STALE_BUT_NOT_EXPIRED) - @connman.flag_stale_connections + @connman.flag_stale_connections(GATEWAY) assert_num_connections(client_id, 1) # assert_num_connections(client_id, JamRuby::Connection.count(:conditions => ['aasm_state = ?','stale'])) - @connman.expire_stale_connections + @connman.expire_stale_connections(GATEWAY) assert_num_connections(client_id, 1) set_updated_at(conn, Time.now - DEFINITELY_EXPIRED) # this should delete the stale connection - @connman.expire_stale_connections + @connman.expire_stale_connections(GATEWAY) assert_num_connections(client_id, 0) end @@ -327,7 +328,7 @@ describe ConnectionManager, no_transaction: true do music_session_id = music_session.id user = User.find(user_id) - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) connection = @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) connection.errors.any?.should be_false @@ -363,8 +364,8 @@ describe ConnectionManager, no_transaction: true do client_id2 = "client_id10.12" user_id = create_user("test", "user10.11", "user10.11@jamkazam.com", :musician => true) user_id2 = create_user("test", "user10.12", "user10.12@jamkazam.com", :musician => false) - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) - @connman.create_connection(user_id2, client_id2, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) + @connman.create_connection(user_id2, client_id2, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) music_session = FactoryGirl.create(:active_music_session, user_id: user_id) music_session_id = music_session.id @@ -383,7 +384,7 @@ describe ConnectionManager, no_transaction: true do client_id = "client_id10.2" user_id = create_user("test", "user10.2", "user10.2@jamkazam.com") - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) music_session = FactoryGirl.create(:active_music_session, user_id: user_id) user = User.find(user_id) @@ -399,8 +400,8 @@ describe ConnectionManager, no_transaction: true do fan_client_id = "client_id10.4" musician_id = create_user("test", "user10.3", "user10.3@jamkazam.com") fan_id = create_user("test", "user10.4", "user10.4@jamkazam.com", :musician => false) - @connman.create_connection(musician_id, musician_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) - @connman.create_connection(fan_id, fan_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(musician_id, musician_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) + @connman.create_connection(fan_id, fan_client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) music_session = FactoryGirl.create(:active_music_session, :fan_access => false, user_id: musician_id) music_session_id = music_session.id @@ -424,7 +425,7 @@ describe ConnectionManager, no_transaction: true do music_session_id = music_session.id user = User.find(user_id2) - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) # specify real user id, but not associated with this session expect { @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) } .to raise_error(ActiveRecord::RecordNotFound) end @@ -436,7 +437,7 @@ describe ConnectionManager, no_transaction: true do user = User.find(user_id) music_session = ActiveMusicSession.new - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) connection = @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) connection.errors.size.should == 1 connection.errors.get(:music_session).should == [ValidationMessages::MUSIC_SESSION_MUST_BE_SPECIFIED] @@ -450,7 +451,7 @@ describe ConnectionManager, no_transaction: true do music_session_id = music_session.id user = User.find(user_id2) - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) # specify real user id, but not associated with this session expect { @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) } .to raise_error(ActiveRecord::RecordNotFound) end @@ -464,7 +465,7 @@ describe ConnectionManager, no_transaction: true do user = User.find(user_id) dummy_music_session = ActiveMusicSession.new - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError) end @@ -479,7 +480,7 @@ describe ConnectionManager, no_transaction: true do dummy_music_session = ActiveMusicSession.new - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError) end @@ -492,7 +493,7 @@ describe ConnectionManager, no_transaction: true do music_session_id = music_session.id user = User.find(user_id) - @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) @connman.join_music_session(user, client_id, music_session, true, TRACKS, 10) assert_session_exists(music_session_id, true) @@ -535,7 +536,7 @@ describe ConnectionManager, no_transaction: true do user = User.find(user_id) client_id1 = Faker::Number.number(20) - @connman.create_connection(user_id, client_id1, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE) + @connman.create_connection(user_id, client_id1, channel_id, "1.1.1.1", 'client', STALE_TIME, EXPIRE_TIME, REACHABLE, GATEWAY) music_session1 = FactoryGirl.create(:active_music_session, :user_id => user_id) connection1 = @connman.join_music_session(user, client_id1, music_session1, true, TRACKS, 10) connection1.errors.size.should == 0 diff --git a/ruby/spec/jam_ruby/models/latency_tester_spec.rb b/ruby/spec/jam_ruby/models/latency_tester_spec.rb index 4bae7633d..52163bcc4 100644 --- a/ruby/spec/jam_ruby/models/latency_tester_spec.rb +++ b/ruby/spec/jam_ruby/models/latency_tester_spec.rb @@ -2,7 +2,7 @@ require 'spec_helper' describe LatencyTester do - let(:params) {{client_id: 'abc', ip_address: '10.1.1.1', connection_stale_time:40, connection_expire_time:60, channel_id: '1'} } + let(:params) {{client_id: 'abc', ip_address: '10.1.1.1', connection_stale_time:40, connection_expire_time:60, channel_id: '1', gateway: 'gateway1'} } it "success" do latency_tester = FactoryGirl.create(:latency_tester) diff --git a/web/app/assets/javascripts/session.js b/web/app/assets/javascripts/session.js index 1bdb38990..5add060ae 100644 --- a/web/app/assets/javascripts/session.js +++ b/web/app/assets/javascripts/session.js @@ -904,7 +904,7 @@ $('.disabled-track-overlay', $track).show(); $('.track-connection', $track).removeClass('red yellow green').addClass('red'); } - var participant = (sessionModel.getParticipant(clientId) || {name:'unknown'}).name; + var participant = (sessionModel.getParticipant(clientId) || { user: {name: 'unknown'}}).user.name; logger.debug("still looking for mixer for participant=" + participant + ", clientId=" + clientId) } } diff --git a/web/app/views/dialogs/_gettingStartedDialog.html.slim b/web/app/views/dialogs/_gettingStartedDialog.html.slim index 9b34a11be..88e674e8c 100644 --- a/web/app/views/dialogs/_gettingStartedDialog.html.slim +++ b/web/app/views/dialogs/_gettingStartedDialog.html.slim @@ -60,7 +60,7 @@ | To play more music, tap into our growing community to connect with other musicians. Watch this video for tips on how to do this. .action-button - a.button-orange rel="external" href="https://www.youtube.com/watch?v=xWponSJo-GU" WATCH VIDEO + a.button-orange rel="external" href="https://www.youtube.com/watch?v=4KWklSZZxRc" WATCH VIDEO br clear="both" .row.full.learn-more .column diff --git a/web/config/initializers/eventmachine.rb b/web/config/initializers/eventmachine.rb index f40e162f8..71f09cb1b 100644 --- a/web/config/initializers/eventmachine.rb +++ b/web/config/initializers/eventmachine.rb @@ -17,7 +17,8 @@ unless $rails_rake_task :rabbitmq_host => APP_CONFIG.rabbitmq_host, :rabbitmq_port => APP_CONFIG.rabbitmq_port, :calling_thread => current, - :cidr => APP_CONFIG.websocket_gateway_cidr) + :cidr => APP_CONFIG.websocket_gateway_cidr, + :gateway_name => "default-#{ENV["JAM_INSTANCE"] || 1}") end Thread.stop end diff --git a/web/spec/factories.rb b/web/spec/factories.rb index 898c94527..d82706fe3 100644 --- a/web/spec/factories.rb +++ b/web/spec/factories.rb @@ -207,6 +207,7 @@ FactoryGirl.define do addr {JamIsp.ip_to_num(ip_address)} locidispid 0 client_type 'client' + gateway 'gateway1' scoring_timeout Time.now sequence(:channel_id) { |n| "Channel#{n}"} end diff --git a/web/spec/spec_helper.rb b/web/spec/spec_helper.rb index 3da0ac5f0..984180a66 100644 --- a/web/spec/spec_helper.rb +++ b/web/spec/spec_helper.rb @@ -86,7 +86,8 @@ Thread.new do :rabbitmq_host => 'localhost', :rabbitmq_port => 5672, :calling_thread => current, - :cidr => ['0.0.0.0/0']) + :cidr => ['0.0.0.0/0'], + :gateway_name => 'default') rescue Exception => e puts "websocket-gateway failed: #{e}" end diff --git a/websocket-gateway/bin/websocket_gateway b/websocket-gateway/bin/websocket_gateway index c92265559..555e80b35 100755 --- a/websocket-gateway/bin/websocket_gateway +++ b/websocket-gateway/bin/websocket_gateway @@ -14,6 +14,13 @@ db_config = YAML::load(File.open(db_config_file))[jamenv] ActiveRecord::Base.establish_connection(db_config) +jam_instance = ENV['JAM_INSTANCE'] || 1 +jam_instance = jam_instance.to_i + +if jam_instance == 0 + puts "JAM INSTANCE MUST BE > 0" + exit 1 +end # now bring in the Jam code require 'jam_websockets' @@ -33,11 +40,11 @@ require "#{Dir.pwd}/config/application.rb" if jamenv == "production" - ENV['NEW_RELIC_LOG'] = '/var/log/websocket-gateway/newrelic_agent.log' + ENV['NEW_RELIC_LOG'] = "/var/log/websocket-gateway/newrelic_agent-#{jam_instance}.log" one_meg = 1024 * 1024 - Logging.logger.root.appenders = Logging.appenders.rolling_file("log/#{jamenv}.log", :truncate=>true, :age=>'daily', :size=>one_meg, :keep=>20) + Logging.logger.root.appenders = Logging.appenders.rolling_file("/var/log/websocket-gateway/#{jamenv}-#{jam_instance}.log", :truncate=>true, :age=>'daily', :size=>one_meg, :keep=>20, :layout => Logging.layouts.pattern(:pattern => '[%d] %-5l: %m\n')) else - ENV['NEW_RELIC_LOG'] = "#{Dir.pwd}/log/newrelic_agent.log" + ENV['NEW_RELIC_LOG'] = "#{Dir.pwd}/log/newrelic_agent-#{jam_instance}.log" Logging.logger.root.appenders = Logging.appenders.stdout end @@ -47,7 +54,11 @@ require 'newrelic_rpm' Object.send(:remove_const, :Rails) # this is to 'fool' new relic into not thinking this is a Rails app. ::NewRelic::Agent.manual_start -Server.new.run(:port => config["port"], +# determine gateway_name +gateway_name = ENV['GATEWAY_NAME'] || 'default' +gateway_name = "#{gateway_name}-#{jam_instance}" + +Server.new.run(:port => config["port"] + (jam_instance-1 ) * 2, :emwebsocket_debug => config["emwebsocket_debug"], :connect_time_stale_client => config["connect_time_stale_client"], :connect_time_expire_client => config["connect_time_expire_client"], @@ -56,4 +67,5 @@ Server.new.run(:port => config["port"], :max_connections_per_user => config["max_connections_per_user"], :rabbitmq_host => config['rabbitmq_host'], :rabbitmq_port => config['rabbitmq_port'], - :cidr => config['cidr']) + :cidr => config['cidr'], + :gateway_name => gateway_name) diff --git a/websocket-gateway/config/haproxy.cfg b/websocket-gateway/config/haproxy.cfg new file mode 100644 index 000000000..8dec239b6 --- /dev/null +++ b/websocket-gateway/config/haproxy.cfg @@ -0,0 +1,57 @@ +global + maxconn 4096 + pidfile ~/tmp/haproxy-queue.pid + +defaults + log global + log 127.0.0.1 local0 + log 127.0.0.1 local1 notice + mode tcp + option httplog + option http-server-close + #option dontlognull + option redispatch + option contstats + retries 3 + backlog 10000 + timeout client 25s + timeout connect 5s + timeout server 25s +# timeout tunnel available in ALOHA 5.5 or HAProxy 1.5-dev10 and higher + timeout tunnel 3600s + timeout http-keep-alive 1s + timeout http-request 15s + timeout queue 30s + timeout tarpit 60s + default-server inter 3s rise 2 fall 3 + option forwardfor + +frontend gateways + bind *:6767 + default_backend bk_ws + +backend bk_ws + balance leastconn + +## websocket protocol validation +# acl hdr_connection_upgrade hdr(Connection) -i upgrade +# acl hdr_upgrade_websocket hdr(Upgrade) -i websocket +# acl hdr_websocket_key hdr_cnt(Sec-WebSocket-Key) eq 1 +# acl hdr_websocket_version hdr_cnt(Sec-WebSocket-Version) eq 1 +# acl hdr_host hdr_cnt(Sec-WebSocket-Version) eq 1 +# http-request deny if ! hdr_connection_upgrade ! hdr_upgrade_websocket ! hdr_w +#ebsocket_key ! hdr_websocket_version ! hdr_host + +## ensure our application protocol name is valid +## (don't forget to update the list each time you publish new applications) + acl ws_valid_protocol hdr(Sec-WebSocket-Protocol) echo-protocol + http-request deny if ! ws_valid_protocol + +## websocket health checking + #option httpchk GET / HTTP/1.1\r\nHost:\ ws.domain.com\r\nConnection:\ Upgrade +#\r\nUpgrade:\ websocket\r\nSec-WebSocket-Key:\ haproxy\r\nSec-WebSocket-Version +# :\ 13\r\nSec-WebSocket-Protocol:\ echo-protocol +# http-check expect status 101 + + server websrv1 127.0.0.1:6769 maxconn 1000 weight 10 cookie gateway1 check + server websrv2 127.0.0.1:6771 maxconn 1000 weight 10 cookie gateway2 check \ No newline at end of file diff --git a/websocket-gateway/lib/jam_websockets/router.rb b/websocket-gateway/lib/jam_websockets/router.rb index f16dc1fd9..2815431f2 100644 --- a/websocket-gateway/lib/jam_websockets/router.rb +++ b/websocket-gateway/lib/jam_websockets/router.rb @@ -27,7 +27,9 @@ module JamWebsockets :heartbeat_interval_browser, :connect_time_expire_browser, :connect_time_stale_browser, - :max_connections_per_user + :max_connections_per_user, + :gateway_name, + :client_lookup def initialize() @log = Logging.logger[self] @@ -47,10 +49,12 @@ module JamWebsockets @heartbeat_interval_browser= nil @connect_time_expire_browser= nil @connect_time_stale_browser= nil + @gateway_name = nil @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base] + @message_stats = {} end - def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10}, &block) + def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default'}, &block) @log.info "startup" @@ -61,6 +65,7 @@ module JamWebsockets @connect_time_stale_browser = connect_time_stale_browser @connect_time_expire_browser = connect_time_expire_browser @max_connections_per_user = options[:max_connections_per_user] + @gateway_name = options[:gateway] begin @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port]) @@ -154,7 +159,7 @@ module JamWebsockets end end else - @log.debug "Can't route message: no user connected with id #{user_id}" + #@log.debug "Can't route message: no user connected with id #{user_id}" # too chatty end end @@ -205,19 +210,19 @@ module JamWebsockets msg = Jampb::ClientMessage.parse(msg) - @log.debug "client-directed message received from #{msg.from} to client #{client_id}" + @log.debug "client-directed message received from #{msg.from} to client #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE unless client.nil? EM.schedule do - @log.debug "sending client-directed down websocket to #{client_id}" + @log.debug "sending client-directed down websocket to #{client_id}" unless msg.type == ClientMessage::Type::PEER_MESSAGE send_to_client(client, msg) end else @log.debug "client-directed message unroutable to disconnected client #{client_id}" end else - @log.debug "Can't route message: no client connected with id #{client_id}" + #@log.debug "Can't route message: no client connected with id #{client_id}" this happens all the time in multi-websocket scenarios end end @@ -329,7 +334,7 @@ module JamWebsockets def send_to_client(client, msg) - @log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless msg.type == ClientMessage::Type::HEARTBEAT_ACK + @log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" unless msg.type == ClientMessage::Type::HEARTBEAT_ACK || msg.type == ClientMessage::Type::PEER_MESSAGE if client.encode_json client.send(msg.to_json.to_s) else @@ -371,66 +376,6 @@ module JamWebsockets end end - def cleanup_clients_with_ids(expired_connections) - expired_connections.each do |expired_connection| - cid = expired_connection[:client_id] - - client_context = @client_lookup[cid] - - if client_context - Diagnostic.expired_stale_connection(client_context.user.id, client_context) - cleanup_client(client_context.client) - end - - music_session = nil - recording_id = nil - user = nil - - # remove this connection from the database - ConnectionManager.active_record_transaction do |mgr| - mgr.delete_connection(cid) { |conn, count, music_session_id, user_id| - @log.info "expiring stale connection client_id:#{cid}, user_id:#{user_id}" - Notification.send_friend_update(user_id, false, conn) if count == 0 - music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil? - user = User.find_by_id(user_id) unless user_id.nil? - recording = music_session.stop_recording unless music_session.nil? # stop any ongoing recording, if there is one - recording_id = recording.id unless recording.nil? - music_session.with_lock do # VRFS-1297 - music_session.tick_track_changes - end if music_session - } - end - - if user && music_session - Notification.send_session_depart(music_session, cid, user, recording_id) - end - end - end - - # removes all resources associated with a client - def cleanup_client(client) - client.close - - @semaphore.synchronize do - pending = client.context.nil? # presence of context implies this connection has been logged into - - if pending - @log.debug "cleaned up not-logged-in client #{client}" - else - @log.debug "cleanup up logged-in client #{client}" - - context = @clients.delete(client) - - if context - remove_client(client.client_id) - remove_user(context) - else - @log.warn "skipping duplicate cleanup attempt of logged-in client" - end - end - end - end - def route(client_msg, client) message_type = @message_factory.get_message_type(client_msg) if message_type.nil? @@ -438,7 +383,9 @@ module JamWebsockets raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil? end - @log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT + @message_stats[message_type] = @message_stats[message_type].to_i + 1 + + @log.debug("msg received #{message_type}") if client_msg.type != ClientMessage::Type::HEARTBEAT && client_msg.type != ClientMessage::Type::HEARTBEAT_ACK && client_msg.type != ClientMessage::Type::PEER_MESSAGE if client_msg.route_to.nil? Diagnostic.missing_route_to(client.user_id, client_msg) @@ -535,7 +482,9 @@ module JamWebsockets channel_id: client.channel_id, ip_address: remote_ip, connection_stale_time: connection_stale_time, - connection_expire_time: connection_expire_time}) + connection_expire_time: connection_expire_time, + gateway: @gateway_name + }) if latency_tester.errors.any? @log.warn "unable to log in latency_tester with errors: #{latency_tester.errors.inspect}" raise SessionError, "invalid login: #{latency_tester.errors.inspect}" @@ -639,7 +588,7 @@ module JamWebsockets recording_id = nil ConnectionManager.active_record_transaction do |connection_manager| - music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable) + music_session_id, reconnected = connection_manager.reconnect(connection, client.channel_id, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name) if music_session_id.nil? # if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before. @@ -675,7 +624,7 @@ module JamWebsockets unless connection # log this connection in the database ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable) do |conn, count| + connection_manager.create_connection(user.id, client.client_id, client.channel_id, remote_ip, client_type, connection_stale_time, connection_expire_time, udp_reachable, @gateway_name) do |conn, count| user.update_addr_loc(Connection.find_by_client_id(client.client_id), User::JAM_REASON_LOGIN) if count == 1 Notification.send_friend_update(user.id, true, conn) @@ -737,7 +686,7 @@ module JamWebsockets if connection.stale? ConnectionManager.active_record_transaction do |connection_manager| heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(context.user, context.client_type) - connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil) + connection_manager.reconnect(connection, client.channel_id, connection.music_session_id, nil, connection_stale_time, connection_expire_time, nil, @gateway_name) end end end @@ -883,7 +832,7 @@ module JamWebsockets # populate routing data client_msg.from = client.client_id - @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}" + @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}" unless client_msg.type == ClientMessage::Type::PEER_MESSAGE # put it on the topic exchange for clients @clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}}) @@ -937,6 +886,135 @@ module JamWebsockets Socket.unpack_sockaddr_in(client.get_peername)[1] end + def periodical_flag_connections + # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds") + ConnectionManager.active_record_transaction do |connection_manager| + connection_manager.flag_stale_connections(@gateway_name) + end + end + + def periodical_check_clients + # it's possible that a client will not be represented in the database anymore, due to hard to trace/guess scenario + # usually involve reconnects. Double-check that all clients in memory are actually in the database. if not, delete them from memory + if @client_lookup.length == 0 + return + end + + client_ids = @client_lookup.map { | client_id, info | "('#{client_id}')" }.join(',') + + # find all client_id's that do not have a row in the db, and whack them + # this style of query does the following: https://gist.github.com/sethcall/15308ccde298bff74584 + sql = "WITH app_client_ids(client_id) AS (VALUES#{client_ids}) + SELECT client_id from app_client_ids WHERE client_id NOT IN (SELECT client_id FROM connections WHERE gateway = '#{@gateway_name}'); + " + ConnectionManager.active_record_transaction do |connection_manager| + conn = connection_manager.pg_conn + conn.exec(sql) do |result| + result.each { |row| + client_id = row['client_id'] + context = @client_lookup[client_id] + if context + @log.debug("cleaning up missing client #{client_id}, #{context.user}") + cleanup_client(context.client) + else + @log.error("could not clean up missing client #{client_id}") + end + } + end + end + end + + def periodical_check_connections + # this method is designed to be called periodically (every few seconds) + # in which this gateway instance will check only its own clients for their health + + # since each gateway checks only the clients it knows about, this allows us to deploy + # n gateways that don't know much about each other. + + # each gateway marks each connection row with it's gateway ID (so each gateway needs it's own ID or bad things happen) + + # we also have a global resque job that checks for connections that appear to be not controlled by any gateway + # to make sure that we have stale connections cleaned up, even in the case of gateways that have crashed or are buggy + + clients = [] + ConnectionManager.active_record_transaction do |connection_manager| + clients = connection_manager.stale_connection_client_ids(@gateway_name) + end + cleanup_clients_with_ids(clients) + end + + def periodical_stats_dump + # assume 60 seconds per status dump + stats = @message_stats.sort_by{|k,v| -v} + stats.map { |i| i[1] = (i[1] / 60.0).round(2) } + + @log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', ')); + @message_stats.clear + end + + def cleanup_clients_with_ids(expired_connections) + expired_connections.each do |expired_connection| + cid = expired_connection[:client_id] + + client_context = @client_lookup[cid] + + if client_context + Diagnostic.expired_stale_connection(client_context.user.id, client_context) + cleanup_client(client_context.client) + end + + music_session = nil + recording_id = nil + user = nil + + # remove this connection from the database + ConnectionManager.active_record_transaction do |mgr| + mgr.delete_connection(cid) { |conn, count, music_session_id, user_id| + @log.info "expiring stale connection client_id:#{cid}, user_id:#{user_id}" + Notification.send_friend_update(user_id, false, conn) if count == 0 + music_session = ActiveMusicSession.find_by_id(music_session_id) unless music_session_id.nil? + user = User.find_by_id(user_id) unless user_id.nil? + recording = music_session.stop_recording unless music_session.nil? # stop any ongoing recording, if there is one + recording_id = recording.id unless recording.nil? + if music_session + music_session.with_lock do # VRFS-1297 + music_session.tick_track_changes + end + end + } + end + + if user && music_session + Notification.send_session_depart(music_session, cid, user, recording_id) + end + end + end + + # removes all resources associated with a client + def cleanup_client(client) + client.close + + @semaphore.synchronize do + pending = client.context.nil? # presence of context implies this connection has been logged into + + if pending + @log.debug "cleaned up not-logged-in client #{client}" + else + @log.debug "cleanup up logged-in client #{client}" + + context = @clients.delete(client) + + if context + remove_client(client.client_id) + remove_user(context) + else + @log.warn "skipping duplicate cleanup attempt of logged-in client" + end + end + end + end + + private def sane_logging(&blk) diff --git a/websocket-gateway/lib/jam_websockets/server.rb b/websocket-gateway/lib/jam_websockets/server.rb index 7b25ea7c9..256c6b0bb 100644 --- a/websocket-gateway/lib/jam_websockets/server.rb +++ b/websocket-gateway/lib/jam_websockets/server.rb @@ -21,6 +21,7 @@ module JamWebsockets connect_time_stale_browser = options[:connect_time_stale_browser].to_i connect_time_expire_browser = options[:connect_time_expire_browser].to_i max_connections_per_user = options[:max_connections_per_user].to_i + gateway_name = options[:gateway_name] rabbitmq_host = options[:rabbitmq_host] rabbitmq_port = options[:rabbitmq_port].to_i calling_thread = options[:calling_thread] @@ -34,9 +35,11 @@ module JamWebsockets } EventMachine.run do - @router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port, max_connections_per_user: max_connections_per_user) do + @router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port, max_connections_per_user: max_connections_per_user, gateway: gateway_name) do start_connection_expiration + start_client_expiration start_connection_flagger + start_stats_dump start_websocket_listener(host, port, trust_port, trust_check, options[:emwebsocket_debug]) calling_thread.wakeup if calling_thread end @@ -55,7 +58,7 @@ module JamWebsockets def start_websocket_listener(listen_ip, port, trust_port, trust_check, emwebsocket_debug) EventMachine::WebSocket.run(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws| - @log.info "new client #{ws}" + #@log.info "new client #{ws}" @router.new_client(ws, false) end EventMachine::WebSocket.run(:host => listen_ip, :port => trust_port, :debug => emwebsocket_debug) do |ws| @@ -74,35 +77,34 @@ module JamWebsockets def start_connection_expiration # one cleanup on startup - expire_stale_connections + @router.periodical_check_connections EventMachine::PeriodicTimer.new(2) do - sane_logging { expire_stale_connections } + sane_logging { @router.periodical_check_connections } end - end - def expire_stale_connections - clients = [] - ConnectionManager.active_record_transaction do |connection_manager| - clients = connection_manager.stale_connection_client_ids + def start_client_expiration + # one cleanup on startup + @router.periodical_check_clients + + EventMachine::PeriodicTimer.new(30) do + sane_logging { @router.periodical_check_clients } end - @router.cleanup_clients_with_ids(clients) end def start_connection_flagger # one cleanup on startup - flag_stale_connections + @router.periodical_flag_connections EventMachine::PeriodicTimer.new(2) do - sane_logging { flag_stale_connections } + sane_logging { @router.periodical_flag_connections } end end - def flag_stale_connections() - # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds") - ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.flag_stale_connections + def start_stats_dump + EventMachine::PeriodicTimer.new(60) do + @router.periodical_stats_dump end end diff --git a/websocket-gateway/script/package/post-install.sh b/websocket-gateway/script/package/post-install.sh index fb09f8d7f..2f794e45a 100755 --- a/websocket-gateway/script/package/post-install.sh +++ b/websocket-gateway/script/package/post-install.sh @@ -11,5 +11,7 @@ GROUP="$NAME" cp /var/lib/$NAME/script/package/$NAME.conf /etc/init/$NAME.conf mkdir -p /var/lib/$NAME/log +mkdir -p /var/log/$NAME chown -R $USER:$GROUP /var/lib/$NAME +chown -R $USER:$GROUP /var/log/$NAME diff --git a/websocket-gateway/script/package/upstart-run.sh b/websocket-gateway/script/package/upstart-run.sh index 11ffffb06..0df30a0e7 100755 --- a/websocket-gateway/script/package/upstart-run.sh +++ b/websocket-gateway/script/package/upstart-run.sh @@ -1,17 +1,31 @@ #!/bin/bash -l -# default config values -BUILD_NUMBER=`cat /var/lib/websocket-gateway/BUILD_NUMBER` +JAM_INSTANCE="$1" -CONFIG_FILE="/etc/websocket-gateway/upstart.conf" -if [ -e "$CONFIG_FILE" ]; then - . "$CONFIG_FILE" -fi +usage() +{ + echo "pass one numerical argument representing the instance of this websocket-gateway" + exit 0 +} -# I don't like doing this, but the next command (bundle exec) retouches/generates -# the gemfile. This unfortunately means the next debian update doesn't update this file. -# Ultimately this means an old Gemfile.lock is left behind for a new package, -# and bundle won't run because it thinks it has the wrong versions of gems -rm -f Gemfile.lock +main() +{ + # default config values + BUILD_NUMBER=`cat /var/lib/websocket-gateway/BUILD_NUMBER` + + CONFIG_FILE="/etc/websocket-gateway/upstart.conf" + if [ -e "$CONFIG_FILE" ]; then + . "$CONFIG_FILE" + fi + + # I don't like doing this, but the next command (bundle exec) retouches/generates + # the gemfile. This unfortunately means the next debian update doesn't update this file. + # Ultimately this means an old Gemfile.lock is left behind for a new package, + # and bundle won't run because it thinks it has the wrong versions of gems + rm -f Gemfile.lock + + JAM_INSTANCE=$JAM_INSTANCE BUILD_NUMBER=$BUILD_NUMBER JAMENV=production exec bundle exec ruby -Ilib bin/websocket_gateway +} + +[ "$#" -ne 1 ] && ( usage && exit 1 ) || main -BUILD_NUMBER=$BUILD_NUMBER JAMENV=production exec bundle exec ruby -Ilib bin/websocket_gateway diff --git a/websocket-gateway/script/package/websocket-gateway.conf b/websocket-gateway/script/package/websocket-gateway.conf index 106ea2c7d..14ef2b1da 100755 --- a/websocket-gateway/script/package/websocket-gateway.conf +++ b/websocket-gateway/script/package/websocket-gateway.conf @@ -1,18 +1,22 @@ description "websocket-gateway" -start on startup -start on runlevel [2345] -stop on runlevel [016] +#start on startup +#start on runlevel [2345] +# stop on runlevel [016] +stop on stopping gateways + limit nofile 20000 20000 limit core unlimited unlimited respawn respawn limit 10 5 +instance $N + pre-start script set -e mkdir -p /var/run/websocket-gateway chown websocket-gateway:websocket-gateway /var/run/websocket-gateway end script -exec start-stop-daemon --start --chuid websocket-gateway:websocket-gateway --chdir /var/lib/websocket-gateway --exec /var/lib/websocket-gateway/script/package/upstart-run.sh +exec start-stop-daemon --start --chuid websocket-gateway:websocket-gateway --chdir /var/lib/websocket-gateway --exec /var/lib/websocket-gateway/script/package/upstart-run.sh $N diff --git a/websocket-gateway/spec/factories.rb b/websocket-gateway/spec/factories.rb index f73abe6f8..501c386db 100644 --- a/websocket-gateway/spec/factories.rb +++ b/websocket-gateway/spec/factories.rb @@ -91,6 +91,7 @@ FactoryGirl.define do ip_address '1.1.1.1' as_musician true client_type 'client' + gateway 'gateway1' scoring_timeout Time.now sequence(:channel_id) { |n| "Channel#{n}"} end diff --git a/websocket-gateway/spec/jam_websockets/router_spec.rb b/websocket-gateway/spec/jam_websockets/router_spec.rb index 269cca773..99e7a2f2f 100644 --- a/websocket-gateway/spec/jam_websockets/router_spec.rb +++ b/websocket-gateway/spec/jam_websockets/router_spec.rb @@ -114,6 +114,7 @@ describe Router do @router.max_connections_per_user = 10 @router.heartbeat_interval_browser = @router.connect_time_stale_browser / 2 @router.amqp_connection_manager = AmqpConnectionManager.new(true, 4, host: 'localhost', port: 5672) + @router.gateway_name = 'gateway1' end subject { @router } @@ -122,6 +123,41 @@ describe Router do end + describe "periodical_check_clients" do + let(:user) { FactoryGirl.create(:user) } + it "with no data" do + @router.client_lookup.length.should == 0 + @router.periodical_check_clients + done + end + + it "with one OK client" do + client = double("client") + client.should_receive(:context=).any_number_of_times + conn1 = FactoryGirl.create(:connection, :user => user, :client_id => "pc1") + @router.add_tracker(user, client, 'client', conn1.client_id) + @router.client_lookup[conn1.client_id].should_not be_nil + @router.periodical_check_clients + @router.client_lookup[conn1.client_id].should_not be_nil + done + end + + it "with one missing client" do + client = double("client") + client.should_receive(:context=).any_number_of_times + context = ClientContext.new(user, client, "client") + client.should_receive(:context).any_number_of_times.and_return(context) + client.should_receive(:close) + conn1 = FactoryGirl.create(:connection, :user => user, :client_id => "pc1") + client.should_receive(:client_id).and_return(conn1.client_id) + @router.add_tracker(user, client, 'client', conn1.client_id) + conn1.delete + @router.client_lookup[conn1.client_id].should_not be_nil + @router.periodical_check_clients + @router.client_lookup[conn1.client_id].should be_nil + done + end + end describe "serviceability" do it "should start and stop", :mq => true do