layout: true .bottom[ ---- .left[![Logo](akka-presentation-ums-logo.png)] ] --- class: center, middle # .title[Akka] ### asynkron utvikling med actors Ståle Undheim
--- # Oversikt 1. Kjapp inføring i Scala, case classes og pattern matching 2. Akka Hello World 3. Hva er en Actor 4. Hvordan skiller en Actor seg fra ett vanlig objekt 5. Hva egner et actor system seg til 6. Nyttige Features 7. UMS SendingPlan 8. Andre Eksempler 9. Spørsmål --- class: center, middle # Scala intro --- ## Variabel deklerasjon *Scala:* ```scala val name = "Ståle Undheim" var mode = "Presentation" val time: DateTime = DateTime.now() // joda date time ``` *Java:* ```java final String name = "Ståle Undheim"; String mode = "Presentation"; final DateTime time = DateTime.now(); ``` --- # Funksjoner *Scala:* ```scala def greet(name: String) { println(name) } def addOne(value: Int) = value + 1 ``` *Java:* ```java public void greet(String name) { System.out.println(name); } public int addOne(int value) { return value + 1; } ``` --- ## Case class *Scala:* ```scala case class Person(firstName: String, lastName: String) ``` *Java:* ```java public class Person { private final String firstName; private final String lastName; public Person(String firstName, String lastName) { this.firstName = firstName; this.lastNAme = lastName; } // Getters } ``` --- ## Pattern matching (og string interpolation) *Scala:* ```scala value match { case Person(firstName, lastName) => println(s"My name is $lastName, $firstName $lastName") } ``` *Java:* ```java if (value instanceof Person) { final String firstName = ((Person) value).getFirstName(); final String lastName = ((Person) value).getLastName(); System.out.println( "My name is " + lastName + ", " + firstName + " " + lastName); } ``` --- class: center, middle # Akka Hello world --- #### Actor og API ```scala case class Message(content: String) class Printer extends Actor with ActorLogging { override def receive = { case Message(content) => log.info("Got message: {}", content) } } ``` -- #### Main runner ```scala object Main extends App { val system = ActorSystem("HelloWorld") val printerRef = system.actorOf(Props[Printer], "printer") printerRef ! Msg("Hello world") Thread.sleep(10000) system.shutdown() } ``` --- ## Akka actors - subjektiv historie -- * Scala hadde actors basert på Erlangs actors -- * Erlang ble utviklet av Ericsson for å bruke i dere system -- * Erlang har actors for å sikre høy oppetid -- * Akka begynte som et bibliotek som alternativ til Scala Actors -- * Scala actors er deprecated til fordel for Akka -- ![History](History.png) --- ## Hva er en Actor ---- -- 1. Implementeres av en klasse som extender Actor -- 2. Har en meldingsboks -- 3. Har en state -- 4. Har atferd (behaviour) som reagerer på meldinger basert på state -- 5. Kan ha barn (children) -- 6. Har en parent (supervisor) -- 7. Kan ha en Supervisor Strategy -- 8. Kan addresseres via en path (root er /user) --- ## Hva gjør en actor forskjellig fra ett vanlig objekt ---- -- 1. Ingen direkte tilgang til objektet, alt skjer igjennom en actorRef. -- 2. Asynkron konstruksjon. -- 3. Kun en trå om gangen i en actor, Akka gjør at en ikke trenger locks. -- 4. Mulighet til å endre rekkefølgen en behandler meldinger. -- 5. Asyynkron model er normalen, snarere enn en synkron model. Lockfree programming. -- 6. Svar skjer via sender, snarere enn stack --- ## Hva egner et actor system seg til ---- -- 1. Alt mulig. -- 2. Message passing gjør at en har loose coupling -- 3. Asynkron model gjør at alt er multithreaded. -- 4. Actors er naturlig for å modelere domene objekter -- 5. Enkelt å lage mikro services -- 6. Worker pools --- ## Nyttige features ---- -- 1. Nettverks støtte. Om en snakker med en local eller remote actor, så gjøres det på samme måte -- 2. Routing. Dette er en collection av actors, og en kan forwarde meldinger til disse -- 3. Clustering støtte. Hver node i ett cluster kan ha en eller flere roller -- 4. Supervision gjør at actors restarter ved exceptions, self-healing hvis korrupt state er årsaken til feil. -- 5. Camel støtte gjør for enkel integrasjon --- ## Eksempel: UMS - SendingPlan -- ### Oppgave * Håndtere en enkelt utsending av meldinger over en eller flere kanaler. * Skedulering av utsending i fremtiden * Clustering - high availability * Håndtere status av sending (scheduled, sending, done) * Håndtere flere typer kanaler, og feil i kanaler, timeouts * Kanselering --- ## SendingPlan, overordnet ![Overview](SendingPlan-Overview.png) --- ## SendingPlanRepository ![Overview](SendingPlan-Repository.png) --- ## SendingPlanRepository * Alle meldinger til SendingPlan objekter skjer via denne actoren. -- * Cluster shariding: kun en instans av hver SendingPlan i ett cluster --- ## SendingPlan ![Overview](SendingPlan-SendingPlan.png) --- ## SendingPlan * Bruker Akka persistence, implementerer EventsourcedProcessor -- * Bruker context.setRecieveTimeout til å ta selvmord hvis ubrukt i 30 sekunder -- * Tar i mot meldinger om å konstruere kanaler: ```scala case class AddChannel(sendingId: String, channelProps: Props) ``` -- * Bruker scheduler for å starte alerts i fremtiden --- ## Scheduler ![Overview](SendingPlan-Scheduler.png) --- ## Scheduler * Cluster singleton, kun en i ett cluster -- * Bruker Akka persistence, all scheduling starter igjen ved konstruksjon -- * Scheduling: ```scala case class Schedule(when: DateTime, target: ActorRef, message: Any) ``` -- * Scheduler lagrer target med actor path og addresse --- ## Channel ![Overview](SendingPlan-Channel.png) --- ## Channel * Lagrer instillinger for sending (kanal spesifikt). -- * State lagres via persistence og event sourcing -- * Bruker en channel (ungår redelivery ved restart) for å lagre mottakere under Recipients -- * Bruker scheduler for å kontrollere start, stop, og timeouts på individuelle sendinger -- * Bruker sender for å sende meldinger til mottaker --- ## Eksempler - Camel consumer -- ```scala object CamelSample extends App { // Change log level for org.apache to info in slf4j LoggerFactory.getLogger("org.apache") .asInstanceOf[ch.qos.logback.classic.Logger].setLevel(Level.INFO) val system = ActorSystem("camelSample") system.actorOf(Props[RssReader], "rss") } ``` --- ## Eksempler - Camel consumer ```scala class RssReader extends Actor with Consumer with ActorLogging { override def endpointUri = "rss:http://www.vg.no/rss/feed/forsiden/?consumer.delay=1000&sortEntries=true" override def receive = { case CamelMessage(feed:SyndFeed, headers) => val entries = feed.getEntries.toList.asInstanceOf[List[SyndEntry]] for (entry <- entries) { log.info(s"${entry.getPublishedDate}: ${entry.getTitle} - ${entry.getUri}") } } } ``` --- ## Eksempler - Networking --- ## Eksempler - Networking, Config ```scala object ConfigHelper { def withPort(port: Int) = s""" |akka { | actor.provider = "akka.remote.RemoteActorRefProvider" | remote.netty.tcp.port = $port | remote.netty.tcp.hostname = localhost |} """.stripMargin } ``` --- ## Eksempler - Networking, Server ```scala object ServerMain extends App { val config = ConfigHelper.withPort(2550) val system = ActorSystem("server", ConfigFactory.parseString(config)) val serverRef = system.actorOf(Props[Server], "server") } ``` --- ## Eksempler - Networking, Server ```scala class Server extends Actor with ActorLogging { var messageCount = 0 override def receive = { case msg => messageCount += 1 log.info("{}: Got message [{}] from [{}]", messageCount, msg, sender) if (messageCount >= 10) { log.info("Shutting down after {} messages", messageCount) context.stop(self) val sys = context.system context.system.scheduler.scheduleOnce(10 seconds, new Runnable { override def run() { sys.shutdown() } }) } } } ``` --- ## Eksempler - Networking, Klient ```scala object ClientMain extends App { val config = ConfigHelper.withPort(2551) val system = ActorSystem("client", ConfigFactory.parseString(config)) val server = system.actorSelection("akka.tcp://server@localhost:2550/user/server") val clientRef = system.actorOf(Props(classOf[Client], server), "client") } ``` --- ## Eksempler - Networking, Klient ```scala class Client(partner: ActorSelection) extends Actor with ActorLogging { partner ! Identify(1) context.system.scheduler.schedule(1 second, 1 second, self, "transmit") override def receive = { case ActorIdentity(1, Some(ref)) => log.info("Watching partner {}", ref) context.watch(ref) case "transmit" => partner ! "Time is " + DateTime.now() case Terminated(ref) => log.info("Partner at {} terminated, shuting down", ref) context.system.shutdown() } } ``` --- ## Eksempler - Cluster singleton --- ## Spørsmål? Linker: * http://akka.io/ * http://spray.io/ * http://staale.github.io/akka.html