Advertisement
Romeech

Site transitions processing

Aug 11th, 2018
2,710
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. ; priority-map-by from [org.clojure/data.priority-map "0.0.10"]
  2. ; see https://github.com/clojure/data.priority-map
  3.  
  4. (defonce *state (atom {:prioritized-sessions (priority-map-by <)
  5.                        :urls {}}))
  6.  
  7. (defn timestamp-to-epoch [ts]
  8.     ; ts string -> unix time
  9.     )
  10.  
  11. (defn get-now
  12.     ; current moment in unix time
  13.     )
  14.  
  15. (defn send-to-external-sys [sid-2-urls]
  16.     (defn send-urls [sid urls]
  17.         ; create HTTP-request to the external system with (sid, urls) in body
  18.         )
  19.     ; return vector of results of sendings
  20.     (map send-urls sid-2-urls))
  21.  
  22. (defn accept-site-transition! [site-trans]
  23.     (let [[sid url ts] site-trans
  24.           old-sessions (:prioritized-sessions @*state)
  25.           old-urls (:urls @*state)
  26.           new-state {:prioritized-sessions (assoc old-sessions sid (timestamp-to-epoch ts))
  27.                      :urls (merge-with into old-urls {sid [url]})}]
  28.         (reset! *state new-state)))
  29.  
  30. ; if it turns out that access to *state slows down seriously speed of the processing of an incoming request
  31. ; then we will have to group incoming site transitions into bulks by m elements (could be got empirically).
  32. (defn map-sid-to-latest-ts [trans-by-sid]
  33.     ; {sid1 [[sid1 url1 ts1]
  34.     ;        [sid1 url2 ts2]],
  35.     ;  sid2 [[sid2 url3 ts3]]} -> {sid1 ts2, sid2 ts3}
  36.     (reduce merge {}
  37.         (map (fn [pair]
  38.                   (let [[sid transitions] pair
  39.                         timestamps (map #(timestamp-to-epoch (last %))
  40.                                         transitions)
  41.                         latest-ts (reduce max timestamps)]
  42.                     {sid latest-ts}))
  43.              trans-by-sid)))
  44.  
  45. (defn map-sid-to-urls [trans-by-sid]
  46.     ; {sid1 [[sid1 url1 ts1]
  47.     ;        [sid1 url2 ts2]],
  48.     ;  sid2 [[sid2 url3 ts3]]} -> {sid1 [url1 url2], sid2 [url3]}
  49.     (reduce merge {}
  50.             (map (fn [pair]
  51.                     (let [[sid transitions] pair
  52.                           urls (vec (map (fn [_ url _] url) transitions))]
  53.                         {sid urls}))
  54.                  trans-by-sid)))
  55.  
  56. (defn accept-site-transition-chunk! [chunk]
  57.     (let [old-sessions (:prioritized-sessions @*state)
  58.           old-urls (:urls @*state)
  59.           trans-by-sid (group-by first chunk)
  60.           sids-to-latest-ts (map-sid-to-latest-ts trans-by-sid)
  61.           sids-to-urls (map-sid-to-urls trans-by-sid)
  62.           new-state {:prioritized-sessions (merge old-sessions sids-to-latest-ts)
  63.                      :urls (merge-with into old-urls sids-to-urls)}]
  64.         (reset! *state new-state)))  
  65.  
  66.  
  67. (defn find-urls-to-send
  68.     (let [sessions (:prioritized-sessions @*state)
  69.           urls (:urls @*state)
  70.           now (get-now)
  71.           sids (->> sessions
  72.                     (take-while (fn [[sid ts]] (>= (- now ts))))
  73.                     (map first))]
  74.         {:used-sids sids
  75.          :send-data (select-keys urls sids)}))
  76.  
  77. (defn wipe-used-sids! [sids]
  78.     (let [old-sessions (:prioritized-sessions @*state)
  79.           old-urls (:urls @*state)
  80.           new-state {:prioritized-sessions (dissoc old-sessions sids)
  81.                      :urls (dissoc old-urls sids)}]
  82.         (reset! *state new-state)))
  83.  
  84. (defn periodic-task
  85.     (let [search-res (find-urls-to-send)]
  86.         (send-to-external-sys (:send-data search-res))
  87.         (wipe-used-sids! (:used-data search-res))))
  88.  
  89. ; this could be optimized by executing send-to-external-sys in a separate asynchronous task, as well as wipe-used-sids!
  90. ; asynchronous calls could be organized with a help of https://github.com/clojure/core.async
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement