aboutsummaryrefslogtreecommitdiff
path: root/lib/Server.hs
blob: 055e4406da8d71af65fdf0934ddeceafad08394c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE RecordWildCards       #-}


-- Implementation of the API. This module is the central piece of the program: it wires all request handlers together into the WAI application.
module Server (application) where
import           API                         (API, CompleteAPI, Metrics (..))
import           Conduit                     (ResourceT)
import           Config                      (LoggingConfig, ServerConfig (..))
import           Control.Concurrent.STM      (newTVarIO)
import           Control.Monad.Extra         (forM)
import           Control.Monad.IO.Class      (MonadIO (liftIO))
import           Control.Monad.Logger        (MonadLogger)
import           Control.Monad.Reader        (ReaderT)
import           Data.ByteString.Lazy        (toStrict)
import           Data.Functor                ((<&>))
import qualified Data.Map                    as M
import           Data.Pool                   (Pool)
import           Data.Proxy                  (Proxy (Proxy))
import           Data.Text.Encoding          (decodeUtf8)
import           Data.Time                   (getCurrentTime)
import           Data.UUID                   (UUID)
import           Database.Persist            (Entity (..),
                                              PersistQueryRead (selectFirst),
                                              SelectOpt (Desc), selectList,
                                              (<-.), (==.), (>=.), (||.))
import           Database.Persist.Postgresql (SqlBackend,
                                              migrateEnableExtension,
                                              runMigration)
import           Fmt                         ((+|), (|+))
import qualified GTFS
import           Persist
import           Prometheus                  (Info (Info), exportMetricsAsText,
                                              gauge, register)
import           Prometheus.Metric.GHC       (ghcMetrics)
import           Servant                     (Application, err401, serve,
                                              serveDirectoryFileServer,
                                              throwError)
import           Servant.API                 ((:<|>) (..))
import           Servant.Server              (hoistServer)
import           Servant.Swagger             (toSwagger)
import           Server.Base                 (ServerState)
import           Server.Frontend             (Frontend (..))
import           Server.GTFS_RT              (gtfsRealtimeServer)
import           Server.Ingest               (handleTrackerRegister,
                                              handleTrainPing, handleWS)
import           Server.Subscribe            (handleSubscribe)
import           Server.Util                 (Service, runService)
import           System.IO.Unsafe            (unsafePerformIO)
import           Yesod                       (toWaiAppPlain)


-- | Build the complete WAI 'Application'.
--
-- Steps, in order:
--
-- 1. run the database migrations on the given pool ('doMigration'),
-- 2. register the Prometheus metrics: a @ws_connections@ gauge (stored in
--    'Metrics') and the GHC runtime metrics,
-- 3. create an empty 'Control.Concurrent.STM.TVar' that is handed to the
--    handlers as their shared 'ServerState',
-- 4. serve 'CompleteAPI', hoisting every handler through 'runService' with
--    the logging settings taken from the 'ServerConfig'.
application :: GTFS.GTFS -> Pool SqlBackend -> ServerConfig -> IO Application
application gtfs dbpool settings = do
  doMigration dbpool
  metrics <- Metrics
    <$> register (gauge (Info "ws_connections" "Number of WS Connections"))
  -- The value returned for the GHC metrics is never used again; discard it
  -- explicitly so this statement does not trip -Wunused-do-bind.
  _ <- register ghcMetrics

  subscribers <- newTVarIO mempty
  pure $ serve (Proxy @CompleteAPI)
    $ hoistServer (Proxy @CompleteAPI) (runService (serverConfigLogging settings))
    $ server gtfs metrics subscribers dbpool settings

-- | Bring the database schema up to date.
--
-- Enables the @uuid-ossp@ extension and then applies 'migrateAll', executing
-- both against the given pool via 'runSqlWithoutLog'.
doMigration :: Pool SqlBackend -> IO ()
doMigration pool = runSqlWithoutLog pool $ runMigration $ do
  migrateEnableExtension "uuid-ossp"
  migrateAll

