diff --git a/admin/app/admin/icecast_bootstrap.rb b/admin/app/admin/icecast_bootstrap.rb
index e07af5f20..73bf7aa46 100644
--- a/admin/app/admin/icecast_bootstrap.rb
+++ b/admin/app/admin/icecast_bootstrap.rb
@@ -43,10 +43,10 @@ ActiveAdmin.register_page "Bootstrap" do
admin_auth.save!
path = IcecastPath.new
- path.base_dir = '/usr/local/Cellar/icecast/2.3.3/share/icecast'
- path.log_dir = '/usr/local/Cellar/icecast/2.3.3/var/log/icecast'
- path.web_root = '/usr/local/Cellar/icecast/2.3.3/share/icecast/web'
- path.admin_root = '/usr/local/Cellar/icecast/2.3.3/share/icecast/admin'
+ path.base_dir = '/usr/local/Cellar/icecast/2.4.1/share/icecast'
+ path.log_dir = '/usr/local/Cellar/icecast/2.4.1/var/log/icecast'
+ path.web_root = '/usr/local/Cellar/icecast/2.4.1/share/icecast/web'
+ path.admin_root = '/usr/local/Cellar/icecast/2.4.1/share/icecast/admin'
path.pid_file = nil
path.save!
diff --git a/db/manifest b/db/manifest
index 396ce5b48..6b25b087c 100755
--- a/db/manifest
+++ b/db/manifest
@@ -229,6 +229,7 @@ deletable_recordings.sql
jam_tracks.sql
shopping_carts.sql
recurly.sql
-add_track_resource_id.sql
+add_track_resource_id.sql
user_genres.sql
user_online.sql
+icecast_source_changes.sql
diff --git a/db/up/icecast_source_changes.sql b/db/up/icecast_source_changes.sql
new file mode 100644
index 000000000..7c8cd7f14
--- /dev/null
+++ b/db/up/icecast_source_changes.sql
@@ -0,0 +1,16 @@
+-- track extra detail about if the source went up or down
+CREATE UNLOGGED TABLE icecast_source_changes (
+ id VARCHAR(64) PRIMARY KEY DEFAULT uuid_generate_v4(),
+ source_direction BOOLEAN NOT NULL,
+ change_type VARCHAR(64) NOT NULL,
+ user_id VARCHAR(64),
+ client_id VARCHAR(64),
+ success BOOLEAN NOT NULL,
+ reason VARCHAR,
+ detail VARCHAR,
+ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ icecast_mount_id VARCHAR(64) NOT NULL REFERENCES icecast_mounts(id) ON DELETE CASCADE
+);
+
+-- track when the source_direction
+ALTER TABLE icecast_mounts ADD COLUMN source_direction BOOLEAN NOT NULL DEFAULT FALSE;
\ No newline at end of file
diff --git a/pb/src/client_container.proto b/pb/src/client_container.proto
index 901797515..b339465c7 100644
--- a/pb/src/client_container.proto
+++ b/pb/src/client_container.proto
@@ -17,6 +17,10 @@ message ClientMessage {
LEAVE_MUSIC_SESSION_ACK = 125;
HEARTBEAT = 130;
HEARTBEAT_ACK = 135;
+ SUBSCRIBE = 136;
+ UNSUBSCRIBE = 137;
+ SUBSCRIPTION_MESSAGE = 138;
+ SUBSCRIBE_BULK = 139;
// friend notifications
FRIEND_UPDATE = 140;
@@ -73,7 +77,7 @@ message ClientMessage {
SOURCE_DOWN_REQUESTED = 251;
SOURCE_UP = 252;
SOURCE_DOWN = 253;
-
+
TEST_SESSION_MESSAGE = 295;
PING_REQUEST = 300;
@@ -115,6 +119,10 @@ message ClientMessage {
optional LeaveMusicSessionAck leave_music_session_ack = 125;
optional Heartbeat heartbeat = 130;
optional HeartbeatAck heartbeat_ack = 135;
+ optional Subscribe subscribe = 136;
+ optional Unsubscribe unsubscribe = 137;
+ optional SubscriptionMessage subscription_message = 138;
+ optional SubscribeBulk subscribe_bulk = 139;
// friend notifications
optional FriendUpdate friend_update = 140; // from server to all friends of user
@@ -586,6 +594,33 @@ message SourceDown {
optional string music_session = 1; // music session id
}
+message SubscriptionMessage {
+ optional string type = 1; // the type of the subscription
+ optional string id = 2; // data about what to subscribe to, specifically
+ optional string body = 3; // this is a JSON string; let the browser decode it
+}
+
+message Subscribe {
+ optional string type = 1; // the type of the subscription
+ optional string id = 2; // data about what to subscribe to, specifically
+}
+
+message Unsubscribe {
+ optional string type = 1; // the type of the subscription
+ optional string id = 2; // data about what to subscribe to, specifically
+}
+
+message Subscription {
+ optional string type = 1; // the type of the subscription
+ optional string id = 2; // data about what to subscribe to, specifically
+}
+
+message SubscribeBulk {
+ repeated string types = 1;
+ repeated string ids = 2;
+ //repeated Subscription subscriptions = 1; # the ruby protocol buffer library chokes on this. so we have to do the above
+}
+
// route_to: session
// a test message used by ruby-client currently. just gives way to send out to rest of session
message TestSessionMessage {
diff --git a/ruby/lib/jam_ruby.rb b/ruby/lib/jam_ruby.rb
index ac0ad6c52..cfe15366d 100755
--- a/ruby/lib/jam_ruby.rb
+++ b/ruby/lib/jam_ruby.rb
@@ -77,6 +77,7 @@ require "jam_ruby/app/uploaders/jam_track_track_uploader"
require "jam_ruby/app/uploaders/max_mind_release_uploader"
require "jam_ruby/lib/desk_multipass"
require "jam_ruby/lib/ip"
+require "jam_ruby/lib/subscription_message"
require "jam_ruby/amqp/amqp_connection_manager"
require "jam_ruby/database"
require "jam_ruby/message_factory"
@@ -144,6 +145,7 @@ require "jam_ruby/models/icecast_listen_socket"
require "jam_ruby/models/icecast_logging"
require "jam_ruby/models/icecast_master_server_relay"
require "jam_ruby/models/icecast_mount"
+require "jam_ruby/models/icecast_source_change"
require "jam_ruby/models/icecast_path"
require "jam_ruby/models/icecast_relay"
require "jam_ruby/models/icecast_security"
diff --git a/ruby/lib/jam_ruby/connection_manager.rb b/ruby/lib/jam_ruby/connection_manager.rb
index c2f22df30..8cb2709d8 100644
--- a/ruby/lib/jam_ruby/connection_manager.rb
+++ b/ruby/lib/jam_ruby/connection_manager.rb
@@ -389,7 +389,9 @@ SQL
end
def lock_connections(conn)
- conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
+ if APP_CONFIG.lock_connections
+ conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
+ end
end
# def associate_tracks(connection, tracks)
diff --git a/ruby/lib/jam_ruby/lib/em_helper.rb b/ruby/lib/jam_ruby/lib/em_helper.rb
index 592f97e7b..ba6e582e8 100644
--- a/ruby/lib/jam_ruby/lib/em_helper.rb
+++ b/ruby/lib/jam_ruby/lib/em_helper.rb
@@ -53,6 +53,11 @@ module JamWebEventMachine
@@log.debug("#{exchange.name} is ready to go")
MQRouter.user_exchange = exchange
end
+
+ AMQP::Exchange.new(channel, :topic, "subscriptions") do |exchange|
+ @@log.debug("#{exchange.name} is ready to go")
+ MQRouter.subscription_exchange = exchange
+ end
end
ran[:ran] = true
diff --git a/ruby/lib/jam_ruby/lib/subscription_message.rb b/ruby/lib/jam_ruby/lib/subscription_message.rb
new file mode 100644
index 000000000..62409b6aa
--- /dev/null
+++ b/ruby/lib/jam_ruby/lib/subscription_message.rb
@@ -0,0 +1,25 @@
+module JamRuby
+
+ class SubscriptionMessage
+
+ # sent whenever we change the desired direction of the source
+ def self.mount_source_direction(mount)
+ Notification.send_subscription_message('mount', mount.id, {type: 'source_direction_change', source_direction: true}.to_json)
+ end
+
+ # sent whenever we receive an update about a sourcing attempt by a client
+ # source_change_json is created by a rabl, so currently only possible to use via web
+ def self.mount_source_change(mount, source_change_json)
+ Notification.send_subscription_message('mount', mount.id, source_change_json)
+ end
+
+ def self.mount_source_up_requested(mount)
+ Notification.send_subscription_message('mount', mount.id, {type: 'source_up_requested'}.to_json )
+ end
+
+ def self.mount_source_down_requested(mount)
+ Notification.send_subscription_message('mount', mount.id, {type: 'source_down_requested'}.to_json )
+ end
+ end
+end
+
diff --git a/ruby/lib/jam_ruby/message_factory.rb b/ruby/lib/jam_ruby/message_factory.rb
index 6d1d34fb7..19be72de7 100644
--- a/ruby/lib/jam_ruby/message_factory.rb
+++ b/ruby/lib/jam_ruby/message_factory.rb
@@ -993,6 +993,20 @@ module JamRuby
)
end
+ def subscription_message(type, id, body)
+ subscription_message = Jampb::SubscriptionMessage.new(
+ id: id,
+ type: type,
+ body: body
+ )
+
+ Jampb::ClientMessage.new(
+ :type => ClientMessage::Type::SUBSCRIPTION_MESSAGE,
+ :route_to => CLIENT_TARGET,
+ :subscription_message => subscription_message,
+ )
+ end
+
####################################################
# is this message directed to the server?
diff --git a/ruby/lib/jam_ruby/models/connection.rb b/ruby/lib/jam_ruby/models/connection.rb
index 018901e32..105741a97 100644
--- a/ruby/lib/jam_ruby/models/connection.rb
+++ b/ruby/lib/jam_ruby/models/connection.rb
@@ -153,9 +153,9 @@ module JamRuby
end
def report_add_participant
- if self.music_session_id_changed? &&
- self.music_session.present? &&
- self.connected? &&
+ if self.music_session_id_changed? &&
+ self.music_session.present? &&
+ self.connected? &&
self.as_musician? &&
0 < (count = self.music_session.connected_participant_count)
GoogleAnalyticsEvent.report_session_participant(count)
diff --git a/ruby/lib/jam_ruby/models/icecast_mount.rb b/ruby/lib/jam_ruby/models/icecast_mount.rb
index 4e07dc786..29f3a6888 100644
--- a/ruby/lib/jam_ruby/models/icecast_mount.rb
+++ b/ruby/lib/jam_ruby/models/icecast_mount.rb
@@ -10,12 +10,16 @@ module JamRuby
:music_session_id, :icecast_server_id, :icecast_mount_template_id, :listeners, :sourced,
:sourced_needs_changing_at, as: :admin
+ attr_accessor :no_config_changed
+
belongs_to :authentication, class_name: "JamRuby::IcecastUserAuthentication", inverse_of: :mount, :foreign_key => 'authentication_id'
belongs_to :music_session, class_name: "JamRuby::ActiveMusicSession", inverse_of: :mount, foreign_key: 'music_session_id'
belongs_to :server, class_name: "JamRuby::IcecastServer", inverse_of: :mounts, foreign_key: 'icecast_server_id'
belongs_to :mount_template, class_name: "JamRuby::IcecastMountTemplate", inverse_of: :mounts, foreign_key: 'icecast_mount_template_id'
+ has_many :source_changes, class_name: "JamRuby::IcecastSourceChange", inverse_of: :mount, foreign_key: 'icecast_mount_id', order: 'created_at DESC'
+
validates :name, presence: true, uniqueness: true
validates :source_username, length: {minimum: 5}, if: lambda {|m| m.source_username.present?}
validates :source_pass, length: {minimum: 5}, if: lambda {|m| m.source_pass.present?}
@@ -33,7 +37,7 @@ module JamRuby
before_save :sanitize_active_admin
after_save :after_save
- after_save :poke_config
+ #after_save :poke_config
before_destroy :poke_config
def name_has_correct_format
@@ -45,7 +49,7 @@ module JamRuby
end
def after_save
- server.update_attribute(:config_changed, 1)
+ server.update_attribute(:config_changed, 1) unless no_config_changed
if !sourced_was && sourced
@@ -59,9 +63,13 @@ module JamRuby
end
- if listeners_was == 0 && listeners > 0 && !sourced
- # listener count went above 0 and there is no source. ask the musician clients to source
- notify_source_up_requested
+ if source_direction_was != source_direction
+ # temporarily removed; it seems better to leave all the data in for now. It should never be that much
+ # if the requested source direction has changed, then delete diagnostic info
+ #IcecastSourceChange.delete_all(["icecast_mount_id = ?", self.id]) if source_direction
+
+ # and tell anyone listening that the direction has changed
+ SubscriptionMessage.mount_source_direction(self)
end
# Note:
@@ -90,10 +98,110 @@ module JamRuby
mount
end
+ def fail_state(reason, detail = nil)
+ {success:false, reason: reason, detail: detail}
+ end
+
+ def success_state(reason, detail = nil)
+ {success:true, reason: reason, detail: detail}
+ end
+
+ # success messages:
+ # * source_up - we are broadcasting - detail is the source user or empty. if empty you should refresh state in a few seconds
+ # * source_down - we are broadcasting - detail is the last source user, or empty. if empty you should refresh state in a few seconds
+ # * transition_up/down - a source change occurred recently, and we don't yet have any info from the client. you should refresh state in a few seconds
+ #
+ # failure messages:
+ # * multiple_clients
+ # * unknown - represents a code error
+ # * transition_timeout_up - a source change for up has occurred, but no clients have reported any info
+ # * transition_timeout_down - a source change for up has occurred, but no clients have reported any info
+ # * source_wrong_up - the source should be up, but it could not succeed according to the most recent effort.
+ # * source_wrong_down - the source should be down, but it could not succeed according to the most recent effort.
+ #
+ # for both source_wrong_up and source_wrong_down, valid detail values:
+ # * 'no_client' if no client has said anything to a request after some amount of time, or otherwise it's client defined
+ # client-defined reason values
+ # * 'initialize_singleton' - code error in the client
+ # * 'initialize_thread' - code error in the client
+ # * 'initialize_ogg' - could not initialize ogg encoder... likely code error
+ # * 'initialize_mp3' - could not initialize mp3 encoder... likely code error
+ # * 'initialize_socket' - could not initialize socket... likely code error
+ # * 'icecast_response' - icecast was not accessible or returned an error
+
+ # * TODO client defined
+ def state
+ begin
+ result = fail_state('unknown')
+
+ if sourced == should_source?
+
+ first = source_changes.first
+
+ # don't check source_changes if actual source state mirrors desired source state... just say we are good, and pass down relevant sourcing user ID if present
+ result = success_state('source_' + (source_direction ? 'up' : 'down'), first.nil? ? nil : first.user_id)
+
+ elsif source_changes.count > 0
+ # if the desired source direction is up, but we haven't sourced yet... let's try and find out why
+
+ # let's first see if we have N clients contributing, which is a code error condition
+ clients = Hash[source_changes.map { |source_change| [source_change.client_id, source_change] }]
+
+ #if clients.length > 1
+ # this means more than one client has contributed... this is an code error condition
+ # result = fail_state('multiple_clients', clients)
+ #else
+ first = source_changes.first
+
+ if first.source_direction == should_source?
+ if first.success?
+ # the last message from any client indicated we had the right source
+ # if less than a second has passed, this is not strange. But more than that, it's strange
+ if sourced_needs_changing_at.nil? || (Time.now - first.created_at < APP_CONFIG.source_changes_missing_secs)
+ result = success_state('transition_' + (source_direction ? 'up' : 'down'))
+ else
+ result = fail_state('transition_timeout_' + (source_direction ? 'up' : 'down'))
+ end
+ else
+ # so the last state indicated by the client agrees that our source info is wrong; we can use it's data to augment the frontend
+ result = fail_state('source_wrong_' + (source_direction ? 'up' : 'down'), first.reason)
+ end
+ else
+ # if the last message from the client is for the wrong source direction (meaning no attempt to change source state by client to the correct state)
+ # then report this info
+
+ if sourced_needs_changing_at.nil? || (Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs)
+ result = fail_state('transition_' + (source_direction ? 'up' : 'down'))
+ else
+ result = fail_state('source_wrong_' + (source_direction ? 'up' : 'down'), 'no_client') # no client implies no client has attempted to source
+ end
+ end
+ #end
+ else
+ # we have the wrong source direction, but no source change info.
+
+ # if less than a second has passed, this is not strange. But more than that, it's strange
+ if sourced_needs_changing_at.nil?
+ #result = fail_state('db_error', 'sourced_needs_changing_at is nil')
+ result = success_state('transition_' + (source_direction ? 'up' : 'down'))
+ elsif Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs
+ result = success_state('transition_' + (source_direction ? 'up' : 'down'), 'initial')
+ else
+ result = fail_state('transition_timeout_' + (source_direction ? 'up' : 'down'), 'initial')
+ end
+ end
+
+ rescue Exception => e
+ @@log.error("exception in IcecastMount.state #{e}")
+ end
+ result
+ end
+
def source_up
with_lock do
self.sourced = true
self.sourced_needs_changing_at = nil
+ self.no_config_changed = true
save(validate: false)
end
end
@@ -102,17 +210,28 @@ module JamRuby
with_lock do
self.sourced = false
self.sourced_needs_changing_at = nil
+ self.no_config_changed = true
save(validate: false)
end
end
+ def should_source?
+ self.listeners > 0
+ end
+
def listener_add
with_lock do
- self.sourced_needs_changing_at = Time.now if listeners == 0
+ if listeners == 0 && !sourced && (self.sourced_needs_changing_at.nil? || Time.now - self.sourced_needs_changing_at > APP_CONFIG.source_changes_missing_secs)
+ # enough time has elapsed since the last time the source direction changed to instaneously request a source up
+
+ # listener count went above 0 and there is no source. ask the musician clients to source
+ notify_source_up_requested
+ end
# this is completely unsafe without that 'with_lock' statement above
self.listeners = self.listeners + 1
+ self.no_config_changed = true
save(validate: false)
end
end
@@ -129,31 +248,67 @@ module JamRuby
# this is completely unsafe without that 'with_lock' statement above
self.listeners = self.listeners - 1
+ self.no_config_changed = true
save(validations: false)
end
end
def notify_source_up_requested
- Notification.send_source_up_requested(music_session,
+ if music_session_id
+ self.sourced_needs_changing_at = Time.now
+ self.source_direction = true
+ self.no_config_changed = true
+ save!
+
+ source_change = IcecastSourceChange.new
+ source_change.source_direction = true
+ source_change.success = true
+ source_change.mount = self
+ source_change.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_UP_REQUEST
+ source_change.save!
+
+ Notification.send_source_up_requested(music_session,
server.hostname,
server.pick_listen_socket(:port),
name,
resolve_string(:source_username),
resolve_string(:source_pass),
- resolve_int(:bitrate)) if music_session_id
+ resolve_int(:bitrate))
+ SubscriptionMessage.mount_source_up_requested(self)
+ end
end
def notify_source_down_requested
- Notification.send_source_down_requested(music_session, name)
+ if music_session_id
+ self.sourced_needs_changing_at = Time.now
+ self.source_direction = false
+ self.no_config_changed = true
+ save!
+
+ source_change = IcecastSourceChange.new
+ source_change.source_direction = false
+ source_change.success = true
+ source_change.mount = self
+ source_change.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_DOWN_REQUEST
+ source_change.save!
+
+ Notification.send_source_down_requested(music_session, name)
+
+ SubscriptionMessage.mount_source_down_requested(self)
+ end
end
def notify_source_up
- Notification.send_source_up(music_session) if music_session_id
+ if music_session_id
+ Notification.send_source_up(music_session)
+ end
end
def notify_source_down
- Notification.send_source_down(music_session) if music_session_id
+ if music_session_id
+ Notification.send_source_down(music_session)
+ end
end
# Check if the icecast_mount specifies the value; if not, use the mount_template's value take effect
diff --git a/ruby/lib/jam_ruby/models/icecast_source_change.rb b/ruby/lib/jam_ruby/models/icecast_source_change.rb
new file mode 100644
index 000000000..335e8594a
--- /dev/null
+++ b/ruby/lib/jam_ruby/models/icecast_source_change.rb
@@ -0,0 +1,27 @@
+module JamRuby
+ class IcecastSourceChange < ActiveRecord::Base
+
+ @@log = Logging.logger[IcecastSourceChange]
+
+ CHANGE_TYPE_CLIENT = 'client'
+ CHANGE_TYPE_MOUNT_UP_REQUEST = 'source_up_request'
+ CHANGE_TYPE_MOUNT_DOWN_REQUEST = 'source_down_request'
+
+
+ belongs_to :mount, class_name: "JamRuby::IcecastMount", inverse_of: :source_changes, foreign_key: 'icecast_mount_id'
+ belongs_to :user, class_name: "JamRuby::User"
+
+ validates :source_direction, inclusion: {:in => [true, false]}
+ validates :success, inclusion: {:in => [true, false]}
+ validates :reason, length: {minimum: 0, maximum:255}
+ validates :detail, length: {minimum: 0, maximum:10000}
+ validates :user, presence:true, :if => :is_client_change?
+ validates :client_id, presence: true, :if => :is_client_change?
+ validates :mount, presence:true
+ validates :change_type, inclusion: {:in => [CHANGE_TYPE_CLIENT, CHANGE_TYPE_MOUNT_DOWN_REQUEST, CHANGE_TYPE_MOUNT_UP_REQUEST]}
+
+ def is_client_change?
+ change_type == CHANGE_TYPE_CLIENT
+ end
+ end
+end
diff --git a/ruby/lib/jam_ruby/models/notification.rb b/ruby/lib/jam_ruby/models/notification.rb
index da0ff9881..79187c3a2 100644
--- a/ruby/lib/jam_ruby/models/notification.rb
+++ b/ruby/lib/jam_ruby/models/notification.rb
@@ -1376,6 +1376,12 @@ module JamRuby
@@mq_router.server_publish_to_everyone_in_session(music_session, msg)
end
+
+ def send_subscription_message(type, id, body)
+ msg = @@message_factory.subscription_message(type, id, body)
+
+ @@mq_router.publish_to_subscription(type, id, msg)
+ end
end
private
diff --git a/ruby/lib/jam_ruby/mq_router.rb b/ruby/lib/jam_ruby/mq_router.rb
index 8f4efc828..826eb1ace 100644
--- a/ruby/lib/jam_ruby/mq_router.rb
+++ b/ruby/lib/jam_ruby/mq_router.rb
@@ -7,7 +7,7 @@ class MQRouter
# but ultimately there are internal static state variables to represent global MQ exchange connections
class << self
- attr_accessor :client_exchange, :user_exchange
+ attr_accessor :client_exchange, :user_exchange, :subscription_exchange
@@log = Logging.logger[MQRouter]
end
@@ -112,4 +112,15 @@ class MQRouter
end
end
end
+
+ def publish_to_subscription(type, id, msg)
+ @@log.error "EM not running in publish_to_subscription" unless EM.reactor_running?
+
+ EM.schedule do
+ routing_key = "subscription.#{type}.#{id}"
+ @@log.debug "publishing to #{routing_key} from server"
+ # put it on the topic exchange for users
+ self.class.subscription_exchange.publish(msg, :routing_key => routing_key)
+ end
+ end
end
\ No newline at end of file
diff --git a/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb b/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb
index 66f01c179..b81c5b992 100644
--- a/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb
+++ b/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb
@@ -31,8 +31,13 @@ module JamRuby
end
- def run # if we have seen that sourced_needs_changing_at is older than 15 seconds ago, re-poke clients in the session
- IcecastMount.find_each(lock: true, :conditions => "sourced_needs_changing_at < (NOW() - interval '#{APP_CONFIG.icecast_max_sourced_changed} second')", :batch_size => 100) do |mount|
+ def run
+ # we need to find any mount in the following conditions and then poke it with a websocket msg. here are the conditions:
+ # * (if sourced_needs_changing_at has gone too far in the past OR sourced_needs_changing_at IS NULL) AND
+ # ** listeners > 0 and sourced is DOWN (false)
+ # ** listeners == 0 and sourced is UP (true)
+
+ IcecastMount.find_each(lock: true, :conditions => "( (listeners > 0 AND sourced = FALSE) OR (listeners = 0 AND sourced = TRUE) ) AND ( sourced_needs_changing_at IS NULL OR sourced_needs_changing_at < (NOW() - interval '#{APP_CONFIG.icecast_max_sourced_changed} second') ) ", :batch_size => 100) do |mount|
if mount.music_session_id
mount.with_lock do
handle_notifications(mount)
@@ -46,16 +51,13 @@ module JamRuby
# if no listeners, but we are sourced, then ask it to stop sourcing
@@log.debug("SOURCE_DOWN_REQUEST called on mount #{mount.name}")
- mount.update_attribute(:sourced_needs_changing_at, Time.now) # we send out a source request, so we need to update the time
mount.notify_source_down_requested
elsif mount.listeners > 0 && !mount.sourced
# if we have some listeners, and still are not sourced, then ask to start sourcing again
@@log.debug("SOURCE_UP_REQUEST called on mount #{mount.name}")
- mount.update_attribute(:sourced_needs_changing_at, Time.now) # we send out a source request, so we need to update the time
mount.notify_source_up_requested
-
end
end
end
diff --git a/ruby/spec/factories.rb b/ruby/spec/factories.rb
index 0e1730e09..ecd89c450 100644
--- a/ruby/spec/factories.rb
+++ b/ruby/spec/factories.rb
@@ -420,7 +420,7 @@ FactoryGirl.define do
end
factory :icecast_mount, :class => JamRuby::IcecastMount do
- name "/" + Faker::Lorem.characters(10)
+ sequence(:name) { |n| "/mount_#{n}" }
source_username Faker::Lorem.characters(10)
source_pass Faker::Lorem.characters(10)
max_listeners 100
@@ -447,7 +447,15 @@ FactoryGirl.define do
end
end
end
+ end
+ factory :icecast_source_change, :class => JamRuby::IcecastSourceChange do
+ source_direction true
+ success true
+ sequence(:client_id) { |n| "client_id#{n}" }
+ change_type JamRuby::IcecastSourceChange::CHANGE_TYPE_CLIENT
+ association :user, :factory => :user
+ association :mount, :factory => :iceast_mount_with_music_session
end
diff --git a/ruby/spec/jam_ruby/models/icecast_mount_spec.rb b/ruby/spec/jam_ruby/models/icecast_mount_spec.rb
index 69028ad03..847e9697c 100644
--- a/ruby/spec/jam_ruby/models/icecast_mount_spec.rb
+++ b/ruby/spec/jam_ruby/models/icecast_mount_spec.rb
@@ -245,4 +245,124 @@ describe IcecastMount do
end
end
+ describe "source_changes are deleted if source_direction transitions" do
+
+
+ it "no change if same source_direction" do
+ mount = FactoryGirl.create(:iceast_mount_with_music_session, source_direction: true)
+ change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true)
+
+ mount.source_changes.size.should == 1
+ mount.source_direction = true
+ mount.save!
+ mount = IcecastMount.find(mount.id)
+ mount.source_changes.size.should == 1
+ end
+
+ it "NOT deleted on transition to down" do
+ mount = FactoryGirl.create(:iceast_mount_with_music_session, source_direction: true)
+ change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true)
+
+ mount.source_changes.size.should == 1
+ mount.source_direction = false
+ mount.save!
+ mount = IcecastMount.find(mount.id)
+ mount.source_changes.size.should == 1
+ end
+
+ it "not deleted on transition to up" do
+ mount = FactoryGirl.create(:iceast_mount_with_music_session, source_direction: false)
+ change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true)
+
+
+ mount.source_changes.size.should == 1
+ mount.source_direction = true
+ mount.save!
+ mount.reload
+ mount.source_changes.size.should == 1
+ end
+ end
+ describe "state" do
+
+ def success_state(reason, detail = nil)
+ {success: true, reason: reason, detail:detail}
+ end
+
+ def fail_state(reason, detail = nil)
+ {success: false, reason: reason, detail:detail}
+ end
+
+ let(:sourced_mount) {FactoryGirl.create(:iceast_mount_with_music_session, sourced: true, sourced_needs_changing_at: nil, listeners: 1)}
+ let(:mount_needing_source) {FactoryGirl.create(:iceast_mount_with_music_session, sourced: false, sourced_needs_changing_at: Time.now, listeners: 1)}
+
+ it "happy sourced mount" do
+ sourced_mount.state.should == success_state('source_down')
+ end
+
+ it "just transitioned" do
+ mount_needing_source.state.should == success_state('transition_down', 'initial')
+ end
+
+ it "transition timeout" do
+ #change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true)
+ mount_needing_source.sourced_needs_changing_at = (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago
+ mount_needing_source.save!
+
+ mount_needing_source.state.should == fail_state('transition_timeout_down', 'initial')
+ end
+
+ describe "client attempted transition and" do
+ let!(:change1) { FactoryGirl.create(:icecast_source_change, mount: mount_needing_source, source_direction: true, success:true) }
+
+ it "happy sourced mount" do
+ mount_needing_source.sourced = true
+ mount_needing_source.save!
+ mount_needing_source.state.should == success_state('source_down', change1.user.id)
+ end
+
+ it "succeeded recently in correct transition" do
+ mount_needing_source.state.should == success_state('transition_down')
+ end
+
+ it "succeeded a while ago in correct transition" do
+ change1.update_attribute(:created_at, (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago)
+ mount_needing_source.state.should == fail_state('transition_timeout_down')
+ end
+
+ it "failed recently in correct transition" do
+ change1.success = false
+ change1.reason = 'bleh'
+ change1.source_direction = true
+ change1.save!
+ mount_needing_source.state.should == fail_state('source_wrong_down', change1.reason)
+ end
+
+ it "failed a while ago in correct transition" do
+ change1.success = false
+ change1.reason = 'bleh'
+ change1.source_direction = true
+ change1.created_at = (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago
+ change1.save!
+ mount_needing_source.state.should == fail_state('source_wrong_down', change1.reason)
+ end
+
+ it "failed recently in wrong transition" do
+ change1.success = false
+ change1.reason = 'bleh'
+ change1.source_direction = false
+ change1.save!
+ mount_needing_source.state.should == fail_state('transition_down')
+ end
+
+ it "failed a while ago in wrong transition" do
+ change1.success = false
+ change1.reason = 'bleh'
+ change1.source_direction = false
+ change1.save!
+ mount_needing_source.sourced_needs_changing_at = (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago
+ mount_needing_source.save!
+ mount_needing_source.state.should == fail_state('source_wrong_down', 'no_client')
+ end
+ end
+ end
end
\ No newline at end of file
diff --git a/ruby/spec/jam_ruby/models/icecast_source_change_spec.rb b/ruby/spec/jam_ruby/models/icecast_source_change_spec.rb
new file mode 100644
index 000000000..a2f6fc0e5
--- /dev/null
+++ b/ruby/spec/jam_ruby/models/icecast_source_change_spec.rb
@@ -0,0 +1,20 @@
+require 'spec_helper'
+
+describe IcecastSourceChange do
+
+ let(:change) { FactoryGirl.create(:icecast_source_change) }
+
+ it "validates" do
+ bad_change = IcecastSourceChange.new
+ bad_change.save.should be_false
+ bad_change.errors[:change_type].should == ["is not included in the list"]
+ bad_change.errors[:source_direction].should == ["is not included in the list"]
+ bad_change.errors[:success].should == ["is not included in the list"]
+ bad_change.errors[:mount].should == ["can't be blank"]
+ end
+
+ it "success" do
+ change.touch
+ end
+
+end
diff --git a/ruby/spec/jam_ruby/mq_router_spec.rb b/ruby/spec/jam_ruby/mq_router_spec.rb
index 3122af502..662f16d2e 100644
--- a/ruby/spec/jam_ruby/mq_router_spec.rb
+++ b/ruby/spec/jam_ruby/mq_router_spec.rb
@@ -25,11 +25,13 @@ describe MQRouter do
before(:all) do
@original_client_exchange = MQRouter.client_exchange
@original_user_exchange = MQRouter.user_exchange
+ @original_subscription_exchange = MQRouter.subscription_exchange
end
after(:all) do
MQRouter.client_exchange = @original_client_exchange
MQRouter.user_exchange = @original_user_exchange
+ MQRouter.subscription_exchange = @original_subscription_exchange
end
it "user_publish_to_session works (checking exchange callbacks)" do
@@ -59,6 +61,7 @@ describe MQRouter do
# mock up exchange
MQRouter.client_exchange = double("client_exchange")
MQRouter.user_exchange = double("user_exchange")
+ MQRouter.subscription_exchange = double("subscription_exchange")
MQRouter.client_exchange.should_receive(:publish).with("a message", :routing_key => "client.#{music_session_member2.client_id}")
MQRouter.user_exchange.should_not_receive(:publish)
diff --git a/ruby/spec/jam_ruby/resque/audiomixer_spec.rb b/ruby/spec/jam_ruby/resque/audiomixer_spec.rb
index ab4ed7875..9e26581de 100644
--- a/ruby/spec/jam_ruby/resque/audiomixer_spec.rb
+++ b/ruby/spec/jam_ruby/resque/audiomixer_spec.rb
@@ -44,8 +44,10 @@ describe AudioMixer do
MQRouter.client_exchange = double("client_exchange")
MQRouter.user_exchange = double("user_exchange")
+ MQRouter.subscription_exchange = double("subscription_exchange")
MQRouter.client_exchange.should_receive(:publish).any_number_of_times
MQRouter.user_exchange.should_receive(:publish).any_number_of_times
+ MQRouter.subscription_exchange.should_receive(:publish).any_number_of_times
end
diff --git a/ruby/spec/jam_ruby/resque/icecast_source_check_spec.rb b/ruby/spec/jam_ruby/resque/icecast_source_check_spec.rb
index 2c528bf6d..83be6d612 100644
--- a/ruby/spec/jam_ruby/resque/icecast_source_check_spec.rb
+++ b/ruby/spec/jam_ruby/resque/icecast_source_check_spec.rb
@@ -15,9 +15,15 @@ describe IcecastSourceCheck do
end
- it "find no mounts if source_hanged timestamp is nil and listeners = 1/sourced = false" do
- mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced: false, listeners: 1)
- check.should_not_receive(:handle_notifications)
+ it "find a mounts if sourced_needs_changing_at is nil and listeners = 1/sourced = false" do
+ mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced: false, listeners: 1, sourced_needs_changing_at: nil)
+ check.should_receive(:handle_notifications).once
+ check.run
+ end
+
+ it "find a mount if source_changed timestamp is nil and listeners = 0/sourced = true" do
+ mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced: true, listeners: 0, sourced_needs_changing_at: nil)
+ check.should_receive(:handle_notifications).once
check.run
end
@@ -27,6 +33,7 @@ describe IcecastSourceCheck do
check.run
end
+
it "find no mounts if source_changed timestamp is very recent and listeners = 1/sourced = false" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced_needs_changing_at: Time.now, sourced: false, listeners: 1)
check.should_not_receive(:handle_notifications)
@@ -92,7 +99,7 @@ describe IcecastSourceCheck do
check.run
end
- it "resets source_changed_at when a notification is sent out" do
+ it "does not resets source_changed_at when a notification is sent out" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced_needs_changing_at: 2.days.ago, sourced:false, listeners: 1)
check.stub(:handle_notifications) do |mount|
mount.should_not_receive(:notify_source_down_requested)
@@ -104,7 +111,7 @@ describe IcecastSourceCheck do
end
check.run
mount.reload
- (mount.sourced_needs_changing_at.to_i - Time.now.to_i).abs.should < 10 # less than 5 seconds -- just a little slop for a very slow build server
+ (2.days.ago - mount.sourced_needs_changing_at).to_i.abs.should < 10 # less than 5 seconds -- just a little slop for a very slow build server
end
end
end
diff --git a/ruby/spec/jam_ruby/resque/quick_mixer_spec.rb b/ruby/spec/jam_ruby/resque/quick_mixer_spec.rb
index 9a622ce1a..f7d877ef3 100644
--- a/ruby/spec/jam_ruby/resque/quick_mixer_spec.rb
+++ b/ruby/spec/jam_ruby/resque/quick_mixer_spec.rb
@@ -17,8 +17,10 @@ describe AudioMixer do
MQRouter.client_exchange = double("client_exchange")
MQRouter.user_exchange = double("user_exchange")
+ MQRouter.subscription_exchange = double("subscription_exchange")
MQRouter.client_exchange.should_receive(:publish).any_number_of_times
MQRouter.user_exchange.should_receive(:publish).any_number_of_times
+ MQRouter.subscription_exchange.should_receive(:publish).any_number_of_times
end
def assert_found_job_count(expected)
diff --git a/ruby/spec/support/utilities.rb b/ruby/spec/support/utilities.rb
index 48a525fe0..8dd934198 100644
--- a/ruby/spec/support/utilities.rb
+++ b/ruby/spec/support/utilities.rb
@@ -142,6 +142,14 @@ def app_config
7
end
+ def source_changes_missing_secs
+ 1
+ end
+
+ def lock_connections
+ false
+ end
+
private
def audiomixer_workspace_path
diff --git a/web/app/assets/javascripts/AAB_message_factory.js b/web/app/assets/javascripts/AAB_message_factory.js
index a2c1b3a37..2a502f43e 100644
--- a/web/app/assets/javascripts/AAB_message_factory.js
+++ b/web/app/assets/javascripts/AAB_message_factory.js
@@ -16,6 +16,10 @@
LEAVE_MUSIC_SESSION_ACK : "LEAVE_MUSIC_SESSION_ACK",
HEARTBEAT : "HEARTBEAT",
HEARTBEAT_ACK : "HEARTBEAT_ACK",
+ SUBSCRIBE : "SUBSCRIBE",
+ UNSUBSCRIBE : "UNSUBSCRIBE",
+ SUBSCRIPTION_MESSAGE : "SUBSCRIPTION_MESSAGE",
+ SUBSCRIBE_BULK : "SUBSCRIBE_BULK",
// friend notifications
FRIEND_UPDATE : "FRIEND_UPDATE",
@@ -168,6 +172,21 @@
return result;
};
+ factory.subscribe = function(type, id) {
+ var subscribe_msg = {type: type, id: id}
+ return client_container(msg.SUBSCRIBE, route_to.SERVER, subscribe_msg);
+ }
+
+ factory.subscribeBulk = function(types, ids) {
+ var subscribe_bulk_msg = {types: types, ids: ids}
+ return client_container(msg.SUBSCRIBE_BULK, route_to.SERVER, subscribe_bulk_msg);
+ }
+
+ factory.unsubscribe = function(type, id) {
+ var unsubscribe_msg = {type: type, id: id}
+ return client_container(msg.UNSUBSCRIBE, route_to.SERVER, unsubscribe_msg);
+ }
+
context.JK.MessageFactory = factory;
})(window, jQuery);
\ No newline at end of file
diff --git a/web/app/assets/javascripts/JamServer.js b/web/app/assets/javascripts/JamServer.js
index 040490d5f..f60ea07d0 100644
--- a/web/app/assets/javascripts/JamServer.js
+++ b/web/app/assets/javascripts/JamServer.js
@@ -10,6 +10,7 @@
var logger = context.JK.logger;
var msg_factory = context.JK.MessageFactory;
var rest = context.JK.Rest();
+ var EVENTS = context.JK.EVENTS;
// Let socket.io know where WebSocketMain.swf is
context.WEB_SOCKET_SWF_LOCATION = "assets/flash/WebSocketMain.swf";
@@ -89,6 +90,11 @@
// handles logic if the websocket connection closes, and if it was in error then also prompt for reconnect
function closedCleanup(in_error) {
+
+ if(server.connected) {
+ $self.triggerHandler(EVENTS.CONNECTION_DOWN);
+ }
+
server.connected = false;
server.connecting = false;
@@ -229,6 +235,7 @@
heartbeatAckCheckInterval = context.setInterval(_heartbeatAckCheck, 1000);
lastHeartbeatAckTime = new Date(new Date().getTime() + heartbeatMS); // add a little forgiveness to server for initial heartbeat
connectDeferred.resolve();
+ $self.triggerHandler(EVENTS.CONNECTION_UP)
activeElementEvent('afterConnect', payload);
@@ -739,6 +746,10 @@
}
});
+ server.get$Server = function() {
+ return $self;
+ }
+
context.JK.JamServer = server;
// Callbacks from jamClient
diff --git a/web/app/assets/javascripts/application.js b/web/app/assets/javascripts/application.js
index b971a1d5e..037c2c1b7 100644
--- a/web/app/assets/javascripts/application.js
+++ b/web/app/assets/javascripts/application.js
@@ -43,6 +43,7 @@
//= require AAB_message_factory
//= require jam_rest
//= require utils
+//= require subscription_utils
//= require custom_controls
//= require_directory .
//= require_directory ./dialog
diff --git a/web/app/assets/javascripts/backend_alerts.js b/web/app/assets/javascripts/backend_alerts.js
index 74c7047cc..d1c9628e1 100644
--- a/web/app/assets/javascripts/backend_alerts.js
+++ b/web/app/assets/javascripts/backend_alerts.js
@@ -106,6 +106,18 @@
if(context.JK.CurrentSessionModel)
context.JK.CurrentSessionModel.onWindowBackgrounded(type, text);
}
+ else if(type === ALERT_NAMES.SESSION_LIVEBROADCAST_FAIL) {
+ if(context.JK.CurrentSessionModel)
+ context.JK.CurrentSessionModel.onBroadcastFailure(type, text);
+ }
+ else if(type === ALERT_NAMES.SESSION_LIVEBROADCAST_ACTIVE) {
+ if(context.JK.CurrentSessionModel)
+ context.JK.CurrentSessionModel.onBroadcastSuccess(type, text);
+ }
+ else if(type === ALERT_NAMES.SESSION_LIVEBROADCAST_STOPPED) {
+ if(context.JK.CurrentSessionModel)
+ context.JK.CurrentSessionModel.onBroadcastStopped(type, text);
+ }
else if((!context.JK.CurrentSessionModel || !context.JK.CurrentSessionModel.inSession()) &&
(ALERT_NAMES.INPUT_IO_RATE == type || ALERT_NAMES.INPUT_IO_JTR == type || ALERT_NAMES.OUTPUT_IO_RATE == type || ALERT_NAMES.OUTPUT_IO_JTR== type)) {
// squelch these events if not in session
diff --git a/web/app/assets/javascripts/globals.js b/web/app/assets/javascripts/globals.js
index 05b1f62f4..4f68a3125 100644
--- a/web/app/assets/javascripts/globals.js
+++ b/web/app/assets/javascripts/globals.js
@@ -41,7 +41,10 @@
FILE_MANAGER_CMD_PROGRESS : 'file_manager_cmd_progress',
FILE_MANAGER_CMD_ASAP_UPDATE : 'file_manager_cmd_asap_update',
MIXER_MODE_CHANGED : 'mixer_mode_changed',
- MUTE_SELECTED: 'mute_selected'
+ MUTE_SELECTED: 'mute_selected',
+ SUBSCRIBE_NOTIFICATION: 'subscribe_notification',
+ CONNECTION_UP: 'connection_up',
+ CONNECTION_DOWN: 'connection_down'
};
context.JK.ALERT_NAMES = {
diff --git a/web/app/assets/javascripts/jam_rest.js b/web/app/assets/javascripts/jam_rest.js
index 3e7eac08d..1be8d9bab 100644
--- a/web/app/assets/javascripts/jam_rest.js
+++ b/web/app/assets/javascripts/jam_rest.js
@@ -1396,6 +1396,27 @@
});
}
+ function getMount(options) {
+ var id = getId(options);
+ return $.ajax({
+ type: "GET",
+ url: '/api/icecast/mount/' + id,
+ dataType: "json",
+ contentType: 'application/json'
+ });
+ }
+
+ function createSourceChange(options) {
+ var mountId = options['mount_id'];
+
+ return $.ajax({
+ type: "POST",
+ url: '/api/icecast/mount/' + mountId + '/source_change',
+ dataType: "json",
+ contentType: 'application/json',
+ data: JSON.stringify(options),
+ });
+ }
function initialize() {
return self;
@@ -1519,7 +1540,12 @@
this.updateBillingInfo = updateBillingInfo;
this.placeOrder = placeOrder;
this.searchMusicians = searchMusicians;
+<<<<<<< HEAD
this.resendBandInvitation = resendBandInvitation;
+=======
+ this.getMount = getMount;
+ this.createSourceChange = createSourceChange;
+>>>>>>> feature/broadcast_slow
return this;
};
diff --git a/web/app/assets/javascripts/jquery.listenbroadcast.js b/web/app/assets/javascripts/jquery.listenbroadcast.js
index 5468a3236..d8dba3e4a 100644
--- a/web/app/assets/javascripts/jquery.listenbroadcast.js
+++ b/web/app/assets/javascripts/jquery.listenbroadcast.js
@@ -17,6 +17,11 @@
*/
context.JK = context.JK || {};
+ context.JK.ListenBroadcastCurrentlyPlaying = null;
+ context.JK.$templateListenBroadcastState = $('#template-listen-broadcast-state');
+ context.JK.$templateListenBroadcastDetail = $('#template-listen-broadcast-detail')
+
+
// the purpose of this code is to simplify the interaction between user and server
// it provides methods to call on user events (primarily play/pause), and will fire
// events (such as 'stream_gone'). This element does nothing with UI; only fires events for you to handle
@@ -41,6 +46,8 @@
var fanAccess = null;
var retryAttempts = 0;
var self = this;
+ var mountInfo = null;
+ var $mountState = null;
var destroyed = false;
@@ -155,7 +162,9 @@
playState = newState;
- clearBufferTimeout();
+ if(newState != PlayStateStalled) {
+ clearBufferTimeout();
+ }
if( playState == PlayStateNone ||
playState == PlayStateEnded ||
@@ -173,6 +182,8 @@
function noBuffer() {
+ waitForBufferingTimeout = null;
+
if(retryAttempts >= RETRY_ATTEMPTS) {
logger.log("never received indication of buffering or playing");
transition(PlayStateFailedStart);
@@ -345,7 +356,17 @@
displayText = 'SESSION IN PROGRESS';
}
else if(playState == 'stalled') {
- displayText = 'RECONNECTING';
+
+ // in Chrome, we've observed that a stalled can occur very early, before you've received any audio
+ // it's better to leave the display text as what it would be if juts retrying
+ if(!waitForBufferingTimeout) {
+ displayText = 'RECONNECTING';
+ }
+ else {
+ if(retryAttempts == 2) {
+ displayText = 'STILL TRYING, HANG ON';
+ }
+ }
}
else if(playState == 'ended') {
displayText = 'STREAM DISCONNECTED';
@@ -444,8 +465,179 @@
$audio.bind('progress', onProgress);
}
+ function broadcastDetailPreShow(box) {
+ var $box = $(box)
+ $mountState = $box.find('.listen-broadcast-state')
+
+ updateMountInfo(mountInfo)
+ updateMountDetails(mountInfo);
+ }
+
+ function updateMountInfo(mount) {
+ var $summary = $mountState.find('.summary')
+ var $status = $summary.find('.status.' + mount.state.reason)
+
+ if ($status.length == 0) {
+ // can't find this state
+ $status = $summary.find('.status.unknown')
+ $summary.attr('data-status', 'unknown')
+ $status.find('.msg').text ("unknown state: " + mount.state.reason)
+ return;
+ }
+
+ //var $msg = $status.find('.msg');
+
+ $summary.attr('data-status', mount.state.reason)
+ }
+
+
+ function updateMountDetails(mountInfo) {
+
+ var $detailHolder = $mountState.find('.detail-items')
+
+ context._.each(mountInfo.source_changes, function(sourceChange) {
+
+ var $detail = addSourceChange($detailHolder, sourceChange);
+ if($detail) {
+ $detailHolder.append($detail);
+ }
+ })
+ }
+
+ function onDetailEvent(e, data) {
+ if($mountState == null) return;
+
+ logger.debug("subscription notification received: type:" + data.type)
+
+ updateMountInfo(data.body.mount);
+
+ var id = data.id
+ var type = data.type
+
+ if(type == 'mount') {
+
+ var $detailHolder = $mountState.find('.detail-items')
+ var msgType = data.body.type;
+
+ var sourceChange = data.body.source_change;
+ if(sourceChange) {
+ var $detail = addSourceChange($detailHolder, sourceChange)
+ if ($detail) {
+ $detailHolder.prepend($detail)
+ $detail.hide().slideDown()
+ }
+ }
+ }
+ }
+
+
+ function addSourceChange($detailHolder, sourceChange) {
+ var $detail= $(context._.template($('#template-listen-broadcast-detail').html(), {}, {variable: 'data'}));
+
+ // if this is already in the dom, don't add it
+ if($detailHolder.find('.listen-broadcast-detail[data-id="' + sourceChange.id + '"]').length > 0) {
+ return null;
+ }
+
+ $detail.attr('data-id', sourceChange.id)
+
+ var $detailText = $detail.find('.detail');
+ var $who = $detail.find('.who')
+
+ if(sourceChange.change_type == 'source_up_request') {
+ $who.html('Server')
+ $detailText.text('Start Broadcast Requested')
+ }
+ else if(sourceChange.change_type == 'source_down_request') {
+ $who.text('Server')
+ $detailText.text('Start Broadcast Requested')
+ }
+ else {
+
+ $who.attr('hoveraction', 'musician').attr('user-id', sourceChange.user.id).addClass('avatar-tiny')
+ $who.html('
');
+ context.JK.bindHoverEvents($who);
+
+ if (sourceChange.source_direction) {
+ // up
+ if (sourceChange.success) {
+ $detailText.text(sourceChange.user.name + ' Started Broadcasting')
+ }
+ else {
+ if (sourceChange.reason == 'no_client') {
+ $detailText.text('No Client Responded')
+ }
+ else if (sourceChange.reason == 'initialize_singleton') {
+ $detailText.text('Init Failure (S)')
+ }
+ else if (sourceChange.reason == 'initialize_thread') {
+ $detailText.text('Init Failure (T)')
+ }
+ else if (sourceChange.reason == 'initialize_ogg') {
+ $detailText.text('Init Failure (O)')
+ }
+ else if (sourceChange.reason == 'initialize_mp3') {
+ $detailText.text('Init Failure (M)')
+ }
+ else if (sourceChange.reason == 'initialize_socket') {
+ $detailText.text('Init Failure (Client)')
+ }
+ else if (sourceChange.reason == 'icecast_response') {
+ $detailText.text('Init Failure (Server)')
+ }
+ else {
+ $detailText.text('Unknown Failure')
+ }
+ }
+ }
+ else {
+ $detailText.text(sourceChange.user.name + ' Stopped Broadcasting');
+ }
+ }
+ return $detail;
+ }
+
+ function bindHoverDetail() {
+
+ var stateHolderHtml = context._.template($('#template-listen-broadcast-state').html(), {}, {variable: 'data'});
+ context.JK.hoverBubble($parent, stateHolderHtml, {trigger: 'none', preShow: broadcastDetailPreShow, positions:['bottom'], width:'300px', height:'250px'})
+
+ $parent.hoverIntent({
+ over: function() {
+ checkServer().done(function(response) {
+ var mountId = response.mount ? response.mount.id : null
+
+ if(mountId) {
+ rest.getMount({id: mountId})
+ .done(function (mount) {
+ mountInfo = mount
+ $parent.data('mount-id', mountId)
+ context.JK.SubscriptionUtils.subscribe('mount', mountId).on(context.JK.EVENTS.SUBSCRIBE_NOTIFICATION, onDetailEvent)
+ $parent.btOn()
+ })
+ .fail(context.JK.app.ajaxError)
+ }
+ else {
+ mountInfo = null;
+ context.JK.app.layout.notify('This session can not currently broadcast')
+ }
+ })
+ .fail(function() {
+ logger.debug("session is over")
+ })
+ },
+ out: function() {
+ var mountId = $parent.data('mount-id')
+ context.JK.SubscriptionUtils.unsubscribe('mount', mountId)
+ $parent.btOff();
+ $mountState = null;
+ mountInfo = null;
+ }
+ })
+ }
function initialize() {
+
musicSessionId = $parent.attr('data-music-session');
if(!musicSessionId) throw "data-music-session must be specified on $parentElement";
@@ -453,6 +645,8 @@
if(fanAccess === null) throw 'fan-access must be specified in $parentElement';
fanAccess = $parent.attr('fan-access') === 'true' // coerce to boolean
+ bindHoverDetail();
+
$audio = $('audio', $parent);
if($audio.length == 0) {
@@ -479,8 +673,6 @@
return this;
}
- context.JK.ListenBroadcastCurrentlyPlaying = null;
-
$.fn.listenBroadcast = function(options) {
new context.JK.ListenBroadcast(this, options);
}
diff --git a/web/app/assets/javascripts/sessionModel.js b/web/app/assets/javascripts/sessionModel.js
index f55b6d8d1..0dd38f007 100644
--- a/web/app/assets/javascripts/sessionModel.js
+++ b/web/app/assets/javascripts/sessionModel.js
@@ -576,6 +576,57 @@
}
}
+ function onBroadcastSuccess(type, text) {
+ logger.debug("SESSION_LIVEBROADCAST_ACTIVE alert. reason:" + text);
+
+ if(currentSession && currentSession.mount) {
+ rest.createSourceChange({
+ mount_id: currentSession.mount.id,
+ source_direction: true,
+ success: true,
+ reason: text,
+ client_id: app.clientId
+ })
+ }
+ else {
+ logger.debug("unable to report source change because no mount seen on session")
+ }
+ }
+
+ function onBroadcastFailure(type, text) {
+ logger.debug("SESSION_LIVEBROADCAST_FAIL alert. reason:" + text);
+
+ if(currentSession && currentSession.mount) {
+ rest.createSourceChange({
+ mount_id: currentSession.mount.id,
+ source_direction: true,
+ success: false,
+ reason: text,
+ client_id: app.clientId
+ })
+ }
+ else {
+ logger.debug("unable to report source change because no mount seen on session")
+ }
+ }
+
+ function onBroadcastStopped(type, text) {
+ logger.debug("SESSION_LIVEBROADCAST_STOPPED alert. reason:" + text);
+
+ if(currentSession && currentSession.mount) {
+ rest.createSourceChange({
+ mount_id: currentSession.mount.id,
+ source_direction: false,
+ success: true,
+ reason: text,
+ client_id: app.clientId
+ })
+ }
+ else {
+ logger.debug("unable to report source change because no mount seen on session")
+ }
+ }
+
function onBackendMixerChanged(type, text) {
logger.debug("BACKEND_MIXER_CHANGE alert. reason:" + text);
@@ -680,6 +731,9 @@
this.onDeadUserRemove = onDeadUserRemove;
this.onWindowBackgrounded = onWindowBackgrounded;
this.waitForSessionPageEnterDone = waitForSessionPageEnterDone;
+ this.onBroadcastStopped = onBroadcastStopped;
+ this.onBroadcastSuccess = onBroadcastSuccess;
+ this.onBroadcastFailure = onBroadcastFailure;
this.getCurrentSession = function() {
return currentSession;
diff --git a/web/app/assets/javascripts/subscription_utils.js.coffee b/web/app/assets/javascripts/subscription_utils.js.coffee
new file mode 100644
index 000000000..afd7852ab
--- /dev/null
+++ b/web/app/assets/javascripts/subscription_utils.js.coffee
@@ -0,0 +1,128 @@
+#
+# Common utility functions to help deal with subscriptions to the server
+#
+
+$ = jQuery
+context = window
+context.JK ||= {};
+
+class SubscriptionUtils
+ constructor: () ->
+ @logger = context.JK.logger
+ @msgFactory = context.JK.MessageFactory;
+ @subscriptions = {}
+ @events = context.JK.EVENTS;
+ @reconnectRegistrationTimeout = null
+
+ this.init()
+
+ init: () =>
+ # once JamKazam is fully loaded, watch for SUBSCRIPTION_MESSAGE events
+ $(document).on('JAMKAZAM_READY', this.registerWithGateway)
+
+
+ genKey:(type, id) =>
+ type + ":" + id
+
+ test: () =>
+ this.subscribe('test', '1')
+ this.subscribe('test', '2')
+
+ reregister: () =>
+ # re-register all existing watches
+ @reconnectRegistrationTimeout = null
+
+ if context.JK.dlen(@subscriptions) > 0
+ bulkSubscribe = []
+ types = []
+ ids = []
+ for key, watch of @subscriptions
+ bits = key.split(':')
+ type = bits[0]
+ id = bits[1]
+ types.push(type)
+ ids.push(id)
+# bulkSubscribe.push({type: type, id:id})
+
+ # send a special message which contains all subscriptions in one message
+ msg = @msgFactory.subscribeBulk(types, ids)
+ context.JK.JamServer.send(msg)
+
+ # whenever we reconnect, set a timer to automatically re-subscribe to any outstanding subscriptions
+ onConnectionUp: () =>
+ #this.test()
+
+ if @reconnectRegistrationTimeout?
+ clearTimeout(@reconnectRegistrationTimeout)
+
+ if context.JK.dlen(@subscriptions) > 0
+ @reconnectRegistrationTimeout = setTimeout(this.reregister, 1000) # re-register after 1 second, to prevent excessive messaging to server
+
+ onConnectionDown: () =>
+ if @reconnectRegistrationTimeout?
+ clearTimeout(@reconnectRegistrationTimeout)
+
+ registerWithGateway: () =>
+
+ context.JK.JamServer.registerMessageCallback(context.JK.MessageType.SUBSCRIPTION_MESSAGE, this.onSubscriptionMessage);
+
+ $server = context.JK.JamServer.get$Server()
+
+ $server.on(@events.CONNECTION_UP, this.onConnectionUp)
+ $server.on(@events.CONNECTION_DOWN, this.onConnectionDown)
+
+ onSubscriptionMessage: (header, payload) =>
+ key = this.genKey(payload.type, payload.id)
+
+ watch = @subscriptions[key]
+
+ if watch
+ watch.triggerHandler(@events.SUBSCRIBE_NOTIFICATION, {type:payload.type, id: payload.id, body: JSON.parse(payload.body)})
+ else
+ @logger.warn("unable to find subscription for #{key}")
+
+ # call subscribe, and use the returned object to listen for events of name context.JK.EVENTS.SUBSCRIBE_NOTIFICATION
+ subscribe: (type, id) =>
+
+ key = this.genKey(type, id)
+
+ @logger.debug("subscribing for any notifications for #{key}")
+
+ watch = @subscriptions[key]
+
+ unless watch?
+ watch = $({type: type, id: id})
+ @subscriptions[key] = watch
+
+ # tell server we want messages for this entity
+ msg = @msgFactory.subscribe(type, id)
+ context.JK.JamServer.send(msg)
+
+ # watch can be used to watch for events using jquery
+ watch
+
+ # TODO: this should not send a unsubscribe message to the server it's the last listener for the specific type/id combo
+ unsubscribe: (type, id) =>
+
+ key = this.genKey(type, id)
+
+ @logger.debug("unsubscribing for any notifications for #{key}")
+
+ watch = @subscriptions[key]
+
+ delete @subscriptions[key]
+
+ # tell server we don't want any more messages for this entity
+ msg = @msgFactory.unsubscribe(type, id)
+ context.JK.JamServer.send(msg)
+
+ if watch
+ # unattach any events
+ watch.off();
+
+
+ watch
+
+
+# global instance
+context.JK.SubscriptionUtils = new SubscriptionUtils()
\ No newline at end of file
diff --git a/web/app/assets/javascripts/web/sessions.js b/web/app/assets/javascripts/web/sessions.js
index 95e64ca71..397795585 100644
--- a/web/app/assets/javascripts/web/sessions.js
+++ b/web/app/assets/javascripts/web/sessions.js
@@ -98,8 +98,8 @@
}
function initialize(musicSessionId) {
- $controls = $('.recording-controls');
- $status = $('.session-status')
+ $controls = $('.sessions-page .recording-controls');
+ $status = $('.sessions-page .session-status')
$('.timeago').timeago();
$controls.listenBroadcast();
diff --git a/web/app/assets/javascripts/web/web.js b/web/app/assets/javascripts/web/web.js
index f3e91abd4..466533efd 100644
--- a/web/app/assets/javascripts/web/web.js
+++ b/web/app/assets/javascripts/web/web.js
@@ -43,6 +43,7 @@
//= require user_dropdown
//= require jamkazam
//= require utils
+//= require subscription_utils
//= require ui_helper
//= require custom_controls
//= require ga
diff --git a/web/app/assets/stylesheets/client/client.css b/web/app/assets/stylesheets/client/client.css
index e18189462..8403cf75a 100644
--- a/web/app/assets/stylesheets/client/client.css
+++ b/web/app/assets/stylesheets/client/client.css
@@ -59,6 +59,7 @@
*= require ./musician
*= require ./help
*= require ./jquery-ui-overrides
+ *= require ./listenBroadcast
*= require web/audioWidgets
*= require web/recordings
*= require web/sessions
diff --git a/web/app/assets/stylesheets/client/listenBroadcast.css.scss b/web/app/assets/stylesheets/client/listenBroadcast.css.scss
new file mode 100644
index 000000000..e4d9495fb
--- /dev/null
+++ b/web/app/assets/stylesheets/client/listenBroadcast.css.scss
@@ -0,0 +1,111 @@
+@import "client/common";
+
+.listen-broadcast-state {
+
+ .instructions {
+ font-size:12px;
+ margin:5px 0;
+ padding:3px;
+ width:100%;
+ @include border_box_sizing;
+ }
+
+ .title {
+ margin:5px 0 5px;
+ text-align:center;
+ font-size:14px;
+ }
+
+ .summary {
+
+ font-size:14px;
+
+ .user {display:none}
+
+ .status {
+ @include border_box_sizing;
+ padding:5px;
+ background-color:$poor;
+ display:none;
+ width:100%;
+ margin-bottom:10px;
+ font-size:16px;
+
+ &.source_up { background-color:$good;}
+ &.source_down { background-color:$good;}
+ &.transition_up { background-color:$good;}
+ &.transition_down { background-color:$good;}
+ }
+
+ .msg {
+ text-align:center;
+ }
+
+ &[data-status=source_up] {.source_up { display:inline-block; }}
+ &[data-status=source_down] {.source_down { display:inline-block; }}
+ &[data-status=source_wrong_up] {.source_up { display:inline-block; }}
+ &[data-status=source_wrong_down] {.source_up { display:inline-block; }}
+ &[data-status=transition_up] {.source_up { display:inline-block; }}
+ &[data-status=transition_down] {.source_up { display:inline-block; }}
+ &[data-status=transition_timeout_up] {.source_up { display:inline-block; }}
+ &[data-status=transition_timeout_down] {.source_up { display:inline-block; }}
+ &[data-status=unknown] {.unknown { display:inline-block; }}
+ &[data-status=loading] {.loading { display:inline-block; }}
+ &[data-status=db_error] {.db_error{ display:inline-block; }}
+ }
+
+ .who-subtitle {
+ position:absolute;
+ text-align:center;
+ width:26px;
+ overflow:visible;
+ font-size:12px;
+ }
+
+ .detail-subtitle {
+ text-align:center;
+ font-size:12px;
+
+ span {
+ font-style:italic;
+ font-size:10px;
+ padding:3px;
+ }
+ }
+
+ .listen-broadcast-detail {
+ position:relative;
+ background-color:black;
+ margin:5px 0;
+ padding:2px;
+ }
+
+ .detail-items {
+
+ height:85px;
+ overflow:auto;
+ }
+
+ .details {
+
+ height:135px;
+
+ .title {
+ margin-bottom:2px;
+ }
+
+ .who {
+ margin:0 !important;
+ position:absolute;
+ text-align:center;
+ overflow:visible;
+ vertical-align:middle;
+ line-height:26px;
+ }
+ .detail {
+ text-align:center;
+ line-height:26px;
+ vertical-align: middle;
+ }
+ }
+}
diff --git a/web/app/assets/stylesheets/web/web.css b/web/app/assets/stylesheets/web/web.css
index dc01e969c..d91e79de2 100644
--- a/web/app/assets/stylesheets/web/web.css
+++ b/web/app/assets/stylesheets/web/web.css
@@ -12,6 +12,7 @@
*= require client/user_dropdown
*= require client/hoverBubble
*= require client/help
+*= require client/listenBroadcast
*= require web/main
*= require web/footer
*= require web/recordings
diff --git a/web/app/controllers/api_icecast_controller.rb b/web/app/controllers/api_icecast_controller.rb
index 8f03fb85c..7edd4fee5 100644
--- a/web/app/controllers/api_icecast_controller.rb
+++ b/web/app/controllers/api_icecast_controller.rb
@@ -1,11 +1,39 @@
+=begin
+ Summary of icecast implementation:
+
+ mount lock locations:
+ * IcecastMount.listener_add
+ * IcecastMount.listener_remove
+ * IcecastMount.source_up
+ * IcecastMount.source_down
+ * IcecastSourceCheck on each stale IcecastMount
+
+ sourced_needs_changing_at behavior:
+ * set to Time.now if first listener comes in (current listeners==0 as they join)
+ * set to Time.now if last listener leaves (current listeners == 1 as they leave)
+ * set to nil if source begins
+ * set to nil if source ends
+
+
+ * a session's clients gets requested to start a source if one of:
+ ** an IcecastMount is saved and listeners go above 0 and not currently sourced
+ ** an IcecastSourceCheck finds a IcecastMount with a stale sourced_needs_changing_at and mount.listeners > 0 && !mount.sourced
+
+ * a session's clients get requested to stop a source if
+ ** an IcecastSourceCheck finds a IcecastMount with a stale sourced_needs_changing_at and mount.listeners == 0 && mount.sourced
+=end
+
class ApiIcecastController < ApiController
before_filter :local_only, :only => [:test]
- before_filter :parse_mount, :except => [:test]
+ before_filter :parse_mount, :only => [:mount_add, :mount_remove, :listener_add, :listener_remove]
+ before_filter :api_signed_in_user, :only => [ :create_source_change ]
# each request will have this in it, if it's icecast.
#user-agent = Icecast 2.3.3
+ respond_to :json
+
def test
puts "========= GOT IT======="
render text: 'GOT IT', :status => :ok
@@ -51,6 +79,38 @@ class ApiIcecastController < ApiController
render text: '', :status => :ok
end
+ # info reported by the client to the server letting us track what's going on at a deeper level
+ def create_source_change
+ mount = IcecastMount.find(params[:id])
+
+ @source_change = IcecastSourceChange.new
+ @source_change.source_direction = params[:source_direction]
+ @source_change.user = current_user
+ @source_change.client_id = params[:client_id]
+ @source_change.success = params[:success]
+ @source_change.reason = params[:reason]
+ @source_change.detail = params[:detail]
+ @source_change.mount = mount
+ @source_change.change_type = IcecastSourceChange::CHANGE_TYPE_CLIENT
+ @source_change.save
+
+ if @source_change.errors.any?
+ response.status = :unprocessable_entity
+ respond_with @source_change
+ else
+ source_change_json = Rabl::Renderer.json(@source_change, 'api_icecast/source_change_notification')
+ SubscriptionMessage.mount_source_change(mount, source_change_json)
+ render :json => {}, :status => 200
+ end
+ end
+
+ def show
+ @mount = IcecastMount.find(params[:id])
+
+ puts "@MOUNT #{@mount}"
+ respond_with @mount, responder: ApiResponder
+ end
+
protected
def local_only
request.local?
@@ -74,5 +134,4 @@ class ApiIcecastController < ApiController
@mount_id = uri.path
@mount_params = Rack::Utils.parse_query(uri.query)
end
-
end
diff --git a/web/app/controllers/spikes_controller.rb b/web/app/controllers/spikes_controller.rb
index ff31a9511..b07987b4d 100644
--- a/web/app/controllers/spikes_controller.rb
+++ b/web/app/controllers/spikes_controller.rb
@@ -30,4 +30,11 @@ class SpikesController < ApplicationController
def websocket
render :layout => false
end
+
+ def subscription
+
+ Notification.send_subscription_message('test', '1', '{"msg": "oh hai 1"}')
+ Notification.send_subscription_message('test', '2', '{"msg": "oh hai 2"}')
+ render text: 'oh hai'
+ end
end
diff --git a/web/app/views/api_icecast/show.rabl b/web/app/views/api_icecast/show.rabl
new file mode 100644
index 000000000..b708847f9
--- /dev/null
+++ b/web/app/views/api_icecast/show.rabl
@@ -0,0 +1,19 @@
+object @mount
+
+attributes :id, :listeners, :source_direction, :sourced
+
+node :state do |mount|
+ mount.state
+end
+
+
+child(:source_changes => :source_changes) {
+
+ attributes :id
+
+ puts "source_change #{@object}"
+
+ node do |source_change|
+ partial("api_icecast/show_source_change", :object => source_change)
+ end
+}
diff --git a/web/app/views/api_icecast/show_source_change.rabl b/web/app/views/api_icecast/show_source_change.rabl
new file mode 100644
index 000000000..0bdea96ea
--- /dev/null
+++ b/web/app/views/api_icecast/show_source_change.rabl
@@ -0,0 +1,7 @@
+object @source_change
+
+attributes :id, :source_direction, :client_id, :success, :reason, :detail, :created_at, :change_type
+
+node :user do |source_change|
+ partial("api_users/show_minimal", :object => source_change.user)
+end
diff --git a/web/app/views/api_icecast/source_change_notification.rabl b/web/app/views/api_icecast/source_change_notification.rabl
new file mode 100644
index 000000000..012ae9ebf
--- /dev/null
+++ b/web/app/views/api_icecast/source_change_notification.rabl
@@ -0,0 +1,13 @@
+object @source_change
+
+node do |source_change|
+ partial("api_icecast/show_source_change", :object => source_change)
+end
+
+child(:mount) { |mount|
+ attributes :id, :listeners, :source_direction, :sourced
+
+ node :state do |mount|
+ mount.state
+ end
+}
\ No newline at end of file
diff --git a/web/app/views/clients/_listenBroadcast.html.slim b/web/app/views/clients/_listenBroadcast.html.slim
new file mode 100644
index 000000000..844b481b5
--- /dev/null
+++ b/web/app/views/clients/_listenBroadcast.html.slim
@@ -0,0 +1,57 @@
+script type="text/template" id="template-listen-broadcast-state"
+ .listen-broadcast-state
+ .instructions
+ | Here you can see a greater amount of detail about the state of the broadcast. If you leave the hover window open, new activity related to broadcasts will show up in real-time.
+ .summary
+ .title
+ | Current Status
+ .source_up.status
+ .user
+ .msg
+ | Broadcasting
+ .source_down.status
+ .user
+ .msg
+ | Not Broadcasting
+ .source_wrong_up.status
+ .msg
+ | Broadcast Attempt Failed
+ .source_wrong_down.status
+ .msg
+ | Stop Broadcast Failed
+ .transition_up.status
+ .msg
+ | Initializing Broadcast
+ .transition_down.status
+ .msg
+ | Stopping Broadcast
+ .transition_timeout_up.status
+ .msg
+ | No One Broadcasting Yet
+ .transition_timeout_down.status
+ .msg
+ | Not Stopped Broadcasting
+ .unknown.status
+ .msg
+ | Unknown Status
+ .loading.status
+ .msg
+ | loading ...
+ .db_error.status
+ .msg
+ | db error
+ .details
+ .title
+ .who-subtitle
+ | Who
+ .detail-subtitle
+ | Details
+ span
+ | (most recent at top)
+ .detail-items
+
+script type="text/template" id="template-listen-broadcast-detail"
+ .listen-broadcast-detail
+ .who
+ .detail
+
diff --git a/web/app/views/clients/index.html.erb b/web/app/views/clients/index.html.erb
index 9852b44d9..0fb57a248 100644
--- a/web/app/views/clients/index.html.erb
+++ b/web/app/views/clients/index.html.erb
@@ -62,6 +62,7 @@
<%= render "notify" %>
<%= render "client_update" %>
<%= render "overlay_small" %>
+<%= render "listenBroadcast" %>
<%= render "sync_viewer_templates" %>
<%= render "help" %>
<%= render 'dialogs/dialogs' %>
diff --git a/web/app/views/layouts/web.html.erb b/web/app/views/layouts/web.html.erb
index 62b8d4f7c..62c2f1ec6 100644
--- a/web/app/views/layouts/web.html.erb
+++ b/web/app/views/layouts/web.html.erb
@@ -78,6 +78,7 @@
<%= render "clients/hoverSession" %>
<%= render "clients/hoverRecording" %>
<%= render "clients/help" %>
+ <%= render "clients/listenBroadcast" %>
<%= render 'dialogs/dialogs' %>
diff --git a/web/config/application.rb b/web/config/application.rb
index 4c502e07f..418d38545 100644
--- a/web/config/application.rb
+++ b/web/config/application.rb
@@ -116,6 +116,7 @@ if defined?(Bundler)
config.websocket_gateway_uri = "ws://localhost:#{config.websocket_gateway_port}/websocket"
config.websocket_gateway_trusted_uri = "ws://localhost:#{config.websocket_gateway_port + 1}/websocket"
config.websocket_gateway_max_connections_per_user = 20
+ config.lock_connections = false
config.external_hostname = ENV['EXTERNAL_HOSTNAME'] || 'localhost'
config.external_port = ENV['EXTERNAL_PORT'] || 3000
@@ -197,9 +198,10 @@ if defined?(Bundler)
# this will be the qualifier on the IcecastConfigWorker queue name
config.icecast_server_id = ENV['ICECAST_SERVER_ID'] || 'localhost'
config.icecast_max_missing_check = 2 * 60 # 2 minutes
- config.icecast_max_sourced_changed = 15 # 15 seconds
+ config.icecast_max_sourced_changed = 10 # 10 seconds
config.icecast_hardcoded_source_password = nil # generate a new password everytim. production should always use this value
- config.icecast_wait_after_reload = 5 # 5 seconds. a hack needed until VRFS-1043
+ config.icecast_wait_after_reload = 0 # 0 seconds. a hack needed until VRFS-1043... maybe
+ config.source_changes_missing_secs = 2 # amount of time before we think it's odd that there are no source_change notifications
config.email_alerts_alias = 'nobody@jamkazam.com' # should be used for 'oh no' server down/service down sorts of emails
config.email_generic_from = 'nobody@jamkazam.com'
diff --git a/web/config/initializers/rabl_init.rb b/web/config/initializers/rabl_init.rb
index cfa6b995c..f03d6261c 100644
--- a/web/config/initializers/rabl_init.rb
+++ b/web/config/initializers/rabl_init.rb
@@ -1,3 +1,9 @@
+class PrettyJson
+ def self.dump(object)
+ JSON.pretty_generate(object, {:indent => " "})
+ end
+end
+
Rabl.configure do |config|
# Commented as these are defaults
# config.cache_all_output = false
@@ -5,6 +11,7 @@ Rabl.configure do |config|
# config.cache_engine = Rabl::CacheEngine.new # Defaults to Rails cache
# config.escape_all_output = false
# config.json_engine = nil # Any multi_json engines or a Class with #encode method
+ config.json_engine = PrettyJson if Rails.env.development?
# config.msgpack_engine = nil # Defaults to ::MessagePack
# config.bson_engine = nil # Defaults to ::BSON
# config.plist_engine = nil # Defaults to ::Plist::Emit
@@ -16,5 +23,5 @@ Rabl.configure do |config|
config.include_child_root = false
# config.enable_json_callbacks = false
# config.xml_options = { :dasherize => true, :skip_types => false }
- # config.view_paths = []
+ config.view_paths << Rails.root.join('app/views')
end
\ No newline at end of file
diff --git a/web/config/routes.rb b/web/config/routes.rb
index d4e4b5556..2a6194284 100644
--- a/web/config/routes.rb
+++ b/web/config/routes.rb
@@ -91,6 +91,7 @@ SampleApp::Application.routes.draw do
match '/facebook_invite', to: 'spikes#facebook_invite'
match '/launch_app', to: 'spikes#launch_app'
match '/websocket', to: 'spikes#websocket'
+ match '/test_subscription', to: 'spikes#subscription'
# junk pages
match '/help', to: 'static_pages#help'
@@ -476,6 +477,8 @@ SampleApp::Application.routes.draw do
match '/icecast/mount_remove' => 'api_icecast#mount_remove', :via => :post
match '/icecast/listener_add' => 'api_icecast#listener_add', :via => :post
match '/icecast/listener_remove' => 'api_icecast#listener_remove', :via => :post
+ match '/icecast/mount/:id' => 'api_icecast#show', :via => :get
+ match '/icecast/mount/:id/source_change' => 'api_icecast#create_source_change', :via => :post
# tweet on behalf of client
match '/twitter/tweet' => 'api_twitters#tweet', :via => :post
diff --git a/web/config/scheduler.yml b/web/config/scheduler.yml
index 9108a382d..cf5c1e80a 100644
--- a/web/config/scheduler.yml
+++ b/web/config/scheduler.yml
@@ -10,7 +10,8 @@ IcecastConfigRetry:
description: "Finds icecast servers that have had their config_changed, but no IcecastConfigWriter check recently"
IcecastSourceCheck:
- cron: "10 * * * * *"
+ every:
+ - "7s"
class: "JamRuby::IcecastSourceCheck"
description: "Finds icecast mounts that need their 'sourced' state to change, but haven't in some time"
@@ -20,7 +21,7 @@ CleanupFacebookSignup:
description: "Deletes facebook_signups that are old"
UnusedMusicNotationCleaner:
- cron: "10 * * * * *"
+ cron: "0 * * * *"
class: "JamRuby::UnusedMusicNotationCleaner"
description: "Remove unused music notations"
diff --git a/web/lib/tasks/start.rake b/web/lib/tasks/start.rake
index 87cbce1a0..b53b25f6c 100644
--- a/web/lib/tasks/start.rake
+++ b/web/lib/tasks/start.rake
@@ -1,6 +1,6 @@
# this rake file is meant to hold shortcuts/helpers for starting onerous command line executions
-# bunde exec rake all_jobs
+# bundle exec rake all_jobs
task :all_jobs do
Rake::Task['environment'].invoke
diff --git a/web/spec/controllers/api_icecast_controller_spec.rb b/web/spec/controllers/api_icecast_controller_spec.rb
new file mode 100644
index 000000000..2cc8716ee
--- /dev/null
+++ b/web/spec/controllers/api_icecast_controller_spec.rb
@@ -0,0 +1,23 @@
+require 'spec_helper'
+
+describe ApiIcecastController do
+ render_views
+
+ let(:user) {FactoryGirl.create(:user) }
+
+ before(:each) do
+ controller.current_user = user
+ end
+
+ describe "create_source_change" do
+ let!(:mount) {FactoryGirl.create(:iceast_mount_with_music_session, sourced: true, sourced_needs_changing_at: nil, listeners: 1)}
+
+ it "success" do
+
+ post :create_source_change, {format:'json', id: mount.id, success: true, source_direction: true, reason: 'blah', client_id: 'abc'}
+
+ response.status.should == 200
+ end
+ end
+
+end
diff --git a/web/spec/factories.rb b/web/spec/factories.rb
index 659733bcf..a1d3a1ef8 100644
--- a/web/spec/factories.rb
+++ b/web/spec/factories.rb
@@ -434,7 +434,7 @@ FactoryGirl.define do
association :mount_template, :factory => :icecast_mount_template
factory :iceast_mount_with_music_session do
- association :music_session, :factory => :music_session
+ association :music_session, :factory => :active_music_session
end
end
end
diff --git a/web/spec/javascripts/sessionLatency.spec.js b/web/spec/javascripts/sessionLatency.spec.js
index beb07a122..074efc76a 100644
--- a/web/spec/javascripts/sessionLatency.spec.js
+++ b/web/spec/javascripts/sessionLatency.spec.js
@@ -96,7 +96,7 @@
sessionLatency.subscribe('test', cb);
});
it("should invoke callback on latency result", function() {
- var cb = jasmine.createSpy("Latency Subscription Callback");
+ var cb = jasmine.createSpy("Latency subscription.rb Callback");
sessionLatency.subscribe('test', cb);
$.each(sessions, function(index, session) {
sessionLatency.sessionPings(session);
diff --git a/web/spec/spec_helper.rb b/web/spec/spec_helper.rb
index 483a30851..936471e11 100644
--- a/web/spec/spec_helper.rb
+++ b/web/spec/spec_helper.rb
@@ -87,7 +87,7 @@ Thread.new do
:rabbitmq_port => 5672,
:calling_thread => current,
:cidr => ['0.0.0.0/0'],
- :gateway_name => 'default')
+ :gateway_name => 'default-test')
rescue Exception => e
puts "websocket-gateway failed: #{e}"
end
diff --git a/websocket-gateway/lib/jam_websockets/router.rb b/websocket-gateway/lib/jam_websockets/router.rb
index 683107495..0f09d5f74 100644
--- a/websocket-gateway/lib/jam_websockets/router.rb
+++ b/websocket-gateway/lib/jam_websockets/router.rb
@@ -10,7 +10,7 @@ include Jampb
module EventMachine
module WebSocket
class Connection < EventMachine::Connection
- attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted # client_id is uuid we give to each client to track them as we like
+ attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted, :subscriptions # client_id is uuid we give to each client to track them as we like
end
end
end
@@ -36,12 +36,14 @@ module JamWebsockets
@clients = {} # clients that have logged in
@user_context_lookup = {} # lookup a set of client_contexts by user_id
@client_lookup = {} # lookup a client by client_id
+ @subscription_lookup = {}
@amqp_connection_manager = nil
@users_exchange = nil
@message_factory = JamRuby::MessageFactory.new
@semaphore = Mutex.new
@user_topic = nil
@client_topic = nil
+ @subscription_topic = nil
@thread_pool = nil
@heartbeat_interval_client = nil
@connect_time_expire_client = nil
@@ -187,6 +189,7 @@ module JamWebsockets
@semaphore.synchronize do
if client_id == MessageFactory::ALL_NATIVE_CLIENTS
+
msg = Jampb::ClientMessage.parse(msg)
@log.debug "client-directed message received from #{msg.from} to all clients"
@client_lookup.each do |client_id, client_context|
@@ -204,7 +207,7 @@ module JamWebsockets
else
client_context = @client_lookup[client_id]
- if !client_context.nil?
+ if client_context
client = client_context.client
@@ -234,6 +237,93 @@ module JamWebsockets
end
MQRouter.client_exchange = @clients_exchange
+
+ ############## DYNAMIC SUBSCRIPTION MESSAGING ###################
+
+ @subscriptions_exchange = channel.topic('subscriptions')
+ @subscription_topic = channel.queue("subscriptions-#{@gateway_name}", :auto_delete => true)
+ @subscription_topic.purge
+
+ # subscribe for any p2p messages to a client
+ @subscription_topic.subscribe(:ack => false) do |headers, msg|
+ begin
+ routing_key = headers.routing_key
+ type_and_id = routing_key["subscription.".length..-1]
+ #type, id = type_and_id.split('.')
+
+ @semaphore.synchronize do
+
+ clients = @subscription_lookup[type_and_id]
+
+ msg = Jampb::ClientMessage.parse(msg)
+
+ if clients
+ EM.schedule do
+ clients.each do |client|
+ @log.debug "subscription msg to client #{client.client_id}"
+ send_to_client(client, msg)
+ end
+ end
+ end
+ end
+ rescue => e
+ @log.error "unhandled error in messaging to client for mount"
+ @log.error e
+ end
+ end
+
+ MQRouter.subscription_exchange = @subscriptions_exchange
+ end
+
+ # listens on a subscription topic on the behalf of a client
+ def register_subscription(client, type, id)
+ # track subscriptions that this client has made, for disconnect scenarios
+ client.subscriptions.add({type:type, id: id})
+
+ key = "#{type}.#{id}"
+
+ # for a given type:id in @subscription_lookup, track clients listening
+ clients = @subscription_lookup[key]
+ if clients.nil?
+ clients = Set.new
+ @subscription_lookup[key] = clients
+ end
+
+ needs_subscription = clients.length == 0
+
+ clients.add(client)
+
+ @log.debug("register subscription handled #{type}.#{id}")
+ if needs_subscription
+ routing_key = "subscription.#{type}.#{id}"
+ @log.debug("register topic bound #{routing_key}")
+ # if this is the 1st client to listen for this mount, then subscribe to the topic for this mount_id
+ @subscription_topic.bind(@subscriptions_exchange, :routing_key => routing_key)
+ end
+ end
+
+ # de-listens on a subscription topic on the behalf of a client
+ # called automatically when a clean disconnects, to keep state clean.
+ def unregister_subscription(client, type, id)
+ # remove subscription from this client's list of subscriptions
+ client.subscriptions.delete({type:type, id:id})
+
+ key = "#{type}.#{id}"
+ # for a given mount_id in @subscription_lookup, remove from list of clients listening
+ clients = @subscription_lookup[key]
+ if clients
+ deleted = clients.delete(client)
+ if !deleted
+ @log.error "unregister_subscription: unable to locate any client #{client.client_id} for id #{id}"
+ end
+ else
+ @log.error "unregister_subscription: unable to locate any clients for id #{id}"
+ end
+
+ if clients.length == 0
+ # if there are no more clients listening, then unsubscribe to the topic for this mount_id
+ @subscription_topic.unbind(@subscriptions_exchange, :routing_key => "subscription.#{type}.#{id}")
+ end
end
# this method allows you to translate exceptions into websocket channel messages and behavior safely.
@@ -428,6 +518,12 @@ module JamWebsockets
elsif client_msg.type == ClientMessage::Type::HEARTBEAT
sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) }
+ elsif client_msg.type == ClientMessage::Type::SUBSCRIBE_BULK
+ sane_logging { handle_bulk_subscribe(client_msg.subscribe_bulk, client) }
+ elsif client_msg.type == ClientMessage::Type::SUBSCRIBE
+ sane_logging { handle_subscribe(client_msg.subscribe, client) }
+ elsif client_msg.type == ClientMessage::Type::UNSUBSCRIBE
+ sane_logging { handle_unsubscribe(client_msg.unsubscribe, client) }
else
raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message"
end
@@ -522,6 +618,8 @@ module JamWebsockets
os = options["os"]
udp_reachable = options["udp_reachable"].nil? ? true : options["udp_reachable"] == 'true'
+ client.subscriptions = Set.new# list of subscriptions that this client is watching in real-time
+
@log.info("handle_login: client_type=#{client_type} token=#{token} client_id=#{client_id} channel_id=#{client.channel_id} udp_reachable=#{udp_reachable}")
if client_type == Connection::TYPE_LATENCY_TESTER
@@ -669,6 +767,33 @@ module JamWebsockets
end
end
+ def handle_bulk_subscribe(subscriptions, client)
+
+ subscriptions.types.each_with_index do |subscription, i|
+ handle_subscribe(OpenStruct.new({type: subscriptions.types[i], id: subscriptions.ids[i]}), client)
+ end
+ end
+
+ def handle_subscribe(subscribe, client)
+ id = subscribe.id
+ type = subscribe.type
+ if id && id.length > 0 && type && type.length > 0
+ register_subscription(client, type, id)
+ else
+ @log.error("handle_subscribe: empty data #{subscribe}")
+ end
+ end
+
+ def handle_unsubscribe(unsubscribe, client)
+ id = unsubscribe.id
+ type = unsubscribe.type
+ if id && id.length > 0 && type && type.length > 0
+ unregister_subscription(client, type, id)
+ else
+ @log.error("handle_subscribe: empty data #{unsubscribe}")
+ end
+ end
+
def handle_heartbeat(heartbeat, heartbeat_message_id, client)
unless context = @clients[client]
@log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session"
@@ -1026,6 +1151,11 @@ module JamWebsockets
def cleanup_client(client)
client.close
+ # unregister any subscriptions
+ client.subscriptions.each do |subscription|
+ unregister_subscription(client, subscription[:type], subscription[:id])
+ end
+
@semaphore.synchronize do
pending = client.context.nil? # presence of context implies this connection has been logged into