Conflicts:

db/manifest
	web/app/assets/javascripts/jam_rest.js
This commit is contained in:
Seth Call 2014-12-18 15:13:55 -06:00
parent 566035348b
commit d83a632d0b
55 changed files with 1414 additions and 53 deletions

View File

@ -43,10 +43,10 @@ ActiveAdmin.register_page "Bootstrap" do
admin_auth.save!
path = IcecastPath.new
path.base_dir = '/usr/local/Cellar/icecast/2.3.3/share/icecast'
path.log_dir = '/usr/local/Cellar/icecast/2.3.3/var/log/icecast'
path.web_root = '/usr/local/Cellar/icecast/2.3.3/share/icecast/web'
path.admin_root = '/usr/local/Cellar/icecast/2.3.3/share/icecast/admin'
path.base_dir = '/usr/local/Cellar/icecast/2.4.1/share/icecast'
path.log_dir = '/usr/local/Cellar/icecast/2.4.1/var/log/icecast'
path.web_root = '/usr/local/Cellar/icecast/2.4.1/share/icecast/web'
path.admin_root = '/usr/local/Cellar/icecast/2.4.1/share/icecast/admin'
path.pid_file = nil
path.save!

View File

@ -229,6 +229,7 @@ deletable_recordings.sql
jam_tracks.sql
shopping_carts.sql
recurly.sql
add_track_resource_id.sql
add_track_resource_id.sql
user_genres.sql
user_online.sql
icecast_source_changes.sql

View File

@ -0,0 +1,16 @@
-- track extra detail about if the source went up or down
CREATE UNLOGGED TABLE icecast_source_changes (
id VARCHAR(64) PRIMARY KEY DEFAULT uuid_generate_v4(),
source_direction BOOLEAN NOT NULL,
change_type VARCHAR(64) NOT NULL,
user_id VARCHAR(64),
client_id VARCHAR(64),
success BOOLEAN NOT NULL,
reason VARCHAR,
detail VARCHAR,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
icecast_mount_id VARCHAR(64) NOT NULL REFERENCES icecast_mounts(id) ON DELETE CASCADE
);
-- track when the source_direction
ALTER TABLE icecast_mounts ADD COLUMN source_direction BOOLEAN NOT NULL DEFAULT FALSE;

View File

