Advertisement
punces

adapter_async.cc

Mar 21st, 2017
602
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.34 KB | None | 0 0
  1. #include "sample.h"
  2. #include "Debug.h"
  3. #include <libecap/common/registry.h>
  4. #include <libecap/common/errors.h>
  5. #include <libecap/adapter/service.h>
  6. #include <libecap/adapter/xaction.h>
  7. #include <libecap/host/host.h>
  8. #include <libecap/host/xaction.h>
  9. #include <assert.h>
  10. #include <iostream>
  11. #include <list>
  12. #if HAVE_PTHREAD
  13. #include <pthread.h>
  14. #endif
  15.  
  16. /*
  17. * Warning: This sample code is NOT thread-safe! A production implementation
  18. * MUST protect and synchronize shared resources. Most, but probably not all
  19. * relevant places are marked with an "XXX". Other than this important caveat,
  20. * this sample eCAP adapter illustrates how to orchestrate asynchronous
  21. * analysis while keeping the thread-unaware host application happy.
  22. */
  23.  
  24.  
  25. namespace Adapter { // not required, but adds clarity
  26.  
  27. class Xaction;
  28. typedef libecap::shared_ptr<Xaction> XactionPointer;
  29.  
  30. class Service: public libecap::adapter::Service {
  31. public:
  32. // About
  33. virtual std::string uri() const;
  34. virtual std::string tag() const;
  35. virtual void describe(std::ostream &os) const;
  36. virtual bool makesAsyncXactions() const { return true; }
  37.  
  38. // Configuration
  39. virtual void configure(const libecap::Options &cfg);
  40. virtual void reconfigure(const libecap::Options &cfg);
  41.  
  42. // Lifecycle
  43. virtual void start();
  44. virtual void suspend(timeval &timeout);
  45. virtual void resume();
  46. virtual void stop();
  47. virtual void retire();
  48.  
  49. // Scope
  50. virtual bool wantsUrl(const char *url) const;
  51.  
  52. // Work
  53. virtual MadeXactionPointer makeXaction(libecap::host::Xaction *hostx);
  54.  
  55. // number of transactions still "analysing" their messages
  56. static int WorkingXactions_; // XXX: not thread-safe!
  57.  
  58. static void Resume(const XactionPointer &x);
  59.  
  60. private:
  61. // transactions that completed their "analysis"
  62. typedef std::list<XactionPointer> WaitingXactions;
  63. static WaitingXactions WaitingXactions_; // XXX: not thread-safe!
  64. };
  65.  
  66. Adapter::Service::WaitingXactions Adapter::Service::WaitingXactions_;
  67. int Adapter::Service::WorkingXactions_ = 0;
  68.  
  69. // an async adapter transaction
  70. class Xaction: public libecap::adapter::Xaction {
  71. public:
  72. Xaction(libecap::host::Xaction *x);
  73. virtual ~Xaction();
  74.  
  75. // meta-info for the host transaction
  76. virtual const libecap::Area option(const libecap::Name &name) const;
  77. virtual void visitEachOption(libecap::NamedValueVisitor &visitor) const;
  78.  
  79. // lifecycle
  80. virtual void start();
  81. virtual void resume();
  82. virtual void stop();
  83.  
  84. // adapted body transmission control
  85. virtual void abDiscard() { noBodySupport(); }
  86. virtual void abMake() { noBodySupport(); }
  87. virtual void abMakeMore() { noBodySupport(); }
  88. virtual void abStopMaking() { noBodySupport(); }
  89.  
  90. // adapted body content extraction and consumption
  91. virtual libecap::Area abContent(libecap::size_type, libecap::size_type) { noBodySupport(); return libecap::Area(); }
  92. virtual void abContentShift(libecap::size_type) { noBodySupport(); }
  93.  
  94. // virgin body state notification
  95. virtual void noteVbContentDone(bool) { noBodySupport(); }
  96. virtual void noteVbContentAvailable() { noBodySupport(); }
  97.  
  98. // perform (well, simulate) content adaptation
  99. void analyze();
  100.  
  101. // give host control after async analysis
  102. void tellHostToResume();
  103.  
  104. XactionPointer self;
  105.  
  106. protected:
  107. void noBodySupport() const;
  108.  
  109. private:
  110. libecap::host::Xaction *hostx; // Host transaction rep
  111. #if HAVE_PTHREAD
  112. pthread_t thread_; // pthread handler
  113. #endif
  114. };
  115.  
  116. } // namespace Adapter
  117.  
  118. std::string Adapter::Service::uri() const {
  119. return "ecap://e-cap.org/ecap/services/sample/async";
  120. }
  121.  
  122. std::string Adapter::Service::tag() const {
  123. return PACKAGE_VERSION;
  124. }
  125.  
  126. void Adapter::Service::describe(std::ostream &os) const {
  127. os << "An async adapter from " << PACKAGE_NAME << " v" << PACKAGE_VERSION;
  128. }
  129.  
  130. void Adapter::Service::configure(const libecap::Options &) {
  131. if (Debug::Prefix.empty()) {
  132. Debug::Prefix = "adapter_async: ";
  133. #if HAVE_PTHREAD
  134. Debug(flApplication|ilNormal) << "WARNING: This sample eCAP " <<
  135. "adapter is NOT thread-safe. Sooner or later, it will " <<
  136. "crash your host application.";
  137. #else
  138. Debug(flApplication|ilNormal) << "ERROR: This sample eCAP " <<
  139. "adapter was built without pthread support. " <<
  140. "The adapter will not work as intended.";
  141. #endif /* HAVE_PTHREAD */
  142. }
  143. // this service is not really configurable
  144. }
  145.  
  146. void Adapter::Service::reconfigure(const libecap::Options &) {
  147. // this service is not configurable
  148. }
  149.  
  150. void Adapter::Service::start() {
  151. libecap::adapter::Service::start();
  152. // custom code would go here, but this service does not have one
  153. }
  154.  
  155. void Adapter::Service::stop() {
  156. // custom code would go here, but this service does not have one
  157. libecap::adapter::Service::stop();
  158. }
  159.  
  160. void Adapter::Service::retire() {
  161. // custom code would go here, but this service does not have one
  162. libecap::adapter::Service::stop();
  163. }
  164.  
  165. bool Adapter::Service::wantsUrl(const char *url) const {
  166. return true; // async adapter is applied to all messages
  167. }
  168.  
  169. void Adapter::Service::suspend(timeval &timeout) {
  170. Debug(flXaction) << "Adapter::Service::suspend " <<
  171. WorkingXactions_ << '+' << WaitingXactions_.size();
  172.  
  173. // Do not delay waiting transactions more than necessary.
  174. if (!WaitingXactions_.empty()) {
  175. timeout.tv_sec = 0;
  176. timeout.tv_usec = 0;
  177. return;
  178. }
  179.  
  180. // Do not ignore working transactions for too long:
  181. // In most cases, the adapter does not know when the async analysis will
  182. // be over, so using a constant maximum delay such as 300ms is acceptible.
  183. if (WorkingXactions_) {
  184. const int maxUsec = 300*1000;
  185. if (timeout.tv_sec > 0 || timeout.tv_usec > maxUsec) {
  186. timeout.tv_sec = 0;
  187. timeout.tv_usec = maxUsec;
  188. }
  189. return;
  190. }
  191.  
  192. // otherwise, the host sleep as much as it (or other services) want,
  193. // preventing "hot idle" state
  194. }
  195.  
  196. void Adapter::Service::resume() {
  197. Debug(flXaction) << "Adapter::Service::resume " <<
  198. WorkingXactions_ << '+' << WaitingXactions_.size();
  199.  
  200. while (!WaitingXactions_.empty()) {
  201. XactionPointer x = WaitingXactions_.front();
  202. WaitingXactions_.pop_front();
  203. x->tellHostToResume();
  204. }
  205. }
  206.  
  207. Adapter::Service::MadeXactionPointer
  208. Adapter::Service::makeXaction(libecap::host::Xaction *hostx) {
  209. Adapter::Xaction *x = new Adapter::Xaction(hostx);
  210. x->self.reset(x);
  211. return x->self;
  212. }
  213.  
  214. void Adapter::Service::Resume(const XactionPointer &x) {
  215. assert(WorkingXactions_);
  216. // We are running inside the transaction thread so we cannot call the host
  217. // application now. We must wait for the host to call our Service::resume.
  218. // XXX: push_back creates a copy of x, which is not thread-safe
  219. WaitingXactions_.push_back(x);
  220. }
  221.  
  222.  
  223. /* Xaction */
  224.  
  225. Adapter::Xaction::Xaction(libecap::host::Xaction *x): hostx(x) {
  226. Debug(flXaction) << "Adapter::Xaction::Xaction hostx=" << hostx;
  227. }
  228.  
  229. Adapter::Xaction::~Xaction() {
  230. Debug(flXaction) << "Adapter::Xaction::~Xaction hostx=" << hostx;
  231. if (libecap::host::Xaction *x = hostx) {
  232. hostx = 0;
  233. x->adaptationAborted();
  234. }
  235. }
  236.  
  237. const libecap::Area Adapter::Xaction::option(const libecap::Name &) const {
  238. return libecap::Area(); // this transaction has no meta-information
  239. }
  240.  
  241. void Adapter::Xaction::visitEachOption(libecap::NamedValueVisitor &) const {
  242. // this transaction has no meta-information to pass to the visitor
  243. }
  244.  
  245. extern "C"
  246. void *Analyze(void *arg) {
  247. static_cast<Adapter::Xaction*>(arg)->analyze();
  248. return 0;
  249. }
  250.  
  251. // This method runs inside a non-host thread. Must not call host here.
  252. void Adapter::Xaction::analyze() {
  253. ++Service::WorkingXactions_;
  254. static int count = 0;
  255. const int delay = (++count % 4); // 0-3 seconds
  256. std::clog << "adapter_async[" << this << "] starts " << delay << "s analysis" << std::endl;
  257. // sleep(delay); // simulate slow analysis
  258. std::clog << "adapter_async[" << this << "] ends " << delay << "s analysis" << std::endl;
  259. Service::Resume(self);
  260. self.reset(); // XXX: may not happen if thread is canceled
  261. --Service::WorkingXactions_; // XXX: may not happen if thread is canceled
  262. }
  263.  
  264. void Adapter::Xaction::start() {
  265. Must(hostx);
  266. #if HAVE_PTHREAD
  267. Must(pthread_create(&thread_, 0, &Analyze, this) == 0);
  268. #else
  269. #warning No pthread support detected. The adapter_async sample will not work as intended.
  270. Analyze(this);
  271. #endif
  272. }
  273.  
  274. void Adapter::Xaction::resume() {
  275. assert(hostx);
  276. // make this adapter non-callable
  277. libecap::host::Xaction *x = hostx;
  278. hostx = 0;
  279. // tell the host to use the virgin message
  280. x->useVirgin();
  281. }
  282.  
  283. void Adapter::Xaction::stop() {
  284. Debug(flXaction) << "Adapter::Xaction::stop hostx=" << hostx;
  285. #if HAVE_PTHREAD
  286. if (hostx)
  287. pthread_cancel(thread_);
  288. #endif
  289.  
  290. hostx = 0;
  291. // the caller will delete
  292. // XXX: remove this transaction from the WaitingXactions container!
  293. }
  294.  
  295. void Adapter::Xaction::noBodySupport() const {
  296. Must(!"must not be called: async adapter offers no body support");
  297. // not reached
  298. }
  299.  
  300. void Adapter::Xaction::tellHostToResume() {
  301. Debug(flXaction) << "Adapter::Xaction::tellHostToResume hostx=" << hostx;
  302. // if we are stopped during async analysis, stop() tries to cancel the
  303. // thread, but it is possible that the cancellation comes after the
  304. // transaction has been added to WaitingXactions.
  305. if (hostx != 0)
  306. hostx->resume();
  307. }
  308.  
  309. // create the adapter and register with libecap to reach the host application
  310. static const bool Registered =
  311. libecap::RegisterVersionedService(new Adapter::Service);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement