bitcoin-client

bitcoin client library for price ticker and wallet

git clone https://9o.is/git/bitcoin-client.git

WsService.scala

(5897B)


      1 package inc.pyc.bitcoin
      2 package service
      3 
      4 import java.net.{ URI, ConnectException }
      5 import concurrent._
      6 import duration._
      7 import collection._
      8 import collection.JavaConversions._
      9 import akka.actor._
     10 import akka.pattern._
     11 import akka.util._
     12 import net.liftweb.json._
     13 import net.liftweb.json.JsonAST.JValue
     14 import org.java_websocket._
     15 import org.java_websocket.client._
     16 import org.java_websocket.handshake._
     17 import org.java_websocket.drafts._
     18 import java.io.File
     19 import java.io.FileInputStream
     20 import java.security.KeyStore
     21 import javax.net.ssl.SSLContext
     22 import javax.net.ssl.SSLSocketFactory
     23 import javax.net.ssl.TrustManagerFactory
     24 
     25 /**
     26  * Bitcoin service over WebSocket communications.
     27  */
     28 private[bitcoin] trait WsService {
     29   this: Actor with ActorLogging =>
     30 
     31   import context.dispatcher
     32 
     33   protected implicit val timeout: Timeout
     34 
     35   /**
     36    * Service's API
     37    */
     38   protected val api: URI
     39 
     40   /**
     41    * Handle json messages sent from server.
     42    */
     43   protected def onMessage(msg: JValue): () => Unit
     44 
     45   /**
     46    * Request headers
     47    */
     48   protected val headers: Map[String, String] = Map()
     49 
     50   /**
     51    * Maps request IDs to the corresponding response promises
     52    * and a function that converts the JSON RPC response to the final actor response.
     53    */
     54   protected val requests = mutable.HashMap.empty[String, (Promise[Any], JValue => _, List[String])]
     55 
     56   /**
     57    * Executes on websocket initial connection.
     58    */
     59   protected def onConnect: () => Unit = () => {}
     60 
     61   /**
     62    * Request and forget.
     63    */
     64   protected def requestForget(req: JValue): Unit = {
     65     request(req)
     66   }
     67 
     68   /**
     69    * Request and extract promised json response.
     70    * Note: Must be an actor making the request to receive response.
     71    */
     72   protected def requestExtract[T](id: String, req: JValue, extractor: JValue => T, info: List[String] = Nil): Unit = {
     73 
     74     val p = Promise[Any]()
     75     val f = p.future
     76     requests += id -> (p, extractor, info)
     77     request(req)
     78 
     79     context.system.scheduler.scheduleOnce(timeout.duration) {
     80       p tryFailure {
     81         new TimeoutException(s"Timeout: ${getClass.getSimpleName} bitcoin wallet did not respond in time")
     82       }
     83       self ! RemoveWsRequest(id)
     84     }
     85 
     86     pipe(f) to sender
     87   }
     88 
     89   override def preStart(): Unit = {
     90     context.become(connecting)
     91     tryToConnect()
     92   }
     93 
     94   /**
     95    * Receive when disconnected.
     96    */
     97   def connecting: Receive = {
     98     case Connected =>
     99       context.become(receive)
    100       onConnect()
    101 
    102     case _ =>
    103       val message = s"Cannot process request: no connection to ${getClass.getSimpleName} websocket."
    104       sender ! Status.Failure(new IllegalStateException(message))
    105       log.error(message)
    106   }
    107 
    108   /**
    109    * Receive when connected.
    110    */
    111   val websocket: Receive = {
    112     case RemoveWsRequest(id) =>
    113       requests -= id
    114 
    115     case Disconnected(reason) =>
    116       log.warning("Connection to {} bitcoin wallet closed: {}", getClass.getSimpleName, reason)
    117       context.become(connecting)
    118       tryToConnect()
    119   }
    120 
    121   /**
    122    * Tries to connect to the websocket.
    123    * @param f function to manipulate the websocket object before trying to connect.
    124    */
    125   protected def tryToConnect(f: WebSocketBtcWalletClient => WebSocketBtcWalletClient = walletClient => walletClient) {
    126     requests.clear()
    127     walletClient = new WebSocketBtcWalletClient(api, new Draft_17, headers, 0)
    128     walletClient = f(walletClient)
    129     val connected = walletClient.connectBlocking()
    130 
    131     if (connected) {
    132       log.info("Connection to {} bitcoin wallet established", getClass.getSimpleName)
    133       self ! Connected
    134     } else {
    135       new ConnectException(s"${getClass.getSimpleName} bitcoin wallet not available: $api")
    136     }
    137   }
    138 
    139   /* Send request down websocket. */
    140   private def request(req: JValue) { walletClient.send(compact(render(req))) }
    141 
    142   /* Personal websocket class of the java-websocket library. */
    143   class WebSocketBtcWalletClient(serverUri: URI, protocolDraft: Draft, httpHeaders: Map[String, String], connectTimeout: Int)
    144     extends WebSocketClient(serverUri, protocolDraft, headers, connectTimeout) {
    145     override def onMessage(jsonMessage: String): Unit = { ws.onMessage(parse(jsonMessage)) }
    146     override def onOpen(handshakeData: ServerHandshake) {}
    147     override def onClose(code: Int, reason: String, remote: Boolean) { self ! Disconnected(reason) }
    148     override def onError(ex: Exception) { self ! Disconnected(ex.getMessage()) }
    149   }
    150 
    151   /* The actual websocket. */
    152   protected var walletClient: WebSocketBtcWalletClient = null
    153 
    154   /* Used by the websocket class to access actor methods with same name. */
    155   private def ws = this
    156 
    157   /* Websocket Messages */
    158   case object Connected
    159   case class Disconnected(reason: String)
    160   case class RemoveWsRequest(id: String)
    161 }
    162 
    163 /**
    164  * Bitcoin service over WebSocket communications with SSL.
    165  */
    166 private[bitcoin] trait WssService extends WsService {
    167   this: Actor with ActorLogging =>
    168 
    169   import java.io._
    170   import java.security._
    171   import javax.net.ssl._
    172 
    173   /**
    174    * KeyStore file containing trusted certificates.
    175    */
    176   val keyStoreFile: File
    177 
    178   /**
    179    * Password of the KeyStore `keyStoreFile`
    180    */
    181   val keyStorePass: String
    182 
    183   private val socketFactory = createSslSocketFactory
    184 
    185   override def tryToConnect(f: WebSocketBtcWalletClient => WebSocketBtcWalletClient = walletClient => walletClient) =
    186     super.tryToConnect(walletClient => {
    187       walletClient.setSocket(socketFactory.createSocket())
    188       walletClient
    189     })
    190 
    191   /**
    192    * Creates a ssl socket factory for the websocket using the
    193    * given keystore file and password.
    194    */
    195   private def createSslSocketFactory: SSLSocketFactory = {
    196     val ks = KeyStore.getInstance("JKS")
    197     ks.load(new FileInputStream(keyStoreFile), keyStorePass.toCharArray)
    198     val tmf = TrustManagerFactory.getInstance("SunX509")
    199     tmf.init(ks)
    200     val sslContext = SSLContext.getInstance("TLS")
    201     sslContext.init(null, tmf.getTrustManagers, null)
    202     sslContext.getSocketFactory
    203   }
    204 
    205 }