bitcoin-client
bitcoin client library for price ticker and wallet
git clone https://9o.is/git/bitcoin-client.git
WsService.scala
(5897B)
1 package inc.pyc.bitcoin
2 package service
3
4 import java.net.{ URI, ConnectException }
5 import concurrent._
6 import duration._
7 import collection._
8 import collection.JavaConversions._
9 import akka.actor._
10 import akka.pattern._
11 import akka.util._
12 import net.liftweb.json._
13 import net.liftweb.json.JsonAST.JValue
14 import org.java_websocket._
15 import org.java_websocket.client._
16 import org.java_websocket.handshake._
17 import org.java_websocket.drafts._
18 import java.io.File
19 import java.io.FileInputStream
20 import java.security.KeyStore
21 import javax.net.ssl.SSLContext
22 import javax.net.ssl.SSLSocketFactory
23 import javax.net.ssl.TrustManagerFactory
24
25 /**
26 * Bitcoin service over WebSocket communications.
27 */
28 private[bitcoin] trait WsService {
29 this: Actor with ActorLogging =>
30
31 import context.dispatcher
32
33 protected implicit val timeout: Timeout
34
35 /**
36 * Service's API
37 */
38 protected val api: URI
39
40 /**
41 * Handle json messages sent from server.
42 */
43 protected def onMessage(msg: JValue): () => Unit
44
45 /**
46 * Request headers
47 */
48 protected val headers: Map[String, String] = Map()
49
50 /**
51 * Maps request IDs to the corresponding response promises
52 * and a function that converts the JSON RPC response to the final actor response.
53 */
54 protected val requests = mutable.HashMap.empty[String, (Promise[Any], JValue => _, List[String])]
55
56 /**
57 * Executes on websocket initial connection.
58 */
59 protected def onConnect: () => Unit = () => {}
60
61 /**
62 * Request and forget.
63 */
64 protected def requestForget(req: JValue): Unit = {
65 request(req)
66 }
67
68 /**
69 * Request and extract promised json response.
70 * Note: Must be an actor making the request to receive response.
71 */
72 protected def requestExtract[T](id: String, req: JValue, extractor: JValue => T, info: List[String] = Nil): Unit = {
73
74 val p = Promise[Any]()
75 val f = p.future
76 requests += id -> (p, extractor, info)
77 request(req)
78
79 context.system.scheduler.scheduleOnce(timeout.duration) {
80 p tryFailure {
81 new TimeoutException(s"Timeout: ${getClass.getSimpleName} bitcoin wallet did not respond in time")
82 }
83 self ! RemoveWsRequest(id)
84 }
85
86 pipe(f) to sender
87 }
88
89 override def preStart(): Unit = {
90 context.become(connecting)
91 tryToConnect()
92 }
93
94 /**
95 * Receive when disconnected.
96 */
97 def connecting: Receive = {
98 case Connected =>
99 context.become(receive)
100 onConnect()
101
102 case _ =>
103 val message = s"Cannot process request: no connection to ${getClass.getSimpleName} websocket."
104 sender ! Status.Failure(new IllegalStateException(message))
105 log.error(message)
106 }
107
108 /**
109 * Receive when connected.
110 */
111 val websocket: Receive = {
112 case RemoveWsRequest(id) =>
113 requests -= id
114
115 case Disconnected(reason) =>
116 log.warning("Connection to {} bitcoin wallet closed: {}", getClass.getSimpleName, reason)
117 context.become(connecting)
118 tryToConnect()
119 }
120
121 /**
122 * Tries to connect to the websocket.
123 * @param f function to manipulate the websocket object before trying to connect.
124 */
125 protected def tryToConnect(f: WebSocketBtcWalletClient => WebSocketBtcWalletClient = walletClient => walletClient) {
126 requests.clear()
127 walletClient = new WebSocketBtcWalletClient(api, new Draft_17, headers, 0)
128 walletClient = f(walletClient)
129 val connected = walletClient.connectBlocking()
130
131 if (connected) {
132 log.info("Connection to {} bitcoin wallet established", getClass.getSimpleName)
133 self ! Connected
134 } else {
135 new ConnectException(s"${getClass.getSimpleName} bitcoin wallet not available: $api")
136 }
137 }
138
139 /* Send request down websocket. */
140 private def request(req: JValue) { walletClient.send(compact(render(req))) }
141
142 /* Personal websocket class of the java-websocket library. */
143 class WebSocketBtcWalletClient(serverUri: URI, protocolDraft: Draft, httpHeaders: Map[String, String], connectTimeout: Int)
144 extends WebSocketClient(serverUri, protocolDraft, headers, connectTimeout) {
145 override def onMessage(jsonMessage: String): Unit = { ws.onMessage(parse(jsonMessage)) }
146 override def onOpen(handshakeData: ServerHandshake) {}
147 override def onClose(code: Int, reason: String, remote: Boolean) { self ! Disconnected(reason) }
148 override def onError(ex: Exception) { self ! Disconnected(ex.getMessage()) }
149 }
150
151 /* The actual websocket. */
152 protected var walletClient: WebSocketBtcWalletClient = null
153
154 /* Used by the websocket class to access actor methods with same name. */
155 private def ws = this
156
157 /* Websocket Messages */
158 case object Connected
159 case class Disconnected(reason: String)
160 case class RemoveWsRequest(id: String)
161 }
162
163 /**
164 * Bitcoin service over WebSocket communications with SSL.
165 */
166 private[bitcoin] trait WssService extends WsService {
167 this: Actor with ActorLogging =>
168
169 import java.io._
170 import java.security._
171 import javax.net.ssl._
172
173 /**
174 * KeyStore file containing trusted certificates.
175 */
176 val keyStoreFile: File
177
178 /**
179 * Password of the KeyStore `keyStoreFile`
180 */
181 val keyStorePass: String
182
183 private val socketFactory = createSslSocketFactory
184
185 override def tryToConnect(f: WebSocketBtcWalletClient => WebSocketBtcWalletClient = walletClient => walletClient) =
186 super.tryToConnect(walletClient => {
187 walletClient.setSocket(socketFactory.createSocket())
188 walletClient
189 })
190
191 /**
192 * Creates a ssl socket factory for the websocket using the
193 * given keystore file and password.
194 */
195 private def createSslSocketFactory: SSLSocketFactory = {
196 val ks = KeyStore.getInstance("JKS")
197 ks.load(new FileInputStream(keyStoreFile), keyStorePass.toCharArray)
198 val tmf = TrustManagerFactory.getInstance("SunX509")
199 tmf.init(ks)
200 val sslContext = SSLContext.getInstance("TLS")
201 sslContext.init(null, tmf.getTrustManagers, null)
202 sslContext.getSocketFactory
203 }
204
205 }