@ -17,6 +17,10 @@ message ClientMessage {
LEAVE_MUSIC_SESSION_ACK = 125;
HEARTBEAT = 130;
HEARTBEAT_ACK = 135;
SUBSCRIBE = 136;
UNSUBSCRIBE = 137;
SUBSCRIPTION_MESSAGE = 138;
SUBSCRIBE_BULK = 139;
// friend notifications
FRIEND_UPDATE = 140;
@ -73,7 +77,7 @@ message ClientMessage {
SOURCE_DOWN_REQUESTED = 251;
SOURCE_UP = 252;
SOURCE_DOWN = 253;
TEST_SESSION_MESSAGE = 295;
PING_REQUEST = 300;
@ -115,6 +119,10 @@ message ClientMessage {
optional LeaveMusicSessionAck leave_music_session_ack = 125;
optional Heartbeat heartbeat = 130;
optional HeartbeatAck heartbeat_ack = 135;
optional Subscribe subscribe = 136;
optional Unsubscribe unsubscribe = 137;
optional SubscriptionMessage subscription_message = 138;
optional SubscribeBulk subscribe_bulk = 139;
// friend notifications
optional FriendUpdate friend_update = 140; // from server to all friends of user
@ -586,6 +594,33 @@ message SourceDown {
optional string music_session = 1; // music session id
}
message SubscriptionMessage {
optional string type = 1; // the type of the subscription
optional string id = 2; // data about what to subscribe to, specifically
optional string body = 3; // this is a JSON string; let the browser decode it
}
message Subscribe {
optional string type = 1; // the type of the subscription
optional string id = 2; // data about what to subscribe to, specifically
}
message Unsubscribe {
optional string type = 1; // the type of the subscription
optional string id = 2; // data about what to subscribe to, specifically
}
message Subscription {
optional string type = 1; // the type of the subscription
optional string id = 2; // data about what to subscribe to, specifically
}
message SubscribeBulk {
repeated string types = 1;
repeated string ids = 2;
//repeated Subscription subscriptions = 1; # the ruby protocol buffer library chokes on this. so we have to do the above
}
// route_to: session
// a test message used by ruby-client currently. just gives way to send out to rest of session
message TestSessionMessage {

View File

@ -77,6 +77,7 @@ require "jam_ruby/app/uploaders/jam_track_track_uploader"
require "jam_ruby/app/uploaders/max_mind_release_uploader"
require "jam_ruby/lib/desk_multipass"
require "jam_ruby/lib/ip"
require "jam_ruby/lib/subscription_message"
require "jam_ruby/amqp/amqp_connection_manager"
require "jam_ruby/database"
require "jam_ruby/message_factory"
@ -144,6 +145,7 @@ require "jam_ruby/models/icecast_listen_socket"
require "jam_ruby/models/icecast_logging"
require "jam_ruby/models/icecast_master_server_relay"
require "jam_ruby/models/icecast_mount"
require "jam_ruby/models/icecast_source_change"
require "jam_ruby/models/icecast_path"
require "jam_ruby/models/icecast_relay"
require "jam_ruby/models/icecast_security"

View File

@ -389,7 +389,9 @@ SQL
end
def lock_connections(conn)
conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
if APP_CONFIG.lock_connections
conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
end
end
# def associate_tracks(connection, tracks)

View File

@ -53,6 +53,11 @@ module JamWebEventMachine
@@log.debug("#{exchange.name} is ready to go")
MQRouter.user_exchange = exchange
end
AMQP::Exchange.new(channel, :topic, "subscriptions") do |exchange|
@@log.debug("#{exchange.name} is ready to go")
MQRouter.subscription_exchange = exchange
end
end
ran[:ran] = true

View File

@ -0,0 +1,25 @@
module JamRuby
class SubscriptionMessage
# sent whenever we change the desired direction of the source
def self.mount_source_direction(mount)
Notification.send_subscription_message('mount', mount.id, {type: 'source_direction_change', source_direction: true}.to_json)
end
# sent whenever we receive an update about a sourcing attempt by a client
# source_change_json is created by a rabl, so currently only possible to use via web
def self.mount_source_change(mount, source_change_json)
Notification.send_subscription_message('mount', mount.id, source_change_json)
end
def self.mount_source_up_requested(mount)
Notification.send_subscription_message('mount', mount.id, {type: 'source_up_requested'}.to_json )
end
def self.mount_source_down_requested(mount)
Notification.send_subscription_message('mount', mount.id, {type: 'source_down_requested'}.to_json )
end
end
end

View File

@ -993,6 +993,20 @@ module JamRuby
)
end
def subscription_message(type, id, body)
subscription_message = Jampb::SubscriptionMessage.new(
id: id,
type: type,
body: body
)
Jampb::ClientMessage.new(
:type => ClientMessage::Type::SUBSCRIPTION_MESSAGE,
:route_to => CLIENT_TARGET,
:subscription_message => subscription_message,
)
end
####################################################
# is this message directed to the server?

View File

@ -153,9 +153,9 @@ module JamRuby
end
def report_add_participant
if self.music_session_id_changed? &&
self.music_session.present? &&
self.connected? &&
if self.music_session_id_changed? &&
self.music_session.present? &&
self.connected? &&
self.as_musician? &&
0 < (count = self.music_session.connected_participant_count)
GoogleAnalyticsEvent.report_session_participant(count)

View File

@ -10,12 +10,16 @@ module JamRuby
:music_session_id, :icecast_server_id, :icecast_mount_template_id, :listeners, :sourced,
:sourced_needs_changing_at, as: :admin
attr_accessor :no_config_changed
belongs_to :authentication, class_name: "JamRuby::IcecastUserAuthentication", inverse_of: :mount, :foreign_key => 'authentication_id'
belongs_to :music_session, class_name: "JamRuby::ActiveMusicSession", inverse_of: :mount, foreign_key: 'music_session_id'
belongs_to :server, class_name: "JamRuby::IcecastServer", inverse_of: :mounts, foreign_key: 'icecast_server_id'
belongs_to :mount_template, class_name: "JamRuby::IcecastMountTemplate", inverse_of: :mounts, foreign_key: 'icecast_mount_template_id'
has_many :source_changes, class_name: "JamRuby::IcecastSourceChange", inverse_of: :mount, foreign_key: 'icecast_mount_id', order: 'created_at DESC'
validates :name, presence: true, uniqueness: true
validates :source_username, length: {minimum: 5}, if: lambda {|m| m.source_username.present?}
validates :source_pass, length: {minimum: 5}, if: lambda {|m| m.source_pass.present?}
@ -33,7 +37,7 @@ module JamRuby
before_save :sanitize_active_admin
after_save :after_save
after_save :poke_config
#after_save :poke_config
before_destroy :poke_config
def name_has_correct_format
@ -45,7 +49,7 @@ module JamRuby
end
def after_save
server.update_attribute(:config_changed, 1)
server.update_attribute(:config_changed, 1) unless no_config_changed
if !sourced_was && sourced
@ -59,9 +63,13 @@ module JamRuby
end
if listeners_was == 0 && listeners > 0 && !sourced
# listener count went above 0 and there is no source. ask the musician clients to source
notify_source_up_requested
if source_direction_was != source_direction
# temporarily removed; it seems better to leave all the data in for now. It should never be that much
# if the requested source direction has changed, then delete diagnostic info
#IcecastSourceChange.delete_all(["icecast_mount_id = ?", self.id]) if source_direction
# and tell anyone listening that the direction has changed
SubscriptionMessage.mount_source_direction(self)
end
# Note:
@ -90,10 +98,110 @@ module JamRuby
mount
end
def fail_state(reason, detail = nil)
{success:false, reason: reason, detail: detail}
end
def success_state(reason, detail = nil)
{success:true, reason: reason, detail: detail}
end
# success messages:
# * source_up - we are broadcasting - detail is the source user or empty. if empty you should refresh state in a few seconds
# * source_down - we are broadcasting - detail is the last source user, or empty. if empty you should refresh state in a few seconds
# * transition_up/down - a source change occurred recently, and we don't yet have any info from the client. you should refresh state in a few seconds
#
# failure messages:
# * multiple_clients
# * unknown - represents a code error
# * transition_timeout_up - a source change for up has occurred, but no clients have reported any info
# * transition_timeout_down - a source change for up has occurred, but no clients have reported any info
# * source_wrong_up - the source should be up, but it could not succeed according to the most recent effort.
# * source_wrong_down - the source should be down, but it could not succeed according to the most recent effort.
#
# for both source_wrong_up and source_wrong_down, valid detail values:
# * 'no_client' if no client has said anything to a request after some amount of time, or otherwise it's client defined
# client-defined reason values
# * 'initialize_singleton' - code error in the client
# * 'initialize_thread' - code error in the client
# * 'initialize_ogg' - could not initialize ogg encoder... likely code error
# * 'initialize_mp3' - could not initialize mp3 encoder... likely code error
# * 'initialize_socket' - could not initialize socket... likely code error
# * 'icecast_response' - icecast was not accessible or returned an error
# * TODO client defined
def state
begin
result = fail_state('unknown')
if sourced == should_source?
first = source_changes.first
# don't check source_changes if actual source state mirrors desired source state... just say we are good, and pass down relevant sourcing user ID if present
result = success_state('source_' + (source_direction ? 'up' : 'down'), first.nil? ? nil : first.user_id)
elsif source_changes.count > 0
# if the desired source direction is up, but we haven't sourced yet... let's try and find out why
# let's first see if we have N clients contributing, which is a code error condition
clients = Hash[source_changes.map { |source_change| [source_change.client_id, source_change] }]
#if clients.length > 1
# this means more than one client has contributed... this is an code error condition
# result = fail_state('multiple_clients', clients)
#else
first = source_changes.first
if first.source_direction == should_source?
if first.success?
# the last message from any client indicated we had the right source
# if less than a second has passed, this is not strange. But more than that, it's strange
if sourced_needs_changing_at.nil? || (Time.now - first.created_at < APP_CONFIG.source_changes_missing_secs)
result = success_state('transition_' + (source_direction ? 'up' : 'down'))
else
result = fail_state('transition_timeout_' + (source_direction ? 'up' : 'down'))
end
else
# so the last state indicated by the client agrees that our source info is wrong; we can use it's data to augment the frontend
result = fail_state('source_wrong_' + (source_direction ? 'up' : 'down'), first.reason)
end
else
# if the last message from the client is for the wrong source direction (meaning no attempt to change source state by client to the correct state)
# then report this info
if sourced_needs_changing_at.nil? || (Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs)
result = fail_state('transition_' + (source_direction ? 'up' : 'down'))
else
result = fail_state('source_wrong_' + (source_direction ? 'up' : 'down'), 'no_client') # no client implies no client has attempted to source
end
end
#end
else
# we have the wrong source direction, but no source change info.
# if less than a second has passed, this is not strange. But more than that, it's strange
if sourced_needs_changing_at.nil?
#result = fail_state('db_error', 'sourced_needs_changing_at is nil')
result = success_state('transition_' + (source_direction ? 'up' : 'down'))
elsif Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs
result = success_state('transition_' + (source_direction ? 'up' : 'down'), 'initial')
else
result = fail_state('transition_timeout_' + (source_direction ? 'up' : 'down'), 'initial')
end
end
rescue Exception => e
@@log.error("exception in IcecastMount.state #{e}")
end
result
end
def source_up
with_lock do
self.sourced = true
self.sourced_needs_changing_at = nil
self.no_config_changed = true
save(validate: false)
end
end
@ -102,17 +210,28 @@ module JamRuby
with_lock do
self.sourced = false
self.sourced_needs_changing_at = nil
self.no_config_changed = true
save(validate: false)
end
end
def should_source?
self.listeners > 0
end
def listener_add
with_lock do
self.sourced_needs_changing_at = Time.now if listeners == 0
if listeners == 0 && !sourced && (self.sourced_needs_changing_at.nil? || Time.now - self.sourced_needs_changing_at > APP_CONFIG.source_changes_missing_secs)
# enough time has elapsed since the last time the source direction changed to instaneously request a source up
# listener count went above 0 and there is no source. ask the musician clients to source
notify_source_up_requested
end
# this is completely unsafe without that 'with_lock' statement above
self.listeners = self.listeners + 1
self.no_config_changed = true
save(validate: false)
end
end
@ -129,31 +248,67 @@ module JamRuby
# this is completely unsafe without that 'with_lock' statement above
self.listeners = self.listeners - 1
self.no_config_changed = true
save(validations: false)
end
end
def notify_source_up_requested
Notification.send_source_up_requested(music_session,
if music_session_id
self.sourced_needs_changing_at = Time.now
self.source_direction = true
self.no_config_changed = true
save!
source_change = IcecastSourceChange.new
source_change.source_direction = true
source_change.success = true
source_change.mount = self
source_change.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_UP_REQUEST
source_change.save!
Notification.send_source_up_requested(music_session,
server.hostname,
server.pick_listen_socket(:port),
name,
resolve_string(:source_username),
resolve_string(:source_pass),
resolve_int(:bitrate)) if music_session_id
resolve_int(:bitrate))
SubscriptionMessage.mount_source_up_requested(self)
end
end
def notify_source_down_requested
Notification.send_source_down_requested(music_session, name)
if music_session_id
self.sourced_needs_changing_at = Time.now
self.source_direction = false
self.no_config_changed = true
save!
source_change = IcecastSourceChange.new
source_change.source_direction = false
source_change.success = true
source_change.mount = self
source_change.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_DOWN_REQUEST
source_change.save!
Notification.send_source_down_requested(music_session, name)
SubscriptionMessage.mount_source_down_requested(self)
end
end
def notify_source_up
Notification.send_source_up(music_session) if music_session_id
if music_session_id
Notification.send_source_up(music_session)
end
end
def notify_source_down
Notification.send_source_down(music_session) if music_session_id
if music_session_id
Notification.send_source_down(music_session)
end
end
# Check if the icecast_mount specifies the value; if not, use the mount_template's value take effect

View File

@ -0,0 +1,27 @@
module JamRuby
class IcecastSourceChange < ActiveRecord::Base
@@log = Logging.logger[IcecastSourceChange]
CHANGE_TYPE_CLIENT = 'client'
CHANGE_TYPE_MOUNT_UP_REQUEST = 'source_up_request'
CHANGE_TYPE_MOUNT_DOWN_REQUEST = 'source_down_request'
belongs_to :mount, class_name: "JamRuby::IcecastMount", inverse_of: :source_changes, foreign_key: 'icecast_mount_id'
belongs_to :user, class_name: "JamRuby::User"
validates :source_direction, inclusion: {:in => [true, false]}
validates :success, inclusion: {:in => [true, false]}
validates :reason, length: {minimum: 0, maximum:255}
validates :detail, length: {minimum: 0, maximum:10000}
validates :user, presence:true, :if => :is_client_change?
validates :client_id, presence: true, :if => :is_client_change?
validates :mount, presence:true
validates :change_type, inclusion: {:in => [CHANGE_TYPE_CLIENT, CHANGE_TYPE_MOUNT_DOWN_REQUEST, CHANGE_TYPE_MOUNT_UP_REQUEST]}
def is_client_change?
change_type == CHANGE_TYPE_CLIENT
end
end
end

View File

@ -1376,6 +1376,12 @@ module JamRuby
@@mq_router.server_publish_to_everyone_in_session(music_session, msg)
end
def send_subscription_message(type, id, body)
msg = @@message_factory.subscription_message(type, id, body)
@@mq_router.publish_to_subscription(type, id, msg)
end
end
private

View File

@ -7,7 +7,7 @@ class MQRouter
# but ultimately there are internal static state variables to represent global MQ exchange connections
class << self
attr_accessor :client_exchange, :user_exchange
attr_accessor :client_exchange, :user_exchange, :subscription_exchange
@@log = Logging.logger[MQRouter]
end
@ -112,4 +112,15 @@ class MQRouter
end
end
end
def publish_to_subscription(type, id, msg)
@@log.error "EM not running in publish_to_subscription" unless EM.reactor_running?
EM.schedule do
routing_key = "subscription.#{type}.#{id}"
@@log.debug "publishing to #{routing_key} from server"
# put it on the topic exchange for users
self.class.subscription_exchange.publish(msg, :routing_key => routing_key)
end
end
end

View File

@ -31,8 +31,13 @@ module JamRuby
end
def run # if we have seen that sourced_needs_changing_at is older than 15 seconds ago, re-poke clients in the session
IcecastMount.find_each(lock: true, :conditions => "sourced_needs_changing_at < (NOW() - interval '#{APP_CONFIG.icecast_max_sourced_changed} second')", :batch_size => 100) do |mount|
def run
# we need to find any mount in the following conditions and then poke it with a websocket msg. here are the conditions:
# * (if sourced_needs_changing_at has gone too far in the past OR sourced_needs_changing_at IS NULL) AND
# ** listeners > 0 and sourced is DOWN (false)
# ** listeners == 0 and sourced is UP (true)
IcecastMount.find_each(lock: true, :conditions => "( (listeners > 0 AND sourced = FALSE) OR (listeners = 0 AND sourced = TRUE) ) AND ( sourced_needs_changing_at IS NULL OR sourced_needs_changing_at < (NOW() - interval '#{APP_CONFIG.icecast_max_sourced_changed} second') ) ", :batch_size => 100) do |mount|
if mount.music_session_id
mount.with_lock do
handle_notifications(mount)
@ -46,16 +51,13 @@ module JamRuby
# if no listeners, but we are sourced, then ask it to stop sourcing
@@log.debug("SOURCE_DOWN_REQUEST called on mount #{mount.name}")
mount.update_attribute(:sourced_needs_changing_at, Time.now) # we send out a source request, so we need to update the time
mount.notify_source_down_requested
elsif mount.listeners > 0 && !mount.sourced
# if we have some listeners, and still are not sourced, then ask to start sourcing again
@@log.debug("SOURCE_UP_REQUEST called on mount #{mount.name}")
mount.update_attribute(:sourced_needs_changing_at, Time.now) # we send out a source request, so we need to update the time
mount.notify_source_up_requested
end
end
end

View File

@ -420,7 +420,7 @@ FactoryGirl.define do
end
factory :icecast_mount, :class => JamRuby::IcecastMount do
name "/" + Faker::Lorem.characters(10)
sequence(:name) { |n| "/mount_#{n}" }
source_username Faker::Lorem.characters(10)
source_pass Faker::Lorem.characters(10)
max_listeners 100
@ -447,7 +447,15 @@ FactoryGirl.define do
end
end
end
end
factory :icecast_source_change, :class => JamRuby::IcecastSourceChange do
source_direction true
success true
sequence(:client_id) { |n| "client_id#{n}" }
change_type JamRuby::IcecastSourceChange::CHANGE_TYPE_CLIENT
association :user, :factory => :user
association :mount, :factory => :iceast_mount_with_music_session
end

View File

@ -245,4 +245,124 @@ describe IcecastMount do
end
end
describe "source_changes are deleted if source_direction transitions" do
it "no change if same source_direction" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, source_direction: true)
change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true)
mount.source_changes.size.should == 1
mount.source_direction = true
mount.save!
mount = IcecastMount.find(mount.id)
mount.source_changes.size.should == 1
end
it "NOT deleted on transition to down" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, source_direction: true)
change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true)
mount.source_changes.size.should == 1
mount.source_direction = false
mount.save!
mount = IcecastMount.find(mount.id)
mount.source_changes.size.should == 1
end
it "not deleted on transition to up" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, source_direction: false)
change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true)
mount.source_changes.size.should == 1
mount.source_direction = true
mount.save!
mount.reload
mount.source_changes.size.should == 1
end
end
describe "state" do
def success_state(reason, detail = nil)
{success: true, reason: reason, detail:detail}
end
def fail_state(reason, detail = nil)
{success: false, reason: reason, detail:detail}
end
let(:sourced_mount) {FactoryGirl.create(:iceast_mount_with_music_session, sourced: true, sourced_needs_changing_at: nil, listeners: 1)}
let(:mount_needing_source) {FactoryGirl.create(:iceast_mount_with_music_session, sourced: false, sourced_needs_changing_at: Time.now, listeners: 1)}
it "happy sourced mount" do
sourced_mount.state.should == success_state('source_down')
end
it "just transitioned" do
mount_needing_source.state.should == success_state('transition_down', 'initial')
end
it "transition timeout" do
#change1 = FactoryGirl.create(:icecast_source_change, mount: mount, source_direction: true, success:true)
mount_needing_source.sourced_needs_changing_at = (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago
mount_needing_source.save!
mount_needing_source.state.should == fail_state('transition_timeout_down', 'initial')
end
describe "client attempted transition and" do
let!(:change1) { FactoryGirl.create(:icecast_source_change, mount: mount_needing_source, source_direction: true, success:true) }
it "happy sourced mount" do
mount_needing_source.sourced = true
mount_needing_source.save!
mount_needing_source.state.should == success_state('source_down', change1.user.id)
end
it "succeeded recently in correct transition" do
mount_needing_source.state.should == success_state('transition_down')
end
it "succeeded a while ago in correct transition" do
change1.update_attribute(:created_at, (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago)
mount_needing_source.state.should == fail_state('transition_timeout_down')
end
it "failed recently in correct transition" do
change1.success = false
change1.reason = 'bleh'
change1.source_direction = true
change1.save!
mount_needing_source.state.should == fail_state('source_wrong_down', change1.reason)
end
it "failed a while ago in correct transition" do
change1.success = false
change1.reason = 'bleh'
change1.source_direction = true
change1.created_at = (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago
change1.save!
mount_needing_source.state.should == fail_state('source_wrong_down', change1.reason)
end
it "failed recently in wrong transition" do
change1.success = false
change1.reason = 'bleh'
change1.source_direction = false
change1.save!
mount_needing_source.state.should == fail_state('transition_down')
end
it "failed a while ago in wrong transition" do
change1.success = false
change1.reason = 'bleh'
change1.source_direction = false
change1.save!
mount_needing_source.sourced_needs_changing_at = (APP_CONFIG.source_changes_missing_secs + 1).seconds.ago
mount_needing_source.save!
mount_needing_source.state.should == fail_state('source_wrong_down', 'no_client')
end
end
end
end

View File

@ -0,0 +1,20 @@
require 'spec_helper'
describe IcecastSourceChange do
let(:change) { FactoryGirl.create(:icecast_source_change) }
it "validates" do
bad_change = IcecastSourceChange.new
bad_change.save.should be_false
bad_change.errors[:change_type].should == ["is not included in the list"]
bad_change.errors[:source_direction].should == ["is not included in the list"]
bad_change.errors[:success].should == ["is not included in the list"]
bad_change.errors[:mount].should == ["can't be blank"]
end
it "success" do
change.touch
end
end

View File

@ -25,11 +25,13 @@ describe MQRouter do
before(:all) do
@original_client_exchange = MQRouter.client_exchange
@original_user_exchange = MQRouter.user_exchange
@original_subscription_exchange = MQRouter.subscription_exchange
end
after(:all) do
MQRouter.client_exchange = @original_client_exchange
MQRouter.user_exchange = @original_user_exchange
MQRouter.subscription_exchange = @original_subscription_exchange
end
it "user_publish_to_session works (checking exchange callbacks)" do
@ -59,6 +61,7 @@ describe MQRouter do
# mock up exchange
MQRouter.client_exchange = double("client_exchange")
MQRouter.user_exchange = double("user_exchange")
MQRouter.subscription_exchange = double("subscription_exchange")
MQRouter.client_exchange.should_receive(:publish).with("a message", :routing_key => "client.#{music_session_member2.client_id}")
MQRouter.user_exchange.should_not_receive(:publish)

View File

@ -44,8 +44,10 @@ describe AudioMixer do
MQRouter.client_exchange = double("client_exchange")
MQRouter.user_exchange = double("user_exchange")
MQRouter.subscription_exchange = double("subscription_exchange")
MQRouter.client_exchange.should_receive(:publish).any_number_of_times
MQRouter.user_exchange.should_receive(:publish).any_number_of_times
MQRouter.subscription_exchange.should_receive(:publish).any_number_of_times
end

View File

@ -15,9 +15,15 @@ describe IcecastSourceCheck do
end
it "find no mounts if source_hanged timestamp is nil and listeners = 1/sourced = false" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced: false, listeners: 1)
check.should_not_receive(:handle_notifications)
it "find a mounts if sourced_needs_changing_at is nil and listeners = 1/sourced = false" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced: false, listeners: 1, sourced_needs_changing_at: nil)
check.should_receive(:handle_notifications).once
check.run
end
it "find a mount if source_changed timestamp is nil and listeners = 0/sourced = true" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced: true, listeners: 0, sourced_needs_changing_at: nil)
check.should_receive(:handle_notifications).once
check.run
end
@ -27,6 +33,7 @@ describe IcecastSourceCheck do
check.run
end
it "find no mounts if source_changed timestamp is very recent and listeners = 1/sourced = false" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced_needs_changing_at: Time.now, sourced: false, listeners: 1)
check.should_not_receive(:handle_notifications)
@ -92,7 +99,7 @@ describe IcecastSourceCheck do
check.run
end
it "resets source_changed_at when a notification is sent out" do
it "does not resets source_changed_at when a notification is sent out" do
mount = FactoryGirl.create(:iceast_mount_with_music_session, sourced_needs_changing_at: 2.days.ago, sourced:false, listeners: 1)
check.stub(:handle_notifications) do |mount|
mount.should_not_receive(:notify_source_down_requested)
@ -104,7 +111,7 @@ describe IcecastSourceCheck do
end
check.run
mount.reload
(mount.sourced_needs_changing_at.to_i - Time.now.to_i).abs.should < 10 # less than 5 seconds -- just a little slop for a very slow build server
(2.days.ago - mount.sourced_needs_changing_at).to_i.abs.should < 10 # less than 5 seconds -- just a little slop for a very slow build server
end
end
end

