module JamRuby
  # ActiveRecord model for a single Icecast mount point.
  #
  # A mount belongs to an IcecastServer (and optionally to a mount template, a
  # music session and an authentication record). It tracks how many listeners
  # are attached (+listeners+), whether it is currently sourced (+sourced+),
  # asks the session's clients to start/stop sourcing through Notification,
  # and can render itself as a <mount> element via #dumpXml.
  #
  # Any field left blank on the mount falls back to the value on its
  # mount_template (see #resolve_string / #resolve_int).
  class IcecastMount < ActiveRecord::Base

    @@log = Logging.logger[IcecastMount]

    attr_accessible :authentication_id, :name, :source_username, :source_pass, :max_listeners,
                    :max_listener_duration, :dump_file, :intro, :fallback_mount, :fallback_override,
                    :fallback_when_full, :charset, :is_public, :stream_name, :stream_description,
                    :stream_url, :genre, :bitrate, :mime_type, :subtype, :burst_size,
                    :mp3_metadata_interval, :hidden, :on_connect, :on_disconnect, :music_session_id,
                    :icecast_server_id, :icecast_mount_template_id, :listeners, :sourced,
                    :sourced_needs_changing_at,
                    as: :admin

    # When set to a truthy value before saving, the after_save callback does
    # not flag the owning server's config as changed.
    attr_accessor :no_config_changed

    belongs_to :authentication, class_name: "JamRuby::IcecastUserAuthentication", inverse_of: :mount, :foreign_key => 'authentication_id'
    belongs_to :music_session, class_name: "JamRuby::ActiveMusicSession", inverse_of: :mount, foreign_key: 'music_session_id'
    belongs_to :server, class_name: "JamRuby::IcecastServer", inverse_of: :mounts, foreign_key: 'icecast_server_id'
    belongs_to :mount_template, class_name: "JamRuby::IcecastMountTemplate", inverse_of: :mounts, foreign_key: 'icecast_mount_template_id'

    # newest source change first; #state relies on this ordering
    has_many :source_changes, -> { order('created_at DESC') }, class_name: "JamRuby::IcecastSourceChange", inverse_of: :mount, foreign_key: 'icecast_mount_id'

    validates :name, presence: true, uniqueness: true
    validates :source_username, length: {minimum: 5}, if: lambda {|m| m.source_username.present?}
    validates :source_pass, length: {minimum: 5}, if: lambda {|m| m.source_pass.present?}
    # NOTE(review): `length:` on the two fields below constrains the length of the
    # value, not its numeric magnitude. Presumably numeric bounds (`numericality`)
    # were intended; confirm before changing, since that would tighten validation.
    validates :max_listeners, length: {in: 1..15000}, if: lambda {|m| m.max_listeners.present?}
    validates :max_listener_duration, length: {in: 1..3600 * 48}, if: lambda {|m| m.max_listener_duration.present?}
    validates :fallback_override, :inclusion => {:in => [0, 1]} , if: lambda {|m| m.fallback_mount.present?}
    validates :fallback_when_full, :inclusion => {:in => [0, 1]} , if: lambda {|m| m.fallback_mount.present?}
    validates :is_public, presence: true, :inclusion => {:in => [-1, 0, 1]}
    validates :bitrate, numericality: {only_integer: true}, if: lambda {|m| m.bitrate.present?}
    validates :burst_size, numericality: {only_integer: true}, if: lambda {|m| m.burst_size.present?}
    validates :mp3_metadata_interval, numericality: {only_integer: true}, if: lambda {|m| m.mp3_metadata_interval.present?}
    validates :hidden, :inclusion => {:in => [0, 1]}
    validates :server, presence: true
    validate :name_has_correct_format

    before_save :sanitize_active_admin
    after_save :after_save
    #after_save :poke_config
    before_destroy :poke_config

    # Validation: the mount name must begin with a slash.
    def name_has_correct_format
      errors.add(:name, "must start with /") unless name && name.start_with?('/')
    end

    # Flags the owning server's config as changed (no-op when there is no server).
    def poke_config
      server.update_attribute(:config_changed, 1) if server
    end

    # after_save callback.
    #
    # * flags the server config as changed unless +no_config_changed+ is set
    # * sends source up/down notifications when +sourced+ flipped in this save
    def after_save
      # poke_config guards against a missing server, which can happen because
      # several methods in this class save with validations disabled
      poke_config unless no_config_changed

      if !sourced_was && sourced
        # went from NOT SOURCED to SOURCED
        notify_source_up
      elsif sourced_was && !sourced
        # went from SOURCED to NOT SOURCED
        notify_source_down
      end

      if source_direction_was != source_direction
        # temporarily removed; it seems better to leave all the data in for now. It should never be that much
        # if the requested source direction has changed, then delete diagnostic info
        #IcecastSourceChange.delete_all(["icecast_mount_id = ?", self.id]) if source_direction

        # and tell anyone listening that the direction has changed
        # SubscriptionMessage.mount_source_direction(self)
      end

      # Note:
      # Notification.send_source_down_requested does not occur here.
      # we set up a cron that checks for streams that have not been successfully source up/down (after timeout) in IcecastSourceCheck
    end

    # before_save callback: turns empty-string foreign keys into nil.
    # (presumably empty strings arrive from admin forms, going by the method name; confirm)
    def sanitize_active_admin
      self.authentication_id = nil if self.authentication_id == ''
      self.music_session_id = nil if self.music_session_id == ''
      self.icecast_server_id = nil if self.icecast_server_id == ''
    end

    # Builds a mount for a session from the server's mount template.
    #
    # Returns nil when the session has no fan access, or when the server is
    # missing or has no mount template; otherwise returns the mount built by the
    # template, with its server set to +icecast_server+.
    def self.build_session_mount(music_session, active_music_session, icecast_server)

      # only public sessions get mounts currently
      return nil unless music_session.fan_access

      mount = nil
      if icecast_server && icecast_server.mount_template_id
        # we have a server with an associated mount_template; we can create a mount automatically
        mount = icecast_server.mount_template.build_session_mount(music_session, active_music_session)
        mount.server = icecast_server
      end
      mount
    end

    # Builds a failure result hash for #state.
    def fail_state(reason, detail = nil)
      {success:false, reason: reason, detail: detail}
    end

    # Builds a success result hash for #state.
    def success_state(reason, detail = nil)
      {success:true, reason: reason, detail: detail}
    end

    # Reports the sourcing state of this mount as a hash of
    # {success:, reason:, detail:}.
    #
    # success messages:
    # * source_up - the mount is sourced - detail is the source user or empty. if empty you should refresh state in a few seconds
    # * source_down - the mount is not sourced - detail is the last source user, or empty. if empty you should refresh state in a few seconds
    # * transition_up/down - a source change occurred recently, and we don't yet have any info from the client. you should refresh state in a few seconds
    #
    # failure messages:
    # * multiple_clients (currently never produced; the check is commented out below)
    # * unknown - represents a code error
    # * transition_timeout_up - a source change for up has occurred, but no clients have reported any info
    # * transition_timeout_down - a source change for down has occurred, but no clients have reported any info
    # * source_wrong_up - the source should be up, but it could not succeed according to the most recent effort.
    # * source_wrong_down - the source should be down, but it could not succeed according to the most recent effort.
    #
    # for both source_wrong_up and source_wrong_down, valid detail values:
    # * 'no_client' if no client has said anything to a request after some amount of time, or otherwise it's client defined
    #   client-defined reason values
    #   * 'initialize_singleton' - code error in the client
    #   * 'initialize_thread' - code error in the client
    #   * 'initialize_ogg' - could not initialize ogg encoder... likely code error
    #   * 'initialize_mp3' - could not initialize mp3 encoder... likely code error
    #   * 'initialize_socket' - could not initialize socket... likely code error
    #   * 'icecast_response' - icecast was not accessible or returned an error
    #   * TODO client defined
    #
    # Any StandardError raised while computing the state is logged and the
    # result computed so far (initially 'unknown') is returned.
    def state
      result = fail_state('unknown')

      begin
        # newest source change (the association is ordered created_at DESC), or nil
        latest = source_changes.first
        direction = source_direction ? 'up' : 'down'

        if sourced == should_source?
          # don't inspect source_changes if actual source state mirrors desired source state...
          # just say we are good, and pass down relevant sourcing user ID if present
          result = success_state('source_' + (sourced ? 'up' : 'down'), latest.nil? ? nil : latest.user_id)
        elsif latest
          # the desired and actual source states differ, and we have client reports... let's try and find out why

          # a check for more than one contributing client (a code error condition) used to live here:
          #   result = fail_state('multiple_clients', clients)
          if latest.source_direction == should_source?
            if latest.success?
              # the last message from any client indicated we had the right source
              # if less than the configured window has passed, this is not strange. But more than that, it's strange
              if sourced_needs_changing_at.nil? || (Time.now - latest.created_at < APP_CONFIG.source_changes_missing_secs)
                result = success_state('transition_' + direction)
              else
                result = fail_state('transition_timeout_' + direction)
              end
            else
              # so the last state indicated by the client agrees that our source info is wrong;
              # we can use its data to augment the frontend
              result = fail_state('source_wrong_' + direction, latest.reason)
            end
          else
            # the last message from the client is for the wrong source direction
            # (meaning no attempt to change source state by client to the correct state), so report this info
            if sourced_needs_changing_at.nil? || (Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs)
              # NOTE(review): 'transition_*' is documented above as a success message, but this
              # branch reports it through fail_state; confirm which is intended.
              result = fail_state('transition_' + direction)
            else
              # no_client implies no client has attempted to source
              result = fail_state('source_wrong_' + direction, 'no_client')
            end
          end
        else
          # we have the wrong source direction, but no source change info.
          # if less than the configured window has passed, this is not strange. But more than that, it's strange
          if sourced_needs_changing_at.nil?
            #result = fail_state('db_error', 'sourced_needs_changing_at is nil')
            result = success_state('transition_' + direction)
          elsif Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs
            result = success_state('transition_' + direction, 'initial')
          else
            result = fail_state('transition_timeout_' + direction, 'initial')
          end
        end
      rescue StandardError => e
        # StandardError only: signals, exit requests and out-of-memory errors must not be swallowed here
        @@log.error("exception in IcecastMount.state #{e}")
      end

      result
    end

    # Marks the mount as sourced, under a row lock, without running validations
    # and without flagging the server config as changed.
    def source_up
      with_lock do
        self.sourced = true
        self.sourced_needs_changing_at = Time.now
        self.no_config_changed = true
        save(validate: false)
      end
    end

    # Marks the mount as not sourced, under a row lock, without running
    # validations and without flagging the server config as changed.
    def source_down
      with_lock do
        self.sourced = false
        self.sourced_needs_changing_at = Time.now
        self.no_config_changed = true
        save(validate: false)
      end
    end

    # A mount should be sourced whenever it has at least one listener.
    def should_source?
      self.listeners > 0
    end

    # Registers one more listener. If this is the first listener, the mount is
    # not sourced, and the last direction change is old enough (or unknown),
    # the session's clients are asked to start sourcing.
    def listener_add
      with_lock do
        if listeners == 0 && !sourced && (self.sourced_needs_changing_at.nil? || Time.now - self.sourced_needs_changing_at > APP_CONFIG.source_changes_missing_secs)
          # enough time has elapsed since the last time the source direction changed to instantaneously request a source up
          # listener count went above 0 and there is no source. ask the musician clients to source
          notify_source_up_requested
        end

        # this is completely unsafe without that 'with_lock' statement above
        self.listeners = self.listeners + 1
        self.no_config_changed = true
        save(validate: false)
      end
    end

    # Registers one fewer listener. Logs a warning and does nothing when the
    # count is already 0. When the last listener leaves, the time is recorded in
    # +sourced_needs_changing_at+.
    def listener_remove
      if listeners == 0
        @@log.warn("listeners is at 0, but we are being asked to remove a listener. maybe we missed a listener_add request earlier")
        return
      end

      with_lock do
        self.sourced_needs_changing_at = Time.now if listeners == 1
        # this is completely unsafe without that 'with_lock' statement above
        self.listeners = self.listeners - 1
        self.no_config_changed = true
        # the option key is `validate:`; the previous `validations: false` was
        # not recognised, so validations still ran on this save
        save(validate: false)
      end
    end

    # Records that a source-up was requested (on the mount and as an
    # IcecastSourceChange row) and notifies the session and subscribers.
    # Does nothing when the mount has no music session.
    def notify_source_up_requested
      if music_session_id
        self.sourced_needs_changing_at = Time.now
        self.source_direction = true
        self.no_config_changed = true
        save!

        source_change = IcecastSourceChange.new
        source_change.source_direction = true
        source_change.success = true
        source_change.mount = self
        source_change.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_UP_REQUEST
        source_change.save!

        Notification.send_source_up_requested(music_session,
                                              server.hostname,
                                              server.pick_listen_socket(:port),
                                              name,
                                              resolve_string(:source_username),
                                              resolve_string(:source_pass),
                                              resolve_int(:bitrate))
        SubscriptionMessage.mount_source_up_requested(self)
      end
    end

    # Records that a source-down was requested (on the mount and as an
    # IcecastSourceChange row) and notifies the session and subscribers.
    # Does nothing when the mount has no music session.
    def notify_source_down_requested
      if music_session_id
        self.sourced_needs_changing_at = Time.now
        self.source_direction = false
        self.no_config_changed = true
        save!

        source_change = IcecastSourceChange.new
        source_change.source_direction = false
        source_change.success = true
        source_change.mount = self
        source_change.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_DOWN_REQUEST
        source_change.save!

        Notification.send_source_down_requested(music_session, name)
        SubscriptionMessage.mount_source_down_requested(self)
      end
    end

    # Tells the music session that the source came up (if there is a session).
    def notify_source_up
      if music_session_id
        Notification.send_source_up(music_session)
      end
    end

    # Tells the music session that the source went down (if there is a session).
    def notify_source_down
      if music_session_id
        Notification.send_source_down(music_session)
      end
    end

    # Writes this mount as a 'mount' element using +builder+ (anything that
    # responds to `tag!`; presumably a Builder::XmlMarkup; confirm).
    #
    # For every setting, the mount's own value is used when it specifies one;
    # otherwise the mount_template's value is used. Settings with no value in
    # either place are omitted.
    def dumpXml(builder)
      builder.tag! 'mount' do |mount|
        mount.tag! 'mount-name', name
        mount.tag! 'username', resolve_string(:source_username) if string_present?(:source_username)
        mount.tag! 'password', resolve_string(:source_pass) if string_present?(:source_pass)
        mount.tag! 'max-listeners', resolve_int(:max_listeners) if int_present?(:max_listeners)
        # integer field: resolved with resolve_int like the other integer settings
        mount.tag! 'max-listener-duration', resolve_int(:max_listener_duration) if int_present?(:max_listener_duration)
        mount.tag! 'dump-file', resolve_string(:dump_file) if string_present?(:dump_file)
        mount.tag! 'intro', resolve_string(:intro) if string_present?(:intro)
        mount.tag! 'fallback-mount', resolve_string(:fallback_mount) if string_present?(:fallback_mount)
        mount.tag! 'fallback-override', resolve_int(:fallback_override) if int_present?(:fallback_override)
        mount.tag! 'fallback-when-full', resolve_int(:fallback_when_full) if int_present?(:fallback_when_full)
        mount.tag! 'charset', resolve_string(:charset) if string_present?(:charset)
        mount.tag! 'public', resolve_int(:is_public) if int_present?(:is_public)
        mount.tag! 'stream-name', resolve_string(:stream_name) if string_present?(:stream_name)
        mount.tag! 'stream-description', resolve_string(:stream_description) if string_present?(:stream_description)
        mount.tag! 'stream-url', resolve_string(:stream_url) if string_present?(:stream_url)
        mount.tag! 'genre', resolve_string(:genre) if string_present?(:genre)
        mount.tag! 'bitrate', resolve_int(:bitrate) if int_present?(:bitrate)
        mount.tag! 'type', resolve_string(:mime_type) if string_present?(:mime_type)
        mount.tag! 'subtype', resolve_string(:subtype) if string_present?(:subtype)
        mount.tag! 'burst-size', resolve_int(:burst_size) if int_present?(:burst_size)
        mount.tag! 'mp3-metadata-interval', resolve_int(:mp3_metadata_interval) if int_present?(:mp3_metadata_interval)
        mount.tag! 'hidden', resolve_int(:hidden) if int_present?(:hidden)
        mount.tag! 'on-connect', resolve_string(:on_connect) if string_present?(:on_connect)
        mount.tag! 'on-disconnect', resolve_string(:on_disconnect) if string_present?(:on_disconnect)

        authentication.dumpXml(builder) if authentication
      end
    end

    # Listener URL of this mount: http://<server hostname>:<port><mount name>.
    # Raises a RuntimeError when the mount has no server.
    def url
      raise "Unassociated server to mount" if self.server.nil?

      "http://#{server.hostname}:#{server.pick_listen_socket(:port)}#{self.name}"
    end

    # The mount's own value for +field+ when it is present (non-blank);
    # otherwise the mount_template's value (nil when there is no template).
    def resolve_string(field)
      self[field].present? ? self[field] : mount_template && mount_template[field]
    end

    # True when #resolve_string yields a non-blank value for +field+.
    def string_present?(field)
      val = resolve_string(field)
      val ? val.present? : false
    end

    # The mount's own value for +field+ when it is not nil (so 0 counts as
    # set); otherwise the mount_template's value (nil when there is no template).
    def resolve_int(field)
      !self[field].nil? ? self[field]: mount_template && mount_template[field]
    end

    # Truthy when #resolve_int yields a value for +field+. Returns the
    # resolved value itself rather than a strict boolean.
    def int_present?(field)
      resolve_int(field)
    end
  end
end