-- | All handlers of 'CompleteAPI', glued together with ':<|>'.
--
-- The nesting of the ':<|>' chain has to mirror the shape of the API type,
-- so the order of the handlers below must not be changed independently of
-- that type.
--
-- Arguments: the loaded GTFS data, the Prometheus metrics, the shared
-- subscriber state, the database connection pool and the server config.
server
  :: GTFS.GTFS
  -> Metrics
  -> ServerState
  -> Pool SqlBackend
  -> ServerConfig
  -> Service CompleteAPI
server gtfs metrics subscribers dbpool settings = handleDebugAPI
  :<|> (handleTrackerRegister dbpool
        :<|> handleTrainPing dbpool subscribers settings (throwError err401)
        :<|> handleWS dbpool subscribers settings metrics
        :<|> handleSubscribe dbpool subscribers
        :<|> handleDebugState :<|> handleDebugTrain
        :<|> pure (GTFS.gtfsFile gtfs) :<|> gtfsRealtimeServer gtfs dbpool)
  :<|> handleMetrics
  :<|> serveDirectoryFileServer (serverConfigAssets settings)
  -- 'toWaiAppPlain' is an IO action, but this slot is filled with 'pure',
  -- hence the 'unsafePerformIO'.
  -- NOTE(review): consider building the frontend app once in 'application'
  -- and passing it in instead, which would make this call unnecessary.
  :<|> pure (unsafePerformIO (toWaiAppPlain (Frontend gtfs dbpool settings)))
  where
    -- Debug endpoint: a map from the UUID of every tracker that is neither
    -- blocked nor expired (relative to the current time) to all pings
    -- stored for that tracker's token. Issues one ping query per tracker.
    handleDebugState = do
      now <- liftIO getCurrentTime
      runSql dbpool $ do
        tracker <- selectList [TrackerBlocked ==. False, TrackerExpires >=. now] []
        pairs <- forM tracker $ \(Entity token@(TrackerKey uuid) _) -> do
          entities <- selectList [TrainPingToken ==. token] []
          pure (uuid, fmap entityVal entities)
        pure (M.fromList pairs)
    -- Debug endpoint: the pings of all trackers that 'getTicketTrackers'
    -- returns for the given ticket, concatenated into a single list.
    handleDebugTrain ticketId = runSql dbpool $ do
      trackers <- getTicketTrackers ticketId
      pings <- forM trackers $ \(Entity token _) ->
        selectList [TrainPingToken ==. token] [] <&> fmap entityVal
      pure (concat pings)
    -- The Swagger description generated from the 'API' type.
    handleDebugAPI = pure $ toSwagger (Proxy @API)
    -- The Prometheus metrics export, converted from a lazy ByteString to
    -- strict Text via UTF-8 decoding.
    handleMetrics = exportMetricsAsText <&> (decodeUtf8 . toStrict)

-- | Look up every tracker that belongs to the given ticket.
--
-- A tracker qualifies if either
--
-- * a join row selected via 'TrackerTicketTicket' connects it to the
--   ticket, or
-- * its 'TrackerCurrentTicket' field points at the ticket.
getTicketTrackers :: (MonadLogger (t (ResourceT IO)), MonadIO (t (ResourceT IO)))
  => UUID -> ReaderT SqlBackend (t (ResourceT IO)) [Entity Tracker]
getTicketTrackers ticketId = do
  let ticket = TicketKey ticketId
  -- First collect the tracker keys referenced by the join rows ...
  links <- selectList [TrackerTicketTicket ==. ticket] []
  let linkedTrackers = map (trackerTicketTracker . entityVal) links
  -- ... then fetch those trackers together with the ones currently
  -- assigned to the ticket in a single query.
  selectList
    ([TrackerId <-. linkedTrackers] ||. [TrackerCurrentTicket ==. Just ticket])
    []