View File

@ -17,8 +17,10 @@ describe AudioMixer do
MQRouter.client_exchange = double("client_exchange")
MQRouter.user_exchange = double("user_exchange")
MQRouter.subscription_exchange = double("subscription_exchange")
MQRouter.client_exchange.should_receive(:publish).any_number_of_times
MQRouter.user_exchange.should_receive(:publish).any_number_of_times
MQRouter.subscription_exchange.should_receive(:publish).any_number_of_times
end
def assert_found_job_count(expected)

View File

@ -142,6 +142,14 @@ def app_config
7
end
def source_changes_missing_secs
1
end
def lock_connections
false
end
private
def audiomixer_workspace_path

View File

@ -16,6 +16,10 @@
LEAVE_MUSIC_SESSION_ACK : "LEAVE_MUSIC_SESSION_ACK",
HEARTBEAT : "HEARTBEAT",
HEARTBEAT_ACK : "HEARTBEAT_ACK",
SUBSCRIBE : "SUBSCRIBE",
UNSUBSCRIBE : "UNSUBSCRIBE",
SUBSCRIPTION_MESSAGE : "SUBSCRIPTION_MESSAGE",
SUBSCRIBE_BULK : "SUBSCRIBE_BULK",
// friend notifications
FRIEND_UPDATE : "FRIEND_UPDATE",
@ -168,6 +172,21 @@
return result;
};
factory.subscribe = function(type, id) {
var subscribe_msg = {type: type, id: id}
return client_container(msg.SUBSCRIBE, route_to.SERVER, subscribe_msg);
}
factory.subscribeBulk = function(types, ids) {
var subscribe_bulk_msg = {types: types, ids: ids}
return client_container(msg.SUBSCRIBE_BULK, route_to.SERVER, subscribe_bulk_msg);
}
factory.unsubscribe = function(type, id) {
var unsubscribe_msg = {type: type, id: id}
return client_container(msg.UNSUBSCRIBE, route_to.SERVER, unsubscribe_msg);
}
context.JK.MessageFactory = factory;
})(window, jQuery);

