merging websocket-gateway into websocket-gateway
This commit is contained in:
commit
f78b6f7868
|
|
@ -0,0 +1,24 @@
|
|||
*.gem
|
||||
*.rbc
|
||||
.bundle
|
||||
.config
|
||||
.yardoc
|
||||
Gemfile.lock
|
||||
InstalledFiles
|
||||
_yardoc
|
||||
coverage
|
||||
doc/
|
||||
lib/bundler/man
|
||||
pkg
|
||||
rdoc
|
||||
spec/reports
|
||||
test/tmp
|
||||
test/version_tmp
|
||||
tmp
|
||||
log/*
|
||||
.idea
|
||||
*~
|
||||
*.swp
|
||||
*.iml
|
||||
target
|
||||
vendor
|
||||
|
|
@ -0,0 +1 @@
|
|||
up.connopts=dbname:jam
|
||||
|
|
@ -0,0 +1 @@
|
|||
websockets
|
||||
|
|
@ -0,0 +1 @@
|
|||
ruby-2.0.0-p247
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
#ruby=1.9.3-p327
|
||||
source 'https://rubygems.org'
|
||||
unless ENV['LOCAL_DEV'] == '1'
|
||||
source 'https://jamjam:blueberryjam@int.jamkazam.com/gems/'
|
||||
end
|
||||
|
||||
# Look for $WORKSPACE, otherwise use "workspace" as dev path.
|
||||
workspace = ENV["WORKSPACE"] || "~/workspace"
|
||||
devenv = ENV["BUILD_NUMBER"].nil? # Jenkins sets a build number environment variable
|
||||
|
||||
if devenv
|
||||
gem 'jam_db', :path=> "#{workspace}/jam-db/target/ruby_package"
|
||||
gem 'jampb', :path => "#{workspace}/jam-pb/target/ruby/jampb"
|
||||
gem 'jam_ruby', :path => "#{workspace}/jam-ruby"
|
||||
else
|
||||
gem 'jam_db'
|
||||
gem 'jampb'
|
||||
gem 'jam_ruby'
|
||||
end
|
||||
|
||||
|
||||
gem 'uuidtools', '2.1.2'
|
||||
gem 'bcrypt-ruby', '3.0.1'
|
||||
gem 'ruby-protocol-buffers', '1.2.2'
|
||||
gem 'em-websocket', '>=0.4.0' #, :path=> "#{workspace}/em-websocket-jam"
|
||||
gem 'amqp'
|
||||
gem 'activerecord', '3.2.7'
|
||||
gem 'logging'
|
||||
gem 'will_paginate'
|
||||
gem 'actionmailer'
|
||||
gem 'sendgrid'
|
||||
gem 'rb-readline'
|
||||
gem 'aasm', '3.0.16'
|
||||
gem 'carrierwave'
|
||||
gem 'devise'
|
||||
gem 'postgres-copy'
|
||||
gem 'aws-sdk'
|
||||
|
||||
group :development do
|
||||
gem 'pry'
|
||||
end
|
||||
|
||||
group :test do
|
||||
gem 'cucumber'
|
||||
gem 'rspec'
|
||||
gem 'factory_girl'
|
||||
#gem 'spork', '0.9.0'
|
||||
gem 'database_cleaner', '0.7.0'
|
||||
gem 'guard', '>= 0.10.0'
|
||||
gem 'guard-rspec', '>= 0.7.3'
|
||||
gem 'pg_migrate','0.1.11' #:path => "#{workspace}/pg_migrate_ruby"
|
||||
gem 'evented-spec'
|
||||
end
|
||||
|
||||
group :package do
|
||||
gem 'fpm'
|
||||
end
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
interactor :simple
|
||||
guard 'jruby-rspec', :spec_paths => ["spec"]
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
TODO & DESIGN LIMITATIONS
|
||||
=========================
|
||||
|
||||
* !!!! lock up multi-threaded unsafe data structures
|
||||
|
||||
* The rabbitmq connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy)
|
||||
* The database connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy)
|
||||
* We make just one user topic registration and session registration for all users/sessions. If ever we had 10 of servers, it could be wasteful. It just depends on how fast the bogus messaging can be ignored
|
||||
* The database connection is pooled.
|
||||
* The user model is stored in memory, meaning periodically it should be reloaded from the database (in case a user was marked inactive and you want them knocked out of the system)
|
||||
* The user could easily join to multiple sessions. Currently, though, the ClientContext object only tracks one jam session topic subscription. This is minimial to change.
|
||||
* peek logic not implemented on server for protoc messages; this could be done to save cost of deserialization and serialization for session/user directed messages
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
#!/usr/bin/env rake
|
||||
require "bundler/gem_tasks"
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
#!/usr/bin/env ruby
|
||||
|
||||
# establish database connection before including JamRuby
|
||||
require 'active_record'
|
||||
bin_dir = File.expand_path(File.dirname(__FILE__))
|
||||
|
||||
app_config_file = File.join(bin_dir, '..', 'config', 'application.yml')
|
||||
db_config_file = File.join(bin_dir, '..', 'config', 'database.yml')
|
||||
jamenv = ENV['JAMENV']
|
||||
jamenv ||= 'development'
|
||||
config = YAML::load(File.open(app_config_file))[jamenv]
|
||||
db_config = YAML::load(File.open(db_config_file))[jamenv]
|
||||
|
||||
ActiveRecord::Base.establish_connection(db_config)
|
||||
|
||||
|
||||
# now bring in the Jam code
|
||||
require 'jam_websockets'
|
||||
|
||||
include JamWebsockets
|
||||
|
||||
# run some method
|
||||
|
||||
|
||||
if config["verbose"]
|
||||
Logging.logger.root.level = :debug
|
||||
else
|
||||
Logging.logger.root.level = :info
|
||||
end
|
||||
|
||||
if jamenv == "production"
|
||||
one_meg = 1024 * 1024
|
||||
Logging.logger.root.appenders = Logging.appenders.rolling_file("log/#{jamenv}.log", :truncate=>true, :age=>'daily', :size=>one_meg, :keep=>20)
|
||||
else
|
||||
Logging.logger.root.appenders = Logging.appenders.stdout
|
||||
end
|
||||
|
||||
Server.new.run(:port => config["port"],
|
||||
:emwebsocket_debug => config["emwebsocket_debug"],
|
||||
:connect_time_stale => config["connect_time_stale"],
|
||||
:connect_time_expire => config["connect_time_expire"])
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
#!/bin/sh
|
||||
# this script is used only in development
|
||||
|
||||
bundle exec ruby -Ilib bin/websocket_gateway $*
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
#!/bin/sh
|
||||
# this script is used only in development
|
||||
|
||||
# Look for $WORKSPACE, otherwise use "workspace" as dev path.
|
||||
if [ -z "$WORKSPACE" ]; then
|
||||
WORKSPACE="~/workspace"
|
||||
fi
|
||||
|
||||
|
||||
jruby -I"$WORKSPACE/jam-ruby/lib" -I"$WORKSPACE/jam-pb/target/ruby/jampb/lib" -I"$WORKSPACE/jam-db/target/ruby_package/lib" -Ilib bin/websocket_gateway $*
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
#!/bin/bash
|
||||
|
||||
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
|
||||
# 'target' is the output directory
|
||||
rm -rf $DIR/target
|
||||
mkdir $DIR/target
|
||||
mkdir $DIR/target/deb
|
||||
|
||||
# put all dependencies into vendor/bundle
|
||||
rm -rf vendor/bundle
|
||||
echo "updating dependencies"
|
||||
|
||||
bundle install --path vendor/bundle
|
||||
bundle update
|
||||
|
||||
|
||||
if [ -z $SKIP_TESTS ]; then
|
||||
echo "running rspec tests"
|
||||
bundle exec rspec
|
||||
|
||||
if [ "$?" = "0" ]; then
|
||||
echo "tests completed"
|
||||
else
|
||||
echo "tests failed."
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -n "$PACKAGE" ]; then
|
||||
if [ -z "$BUILD_NUMBER" ]; then
|
||||
echo "BUILD NUMBER is not defined"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
type -P dpkg-architecture > /dev/null
|
||||
|
||||
if [ "$?" = "0" ]; then
|
||||
ARCH=`dpkg-architecture -qDEB_HOST_ARCH`
|
||||
else
|
||||
echo "WARN: unable to determine architecture."
|
||||
ARCH=`all`
|
||||
fi
|
||||
|
||||
set -e
|
||||
# cache all gems local, and tell bundle to use local gems only
|
||||
bundle install --path vendor/bundle --local
|
||||
|
||||
# create debian using fpm
|
||||
bundle exec fpm -s dir -t deb -p target/deb/websocket-gateway_0.1.${BUILD_NUMBER}_${ARCH}.deb -n "websocket-gateway" -v "0.1.$BUILD_NUMBER" --prefix /var/lib/websocket-gateway --after-install $DIR/script/package/post-install.sh --before-install $DIR/script/package/pre-install.sh --before-remove $DIR/script/package/pre-uninstall.sh --after-remove $DIR/script/package/post-uninstall.sh Gemfile lib bin vendor .bundle config script
|
||||
|
||||
fi
|
||||
|
||||
|
||||
echo "build complete"
|
||||
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
Defaults: &defaults
|
||||
connect_time_stale: 30
|
||||
connect_time_expire: 60
|
||||
|
||||
development:
|
||||
port: 6767
|
||||
verbose: true
|
||||
emwebsocket_debug: false
|
||||
<<: *defaults
|
||||
|
||||
test:
|
||||
port: 6769
|
||||
verbose: true
|
||||
<<: *defaults
|
||||
|
||||
production:
|
||||
port: 6767
|
||||
verbose: false
|
||||
<<: *defaults
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
test:
|
||||
adapter: postgresql
|
||||
database: jam_websockets_test
|
||||
host: localhost
|
||||
port: 5432
|
||||
pool: 3
|
||||
username: postgres
|
||||
password: postgres
|
||||
timeout: 2000
|
||||
encoding: unicode
|
||||
|
||||
development:
|
||||
adapter: postgresql
|
||||
database: jam
|
||||
host: localhost
|
||||
port: 5432
|
||||
pool: 3
|
||||
username: postgres
|
||||
password: postgres
|
||||
timeout: 2000
|
||||
encoding: unicode
|
||||
|
||||
production:
|
||||
adapter: postgresql
|
||||
database: jam
|
||||
host: localhost
|
||||
port: 5432
|
||||
pool: 3
|
||||
username: postgres
|
||||
password: postgres
|
||||
timeout: 2000
|
||||
encoding: unicode
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
Feature: Login
|
||||
In order to protect the internal cloud from malicious actors, login is required to remain connected and send commands as a particular user.
|
||||
|
||||
Scenario: Login with bad credentials and be bounced
|
||||
Given I supply bad credentials in Login command
|
||||
When I send the command
|
||||
Then the result should be an error seen and a closed connection
|
||||
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
# -*- encoding: utf-8 -*-
|
||||
require File.expand_path('../lib/jam_websockets/version', __FILE__)
|
||||
|
||||
Gem::Specification.new do |gem|
|
||||
gem.authors = ["Seth Call"]
|
||||
gem.email = ["sethcall@gmail.com"]
|
||||
gem.description = %q{websocket server for clients}
|
||||
gem.summary = %q{websocket server for clients}
|
||||
gem.homepage = ""
|
||||
|
||||
gem.files = `git ls-files`.split($\)
|
||||
gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
|
||||
gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
|
||||
gem.name = "jam_websockets"
|
||||
gem.require_paths = ["lib"]
|
||||
gem.version = JamWebsockets::VERSION
|
||||
end
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
#!/bin/bash
|
||||
|
||||
GEM_SERVER=http://localhost:9000/gems
|
||||
DEB_SERVER=http://localhost:9010/apt-`uname -p`
|
||||
|
||||
echo "starting build..."
|
||||
./build
|
||||
|
||||
if [ "$?" = "0" ]; then
|
||||
echo "build succeeded"
|
||||
|
||||
# generate gem version based on jenkins build number
|
||||
if [ -z $BUILD_NUMBER ]; then
|
||||
BUILD_NUMBER="1"
|
||||
fi
|
||||
VERSION="0.0.${BUILD_NUMBER}"
|
||||
echo "packaging gem jam_websockets-$VERSION"
|
||||
cat > lib/jam_websockets/version.rb << EOF
|
||||
module JamWebsockets
|
||||
VERSION = "$VERSION"
|
||||
end
|
||||
EOF
|
||||
|
||||
gem build jam_websockets.gemspec
|
||||
|
||||
GEMNAME="jam_websockets-${VERSION}.gem"
|
||||
|
||||
echo "publishing gem"
|
||||
curl -f -T $GEMNAME $GEM_SERVER/$GEMNAME
|
||||
|
||||
if [ "$?" != "0" ]; then
|
||||
echo "gem publish failed"
|
||||
exit 1
|
||||
fi
|
||||
echo "done publishing gem"
|
||||
|
||||
if [ ! -z "$PACKAGE" ]; then
|
||||
echo "publishing ubuntu package (.deb)"
|
||||
DEBPATH=`find target/deb -name *.deb`
|
||||
DEBNAME=`basename $DEBPATH`
|
||||
|
||||
curl -f -T $DEBPATH $DEB_SERVER/$DEBNAME
|
||||
|
||||
if [ "$?" != "0" ]; then
|
||||
echo "deb publish failed"
|
||||
exit 1
|
||||
fi
|
||||
echo "done publishing deb"
|
||||
|
||||
|
||||
fi
|
||||
else
|
||||
echo "build failed"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
require "logging"
|
||||
require "jam_ruby"
|
||||
require "jam_websockets/version"
|
||||
require "jam_websockets/session_error"
|
||||
require "jam_websockets/permission_error"
|
||||
require "jam_websockets/client_context"
|
||||
require "jam_websockets/message"
|
||||
require "jam_websockets/router"
|
||||
require "jam_websockets/server"
|
||||
|
||||
include JamRuby
|
||||
module JamWebsockets
|
||||
# Your code goes here...
|
||||
end
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
module JamWebsockets
|
||||
class ClientContext
|
||||
|
||||
attr_accessor :user, :client, :msg_count, :session, :sent_bad_state_previously
|
||||
|
||||
def initialize(user, client)
|
||||
@user = user
|
||||
@client = client
|
||||
@msg_count = 0
|
||||
@session = nil
|
||||
@sent_bad_state_previously = false
|
||||
end
|
||||
|
||||
def to_s
|
||||
return "Client[user:#{@user} client:#{@client} msgs:#{@msg_count} session:#{@session}]"
|
||||
end
|
||||
|
||||
def hash
|
||||
@client.hash
|
||||
end
|
||||
|
||||
def ==(o)
|
||||
o.class == self.class && o.client == @client
|
||||
end
|
||||
alias_method :eql?, :==
|
||||
|
||||
end
|
||||
end
|
||||
|
|
@ -0,0 +1,71 @@
|
|||
require 'json'
|
||||
require 'protocol_buffers'
|
||||
require 'protocol_buffers/compiler'
|
||||
|
||||
class ProtocolBuffers::Message
|
||||
|
||||
def to_json(*args)
|
||||
|
||||
json = to_json_ready_object()
|
||||
|
||||
json.to_json(*args)
|
||||
end
|
||||
|
||||
def to_json_ready_object()
|
||||
hash = {}
|
||||
|
||||
# simpler version, includes all fields in the output, using the default
|
||||
# values if unset. also includes empty repeated fields as empty arrays.
|
||||
# fields.each do |tag, field|
|
||||
# hash[field.name] = value_for_tag(field.tag)
|
||||
# end
|
||||
|
||||
# prettier output, only includes non-empty repeated fields and set fields
|
||||
fields.each do |tag, field|
|
||||
if field.repeated?
|
||||
value = value_for_tag(field.tag)
|
||||
hash[field.name] = value unless value.empty?
|
||||
else
|
||||
if value_for_tag?(field.tag)
|
||||
value = value_for_tag(field.tag)
|
||||
if field.instance_of? ProtocolBuffers::Field::EnumField # if value is enum, resolve string value as ruby const
|
||||
hash[field.name] = field.value_to_name[value]
|
||||
else
|
||||
proxy_class = field.instance_variable_get(:@proxy_class)
|
||||
if proxy_class.nil?
|
||||
hash[field.name] = value
|
||||
else
|
||||
hash[field.name] = value.to_json_ready_object
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
return hash
|
||||
end
|
||||
|
||||
def self.json_create(hash)
|
||||
# initialize takes a hash of { attribute_name => value } so you can just
|
||||
# pass the hash into the constructor. but we're supposed to be showing off
|
||||
# reflection, here. plus, that raises an exception if there is an unknown
|
||||
# key in the hash.
|
||||
# new(hash)
|
||||
|
||||
message = new
|
||||
fields.each do |tag, field|
|
||||
if value = hash[field.name.to_s]
|
||||
if value.instance_of? Hash # if the value is a Hash, descend down into PB hierachy
|
||||
inner_class = field.instance_variable_get(:@proxy_class)
|
||||
value = inner_class.json_create(value)
|
||||
message.set_value_for_tag(field.tag, value)
|
||||
elsif field.instance_of? ProtocolBuffers::Field::EnumField # if value is enum, resolve string value as ruby const
|
||||
message.set_value_for_tag(field.tag, field.instance_variable_get(:@proxy_enum).const_get(value))
|
||||
else
|
||||
message.set_value_for_tag(field.tag, value)
|
||||
end
|
||||
end
|
||||
end
|
||||
message
|
||||
end
|
||||
end
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
class PermissionError < Exception
|
||||
|
||||
end
|
||||
|
||||
|
|
@ -0,0 +1,705 @@
|
|||
require 'set'
|
||||
require 'amqp'
|
||||
require 'thread'
|
||||
require 'json'
|
||||
require 'eventmachine'
|
||||
|
||||
include Jampb
|
||||
|
||||
# add new field to client connection
|
||||
module EventMachine
|
||||
module WebSocket
|
||||
class Connection < EventMachine::Connection
|
||||
attr_accessor :encode_json, :client_id # client_id is uuid we give to each client to track them as we like
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
module JamWebsockets
|
||||
|
||||
class Router
|
||||
|
||||
attr_accessor :user_context_lookup
|
||||
|
||||
def initialize()
|
||||
@log = Logging.logger[self]
|
||||
@pending_clients = Set.new # clients that have connected to server, but not logged in.
|
||||
@clients = {} # clients that have logged in
|
||||
@user_context_lookup = {} # lookup a set of client_contexts by user_id
|
||||
@client_lookup = {} # lookup a client by client_id
|
||||
@amqp_connection_manager = nil
|
||||
@users_exchange = nil
|
||||
@message_factory = JamRuby::MessageFactory.new
|
||||
@semaphore = Mutex.new
|
||||
@user_topic = nil
|
||||
@client_topic = nil
|
||||
@thread_pool = nil
|
||||
@heartbeat_interval = nil
|
||||
|
||||
end
|
||||
|
||||
def start(connect_time_stale, options={:host => "localhost", :port => 5672}, &block)
|
||||
|
||||
@log.info "startup"
|
||||
|
||||
@heartbeat_interval = connect_time_stale / 2
|
||||
|
||||
begin
|
||||
@amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port])
|
||||
@amqp_connection_manager.connect do |channel|
|
||||
register_topics(channel)
|
||||
block.call
|
||||
end
|
||||
|
||||
rescue => e
|
||||
@log.error "unable to initialize #{e.to_s}"
|
||||
cleanup
|
||||
raise e
|
||||
end
|
||||
|
||||
@log.info "started"
|
||||
end
|
||||
|
||||
def add_client(client_id, client_context)
|
||||
@client_lookup[client_id] = client_context
|
||||
end
|
||||
|
||||
def remove_client(client_id, client)
|
||||
deleted = @client_lookup.delete(client_id)
|
||||
|
||||
if deleted.nil?
|
||||
@log.warn "unable to delete #{client_id} from client_lookup"
|
||||
elsif deleted.client != client
|
||||
# put it back--this is only possible if add_client hit the 'old connection' path
|
||||
# so in other words if this happens:
|
||||
# add_client(1, clientX)
|
||||
# add_client(1, clientY) # but clientX is essentially defunct - this could happen due to a bug in client, or EM doesn't notify always of connection close in time
|
||||
# remove_client(1, clientX) -- this check maintains that clientY stays as the current client in the hash
|
||||
@client_lookup[client_id] = deleted
|
||||
@log.debug "putting back client into @client_lookup for #{client_id} #{client.inspect}"
|
||||
else
|
||||
@log.debug "cleaned up @client_lookup for #{client_id}"
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def add_user(context)
|
||||
user_contexts = @user_context_lookup[context.user.id]
|
||||
if user_contexts.nil?
|
||||
user_contexts = Hash.new
|
||||
@user_context_lookup[context.user.id] = user_contexts
|
||||
end
|
||||
|
||||
user_contexts[context.client] = context
|
||||
end
|
||||
|
||||
def remove_user(client_context)
|
||||
user_contexts = @user_context_lookup[client_context.user.id]
|
||||
|
||||
if user_contexts.nil?
|
||||
@log.warn "user can not be removed #{client_context}"
|
||||
else
|
||||
# delete the context from set of user contexts
|
||||
user_contexts.delete(client_context.client)
|
||||
|
||||
# if last user context, delete entire set (memory leak concern)
|
||||
if user_contexts.length == 0
|
||||
@user_context_lookup.delete(client_context.user.id)
|
||||
end
|
||||
|
||||
client_context.user = nil
|
||||
end
|
||||
end
|
||||
|
||||
# register topic for user messages and session messages
|
||||
def register_topics(channel)
|
||||
|
||||
######################## USER MESSAGING ###########################
|
||||
|
||||
# create user exchange
|
||||
@users_exchange = channel.topic('users')
|
||||
# create user messaging topic
|
||||
@user_topic = channel.queue("", :auto_delete => true)
|
||||
@user_topic.bind(@users_exchange, :routing_key => "user.#")
|
||||
@user_topic.purge
|
||||
|
||||
# subscribe for any messages to users
|
||||
@user_topic.subscribe(:ack => false) do |headers, msg|
|
||||
begin
|
||||
routing_key = headers.routing_key
|
||||
user_id = routing_key["user.".length..-1]
|
||||
|
||||
@semaphore.synchronize do
|
||||
contexts = @user_context_lookup[user_id]
|
||||
|
||||
if !contexts.nil?
|
||||
|
||||
@log.debug "received user-directed message for user: #{user_id}"
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
contexts.each do |client_id, context|
|
||||
EM.schedule do
|
||||
@log.debug "sending user message to #{context}"
|
||||
send_to_client(context.client, msg)
|
||||
end
|
||||
end
|
||||
else
|
||||
@log.debug "Can't route message: no user connected with id #{user_id}"
|
||||
end
|
||||
end
|
||||
|
||||
rescue => e
|
||||
@log.error "unhandled error in messaging to client"
|
||||
@log.error e
|
||||
end
|
||||
end
|
||||
|
||||
MQRouter.user_exchange = @users_exchange
|
||||
|
||||
############## CLIENT MESSAGING ###################
|
||||
|
||||
@clients_exchange = channel.topic('clients')
|
||||
|
||||
@client_topic = channel.queue("", :auto_delete => true)
|
||||
@client_topic.bind(@clients_exchange, :routing_key => "client.#")
|
||||
@client_topic.purge
|
||||
|
||||
# subscribe for any p2p messages to a client
|
||||
@client_topic.subscribe(:ack => false) do |headers, msg|
|
||||
begin
|
||||
routing_key = headers.routing_key
|
||||
client_id = routing_key["client.".length..-1]
|
||||
@semaphore.synchronize do
|
||||
client_context = @client_lookup[client_id]
|
||||
client = client_context.client
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
@log.debug "client-directed message received from #{msg.from} to client #{client_id}"
|
||||
|
||||
unless client.nil?
|
||||
|
||||
EM.schedule do
|
||||
@log.debug "sending client-directed down websocket to #{client_id}"
|
||||
send_to_client(client, msg)
|
||||
end
|
||||
else
|
||||
@log.debug "client-directed message unroutable to disconnected client #{client_id}"
|
||||
end
|
||||
end
|
||||
rescue => e
|
||||
@log.error "unhandled error in messaging to client"
|
||||
@log.error e
|
||||
end
|
||||
end
|
||||
|
||||
MQRouter.client_exchange = @clients_exchange
|
||||
end
|
||||
|
||||
|
||||
def new_client(client)
|
||||
@semaphore.synchronize do
|
||||
@pending_clients.add(client)
|
||||
end
|
||||
|
||||
# default to using json instead of pb
|
||||
client.encode_json = true
|
||||
|
||||
client.onopen { |handshake|
|
||||
#binding.pry
|
||||
@log.debug "client connected #{client}"
|
||||
|
||||
# check for '?pb' or '?pb=true' in url query parameters
|
||||
query_pb = handshake.query["pb"]
|
||||
|
||||
if !query_pb.nil? && (query_pb == "" || query_pb == "true")
|
||||
client.encode_json = false
|
||||
end
|
||||
|
||||
}
|
||||
|
||||
client.onclose {
|
||||
@log.debug "Connection closed"
|
||||
stale_client(client)
|
||||
}
|
||||
|
||||
client.onerror { |error|
|
||||
if error.kind_of?(EM::WebSocket::WebSocketError)
|
||||
@log.error "websockets error: #{error}"
|
||||
else
|
||||
@log.error "generic error: #{error} #{error.backtrace}"
|
||||
end
|
||||
|
||||
cleanup_client(client)
|
||||
client.close_websocket
|
||||
}
|
||||
|
||||
client.onmessage { |msg|
|
||||
@log.debug("msg received")
|
||||
|
||||
# TODO: set a max message size before we put it through PB?
|
||||
# TODO: rate limit?
|
||||
|
||||
pb_msg = nil
|
||||
|
||||
begin
|
||||
if client.encode_json
|
||||
#example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}}
|
||||
parse = JSON.parse(msg)
|
||||
pb_msg = Jampb::ClientMessage.json_create(parse)
|
||||
self.route(pb_msg, client)
|
||||
else
|
||||
pb_msg = Jampb::ClientMessage.parse(msg.to_s)
|
||||
self.route(pb_msg, client)
|
||||
end
|
||||
rescue SessionError => e
|
||||
@log.info "ending client session deliberately due to malformed client behavior. reason=#{e}"
|
||||
begin
|
||||
# wrap the message up and send it down
|
||||
error_msg = @message_factory.server_rejection_error(e.to_s)
|
||||
send_to_client(client, error_msg)
|
||||
ensure
|
||||
client.close_websocket
|
||||
cleanup_client(client)
|
||||
end
|
||||
rescue PermissionError => e
|
||||
@log.info "permission error. reason=#{e.to_s}"
|
||||
@log.info e
|
||||
|
||||
# wrap the message up and send it down
|
||||
error_msg = @message_factory.server_permission_error(pb_msg.message_id, e.to_s)
|
||||
send_to_client(client, error_msg)
|
||||
rescue => e
|
||||
@log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}"
|
||||
@log.error e
|
||||
|
||||
begin
|
||||
# wrap the message up and send it down
|
||||
error_msg = @message_factory.server_generic_error(e.to_s)
|
||||
send_to_client(client, error_msg)
|
||||
ensure
|
||||
client.close_websocket
|
||||
cleanup_client(client)
|
||||
end
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
|
||||
def send_to_client(client, msg)
|
||||
@log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})"
|
||||
if client.encode_json
|
||||
client.send(msg.to_json.to_s)
|
||||
else
|
||||
# this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent
|
||||
client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
|
||||
end
|
||||
end
|
||||
|
||||
def cleanup()
|
||||
# shutdown topic listeners and mq connection
|
||||
|
||||
unless @amqp_connection_manager.nil?
|
||||
@amqp_connection_manager.disconnect
|
||||
end
|
||||
|
||||
# tear down each individual client
|
||||
@clients.each do |client, context|
|
||||
cleanup_client(client)
|
||||
end
|
||||
end
|
||||
|
||||
def stop
|
||||
@log.info "shutdown"
|
||||
cleanup
|
||||
end
|
||||
|
||||
# caused a client connection to be marked stale
|
||||
def stale_client(client)
|
||||
if cid = client.client_id
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
music_session_id = connection_manager.flag_connection_stale_with_client_id(cid)
|
||||
# update the session members, letting them know this client went stale
|
||||
context = @client_lookup[client.client_id]
|
||||
music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil?
|
||||
Notification.send_musician_session_stale(music_session, client.client_id, context.user) unless music_session.nil?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def cleanup_clients_with_ids(client_ids)
|
||||
# @log.debug("*** cleanup_clients_with_ids: client_ids = #{client_ids.inspect}")
|
||||
client_ids.each do |cid|
|
||||
|
||||
client_context = @client_lookup[cid]
|
||||
self.cleanup_client(client_context.client) unless client_context.nil?
|
||||
|
||||
# remove this connection from the database
|
||||
ConnectionManager.active_record_transaction do |mgr|
|
||||
mgr.delete_connection(cid) { |conn, count, music_session_id, user_id|
|
||||
Notification.send_friend_update(user_id, false, conn) if count == 0
|
||||
music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil?
|
||||
user = User.find_by_id(user_id) unless user_id.nil?
|
||||
Notification.send_musician_session_depart(music_session, cid, user) unless music_session.nil? || user.nil?
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# removes all resources associated with a client
|
||||
def cleanup_client(client)
|
||||
@semaphore.synchronize do
|
||||
# @log.debug("*** cleanup_clients: client = #{client}")
|
||||
pending = @pending_clients.delete?(client)
|
||||
|
||||
if !pending.nil?
|
||||
@log.debug "cleaning up not-logged-in client #{client}"
|
||||
else
|
||||
|
||||
@log.debug "cleanup up logged-in client #{client}"
|
||||
|
||||
remove_client(client.client_id, client)
|
||||
|
||||
context = @clients.delete(client)
|
||||
|
||||
if !context.nil?
|
||||
remove_user(context)
|
||||
else
|
||||
@log.debug "skipping duplicate cleanup attempt of logged-in client"
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def route(client_msg, client)
|
||||
message_type = @message_factory.get_message_type(client_msg)
|
||||
|
||||
raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil?
|
||||
|
||||
@log.debug("msg received #{message_type}")
|
||||
|
||||
raise SessionError, 'client_msg.route_to is null' if client_msg.route_to.nil?
|
||||
|
||||
if @pending_clients.include? client and client_msg.type != ClientMessage::Type::LOGIN
|
||||
# this client has not logged in and is trying to send a non-login message
|
||||
raise SessionError, "must 'Login' first"
|
||||
end
|
||||
|
||||
if @message_factory.server_directed? client_msg
|
||||
|
||||
handle_server_directed(client_msg, client)
|
||||
|
||||
elsif @message_factory.client_directed? client_msg
|
||||
|
||||
to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1]
|
||||
handle_client_directed(to_client_id, client_msg, client)
|
||||
|
||||
elsif @message_factory.session_directed? client_msg
|
||||
|
||||
session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
|
||||
handle_session_directed(session_id, client_msg, client)
|
||||
|
||||
elsif @message_factory.user_directed? client_msg
|
||||
|
||||
user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1]
|
||||
handle_user_directed(user_id, client_msg, client)
|
||||
|
||||
else
|
||||
raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}"
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def handle_server_directed(client_msg, client)
|
||||
# @log.info("*** handle_server_directed(#{client_msg.inspect}, #{client})")
|
||||
|
||||
if client_msg.type == ClientMessage::Type::LOGIN
|
||||
|
||||
handle_login(client_msg.login, client)
|
||||
|
||||
elsif client_msg.type == ClientMessage::Type::HEARTBEAT
|
||||
|
||||
handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client)
|
||||
|
||||
else
|
||||
raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message"
|
||||
end
|
||||
end
|
||||
|
||||
def handle_login(login, client)
|
||||
username = login.username if login.value_for_tag(1)
|
||||
password = login.password if login.value_for_tag(2)
|
||||
token = login.token if login.value_for_tag(3)
|
||||
client_id = login.client_id if login.value_for_tag(4)
|
||||
reconnect_music_session_id = login.client_id if login.value_for_tag(5)
|
||||
|
||||
@log.info("*** handle_login: token=#{token}; client_id=#{client_id}")
|
||||
connection = nil
|
||||
reconnected = false
|
||||
|
||||
# you don't have to supply client_id in login--if you don't, we'll generate one
|
||||
if client_id.nil? || client_id.empty?
|
||||
# give a unique ID to this client. This is used to prevent session messages
|
||||
# from echoing back to the sender, for instance.
|
||||
client_id = UUIDTools::UUID.random_create.to_s
|
||||
else
|
||||
# check if there's a connection for the client... if it's stale, reconnect it
|
||||
if connection = JamRuby::Connection.find_by_client_id(client_id)
|
||||
# FIXME: I think connection table needs to updated within connection_manager
|
||||
# otherwise this would be 1 line of code (connection.connect!)
|
||||
|
||||
music_session_upon_reentry = connection.music_session
|
||||
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id)
|
||||
context = @client_lookup[client_id]
|
||||
if music_session_id.nil?
|
||||
# if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
|
||||
# if so, then we need to tell the others in the session that this user is now departed
|
||||
Notification.send_musician_session_depart(music_session_upon_reentry, client.client_id, context.user) unless context.nil? || music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed?
|
||||
else
|
||||
music_session = MusicSession.find_by_id(music_session_id)
|
||||
Notification.send_musician_session_fresh(music_session, client.client_id, context.user) unless context.nil?
|
||||
end
|
||||
|
||||
end if connection.stale?
|
||||
end
|
||||
# if there's a client_id but no connection object, create new client_id
|
||||
client_id = UUIDTools::UUID.random_create.to_s if !connection
|
||||
end
|
||||
|
||||
client.client_id = client_id
|
||||
|
||||
user = valid_login(username, password, token, client_id)
|
||||
|
||||
if !user.nil?
|
||||
@log.debug "user #{user} logged in"
|
||||
|
||||
# respond with LOGIN_ACK to let client know it was successful
|
||||
remote_ip = extract_ip(client)
|
||||
|
||||
|
||||
@semaphore.synchronize do
|
||||
# remove from pending_queue
|
||||
@pending_clients.delete(client)
|
||||
|
||||
# add a tracker for this user
|
||||
context = ClientContext.new(user, client)
|
||||
@clients[client] = context
|
||||
add_user(context)
|
||||
add_client(client_id, context)
|
||||
|
||||
unless connection
|
||||
# log this connection in the database
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.create_connection(user.id, client.client_id, remote_ip) do |conn, count|
|
||||
if count == 1
|
||||
Notification.send_friend_update(user.id, true, conn)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
login_ack = @message_factory.login_ack(remote_ip,
|
||||
client_id,
|
||||
user.remember_token,
|
||||
@heartbeat_interval,
|
||||
connection.try(:music_session_id),
|
||||
reconnected)
|
||||
send_to_client(client, login_ack)
|
||||
end
|
||||
else
|
||||
raise SessionError, 'invalid login'
|
||||
end
|
||||
end
|
||||
|
||||
# TODO: deprecated; jam_ruby has routine inspired by this
|
||||
def send_friend_update(user, online, client)
|
||||
@log.debug "sending friend update for user #{user} online = #{online}"
|
||||
|
||||
if !user.nil? && user.friends.exists?
|
||||
@log.debug "user has friends - sending friend updates"
|
||||
|
||||
# create the friend_update message
|
||||
friend_update_msg = @message_factory.friend_update(user.id, online)
|
||||
|
||||
# send the friend_update to each friend that has active connections
|
||||
user.friends.each do |friend|
|
||||
@log.debug "sending friend update message to #{friend}"
|
||||
|
||||
handle_user_directed(friend.id, friend_update_msg, client)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def handle_heartbeat(heartbeat, heartbeat_message_id, client)
|
||||
unless context = @clients[client]
|
||||
@log.warn "*** WARNING: unable to find context due to heartbeat from client: #{client.client_id}; calling cleanup"
|
||||
cleanup_client(client)
|
||||
else
|
||||
connection = Connection.find_by_user_id_and_client_id(context.user.id, context.client.client_id)
|
||||
if connection.nil?
|
||||
@log.warn "*** WARNING: unable to find connection due to heartbeat from client: #{context}; calling cleanup_client"
|
||||
cleanup_client(client)
|
||||
else
|
||||
connection.touch
|
||||
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.reconnect(connection, connection.music_session_id)
|
||||
end if connection.stale?
|
||||
end
|
||||
|
||||
heartbeat_ack = @message_factory.heartbeat_ack()
|
||||
|
||||
send_to_client(client, heartbeat_ack)
|
||||
|
||||
# send errors to clients in response to heartbeats if rabbitmq is down
|
||||
if !@amqp_connection_manager.connected?
|
||||
error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down")
|
||||
context.sent_bad_state_previously = true
|
||||
send_to_client(client, error_msg)
|
||||
return
|
||||
elsif context.sent_bad_state_previously
|
||||
context.sent_bad_state_previously = false
|
||||
recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id)
|
||||
send_to_client(client, recovery_msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def valid_login(username, password, token, client_id)
|
||||
|
||||
if !token.nil? && token != ''
|
||||
@log.debug "logging in via token"
|
||||
# attempt login with token
|
||||
user = JamRuby::User.find_by_remember_token(token)
|
||||
|
||||
if user.nil?
|
||||
@log.debug "no user found with token #{token}"
|
||||
return nil
|
||||
else
|
||||
@log.debug "#{user} login via token"
|
||||
return user
|
||||
end
|
||||
|
||||
elsif !username.nil? and !password.nil?
|
||||
|
||||
@log.debug "logging in via user/pass '#{username}' '#{password}'"
|
||||
# attempt login with username and password
|
||||
user = User.find_by_email(username)
|
||||
|
||||
if !user.nil? && user.valid_password?(password)
|
||||
@log.debug "#{user} login via password"
|
||||
return user
|
||||
else
|
||||
@log.debug "#{username} login failure"
|
||||
return nil
|
||||
end
|
||||
else
|
||||
raise SessionError, 'no login data was found in Login message'
|
||||
end
|
||||
end
|
||||
|
||||
def access_music_session(music_session_id, user)
|
||||
music_session = MusicSession.find_by_id(music_session_id)
|
||||
|
||||
if music_session.nil?
|
||||
raise SessionError, 'specified session not found'
|
||||
end
|
||||
|
||||
if !music_session.access? user
|
||||
raise SessionError, 'not allowed to join the specified session'
|
||||
end
|
||||
|
||||
return music_session
|
||||
end
|
||||
|
||||
# client_id = the id of the client being accessed
|
||||
# client = the current client
|
||||
def access_p2p(client_id, user, msg)
|
||||
|
||||
return nil
|
||||
# ping_request and ping_ack messages are special in that they are simply allowed
|
||||
if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK
|
||||
return nil
|
||||
end
|
||||
|
||||
client_connection = Connection.find_by_client_id(client_id)
|
||||
|
||||
if client_connection.nil?
|
||||
raise PermissionError, 'specified client not found'
|
||||
end
|
||||
|
||||
if !client_connection.access_p2p? user
|
||||
raise SessionError, 'not allowed to message this client'
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def handle_client_directed(to_client_id, client_msg, client)
|
||||
context = @clients[client]
|
||||
|
||||
# by not catching any exception here, a PermissionError will be thrown if this isn't valid
|
||||
# if for some reason the client is trying to send to a client that it doesn't
|
||||
# belong to
|
||||
access_p2p(to_client_id, context.user, client_msg)
|
||||
|
||||
# populate routing data
|
||||
client_msg.from = client.client_id
|
||||
|
||||
@log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}"
|
||||
|
||||
# put it on the topic exchange for clients
|
||||
@clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}})
|
||||
end
|
||||
|
||||
def handle_user_directed(user_id, client_msg, client)
|
||||
|
||||
@log.debug "publishing to user #{user_id} from client_id #{client.client_id}"
|
||||
|
||||
# put it on the topic exchange for users
|
||||
@users_exchange.publish(client_msg.to_s, :routing_key => "user.#{user_id}")
|
||||
end
|
||||
|
||||
def handle_session_directed(session_id, client_msg, client)
|
||||
context = @clients[client]
|
||||
|
||||
user_publish_to_session(session_id, context.user, client_msg, :client_id => client.client_id)
|
||||
end
|
||||
|
||||
# sends a message to a session on behalf of a user
|
||||
# if this is originating in the context of a client, it should be specified as :client_id => "value"
|
||||
# client_msg should be a well-structure message (jam-pb message)
|
||||
def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""})
|
||||
music_session = access_music_session(music_session_id, user)
|
||||
|
||||
# gather up client_ids in the session
|
||||
client_ids = music_session.music_session_clients.map { |client| client.client_id }.reject { |client_id| client_id == sender[:client_id] }
|
||||
|
||||
publish_to_session(music_session.id, client_ids, client_msg.to_s, sender)
|
||||
end
|
||||
|
||||
|
||||
# sends a message to a session with no checking of permissions
|
||||
# this method deliberately has no database interactivity/active_record objects
|
||||
def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""})
|
||||
|
||||
EM.schedule do
|
||||
sender_client_id = sender[:client_id]
|
||||
|
||||
# iterate over each person in the session, and send a p2p message
|
||||
client_ids.each do |client_id|
|
||||
|
||||
@@log.debug "publishing to session:#{music_session_id} client:#{client_id} from client:#{sender_client_id}"
|
||||
# put it on the topic exchange3 for clients
|
||||
self.class.client_exchange.publish(client_msg, :routing_key => "client.#{music_session_id}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def extract_ip(client)
|
||||
return Socket.unpack_sockaddr_in(client.get_peername)[1]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
@ -0,0 +1,87 @@
|
|||
require 'em-websocket'
|
||||
|
||||
module JamWebsockets
|
||||
|
||||
class Server
|
||||
|
||||
def initialize(options={})
|
||||
@log = Logging.logger[self]
|
||||
@count=0
|
||||
@router = Router.new
|
||||
end
|
||||
|
||||
def run(options={})
|
||||
|
||||
host = "0.0.0.0"
|
||||
port = options[:port]
|
||||
connect_time_stale = options[:connect_time_stale].to_i
|
||||
connect_time_expire = options[:connect_time_expire].to_i
|
||||
|
||||
@log.info "starting server #{host}:#{port} with staleness_time=#{connect_time_stale}; reconnect time = #{connect_time_expire}"
|
||||
|
||||
EventMachine.run do
|
||||
@router.start(connect_time_stale) do
|
||||
# take stale off the expire limit because the call to stale will
|
||||
# touch the updated_at column, adding an extra stale limit to the expire time limit
|
||||
# expire_time = connect_time_expire > connect_time_stale ? connect_time_expire - connect_time_stale : connect_time_expire
|
||||
expire_time = connect_time_expire
|
||||
start_connection_expiration(expire_time)
|
||||
start_connection_flagger(connect_time_stale)
|
||||
|
||||
start_websocket_listener(host, port, options[:emwebsocket_debug])
|
||||
end
|
||||
|
||||
# if you don't do this, the app won't exit unless you kill -9
|
||||
at_exit do
|
||||
@log.info "cleaning up server"
|
||||
@router.cleanup
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
def start_websocket_listener(listen_ip, port, emwebsocket_debug)
|
||||
EventMachine::WebSocket.start(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws|
|
||||
@log.info "new client #{ws}"
|
||||
@router.new_client(ws)
|
||||
end
|
||||
end
|
||||
|
||||
def start_connection_expiration(stale_max_time)
|
||||
# one cleanup on startup
|
||||
expire_stale_connections(stale_max_time)
|
||||
|
||||
EventMachine::PeriodicTimer.new(stale_max_time) do
|
||||
expire_stale_connections(stale_max_time)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def expire_stale_connections(stale_max_time)
|
||||
client_ids = []
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
client_ids = connection_manager.stale_connection_client_ids(stale_max_time)
|
||||
end
|
||||
# @log.debug("*** expire_stale_connections(#{stale_max_time}): client_ids = #{client_ids.inspect}")
|
||||
@router.cleanup_clients_with_ids(client_ids)
|
||||
end
|
||||
|
||||
def start_connection_flagger(flag_max_time)
|
||||
# one cleanup on startup
|
||||
flag_stale_connections(flag_max_time)
|
||||
|
||||
EventMachine::PeriodicTimer.new(flag_max_time/2) do
|
||||
flag_stale_connections(flag_max_time)
|
||||
end
|
||||
end
|
||||
|
||||
def flag_stale_connections(flag_max_time)
|
||||
# @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
connection_manager.flag_stale_connections(flag_max_time)
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
class SessionError < Exception
|
||||
|
||||
end
|
||||
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
module JamWebsockets
|
||||
VERSION = "0.0.1"
|
||||
end
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
|
||||
|
||||
bundle exec jam_db up --connopts=dbname:jam host:localhost user:postgres password:postgres --verbose
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
#!/bin/sh
|
||||
|
||||
export BUILD_NUMBER=1
|
||||
|
||||
exec "/usr/local/rbenv/versions/1.9.3-p327/bin/ruby" "$@"
|
||||
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
#!/bin/sh
|
||||
|
||||
set -eu
|
||||
|
||||
NAME="websocket-gateway"
|
||||
|
||||
USER="$NAME"
|
||||
GROUP="$NAME"
|
||||
|
||||
# copy upstart file
|
||||
cp /var/lib/$NAME/script/package/$NAME.conf /etc/init/$NAME.conf
|
||||
|
||||
mkdir -p /var/lib/$NAME/log
|
||||
|
||||
chown -R $USER:$GROUP /var/lib/$NAME
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
#!/bin/sh
|
||||
|
||||
|
||||
|
||||
NAME="websocket-gateway"
|
||||
|
||||
set -e
|
||||
if [ "$1" = "remove" ]
|
||||
then
|
||||
set +e
|
||||
# stop the process, if any is found. we don't want this failing to cause an error, though.
|
||||
stop websocket-gateway
|
||||
set -e
|
||||
|
||||
if [ -f /etc/init/websocket-gateway.conf ]; then
|
||||
rm /etc/init/websocket-gateway.conf
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ "$1" = "purge" ]
|
||||
then
|
||||
if [ -d /var/lib/$NAME ]; then
|
||||
rm -rf /var/lib/$NAME
|
||||
fi
|
||||
|
||||
userdel $NAME
|
||||
fi
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
#!/bin/sh
|
||||
|
||||
set -eu
|
||||
|
||||
NAME="websocket-gateway"
|
||||
|
||||
HOME="/var/lib/$NAME"
|
||||
USER="$NAME"
|
||||
GROUP="$NAME"
|
||||
|
||||
# if NIS is used, then errors can occur but be non-fatal
|
||||
if which ypwhich >/dev/null 2>&1 && ypwhich >/dev/null 2>&1
|
||||
then
|
||||
set +e
|
||||
fi
|
||||
|
||||
if ! getent group "$GROUP" >/dev/null
|
||||
then
|
||||
addgroup --system "$GROUP" >/dev/null
|
||||
fi
|
||||
|
||||
# creating user if it isn't already there
|
||||
if ! getent passwd "$USER" >/dev/null
|
||||
then
|
||||
adduser \
|
||||
--system \
|
||||
--home $HOME \
|
||||
--shell /bin/false \
|
||||
--disabled-login \
|
||||
--ingroup "$GROUP" \
|
||||
--gecos "$USER" \
|
||||
"$USER" >/dev/null
|
||||
fi
|
||||
|
||||
# NISno longer a possible problem; stop ignoring errors
|
||||
set -e
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
#!/bin/bash -l
|
||||
|
||||
# default config values
|
||||
BUILD_NUMBER=1
|
||||
|
||||
CONFIG_FILE="/etc/websocket-gateway/upstart.conf"
|
||||
if [ -e "$CONFIG_FILE" ]; then
|
||||
. "$CONFIG_FILE"
|
||||
fi
|
||||
|
||||
# I don't like doing this, but the next command (bundle exec) retouches/generates
|
||||
# the gemfile. This unfortunately means the next debian update doesn't update this file.
|
||||
# Ultimately this means an old Gemfile.lock is left behind for a new package,
|
||||
# and bundle won't run because it thinks it has the wrong versions of gems
|
||||
rm -f Gemfile.lock
|
||||
|
||||
BUILD_NUMBER=$BUILD_NUMBER JAMENV=production exec bundle exec ruby -Ilib bin/websocket_gateway
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
description "websocket-gateway"
|
||||
|
||||
start on startup
|
||||
start on runlevel [2345]
|
||||
stop on runlevel [016]
|
||||
|
||||
exec start-stop-daemon --start --chdir /var/lib/websocket-gateway --exec /var/lib/websocket-gateway/script/package/upstart-run.sh
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
FactoryGirl.define do
|
||||
factory :user, :class => JamRuby::User do
|
||||
sequence(:email) { |n| "person_#{n}@example.com"}
|
||||
sequence(:first_name) { |n| "Person" }
|
||||
sequence(:last_name) { |n| "#{n}" }
|
||||
password "foobar"
|
||||
password_confirmation "foobar"
|
||||
email_confirmed true
|
||||
musician true
|
||||
city "Apex"
|
||||
state "NC"
|
||||
country "USA"
|
||||
terms_of_service true
|
||||
|
||||
|
||||
factory :admin do
|
||||
admin true
|
||||
end
|
||||
|
||||
before(:create) do |user|
|
||||
user.musician_instruments << FactoryGirl.build(:musician_instrument, user: user)
|
||||
end
|
||||
end
|
||||
|
||||
factory :music_session, :class => JamRuby::MusicSession do
|
||||
sequence(:description) { |n| "Jam Session #{n}" }
|
||||
fan_chat true
|
||||
fan_access true
|
||||
approval_required false
|
||||
musician_access true
|
||||
legal_terms true
|
||||
end
|
||||
|
||||
factory :connection, :class => JamRuby::Connection do
|
||||
ip_address "1.1.1.1"
|
||||
as_musician true
|
||||
end
|
||||
|
||||
factory :instrument, :class => JamRuby::Instrument do
|
||||
description { |n| "Instrument #{n}" }
|
||||
end
|
||||
|
||||
factory :musician_instrument, :class=> JamRuby::MusicianInstrument do
|
||||
instrument { Instrument.find('electric guitar') }
|
||||
proficiency_level 1
|
||||
priority 0
|
||||
end
|
||||
end
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe ClientContext do
|
||||
|
||||
let(:context) {ClientContext.new({}, "client1")}
|
||||
|
||||
describe 'hashing' do
|
||||
it "hash correctly" do
|
||||
set = Set.new
|
||||
set.add?(context).should eql(set)
|
||||
set.add?(context).should be_nil
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
@ -0,0 +1,283 @@
|
|||
require 'spec_helper'
|
||||
require 'thread'
|
||||
|
||||
LoginClient = Class.new do
|
||||
attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id
|
||||
|
||||
|
||||
def initialize()
|
||||
|
||||
end
|
||||
|
||||
def onopen(&block)
|
||||
@onopenblock = block
|
||||
end
|
||||
|
||||
def onmessage(&block)
|
||||
@onmsgblock = block
|
||||
end
|
||||
|
||||
def close(&block)
|
||||
@oncloseblock = block
|
||||
end
|
||||
|
||||
def close_websocket()
|
||||
|
||||
end
|
||||
|
||||
def send(msg)
|
||||
puts msg
|
||||
end
|
||||
|
||||
def get_peername
|
||||
return "\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" # 37643, "localhost"
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
||||
# does a login and returns client
|
||||
def login(router, user, password, client_id)
|
||||
|
||||
message_factory = MessageFactory.new
|
||||
client = LoginClient.new
|
||||
|
||||
login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token, 15, nil, false)
|
||||
|
||||
router.should_receive(:send_to_client) do |*args|
|
||||
args.count.should == 2
|
||||
args[0].should == client
|
||||
args[1].is_a?(Jampb::ClientMessage).should be_true
|
||||
end
|
||||
router.should_receive(:extract_ip).at_least(:once).with(client).and_return("127.0.0.1")
|
||||
client.should_receive(:onclose)
|
||||
client.should_receive(:onerror)
|
||||
|
||||
#client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00")
|
||||
|
||||
@router.new_client(client)
|
||||
handshake = double("handshake")
|
||||
handshake.should_receive(:query).and_return({ "pb" => "true" })
|
||||
client.onopenblock.call handshake
|
||||
|
||||
# create a login message, and pass it into the router via onmsgblock.call
|
||||
login = message_factory.login_with_user_pass(user.email, password, :client_id => client_id)
|
||||
|
||||
# first log in
|
||||
client.onmsgblock.call login.to_s
|
||||
|
||||
# then join music session
|
||||
return client
|
||||
end
|
||||
|
||||
# currently commented out; we have deprecated logging in for jam sessions via websocket-gateway;
|
||||
# use rest API instead (or direct db access with factory-girl)
|
||||
def login_music_session(router, client, music_session)
|
||||
#message_factory = MessageFactory.new
|
||||
#login_music_session = message_factory.login_music_session(music_session.id)
|
||||
#login_ack = message_factory.login_music_session_ack(false, nil);
|
||||
#router.should_receive(:send_to_client).with(client, login_ack)
|
||||
#client.onmsgblock.call login_music_session.to_s
|
||||
end
|
||||
|
||||
|
||||
describe Router do
|
||||
include EventedSpec::EMSpec
|
||||
|
||||
message_factory = MessageFactory.new
|
||||
|
||||
em_before do
|
||||
@router = Router.new()
|
||||
end
|
||||
|
||||
subject { @router }
|
||||
|
||||
em_after do
|
||||
|
||||
end
|
||||
|
||||
|
||||
describe "serviceability" do
|
||||
it "should start and stop", :mq => true do
|
||||
#em do
|
||||
done
|
||||
#end
|
||||
end
|
||||
|
||||
it "should register for client events", :mq => true do
|
||||
#em do
|
||||
client = double("client")
|
||||
client.should_receive(:onopen)
|
||||
client.should_receive(:onclose)
|
||||
client.should_receive(:onerror)
|
||||
client.should_receive(:onmessage)
|
||||
client.should_receive(:encode_json=)
|
||||
|
||||
@router.new_client(client)
|
||||
done
|
||||
#end
|
||||
end
|
||||
end
|
||||
|
||||
describe "topic routing helpers" do
|
||||
it "create and delete user lookup set" do
|
||||
#em do
|
||||
user = double(User)
|
||||
user.should_receive(:id).any_number_of_times.and_return("1")
|
||||
client = double("client")
|
||||
context = ClientContext.new(user, client)
|
||||
|
||||
@router.user_context_lookup.length.should == 0
|
||||
|
||||
@router.add_user(context)
|
||||
|
||||
@router.user_context_lookup.length.should == 1
|
||||
|
||||
@router.remove_user(context)
|
||||
|
||||
@router.user_context_lookup.length.should == 0
|
||||
done
|
||||
#end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
describe "login" do
|
||||
it "should not allow login of bogus user", :mq => true do
|
||||
#em do
|
||||
TestClient = Class.new do
|
||||
|
||||
attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id
|
||||
|
||||
def initialize()
|
||||
|
||||
end
|
||||
|
||||
def onopen(&block)
|
||||
@onopenblock = block
|
||||
end
|
||||
|
||||
def onmessage(&block)
|
||||
@onmsgblock = block
|
||||
end
|
||||
|
||||
def close_websocket()
|
||||
end
|
||||
|
||||
def close()
|
||||
end
|
||||
end
|
||||
|
||||
client = TestClient.new
|
||||
|
||||
error_msg = message_factory.server_rejection_error("invalid login")
|
||||
|
||||
@router.should_receive(:send_to_client).with(client, error_msg)
|
||||
client.should_receive(:close_websocket)
|
||||
client.should_receive(:onclose)
|
||||
client.should_receive(:onerror)
|
||||
|
||||
|
||||
@router.new_client(client)
|
||||
handshake = double("handshake")
|
||||
handshake.should_receive(:query).and_return({"pb" => "true"})
|
||||
client.onopenblock.call handshake
|
||||
|
||||
# create a login message, and pass it into the router via onmsgblock.call
|
||||
login = message_factory.login_with_user_pass("baduser@example.com", "foobar")
|
||||
|
||||
client.onmsgblock.call login.to_s
|
||||
done
|
||||
#end
|
||||
end
|
||||
|
||||
it "should allow login of valid user", :mq => true do
|
||||
#em do
|
||||
@user = FactoryGirl.create(:user,
|
||||
:password => "foobar", :password_confirmation => "foobar")
|
||||
client1 = login(@router, @user, "foobar", "1")
|
||||
done
|
||||
#end
|
||||
end
|
||||
|
||||
it "should allow music_session_join of valid user", :mq => true do
|
||||
#em do
|
||||
user1 = FactoryGirl.create(:user) # in the music session
|
||||
user2 = FactoryGirl.create(:user) # in the music session
|
||||
user3 = FactoryGirl.create(:user) # not in the music session
|
||||
|
||||
music_session = FactoryGirl.create(:music_session, :creator => user1)
|
||||
|
||||
music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "4")
|
||||
music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "5")
|
||||
|
||||
# make a music_session and define two members
|
||||
|
||||
# create client 1, log him in, and log him in to music session
|
||||
client1 = login(@router, user1, "foobar", "1")
|
||||
login_music_session(@router, client1, music_session)
|
||||
done
|
||||
#end
|
||||
end
|
||||
|
||||
it "should allow two valid subscribers to communicate with session-directed messages", :mq => true do
|
||||
#em do
|
||||
user1 = FactoryGirl.create(:user) # in the music session
|
||||
user2 = FactoryGirl.create(:user) # in the music session
|
||||
|
||||
music_session = FactoryGirl.create(:music_session, :creator => user1)
|
||||
|
||||
|
||||
# create client 1, log him in, and log him in to music session
|
||||
client1 = login(@router, user1, "foobar", "1")
|
||||
login_music_session(@router, client1, music_session)
|
||||
|
||||
client2 = login(@router, user2, "foobar", "2")
|
||||
login_music_session(@router, client2, music_session)
|
||||
|
||||
# make a music_session and define two members
|
||||
|
||||
music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "6")
|
||||
music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "7")
|
||||
|
||||
done
|
||||
#end
|
||||
end
|
||||
|
||||
it "should allow two valid subscribers to communicate with p2p messages", :mq => true do
|
||||
#em do
|
||||
user1 = FactoryGirl.create(:user) # in the music session
|
||||
user2 = FactoryGirl.create(:user) # in the music session
|
||||
|
||||
music_session = FactoryGirl.create(:music_session, :creator => user1)
|
||||
|
||||
# create client 1, log him in, and log him in to music session
|
||||
client1 = login(@router, user1, "foobar", "1")
|
||||
#login_music_session(@router, client1, music_session)
|
||||
|
||||
client2 = login(@router, user2, "foobar", "2")
|
||||
#login_music_session(@router, client2, music_session)
|
||||
|
||||
# by creating
|
||||
music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "8")
|
||||
|
||||
# now attempt to message p2p!
|
||||
|
||||
# first test: user 2 should be able to send a ping message to user 1, even though he isn't in the same session yet
|
||||
|
||||
# create a login message, and pass it into the router via onmsgblock.call
|
||||
ping = message_factory.ping_request("1", "2")
|
||||
|
||||
#@router.should_receive(:send_to_client) #.with(client1, ping)
|
||||
|
||||
## send ping to client 2
|
||||
#client2.onmsgblock.call ping.to_s
|
||||
|
||||
#music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "2")
|
||||
|
||||
done
|
||||
#end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
class SpecDb
|
||||
|
||||
TEST_DB_NAME="jam_websockets_test"
|
||||
|
||||
def self.recreate_database
|
||||
conn = PG::Connection.open("dbname=postgres user=postgres password=postgres host=localhost")
|
||||
conn.exec("DROP DATABASE IF EXISTS #{TEST_DB_NAME}")
|
||||
conn.exec("CREATE DATABASE #{TEST_DB_NAME}")
|
||||
JamDb::Migrator.new.migrate(:dbname => TEST_DB_NAME, :user => "postgres", :password => "postgres", :host => "localhost")
|
||||
end
|
||||
|
||||
end
|
||||
|
|
@ -0,0 +1,108 @@
|
|||
require 'active_record'
|
||||
require 'jam_db'
|
||||
require 'spec_db'
|
||||
|
||||
# recreate test database and migrate it
|
||||
db_config = YAML::load(File.open('config/database.yml'))["test"]
|
||||
|
||||
SpecDb::recreate_database()
|
||||
# initialize ActiveRecord's db connection
|
||||
ActiveRecord::Base.establish_connection(db_config)
|
||||
|
||||
|
||||
require 'jam_websockets'
|
||||
require 'timeout'
|
||||
require 'evented-spec'
|
||||
|
||||
jamenv = ENV['JAMENV']
|
||||
jamenv ||= 'development'
|
||||
|
||||
fn = "#{File.dirname(__FILE__)}/../config/application.yml"
|
||||
puts "*** spec_helper.rb: fn=#{fn}; #{File.exists?(fn)}; #{jamenv}"
|
||||
|
||||
ff = File.open("#{File.dirname(__FILE__)}/../config/application.yml",'r')
|
||||
config = YAML::load(ff)[jamenv]
|
||||
puts "*** spec_helper.rb: jamenv=#{jamenv}; config = #{config}"
|
||||
|
||||
if config["verbose"]
|
||||
Logging.logger.root.level = :debug
|
||||
else
|
||||
Logging.logger.root.level = :info
|
||||
end
|
||||
|
||||
Logging.logger.root.appenders = Logging.appenders.stdout
|
||||
|
||||
|
||||
require 'jam_ruby'
|
||||
require 'jampb'
|
||||
require 'rubygems'
|
||||
#require 'spork'
|
||||
require 'database_cleaner'
|
||||
require 'factory_girl'
|
||||
require 'factories'
|
||||
|
||||
include JamRuby
|
||||
include JamWebsockets
|
||||
include Jampb
|
||||
|
||||
|
||||
#uncomment the following line to use spork with the debugger
|
||||
#require 'spork/ext/ruby-debug'
|
||||
|
||||
|
||||
#Spork.prefork do
|
||||
# Loading more in this block will cause your tests to run faster. However,
|
||||
# if you change any configuration or code from libraries loaded here, you'll
|
||||
# need to restart spork for it take effect.
|
||||
# This file is copied to spec/ when you run 'rails generate rspec:install'
|
||||
#ENV["RAILS_ENV"] ||= 'test'
|
||||
#require File.expand_path("../../config/environment", __FILE__)
|
||||
require 'rspec/autorun'
|
||||
#require 'rspec/rails'
|
||||
# This file was generated by the `rspec --init` command. Conventionally, all
|
||||
# specs live under a `spec` directory, which RSpec adds to the `$LOAD_PATH`.
|
||||
# Require this file using `require "spec_helper"` to ensure that it is only
|
||||
# loaded once.
|
||||
#
|
||||
# See http://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration
|
||||
RSpec.configure do |config|
|
||||
config.treat_symbols_as_metadata_keys_with_true_values = true
|
||||
config.run_all_when_everything_filtered = true
|
||||
config.filter_run :focus
|
||||
|
||||
config.before(:each) do
|
||||
DatabaseCleaner.start
|
||||
end
|
||||
|
||||
config.after(:each) do
|
||||
DatabaseCleaner.clean
|
||||
end
|
||||
|
||||
config.before(:suite) do
|
||||
DatabaseCleaner.strategy = :truncation, {:except => %w[instruments genres] }
|
||||
DatabaseCleaner.clean_with(:truncation, {:except => %w[instruments genres] })
|
||||
end
|
||||
|
||||
#config.after(:each) do
|
||||
# ActiveRecord::Base.connection.execute('select truncate_tables()')
|
||||
# # DatabaseCleaner.clean
|
||||
# end
|
||||
|
||||
# If you're not using ActiveRecord, or you'd prefer not to run each of your
|
||||
# examples within a transaction, remove the following line or assign false
|
||||
# instead of true.
|
||||
#config.use_transactional_fixtures = true
|
||||
|
||||
# Run specs in random order to surface order dependencies. If you find an
|
||||
# order dependency and want to debug it, you can fix the order by providing
|
||||
# the seed, which is printed after each run.
|
||||
# --seed 1234
|
||||
config.order = 'random'
|
||||
end
|
||||
#end
|
||||
|
||||
|
||||
#Spork.each_run do
|
||||
# This code will be run each time you run your specs.
|
||||
|
||||
#end
|
||||
Loading…
Reference in New Issue