1 """A library of helper functions for the CherryPy test suite."""
2
3 import datetime
4 import logging
5 log = logging.getLogger(__name__)
6 import os
7 thisdir = os.path.abspath(os.path.dirname(__file__))
8 serverpem = os.path.join(os.getcwd(), thisdir, 'test.pem')
9
10 import re
11 import sys
12 import time
13 import warnings
14
15 import cherrypy
16 from cherrypy._cpcompat import basestring, copyitems, HTTPSConnection, ntob
17 from cherrypy.lib import httputil
18 from cherrypy.lib import gctools
19 from cherrypy.lib.reprconf import unrepr
20 from cherrypy.test import webtest
21
22 import nose
23
24 _testconfig = None
25
27 global _testconfig
28 if _testconfig is None:
29 conf = {
30 'scheme': 'http',
31 'protocol': "HTTP/1.1",
32 'port': 54583,
33 'host': '127.0.0.1',
34 'validate': False,
35 'conquer': False,
36 'server': 'wsgi',
37 }
38 try:
39 import testconfig
40 _conf = testconfig.config.get('supervisor', None)
41 if _conf is not None:
42 for k, v in _conf.items():
43 if isinstance(v, basestring):
44 _conf[k] = unrepr(v)
45 conf.update(_conf)
46 except ImportError:
47 pass
48 _testconfig = conf
49 conf = _testconfig.copy()
50 conf.update(overconf)
51
52 return conf
53
55 """Base class for modeling and controlling servers during testing."""
56
58 for k, v in kwargs.items():
59 if k == 'port':
60 setattr(self, k, int(v))
61 setattr(self, k, v)
62
63
def log_to_stderr(msg, level):
    """Write *msg* followed by the platform line separator to stderr.

    ``level`` is accepted so the callable matches a ``(msg, level)`` logging
    callback signature, but it is not used.

    Returns whatever ``sys.stderr.write`` returns.
    """
    # sys.stderr is looked up on every call so that a replaced/redirected
    # stream is honoured.
    return sys.stderr.write(msg + os.linesep)
65
67 """Base class for modeling/controlling servers which run in the same process.
68
69 When the server side runs in a different process, start/stop can dump all
70 state between each test module easily. When the server side runs in the
71 same process as the client, however, we have to do a bit more work to ensure
72 config and mounted apps are reset between tests.
73 """
74
75 using_apache = False
76 using_wsgi = False
77
92
93
94 - def start(self, modulename=None):
104
106 """Tell the server about any apps which the setup functions mounted."""
107 pass
108
119
120
122 """Server supervisor for the builtin HTTP server."""
123
124 httpserver_class = "cherrypy._cpnative_server.CPHTTPServer"
125 using_apache = False
126 using_wsgi = False
127
129 return "Builtin HTTP Server on %s:%s" % (self.host, self.port)
130
131
133 """Server supervisor for the builtin WSGI server."""
134
135 httpserver_class = "cherrypy._cpwsgi_server.CPWSGIServer"
136 using_apache = False
137 using_wsgi = True
138
140 return "Builtin WSGI Server on %s:%s" % (self.host, self.port)
141
143 """Hook a new WSGI app into the origin server."""
144 cherrypy.server.httpserver.wsgi_app = self.get_app()
145
147 """Obtain a new (decorated) WSGI app to hook into the origin server."""
148 if app is None:
149 app = cherrypy.tree
150
151 if self.conquer:
152 try:
153 import wsgiconq
154 except ImportError:
155 warnings.warn("Error importing wsgiconq. pyconquer will not run.")
156 else:
157 app = wsgiconq.WSGILogger(app, c_calls=True)
158
159 if self.validate:
160 try:
161 from wsgiref import validate
162 except ImportError:
163 warnings.warn("Error importing wsgiref. The validator will not run.")
164 else:
165
166 app = validate.validator(app)
167
168 return app
169
170
176
183
187
191
195
199
200
202
203 script_name = ""
204 scheme = "http"
205
206 available_servers = {'wsgi': LocalWSGISupervisor,
207 'wsgi_u': get_wsgi_u_supervisor,
208 'native': NativeServerSupervisor,
209 'cpmodpy': get_cpmodpy_supervisor,
210 'modpygw': get_modpygw_supervisor,
211 'modwsgi': get_modwsgi_supervisor,
212 'modfcgid': get_modfcgid_supervisor,
213 'modfastcgi': get_modfastcgi_supervisor,
214 }
215 default_server = "wsgi"
216
259 _setup_server = classmethod(_setup_server)
260
289 setup_class = classmethod(setup_class)
290
292 ''
293 if hasattr(cls, 'setup_server'):
294 cls.supervisor.stop()
295 teardown_class = classmethod(teardown_class)
296
297 do_gc_test = False
298
303
304
305 test_gc.compat_co_firstlineno = getattr(sys, 'maxint', None) or float('inf')
306
309
319
322
def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
    """Open *url* and return the result of ``webtest.WebCase.getPage``.

    If ``self.script_name`` is non-empty, *url* is first joined onto it with
    ``httputil.urljoin``; otherwise it is passed through unchanged. All other
    arguments are forwarded as-is.
    """
    target = (
        httputil.urljoin(self.script_name, url) if self.script_name else url
    )
    return webtest.WebCase.getPage(
        self, target, headers, method, body, protocol
    )
328
def skip(self, msg='skipped '):
    """Raise ``nose.SkipTest`` carrying *msg*."""
    skip_signal = nose.SkipTest(msg)
    raise skip_signal
331
def assertErrorPage(self, status, message=None, pattern=''):
    """Check ``self.body`` against the built-in error page for *status*.

    The expected page comes from ``cherrypy._cperror.get_error_page``. Its
    empty ``<pre id="traceback"></pre>`` element is turned into a capture
    group so the traceback text in the actual response can be inspected.

    * ``pattern is None``: the captured traceback must be empty.
    * otherwise: *pattern* must occur in the captured traceback. It is
      passed through ``re.escape``, so it is matched literally.

    Failures are reported through ``self._handlewebError``.
    """
    # Reference page, rendered without any traceback content.
    page = cherrypy._cperror.get_error_page(status, message=message)

    # Escape the whole page, then swap the (escaped) empty traceback element
    # for one whose content is captured as group 1.
    empty_tb = re.escape('<pre id="traceback"></pre>')
    capturing_tb = (
        re.escape('<pre id="traceback">') + '(.*)' + re.escape('</pre>')
    )
    page_regex = re.escape(page).replace(empty_tb, capturing_tb)

    m = re.match(ntob(page_regex, self.encoding), self.body, re.DOTALL)
    if not m:
        self._handlewebError('Error page does not match; expected:\n' + page)
        return

    # From here on ``m`` is a match object, which is always truthy.
    if pattern is None:
        # None means the response must not carry a traceback at all.
        if m.group(1):
            self._handlewebError('Error page contains traceback')
        return

    needle = ntob(re.escape(pattern), self.encoding)
    if not re.search(needle, m.group(1)):
        msg = 'Error page does not contain %s in traceback'
        self._handlewebError(msg % repr(pattern))
363
364 date_tolerance = 2
365
367 """Assert abs(dt1 - dt2) is within Y seconds."""
368 if seconds is None:
369 seconds = self.date_tolerance
370
371 if dt1 > dt2:
372 diff = dt1 - dt2
373 else:
374 diff = dt2 - dt1
375 if not diff < datetime.timedelta(seconds=seconds):
376 raise AssertionError('%r and %r are not within %r seconds.' %
377 (dt1, dt2, seconds))
378
379
386
387
388
389
391
392 pid_file = os.path.join(thisdir, 'test.pid')
393 config_file = os.path.join(thisdir, 'test.conf')
394 config_template = """[global]
395 server.socket_host: '%(host)s'
396 server.socket_port: %(port)s
397 checker.on: False
398 log.screen: False
399 log.error_file: r'%(error_log)s'
400 log.access_file: r'%(access_log)s'
401 %(ssl)s
402 %(extra)s
403 """
404 error_log = os.path.join(thisdir, 'test.error.log')
405 access_log = os.path.join(thisdir, 'test.access.log')
406
407 - def __init__(self, wait=False, daemonize=False, ssl=False, socket_host=None, socket_port=None):
413
415 if self.ssl:
416 serverpem = os.path.join(thisdir, 'test.pem')
417 ssl = """
418 server.ssl_certificate: r'%s'
419 server.ssl_private_key: r'%s'
420 """ % (serverpem, serverpem)
421 else:
422 ssl = ""
423
424 conf = self.config_template % {
425 'host': self.host,
426 'port': self.port,
427 'error_log': self.error_log,
428 'access_log': self.access_log,
429 'ssl': ssl,
430 'extra': extra,
431 }
432 f = open(self.config_file, 'wb')
433 f.write(ntob(conf, 'utf-8'))
434 f.close()
435
def start(self, imports=None):
    """Launch cherryd in a child process.

    Waits for ``self.host``/``self.port`` to be free, then spawns
    ``sys.executable`` on ``/usr/sbin/cherryd`` with ``self.config_file`` and
    ``self.pid_file``.

    *imports* may be a single module name or a list/tuple of them; each
    truthy entry is passed to cherryd as ``-i <name>``. ``-d`` is added when
    ``self.daemonize`` is set.

    When ``self.wait`` is set the call blocks until the child exits and
    stores its exit code in ``self.exit_code``; otherwise it returns once the
    port is occupied. Either way it then sleeps briefly before returning.
    """
    cherrypy._cpserver.wait_for_free_port(self.host, self.port)

    command = [sys.executable, '/usr/sbin/cherryd',
               '-c', self.config_file, '-p', self.pid_file]

    # Normalise a single value (including None) to a one-element list.
    modules = imports if isinstance(imports, (list, tuple)) else [imports]
    for module in modules:
        if module:
            command += ['-i', module]

    if self.daemonize:
        command.append('-d')

    # Put the directory two levels above this file at the front of the
    # child's PYTHONPATH -- presumably so it imports the same package tree
    # this module belongs to.
    child_env = os.environ.copy()
    package_root = os.path.abspath(os.path.join(thisdir, '..', '..'))
    if child_env.get('PYTHONPATH', ''):
        child_env['PYTHONPATH'] = os.pathsep.join(
            (package_root, child_env['PYTHONPATH']))
    else:
        child_env['PYTHONPATH'] = package_root

    if self.wait:
        self.exit_code = os.spawnve(
            os.P_WAIT, sys.executable, command, child_env)
    else:
        os.spawnve(os.P_NOWAIT, sys.executable, command, child_env)
        cherrypy._cpserver.wait_for_occupied_port(self.host, self.port)

    # Extra settling time after the spawn; longer when daemonizing.
    time.sleep(2 if self.daemonize else 1)
471
474
476 """Wait for the process to exit."""
477 try:
478 try:
479
480 os.wait()
481 except AttributeError:
482
483 try:
484 pid = self.get_pid()
485 except IOError:
486
487 pass
488 else:
489 os.waitpid(pid, 0)
490 except OSError:
491 x = sys.exc_info()[1]
492 if x.args != (10, 'No child processes'):
493 raise
494