jam-cloud/ruby/lib/jam_ruby/models/quick_mix.rb

287 lines
10 KiB
Ruby

module JamRuby
class QuickMix < ActiveRecord::Base
include S3ManagerMixin
MAX_MIX_TIME = 7200 # 2 hours
attr_accessor :marking_complete, :is_skip_mount_uploader
attr_writer :current_user
belongs_to :user, :class_name => "JamRuby::User", :inverse_of => :quick_mixes
belongs_to :recording, :class_name => "JamRuby::Recording", :inverse_of => :quick_mixes, :foreign_key => 'recording_id'
validates :ogg_md5, :presence => true, :if => :upload_starting?
validates :ogg_length, length: {minimum: 1, maximum: 1024 * 1024 * 256 }, if: :upload_starting? # 256 megs max. is this reasonable? surely...
validates :user, presence: true
validate :validate_fully_uploaded
validate :validate_part_complete
validate :validate_too_many_upload_failures
before_destroy :delete_s3_files
# skip_callback :save, :before, :store_picture!, if: :is_skip_mount_uploader
def too_many_upload_failures?
upload_failures >= APP_CONFIG.max_track_upload_failures
end
def upload_starting?
next_part_to_upload_was == 0 && next_part_to_upload == 1
end
def validate_too_many_upload_failures
if upload_failures >= APP_CONFIG.max_track_upload_failures
errors.add(:upload_failures, ValidationMessages::UPLOAD_FAILURES_EXCEEDED)
end
end
def validate_fully_uploaded
if marking_complete && fully_uploaded && fully_uploaded_was
errors.add(:fully_uploaded, ValidationMessages::ALREADY_UPLOADED)
end
end
def validate_part_complete
# if we see a transition from is_part_uploading from true to false, we validate
if is_part_uploading_was && !is_part_uploading
if next_part_to_upload_was + 1 != next_part_to_upload
errors.add(:next_part_to_upload, ValidationMessages::INVALID_PART_NUMBER_SPECIFIED)
end
if file_offset > ogg_length
errors.add(:file_offset, ValidationMessages::FILE_OFFSET_EXCEEDS_LENGTH)
end
elsif next_part_to_upload_was + 1 == next_part_to_upload
# this makes sure we are only catching 'upload_part_complete' transitions, and not upload_start
if next_part_to_upload_was != 0
# we see that the part number was ticked--but was is_part_upload set to true before this transition?
if !is_part_uploading_was && !is_part_uploading
errors.add(:next_part_to_upload, ValidationMessages::PART_NOT_STARTED)
end
end
end
end
def sanitize_active_admin
self.user_id = nil if self.user_id == ''
end
def upload_start(length, md5)
#self.upload_id set by the observer
self.next_part_to_upload = 1
self.ogg_length = length
self.ogg_md5 = md5
save
end
# if for some reason the server thinks the client can't carry on with the upload,
# this resets everything to the initial state
def reset_upload
self.upload_failures = self.upload_failures + 1
self.part_failures = 0
self.file_offset = 0
self.next_part_to_upload = 0
self.upload_id = nil
self.ogg_md5 = nil
self.ogg_length = 0
self.fully_uploaded = false
self.is_part_uploading = false
save :validate => false # skip validation because we need this to always work
end
def upload_next_part(length, md5)
self.marking_complete = true
if next_part_to_upload == 0
upload_start(length, md5)
end
self.is_part_uploading = true
save
end
def upload_sign(content_md5)
s3_manager.upload_sign(self[:ogg_url], content_md5, next_part_to_upload, upload_id)
end
def upload_part_complete(part, offset)
# validated by :validate_part_complete
self.marking_complete = true
self.is_part_uploading = false
self.next_part_to_upload = self.next_part_to_upload + 1
self.file_offset = offset.to_i
self.part_failures = 0
save
end
def upload_complete
# validate from happening twice by :validate_fully_uploaded
self.fully_uploaded = true
self.marking_complete = true
enqueue
save
end
def increment_part_failures(part_failure_before_error)
self.part_failures = part_failure_before_error + 1
QuickMix.update_all({"part_failures" => self.part_failures, "id" => self.id})
end
def self.create(recording, user)
raise if recording.nil?
mix = QuickMix.new
mix.is_skip_mount_uploader = true
mix.recording = recording
mix.user = user
mix.save
mix[:ogg_url] = construct_filename(mix.created_at, recording.id, mix.id, mix.default_type)
mix[:mp3_url] = construct_filename(mix.created_at, recording.id, mix.id, type='mp3')
mix.save
mix.is_skip_mount_uploader = false
mix
end
def enqueue
begin
Resque.enqueue(QuickMixer, self.id, self.sign_put(3600 * 24 * 30, 'mp3'))
rescue Exception => e
# implies redis is down. we don't update started_at by bailing out here
false
end
# avoid db validations
QuickMix.where(:id => self.id).update_all(:started_at => Time.now, :should_retry => false)
true
end
def mix_timeout?
Time.now - started_at > 60 * 30 # 30 minutes to mix is more than enough
end
def state
return 'mixed' if completed
return 'waiting-to-mix' if started_at.nil?
return 'error' if error_count > 0 || mix_timeout?
return 'mixing'
end
def error
return nil if state != 'error'
return {error_count: error_count, error_reason: error_reason, error_detail: error_detail} if error_count > 0
return {error_count: 1, error_reason: 'mix-timeout', error_detail: started_at} if mix_timeout?
return {error_count: 1, error_reason: 'unknown', error_detail: 'unknown'}
end
def errored(reason, detail)
self.started_at = nil
self.error_reason = reason
self.error_detail = detail
self.error_count = self.error_count + 1
if self.error_count <= 3
self.should_retry = true
end
save
end
def finish(mp3_length, mp3_md5)
self.completed_at = Time.now
self.mp3_length = mp3_length
self.mp3_md5 = mp3_md5
self.completed = true
save!
# did we have a stream mix already when this one finished? We'll check later for sending a notification only on the 1st finished stream mix
has_stream_mix = recording.has_stream_mix
Recording.where(:id => self.recording.id).update_all(:has_stream_mix => true)
# only update first_quick_mix_id pointer if this is the 1st quick mix to complete for this recording
Recording.where(:id => self.recording.id).update_all(:first_quick_mix_id => self.id) if recording.first_quick_mix_id.nil?
unless has_stream_mix
Notification.send_recording_stream_mix_complete(recording)
end
end
def s3_url(type=default_type)
if type == 'aac'
s3_manager.s3_url(self[:ogg_url])
elsif type == 'ogg'
s3_manager.s3_url(self[:ogg_url])
else
s3_manager.s3_url(self[:mp3_url])
end
end
def is_completed
completed
end
# if the url starts with http, just return it because it's in some other store. Otherwise it's a relative path in s3 and needs be signed
def resolve_url(url_field, mime_type, expiration_time)
self[url_field].start_with?('http') ? self[url_field] : s3_manager.sign_url(self[url_field], {:expires => expiration_time, :response_content_type => mime_type, :secure => true})
end
def sign_url(expiration_time = 120, type=default_type)
type ||= 'ogg'
# expire link in 1 minute--the expectation is that a client is immediately following this link
if type == 'aac'
resolve_url(:ogg_url, 'audio/aac', expiration_time)
elsif type == 'ogg'
resolve_url(:ogg_url, 'audio/ogg', expiration_time)
else
resolve_url(:mp3_url, 'audio/mpeg', expiration_time)
end
end
# this is not 'secure' because, in testing, the PUT failed often in Ruby. should investigate more.
def sign_put(expiration_time = 3600 * 24, type=default_type)
type ||= 'ogg'
if type == 'aac'
s3_manager.sign_url(self[:ogg_url], {:expires => expiration_time, :content_type => 'audio/aac', :secure => false}, :put)
elsif type == 'ogg'
s3_manager.sign_url(self[:ogg_url], {:expires => expiration_time, :content_type => 'audio/ogg', :secure => false}, :put)
else
s3_manager.sign_url(self[:mp3_url], {:expires => expiration_time, :content_type => 'audio/mpeg', :secure => false}, :put)
end
end
def self.cleanup_excessive_storage
QuickMix
.joins(:recording)
.includes(:recording)
.where("cleaned = FALSE AND completed = TRUE AND NOW() - completed_at > '7 days'::INTERVAL AND (has_final_mix = TRUE OR (has_stream_mix = TRUE AND quick_mixes.id IN (SELECT qm.id FROM quick_mixes qm WHERE qm.recording_id = recordings.id AND (recordings.first_quick_mix_id IS NULL OR recordings.first_quick_mix_id != qm.id))))").limit(1000).each do |quick_mix|
quick_mix.delete_s3_files
QuickMix.where(:id => quick_mix.id).update_all(:cleaned => true)
if quick_mix.recording.first_quick_mix_id == quick_mix.id
Recording.where(:id => quick_mix.recording.id).update_all(:has_stream_mix => false, :first_quick_mix_id => nil)
end
end
end
def default_type
recording.immediate ? 'aac' : 'ogg'
end
def filename(type=default_type)
# construct a path for s3
QuickMix.construct_filename(self.created_at, self.recording_id, self.id, type)
end
def delete_s3_files
s3_manager.delete(filename(type=default_type)) if self[:ogg_url] && s3_manager.exists?(filename(type=default_type))
s3_manager.delete(filename(type='mp3')) if self[:mp3_url] && s3_manager.exists?(filename(type='mp3'))
end
def self.construct_filename(created_at, recording_id, id, type='ogg')
raise "unknown ID" unless id
"recordings/#{created_at.strftime('%m-%d-%Y')}/#{recording_id}/stream-mix-#{id}.#{type}"
end
end
end