From d83a632d0b8dfb631f8579426b89c6909ee6f176 Mon Sep 17 00:00:00 2001 From: Seth Call Date: Thu, 18 Dec 2014 15:13:55 -0600 Subject: [PATCH] Conflicts: db/manifest web/app/assets/javascripts/jam_rest.js --- admin/app/admin/icecast_bootstrap.rb | 8 +- db/manifest | 3 +- db/up/icecast_source_changes.sql | 16 ++ pb/src/client_container.proto | 37 +++- ruby/lib/jam_ruby.rb | 2 + ruby/lib/jam_ruby/connection_manager.rb | 4 +- ruby/lib/jam_ruby/lib/em_helper.rb | 5 + ruby/lib/jam_ruby/lib/subscription_message.rb | 25 +++ ruby/lib/jam_ruby/message_factory.rb | 14 ++ ruby/lib/jam_ruby/models/connection.rb | 6 +- ruby/lib/jam_ruby/models/icecast_mount.rb | 177 +++++++++++++++- .../jam_ruby/models/icecast_source_change.rb | 27 +++ ruby/lib/jam_ruby/models/notification.rb | 6 + ruby/lib/jam_ruby/mq_router.rb | 13 +- .../resque/scheduled/icecast_source_check.rb | 12 +- ruby/spec/factories.rb | 10 +- .../jam_ruby/models/icecast_mount_spec.rb | 120 +++++++++++ .../models/icecast_source_change_spec.rb | 20 ++ ruby/spec/jam_ruby/mq_router_spec.rb | 3 + ruby/spec/jam_ruby/resque/audiomixer_spec.rb | 2 + .../resque/icecast_source_check_spec.rb | 17 +- ruby/spec/jam_ruby/resque/quick_mixer_spec.rb | 2 + ruby/spec/support/utilities.rb | 8 + .../assets/javascripts/AAB_message_factory.js | 19 ++ web/app/assets/javascripts/JamServer.js | 11 + web/app/assets/javascripts/application.js | 1 + web/app/assets/javascripts/backend_alerts.js | 12 ++ web/app/assets/javascripts/globals.js | 5 +- web/app/assets/javascripts/jam_rest.js | 26 +++ .../javascripts/jquery.listenbroadcast.js | 200 +++++++++++++++++- web/app/assets/javascripts/sessionModel.js | 54 +++++ .../javascripts/subscription_utils.js.coffee | 128 +++++++++++ web/app/assets/javascripts/web/sessions.js | 4 +- web/app/assets/javascripts/web/web.js | 1 + web/app/assets/stylesheets/client/client.css | 1 + .../client/listenBroadcast.css.scss | 111 ++++++++++ web/app/assets/stylesheets/web/web.css | 1 + web/app/controllers/api_icecast_controller.rb | 63 +++++- web/app/controllers/spikes_controller.rb | 7 + web/app/views/api_icecast/show.rabl | 19 ++ .../views/api_icecast/show_source_change.rabl | 7 + .../source_change_notification.rabl | 13 ++ .../views/clients/_listenBroadcast.html.slim | 57 +++++ web/app/views/clients/index.html.erb | 1 + web/app/views/layouts/web.html.erb | 1 + web/config/application.rb | 6 +- web/config/initializers/rabl_init.rb | 9 +- web/config/routes.rb | 3 + web/config/scheduler.yml | 5 +- web/lib/tasks/start.rake | 2 +- .../api_icecast_controller_spec.rb | 23 ++ web/spec/factories.rb | 2 +- web/spec/javascripts/sessionLatency.spec.js | 2 +- web/spec/spec_helper.rb | 2 +- .../lib/jam_websockets/router.rb | 134 +++++++++++- 55 files changed, 1414 insertions(+), 53 deletions(-) create mode 100644 db/up/icecast_source_changes.sql create mode 100644 ruby/lib/jam_ruby/lib/subscription_message.rb create mode 100644 ruby/lib/jam_ruby/models/icecast_source_change.rb create mode 100644 ruby/spec/jam_ruby/models/icecast_source_change_spec.rb create mode 100644 web/app/assets/javascripts/subscription_utils.js.coffee create mode 100644 web/app/assets/stylesheets/client/listenBroadcast.css.scss create mode 100644 web/app/views/api_icecast/show.rabl create mode 100644 web/app/views/api_icecast/show_source_change.rabl create mode 100644 web/app/views/api_icecast/source_change_notification.rabl create mode 100644 web/app/views/clients/_listenBroadcast.html.slim create mode 100644 web/spec/controllers/api_icecast_controller_spec.rb diff --git a/admin/app/admin/icecast_bootstrap.rb b/admin/app/admin/icecast_bootstrap.rb index e07af5f20..73bf7aa46 100644 --- a/admin/app/admin/icecast_bootstrap.rb +++ b/admin/app/admin/icecast_bootstrap.rb @@ -43,10 +43,10 @@ ActiveAdmin.register_page "Bootstrap" do admin_auth.save! path = IcecastPath.new - path.base_dir = '/usr/local/Cellar/icecast/2.3.3/share/icecast' - path.log_dir = '/usr/local/Cellar/icecast/2.3.3/var/log/icecast' - path.web_root = '/usr/local/Cellar/icecast/2.3.3/share/icecast/web' - path.admin_root = '/usr/local/Cellar/icecast/2.3.3/share/icecast/admin' + path.base_dir = '/usr/local/Cellar/icecast/2.4.1/share/icecast' + path.log_dir = '/usr/local/Cellar/icecast/2.4.1/var/log/icecast' + path.web_root = '/usr/local/Cellar/icecast/2.4.1/share/icecast/web' + path.admin_root = '/usr/local/Cellar/icecast/2.4.1/share/icecast/admin' path.pid_file = nil path.save! diff --git a/db/manifest b/db/manifest index 396ce5b48..6b25b087c 100755 --- a/db/manifest +++ b/db/manifest @@ -229,6 +229,7 @@ deletable_recordings.sql jam_tracks.sql shopping_carts.sql recurly.sql -add_track_resource_id.sql +add_track_resource_id.sql user_genres.sql user_online.sql +icecast_source_changes.sql diff --git a/db/up/icecast_source_changes.sql b/db/up/icecast_source_changes.sql new file mode 100644 index 000000000..7c8cd7f14 --- /dev/null +++ b/db/up/icecast_source_changes.sql @@ -0,0 +1,16 @@ +-- track extra detail about if the source went up or down +CREATE UNLOGGED TABLE icecast_source_changes ( + id VARCHAR(64) PRIMARY KEY DEFAULT uuid_generate_v4(), + source_direction BOOLEAN NOT NULL, + change_type VARCHAR(64) NOT NULL, + user_id VARCHAR(64), + client_id VARCHAR(64), + success BOOLEAN NOT NULL, + reason VARCHAR, + detail VARCHAR, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + icecast_mount_id VARCHAR(64) NOT NULL REFERENCES icecast_mounts(id) ON DELETE CASCADE +); + +-- track when the source_direction +ALTER TABLE icecast_mounts ADD COLUMN source_direction BOOLEAN NOT NULL DEFAULT FALSE; \ No newline at end of file diff --git a/pb/src/client_container.proto b/pb/src/client_container.proto index 901797515..b339465c7 100644 --- a/pb/src/client_container.proto +++ b/pb/src/client_container.proto @@ -17,6 +17,10 @@ message ClientMessage { LEAVE_MUSIC_SESSION_ACK = 125; HEARTBEAT = 130; HEARTBEAT_ACK = 135; + SUBSCRIBE = 136; + UNSUBSCRIBE = 137; + SUBSCRIPTION_MESSAGE = 138; + SUBSCRIBE_BULK = 139; // friend notifications FRIEND_UPDATE = 140; @@ -73,7 +77,7 @@ message ClientMessage { SOURCE_DOWN_REQUESTED = 251; SOURCE_UP = 252; SOURCE_DOWN = 253; - + TEST_SESSION_MESSAGE = 295; PING_REQUEST = 300; @@ -115,6 +119,10 @@ message ClientMessage { optional LeaveMusicSessionAck leave_music_session_ack = 125; optional Heartbeat heartbeat = 130; optional HeartbeatAck heartbeat_ack = 135; + optional Subscribe subscribe = 136; + optional Unsubscribe unsubscribe = 137; + optional SubscriptionMessage subscription_message = 138; + optional SubscribeBulk subscribe_bulk = 139; // friend notifications optional FriendUpdate friend_update = 140; // from server to all friends of user @@ -586,6 +594,33 @@ message SourceDown { optional string music_session = 1; // music session id } +message SubscriptionMessage { + optional string type = 1; // the type of the subscription + optional string id = 2; // data about what to subscribe to, specifically + optional string body = 3; // this is a JSON string; let the browser decode it +} + +message Subscribe { + optional string type = 1; // the type of the subscription + optional string id = 2; // data about what to subscribe to, specifically +} + +message Unsubscribe { + optional string type = 1; // the type of the subscription + optional string id = 2; // data about what to subscribe to, specifically +} + +message Subscription { + optional string type = 1; // the type of the subscription + optional string id = 2; // data about what to subscribe to, specifically +} + +message SubscribeBulk { + repeated string types = 1; + repeated string ids = 2; + //repeated Subscription subscriptions = 1; # the ruby protocol buffer library chokes on this. so we have to do the above +} + // route_to: session // a test message used by ruby-client currently. just gives way to send out to rest of session message TestSessionMessage { diff --git a/ruby/lib/jam_ruby.rb b/ruby/lib/jam_ruby.rb index ac0ad6c52..cfe15366d 100755 --- a/ruby/lib/jam_ruby.rb +++ b/ruby/lib/jam_ruby.rb @@ -77,6 +77,7 @@ require "jam_ruby/app/uploaders/jam_track_track_uploader" require "jam_ruby/app/uploaders/max_mind_release_uploader" require "jam_ruby/lib/desk_multipass" require "jam_ruby/lib/ip" +require "jam_ruby/lib/subscription_message" require "jam_ruby/amqp/amqp_connection_manager" require "jam_ruby/database" require "jam_ruby/message_factory" @@ -144,6 +145,7 @@ require "jam_ruby/models/icecast_listen_socket" require "jam_ruby/models/icecast_logging" require "jam_ruby/models/icecast_master_server_relay" require "jam_ruby/models/icecast_mount" +require "jam_ruby/models/icecast_source_change" require "jam_ruby/models/icecast_path" require "jam_ruby/models/icecast_relay" require "jam_ruby/models/icecast_security" diff --git a/ruby/lib/jam_ruby/connection_manager.rb b/ruby/lib/jam_ruby/connection_manager.rb index c2f22df30..8cb2709d8 100644 --- a/ruby/lib/jam_ruby/connection_manager.rb +++ b/ruby/lib/jam_ruby/connection_manager.rb @@ -389,7 +389,9 @@ SQL end def lock_connections(conn) - conn.exec("LOCK connections IN EXCLUSIVE MODE").clear + if APP_CONFIG.lock_connections + conn.exec("LOCK connections IN EXCLUSIVE MODE").clear + end end # def associate_tracks(connection, tracks) diff --git a/ruby/lib/jam_ruby/lib/em_helper.rb b/ruby/lib/jam_ruby/lib/em_helper.rb index 592f97e7b..ba6e582e8 100644 --- a/ruby/lib/jam_ruby/lib/em_helper.rb +++ b/ruby/lib/jam_ruby/lib/em_helper.rb @@ -53,6 +53,11 @@ module JamWebEventMachine @@log.debug("#{exchange.name} is ready to go") MQRouter.user_exchange = exchange end + + AMQP::Exchange.new(channel, :topic, "subscriptions") do |exchange| + @@log.debug("#{exchange.name} is ready to go") + MQRouter.subscription_exchange = exchange + end end ran[:ran] = true diff --git a/ruby/lib/jam_ruby/lib/subscription_message.rb b/ruby/lib/jam_ruby/lib/subscription_message.rb new file mode 100644 index 000000000..62409b6aa --- /dev/null +++ b/ruby/lib/jam_ruby/lib/subscription_message.rb @@ -0,0 +1,25 @@ +module JamRuby + + class SubscriptionMessage + + # sent whenever we change the desired direction of the source + def self.mount_source_direction(mount) + Notification.send_subscription_message('mount', mount.id, {type: 'source_direction_change', source_direction: true}.to_json) + end + + # sent whenever we receive an update about a sourcing attempt by a client + # source_change_json is created by a rabl, so currently only possible to use via web + def self.mount_source_change(mount, source_change_json) + Notification.send_subscription_message('mount', mount.id, source_change_json) + end + + def self.mount_source_up_requested(mount) + Notification.send_subscription_message('mount', mount.id, {type: 'source_up_requested'}.to_json ) + end + + def self.mount_source_down_requested(mount) + Notification.send_subscription_message('mount', mount.id, {type: 'source_down_requested'}.to_json ) + end + end +end + diff --git a/ruby/lib/jam_ruby/message_factory.rb b/ruby/lib/jam_ruby/message_factory.rb index 6d1d34fb7..19be72de7 100644 --- a/ruby/lib/jam_ruby/message_factory.rb +++ b/ruby/lib/jam_ruby/message_factory.rb @@ -993,6 +993,20 @@ module JamRuby ) end + def subscription_message(type, id, body) + subscription_message = Jampb::SubscriptionMessage.new( + id: id, + type: type, + body: body + ) + + Jampb::ClientMessage.new( + :type => ClientMessage::Type::SUBSCRIPTION_MESSAGE, + :route_to => CLIENT_TARGET, + :subscription_message => subscription_message, + ) + end + #################################################### # is this message directed to the server? diff --git a/ruby/lib/jam_ruby/models/connection.rb b/ruby/lib/jam_ruby/models/connection.rb index 018901e32..105741a97 100644 --- a/ruby/lib/jam_ruby/models/connection.rb +++ b/ruby/lib/jam_ruby/models/connection.rb @@ -153,9 +153,9 @@ module JamRuby end def report_add_participant - if self.music_session_id_changed? && - self.music_session.present? && - self.connected? && + if self.music_session_id_changed? && + self.music_session.present? && + self.connected? && self.as_musician? && 0 < (count = self.music_session.connected_participant_count) GoogleAnalyticsEvent.report_session_participant(count) diff --git a/ruby/lib/jam_ruby/models/icecast_mount.rb b/ruby/lib/jam_ruby/models/icecast_mount.rb index 4e07dc786..29f3a6888 100644 --- a/ruby/lib/jam_ruby/models/icecast_mount.rb +++ b/ruby/lib/jam_ruby/models/icecast_mount.rb @@ -10,12 +10,16 @@ module JamRuby :music_session_id, :icecast_server_id, :icecast_mount_template_id, :listeners, :sourced, :sourced_needs_changing_at, as: :admin + attr_accessor :no_config_changed + belongs_to :authentication, class_name: "JamRuby::IcecastUserAuthentication", inverse_of: :mount, :foreign_key => 'authentication_id' belongs_to :music_session, class_name: "JamRuby::ActiveMusicSession", inverse_of: :mount, foreign_key: 'music_session_id' belongs_to :server, class_name: "JamRuby::IcecastServer", inverse_of: :mounts, foreign_key: 'icecast_server_id' belongs_to :mount_template, class_name: "JamRuby::IcecastMountTemplate", inverse_of: :mounts, foreign_key: 'icecast_mount_template_id' + has_many :source_changes, class_name: "JamRuby::IcecastSourceChange", inverse_of: :mount, foreign_key: 'icecast_mount_id', order: 'created_at DESC' + validates :name, presence: true, uniqueness: true validates :source_username, length: {minimum: 5}, if: lambda {|m| m.source_username.present?} validates :source_pass, length: {minimum: 5}, if: lambda {|m| m.source_pass.present?} @@ -33,7 +37,7 @@ module JamRuby before_save :sanitize_active_admin after_save :after_save - after_save :poke_config + #after_save :poke_config before_destroy :poke_config def name_has_correct_format @@ -45,7 +49,7 @@ module JamRuby end def after_save - server.update_attribute(:config_changed, 1) + server.update_attribute(:config_changed, 1) unless no_config_changed if !sourced_was && sourced @@ -59,9 +63,13 @@ module JamRuby end - if listeners_was == 0 && listeners > 0 && !sourced - # listener count went above 0 and there is no source. ask the musician clients to source - notify_source_up_requested + if source_direction_was != source_direction + # temporarily removed; it seems better to leave all the data in for now. It should never be that much + # if the requested source direction has changed, then delete diagnostic info + #IcecastSourceChange.delete_all(["icecast_mount_id = ?", self.id]) if source_direction + + # and tell anyone listening that the direction has changed + SubscriptionMessage.mount_source_direction(self) end # Note: @@ -90,10 +98,110 @@ module JamRuby mount end + def fail_state(reason, detail = nil) + {success:false, reason: reason, detail: detail} + end + + def success_state(reason, detail = nil) + {success:true, reason: reason, detail: detail} + end + + # success messages: + # * source_up - we are broadcasting - detail is the source user or empty. if empty you should refresh state in a few seconds + # * source_down - we are broadcasting - detail is the last source user, or empty. if empty you should refresh state in a few seconds + # * transition_up/down - a source change occurred recently, and we don't yet have any info from the client. you should refresh state in a few seconds + # + # failure messages: + # * multiple_clients + # * unknown - represents a code error + # * transition_timeout_up - a source change for up has occurred, but no clients have reported any info + # * transition_timeout_down - a source change for up has occurred, but no clients have reported any info + # * source_wrong_up - the source should be up, but it could not succeed according to the most recent effort. + # * source_wrong_down - the source should be down, but it could not succeed according to the most recent effort. + # + # for both source_wrong_up and source_wrong_down, valid detail values: + # * 'no_client' if no client has said anything to a request after some amount of time, or otherwise it's client defined + # client-defined reason values + # * 'initialize_singleton' - code error in the client + # * 'initialize_thread' - code error in the client + # * 'initialize_ogg' - could not initialize ogg encoder... likely code error + # * 'initialize_mp3' - could not initialize mp3 encoder... likely code error + # * 'initialize_socket' - could not initialize socket... likely code error + # * 'icecast_response' - icecast was not accessible or returned an error + + # * TODO client defined + def state + begin + result = fail_state('unknown') + + if sourced == should_source? + + first = source_changes.first + + # don't check source_changes if actual source state mirrors desired source state... just say we are good, and pass down relevant sourcing user ID if present + result = success_state('source_' + (source_direction ? 'up' : 'down'), first.nil? ? nil : first.user_id) + + elsif source_changes.count > 0 + # if the desired source direction is up, but we haven't sourced yet... let's try and find out why + + # let's first see if we have N clients contributing, which is a code error condition + clients = Hash[source_changes.map { |source_change| [source_change.client_id, source_change] }] + + #if clients.length > 1 + # this means more than one client has contributed... this is an code error condition + # result = fail_state('multiple_clients', clients) + #else + first = source_changes.first + + if first.source_direction == should_source? + if first.success? + # the last message from any client indicated we had the right source + # if less than a second has passed, this is not strange. But more than that, it's strange + if sourced_needs_changing_at.nil? || (Time.now - first.created_at < APP_CONFIG.source_changes_missing_secs) + result = success_state('transition_' + (source_direction ? 'up' : 'down')) + else + result = fail_state('transition_timeout_' + (source_direction ? 'up' : 'down')) + end + else + # so the last state indicated by the client agrees that our source info is wrong; we can use it's data to augment the frontend + result = fail_state('source_wrong_' + (source_direction ? 'up' : 'down'), first.reason) + end + else + # if the last message from the client is for the wrong source direction (meaning no attempt to change source state by client to the correct state) + # then report this info + + if sourced_needs_changing_at.nil? || (Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs) + result = fail_state('transition_' + (source_direction ? 'up' : 'down')) + else + result = fail_state('source_wrong_' + (source_direction ? 'up' : 'down'), 'no_client') # no client implies no client has attempted to source + end + end + #end + else + # we have the wrong source direction, but no source change info. + + # if less than a second has passed, this is not strange. But more than that, it's strange + if sourced_needs_changing_at.nil? + #result = fail_state('db_error', 'sourced_needs_changing_at is nil') + result = success_state('transition_' + (source_direction ? 'up' : 'down')) + elsif Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs + result = success_state('transition_' + (source_direction ? 'up' : 'down'), 'initial') + else + result = fail_state('transition_timeout_' + (source_direction ? 'up' : 'down'), 'initial') + end + end + + rescue Exception => e + @@log.error("exception in IcecastMount.state #{e}") + end + result + end + def source_up with_lock do self.sourced = true self.sourced_needs_changing_at = nil + self.no_config_changed = true save(validate: false) end end @@ -102,17 +210,28 @@ module JamRuby with_lock do self.sourced = false self.sourced_needs_changing_at = nil + self.no_config_changed = true save(validate: false) end end + def should_source? + self.listeners > 0 + end + def listener_add with_lock do - self.sourced_needs_changing_at = Time.now if listeners == 0 + if listeners == 0 && !sourced && (self.sourced_needs_changing_at.nil? || Time.now - self.sourced_needs_changing_at > APP_CONFIG.source_changes_missing_secs) + # enough time has elapsed since the last time the source direction changed to instaneously request a source up + + # listener count went above 0 and there is no source. ask the musician clients to source + notify_source_up_requested + end # this is completely unsafe without that 'with_lock' statement above self.listeners = self.listeners + 1 + self.no_config_changed = true save(validate: false) end end @@ -129,31 +248,67 @@ module JamRuby # this is completely unsafe without that 'with_lock' statement above self.listeners = self.listeners - 1 + self.no_config_changed = true save(validations: false) end end def notify_source_up_requested - Notification.send_source_up_requested(music_session, + if music_session_id + self.sourced_needs_changing_at = Time.now + self.source_direction = true + self.no_config_changed = true + save! + + source_change = IcecastSourceChange.new + source_change.source_direction = true + source_change.success = true + source_change.mount = self + source_change.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_UP_REQUEST + source_change.save! + + Notification.send_source_up_requested(music_session, server.hostname, server.pick_listen_socket(:port), name, resolve_string(:source_username), resolve_string(:source_pass), - resolve_int(:bitrate)) if music_session_id + resolve_int(:bitrate)) + SubscriptionMessage.mount_source_up_requested(self) + end end def notify_source_down_requested - Notification.send_source_down_requested(music_session, name) + if music_session_id + self.sourced_needs_changing_at = Time.now + self.source_direction = false + self.no_config_changed = true + save! + + source_change = IcecastSourceChange.new + source_change.source_direction = false + source_change.success = true + source_change.mount = self + source_change.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_DOWN_REQUEST + source_change.save! + + Notification.send_source_down_requested(music_session, name) + + SubscriptionMessage.mount_source_down_requested(self) + end end def notify_source_up - Notification.send_source_up(music_session) if music_session_id + if music_session_id + Notification.send_source_up(music_session) + end end def notify_source_down - Notification.send_source_down(music_session) if music_session_id + if music_session_id + Notification.send_source_down(music_session) + end end # Check if the icecast_mount specifies the value; if not, use the mount_template's value take effect diff --git a/ruby/lib/jam_ruby/models/icecast_source_change.rb b/ruby/lib/jam_ruby/models/icecast_source_change.rb new file mode 100644 index 000000000..335e8594a --- /dev/null +++ b/ruby/lib/jam_ruby/models/icecast_source_change.rb @@ -0,0 +1,27 @@ +module JamRuby + class IcecastSourceChange < ActiveRecord::Base + + @@log = Logging.logger[IcecastSourceChange] + + CHANGE_TYPE_CLIENT = 'client' + CHANGE_TYPE_MOUNT_UP_REQUEST = 'source_up_request' + CHANGE_TYPE_MOUNT_DOWN_REQUEST = 'source_down_request' + + + belongs_to :mount, class_name: "JamRuby::IcecastMount", inverse_of: :source_changes, foreign_key: 'icecast_mount_id' + belongs_to :user, class_name: "JamRuby::User" + + validates :source_direction, inclusion: {:in => [true, false]} + validates :success, inclusion: {:in => [true, false]} + validates :reason, length: {minimum: 0, maximum:255} + validates :detail, length: {minimum: 0, maximum:10000} + validates :user, presence:true, :if => :is_client_change? + validates :client_id, presence: true, :if => :is_client_change? + validates :mount, presence:true + validates :change_type, inclusion: {:in => [CHANGE_TYPE_CLIENT, CHANGE_TYPE_MOUNT_DOWN_REQUEST, CHANGE_TYPE_MOUNT_UP_REQUEST]} + + def is_client_change? + change_type == CHANGE_TYPE_CLIENT + end + end +end diff --git a/ruby/lib/jam_ruby/models/notification.rb b/ruby/lib/jam_ruby/models/notification.rb index da0ff9881..79187c3a2 100644 --- a/ruby/lib/jam_ruby/models/notification.rb +++ b/ruby/lib/jam_ruby/models/notification.rb @@ -1376,6 +1376,12 @@ module JamRuby @@mq_router.server_publish_to_everyone_in_session(music_session, msg) end + + def send_subscription_message(type, id, body) + msg = @@message_factory.subscription_message(type, id, body) + + @@mq_router.publish_to_subscription(type, id, msg) + end end private diff --git a/ruby/lib/jam_ruby/mq_router.rb b/ruby/lib/jam_ruby/mq_router.rb index 8f4efc828..826eb1ace 100644 --- a/ruby/lib/jam_ruby/mq_router.rb +++ b/ruby/lib/jam_ruby/mq_router.rb @@ -7,7 +7,7 @@ class MQRouter # but ultimately there are internal static state variables to represent global MQ exchange connections class << self - attr_accessor :client_exchange, :user_exchange + attr_accessor :client_exchange, :user_exchange, :subscription_exchange @@log = Logging.logger[MQRouter] end @@ -112,4 +112,15 @@ class MQRouter end end end + + def publish_to_subscription(type, id, msg) + @@log.error "EM not running in publish_to_subscription" unless EM.reactor_running? + + EM.schedule do + routing_key = "subscription.#{type}.#{id}" + @@log.debug "publishing to #{routing_key} from server" + # put it on the topic exchange for users + self.class.subscription_exchange.publish(msg, :routing_key => routing_key) + end + end end \ No newline at end of file diff --git a/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb b/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb index 66f01c179..b81c5b992 100644 --- a/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb +++ b/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb @@ -31,8 +31,13 @@ module JamRuby end - def run # if we have seen that sourced_needs_changing_at is older than 15 seconds ago, re-poke clients in the session - IcecastMount.find_each(lock: true, :conditions => "sourced_needs_changing_at < (NOW() - interval '#{APP_CONFIG.icecast_max_sourced_changed} second')", :batch_size => 100) do |mount| + def run + # we need to find any mount in the following conditions and then poke it with a websocket msg. here are the conditions: + # * (if sourced_needs_changing_at has gone too far in the past OR sourced_needs_changing_at IS NULL) AND + # ** listeners > 0 and sourced is DOWN (false) + # ** listeners == 0 and sourced is UP (true) + + IcecastMount.find_each(lock: true, :conditions => "( (listeners > 0 AND sourced = FALSE) OR (listeners = 0 AND sourced = TRUE) ) AND ( sourced_needs_changing_at IS NULL OR sourced_needs_changing_at < (NOW() - interval '#{APP_CONFIG.icecast_max_sourced_changed} second') ) ", :batch_size => 100) do |mount| if mount.music_session_id mount.with_lock do handle_notifications(mount) @@ -46,16 +51,13 @@ module JamRuby # if no listeners, but we are sourced, then ask it to stop sourcing @@log.debug("SOURCE_DOWN_REQUEST called on mount #{mount.name}") - mount.update_attribute(:sourced_needs_changing_at, Time.now) # we send out a source request, so we need to update the time mount.notify_source_down_requested elsif mount.listeners > 0 && !mount.sourced # if we have some listeners, and still are not sourced, then ask to start sourcing again @@log.debug("SOURCE_UP_REQUEST called on mount #{mount.name}") - mount.update_attribute(:sourced_needs_changing_at, Time.now) # we send out a source request, so we need to update the time mount.notify_source_up_requested - end end end diff --git a/ruby/spec/factories.rb b/ruby/spec/factories.rb index 0e1730e09..ecd89c450 100644 --- a/ruby/spec/factories.rb +++ b/ruby/spec/factories.rb @@ -420,7 +420,7 @@ FactoryGirl.define do end factory :icecast_mount, :class => JamRuby::IcecastMount do - name "/" + Faker::Lorem.characters(10) + sequence(:name) { |n| "/mount_#{n}" } source_username Faker::Lorem.characters(10) source_pass Faker::Lorem.characters(10) max_listeners 100 @@ -447,7 +447,15 @@ FactoryGirl.define do end end end + end + factory :icecast_source_change, :class => JamRuby::IcecastSourceChange do + source_direction true + success true + sequence(:client_id) { |n| "client_id#{n}" } + change_type JamRuby::IcecastSourceChange::CHANGE_TYPE_CLIENT + association :user, :factory => :user + association :mount, :factory => :iceast_mount_with_music_session end diff --git a/ruby/spec/jam_ruby/models/icecast_mount_spec.rb b/ruby/spec/jam_ruby/models/icecast_mount_spec.rb index 69028ad03..847e9697c 100644 --- a/ruby/spec/jam_ruby/models/icecast_mount_spec.rb +++ b/ruby/spec/jam_ruby/models/icecast_mount_spec.rb @@ -245,4 +245,124 @@ describe IcecastMount do end end + describe "source_changes are deleted if source_direction transitions" do + + + it "no change if same source_direction" do + mount = FactoryGirl.create(:iceast_mount_with_music_session, source_direction: true) + change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true) + + mount.source_changes.size.should == 1 + mount.source_direction = true + mount.save! + mount = IcecastMount.find(mount.id) + mount.source_changes.size.should == 1 + end + + it "NOT deleted on transition to down" do + mount = FactoryGirl.create(:iceast_mount_with_music_session, source_direction: true) + change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true) + + mount.source_changes.size.should == 1 + mount.source_direction = false + mount.save! + mount = IcecastMount.find(mount.id) + mount.source_changes.size.should == 1 + end + + it "not deleted on transition to up" do + mount = FactoryGirl.create(:iceast_mount_with_music_session, source_direction: false) + change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true) + + + mount.source_changes.size.should == 1 + mount.source_direction = true + mount.save! + mount.reload + mount.source_changes.size.should == 1 + end + end + describe "state" do + + def success_state(reason, detail = nil) + {success: true, reason: reason, detail:detail} + end + + def fail_state(reason, detail = nil) + {success: false, reason: reason, detail:detail} + end + + let(:sourced_mount) {FactoryGirl.create(:iceast_mount_with_music_session, sourced: true, sourced_needs_changing_at: nil, listeners: 1)} + let(:mount_needing_source) {FactoryGirl.create(:iceast_mount_with_music_session, sourced: false, sourced_needs_changing_at: Time.now, listeners: 1)} + + it "happy sourced mount" do + sourced_mount.state.should == success_state('source_down') + end + + it "just transitioned" do + mount_needing_source.state.should == success_state('transition_down', 'initial') + end + + it "transition timeout" do + #change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true) + mount_needing_source.sourced_needs_changing_at = (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago + mount_needing_source.save! + + mount_needing_source.state.should == fail_state('transition_timeout_down', 'initial') + end + + describe "client attempted transition and" do + let!(:change1) { FactoryGirl.create(:icecast_source_change, mount: mount_needing_source, source_direction: true, success:true) } + + it "happy sourced mount" do + mount_needing_source.sourced = true + mount_needing_source.save! + mount_needing_source.state.should == success_state('source_down', change1.user.id) + end + + it "succeeded recently in correct transition" do + mount_needing_source.state.should == success_state('transition_down') + end + + it "succeeded a while ago in correct transition" do + change1.update_attribute(:created_at, (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago) + mount_needing_source.state.should == fail_state('transition_timeout_down') + end + + it "failed recently in correct transition" do + change1.success = false + change1.reason = 'bleh' + change1.source_direction = true + change1.save! + mount_needing_source.state.should == fail_state('source_wrong_down', change1.reason) + end + + it "failed a while ago in correct transition" do + change1.success = false + change1.reason = 'bleh' + change1.source_direction = true + change1.created_at = (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago + change1.save! + mount_needing_source.state.should == fail_state('source_wrong_down', change1.reason) + end + + it "failed recently in wrong transition" do + change1.success = false + change1.reason = 'bleh' + change1.source_direction = false + change1.save! + mount_needing_source.state.should == fail_state('transition_down') + end + + it "failed a while ago in wrong transition" do + change1.success = false + change1.reason = 'bleh' + change1.source_direction = false + change1.save! + mount_needing_source.sourced_needs_changing_at = (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago + mount_needing_source.save! + mount_needing_source.state.should == fail_state('source_wrong_down', 'no_client') + end + end + end end \ No newline at end of file diff --git a/ruby/spec/jam_ruby/models/icecast_source_change_spec.rb b/ruby/spec/jam_ruby/models/icecast_source_change_spec.rb new file mode 100644 index 000000000..a2f6fc0e5 --- /dev/null +++ b/ruby/spec/jam_ruby/models/icecast_source_change_spec.rb @@ -0,0 +1,20 @@ +require 'spec_helper' + +describe IcecastSourceChange do + + let(:change) { FactoryGirl.create(:icecast_source_change) } + + it "validates" do + bad_change = IcecastSourceChange.new + bad_change.save.should be_false + bad_change.errors[:change_type].should == ["is not included in the list"] + bad_change.errors[:source_direction].should == ["is not included in the list"] + bad_change.errors[:success].should == ["is not included in the list"] + bad_change.errors[:mount].should == ["can't be blank"] + end + + it "success" do + change.touch + end + +end diff --git a/ruby/spec/jam_ruby/mq_router_spec.rb b/ruby/spec/jam_ruby/mq_router_spec.rb index 3122af502..662f16d2e 100644 --- a/ruby/spec/jam_ruby/mq_router_spec.rb +++ b/ruby/spec/jam_ruby/mq_router_spec.rb @@ -25,11 +25,13 @@ describe MQRouter do before(:all) do @original_client_exchange = MQRouter.client_exchange @original_user_exchange = MQRouter.user_exchange + @original_subscription_exchange = MQRouter.subscription_exchange end after(:all) do MQRouter.client_exchange = @original_client_exchange MQRouter.user_exchange = @original_user_exchange + MQRouter.subscription_exchange = @original_subscription_exchange end it "user_publish_to_session works (checking exchange callbacks)" do @@ -59,6 +61,7 @@ describe MQRouter do # mock up exchange MQRouter.client_exchange = double("client_exchange") MQRouter.user_exchange = double("user_exchange") + MQRouter.subscription_exchange = double("subscription_exchange") MQRouter.client_exchange.should_receive(:publish).with("a message", :routing_key => "client.#{music_session_member2.client_id}") MQRouter.user_exchange.should_not_receive(:publish) diff --git a/ruby/spec/jam_ruby/resque/audiomixer_spec.rb b/ruby/spec/jam_ruby/resque/audiomixer_spec.rb index ab4ed7875..9e26581de 100644 --- a/ruby/spec/jam_ruby/resque/audiomixer_spec.rb +++ b/ruby/spec/jam_ruby/resque/audiomixer_spec.rb @@ -44,8 +44,10 @@ describe AudioMixer do MQRouter.client_exchange = double("client_exchange") MQRouter.user_exchange = double("user_exchange") + MQRouter.subscription_exchange = double("subscription_exchange") MQRouter.client_exchange.should_receive(:publish).any_number_of_times MQRouter.user_exchange.should_receive(:publish).any_number_of_times + MQRouter.subscription_exchange.should_receive(:publish).any_number_of_times end diff --git a/ruby/spec/jam_ruby/resque/icecast_source_check_spec.rb b/ruby/spec/jam_ruby/resque/icecast_source_check_spec.rb index 2c528bf6d..83be6d612 100644 --- a/ruby/spec/jam_ruby/resque/icecast_source_check_spec.rb +++ b/ruby/spec/jam_ruby/resque/icecast_source_check_spec.rb @@ -15,9 +15,15 @@ describe IcecastSourceCheck do end - it "find no mounts if source_hanged timestamp is nil and listeners = 1/sourced = false" do - mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced: false, listeners: 1) - check.should_not_receive(:handle_notifications) + it "find a mounts if sourced_needs_changing_at is nil and listeners = 1/sourced = false" do + mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced: false, listeners: 1, sourced_needs_changing_at: nil) + check.should_receive(:handle_notifications).once + check.run + end + + it "find a mount if source_changed timestamp is nil and listeners = 0/sourced = true" do + mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced: true, listeners: 0, sourced_needs_changing_at: nil) + check.should_receive(:handle_notifications).once check.run end @@ -27,6 +33,7 @@ describe IcecastSourceCheck do check.run end + it "find no mounts if source_changed timestamp is very recent and listeners = 1/sourced = false" do mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced_needs_changing_at: Time.now, sourced: false, listeners: 1) check.should_not_receive(:handle_notifications) @@ -92,7 +99,7 @@ describe IcecastSourceCheck do check.run end - it "resets source_changed_at when a notification is sent out" do + it "does not resets source_changed_at when a notification is sent out" do mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced_needs_changing_at: 2.days.ago, sourced:false, listeners: 1) check.stub(:handle_notifications) do |mount| mount.should_not_receive(:notify_source_down_requested) @@ -104,7 +111,7 @@ describe IcecastSourceCheck do end check.run mount.reload - (mount.sourced_needs_changing_at.to_i - Time.now.to_i).abs.should < 10 # less than 5 seconds -- just a little slop for a very slow build server + (2.days.ago - mount.sourced_needs_changing_at).to_i.abs.should < 10 # less than 5 seconds -- just a little slop for a very slow build server end end end diff --git a/ruby/spec/jam_ruby/resque/quick_mixer_spec.rb b/ruby/spec/jam_ruby/resque/quick_mixer_spec.rb index 9a622ce1a..f7d877ef3 100644 --- a/ruby/spec/jam_ruby/resque/quick_mixer_spec.rb +++ b/ruby/spec/jam_ruby/resque/quick_mixer_spec.rb @@ -17,8 +17,10 @@ describe AudioMixer do MQRouter.client_exchange = double("client_exchange") MQRouter.user_exchange = double("user_exchange") + MQRouter.subscription_exchange = double("subscription_exchange") MQRouter.client_exchange.should_receive(:publish).any_number_of_times MQRouter.user_exchange.should_receive(:publish).any_number_of_times + MQRouter.subscription_exchange.should_receive(:publish).any_number_of_times end def assert_found_job_count(expected) diff --git a/ruby/spec/support/utilities.rb b/ruby/spec/support/utilities.rb index 48a525fe0..8dd934198 100644 --- a/ruby/spec/support/utilities.rb +++ b/ruby/spec/support/utilities.rb @@ -142,6 +142,14 @@ def app_config 7 end + def source_changes_missing_secs + 1 + end + + def lock_connections + false + end + private def audiomixer_workspace_path diff --git a/web/app/assets/javascripts/AAB_message_factory.js b/web/app/assets/javascripts/AAB_message_factory.js index a2c1b3a37..2a502f43e 100644 --- a/web/app/assets/javascripts/AAB_message_factory.js +++ b/web/app/assets/javascripts/AAB_message_factory.js @@ -16,6 +16,10 @@ LEAVE_MUSIC_SESSION_ACK : "LEAVE_MUSIC_SESSION_ACK", HEARTBEAT : "HEARTBEAT", HEARTBEAT_ACK : "HEARTBEAT_ACK", + SUBSCRIBE : "SUBSCRIBE", + UNSUBSCRIBE : "UNSUBSCRIBE", + SUBSCRIPTION_MESSAGE : "SUBSCRIPTION_MESSAGE", + SUBSCRIBE_BULK : "SUBSCRIBE_BULK", // friend notifications FRIEND_UPDATE : "FRIEND_UPDATE", @@ -168,6 +172,21 @@ return result; }; + factory.subscribe = function(type, id) { + var subscribe_msg = {type: type, id: id} + return client_container(msg.SUBSCRIBE, route_to.SERVER, subscribe_msg); + } + + factory.subscribeBulk = function(types, ids) { + var subscribe_bulk_msg = {types: types, ids: ids} + return client_container(msg.SUBSCRIBE_BULK, route_to.SERVER, subscribe_bulk_msg); + } + + factory.unsubscribe = function(type, id) { + var unsubscribe_msg = {type: type, id: id} + return client_container(msg.UNSUBSCRIBE, route_to.SERVER, unsubscribe_msg); + } + context.JK.MessageFactory = factory; })(window, jQuery); \ No newline at end of file diff --git a/web/app/assets/javascripts/JamServer.js b/web/app/assets/javascripts/JamServer.js index 040490d5f..f60ea07d0 100644 --- a/web/app/assets/javascripts/JamServer.js +++ b/web/app/assets/javascripts/JamServer.js @@ -10,6 +10,7 @@ var logger = context.JK.logger; var msg_factory = context.JK.MessageFactory; var rest = context.JK.Rest(); + var EVENTS = context.JK.EVENTS; // Let socket.io know where WebSocketMain.swf is context.WEB_SOCKET_SWF_LOCATION = "assets/flash/WebSocketMain.swf"; @@ -89,6 +90,11 @@ // handles logic if the websocket connection closes, and if it was in error then also prompt for reconnect function closedCleanup(in_error) { + + if(server.connected) { + $self.triggerHandler(EVENTS.CONNECTION_DOWN); + } + server.connected = false; server.connecting = false; @@ -229,6 +235,7 @@ heartbeatAckCheckInterval = context.setInterval(_heartbeatAckCheck, 1000); lastHeartbeatAckTime = new Date(new Date().getTime() + heartbeatMS); // add a little forgiveness to server for initial heartbeat connectDeferred.resolve(); + $self.triggerHandler(EVENTS.CONNECTION_UP) activeElementEvent('afterConnect', payload); @@ -739,6 +746,10 @@ } }); + server.get$Server = function() { + return $self; + } + context.JK.JamServer = server; // Callbacks from jamClient diff --git a/web/app/assets/javascripts/application.js b/web/app/assets/javascripts/application.js index b971a1d5e..037c2c1b7 100644 --- a/web/app/assets/javascripts/application.js +++ b/web/app/assets/javascripts/application.js @@ -43,6 +43,7 @@ //= require AAB_message_factory //= require jam_rest //= require utils +//= require subscription_utils //= require custom_controls //= require_directory . //= require_directory ./dialog diff --git a/web/app/assets/javascripts/backend_alerts.js b/web/app/assets/javascripts/backend_alerts.js index 74c7047cc..d1c9628e1 100644 --- a/web/app/assets/javascripts/backend_alerts.js +++ b/web/app/assets/javascripts/backend_alerts.js @@ -106,6 +106,18 @@ if(context.JK.CurrentSessionModel) context.JK.CurrentSessionModel.onWindowBackgrounded(type, text); } + else if(type === ALERT_NAMES.SESSION_LIVEBROADCAST_FAIL) { + if(context.JK.CurrentSessionModel) + context.JK.CurrentSessionModel.onBroadcastFailure(type, text); + } + else if(type === ALERT_NAMES.SESSION_LIVEBROADCAST_ACTIVE) { + if(context.JK.CurrentSessionModel) + context.JK.CurrentSessionModel.onBroadcastSuccess(type, text); + } + else if(type === ALERT_NAMES.SESSION_LIVEBROADCAST_STOPPED) { + if(context.JK.CurrentSessionModel) + context.JK.CurrentSessionModel.onBroadcastStopped(type, text); + } else if((!context.JK.CurrentSessionModel || !context.JK.CurrentSessionModel.inSession()) && (ALERT_NAMES.INPUT_IO_RATE == type || ALERT_NAMES.INPUT_IO_JTR == type || ALERT_NAMES.OUTPUT_IO_RATE == type || ALERT_NAMES.OUTPUT_IO_JTR== type)) { // squelch these events if not in session diff --git a/web/app/assets/javascripts/globals.js b/web/app/assets/javascripts/globals.js index 05b1f62f4..4f68a3125 100644 --- a/web/app/assets/javascripts/globals.js +++ b/web/app/assets/javascripts/globals.js @@ -41,7 +41,10 @@ FILE_MANAGER_CMD_PROGRESS : 'file_manager_cmd_progress', FILE_MANAGER_CMD_ASAP_UPDATE : 'file_manager_cmd_asap_update', MIXER_MODE_CHANGED : 'mixer_mode_changed', - MUTE_SELECTED: 'mute_selected' + MUTE_SELECTED: 'mute_selected', + SUBSCRIBE_NOTIFICATION: 'subscribe_notification', + CONNECTION_UP: 'connection_up', + CONNECTION_DOWN: 'connection_down' }; context.JK.ALERT_NAMES = { diff --git a/web/app/assets/javascripts/jam_rest.js b/web/app/assets/javascripts/jam_rest.js index 3e7eac08d..1be8d9bab 100644 --- a/web/app/assets/javascripts/jam_rest.js +++ b/web/app/assets/javascripts/jam_rest.js @@ -1396,6 +1396,27 @@ }); } + function getMount(options) { + var id = getId(options); + return $.ajax({ + type: "GET", + url: '/api/icecast/mount/' + id, + dataType: "json", + contentType: 'application/json' + }); + } + + function createSourceChange(options) { + var mountId = options['mount_id']; + + return $.ajax({ + type: "POST", + url: '/api/icecast/mount/' + mountId + '/source_change', + dataType: "json", + contentType: 'application/json', + data: JSON.stringify(options), + }); + } function initialize() { return self; @@ -1519,7 +1540,12 @@ this.updateBillingInfo = updateBillingInfo; this.placeOrder = placeOrder; this.searchMusicians = searchMusicians; +<<<<<<< HEAD this.resendBandInvitation = resendBandInvitation; +======= + this.getMount = getMount; + this.createSourceChange = createSourceChange; +>>>>>>> feature/broadcast_slow return this; }; diff --git a/web/app/assets/javascripts/jquery.listenbroadcast.js b/web/app/assets/javascripts/jquery.listenbroadcast.js index 5468a3236..d8dba3e4a 100644 --- a/web/app/assets/javascripts/jquery.listenbroadcast.js +++ b/web/app/assets/javascripts/jquery.listenbroadcast.js @@ -17,6 +17,11 @@ */ context.JK = context.JK || {}; + context.JK.ListenBroadcastCurrentlyPlaying = null; + context.JK.$templateListenBroadcastState = $('#template-listen-broadcast-state'); + context.JK.$templateListenBroadcastDetail = $('#template-listen-broadcast-detail') + + // the purpose of this code is to simplify the interaction between user and server // it provides methods to call on user events (primarily play/pause), and will fire // events (such as 'stream_gone'). This element does nothing with UI; only fires events for you to handle @@ -41,6 +46,8 @@ var fanAccess = null; var retryAttempts = 0; var self = this; + var mountInfo = null; + var $mountState = null; var destroyed = false; @@ -155,7 +162,9 @@ playState = newState; - clearBufferTimeout(); + if(newState != PlayStateStalled) { + clearBufferTimeout(); + } if( playState == PlayStateNone || playState == PlayStateEnded || @@ -173,6 +182,8 @@ function noBuffer() { + waitForBufferingTimeout = null; + if(retryAttempts >= RETRY_ATTEMPTS) { logger.log("never received indication of buffering or playing"); transition(PlayStateFailedStart); @@ -345,7 +356,17 @@ displayText = 'SESSION IN PROGRESS'; } else if(playState == 'stalled') { - displayText = 'RECONNECTING'; + + // in Chrome, we've observed that a stalled can occur very early, before you've received any audio + // it's better to leave the display text as what it would be if juts retrying + if(!waitForBufferingTimeout) { + displayText = 'RECONNECTING'; + } + else { + if(retryAttempts == 2) { + displayText = 'STILL TRYING, HANG ON'; + } + } } else if(playState == 'ended') { displayText = 'STREAM DISCONNECTED'; @@ -444,8 +465,179 @@ $audio.bind('progress', onProgress); } + function broadcastDetailPreShow(box) { + var $box = $(box) + $mountState = $box.find('.listen-broadcast-state') + + updateMountInfo(mountInfo) + updateMountDetails(mountInfo); + } + + function updateMountInfo(mount) { + var $summary = $mountState.find('.summary') + var $status = $summary.find('.status.' + mount.state.reason) + + if ($status.length == 0) { + // can't find this state + $status = $summary.find('.status.unknown') + $summary.attr('data-status', 'unknown') + $status.find('.msg').text ("unknown state: " + mount.state.reason) + return; + } + + //var $msg = $status.find('.msg'); + + $summary.attr('data-status', mount.state.reason) + } + + + function updateMountDetails(mountInfo) { + + var $detailHolder = $mountState.find('.detail-items') + + context._.each(mountInfo.source_changes, function(sourceChange) { + + var $detail = addSourceChange($detailHolder, sourceChange); + if($detail) { + $detailHolder.append($detail); + } + }) + } + + function onDetailEvent(e, data) { + if($mountState == null) return; + + logger.debug("subscription notification received: type:" + data.type) + + updateMountInfo(data.body.mount); + + var id = data.id + var type = data.type + + if(type == 'mount') { + + var $detailHolder = $mountState.find('.detail-items') + var msgType = data.body.type; + + var sourceChange = data.body.source_change; + if(sourceChange) { + var $detail = addSourceChange($detailHolder, sourceChange) + if ($detail) { + $detailHolder.prepend($detail) + $detail.hide().slideDown() + } + } + } + } + + + function addSourceChange($detailHolder, sourceChange) { + var $detail= $(context._.template($('#template-listen-broadcast-detail').html(), {}, {variable: 'data'})); + + // if this is already in the dom, don't add it + if($detailHolder.find('.listen-broadcast-detail[data-id="' + sourceChange.id + '"]').length > 0) { + return null; + } + + $detail.attr('data-id', sourceChange.id) + + var $detailText = $detail.find('.detail'); + var $who = $detail.find('.who') + + if(sourceChange.change_type == 'source_up_request') { + $who.html('Server') + $detailText.text('Start Broadcast Requested') + } + else if(sourceChange.change_type == 'source_down_request') { + $who.text('Server') + $detailText.text('Start Broadcast Requested') + } + else { + + $who.attr('hoveraction', 'musician').attr('user-id', sourceChange.user.id).addClass('avatar-tiny') + $who.html(''); + context.JK.bindHoverEvents($who); + + if (sourceChange.source_direction) { + // up + if (sourceChange.success) { + $detailText.text(sourceChange.user.name + ' Started Broadcasting') + } + else { + if (sourceChange.reason == 'no_client') { + $detailText.text('No Client Responded') + } + else if (sourceChange.reason == 'initialize_singleton') { + $detailText.text('Init Failure (S)') + } + else if (sourceChange.reason == 'initialize_thread') { + $detailText.text('Init Failure (T)') + } + else if (sourceChange.reason == 'initialize_ogg') { + $detailText.text('Init Failure (O)') + } + else if (sourceChange.reason == 'initialize_mp3') { + $detailText.text('Init Failure (M)') + } + else if (sourceChange.reason == 'initialize_socket') { + $detailText.text('Init Failure (Client)') + } + else if (sourceChange.reason == 'icecast_response') { + $detailText.text('Init Failure (Server)') + } + else { + $detailText.text('Unknown Failure') + } + } + } + else { + $detailText.text(sourceChange.user.name + ' Stopped Broadcasting'); + } + } + return $detail; + } + + function bindHoverDetail() { + + var stateHolderHtml = context._.template($('#template-listen-broadcast-state').html(), {}, {variable: 'data'}); + context.JK.hoverBubble($parent, stateHolderHtml, {trigger: 'none', preShow: broadcastDetailPreShow, positions:['bottom'], width:'300px', height:'250px'}) + + $parent.hoverIntent({ + over: function() { + checkServer().done(function(response) { + var mountId = response.mount ? response.mount.id : null + + if(mountId) { + rest.getMount({id: mountId}) + .done(function (mount) { + mountInfo = mount + $parent.data('mount-id', mountId) + context.JK.SubscriptionUtils.subscribe('mount', mountId).on(context.JK.EVENTS.SUBSCRIBE_NOTIFICATION, onDetailEvent) + $parent.btOn() + }) + .fail(context.JK.app.ajaxError) + } + else { + mountInfo = null; + context.JK.app.layout.notify('This session can not currently broadcast') + } + }) + .fail(function() { + logger.debug("session is over") + }) + }, + out: function() { + var mountId = $parent.data('mount-id') + context.JK.SubscriptionUtils.unsubscribe('mount', mountId) + $parent.btOff(); + $mountState = null; + mountInfo = null; + } + }) + } function initialize() { + musicSessionId = $parent.attr('data-music-session'); if(!musicSessionId) throw "data-music-session must be specified on $parentElement"; @@ -453,6 +645,8 @@ if(fanAccess === null) throw 'fan-access must be specified in $parentElement'; fanAccess = $parent.attr('fan-access') === 'true' // coerce to boolean + bindHoverDetail(); + $audio = $('audio', $parent); if($audio.length == 0) { @@ -479,8 +673,6 @@ return this; } - context.JK.ListenBroadcastCurrentlyPlaying = null; - $.fn.listenBroadcast = function(options) { new context.JK.ListenBroadcast(this, options); } diff --git a/web/app/assets/javascripts/sessionModel.js b/web/app/assets/javascripts/sessionModel.js index f55b6d8d1..0dd38f007 100644 --- a/web/app/assets/javascripts/sessionModel.js +++ b/web/app/assets/javascripts/sessionModel.js @@ -576,6 +576,57 @@ } } + function onBroadcastSuccess(type, text) { + logger.debug("SESSION_LIVEBROADCAST_ACTIVE alert. reason:" + text); + + if(currentSession && currentSession.mount) { + rest.createSourceChange({ + mount_id: currentSession.mount.id, + source_direction: true, + success: true, + reason: text, + client_id: app.clientId + }) + } + else { + logger.debug("unable to report source change because no mount seen on session") + } + } + + function onBroadcastFailure(type, text) { + logger.debug("SESSION_LIVEBROADCAST_FAIL alert. reason:" + text); + + if(currentSession && currentSession.mount) { + rest.createSourceChange({ + mount_id: currentSession.mount.id, + source_direction: true, + success: false, + reason: text, + client_id: app.clientId + }) + } + else { + logger.debug("unable to report source change because no mount seen on session") + } + } + + function onBroadcastStopped(type, text) { + logger.debug("SESSION_LIVEBROADCAST_STOPPED alert. reason:" + text); + + if(currentSession && currentSession.mount) { + rest.createSourceChange({ + mount_id: currentSession.mount.id, + source_direction: false, + success: true, + reason: text, + client_id: app.clientId + }) + } + else { + logger.debug("unable to report source change because no mount seen on session") + } + } + function onBackendMixerChanged(type, text) { logger.debug("BACKEND_MIXER_CHANGE alert. reason:" + text); @@ -680,6 +731,9 @@ this.onDeadUserRemove = onDeadUserRemove; this.onWindowBackgrounded = onWindowBackgrounded; this.waitForSessionPageEnterDone = waitForSessionPageEnterDone; + this.onBroadcastStopped = onBroadcastStopped; + this.onBroadcastSuccess = onBroadcastSuccess; + this.onBroadcastFailure = onBroadcastFailure; this.getCurrentSession = function() { return currentSession; diff --git a/web/app/assets/javascripts/subscription_utils.js.coffee b/web/app/assets/javascripts/subscription_utils.js.coffee new file mode 100644 index 000000000..afd7852ab --- /dev/null +++ b/web/app/assets/javascripts/subscription_utils.js.coffee @@ -0,0 +1,128 @@ +# +# Common utility functions to help deal with subscriptions to the server +# + +$ = jQuery +context = window +context.JK ||= {}; + +class SubscriptionUtils + constructor: () -> + @logger = context.JK.logger + @msgFactory = context.JK.MessageFactory; + @subscriptions = {} + @events = context.JK.EVENTS; + @reconnectRegistrationTimeout = null + + this.init() + + init: () => + # once JamKazam is fully loaded, watch for SUBSCRIPTION_MESSAGE events + $(document).on('JAMKAZAM_READY', this.registerWithGateway) + + + genKey:(type, id) => + type + ":" + id + + test: () => + this.subscribe('test', '1') + this.subscribe('test', '2') + + reregister: () => + # re-register all existing watches + @reconnectRegistrationTimeout = null + + if context.JK.dlen(@subscriptions) > 0 + bulkSubscribe = [] + types = [] + ids = [] + for key, watch of @subscriptions + bits = key.split(':') + type = bits[0] + id = bits[1] + types.push(type) + ids.push(id) +# bulkSubscribe.push({type: type, id:id}) + + # send a special message which contains all subscriptions in one message + msg = @msgFactory.subscribeBulk(types, ids) + context.JK.JamServer.send(msg) + + # whenever we reconnect, set a timer to automatically re-subscribe to any outstanding subscriptions + onConnectionUp: () => + #this.test() + + if @reconnectRegistrationTimeout? + clearTimeout(@reconnectRegistrationTimeout) + + if context.JK.dlen(@subscriptions) > 0 + @reconnectRegistrationTimeout = setTimeout(this.reregister, 1000) # re-register after 1 second, to prevent excessive messaging to server + + onConnectionDown: () => + if @reconnectRegistrationTimeout? + clearTimeout(@reconnectRegistrationTimeout) + + registerWithGateway: () => + + context.JK.JamServer.registerMessageCallback(context.JK.MessageType.SUBSCRIPTION_MESSAGE, this.onSubscriptionMessage); + + $server = context.JK.JamServer.get$Server() + + $server.on(@events.CONNECTION_UP, this.onConnectionUp) + $server.on(@events.CONNECTION_DOWN, this.onConnectionDown) + + onSubscriptionMessage: (header, payload) => + key = this.genKey(payload.type, payload.id) + + watch = @subscriptions[key] + + if watch + watch.triggerHandler(@events.SUBSCRIBE_NOTIFICATION, {type:payload.type, id: payload.id, body: JSON.parse(payload.body)}) + else + @logger.warn("unable to find subscription for #{key}") + + # call subscribe, and use the returned object to listen for events of name context.JK.EVENTS.SUBSCRIBE_NOTIFICATION + subscribe: (type, id) => + + key = this.genKey(type, id) + + @logger.debug("subscribing for any notifications for #{key}") + + watch = @subscriptions[key] + + unless watch? + watch = $({type: type, id: id}) + @subscriptions[key] = watch + + # tell server we want messages for this entity + msg = @msgFactory.subscribe(type, id) + context.JK.JamServer.send(msg) + + # watch can be used to watch for events using jquery + watch + + # TODO: this should not send a unsubscribe message to the server it's the last listener for the specific type/id combo + unsubscribe: (type, id) => + + key = this.genKey(type, id) + + @logger.debug("unsubscribing for any notifications for #{key}") + + watch = @subscriptions[key] + + delete @subscriptions[key] + + # tell server we don't want any more messages for this entity + msg = @msgFactory.unsubscribe(type, id) + context.JK.JamServer.send(msg) + + if watch + # unattach any events + watch.off(); + + + watch + + +# global instance +context.JK.SubscriptionUtils = new SubscriptionUtils() \ No newline at end of file diff --git a/web/app/assets/javascripts/web/sessions.js b/web/app/assets/javascripts/web/sessions.js index 95e64ca71..397795585 100644 --- a/web/app/assets/javascripts/web/sessions.js +++ b/web/app/assets/javascripts/web/sessions.js @@ -98,8 +98,8 @@ } function initialize(musicSessionId) { - $controls = $('.recording-controls'); - $status = $('.session-status') + $controls = $('.sessions-page .recording-controls'); + $status = $('.sessions-page .session-status') $('.timeago').timeago(); $controls.listenBroadcast(); diff --git a/web/app/assets/javascripts/web/web.js b/web/app/assets/javascripts/web/web.js index f3e91abd4..466533efd 100644 --- a/web/app/assets/javascripts/web/web.js +++ b/web/app/assets/javascripts/web/web.js @@ -43,6 +43,7 @@ //= require user_dropdown //= require jamkazam //= require utils +//= require subscription_utils //= require ui_helper //= require custom_controls //= require ga diff --git a/web/app/assets/stylesheets/client/client.css b/web/app/assets/stylesheets/client/client.css index e18189462..8403cf75a 100644 --- a/web/app/assets/stylesheets/client/client.css +++ b/web/app/assets/stylesheets/client/client.css @@ -59,6 +59,7 @@ *= require ./musician *= require ./help *= require ./jquery-ui-overrides + *= require ./listenBroadcast *= require web/audioWidgets *= require web/recordings *= require web/sessions diff --git a/web/app/assets/stylesheets/client/listenBroadcast.css.scss b/web/app/assets/stylesheets/client/listenBroadcast.css.scss new file mode 100644 index 000000000..e4d9495fb --- /dev/null +++ b/web/app/assets/stylesheets/client/listenBroadcast.css.scss @@ -0,0 +1,111 @@ +@import "client/common"; + +.listen-broadcast-state { + + .instructions { + font-size:12px; + margin:5px 0; + padding:3px; + width:100%; + @include border_box_sizing; + } + + .title { + margin:5px 0 5px; + text-align:center; + font-size:14px; + } + + .summary { + + font-size:14px; + + .user {display:none} + + .status { + @include border_box_sizing; + padding:5px; + background-color:$poor; + display:none; + width:100%; + margin-bottom:10px; + font-size:16px; + + &.source_up { background-color:$good;} + &.source_down { background-color:$good;} + &.transition_up { background-color:$good;} + &.transition_down { background-color:$good;} + } + + .msg { + text-align:center; + } + + &[data-status=source_up] {.source_up { display:inline-block; }} + &[data-status=source_down] {.source_down { display:inline-block; }} + &[data-status=source_wrong_up] {.source_up { display:inline-block; }} + &[data-status=source_wrong_down] {.source_up { display:inline-block; }} + &[data-status=transition_up] {.source_up { display:inline-block; }} + &[data-status=transition_down] {.source_up { display:inline-block; }} + &[data-status=transition_timeout_up] {.source_up { display:inline-block; }} + &[data-status=transition_timeout_down] {.source_up { display:inline-block; }} + &[data-status=unknown] {.unknown { display:inline-block; }} + &[data-status=loading] {.loading { display:inline-block; }} + &[data-status=db_error] {.db_error{ display:inline-block; }} + } + + .who-subtitle { + position:absolute; + text-align:center; + width:26px; + overflow:visible; + font-size:12px; + } + + .detail-subtitle { + text-align:center; + font-size:12px; + + span { + font-style:italic; + font-size:10px; + padding:3px; + } + } + + .listen-broadcast-detail { + position:relative; + background-color:black; + margin:5px 0; + padding:2px; + } + + .detail-items { + + height:85px; + overflow:auto; + } + + .details { + + height:135px; + + .title { + margin-bottom:2px; + } + + .who { + margin:0 !important; + position:absolute; + text-align:center; + overflow:visible; + vertical-align:middle; + line-height:26px; + } + .detail { + text-align:center; + line-height:26px; + vertical-align: middle; + } + } +} diff --git a/web/app/assets/stylesheets/web/web.css b/web/app/assets/stylesheets/web/web.css index dc01e969c..d91e79de2 100644 --- a/web/app/assets/stylesheets/web/web.css +++ b/web/app/assets/stylesheets/web/web.css @@ -12,6 +12,7 @@ *= require client/user_dropdown *= require client/hoverBubble *= require client/help +*= require client/listenBroadcast *= require web/main *= require web/footer *= require web/recordings diff --git a/web/app/controllers/api_icecast_controller.rb b/web/app/controllers/api_icecast_controller.rb index 8f03fb85c..7edd4fee5 100644 --- a/web/app/controllers/api_icecast_controller.rb +++ b/web/app/controllers/api_icecast_controller.rb @@ -1,11 +1,39 @@ +=begin + Summary of icecast implementation: + + mount lock locations: + * IcecastMount.listener_add + * IcecastMount.listener_remove + * IcecastMount.source_up + * IcecastMount.source_down + * IcecastSourceCheck on each stale IcecastMount + + sourced_needs_changing_at behavior: + * set to Time.now if first listener comes in (current listeners==0 as they join) + * set to Time.now if last listener leaves (current listeners == 1 as they leave) + * set to nil if source begins + * set to nil if source ends + + + * a session's clients gets requested to start a source if one of: + ** an IcecastMount is saved and listeners go above 0 and not currently sourced + ** an IcecastSourceCheck finds a IcecastMount with a stale sourced_needs_changing_at and mount.listeners > 0 && !mount.sourced + + * a session's clients get requested to stop a source if + ** an IcecastSourceCheck finds a IcecastMount with a stale sourced_needs_changing_at and mount.listeners == 0 && mount.sourced +=end + class ApiIcecastController < ApiController before_filter :local_only, :only => [:test] - before_filter :parse_mount, :except => [:test] + before_filter :parse_mount, :only => [:mount_add, :mount_remove, :listener_add, :listener_remove] + before_filter :api_signed_in_user, :only => [ :create_source_change ] # each request will have this in it, if it's icecast. #user-agent = Icecast 2.3.3 + respond_to :json + def test puts "========= GOT IT=======" render text: 'GOT IT', :status => :ok @@ -51,6 +79,38 @@ class ApiIcecastController < ApiController render text: '', :status => :ok end + # info reported by the client to the server letting us track what's going on at a deeper level + def create_source_change + mount = IcecastMount.find(params[:id]) + + @source_change = IcecastSourceChange.new + @source_change.source_direction = params[:source_direction] + @source_change.user = current_user + @source_change.client_id = params[:client_id] + @source_change.success = params[:success] + @source_change.reason = params[:reason] + @source_change.detail = params[:detail] + @source_change.mount = mount + @source_change.change_type = IcecastSourceChange::CHANGE_TYPE_CLIENT + @source_change.save + + if @source_change.errors.any? + response.status = :unprocessable_entity + respond_with @source_change + else + source_change_json = Rabl::Renderer.json(@source_change, 'api_icecast/source_change_notification') + SubscriptionMessage.mount_source_change(mount, source_change_json) + render :json => {}, :status => 200 + end + end + + def show + @mount = IcecastMount.find(params[:id]) + + puts "@MOUNT #{@mount}" + respond_with @mount, responder: ApiResponder + end + protected def local_only request.local? @@ -74,5 +134,4 @@ class ApiIcecastController < ApiController @mount_id = uri.path @mount_params = Rack::Utils.parse_query(uri.query) end - end diff --git a/web/app/controllers/spikes_controller.rb b/web/app/controllers/spikes_controller.rb index ff31a9511..b07987b4d 100644 --- a/web/app/controllers/spikes_controller.rb +++ b/web/app/controllers/spikes_controller.rb @@ -30,4 +30,11 @@ class SpikesController < ApplicationController def websocket render :layout => false end + + def subscription + + Notification.send_subscription_message('test', '1', '{"msg": "oh hai 1"}') + Notification.send_subscription_message('test', '2', '{"msg": "oh hai 2"}') + render text: 'oh hai' + end end diff --git a/web/app/views/api_icecast/show.rabl b/web/app/views/api_icecast/show.rabl new file mode 100644 index 000000000..b708847f9 --- /dev/null +++ b/web/app/views/api_icecast/show.rabl @@ -0,0 +1,19 @@ +object @mount + +attributes :id, :listeners, :source_direction, :sourced + +node :state do |mount| + mount.state +end + + +child(:source_changes => :source_changes) { + + attributes :id + + puts "source_change #{@object}" + + node do |source_change| + partial("api_icecast/show_source_change", :object => source_change) + end +} diff --git a/web/app/views/api_icecast/show_source_change.rabl b/web/app/views/api_icecast/show_source_change.rabl new file mode 100644 index 000000000..0bdea96ea --- /dev/null +++ b/web/app/views/api_icecast/show_source_change.rabl @@ -0,0 +1,7 @@ +object @source_change + +attributes :id, :source_direction, :client_id, :success, :reason, :detail, :created_at, :change_type + +node :user do |source_change| + partial("api_users/show_minimal", :object => source_change.user) +end diff --git a/web/app/views/api_icecast/source_change_notification.rabl b/web/app/views/api_icecast/source_change_notification.rabl new file mode 100644 index 000000000..012ae9ebf --- /dev/null +++ b/web/app/views/api_icecast/source_change_notification.rabl @@ -0,0 +1,13 @@ +object @source_change + +node do |source_change| + partial("api_icecast/show_source_change", :object => source_change) +end + +child(:mount) { |mount| + attributes :id, :listeners, :source_direction, :sourced + + node :state do |mount| + mount.state + end +} \ No newline at end of file diff --git a/web/app/views/clients/_listenBroadcast.html.slim b/web/app/views/clients/_listenBroadcast.html.slim new file mode 100644 index 000000000..844b481b5 --- /dev/null +++ b/web/app/views/clients/_listenBroadcast.html.slim @@ -0,0 +1,57 @@ +script type="text/template" id="template-listen-broadcast-state" + .listen-broadcast-state + .instructions + | Here you can see a greater amount of detail about the state of the broadcast. If you leave the hover window open, new activity related to broadcasts will show up in real-time. + .summary + .title + | Current Status + .source_up.status + .user + .msg + | Broadcasting + .source_down.status + .user + .msg + | Not Broadcasting + .source_wrong_up.status + .msg + | Broadcast Attempt Failed + .source_wrong_down.status + .msg + | Stop Broadcast Failed + .transition_up.status + .msg + | Initializing Broadcast + .transition_down.status + .msg + | Stopping Broadcast + .transition_timeout_up.status + .msg + | No One Broadcasting Yet + .transition_timeout_down.status + .msg + | Not Stopped Broadcasting + .unknown.status + .msg + | Unknown Status + .loading.status + .msg + | loading ... + .db_error.status + .msg + | db error + .details + .title + .who-subtitle + | Who + .detail-subtitle + | Details + span + | (most recent at top) + .detail-items + +script type="text/template" id="template-listen-broadcast-detail" + .listen-broadcast-detail + .who + .detail + diff --git a/web/app/views/clients/index.html.erb b/web/app/views/clients/index.html.erb index 9852b44d9..0fb57a248 100644 --- a/web/app/views/clients/index.html.erb +++ b/web/app/views/clients/index.html.erb @@ -62,6 +62,7 @@ <%= render "notify" %> <%= render "client_update" %> <%= render "overlay_small" %> +<%= render "listenBroadcast" %> <%= render "sync_viewer_templates" %> <%= render "help" %> <%= render 'dialogs/dialogs' %> diff --git a/web/app/views/layouts/web.html.erb b/web/app/views/layouts/web.html.erb index 62b8d4f7c..62c2f1ec6 100644 --- a/web/app/views/layouts/web.html.erb +++ b/web/app/views/layouts/web.html.erb @@ -78,6 +78,7 @@ <%= render "clients/hoverSession" %> <%= render "clients/hoverRecording" %> <%= render "clients/help" %> + <%= render "clients/listenBroadcast" %> <%= render 'dialogs/dialogs' %> diff --git a/web/config/application.rb b/web/config/application.rb index 4c502e07f..418d38545 100644 --- a/web/config/application.rb +++ b/web/config/application.rb @@ -116,6 +116,7 @@ if defined?(Bundler) config.websocket_gateway_uri = "ws://localhost:#{config.websocket_gateway_port}/websocket" config.websocket_gateway_trusted_uri = "ws://localhost:#{config.websocket_gateway_port + 1}/websocket" config.websocket_gateway_max_connections_per_user = 20 + config.lock_connections = false config.external_hostname = ENV['EXTERNAL_HOSTNAME'] || 'localhost' config.external_port = ENV['EXTERNAL_PORT'] || 3000 @@ -197,9 +198,10 @@ if defined?(Bundler) # this will be the qualifier on the IcecastConfigWorker queue name config.icecast_server_id = ENV['ICECAST_SERVER_ID'] || 'localhost' config.icecast_max_missing_check = 2 * 60 # 2 minutes - config.icecast_max_sourced_changed = 15 # 15 seconds + config.icecast_max_sourced_changed = 10 # 10 seconds config.icecast_hardcoded_source_password = nil # generate a new password everytim. production should always use this value - config.icecast_wait_after_reload = 5 # 5 seconds. a hack needed until VRFS-1043 + config.icecast_wait_after_reload = 0 # 0 seconds. a hack needed until VRFS-1043... maybe + config.source_changes_missing_secs = 2 # amount of time before we think it's odd that there are no source_change notifications config.email_alerts_alias = 'nobody@jamkazam.com' # should be used for 'oh no' server down/service down sorts of emails config.email_generic_from = 'nobody@jamkazam.com' diff --git a/web/config/initializers/rabl_init.rb b/web/config/initializers/rabl_init.rb index cfa6b995c..f03d6261c 100644 --- a/web/config/initializers/rabl_init.rb +++ b/web/config/initializers/rabl_init.rb @@ -1,3 +1,9 @@ +class PrettyJson + def self.dump(object) + JSON.pretty_generate(object, {:indent => " "}) + end +end + Rabl.configure do |config| # Commented as these are defaults # config.cache_all_output = false @@ -5,6 +11,7 @@ Rabl.configure do |config| # config.cache_engine = Rabl::CacheEngine.new # Defaults to Rails cache # config.escape_all_output = false # config.json_engine = nil # Any multi_json engines or a Class with #encode method + config.json_engine = PrettyJson if Rails.env.development? # config.msgpack_engine = nil # Defaults to ::MessagePack # config.bson_engine = nil # Defaults to ::BSON # config.plist_engine = nil # Defaults to ::Plist::Emit @@ -16,5 +23,5 @@ Rabl.configure do |config| config.include_child_root = false # config.enable_json_callbacks = false # config.xml_options = { :dasherize => true, :skip_types => false } - # config.view_paths = [] + config.view_paths << Rails.root.join('app/views') end \ No newline at end of file diff --git a/web/config/routes.rb b/web/config/routes.rb index d4e4b5556..2a6194284 100644 --- a/web/config/routes.rb +++ b/web/config/routes.rb @@ -91,6 +91,7 @@ SampleApp::Application.routes.draw do match '/facebook_invite', to: 'spikes#facebook_invite' match '/launch_app', to: 'spikes#launch_app' match '/websocket', to: 'spikes#websocket' + match '/test_subscription', to: 'spikes#subscription' # junk pages match '/help', to: 'static_pages#help' @@ -476,6 +477,8 @@ SampleApp::Application.routes.draw do match '/icecast/mount_remove' => 'api_icecast#mount_remove', :via => :post match '/icecast/listener_add' => 'api_icecast#listener_add', :via => :post match '/icecast/listener_remove' => 'api_icecast#listener_remove', :via => :post + match '/icecast/mount/:id' => 'api_icecast#show', :via => :get + match '/icecast/mount/:id/source_change' => 'api_icecast#create_source_change', :via => :post # tweet on behalf of client match '/twitter/tweet' => 'api_twitters#tweet', :via => :post diff --git a/web/config/scheduler.yml b/web/config/scheduler.yml index 9108a382d..cf5c1e80a 100644 --- a/web/config/scheduler.yml +++ b/web/config/scheduler.yml @@ -10,7 +10,8 @@ IcecastConfigRetry: description: "Finds icecast servers that have had their config_changed, but no IcecastConfigWriter check recently" IcecastSourceCheck: - cron: "10 * * * * *" + every: + - "7s" class: "JamRuby::IcecastSourceCheck" description: "Finds icecast mounts that need their 'sourced' state to change, but haven't in some time" @@ -20,7 +21,7 @@ CleanupFacebookSignup: description: "Deletes facebook_signups that are old" UnusedMusicNotationCleaner: - cron: "10 * * * * *" + cron: "0 * * * *" class: "JamRuby::UnusedMusicNotationCleaner" description: "Remove unused music notations" diff --git a/web/lib/tasks/start.rake b/web/lib/tasks/start.rake index 87cbce1a0..b53b25f6c 100644 --- a/web/lib/tasks/start.rake +++ b/web/lib/tasks/start.rake @@ -1,6 +1,6 @@ # this rake file is meant to hold shortcuts/helpers for starting onerous command line executions -# bunde exec rake all_jobs +# bundle exec rake all_jobs task :all_jobs do Rake::Task['environment'].invoke diff --git a/web/spec/controllers/api_icecast_controller_spec.rb b/web/spec/controllers/api_icecast_controller_spec.rb new file mode 100644 index 000000000..2cc8716ee --- /dev/null +++ b/web/spec/controllers/api_icecast_controller_spec.rb @@ -0,0 +1,23 @@ +require 'spec_helper' + +describe ApiIcecastController do + render_views + + let(:user) {FactoryGirl.create(:user) } + + before(:each) do + controller.current_user = user + end + + describe "create_source_change" do + let!(:mount) {FactoryGirl.create(:iceast_mount_with_music_session, sourced: true, sourced_needs_changing_at: nil, listeners: 1)} + + it "success" do + + post :create_source_change, {format:'json', id: mount.id, success: true, source_direction: true, reason: 'blah', client_id: 'abc'} + + response.status.should == 200 + end + end + +end diff --git a/web/spec/factories.rb b/web/spec/factories.rb index 659733bcf..a1d3a1ef8 100644 --- a/web/spec/factories.rb +++ b/web/spec/factories.rb @@ -434,7 +434,7 @@ FactoryGirl.define do association :mount_template, :factory => :icecast_mount_template factory :iceast_mount_with_music_session do - association :music_session, :factory => :music_session + association :music_session, :factory => :active_music_session end end end diff --git a/web/spec/javascripts/sessionLatency.spec.js b/web/spec/javascripts/sessionLatency.spec.js index beb07a122..074efc76a 100644 --- a/web/spec/javascripts/sessionLatency.spec.js +++ b/web/spec/javascripts/sessionLatency.spec.js @@ -96,7 +96,7 @@ sessionLatency.subscribe('test', cb); }); it("should invoke callback on latency result", function() { - var cb = jasmine.createSpy("Latency Subscription Callback"); + var cb = jasmine.createSpy("Latency subscription.rb Callback"); sessionLatency.subscribe('test', cb); $.each(sessions, function(index, session) { sessionLatency.sessionPings(session); diff --git a/web/spec/spec_helper.rb b/web/spec/spec_helper.rb index 483a30851..936471e11 100644 --- a/web/spec/spec_helper.rb +++ b/web/spec/spec_helper.rb @@ -87,7 +87,7 @@ Thread.new do :rabbitmq_port => 5672, :calling_thread => current, :cidr => ['0.0.0.0/0'], - :gateway_name => 'default') + :gateway_name => 'default-test') rescue Exception => e puts "websocket-gateway failed: #{e}" end diff --git a/websocket-gateway/lib/jam_websockets/router.rb b/websocket-gateway/lib/jam_websockets/router.rb index 683107495..0f09d5f74 100644 --- a/websocket-gateway/lib/jam_websockets/router.rb +++ b/websocket-gateway/lib/jam_websockets/router.rb @@ -10,7 +10,7 @@ include Jampb module EventMachine module WebSocket class Connection < EventMachine::Connection - attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted # client_id is uuid we give to each client to track them as we like + attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted, :subscriptions # client_id is uuid we give to each client to track them as we like end end end @@ -36,12 +36,14 @@ module JamWebsockets @clients = {} # clients that have logged in @user_context_lookup = {} # lookup a set of client_contexts by user_id @client_lookup = {} # lookup a client by client_id + @subscription_lookup = {} @amqp_connection_manager = nil @users_exchange = nil @message_factory = JamRuby::MessageFactory.new @semaphore = Mutex.new @user_topic = nil @client_topic = nil + @subscription_topic = nil @thread_pool = nil @heartbeat_interval_client = nil @connect_time_expire_client = nil @@ -187,6 +189,7 @@ module JamWebsockets @semaphore.synchronize do if client_id == MessageFactory::ALL_NATIVE_CLIENTS + msg = Jampb::ClientMessage.parse(msg) @log.debug "client-directed message received from #{msg.from} to all clients" @client_lookup.each do |client_id, client_context| @@ -204,7 +207,7 @@ module JamWebsockets else client_context = @client_lookup[client_id] - if !client_context.nil? + if client_context client = client_context.client @@ -234,6 +237,93 @@ module JamWebsockets end MQRouter.client_exchange = @clients_exchange + + ############## DYNAMIC SUBSCRIPTION MESSAGING ################### + + @subscriptions_exchange = channel.topic('subscriptions') + @subscription_topic = channel.queue("subscriptions-#{@gateway_name}", :auto_delete => true) + @subscription_topic.purge + + # subscribe for any p2p messages to a client + @subscription_topic.subscribe(:ack => false) do |headers, msg| + begin + routing_key = headers.routing_key + type_and_id = routing_key["subscription.".length..-1] + #type, id = type_and_id.split('.') + + @semaphore.synchronize do + + clients = @subscription_lookup[type_and_id] + + msg = Jampb::ClientMessage.parse(msg) + + if clients + EM.schedule do + clients.each do |client| + @log.debug "subscription msg to client #{client.client_id}" + send_to_client(client, msg) + end + end + end + end + rescue => e + @log.error "unhandled error in messaging to client for mount" + @log.error e + end + end + + MQRouter.subscription_exchange = @subscriptions_exchange + end + + # listens on a subscription topic on the behalf of a client + def register_subscription(client, type, id) + # track subscriptions that this client has made, for disconnect scenarios + client.subscriptions.add({type:type, id: id}) + + key = "#{type}.#{id}" + + # for a given type:id in @subscription_lookup, track clients listening + clients = @subscription_lookup[key] + if clients.nil? + clients = Set.new + @subscription_lookup[key] = clients + end + + needs_subscription = clients.length == 0 + + clients.add(client) + + @log.debug("register subscription handled #{type}.#{id}") + if needs_subscription + routing_key = "subscription.#{type}.#{id}" + @log.debug("register topic bound #{routing_key}") + # if this is the 1st client to listen for this mount, then subscribe to the topic for this mount_id + @subscription_topic.bind(@subscriptions_exchange, :routing_key => routing_key) + end + end + + # de-listens on a subscription topic on the behalf of a client + # called automatically when a clean disconnects, to keep state clean. + def unregister_subscription(client, type, id) + # remove subscription from this client's list of subscriptions + client.subscriptions.delete({type:type, id:id}) + + key = "#{type}.#{id}" + # for a given mount_id in @subscription_lookup, remove from list of clients listening + clients = @subscription_lookup[key] + if clients + deleted = clients.delete(client) + if !deleted + @log.error "unregister_subscription: unable to locate any client #{client.client_id} for id #{id}" + end + else + @log.error "unregister_subscription: unable to locate any clients for id #{id}" + end + + if clients.length == 0 + # if there are no more clients listening, then unsubscribe to the topic for this mount_id + @subscription_topic.unbind(@subscriptions_exchange, :routing_key => "subscription.#{type}.#{id}") + end end # this method allows you to translate exceptions into websocket channel messages and behavior safely. @@ -428,6 +518,12 @@ module JamWebsockets elsif client_msg.type == ClientMessage::Type::HEARTBEAT sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) } + elsif client_msg.type == ClientMessage::Type::SUBSCRIBE_BULK + sane_logging { handle_bulk_subscribe(client_msg.subscribe_bulk, client) } + elsif client_msg.type == ClientMessage::Type::SUBSCRIBE + sane_logging { handle_subscribe(client_msg.subscribe, client) } + elsif client_msg.type == ClientMessage::Type::UNSUBSCRIBE + sane_logging { handle_unsubscribe(client_msg.unsubscribe, client) } else raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message" end @@ -522,6 +618,8 @@ module JamWebsockets os = options["os"] udp_reachable = options["udp_reachable"].nil? ? true : options["udp_reachable"] == 'true' + client.subscriptions = Set.new# list of subscriptions that this client is watching in real-time + @log.info("handle_login: client_type=#{client_type} token=#{token} client_id=#{client_id} channel_id=#{client.channel_id} udp_reachable=#{udp_reachable}") if client_type == Connection::TYPE_LATENCY_TESTER @@ -669,6 +767,33 @@ module JamWebsockets end end + def handle_bulk_subscribe(subscriptions, client) + + subscriptions.types.each_with_index do |subscription, i| + handle_subscribe(OpenStruct.new({type: subscriptions.types[i], id: subscriptions.ids[i]}), client) + end + end + + def handle_subscribe(subscribe, client) + id = subscribe.id + type = subscribe.type + if id && id.length > 0 && type && type.length > 0 + register_subscription(client, type, id) + else + @log.error("handle_subscribe: empty data #{subscribe}") + end + end + + def handle_unsubscribe(unsubscribe, client) + id = unsubscribe.id + type = unsubscribe.type + if id && id.length > 0 && type && type.length > 0 + unregister_subscription(client, type, id) + else + @log.error("handle_subscribe: empty data #{unsubscribe}") + end + end + def handle_heartbeat(heartbeat, heartbeat_message_id, client) unless context = @clients[client] @log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session" @@ -1026,6 +1151,11 @@ module JamWebsockets def cleanup_client(client) client.close + # unregister any subscriptions + client.subscriptions.each do |subscription| + unregister_subscription(client, subscription[:type], subscription[:id]) + end + @semaphore.synchronize do pending = client.context.nil? # presence of context implies this connection has been logged into