View File

@ -10,6 +10,7 @@
var logger = context.JK.logger;
var msg_factory = context.JK.MessageFactory;
var rest = context.JK.Rest();
var EVENTS = context.JK.EVENTS;
// Let socket.io know where WebSocketMain.swf is
context.WEB_SOCKET_SWF_LOCATION = "assets/flash/WebSocketMain.swf";
@ -89,6 +90,11 @@
// handles logic if the websocket connection closes, and if it was in error then also prompt for reconnect
function closedCleanup(in_error) {
if(server.connected) {
$self.triggerHandler(EVENTS.CONNECTION_DOWN);
}
server.connected = false;
server.connecting = false;
@ -229,6 +235,7 @@
heartbeatAckCheckInterval = context.setInterval(_heartbeatAckCheck, 1000);
lastHeartbeatAckTime = new Date(new Date().getTime() + heartbeatMS); // add a little forgiveness to server for initial heartbeat
connectDeferred.resolve();
$self.triggerHandler(EVENTS.CONNECTION_UP)
activeElementEvent('afterConnect', payload);
@ -739,6 +746,10 @@
}
});
server.get$Server = function() {
return $self;
}
context.JK.JamServer = server;
// Callbacks from jamClient

View File

@ -43,6 +43,7 @@
//= require AAB_message_factory
//= require jam_rest
//= require utils
//= require subscription_utils
//= require custom_controls
//= require_directory .
//= require_directory ./dialog

View File

@ -106,6 +106,18 @@
if(context.JK.CurrentSessionModel)
context.JK.CurrentSessionModel.onWindowBackgrounded(type, text);
}
else if(type === ALERT_NAMES.SESSION_LIVEBROADCAST_FAIL) {
if(context.JK.CurrentSessionModel)
context.JK.CurrentSessionModel.onBroadcastFailure(type, text);
}
else if(type === ALERT_NAMES.SESSION_LIVEBROADCAST_ACTIVE) {
if(context.JK.CurrentSessionModel)
context.JK.CurrentSessionModel.onBroadcastSuccess(type, text);
}
else if(type === ALERT_NAMES.SESSION_LIVEBROADCAST_STOPPED) {
if(context.JK.CurrentSessionModel)
context.JK.CurrentSessionModel.onBroadcastStopped(type, text);
}
else if((!context.JK.CurrentSessionModel || !context.JK.CurrentSessionModel.inSession()) &&
(ALERT_NAMES.INPUT_IO_RATE == type || ALERT_NAMES.INPUT_IO_JTR == type || ALERT_NAMES.OUTPUT_IO_RATE == type || ALERT_NAMES.OUTPUT_IO_JTR== type)) {
// squelch these events if not in session

View File

@ -41,7 +41,10 @@
FILE_MANAGER_CMD_PROGRESS : 'file_manager_cmd_progress',
FILE_MANAGER_CMD_ASAP_UPDATE : 'file_manager_cmd_asap_update',
MIXER_MODE_CHANGED : 'mixer_mode_changed',
MUTE_SELECTED: 'mute_selected'
MUTE_SELECTED: 'mute_selected',
SUBSCRIBE_NOTIFICATION: 'subscribe_notification',
CONNECTION_UP: 'connection_up',
CONNECTION_DOWN: 'connection_down'
};
context.JK.ALERT_NAMES = {

View File

@ -1396,6 +1396,27 @@
});
}
function getMount(options) {
var id = getId(options);
return $.ajax({
type: "GET",
url: '/api/icecast/mount/' + id,
dataType: "json",
contentType: 'application/json'
});
}
function createSourceChange(options) {
var mountId = options['mount_id'];
return $.ajax({
type: "POST",
url: '/api/icecast/mount/' + mountId + '/source_change',
dataType: "json",
contentType: 'application/json',
data: JSON.stringify(options),
});
}
function initialize() {
return self;
@ -1519,7 +1540,12 @@
this.updateBillingInfo = updateBillingInfo;
this.placeOrder = placeOrder;
this.searchMusicians = searchMusicians;
<<<<<<< HEAD
this.resendBandInvitation = resendBandInvitation;
=======
this.getMount = getMount;
this.createSourceChange = createSourceChange;
>>>>>>> feature/broadcast_slow
return this;
};

