1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE RecordWildCards #-}
-- Implementation of the API. This module is the main point of the program.
module Server (application) where
import API (API, CompleteAPI, Metrics (..))
import Conduit (ResourceT)
import Config (LoggingConfig, ServerConfig (..))
import Control.Concurrent.STM (newTVarIO)
import Control.Monad.Extra (forM, when)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Logger (MonadLogger, logWarnN)
import Control.Monad.Reader (ReaderT)
import qualified Data.Aeson as A
import Data.ByteString.Lazy (toStrict)
import Data.Functor ((<&>))
import qualified Data.Map as M
import Data.Pool (Pool)
import Data.Proxy (Proxy (Proxy))
import Data.Text.Encoding (decodeUtf8)
import Data.Time (getCurrentTime)
import Data.UUID (UUID)
import Database.Persist (Entity (..),
PersistQueryRead (selectFirst),
SelectOpt (Desc), selectList,
(<-.), (==.), (>=.), (||.))
import Database.Persist.Postgresql (SqlBackend,
migrateEnableExtension,
runMigration)
import Fmt ((+|), (|+))
import qualified GTFS
import Persist
import Prometheus (Info (Info), exportMetricsAsText,
gauge, register)
import Prometheus.Metric.GHC (ghcMetrics)
import Servant (Application, err401, serve,
throwError)
import Servant.API ((:<|>) (..))
import Servant.Server (hoistServer)
import Servant.Swagger (toSwagger)
import Server.Base (ServerState)
import Server.Frontend (Frontend (..))
import Server.GTFS_RT (gtfsRealtimeServer)
import Server.Ingest (handleTrackerRegister,
handleTrainPing, handleWS)
import Server.Subscribe (handleSubscribe)
import Server.Util (Service, runLogging, runService,
serveDirectoryFileServer)
import System.IO.Unsafe (unsafePerformIO)
import Yesod (toWaiAppPlain)
application :: GTFS.GTFS -> Pool SqlBackend -> ServerConfig -> IO Application
application gtfs dbpool settings = do
when (serverConfigDebugMode settings) $
runLogging (serverConfigLogging settings) $
logWarnN "warning: tracktrain running in debug mode"
doMigration dbpool
metrics <- Metrics
<$> register (gauge (Info "ws_connections" "Number of WS Connections"))
register ghcMetrics
subscribers <- newTVarIO mempty
pure $ serve (Proxy @CompleteAPI)
$ hoistServer (Proxy @CompleteAPI) (runService (serverConfigLogging settings))
$ server gtfs metrics subscribers dbpool settings
doMigration pool = runSqlWithoutLog pool $ runMigration $ do
migrateEnableExtension "uuid-ossp"
migrateAll
server
:: GTFS.GTFS
-> Metrics
-> ServerState
-> Pool SqlBackend
-> ServerConfig
-> Service CompleteAPI
server gtfs metrics@Metrics{..} subscribers dbpool settings = handleDebugAPI
:<|> (handleTrackerRegister dbpool
:<|> handleTrainPing dbpool subscribers settings (throwError err401)
:<|> handleWS dbpool subscribers settings metrics
:<|> handleCurrentTicker
:<|> handleSubscribe dbpool subscribers
:<|> handleDebugState :<|> handleDebugTrain
:<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool)
:<|> handleMetrics
:<|> serveDirectoryFileServer (serverConfigAssets settings)
:<|> pure (unsafePerformIO (toWaiAppPlain (Frontend gtfs dbpool settings)))
where
handleDebugState = do
now <- liftIO getCurrentTime
runSql dbpool $ do
tracker <- selectList [TrackerBlocked ==. False, TrackerExpires >=. now] []
pairs <- forM tracker $ \(Entity token@(TrackerKey uuid) _) -> do
entities <- selectList [TrainPingToken ==. token] []
pure (uuid, fmap entityVal entities)
pure (M.fromList pairs)
handleCurrentTicker = runSql dbpool $ do
selectFirst [ TickerAnnouncementArchived ==. False ] [] <&> \case
Nothing -> A.object [ "error" A..= A.String "no message" ]
Just (Entity _ TickerAnnouncement{..}) -> A.object
[ "error" A..= A.Null
, "message" A..= tickerAnnouncementMessage
, "header" A..= tickerAnnouncementHeader
]
handleDebugTrain ticketId = runSql dbpool $ do
trackers <- getTicketTrackers ticketId
pings <- forM trackers $ \(Entity token _) -> do
selectList [TrainPingToken ==. token] [] <&> fmap entityVal
pure (concat pings)
handleDebugAPI = pure $ toSwagger (Proxy @API)
handleMetrics = exportMetricsAsText <&> (decodeUtf8 . toStrict)
getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO)))
=> UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker]
getTicketTrackers ticketId = do
joins <- selectList [TrackerTicketTicket ==. TicketKey ticketId] []
<&> fmap (trackerTicketTracker . entityVal)
selectList ([TrackerId <-. joins] ||. [TrackerCurrentTicket ==. Just (TicketKey ticketId)]) []
|