Concurrency isn’t easy. Getting a program to do more than one thing at a time has traditionally meant hassling with mutexes, race conditions, lock contention, and the rest of the unpleasant baggage that comes along with multithreading. Event-based concurrency models alleviate some of these concerns, but can turn large programs into a rat’s nest of callback functions. No wonder, then, that concurrent programming is a task most programmers dread, or avoid altogether by retreating to multiple independent processes that share data externally (for example, through a database or message queue).
A large part of the difficulty of concurrent programming comes down to state: how do you know what your multithreaded program is doing, and when? What value does a particular variable hold when you have 2 threads running, or 5, or 50? How can you guarantee that your program’s many tendrils aren’t clobbering one another in a race to take action? A thread-based concurrency paradigm poses more questions than it answers.
Thankfully, Scala offers a reasonable, flexible approach to concurrency that we’ll explore in this chapter.
Though you may have heard of Scala and Actors in the same breath, Actors aren’t a concept unique to Scala. Actors, originally intended for use in Artificial Intelligence research, were first put forth in 1973 (see [Hewitt1973] and [Agha1987]). Since then, variations on the idea of Actors have appeared in a number of programming languages, most notably in Erlang and Io. As an abstraction, Actors are general enough that they can be implemented as a library (as in Scala), or as the fundamental unit of a computational system.
Fundamentally, an Actor is an object that receives messages and takes action on those messages. The order in which messages arrive is unimportant to an Actor, though some Actor implementations (such as Scala’s) queue messages in order. An Actor might handle a message internally, or it might send a message to another Actor, or it might create another Actor to take action based on the message. Actors are a very high-level abstraction.
Unlike traditional object systems (which, you might be thinking to yourself, have many of the same properties we’ve described), Actors don’t enforce a sequence or ordering to their actions. This inherent eschewing of sequentiality, coupled with independence from shared global state, allow Actors to do their work in parallel. As we’ll see later on, the judicious use of immutable data fits the Actor model ideally, and further aids in safe, comprehensible concurrent programming.
At their most basic, Actors
in Scala are objects that inherit from
scala.actors.Actor
:
// code-examples/Concurrency/simple-actor-script.scala
import
scala.actors.Actorclass
Redford
extends
Actor
{def
act
() { println("A lot of what acting is, is paying attention."
) } }val
robert =new
Redford
robert.start
As we can see, an Actor
defined in this way must be both instantiated and started, similar to how
threads are handled in Java. It must also implement the abstract method
act
, which returns Unit
. Once we’ve
started this simple Actor, the following sage advice for thespians is
printed to the console:
A lot of what acting is, is paying attention.
The
scala.actors
package contains a factory method for
creating Actors that avoids much of the setup in the above example. We can
import this method and other convenience methods from
scala.actors.Actors._
. Here is a factory-made
Actor:
// code-examples/Concurrency/factory-actor-script.scala
import
scala.actors.Actorimport
scala.actors.Actor._val
paulNewman = actor { println("To be an actor, you have to be a child."
) }
While a subclass that
extends the Actor
class must define
act
in order to be concrete, a factory-produced Actor
has no such limitation. In this shorter example, the body of the method
passed to actor
is effectively promoted to the
act
method from our first example. Predictably, this
Actor also prints a message when run. Illuminating, but we still haven’t
shown the essential piece of the Actors puzzle: sending messages.
Actors can receive any sort of object as a message, from strings of text to numeric types to whatever classes you’ve cooked up in your programs. For this reason, Actors and pattern matching go hand in hand. An Actor should only act on messages of familiar types; a pattern match on the class and/or contents of a message is good defensive programming and increases the readability of Actor code:
// code-examples/Concurrency/pattern-match-actor-script.scala
import
scala.actors.Actorimport
scala.actors.Actor._val
fussyActor = actor { loop { receive {case
s:String => println
("I got a String: "
+ s)case
i:Int => println
("I got an Int: "
+ i.toString)case
_
=>
println("I have no idea what I just got."
) } } } fussyActor !"hi there"
fussyActor !23
fussyActor !3.33
This example prints the following when run:
I got a String: hi there I got an Int: 23 I have no idea what I just got.
The body of
fussyActor
is a receive
method
wrapped in a loop
. loop
is
essentially a nice shortcut for while(true)
; it does
whatever is inside its block repeatedly. receive
blocks until it gets a message of a type that will satisfy one of its
internal pattern matching cases.
The final lines of this
example demonstrate use of the !
(exclamation point,
or bang) method to send messages to our Actor. If
you’ve ever seen Actors in Erlang, you’ll find this syntax familiar. The
Actor is always on the lefthand side of the bang, and the message being
sent to said Actor is always on the right. If you need a mnemonic for
this granule of syntactic sugar, imagine that you’re an irate director
shouting commands at your Actors.
Every Actor has a mailbox in which messages sent to that Actor are queued. Let’s see an example where we inspect the size of an Actor’s mailbox:
// code-examples/Concurrency/actor-mailbox-script.scala
import
scala.actors.Actorimport
scala.actors.Actor._val
countActor = actor { loop { react {case
"how many?"
=>
{ println("I've got "
+ mailboxSize.toString +" messages in my mailbox."
) } } } } countActor !1
countActor !2
countActor !3
countActor !"how many?"
countActor !"how many?"
countActor !4
countActor !"how many?"
This example produces the following output:
I've got 3 messages in my mailbox. I've got 3 messages in my mailbox. I've got 4 messages in my mailbox.
Note that the first and
second lines of output are identical. Because our Actor was set up
solely to process messages of the string "how many?"
,
those messages didn’t remain in its mailbox. Only the messages of types
we didn’t know about—in this case, Int
—remained
unprocessed.
Now that we’ve got a basic sense of what Actors are and how they’re used in Scala, let’s put them to work. Specifically, let’s put them to work cutting hair. The sleeping barber problem (see [SleepingBarberProblem]) is one of a popular set of computer science hypotheticals designed to demonstrate issues of concurrency and synchronization.
The problem is this: a hypothetical barber shop has just one barber with one barber chair, and three chairs in which customers may wait for a haircut. Without customers around, the barber sleeps. When a customer arrives, the barber wakes up to cut his hair. If the barber is busy cutting hair when a customer arrives, the customer sits down in an available chair. If a chair isn’t available, the customer leaves.
The sleeping barber problem is usually solved with semaphores and mutexes, but we’ve got better tools at our disposal. Straight away, we see several things to model as Actors: the barber is clearly one, as are the customers. The barbershop itself could be modeled as an Actor, too; there need not be a real-world parallel to verbal communication in an Actor system, even though we’re sending messages.
Let’s start with the sleeping barber’s customers, as they have the simplest responsibilities:
// code-examples/Concurrency/sleepingbarber/customer.scala
package
sleepingbarberimport
scala.actors.Actorimport
scala.actors.Actor._case
object
Haircut
class
Customer
(val
id:Int
)extends
Actor
{var
shorn =false
def
act
() = { loop { react {case
Haircut
=>
{ shorn =true
println("[c] customer "
+ id +" got a haircut"
) } } } } }
For the most part, this
should look pretty familiar: we declare the package in which this code
lives, we import code from the scala.actors
package,
and we define a class that extends Actor
. There are a
few details worth noting, however.
First of all, there’s our
declaration of case object Haircut
. A common pattern
when working with Actors in Scala is to use a case
object
to represent a message without internal data. If we
wanted to include, say, the time at which the haircut was completed,
we’d use a case class
instead. We declare
Haircut
here because it’s a message type that will be
sent solely to customers.
Note as well that we’re
storing one bit of mutable state in each Customer
:
whether or not they’ve gotten a haircut. In their internal loop, each
Customer
waits for a Haircut
message and, upon receipt of one, we set the shorn
boolean to true
. Customer
uses the
asynchronous react
method to respond to incoming
messages. If we needed to return the result of processing the message,
we would use receive
, but we don’t, and in the
process we save some memory and thread use under the hood.
Let’s move on to the
barber himself. Because there’s only one barber, we could have used the
actor
factory method technique mentioned earlier to
create him. For testing purposes, we’ve instead defined our own
Barber
class:
// code-examples/Concurrency/sleepingbarber/barber.scala
package
sleepingbarberimport
scala.actors.Actorimport
scala.actors.Actor._import
scala.util.Randomclass
Barber
extends
Actor
{private
val
random =new
Random
()def
helpCustomer
(customer:Customer
) {if
(self.mailboxSize >=3
) { println("[b] not enough seats, turning customer "
+ customer.id +" away"
) }else
{ println("[b] cutting hair of customer "
+ customer.id) Thread.sleep(100
+ random.nextInt(400
)) customer !Haircut
} }def
act
() { loop { react {case
customer:Customer => helpCustomer
(customer) } } } }
The core of the
Barber
class looks very much like the
Customer
. We loop around react
,
waiting for a particular type of object. To keep that loop tight and
readable, we call a method, helpCustomer
, when a new
Customer
is sent to the barber. Within that method we
employ a check on the mailbox size to serve as our “chairs” that
customers may occupy; we could have the Barber
or
Shop
classes maintain an internal
Queue
, but why bother when each Actor’s mailbox
already is one?
If three or more
customers are in the queue, we simply ignore that message; it’s then
discarded from the barber’s mailbox. Otherwise, we simulate a
semi-random delay (always at least 100 milliseconds) for the time it
takes to cut a customer’s hair, then send off a
Haircut
message to that customer. (Were we not trying
to simulate a real-world scenario,
we would of course remove the call to Thread.sleep()
and allow our barber to run full tilt.)
Next up, we have a simple class to represent the barbershop itself:
// code-examples/Concurrency/sleepingbarber/shop.scala
package
sleepingbarberimport
scala.actors.Actorimport
scala.actors.Actor._class
Shop
extends
Actor
{val
barber =new
Barber
() barber.startdef
act
() { println("[s] the shop is open"
) loop { react {case
customer:Customer => barber
! customer } } } }
By now, this should all
look very familiar. Each Shop
creates and starts a
new Barber
, prints a message telling the world that
the shop is open, and sits in a loop waiting for customers. When a
Customer
comes in, he’s sent to the barber. We now
see an unexpected benefit of Actors: they allow us to describe
concurrent business logic in easily understood terms. “Send the customer
to the barber” makes perfect sense, much more so than “Notify the
barber, unlock the mutex around the customer seats, increment the number
of free seats,” and so forth. Actors get us closer to our domain.
Finally, we have a driver for our simulation:
// code-examples/Concurrency/sleepingbarber/barbershop-simulator.scala
package
sleepingbarberimport
scala.actors.Actor._import
scala.collection.{immutable, mutable}import
scala.util.Randomobject
BarbershopSimulator
{private
val
random =new
Random
()private
val
customers =new
mutable.ArrayBuffer
[Customer]
()private
val
shop =new
Shop
()def
generateCustomers
{for
(i<-
1
to20
) {val
customer =new
Customer
(i) customer.start() customers += customer } println("[!] generated "
+ customers.size +" customers"
) }// customers arrive at random intervals
def
trickleCustomers
{for
(customer<-
customers) { shop ! customer Thread.sleep(random.nextInt(450
)) } }def
tallyCuts
{// wait for any remaining concurrent actions to complete
Thread.sleep(2000
)val
shornCount = customers.filter(c=>
c.shorn).size println("[!] "
+ shornCount +" customers got haircuts today"
) }def
main
(args:Array[String]
) { println("[!] starting barbershop simulation"
) shop.start() generateCustomers trickleCustomers tallyCuts System.exit(0
) } }
After “opening the shop,” we
generate a number of Customer
objects, assigning a
numeric ID to each and storing the lot in an
ArrayBuffer
. Next, we “trickle” the customers in by
sending them as messages to the shop and sleeping for a semi-random
amount of time between loops. At the end of our simulated day, we tally
up the number of customers who got haircuts by filtering out the
customers whose internal shorn
boolean was set to
true
and asking for the size of the resulting
sequence.
Compile and run the code
within the sleepingbarber
directory as
follows:
fsc *.scala scala -classpath . sleepingbarber.BarbershopSimulator
Throughout our code, we’ve prefixed console messages with abbreviations for the classes from which the messages were printed. When we look at an example run of our simulator, it’s easy to see where each message came from:
[!] starting barbershop simulation [s] the shop is open [!] generated 20 customers [b] cutting hair of customer 1 [b] cutting hair of customer 2 [c] customer 1 got a haircut [c] customer 2 got a haircut [b] cutting hair of customer 3 [c] customer 3 got a haircut [b] cutting hair of customer 4 [b] cutting hair of customer 5 [c] customer 4 got a haircut [b] cutting hair of customer 6 [c] customer 5 got a haircut [b] cutting hair of customer 7 [c] customer 6 got a haircut [b] not enough seats, turning customer 8 away [b] cutting hair of customer 9 [c] customer 7 got a haircut [b] not enough seats, turning customer 10 away [c] customer 9 got a haircut [b] cutting hair of customer 11 [b] cutting hair of customer 12 [c] customer 11 got a haircut [b] cutting hair of customer 13 [c] customer 12 got a haircut [b] cutting hair of customer 14 [c] customer 13 got a haircut [b] not enough seats, turning customer 15 away [b] not enough seats, turning customer 16 away [b] not enough seats, turning customer 17 away [b] cutting hair of customer 18 [c] customer 14 got a haircut [b] cutting hair of customer 19 [c] customer 18 got a haircut [b] cutting hair of customer 20 [c] customer 19 got a haircut [c] customer 20 got a haircut [!] 15 customers got haircuts today
You’ll find that each run’s output is, predictably, slightly different. Every time the barber takes a bit longer to cut hair than it does for several customers to enter, the “chairs” (the barber’s mailbox queue) fill up, and new customers simply leave.
Of course, we have to include the standard caveats that come with
simple examples. For one, it’s possible that our example may not be
suitably random, particularly if random values are retrieved within a
millisecond of one another. This is a byproduct of the way the JVM
generates random numbers, and a good reminder to be careful about
randomness in concurrent programs. You’d also want to replace the
sleep
inside tallyCuts
with a
clearer signal that the various actors in the system are done doing
their work, perhaps by making the
BarbershopSimulation
an Actor and sending it messages
that indicate completion.
Try modifying the code to introduce more customers, additional message types, different delays, or to remove the randomness altogether. If you’re an experienced multithreaded programmer, you might try writing your own sleeping barber implementation just to compare and contrast. We’re willing to bet that an implementation in Scala with Actors will be terser and easier to maintain.
To get the most out of Actors, there are few things to remember. First, note that there are several methods you can use to get different types of behavior out of your Actors. Table 9-1 should help clarify when to use each method.
Method | Returns | Description |
|
| Abstract, top-level method for an Actor. Typically contains one of the following methods inside it. |
| Result of processing message | Blocks until a message of matched type is received. |
| Result of processing message | Like |
|
| Requires less overhead (threads) than
|
|
| Like |
Typically, you’ll want to
use react
wherever possible. If you need the results
of processing a message (that is, you need a synchronous response from
sending a message to an Actor), use the receiveWithin
variant to reduce your chances of blocking indefinitely on an Actor
that’s gotten wedged.
Another strategy to keep your Actor-based code asynchronous is the
use of futures. A future is a placeholder object
for a value that hasn’t yet been returned from an asynchronous process.
You can send a message to an Actor with the !!
method; a variant of this method allows you to pass along a partial
function that is applied to the future value. As you can see from the
following example, retrieving a value from a Future
is as straightforward as invoking its apply
method.
Note that retrieving a value from a Future
is a
blocking operation:
// code-examples/Concurrency/future-script.scala
import
scala.actors.Futures._val
eventually = future(5
*42
) println(eventually())
Each Actor in your system
should have clear responsibilities. Don’t use Actors for general-purpose, highly stateful tasks.
Instead, think like a director: what are the distinct roles in the “script” of your
application, and what’s the least amount of information each Actor needs to do its
job? Give each Actor just a couple of responsibilities, and use messages
(usually in the form of a case class
or case
object
) to delegate those responsibilities to other
Actors.
Don’t be hesitant to copy data when writing Actor-centric code. The more immutable your design, the less likely you are to end up with unexpected state. The more you communicate via messages, the less you have to worry about synchronization. All those messages and immutable variables might appear to be overly costly. But, with today’s plentiful hardware, trading memory overhead for clarity and predictability seems more than fair for most applications.
Lastly, know when Actors aren’t appropriate. Just because Actors are a great way to handle concurrency in Scala doesn’t mean that they’re the only way, as we’ll see soon. Traditional threading and locking may better suit write-heavy critical paths for which a messaging approach would incur too much overhead. In our experience, you can use a purely Actor-based design to prototype a concurrent solution, then use profiling tools to suss out parts of your application that might benefit from a different approach.
While Actors are a great way to handle concurrent operations, they’re not the only way to do so in Scala. As Scala is interoperable with Java, the concurrency concepts that you may be familiar with on the JVM still apply.
For starters, Scala provides a handy way to run a block of code in a new thread:
// code-examples/Concurrency/threads/by-block-script.scala
new
Thread
{ println("this will run in a new thread"
) }
A similar construct is
available in the scala.concurrent
package, as a
method on the ops
object to run a block
asynchronously with spawn
:
// code-examples/Concurrency/threads/spawn.scala
import
scala.concurrent.ops._object
SpawnExample
{def
main
(args:Array[String]
) { println("this will run synchronously"
) spawn { println("this will run asychronously"
) } } }
If you’re familiar with
the venerable java.util.concurrent
package, you’ll
find it just as easy to use from Scala (or hard to use, depending on
your point of view). Let’s use Executors
to create a
pool of threads. We’ll use the thread pool to run a simple class,
implementing Java’s Runnable
interface for
thread-friendly classes, that identifies which thread it’s running
on:
// code-examples/Concurrency/threads/util-concurrent-script.scala
import
java.util.concurrent._class
ThreadIdentifier
extends
Runnable
{def
run
{ println("hello from Thread "
+ currentThread.getId) } }val
pool = Executors.newFixedThreadPool(5
)for
(i<-
1
to10
) { pool.execute(new
ThreadIdentifier
) }
As is standard in Java
concurrency, the run
method is where a threaded class
starts. Every time our pool
executes a new
ThreadIdentifier
, its run
method
is invoked. A look at the output tells us that we’re running on the five
threads in the pool, with IDs ranging from 9 to 13:
hello from Thread 9 hello from Thread 10 hello from Thread 11 hello from Thread 12 hello from Thread 13 hello from Thread 9 hello from Thread 11 hello from Thread 10 hello from Thread 10 hello from Thread 13
This is, of course, just
scratching the surface of what is available in
java.util.concurrent
. You’ll find that your
existing knowledge of Java’s approach to multithreading still applies in
Scala. What’s more, you’ll be able to accomplish the same tasks using
less code, which should contribute to maintainability and
productivity.
Threading and Actors aren’t the only way to do concurrency. Event-based concurrency, a particular approach to asynchronous or non-blocking I/O (NIO), has become a favored way to write servers that need to scale to thousands of simultaneous clients. Eschewing the traditional one-to-one relationship of threads to clients, this model of concurrency exposes events that occur when particular conditions are met (for example, when data is received from a client over a network socket). Typically, the programmer will associate a callback method with each event that’s relevant to her program.
While the
java.nio
package provides a variety of useful
primitives for non-blocking I/O
(buffers, channels, etc.), it’s still a fair bit of work to cobble
together an event-based concurrent program from those simple parts.
Enter Apache MINA, built atop Java NIO and described on its home page as
“a network application framework which helps users develop high
performance and high scalability network applications easily” (see [MINA]).
While MINA may be easier to use than Java’s built-in NIO libraries, we’ve gotten used to some conveniences of Scala that just aren’t available in MINA. The open source Naggati library (see [Naggati]) adds a Scala-friendly layer atop MINA that, according to its author, “makes it easy to build protocol filters [using a] sequential style.” Essentially, Naggati is a DSL for parsing network protocols, with MINA’s powerful NIO abilities under the hood.
Let’s use Naggati to
write the foundations of an SMTP email server. To keep things simple,
we’re only dealing with two SMTP commands: HELO
and
QUIT
. The former command identifies a client, and the
latter ends the client’s session.
We’ll keep ourselves honest with a test suite, facilitated by the Specs Behavior-Driven Development library (see Specs):
// .../smtpd/src/test/scala/com/programmingscala/smtpd/SmtpDecoderSpec.scala
package
com.programmingscala.smtpdimport
java.nio.ByteOrderimport
net.lag.naggati._import
org.apache.mina.core.buffer.IoBufferimport
org.apache.mina.core.filterchain.IoFilterimport
org.apache.mina.core.session.{DummySession, IoSession}import
org.apache.mina.filter.codec._import
org.specs._import
scala.collection.{immutable, mutable}object
SmtpDecoderSpec
extends
Specification
{private
var
fakeSession:IoSession
=null
private
var
fakeDecoderOutput:ProtocolDecoderOutput
=null
private
var
written =new
mutable.ListBuffer
[Request]
def
quickDecode
(s:String
):Unit
= { Codec.decoder.decode(fakeSession, IoBuffer.wrap(s.getBytes), fakeDecoderOutput) }"SmtpRequestDecoder"
should { doBefore { written.clear() fakeSession =new
DummySession
fakeDecoderOutput =new
ProtocolDecoderOutput
{override
def
flush
(nextFilter:IoFilter.NextFilter
, s:IoSession
) = {}override
def
write
(obj:AnyRef
) = written += obj.asInstanceOf[Request
] } }"parse HELO"
in { quickDecode("HELO client.example.org
"
) written.size mustEqual1
written(0
).command mustEqual"HELO"
written(0
).data mustEqual"client.example.org"
}"parse QUIT"
in { quickDecode("QUIT
"
) written.size mustEqual1
written(0
).command mustEqual"QUIT"
written(0
).data mustEqualnull
} } }
After setting up an
environment for each test run, our suite exercises the two SMTP commands
we’re interested in. The doBefore
block runs before
each test, guaranteeing that mock session and output buffers are in a
clean state. In each test we’re passing a string of hypothetical client
input to our as-yet-unimplemented Codec
, then
verifying that the resulting Request
(a case
class
) contains the correct command
and
data
fields. As the QUIT
command
doesn’t require any additional information from the client, we simply
check that data
is null
.
With our tests in place, let’s implement a basic codec (an encoder and decoder) for SMTP:
// .../smtpd/src/main/scala/com/programmingscala/smtpd/Codec.scala
package
com.programmingscala.smtpdimport
org.apache.mina.core.buffer.IoBufferimport
org.apache.mina.core.session.{IdleStatus, IoSession}import
org.apache.mina.filter.codec._import
net.lag.naggati._import
net.lag.naggati.Steps._case
class
Request
(command:String
, data:String
)case
class
Response
(data:IoBuffer
)object
Codec
{val
encoder =new
ProtocolEncoder
{def
encode
(session:IoSession
, message:AnyRef
, out:ProtocolEncoderOutput
) = {val
buffer = message.asInstanceOf[Response
].data out.write(buffer) }def
dispose
(session:IoSession
):Unit
= {// no-op, required by ProtocolEncoder trait
} }val
decoder =new
Decoder
(readLine(true
,"ISO-8859-1"
) { line=>
line.split(' '
).firstmatch
{case
"HELO"
=>
state.out.write(Request
("HELO"
, line.split(' '
)(1
)));End
case
"QUIT"
=>
state.out.write(Request
("QUIT"
,null
));End
case
_
=>
throw
new
ProtocolError
("Malformed request line: "
+ line) } }) }
We first define a
Request
case class
in which to
store request data as it arrives. Then we specify the
encoder
portion of our codec, which exists simply to
write data out. A dispose
method is defined (but not
fleshed out) to fulfill the contract of the ProtocolEncoder
trait.
The decoder is what we’re
really interested in. readRequest
reads a line, picks
out the first word in that line, and pattern matches on it to find SMTP
commands. In the case of a HELO
command, we also grab
the subsequent string on that line. The results are placed in a
Request
object and written out to
state
. As you might imagine, state
stores our progress throughout the parsing process.
Though trivial, the above example demonstrates just how easy it is to parse protocols with Naggati. Now that we’ve got a working codec, let’s combine Naggati and MINA with Actors to wire up a server.
First, a few lines of setup grunt work to get things going for our SMTP server:
// .../smtpd/src/main/scala/com/programmingscala/smtpd/Main.scala
package
com.programmingscala.smtpdimport
net.lag.naggati.IoHandlerActorAdapterimport
org.apache.mina.filter.codec.ProtocolCodecFilterimport
org.apache.mina.transport.socket.SocketAcceptorimport
org.apache.mina.transport.socket.nio.{NioProcessor, NioSocketAcceptor}import
java.net.InetSocketAddressimport
java.util.concurrent.{Executors, ExecutorService}import
scala.actors.Actor._object
Main
{val
listenAddress ="0.0.0.0"
val
listenPort =2525
def
setMaxThreads
= {val
maxThreads = (Runtime.getRuntime.availableProcessors *2
) System.setProperty("actors.maxPoolSize"
, maxThreads.toString) }def
initializeAcceptor
= {var
acceptorExecutor = Executors.newCachedThreadPool()var
acceptor =new
NioSocketAcceptor
(acceptorExecutor,new
NioProcessor
(acceptorExecutor)) acceptor.setBacklog(1000
) acceptor.setReuseAddress(true
) acceptor.getSessionConfig.setTcpNoDelay(true
) acceptor.getFilterChain.addLast("codec"
,new
ProtocolCodecFilter
(smtpd.Codec.encoder, smtpd.Codec.decoder)) acceptor.setHandler(new
IoHandlerActorAdapter
(session=>
new
SmtpHandler
(session))) acceptor.bind(new
InetSocketAddress
(listenAddress, listenPort)) }def
main
(args:Array[String]
) { setMaxThreads initializeAcceptor println("smtpd: up and listening on "
+ listenAddress +":"
+ listenPort) } }
To ensure that we’re
getting the most out of the Actor instances in our server, we set the
actors.maxPoolSize
system property to twice the
number of available processors on our machine. We then initialize an
NioSocketAcceptor
, a key piece of MINA machinery that
accepts new connections from clients. The final three lines of this
configuration are critical, as they put our codec to work, tell the
acceptor to handle requests with a special object, and start the server
listening for new connections on port 2525 (real SMTP servers run on the
privileged port 25).
The aforementioned
special object is an Actor wrapped in an
IoHandlerActorAdapter
, a bridging layer between Scala
Actors and MINA that’s provided by Naggati. This is the piece of our
server that talks back to the client. Now that we know what the client
is saying, thanks to the decoder, we actually know what to say
back!
// .../smtpd/src/main/scala/com/programmingscala/smtpd/SmtpHandler.scala
package
com.programmingscala.smtpdimport
net.lag.naggati.{IoHandlerActorAdapter, MinaMessage, ProtocolError}import
org.apache.mina.core.buffer.IoBufferimport
org.apache.mina.core.session.{IdleStatus, IoSession}import
java.io.IOExceptionimport
scala.actors.Actorimport
scala.actors.Actor._import
scala.collection.{immutable, mutable}class
SmtpHandler
(val
session:IoSession
)extends
Actor
{ startdef
act
= { loop { react {case
MinaMessage.MessageReceived
(msg)=>
handle(msg.asInstanceOf[smtpd.Request
])case
MinaMessage.SessionClosed
=>
exit()case
MinaMessage.SessionIdle
(status)=>
session.closecase
MinaMessage.SessionOpened
=>
reply("220 localhost Tapir SMTPd 0.1
"
)case
MinaMessage.ExceptionCaught
(cause)=>
{ cause.getCausematch
{case
e:ProtocolError => reply
("502 Error: "
+ e.getMessage +"
"
)case
i:IOException => reply
("502 Error: "
+ i.getMessage +"
"
)case
_
=>
reply("502 Error unknown
"
) } session.close } } } }private
def
handle
(request:smtpd.Request
) = { request.commandmatch
{case
"HELO"
=>
reply("250 Hi there "
+ request.data +"
"
)case
"QUIT"
=>
reply("221 Peace out girl scout
"
); session.close } }private
def
reply
(s:String
) = { session.write(new
smtpd.Response
(IoBuffer.wrap(s.getBytes))) } }
Straight away, we see the
same pattern that we saw in the Actors examples earlier in this chapter:
looping around a react
block that pattern matches on
a limited set of cases. In SmtpHandler
, all of those
cases are events provided by MINA. For example,
MINA will send us MinaMessage.SessionOpened
when a
client connects and MinaMessage.SessionClosed
when a client
disconnects.
The case we’re most
interested in is MinaMessage.MessageReceived
. We’re
handed a familiar Request
object with each newly
received valid message, and we can pattern match on the
command
field to take appropriate action. When the
client says HELO
, we can reply with an
acknowledgement. When the client says QUIT
, we say
goodbye and disconnect him.
Now that we’ve got all the pieces in place, let’s have a conversation with our server:
[al3x@jaya ~]$ telnet localhost 2525 Trying ::1... Connected to localhost. Escape character is '^]'. 220 localhost Tapir SMTPd 0.1 HELO jaya.local 250 Hi there jaya.local QUIT 221 Peace out girl scout Connection closed by foreign host.
A brief conversation, to be sure, but our server works! Now, what happens if we throw something unexpected at it?
[al3x@jaya ~]$ telnet localhost 2525 Trying ::1... Connected to localhost. Escape character is '^]'. 220 localhost Tapir SMTPd 0.1 HELO jaya.local 250 Hi there jaya.local BAD COMMAND 502 Error: Malformed request line: BAD COMMAND Connection closed by foreign host.
Nicely handled. Good thing
we took the time to dig out those exceptions when we received a
MinaMessage.ExceptionCaught
in our
SmtpHandler
Actor.
Of course, what we’ve built just handles the beginning and end of a complete SMTP conversation. As an exercise, try filling out the rest of the commands. Or, to skip ahead to something very much akin to what we’ve built here, check out the open source Mailslot project on GitHub (see [Mailslot]).
We learned how to build scalable, robust concurrent applications using Scala’s Actor library that avoid the problems of traditional approaches based on synchronized access to shared, mutable state. We also demonstrated that Java’s powerful built-in threading model is easily accessible from Scala. Finally, we learned how to combine Actors with the powerful MINA NIO framework and Naggati to develop event-driven, asynchronous network servers from the ground up in just a few lines of code.
The next chapter examines Scala’s built-in support for working with XML.