View File

@ -17,6 +17,11 @@
*/
context.JK = context.JK || {};
context.JK.ListenBroadcastCurrentlyPlaying = null;
context.JK.$templateListenBroadcastState = $('#template-listen-broadcast-state');
context.JK.$templateListenBroadcastDetail = $('#template-listen-broadcast-detail')
// the purpose of this code is to simplify the interaction between user and server
// it provides methods to call on user events (primarily play/pause), and will fire
// events (such as 'stream_gone'). This element does nothing with UI; only fires events for you to handle
@ -41,6 +46,8 @@
var fanAccess = null;
var retryAttempts = 0;
var self = this;
var mountInfo = null;
var $mountState = null;
var destroyed = false;
@ -155,7 +162,9 @@
playState = newState;
clearBufferTimeout();
if(newState != PlayStateStalled) {
clearBufferTimeout();
}
if( playState == PlayStateNone ||
playState == PlayStateEnded ||
@ -173,6 +182,8 @@
function noBuffer() {
waitForBufferingTimeout = null;
if(retryAttempts >= RETRY_ATTEMPTS) {
logger.log("never received indication of buffering or playing");
transition(PlayStateFailedStart);
@ -345,7 +356,17 @@
displayText = 'SESSION IN PROGRESS';
}
else if(playState == 'stalled') {
displayText = 'RECONNECTING';
// in Chrome, we've observed that a stalled can occur very early, before you've received any audio
// it's better to leave the display text as what it would be if juts retrying
if(!waitForBufferingTimeout) {
displayText = 'RECONNECTING';
}
else {
if(retryAttempts == 2) {
displayText = 'STILL TRYING, HANG ON';
}
}
}
else if(playState == 'ended') {
displayText = 'STREAM DISCONNECTED';
@ -444,8 +465,179 @@
$audio.bind('progress', onProgress);
}
function broadcastDetailPreShow(box) {
var $box = $(box)
$mountState = $box.find('.listen-broadcast-state')
updateMountInfo(mountInfo)
updateMountDetails(mountInfo);
}
function updateMountInfo(mount) {
var $summary = $mountState.find('.summary')
var $status = $summary.find('.status.' + mount.state.reason)
if ($status.length == 0) {
// can't find this state
$status = $summary.find('.status.unknown')
$summary.attr('data-status', 'unknown')
$status.find('.msg').text ("unknown state: " + mount.state.reason)
return;
}
//var $msg = $status.find('.msg');
$summary.attr('data-status', mount.state.reason)
}
function updateMountDetails(mountInfo) {
var $detailHolder = $mountState.find('.detail-items')
context._.each(mountInfo.source_changes, function(sourceChange) {
var $detail = addSourceChange($detailHolder, sourceChange);
if($detail) {
$detailHolder.append($detail);
}
})
}
function onDetailEvent(e, data) {
if($mountState == null) return;
logger.debug("subscription notification received: type:" + data.type)
updateMountInfo(data.body.mount);
var id = data.id
var type = data.type
if(type == 'mount') {
var $detailHolder = $mountState.find('.detail-items')
var msgType = data.body.type;
var sourceChange = data.body.source_change;
if(sourceChange) {
var $detail = addSourceChange($detailHolder, sourceChange)
if ($detail) {
$detailHolder.prepend($detail)
$detail.hide().slideDown()
}
}
}
}
function addSourceChange($detailHolder, sourceChange) {
var $detail= $(context._.template($('#template-listen-broadcast-detail').html(), {}, {variable: 'data'}));
// if this is already in the dom, don't add it
if($detailHolder.find('.listen-broadcast-detail[data-id="' + sourceChange.id + '"]').length > 0) {
return null;
}
$detail.attr('data-id', sourceChange.id)
var $detailText = $detail.find('.detail');
var $who = $detail.find('.who')
if(sourceChange.change_type == 'source_up_request') {
$who.html('<span>Server</span>')
$detailText.text('Start Broadcast Requested')
}
else if(sourceChange.change_type == 'source_down_request') {
$who.text('<span>Server</span>')
$detailText.text('Start Broadcast Requested')
}
else {
$who.attr('hoveraction', 'musician').attr('user-id', sourceChange.user.id).addClass('avatar-tiny')
$who.html('<img src="' + context.JK.resolveAvatarUrl(sourceChange.user.photo_url) + '" />');
context.JK.bindHoverEvents($who);
if (sourceChange.source_direction) {
// up
if (sourceChange.success) {
$detailText.text(sourceChange.user.name + ' Started Broadcasting')
}
else {
if (sourceChange.reason == 'no_client') {
$detailText.text('No Client Responded')
}
else if (sourceChange.reason == 'initialize_singleton') {
$detailText.text('Init Failure (S)')
}
else if (sourceChange.reason == 'initialize_thread') {
$detailText.text('Init Failure (T)')
}
else if (sourceChange.reason == 'initialize_ogg') {
$detailText.text('Init Failure (O)')
}
else if (sourceChange.reason == 'initialize_mp3') {
$detailText.text('Init Failure (M)')
}
else if (sourceChange.reason == 'initialize_socket') {
$detailText.text('Init Failure (Client)')
}
else if (sourceChange.reason == 'icecast_response') {
$detailText.text('Init Failure (Server)')
}
else {
$detailText.text('Unknown Failure')
}
}
}
else {
$detailText.text(sourceChange.user.name + ' Stopped Broadcasting');
}
}
return $detail;
}
function bindHoverDetail() {
var stateHolderHtml = context._.template($('#template-listen-broadcast-state').html(), {}, {variable: 'data'});
context.JK.hoverBubble($parent, stateHolderHtml, {trigger: 'none', preShow: broadcastDetailPreShow, positions:['bottom'], width:'300px', height:'250px'})
$parent.hoverIntent({
over: function() {
checkServer().done(function(response) {
var mountId = response.mount ? response.mount.id : null
if(mountId) {
rest.getMount({id: mountId})
.done(function (mount) {
mountInfo = mount
$parent.data('mount-id', mountId)
context.JK.SubscriptionUtils.subscribe('mount', mountId).on(context.JK.EVENTS.SUBSCRIBE_NOTIFICATION, onDetailEvent)
$parent.btOn()
})
.fail(context.JK.app.ajaxError)
}
else {
mountInfo = null;
context.JK.app.layout.notify('This session can not currently broadcast')
}
})
.fail(function() {
logger.debug("session is over")
})
},
out: function() {
var mountId = $parent.data('mount-id')
context.JK.SubscriptionUtils.unsubscribe('mount', mountId)
$parent.btOff();
$mountState = null;
mountInfo = null;
}
})
}
function initialize() {
musicSessionId = $parent.attr('data-music-session');
if(!musicSessionId) throw "data-music-session must be specified on $parentElement";
@ -453,6 +645,8 @@
if(fanAccess === null) throw 'fan-access must be specified in $parentElement';
fanAccess = $parent.attr('fan-access') === 'true' // coerce to boolean
bindHoverDetail();
$audio = $('audio', $parent);
if($audio.length == 0) {
@ -479,8 +673,6 @@
return this;
}
context.JK.ListenBroadcastCurrentlyPlaying = null;
$.fn.listenBroadcast = function(options) {
new context.JK.ListenBroadcast(this, options);
}

View File

@ -576,6 +576,57 @@
}
}
function onBroadcastSuccess(type, text) {
logger.debug("SESSION_LIVEBROADCAST_ACTIVE alert. reason:" + text);
if(currentSession && currentSession.mount) {
rest.createSourceChange({
mount_id: currentSession.mount.id,
source_direction: true,
success: true,
reason: text,
client_id: app.clientId
})
}
else {
logger.debug("unable to report source change because no mount seen on session")
}
}
function onBroadcastFailure(type, text) {
logger.debug("SESSION_LIVEBROADCAST_FAIL alert. reason:" + text);
if(currentSession && currentSession.mount) {
rest.createSourceChange({
mount_id: currentSession.mount.id,
source_direction: true,
success: false,
reason: text,
client_id: app.clientId
})
}
else {
logger.debug("unable to report source change because no mount seen on session")
}
}
function onBroadcastStopped(type, text) {
logger.debug("SESSION_LIVEBROADCAST_STOPPED alert. reason:" + text);
if(currentSession && currentSession.mount) {
rest.createSourceChange({
mount_id: currentSession.mount.id,
source_direction: false,
success: true,
reason: text,
client_id: app.clientId
})
}
else {
logger.debug("unable to report source change because no mount seen on session")
}
}
function onBackendMixerChanged(type, text) {
logger.debug("BACKEND_MIXER_CHANGE alert. reason:" + text);
@ -680,6 +731,9 @@
this.onDeadUserRemove = onDeadUserRemove;
this.onWindowBackgrounded = onWindowBackgrounded;
this.waitForSessionPageEnterDone = waitForSessionPageEnterDone;
this.onBroadcastStopped = onBroadcastStopped;
this.onBroadcastSuccess = onBroadcastSuccess;
this.onBroadcastFailure = onBroadcastFailure;
this.getCurrentSession = function() {
return currentSession;

View File

@ -0,0 +1,128 @@
#
# Common utility functions to help deal with subscriptions to the server
#
$ = jQuery
context = window
context.JK ||= {};
class SubscriptionUtils
constructor: () ->
@logger = context.JK.logger
@msgFactory = context.JK.MessageFactory;
@subscriptions = {}
@events = context.JK.EVENTS;
@reconnectRegistrationTimeout = null
this.init()
init: () =>
# once JamKazam is fully loaded, watch for SUBSCRIPTION_MESSAGE events
$(document).on('JAMKAZAM_READY', this.registerWithGateway)
genKey:(type, id) =>
type + ":" + id
test: () =>
this.subscribe('test', '1')
this.subscribe('test', '2')
reregister: () =>
# re-register all existing watches
@reconnectRegistrationTimeout = null
if context.JK.dlen(@subscriptions) > 0
bulkSubscribe = []
types = []
ids = []
for key, watch of @subscriptions
bits = key.split(':')
type = bits[0]
id = bits[1]
types.push(type)
ids.push(id)
# bulkSubscribe.push({type: type, id:id})
# send a special message which contains all subscriptions in one message
msg = @msgFactory.subscribeBulk(types, ids)
context.JK.JamServer.send(msg)
# whenever we reconnect, set a timer to automatically re-subscribe to any outstanding subscriptions
onConnectionUp: () =>
#this.test()
if @reconnectRegistrationTimeout?
clearTimeout(@reconnectRegistrationTimeout)
if context.JK.dlen(@subscriptions) > 0
@reconnectRegistrationTimeout = setTimeout(this.reregister, 1000) # re-register after 1 second, to prevent excessive messaging to server
onConnectionDown: () =>
if @reconnectRegistrationTimeout?
clearTimeout(@reconnectRegistrationTimeout)
registerWithGateway: () =>
context.JK.JamServer.registerMessageCallback(context.JK.MessageType.SUBSCRIPTION_MESSAGE, this.onSubscriptionMessage);
$server = context.JK.JamServer.get$Server()
$server.on(@events.CONNECTION_UP, this.onConnectionUp)
$server.on(@events.CONNECTION_DOWN, this.onConnectionDown)
onSubscriptionMessage: (header, payload) =>
key = this.genKey(payload.type, payload.id)
watch = @subscriptions[key]
if watch
watch.triggerHandler(@events.SUBSCRIBE_NOTIFICATION, {type:payload.type, id: payload.id, body: JSON.parse(payload.body)})
else
@logger.warn("unable to find subscription for #{key}")
# call subscribe, and use the returned object to listen for events of name context.JK.EVENTS.SUBSCRIBE_NOTIFICATION
subscribe: (type, id) =>
key = this.genKey(type, id)
@logger.debug("subscribing for any notifications for #{key}")
watch = @subscriptions[key]
unless watch?
watch = $({type: type, id: id})
@subscriptions[key] = watch
# tell server we want messages for this entity
msg = @msgFactory.subscribe(type, id)
context.JK.JamServer.send(msg)
# watch can be used to watch for events using jquery
watch
# TODO: this should not send a unsubscribe message to the server it's the last listener for the specific type/id combo
unsubscribe: (type, id) =>
key = this.genKey(type, id)
@logger.debug("unsubscribing for any notifications for #{key}")
watch = @subscriptions[key]
delete @subscriptions[key]
# tell server we don't want any more messages for this entity
msg = @msgFactory.unsubscribe(type, id)
context.JK.JamServer.send(msg)
if watch
# unattach any events
watch.off();
watch
# global instance
context.JK.SubscriptionUtils = new SubscriptionUtils()

View File

@ -98,8 +98,8 @@
}
function initialize(musicSessionId) {
$controls = $('.recording-controls');
$status = $('.session-status')
$controls = $('.sessions-page .recording-controls');
$status = $('.sessions-page .session-status')
$('.timeago').timeago();
$controls.listenBroadcast();

View File

@ -43,6 +43,7 @@
//= require user_dropdown
//= require jamkazam
//= require utils
//= require subscription_utils
//= require ui_helper
//= require custom_controls
//= require ga

View File

@ -59,6 +59,7 @@
*= require ./musician
*= require ./help
*= require ./jquery-ui-overrides
*= require ./listenBroadcast
*= require web/audioWidgets
*= require web/recordings
*= require web/sessions

View File

@ -0,0 +1,111 @@
@import "client/common";
.listen-broadcast-state {
.instructions {
font-size:12px;
margin:5px 0;
padding:3px;
width:100%;
@include border_box_sizing;
}
.title {
margin:5px 0 5px;
text-align:center;
font-size:14px;
}
.summary {
font-size:14px;
.user {display:none}
.status {
@include border_box_sizing;
padding:5px;
background-color:$poor;
display:none;
width:100%;
margin-bottom:10px;
font-size:16px;
&.source_up { background-color:$good;}
&.source_down { background-color:$good;}
&.transition_up { background-color:$good;}
&.transition_down { background-color:$good;}
}
.msg {
text-align:center;
}
&[data-status=source_up] {.source_up { display:inline-block; }}
&[data-status=source_down] {.source_down { display:inline-block; }}
&[data-status=source_wrong_up] {.source_up { display:inline-block; }}
&[data-status=source_wrong_down] {.source_up { display:inline-block; }}
&[data-status=transition_up] {.source_up { display:inline-block; }}
&[data-status=transition_down] {.source_up { display:inline-block; }}
&[data-status=transition_timeout_up] {.source_up { display:inline-block; }}
&[data-status=transition_timeout_down] {.source_up { display:inline-block; }}
&[data-status=unknown] {.unknown { display:inline-block; }}
&[data-status=loading] {.loading { display:inline-block; }}
&[data-status=db_error] {.db_error{ display:inline-block; }}
}
.who-subtitle {
position:absolute;
text-align:center;
width:26px;
overflow:visible;
font-size:12px;
}
.detail-subtitle {
text-align:center;
font-size:12px;
span {
font-style:italic;
font-size:10px;
padding:3px;
}
}
.listen-broadcast-detail {
position:relative;
background-color:black;
margin:5px 0;
padding:2px;
}
.detail-items {
height:85px;
overflow:auto;
}
.details {
height:135px;
.title {
margin-bottom:2px;
}
.who {
margin:0 !important;
position:absolute;
text-align:center;
overflow:visible;
vertical-align:middle;
line-height:26px;
}
.detail {
text-align:center;
line-height:26px;
vertical-align: middle;
}
}
}

View File

@ -12,6 +12,7 @@
*= require client/user_dropdown
*= require client/hoverBubble
*= require client/help
*= require client/listenBroadcast
*= require web/main
*= require web/footer
*= require web/recordings

View File

@ -1,11 +1,39 @@
=begin
Summary of icecast implementation:
mount lock locations:
* IcecastMount.listener_add
* IcecastMount.listener_remove
* IcecastMount.source_up
* IcecastMount.source_down
* IcecastSourceCheck on each stale IcecastMount
sourced_needs_changing_at behavior:
* set to Time.now if first listener comes in (current listeners==0 as they join)
* set to Time.now if last listener leaves (current listeners == 1 as they leave)
* set to nil if source begins
* set to nil if source ends
* a session's clients gets requested to start a source if one of:
** an IcecastMount is saved and listeners go above 0 and not currently sourced
** an IcecastSourceCheck finds a IcecastMount with a stale sourced_needs_changing_at and mount.listeners > 0 && !mount.sourced
* a session's clients get requested to stop a source if
** an IcecastSourceCheck finds a IcecastMount with a stale sourced_needs_changing_at and mount.listeners == 0 && mount.sourced
=end
class ApiIcecastController < ApiController
before_filter :local_only, :only => [:test]
before_filter :parse_mount, :except => [:test]
before_filter :parse_mount, :only => [:mount_add, :mount_remove, :listener_add, :listener_remove]
before_filter :api_signed_in_user, :only => [ :create_source_change ]
# each request will have this in it, if it's icecast.
#user-agent = Icecast 2.3.3
respond_to :json
def test
puts "========= GOT IT======="
render text: 'GOT IT', :status => :ok
@ -51,6 +79,38 @@ class ApiIcecastController < ApiController
render text: '', :status => :ok
end
# info reported by the client to the server letting us track what's going on at a deeper level
def create_source_change
mount = IcecastMount.find(params[:id])
@source_change = IcecastSourceChange.new
@source_change.source_direction = params[:source_direction]
@source_change.user = current_user
@source_change.client_id = params[:client_id]
@source_change.success = params[:success]
@source_change.reason = params[:reason]
@source_change.detail = params[:detail]
@source_change.mount = mount
@source_change.change_type = IcecastSourceChange::CHANGE_TYPE_CLIENT
@source_change.save
if @source_change.errors.any?
response.status = :unprocessable_entity
respond_with @source_change
else
source_change_json = Rabl::Renderer.json(@source_change, 'api_icecast/source_change_notification')
SubscriptionMessage.mount_source_change(mount, source_change_json)
render :json => {}, :status => 200
end
end
def show
@mount = IcecastMount.find(params[:id])
puts "@MOUNT #{@mount}"
respond_with @mount, responder: ApiResponder
end
protected
def local_only
request.local?
@ -74,5 +134,4 @@ class ApiIcecastController < ApiController
@mount_id = uri.path
@mount_params = Rack::Utils.parse_query(uri.query)
end
end

View File

@ -30,4 +30,11 @@ class SpikesController < ApplicationController
def websocket
render :layout => false
end
def subscription
Notification.send_subscription_message('test', '1', '{"msg": "oh hai 1"}')
Notification.send_subscription_message('test', '2', '{"msg": "oh hai 2"}')
render text: 'oh hai'
end
end

View File

@ -0,0 +1,19 @@
object @mount
attributes :id, :listeners, :source_direction, :sourced
node :state do |mount|
mount.state
end
child(:source_changes => :source_changes) {
attributes :id
puts "source_change #{@object}"
node do |source_change|
partial("api_icecast/show_source_change", :object => source_change)
end
}

View File

@ -0,0 +1,7 @@
object @source_change
attributes :id, :source_direction, :client_id, :success, :reason, :detail, :created_at, :change_type
node :user do |source_change|
partial("api_users/show_minimal", :object => source_change.user)
end

View File

@ -0,0 +1,13 @@
object @source_change
node do |source_change|
partial("api_icecast/show_source_change", :object => source_change)
end
child(:mount) { |mount|
attributes :id, :listeners, :source_direction, :sourced
node :state do |mount|
mount.state
end
}

View File

@ -0,0 +1,57 @@
script type="text/template" id="template-listen-broadcast-state"
.listen-broadcast-state
.instructions
| Here you can see a greater amount of detail about the state of the broadcast. If you leave the hover window open, new activity related to broadcasts will show up in real-time.
.summary
.title
| Current Status
.source_up.status
.user
.msg
| Broadcasting
.source_down.status
.user
.msg
| Not Broadcasting
.source_wrong_up.status
.msg
| Broadcast Attempt Failed
.source_wrong_down.status
.msg
| Stop Broadcast Failed
.transition_up.status
.msg
| Initializing Broadcast
.transition_down.status
.msg
| Stopping Broadcast
.transition_timeout_up.status
.msg
| No One Broadcasting Yet
.transition_timeout_down.status
.msg
| Not Stopped Broadcasting
.unknown.status
.msg
| Unknown Status
.loading.status
.msg
| loading ...
.db_error.status
.msg
| db error
.details
.title
.who-subtitle
| Who
.detail-subtitle
| Details
span
| (most recent at top)
.detail-items
script type="text/template" id="template-listen-broadcast-detail"
.listen-broadcast-detail
.who
.detail

View File

@ -62,6 +62,7 @@
<%= render "notify" %>
<%= render "client_update" %>
<%= render "overlay_small" %>
<%= render "listenBroadcast" %>
<%= render "sync_viewer_templates" %>
<%= render "help" %>
<%= render 'dialogs/dialogs' %>

View File

@ -78,6 +78,7 @@
<%= render "clients/hoverSession" %>
<%= render "clients/hoverRecording" %>
<%= render "clients/help" %>
<%= render "clients/listenBroadcast" %>
<%= render 'dialogs/dialogs' %>

View File

@ -116,6 +116,7 @@ if defined?(Bundler)
config.websocket_gateway_uri = "ws://localhost:#{config.websocket_gateway_port}/websocket"
config.websocket_gateway_trusted_uri = "ws://localhost:#{config.websocket_gateway_port + 1}/websocket"
config.websocket_gateway_max_connections_per_user = 20
config.lock_connections = false
config.external_hostname = ENV['EXTERNAL_HOSTNAME'] || 'localhost'
config.external_port = ENV['EXTERNAL_PORT'] || 3000
@ -197,9 +198,10 @@ if defined?(Bundler)
# this will be the qualifier on the IcecastConfigWorker queue name
config.icecast_server_id = ENV['ICECAST_SERVER_ID'] || 'localhost'
config.icecast_max_missing_check = 2 * 60 # 2 minutes
config.icecast_max_sourced_changed = 15 # 15 seconds
config.icecast_max_sourced_changed = 10 # 10 seconds
config.icecast_hardcoded_source_password = nil # generate a new password everytim. production should always use this value
config.icecast_wait_after_reload = 5 # 5 seconds. a hack needed until VRFS-1043
config.icecast_wait_after_reload = 0 # 0 seconds. a hack needed until VRFS-1043... maybe
config.source_changes_missing_secs = 2 # amount of time before we think it's odd that there are no source_change notifications
config.email_alerts_alias = 'nobody@jamkazam.com' # should be used for 'oh no' server down/service down sorts of emails
config.email_generic_from = 'nobody@jamkazam.com'

View File

@ -1,3 +1,9 @@
class PrettyJson
def self.dump(object)
JSON.pretty_generate(object, {:indent => " "})
end
end
Rabl.configure do |config|
# Commented as these are defaults
# config.cache_all_output = false
@ -5,6 +11,7 @@ Rabl.configure do |config|
# config.cache_engine = Rabl::CacheEngine.new # Defaults to Rails cache
# config.escape_all_output = false
# config.json_engine = nil # Any multi_json engines or a Class with #encode method
config.json_engine = PrettyJson if Rails.env.development?
# config.msgpack_engine = nil # Defaults to ::MessagePack
# config.bson_engine = nil # Defaults to ::BSON
# config.plist_engine = nil # Defaults to ::Plist::Emit
@ -16,5 +23,5 @@ Rabl.configure do |config|
config.include_child_root = false
# config.enable_json_callbacks = false
# config.xml_options = { :dasherize => true, :skip_types => false }
# config.view_paths = []
config.view_paths << Rails.root.join('app/views')
end

View File

@ -91,6 +91,7 @@ SampleApp::Application.routes.draw do
match '/facebook_invite', to: 'spikes#facebook_invite'
match '/launch_app', to: 'spikes#launch_app'
match '/websocket', to: 'spikes#websocket'
match '/test_subscription', to: 'spikes#subscription'
# junk pages
match '/help', to: 'static_pages#help'
@ -476,6 +477,8 @@ SampleApp::Application.routes.draw do
match '/icecast/mount_remove' => 'api_icecast#mount_remove', :via => :post
match '/icecast/listener_add' => 'api_icecast#listener_add', :via => :post
match '/icecast/listener_remove' => 'api_icecast#listener_remove', :via => :post
match '/icecast/mount/:id' => 'api_icecast#show', :via => :get
match '/icecast/mount/:id/source_change' => 'api_icecast#create_source_change', :via => :post
# tweet on behalf of client
match '/twitter/tweet' => 'api_twitters#tweet', :via => :post

View File

@ -10,7 +10,8 @@ IcecastConfigRetry:
description: "Finds icecast servers that have had their config_changed, but no IcecastConfigWriter check recently"
IcecastSourceCheck:
cron: "10 * * * * *"
every:
- "7s"
class: "JamRuby::IcecastSourceCheck"
description: "Finds icecast mounts that need their 'sourced' state to change, but haven't in some time"
@ -20,7 +21,7 @@ CleanupFacebookSignup:
description: "Deletes facebook_signups that are old"
UnusedMusicNotationCleaner:
cron: "10 * * * * *"
cron: "0 * * * *"
class: "JamRuby::UnusedMusicNotationCleaner"
description: "Remove unused music notations"

View File

@ -1,6 +1,6 @@
# this rake file is meant to hold shortcuts/helpers for starting onerous command line executions
# bunde exec rake all_jobs
# bundle exec rake all_jobs
task :all_jobs do
Rake::Task['environment'].invoke

View File

@ -0,0 +1,23 @@
require 'spec_helper'
describe ApiIcecastController do
render_views
let(:user) {FactoryGirl.create(:user) }
before(:each) do
controller.current_user = user
end
describe "create_source_change" do
let!(:mount) {FactoryGirl.create(:iceast_mount_with_music_session, sourced: true, sourced_needs_changing_at: nil, listeners: 1)}
it "success" do
post :create_source_change, {format:'json', id: mount.id, success: true, source_direction: true, reason: 'blah', client_id: 'abc'}
response.status.should == 200
end
end
end

View File

@ -434,7 +434,7 @@ FactoryGirl.define do
association :mount_template, :factory => :icecast_mount_template
factory :iceast_mount_with_music_session do
association :music_session, :factory => :music_session
association :music_session, :factory => :active_music_session
end
end
end

View File

@ -96,7 +96,7 @@
sessionLatency.subscribe('test', cb);
});
it("should invoke callback on latency result", function() {
var cb = jasmine.createSpy("Latency Subscription Callback");
var cb = jasmine.createSpy("Latency subscription.rb Callback");
sessionLatency.subscribe('test', cb);
$.each(sessions, function(index, session) {
sessionLatency.sessionPings(session);

View File

@ -87,7 +87,7 @@ Thread.new do
:rabbitmq_port => 5672,
:calling_thread => current,
:cidr => ['0.0.0.0/0'],
:gateway_name => 'default')
:gateway_name => 'default-test')
rescue Exception => e
puts "websocket-gateway failed: #{e}"
end

View File

@ -10,7 +10,7 @@ include Jampb
module EventMachine
module WebSocket
class Connection < EventMachine::Connection
attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted # client_id is uuid we give to each client to track them as we like
attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context, :trusted, :subscriptions # client_id is uuid we give to each client to track them as we like
end
end
end
@ -36,12 +36,14 @@ module JamWebsockets
@clients = {} # clients that have logged in
@user_context_lookup = {} # lookup a set of client_contexts by user_id
@client_lookup = {} # lookup a client by client_id
@subscription_lookup = {}
@amqp_connection_manager = nil
@users_exchange = nil
@message_factory = JamRuby::MessageFactory.new
@semaphore = Mutex.new
@user_topic = nil
@client_topic = nil
@subscription_topic = nil
@thread_pool = nil
@heartbeat_interval_client = nil
@connect_time_expire_client = nil
@ -187,6 +189,7 @@ module JamWebsockets
@semaphore.synchronize do
if client_id == MessageFactory::ALL_NATIVE_CLIENTS
msg = Jampb::ClientMessage.parse(msg)
@log.debug "client-directed message received from #{msg.from} to all clients"
@client_lookup.each do |client_id, client_context|
@ -204,7 +207,7 @@ module JamWebsockets
else
client_context = @client_lookup[client_id]
if !client_context.nil?
if client_context
client = client_context.client
@ -234,6 +237,93 @@ module JamWebsockets
end
MQRouter.client_exchange = @clients_exchange
############## DYNAMIC SUBSCRIPTION MESSAGING ###################
@subscriptions_exchange = channel.topic('subscriptions')
@subscription_topic = channel.queue("subscriptions-#{@gateway_name}", :auto_delete => true)
@subscription_topic.purge
# subscribe for any p2p messages to a client
@subscription_topic.subscribe(:ack => false) do |headers, msg|
begin
routing_key = headers.routing_key
type_and_id = routing_key["subscription.".length..-1]
#type, id = type_and_id.split('.')
@semaphore.synchronize do
clients = @subscription_lookup[type_and_id]
msg = Jampb::ClientMessage.parse(msg)
if clients
EM.schedule do
clients.each do |client|
@log.debug "subscription msg to client #{client.client_id}"
send_to_client(client, msg)
end
end
end
end
rescue => e
@log.error "unhandled error in messaging to client for mount"
@log.error e
end
end
MQRouter.subscription_exchange = @subscriptions_exchange
end
# listens on a subscription topic on the behalf of a client
def register_subscription(client, type, id)
# track subscriptions that this client has made, for disconnect scenarios
client.subscriptions.add({type:type, id: id})
key = "#{type}.#{id}"
# for a given type:id in @subscription_lookup, track clients listening
clients = @subscription_lookup[key]
if clients.nil?
clients = Set.new
@subscription_lookup[key] = clients
end
needs_subscription = clients.length == 0
clients.add(client)
@log.debug("register subscription handled #{type}.#{id}")
if needs_subscription
routing_key = "subscription.#{type}.#{id}"
@log.debug("register topic bound #{routing_key}")
# if this is the 1st client to listen for this mount, then subscribe to the topic for this mount_id
@subscription_topic.bind(@subscriptions_exchange, :routing_key => routing_key)
end
end
# de-listens on a subscription topic on the behalf of a client
# called automatically when a clean disconnects, to keep state clean.
def unregister_subscription(client, type, id)
# remove subscription from this client's list of subscriptions
client.subscriptions.delete({type:type, id:id})
key = "#{type}.#{id}"
# for a given mount_id in @subscription_lookup, remove from list of clients listening
clients = @subscription_lookup[key]
if clients
deleted = clients.delete(client)
if !deleted
@log.error "unregister_subscription: unable to locate any client #{client.client_id} for id #{id}"
end
else
@log.error "unregister_subscription: unable to locate any clients for id #{id}"
end
if clients.length == 0
# if there are no more clients listening, then unsubscribe to the topic for this mount_id
@subscription_topic.unbind(@subscriptions_exchange, :routing_key => "subscription.#{type}.#{id}")
end
end
# this method allows you to translate exceptions into websocket channel messages and behavior safely.
@ -428,6 +518,12 @@ module JamWebsockets
elsif client_msg.type == ClientMessage::Type::HEARTBEAT
sane_logging { handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) }
elsif client_msg.type == ClientMessage::Type::SUBSCRIBE_BULK
sane_logging { handle_bulk_subscribe(client_msg.subscribe_bulk, client) }
elsif client_msg.type == ClientMessage::Type::SUBSCRIBE
sane_logging { handle_subscribe(client_msg.subscribe, client) }
elsif client_msg.type == ClientMessage::Type::UNSUBSCRIBE
sane_logging { handle_unsubscribe(client_msg.unsubscribe, client) }
else
raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message"
end
@ -522,6 +618,8 @@ module JamWebsockets
os = options["os"]
udp_reachable = options["udp_reachable"].nil? ? true : options["udp_reachable"] == 'true'
client.subscriptions = Set.new# list of subscriptions that this client is watching in real-time
@log.info("handle_login: client_type=#{client_type} token=#{token} client_id=#{client_id} channel_id=#{client.channel_id} udp_reachable=#{udp_reachable}")
if client_type == Connection::TYPE_LATENCY_TESTER
@ -669,6 +767,33 @@ module JamWebsockets
end
end
def handle_bulk_subscribe(subscriptions, client)
subscriptions.types.each_with_index do |subscription, i|
handle_subscribe(OpenStruct.new({type: subscriptions.types[i], id: subscriptions.ids[i]}), client)
end
end
def handle_subscribe(subscribe, client)
id = subscribe.id
type = subscribe.type
if id && id.length > 0 && type && type.length > 0
register_subscription(client, type, id)
else
@log.error("handle_subscribe: empty data #{subscribe}")
end
end
def handle_unsubscribe(unsubscribe, client)
id = unsubscribe.id
type = unsubscribe.type
if id && id.length > 0 && type && type.length > 0
unregister_subscription(client, type, id)
else
@log.error("handle_subscribe: empty data #{unsubscribe}")
end
end
def handle_heartbeat(heartbeat, heartbeat_message_id, client)
unless context = @clients[client]
@log.warn "*** WARNING: unable to find context when handling heartbeat. client_id=#{client.client_id}; killing session"
@ -1026,6 +1151,11 @@ module JamWebsockets
def cleanup_client(client)
client.close
# unregister any subscriptions
client.subscriptions.each do |subscription|
unregister_subscription(client, subscription[:type], subscription[:id])
end
@semaphore.synchronize do
pending = client.context.nil? # presence of context implies this connection has been logged into