diff options
Diffstat (limited to 'src/ext_depends/arsd')
-rw-r--r-- | src/ext_depends/arsd/cgi.d | 451 | ||||
-rw-r--r-- | src/ext_depends/arsd/core.d | 849 |
2 files changed, 1068 insertions, 232 deletions
diff --git a/src/ext_depends/arsd/cgi.d b/src/ext_depends/arsd/cgi.d index 844a411..0497eb2 100644 --- a/src/ext_depends/arsd/cgi.d +++ b/src/ext_depends/arsd/cgi.d @@ -131,7 +131,7 @@ void main() { to change versions. The possible options for `VALUE_HERE` are: $(LIST - * `embedded_httpd` for the embedded httpd version (built-in web server). This is the default for dub builds. You can run the program then connect directly to it from your browser. + * `embedded_httpd` for the embedded httpd version (built-in web server). This is the default for dub builds. You can run the program then connect directly to it from your browser. Note: prior to version 11, this would be embedded_httpd_processes on Linux and embedded_httpd_threads everywhere else. It now means embedded_httpd_hybrid everywhere supported and embedded_httpd_threads everywhere else. * `cgi` for traditional cgi binaries. These are run by an outside web server as-needed to handle requests. * `fastcgi` for FastCGI builds. FastCGI is managed from an outside helper, there's one built into Microsoft IIS, Apache httpd, and Lighttpd, and a generic program you can use with nginx called `spawn-fcgi`. If you don't already know how to use it, I suggest you use one of the other modes. * `scgi` for SCGI builds. SCGI is a simplified form of FastCGI, where you run the server as an application service which is proxied by your outside webserver. @@ -617,29 +617,12 @@ void cloexec(Socket s) { } } -version(embedded_httpd_hybrid) { - version=embedded_httpd_threads; - version(cgi_no_fork) {} else version(Posix) - version=cgi_use_fork; - version=cgi_use_fiber; -} - -version(cgi_use_fork) - enum cgi_use_fork_default = true; -else - enum cgi_use_fork_default = false; - // the servers must know about the connections to talk to them; the interfaces are vital version(with_addon_servers) version=with_addon_servers_connections; version(embedded_httpd) { - version(linux) - version=embedded_httpd_processes; - else { - version=embedded_httpd_threads; - } - + version=embedded_httpd_hybrid; /* version(with_openssl) { pragma(lib, "crypto"); @@ -648,6 +631,18 @@ version(embedded_httpd) { */ } +version(embedded_httpd_hybrid) { + version=embedded_httpd_threads; + version(cgi_no_fork) {} else version(Posix) + version=cgi_use_fork; + version=cgi_use_fiber; +} + +version(cgi_use_fork) + enum cgi_use_fork_default = true; +else + enum cgi_use_fork_default = false; + version(embedded_httpd_processes) version=embedded_httpd_processes_accept_after_fork; // I am getting much better average performance on this, so just keeping it. But the other way MIGHT help keep the variation down so i wanna keep the code to play with later @@ -1435,7 +1430,7 @@ class Cgi { string contentFilename; /// the file where we dumped the content, if contentInMemory == false. Note that if you want to keep it, you MUST move the file, since otherwise it is considered garbage when cgi is disposed. /// - ulong fileSize() { + ulong fileSize() const { if(contentInMemory) return content.length; import std.file; @@ -1985,6 +1980,12 @@ class Cgi { if(headerNumber == 1) { // request line auto parts = al.splitter(header, " "); + if(parts.front == "PRI") { + // this is an HTTP/2.0 line - "PRI * HTTP/2.0" - which indicates their payload will follow + // we're going to immediately refuse this, im not interested in implementing http2 (it is unlikely + // to bring me benefit) + throw new HttpVersionNotSupportedException(); + } requestMethod = to!RequestMethod(parts.front); parts.popFront(); requestUri = parts.front; @@ -3639,8 +3640,8 @@ string plainHttpError(bool isCgi, string type, Throwable t) { auto message = messageFromException(t); message = simpleHtmlEncode(message); - return format("%s %s\r\nContent-Length: %s\r\n\r\n%s", - isCgi ? "Status:" : "HTTP/1.0", + return format("%s %s\r\nContent-Length: %s\r\nConnection: close\r\n\r\n%s", + isCgi ? "Status:" : "HTTP/1.1", type, message.length, message); } @@ -3753,17 +3754,101 @@ bool trySimulatedRequest(alias fun, CustomCgi = Cgi)(string[] args) if(is(Custom } /++ - A server control and configuration struct, as a potential alternative to calling [GenericMain] or [cgiMainImpl]. See the source of [cgiMainImpl] to an example of how you can use it. + A server control and configuration struct, as a potential alternative to calling [GenericMain] or [cgiMainImpl]. See the source of [cgiMainImpl] for a complete, up-to-date, example of how it is used internally. + + As of version 11 (released August 2023), you can also make things like this: + + --- + // listens on both a unix domain socket called `foo` and on the loopback interfaces port 8080 + RequestServer server = RequestServer(["http://unix:foo", "http://localhost:8080"]); + + // can also: + // RequestServer server = RequestServer(0); // listen on an OS-provided port on all interfaces + + // NOT IMPLEMENTED YET + // server.initialize(); // explicit initialization will populate any "any port" things and throw if a bind failed + + foreach(listenSpec; server.listenSpecs) { + // you can check what it actually bound to here and see your assigned ports + } + + // NOT IMPLEMENTED YET + // server.start!handler(); // starts and runs in the arsd.core event loop + + server.serve!handler(); // blocks the thread until the server exits + --- History: Added Sept 26, 2020 (release version 8.5). + + The `listenSpec` member was added July 31, 2023. +/ struct RequestServer { - /// + /++ + Sets the host and port the server will listen on. This is semi-deprecated; the new (as of July 31, 2023) [listenSpec] parameter obsoletes these. You cannot use both together; the listeningHost and listeningPort are ONLY used if listenSpec is null. + +/ string listeningHost = defaultListeningHost(); - /// + /// ditto ushort listeningPort = defaultListeningPort(); + static struct ListenSpec { + enum Protocol { + http, + https, + scgi + } + Protocol protocol; + + enum AddressType { + ip, + unix, + abstract_ + } + AddressType addressType; + + string address; + ushort port; + } + + /++ + The array of addresses you want to listen on. The format looks like a url but has a few differences. + + This ONLY works on embedded_httpd_threads, embedded_httpd_hybrid, and scgi builds at this time. + + `http://localhost:8080` + + `http://unix:filename/here` + + `scgi://abstract:/name/here` + + `http://[::1]:4444` + + Note that IPv6 addresses must be enclosed in brackets. If you want to listen on an interface called `unix` or `abstract`, contact me, that is not supported but I could add some kind of escape mechanism. + + If you leave off the protocol, it assumes the default based on compile flags. If you only give a number, it is assumed to be a port on any tcp interface. + + `localhost:8080` serves the default protocol. + + `8080` or `:8080` assumes default protocol on localhost. + + The protocols can be `http:`, `https:`, and `scgi:`. Original `cgi` is not supported with this, since it is transactional with a single process. + + Valid hosts are an IPv4 address (with a mandatory port), an IPv6 address (with a mandatory port), just a port alone, `unix:/path/to/unix/socket` (which may be a relative path without a leading slash), or `abstract:/path/to/linux/abstract/namespace`. + + `http://unix:foo` will serve http over the unix domain socket named `foo` in the current working directory. + + $(PITFALL + If you set this to anything non-null (including a non-null, zero-length array) any `listenSpec` entries, [listeningHost] and [listeningPort] are ignored. + ) + + Bugs: + The implementation currently ignores the protocol spec in favor of the default compiled in option. + + History: + Added July 31, 2023 (dub v11.0) + +/ + string[] listenSpec; + /++ Uses a fork() call, if available, to provide additional crash resiliency and possibly improved performance. On the other hand, if you fork, you must not assume any memory is shared between requests (you shouldn't be anyway though! But @@ -3786,13 +3871,23 @@ struct RequestServer { +/ int numberOfThreads = 0; - /// + /++ + Creates a server configured to listen to multiple URLs. + + History: + Added July 31, 2023 (dub v11.0) + +/ + this(string[] listenTo) { + this.listenSpec = listenTo; + } + + /// Creates a server object configured to listen on a single host and port. this(string defaultHost, ushort defaultPort) { this.listeningHost = defaultHost; this.listeningPort = defaultPort; } - /// + /// ditto this(ushort defaultPort) { listeningPort = defaultPort; } @@ -3800,29 +3895,45 @@ struct RequestServer { /++ Reads the command line arguments into the values here. - Possible arguments are `--listening-host`, `--listening-port` (or `--port`), `--uid`, and `--gid`. + Possible arguments are `--listen` (can appear multiple times), `--listening-host`, `--listening-port` (or `--port`), `--uid`, and `--gid`. + + Please note you cannot combine `--listen` with `--listening-host` or `--listening-port` / `--port`. Use one or the other style. +/ void configureFromCommandLine(string[] args) { + bool portOrHostFound = false; + bool foundPort = false; bool foundHost = false; bool foundUid = false; bool foundGid = false; + bool foundListen = false; foreach(arg; args) { if(foundPort) { listeningPort = to!ushort(arg); + portOrHostFound = true; foundPort = false; + continue; } if(foundHost) { listeningHost = arg; + portOrHostFound = true; foundHost = false; + continue; } if(foundUid) { privilegesDropToUid = to!uid_t(arg); foundUid = false; + continue; } if(foundGid) { privilegesDropToGid = to!gid_t(arg); foundGid = false; + continue; + } + if(foundListen) { + this.listenSpec ~= arg; + foundListen = false; + continue; } if(arg == "--listening-host" || arg == "-h" || arg == "/listening-host") foundHost = true; @@ -3832,6 +3943,12 @@ struct RequestServer { foundUid = true; else if(arg == "--gid") foundGid = true; + else if(arg == "--listen") + foundListen = true; + } + + if(portOrHostFound && listenSpec.length) { + throw new Exception("You passed both a --listening-host or --listening-port and a --listen argument. You should fix your script to ONLY use --listen arguments."); } } @@ -3950,7 +4067,9 @@ struct RequestServer { __traits(child, _this, fun)(cgi); else static assert(0, "Not implemented in your compiler version!"); } - auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, funToUse), null, useFork, numberOfThreads); + auto manager = this.listenSpec is null ? + new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, funToUse), null, useFork, numberOfThreads) : + new ListeningConnectionManager(this.listenSpec, &doThreadHttpConnection!(CustomCgi, funToUse), null, useFork, numberOfThreads); manager.listen(); } @@ -3959,7 +4078,9 @@ struct RequestServer { +/ void serveScgi(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() { globalStopFlag = false; - auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength), null, useFork, numberOfThreads); + auto manager = this.listenSpec is null ? + new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength), null, useFork, numberOfThreads) : + new ListeningConnectionManager(this.listenSpec, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength), null, useFork, numberOfThreads); manager.listen(); } @@ -4167,6 +4288,10 @@ void serveEmbeddedHttpdProcesses(alias fun, CustomCgi = Cgi)(RequestServer param if(processPoolSize <= 1) closeConnection = true; //cgi = emplace!CustomCgi(cgiContainer, ir, &closeConnection); + } catch(HttpVersionNotSupportedException he) { + sendAll(ir.source, plainHttpError(false, "505 HTTP Version Not Supported", he)); + closeConnection = true; + break; } catch(Throwable t) { // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P // anyway let's kill the connection @@ -4914,6 +5039,10 @@ void doThreadHttpConnectionGuts(CustomCgi, alias fun, bool alwaysCloseConnection // broken pipe or something, just abort the connection closeConnection = true; break; + } catch(HttpVersionNotSupportedException ve) { + sendAll(connection, plainHttpError(false, "505 HTTP Version Not Supported", ve)); + closeConnection = true; + break; } catch(Throwable t) { // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P // anyway let's kill the connection @@ -5530,10 +5659,17 @@ class ListeningConnectionManager { import core.sys.posix.sys.select; fd_set read_fds; FD_ZERO(&read_fds); - FD_SET(listener.handle, &read_fds); - if(cancelfd != -1) + int max = 0; + foreach(listener; listeners) { + FD_SET(listener.handle, &read_fds); + if(listener.handle > max) + max = listener.handle; + } + if(cancelfd != -1) { FD_SET(cancelfd, &read_fds); - auto max = listener.handle > cancelfd ? listener.handle : cancelfd; + if(cancelfd > max) + max = cancelfd; + } auto ret = select(max + 1, &read_fds, null, null, null); if(ret == -1) { import core.stdc.errno; @@ -5547,24 +5683,27 @@ class ListeningConnectionManager { return null; } - if(FD_ISSET(listener.handle, &read_fds)) - return listener.accept(); + foreach(listener; listeners) { + if(FD_ISSET(listener.handle, &read_fds)) + return listener.accept(); + } return null; } else { - Socket socket = listener; - auto check = new SocketSet(); keep_looping: check.reset(); - check.add(socket); + foreach(listener; listeners) + check.add(listener); // just to check the stop flag on a kinda busy loop. i hate this FIXME auto got = Socket.select(check, null, null, 3.seconds); if(got > 0) - return listener.accept(); + foreach(listener; listeners) + if(check.isSet(listener)) + return listener.accept(); if(globalStopFlag) return null; else @@ -5575,7 +5714,7 @@ class ListeningConnectionManager { int defaultNumberOfThreads() { import std.parallelism; version(cgi_use_fiber) { - return totalCPUs * 1 + 1; + return totalCPUs * 2 + 1; // still chance some will be pointlessly blocked anyway } else { // I times 4 here because there's a good chance some will be blocked on i/o. return totalCPUs * 4; @@ -5625,7 +5764,9 @@ class ListeningConnectionManager { version(cgi_use_fiber) { version(Windows) { - listener.accept(); + // please note these are overlapped sockets! so the accept just kicks things off + foreach(listener; listeners) + listener.accept(); } WorkerThread[] threads = new WorkerThread[](numberOfThreads); @@ -5692,12 +5833,12 @@ class ListeningConnectionManager { void existing_connection_new_data() { // wait until a slot opens up - //int waited = 0; + // int waited = 0; while(queueLength >= queue.length) { Thread.sleep(1.msecs); - //waited ++; + // waited ++; } - //if(waited) {import std.stdio; writeln(waited);} + // if(waited) {import std.stdio; writeln(waited);} synchronized(this) { queue[nextIndexBack] = sn; nextIndexBack++; @@ -5739,27 +5880,105 @@ class ListeningConnectionManager { private void dg_handler(Socket s) { fhandler(s); } + + + this(string[] listenSpec, void function(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) { + fhandler = handler; + this(listenSpec, &dg_handler, dropPrivs, useFork, numberOfThreads); + } + this(string[] listenSpec, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) { + string[] host; + ushort[] port; + + foreach(spec; listenSpec) { + /+ + The format: + + protocol:// + address_spec + + Protocol is optional. Must be http, https, scgi, or fastcgi. + + address_spec is either: + ipv4 address : port + [ipv6 address] : port + unix:filename + abstract:name + port <which is tcp but on any interface> + +/ + + string protocol; + string address_spec; + + auto protocolIdx = spec.indexOf("://"); + if(protocolIdx != -1) { + protocol = spec[0 .. protocolIdx]; + address_spec = spec[protocolIdx + "://".length .. $]; + } else { + address_spec = spec; + } + + if(address_spec.startsWith("unix:") || address_spec.startsWith("abstract:")) { + host ~= address_spec; + port ~= 0; + } else { + auto idx = address_spec.lastIndexOf(":"); + if(idx == -1) { + host ~= null; + } else { + auto as = address_spec[0 .. idx]; + if(as.length >= 3 && as[0] == '[' && as[$-1] == ']') + as = as[1 .. $-1]; + host ~= as; + } + port ~= address_spec[idx + 1 .. $].to!ushort; + } + + } + + this(host, port, handler, dropPrivs, useFork, numberOfThreads); + } + this(string host, ushort port, void function(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) { + this([host], [port], handler, dropPrivs, useFork, numberOfThreads); + } + this(string host, ushort port, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) { + this([host], [port], handler, dropPrivs, useFork, numberOfThreads); + } + + this(string[] host, ushort[] port, void function(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) { fhandler = handler; this(host, port, &dg_handler, dropPrivs, useFork, numberOfThreads); } - this(string host, ushort port, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) { + this(string[] host, ushort[] port, void delegate(Socket) handler, void delegate() dropPrivs = null, bool useFork = cgi_use_fork_default, int numberOfThreads = 0) { + assert(host.length == port.length); + this.handler = handler; this.useFork = useFork; this.numberOfThreads = numberOfThreads ? numberOfThreads : defaultNumberOfThreads(); - listener = startListening(host, port, tcp, cleanup, 128, dropPrivs); + listeners.reserve(host.length); + + foreach(i; 0 .. host.length) + if(host[i] == "localhost") { + listeners ~= startListening("127.0.0.1", port[i], tcp, cleanup, 128, dropPrivs); + listeners ~= startListening("::1", port[i], tcp, cleanup, 128, dropPrivs); + } else { + listeners ~= startListening(host[i], port[i], tcp, cleanup, 128, dropPrivs); + } version(cgi_use_fiber) - if(useFork) - listener.blocking = false; + if(useFork) { + foreach(listener; listeners) + listener.blocking = false; + } // this is the UI control thread and thus gets more priority Thread.getThis.priority = Thread.PRIORITY_MAX; } - Socket listener; + Socket[] listeners; void delegate(Socket) handler; immutable bool useFork; @@ -5795,17 +6014,20 @@ Socket startListening(string host, ushort port, ref bool tcp, ref void delegate( throw new Exception("abstract unix sockets not supported on this system"); } } else { + auto address = host.length ? parseAddress(host, port) : new InternetAddress(port); version(cgi_use_fiber) { version(Windows) listener = new PseudoblockingOverlappedSocket(AddressFamily.INET, SocketType.STREAM); else - listener = new TcpSocket(); + listener = new Socket(address.addressFamily, SocketType.STREAM); } else { - listener = new TcpSocket(); + listener = new Socket(address.addressFamily, SocketType.STREAM); } cloexec(listener); listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); - listener.bind(host.length ? parseAddress(host, port) : new InternetAddress(port)); + if(address.addressFamily == AddressFamily.INET6) + listener.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, true); + listener.bind(address); cleanup = delegate() { listener.close(); }; @@ -5851,6 +6073,12 @@ class ConnectionException : Exception { } } +class HttpVersionNotSupportedException : Exception { + this(string file = __FILE__, size_t line = __LINE__) { + super("HTTP Version Not Supported", file, line); + } +} + alias void delegate(Socket) CMT; import core.thread; @@ -6027,11 +6255,13 @@ class WorkerThread : Thread { epoll_ctl(epfd, EPOLL_CTL_ADD, cancelfd, &ev); } - epoll_event ev; - ev.events = EPOLLIN | EPOLLEXCLUSIVE; // EPOLLEXCLUSIVE is only available on kernels since like 2017 but that's prolly good enough. - ev.data.fd = lcm.listener.handle; - if(epoll_ctl(epfd, EPOLL_CTL_ADD, lcm.listener.handle, &ev) == -1) - throw new Exception("epoll_ctl " ~ to!string(errno)); + foreach(listener; lcm.listeners) { + epoll_event ev; + ev.events = EPOLLIN | EPOLLEXCLUSIVE; // EPOLLEXCLUSIVE is only available on kernels since like 2017 but that's prolly good enough. + ev.data.fd = listener.handle; + if(epoll_ctl(epfd, EPOLL_CTL_ADD, listener.handle, &ev) == -1) + throw new Exception("epoll_ctl " ~ to!string(errno)); + } @@ -6046,37 +6276,44 @@ class WorkerThread : Thread { throw new Exception("epoll_wait " ~ to!string(errno)); } - foreach(idx; 0 .. nfds) { + outer: foreach(idx; 0 .. nfds) { auto flags = events[idx].events; if(cast(size_t) events[idx].data.ptr == cast(size_t) cancelfd) { globalStopFlag = true; //import std.stdio; writeln("exit heard"); break; - } else if(cast(size_t) events[idx].data.ptr == cast(size_t) lcm.listener.handle) { - //import std.stdio; writeln(myThreadNumber, " woken up ", flags); - // this try/catch is because it is set to non-blocking mode - // and Phobos' stupid api throws an exception instead of returning - // if it would block. Why would it block? because a forked process - // might have beat us to it, but the wakeup event thundered our herds. - try - sn = lcm.listener.accept(); // don't need to do the acceptCancelable here since the epoll checks it better - catch(SocketAcceptException e) { continue; } - - cloexec(sn); - if(lcm.tcp) { - // disable Nagle's algorithm to avoid a 40ms delay when we send/recv - // on the socket because we do some buffering internally. I think this helps, - // certainly does for small requests, and I think it does for larger ones too - sn.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); - - sn.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(10)); + } else { + foreach(listener; lcm.listeners) { + if(cast(size_t) events[idx].data.ptr == cast(size_t) listener.handle) { + //import std.stdio; writeln(myThreadNumber, " woken up ", flags); + // this try/catch is because it is set to non-blocking mode + // and Phobos' stupid api throws an exception instead of returning + // if it would block. Why would it block? because a forked process + // might have beat us to it, but the wakeup event thundered our herds. + try + sn = listener.accept(); // don't need to do the acceptCancelable here since the epoll checks it better + catch(SocketAcceptException e) { continue outer; } + + cloexec(sn); + if(lcm.tcp) { + // disable Nagle's algorithm to avoid a 40ms delay when we send/recv + // on the socket because we do some buffering internally. I think this helps, + // certainly does for small requests, and I think it does for larger ones too + sn.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); + + sn.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(10)); + } + + dg(sn); + continue outer; + } else { + // writeln(events[idx].data.ptr); + } } - dg(sn); - } else { if(cast(size_t) events[idx].data.ptr < 1024) { - throw new Exception("this doesn't look like a fiber pointer..."); + throw arsd.core.ArsdException!"this doesn't look like a fiber pointer... "(cast(size_t) events[idx].data.ptr); } auto fiber = cast(CgiFiber) events[idx].data.ptr; fiber.proceed(); @@ -6307,7 +6544,7 @@ ByChunkRange byChunk(BufferedInputRange ir, size_t atMost) { version(cgi_with_websocket) { // http://tools.ietf.org/html/rfc6455 - /** + /++ WEBSOCKET SUPPORT: Full example: @@ -6335,12 +6572,14 @@ version(cgi_with_websocket) { } websocket.close(); - } else assert(0, "i want a web socket!"); + } else { + cgi.write("You are loading the websocket endpoint in a browser instead of a websocket client. Use a websocket client on this url instead.\n", true); + } } mixin GenericMain!websocketEcho; --- - */ + +/ class WebSocket { Cgi cgi; @@ -6703,6 +6942,11 @@ version(cgi_with_websocket) { } + /++ + Returns true if the request headers are asking for a websocket upgrade. + + If this returns true, and you want to accept it, call [acceptWebsocket]. + +/ bool websocketRequested(Cgi cgi) { return "sec-websocket-key" in cgi.requestHeaders @@ -6715,6 +6959,9 @@ version(cgi_with_websocket) { ; } + /++ + If [websocketRequested], you can call this to accept it and upgrade the connection. It returns the new [WebSocket] object you use for future communication on this connection; the `cgi` object should no longer be used. + +/ WebSocket acceptWebsocket(Cgi cgi) { assert(!cgi.closed); assert(!cgi.outputtedResponseData); @@ -8592,11 +8839,7 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS void newConnection() { // on edge triggering, it is important that we get it all while(true) { - version(Android) { - auto size = cast(int) addr.sizeof; - } else { - auto size = cast(uint) addr.sizeof; - } + auto size = cast(socklen_t) addr.sizeof; auto ns = accept(sock, cast(sockaddr*) &addr, &size); if(ns == -1) { if(errno == EAGAIN || errno == EWOULDBLOCK) { @@ -9203,6 +9446,8 @@ auto callFromCgi(alias method, T)(T dg, Cgi cgi) { case idents[idx]: static if(is(param == Cgi.UploadedFile)) { params[idx] = cgi.files[name]; + } else static if(is(param : const Cgi.UploadedFile[])) { + (cast() params[idx]) = cgi.filesArray[name]; } else { setVariable(name, paramName, ¶ms[idx], value); } @@ -9544,6 +9789,7 @@ css"; <script>document.documentElement.classList.remove("no-script");</script> <style>.no-script requires-script { display: none; }</style> <title>D Application</title> + <meta name="viewport" content="initial-scale=1, width=device-width" /> <link rel="stylesheet" href="style.css" /> </head> <body> @@ -9561,7 +9807,7 @@ html", true, true); return document.requireSelector("main"); } - /// Renders a response as an HTTP error + /// Renders a response as an HTTP error with associated html body void renderBasicError(Cgi cgi, int httpErrorCode) { cgi.setResponseStatus(getHttpCodeText(httpErrorCode)); auto c = htmlContainer(); @@ -9670,10 +9916,24 @@ html", true, true); case "html": presentExceptionAsHtml(cgi, t, meta); break; + case "json": + presentExceptionAsJsonImpl(cgi, t); + break; default: } } + private void presentExceptionAsJsonImpl()(Cgi cgi, Throwable t) { + cgi.setResponseStatus("500 Internal Server Error"); + cgi.setResponseContentType("application/json"); + import arsd.jsvar; + var v = var.emptyObject; + v.type = typeid(t).toString; + v.msg = t.msg; + v.fullString = t.toString(); + cgi.write(v.toJson(), true); + } + /++ If you override this, you will need to cast the exception type `t` dynamically, @@ -9796,7 +10056,20 @@ html", true, true); auto div = Element.make("div"); div.addClass("form-field"); - static if(is(T == Cgi.UploadedFile)) { + static if(is(T : const Cgi.UploadedFile)) { + Element lbl; + if(displayName !is null) { + lbl = div.addChild("label"); + lbl.addChild("span", displayName, "label-text"); + lbl.appendText(" "); + } else { + lbl = div; + } + auto i = lbl.addChild("input", name); + i.attrs.name = name; + i.attrs.type = "file"; + i.attrs.multiple = "multiple"; + } else static if(is(T == Cgi.UploadedFile)) { Element lbl; if(displayName !is null) { lbl = div.addChild("label"); diff --git a/src/ext_depends/arsd/core.d b/src/ext_depends/arsd/core.d index 23a699d..49af24d 100644 --- a/src/ext_depends/arsd/core.d +++ b/src/ext_depends/arsd/core.d @@ -1,11 +1,25 @@ /++ - Please note: the api and behavior of this module is not externally stable at this time. See the documentation on specific functions. + $(PITFALL + Please note: the api and behavior of this module is not externally stable at this time. See the documentation on specific functions for details. + ) Shared core functionality including exception helpers, library loader, event loop, and possibly more. Maybe command line processor and uda helper and some basic shared annotation types. - I'll probably move the url, websocket, and ssl stuff in here too as they are often shared. Maybe a small internationalization helper type (a hook for external implementation) and COM helpers too. + I'll probably move the url, websocket, and ssl stuff in here too as they are often shared. Maybe a small internationalization helper type (a hook for external implementation) and COM helpers too. I might move the process helpers out to their own module - even things in here are not considered stable to library users at this time! + + If you use this directly outside the arsd library despite its current instability caveats, you might consider using `static import` since names in here are likely to clash with Phobos if you use them together. `static import` will let you easily disambiguate and avoid name conflict errors if I add more here. Some names even clash deliberately to remind me to avoid some antipatterns inside the arsd modules! + + ## Contributor notes + + arsd.core should be focused on things that enable interoperability primarily and secondarily increased code quality between other, otherwise independent arsd modules. As a foundational library, it is not permitted to import anything outside the druntime `core` namespace, except in templates and examples not normally compiled in. This keeps it independent and avoids transitive dependency spillover to end users while also keeping compile speeds fast. To help keep builds snappy, also avoid significant use of ctfe inside this module. + + On my linux computer, `dmd -unittest -main core.d` takes about a quarter second to run. We do not want this to grow. - If you use this directly outside the arsd library, you might consider using `static import` since names in here are likely to clash with Phobos if you use them together. `static import` will let you easily disambiguate and avoid name conflict errors if I add more here. Some names even clash deliberately to remind me to avoid some antipatterns inside the arsd modules! + `@safe` compatibility is ok when it isn't too big of a hassle. `@nogc` is a non-goal. I might accept it on some of the trivial functions but if it means changing the logic in any way to support, you will need a compelling argument to justify it. The arsd libs are supposed to be reliable and easy to use. That said, of course, don't be unnecessarily wasteful - if you can easily provide a reliable and easy to use way to let advanced users do their thing without hurting the other cases, let's discuss it. + + If functionality is not needed by multiple existing arsd modules, consider adding a new module instead of adding it to the core. + + Unittests should generally be hidden behind a special version guard so they don't interfere with end user tests. History: Added March 2023 (dub v11.0). Several functions were migrated in here at that time, noted individually. Members without a note were added with the module. @@ -47,7 +61,9 @@ version(Windows) { } else version(linux) { version=Arsd_core_epoll; - version=Arsd_core_has_cloexec; + static if(__VERSION__ >= 2098) { + version=Arsd_core_has_cloexec; + } } else version(FreeBSD) { version=Arsd_core_kqueue; @@ -884,11 +900,17 @@ unittest { +/ nothrow @safe @nogc pure inout(char)[] stripInternal(return inout(char)[] s) { + bool isAllWhitespace = true; foreach(i, char c; s) if(c != ' ' && c != '\t' && c != '\n' && c != '\r') { s = s[i .. $]; + isAllWhitespace = false; break; } + + if(isAllWhitespace) + return s[$..$]; + for(int a = cast(int)(s.length - 1); a > 0; a--) { char c = s[a]; if(c != ' ' && c != '\t' && c != '\n' && c != '\r') { @@ -900,15 +922,19 @@ inout(char)[] stripInternal(return inout(char)[] s) { return s; } +/// ditto nothrow @safe @nogc pure inout(char)[] stripRightInternal(return inout(char)[] s) { - for(int a = cast(int)(s.length - 1); a > 0; a--) { - char c = s[a]; + bool isAllWhitespace = true; + foreach_reverse(a, c; s) { if(c != ' ' && c != '\t' && c != '\n' && c != '\r') { s = s[0 .. a + 1]; + isAllWhitespace = false; break; } } + if(isAllWhitespace) + s = s[0..0]; return s; @@ -1372,6 +1398,10 @@ class InvalidArgumentsException : ArsdExceptionBase { ], functionName, file, line, next); } + this(string argumentName, string argumentDescription, string functionName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) { + this(argumentName, argumentDescription, LimitedVariant.init, functionName, file, line, next); + } + override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const { // FIXME: print the details better foreach(arg; invalidArguments) @@ -1841,7 +1871,7 @@ enum ThreadToRunIn { Ad-Hoc thread - something running an event loop that isn't another thing Controller thread - running an explicit event loop instance set as not a task runner or blocking worker - UI thread - simpledisplay's event loop, which it will require remain live for the duration of the program (running two .eventLoops without a parent EventLoop instance will become illegal, throwing at runtime if it happens telling people to change their code + UI thread - simpledisplay's event loop, which it will require remain live for the duration of the program (running two .eventLoops without a parent EventLoop instance will become illegal, throwing at runtime if it happens telling people to change their code) Windows HANDLES will always be listened on the thread itself that is requesting, UNLESS it is a worker/helper thread, in which case it goes to a coordinator thread. since it prolly can't rely on the parent per se this will have to be one created by arsd core init, UNLESS the parent is inside an explicit EventLoop structure. @@ -2543,6 +2573,8 @@ class AsyncFile : AbstractFile { Reads or writes a file in one call. It might internally yield, but is generally blocking if it returns values. The callback ones depend on the implementation. Tip: prefer the callback ones. If settings where async is possible, it will do async, and if not, it will sync. + + NOT IMPLEMENTED +/ void writeFile(string filename, const(void)[] contents) { @@ -2733,98 +2765,328 @@ class NamedPipeServer { // can be on a specific thread or on any thread } +private version(Windows) extern(Windows) { + const(char)* inet_ntop(int, const void*, char*, socklen_t); +} + /++ - Looking these up might be done asynchronously. The objects both represent an async request and its result, which is the actual address the operating system uses. + Some functions that return arrays allow you to provide your own buffer. These are indicated in the type system as `UserProvidedBuffer!Type`, and you get to decide what you want to happen if the buffer is too small via the [OnOutOfSpace] parameter. + + These are usually optional, since an empty user provided buffer with the default policy of reallocate will also work fine for whatever needs to be returned, thanks to the garbage collector taking care of it for you. - When you create an address, it holds a request. You can call `start` and `waitForCompletion` like with other async requests. The request may be run in a helper thread. + The API inside `UserProvidedBuffer` is all private to the arsd library implementation; your job is just to provide the buffer to it with [provideBuffer] or a constructor call and decide on your on-out-of-space policy. - Unlike most the async objects though, its methods will implicitly call `waitForCompletion`. + $(TIP + To properly size a buffer, I suggest looking at what covers about 80% of cases. Trying to cover everything often leads to wasted buffer space, and if you use a reallocate policy it can cover the rest. You might be surprised how far just two elements can go! + ) - Note that The current implementation just blocks. + History: + Added August 4, 2023 (dub v11.0) +/ -class SocketAddress /* : AsyncOperationRequest, AsyncOperationResponse */ { - // maybe accept url? - // unix:///home/me/thing - // ip://0.0.0.0:4555 - // ipv6://[00:00:00:00:00:00] - - // address info - abstract int domain(); - // FIXME: find all cases of this and make sure it is completed first - abstract sockaddr* rawAddr(); - abstract socklen_t rawAddrLength(); +struct UserProvidedBuffer(T) { + private T[] buffer; + private int actualLength; + private OnOutOfSpace policy; - /+ - // request interface - abstract void start(); - abstract SocketAddress waitForCompletion(); - abstract bool isComplete(); + /++ - // response interface - abstract bool wasSuccessful(); +/ + public this(scope T[] buffer, OnOutOfSpace policy = OnOutOfSpace.reallocate) { + this.buffer = buffer; + this.policy = policy; + } + + package(arsd) bool append(T item) { + if(actualLength < buffer.length) { + buffer[actualLength++] = item; + return true; + } else final switch(policy) { + case OnOutOfSpace.discard: + return false; + case OnOutOfSpace.exception: + throw ArsdException!"Buffer out of space"(buffer.length, actualLength); + case OnOutOfSpace.reallocate: + buffer ~= item; + actualLength++; + return true; + } + } + + package(arsd) T[] slice() return { + return buffer[0 .. actualLength]; + } } -/+ -class BluetoothAddress : SocketAddress { - // FIXME it is AF_BLUETOOTH - // see: https://people.csail.mit.edu/albert/bluez-intro/x79.html - // see: https://learn.microsoft.com/en-us/windows/win32/Bluetooth/bluetooth-programming-with-windows-sockets +/// ditto +UserProvidedBuffer!T provideBuffer(T)(scope T[] buffer, OnOutOfSpace policy = OnOutOfSpace.reallocate) { + return UserProvidedBuffer!T(buffer, policy); +} + +/++ + Possible policies for [UserProvidedBuffer]s that run out of space. ++/ +enum OnOutOfSpace { + reallocate, /// reallocate the buffer with the GC to make room + discard, /// discard all contents that do not fit in your provided buffer + exception, /// throw an exception if there is data that would not fit in your provided buffer } + + +/++ + For functions that give you an unknown address, you can use this to hold it. + + Can get: + ip4 + ip6 + unix + abstract_ + + name lookup for connect (stream or dgram) + request canonical name? + + interface lookup for bind (stream or dgram) +/ +struct SocketAddress { + import core.sys.posix.netdb; -version(Posix) // FIXME: find the sockaddr_un definition for Windows too and add it in -final class UnixAddress : SocketAddress { - sockaddr_un address; + /++ + Provides the set of addresses to listen on all supported protocols on the machine for the given interfaces. `localhost` only listens on the loopback interface, whereas `allInterfaces` will listen on loopback as well as the others on the system (meaning it may be publicly exposed to the internet). - override int domain() { - return AF_UNIX; + If you provide a buffer, I recommend using one of length two, so `SocketAddress[2]`, since this usually provides one address for ipv4 and one for ipv6. + +/ + static SocketAddress[] localhost(ushort port, return UserProvidedBuffer!SocketAddress buffer = null) { + buffer.append(ip6("::1", port)); + buffer.append(ip4("127.0.0.1", port)); + return buffer.slice; } - override sockaddr* rawAddr() { - return cast(sockaddr*) &address; + /// ditto + static SocketAddress[] allInterfaces(ushort port, return UserProvidedBuffer!SocketAddress buffer = null) { + char[16] str; + return allInterfaces(intToString(port, str[]), buffer); } - override socklen_t rawAddrLength() { - return address.sizeof; + + /// ditto + static SocketAddress[] allInterfaces(scope const char[] serviceOrPort, return UserProvidedBuffer!SocketAddress buffer = null) { + addrinfo hints; + hints.ai_flags = AI_PASSIVE; + hints.ai_socktype = SOCK_STREAM; // just to filter it down a little tbh + return get(null, serviceOrPort, &hints, buffer); } -} -final class IpAddress : SocketAddress { - sockaddr_in address; + /++ + Returns a single address object for the given protocol and parameters. - override int domain() { - return AF_INET; + You probably should generally prefer [get], [localhost], or [allInterfaces] to have more flexible code. + +/ + static SocketAddress ip4(scope const char[] address, ushort port, bool forListening = false) { + return getSingleAddress(AF_INET, AI_NUMERICHOST | (forListening ? AI_PASSIVE : 0), address, port); } - override sockaddr* rawAddr() { - return cast(sockaddr*) &address; + /// ditto + static SocketAddress ip4(ushort port) { + return ip4(null, port, true); } - override socklen_t rawAddrLength() { - return address.sizeof; + + /// ditto + static SocketAddress ip6(scope const char[] address, ushort port, bool forListening = false) { + return getSingleAddress(AF_INET6, AI_NUMERICHOST | (forListening ? AI_PASSIVE : 0), address, port); + } + + /// ditto + static SocketAddress ip6(ushort port) { + return ip6(null, port, true); + } + + /// ditto + static SocketAddress unix(scope const char[] path) { + // FIXME + SocketAddress addr; + return addr; + } + + /// ditto + static SocketAddress abstract_(scope const char[] path) { + char[190] buffer = void; + buffer[0] = 0; + buffer[1 .. path.length] = path[]; + return unix(buffer[0 .. 1 + path.length]); + } + + private static SocketAddress getSingleAddress(int family, int flags, scope const char[] address, ushort port) { + addrinfo hints; + hints.ai_family = family; + hints.ai_flags = flags; + + char[16] portBuffer; + char[] portString = intToString(port, portBuffer[]); + + SocketAddress[1] addr; + auto res = get(address, portString, &hints, provideBuffer(addr[])); + if(res.length == 0) + throw ArsdException!"bad address"(address.idup, port); + return res[0]; } -} -final class Ipv6Address : SocketAddress { - sockaddr_in6 address; + /++ + Calls `getaddrinfo` and returns the array of results. It will populate the data into the buffer you provide, if you provide one, otherwise it will allocate its own. + +/ + static SocketAddress[] get(scope const char[] nodeName, scope const char[] serviceOrPort, addrinfo* hints = null, return UserProvidedBuffer!SocketAddress buffer = null, scope bool delegate(scope addrinfo* ai) filter = null) @trusted { + addrinfo* res; + CharzBuffer node = nodeName; + CharzBuffer service = serviceOrPort; + auto ret = getaddrinfo(nodeName is null ? null : node.ptr, serviceOrPort is null ? null : service.ptr, hints, &res); + if(ret == 0) { + auto current = res; + while(current) { + if(filter is null || filter(current)) { + SocketAddress addr; + addr.addrlen = cast(socklen_t) current.ai_addrlen; + switch(current.ai_family) { + case AF_INET: + addr.in4 = * cast(sockaddr_in*) current.ai_addr; + break; + case AF_INET6: + addr.in6 = * cast(sockaddr_in6*) current.ai_addr; + break; + case AF_UNIX: + addr.unix_address = * cast(sockaddr_un*) current.ai_addr; + break; + default: + // skip + } + + if(!buffer.append(addr)) + break; + } + + current = current.ai_next; + } + + freeaddrinfo(res); + } else { + version(Windows) { + throw new WindowsApiException("getaddrinfo", ret); + } else { + const char* error = gai_strerror(ret); + } + } + + return buffer.slice; + } + + /++ + Returns a string representation of the address that identifies it in a custom format. + + $(LIST + * Unix domain socket addresses are their path prefixed with "unix:", unless they are in the abstract namespace, in which case it is prefixed with "abstract:" and the zero is trimmed out. For example, "unix:/tmp/pipe". - override int domain() { - return AF_INET6; + * IPv4 addresses are written in dotted decimal followed by a colon and the port number. For example, "127.0.0.1:8080". + + * IPv6 addresses are written in colon separated hex format, but enclosed in brackets, then followed by the colon and port number. For example, "[::1]:8080". + ) + +/ + string toString() const @trusted { + char[200] buffer; + switch(address.sa_family) { + case AF_INET: + auto writable = stringz(inet_ntop(address.sa_family, &in4.sin_addr, buffer.ptr, buffer.length)); + auto it = writable.borrow; + buffer[it.length] = ':'; + auto numbers = intToString(port, buffer[it.length + 1 .. $]); + return buffer[0 .. it.length + 1 + numbers.length].idup; + case AF_INET6: + buffer[0] = '['; + auto writable = stringz(inet_ntop(address.sa_family, &in6.sin6_addr, buffer.ptr + 1, buffer.length - 1)); + auto it = writable.borrow; + buffer[it.length + 1] = ']'; + buffer[it.length + 2] = ':'; + auto numbers = intToString(port, buffer[it.length + 3 .. $]); + return buffer[0 .. it.length + 3 + numbers.length].idup; + case AF_UNIX: + // FIXME: it might be abstract in which case stringz is wrong!!!!! + auto writable = stringz(cast(char*) unix_address.sun_path.ptr).borrow; + if(writable.length == 0) + return "unix:"; + string prefix = writable[0] == 0 ? "abstract:" : "unix:"; + buffer[0 .. prefix.length] = prefix[]; + buffer[prefix.length .. prefix.length + writable.length] = writable[writable[0] == 0 ? 1 : 0 .. $]; + return buffer.idup; + case AF_UNSPEC: + return "<unspecified address>"; + default: + return "<unsupported address>"; // FIXME + } + } + + ushort port() const @trusted { + switch(address.sa_family) { + case AF_INET: + return ntohs(in4.sin_port); + case AF_INET6: + return ntohs(in6.sin6_port); + default: + return 0; + } + } + + /+ + @safe unittest { + SocketAddress[4] buffer; + foreach(addr; SocketAddress.get("arsdnet.net", "http", null, provideBuffer(buffer[]))) + writeln(addr.toString()); } + +/ - override sockaddr* rawAddr() { - return cast(sockaddr*) &address; + /+ + unittest { + // writeln(SocketAddress.ip4(null, 4444, true)); + // writeln(SocketAddress.ip4("400.3.2.1", 4444)); + // writeln(SocketAddress.ip4("bar", 4444)); + foreach(addr; localhost(4444)) + writeln(addr.toString()); } - override socklen_t rawAddrLength() { - return address.sizeof; + +/ + + socklen_t addrlen = typeof(this).sizeof - socklen_t.sizeof; // the size of the union below + + union { + sockaddr address; + + sockaddr_storage storage; + + sockaddr_in in4; + sockaddr_in6 in6; + + sockaddr_un unix_address; } + + /+ + this(string node, string serviceOrPort, int family = 0) { + // need to populate the approrpiate address and the length and make sure you set sa_family + } + +/ + + int domain() { + return address.sa_family; + } + sockaddr* rawAddr() return { + return &address; + } + socklen_t rawAddrLength() { + return addrlen; + } + + // FIXME it is AF_BLUETOOTH + // see: https://people.csail.mit.edu/albert/bluez-intro/x79.html + // see: https://learn.microsoft.com/en-us/windows/win32/Bluetooth/bluetooth-programming-with-windows-sockets } -/++ - For functions that give you an unknown address, you can use this to hold it. -+/ -struct SocketAddressBuffer { - sockaddr address; - socklen_t addrlen; +private version(Windows) { + struct sockaddr_un { + ushort sun_family; + char[108] sun_path; + } } class AsyncSocket : AsyncFile { @@ -2875,6 +3137,11 @@ class AsyncSocket : AsyncFile { setCloExec(handle); } + if(address.domain == AF_INET6) { + int opt = 1; + setsockopt(handle, IPPROTO_IPV6 /*SOL_IPV6*/, IPV6_V6ONLY, &opt, opt.sizeof); + } + // FIXME: chekc for broadcast // FIXME: REUSEADDR ? @@ -2945,8 +3212,8 @@ class AsyncSocket : AsyncFile { /++ You can also construct your own request externally to control the memory more. +/ - AsyncConnectRequest connect(SocketAddress address) { - return new AsyncConnectRequest(this, address); + AsyncConnectRequest connect(SocketAddress address, ubyte[] bufferToSend = null) { + return new AsyncConnectRequest(this, address, bufferToSend); } /++ @@ -2975,25 +3242,29 @@ class AsyncSocket : AsyncFile { /++ You can also construct your own request externally to control the memory more. +/ - AsyncSendRequest sendTo(const(ubyte)[] buffer, SocketAddress address, int flags = 0) { + AsyncSendRequest sendTo(const(ubyte)[] buffer, SocketAddress* address, int flags = 0) { return new AsyncSendRequest(this, buffer, address, flags); } /++ You can also construct your own request externally to control the memory more. +/ - AsyncReceiveRequest receiveFrom(ubyte[] buffer, SocketAddressBuffer* address, int flags = 0) { + AsyncReceiveRequest receiveFrom(ubyte[] buffer, SocketAddress* address, int flags = 0) { return new AsyncReceiveRequest(this, buffer, address, flags); } /++ +/ SocketAddress localAddress() { - return null; // FIXME + SocketAddress addr; + getsockname(handle, &addr.address, &addr.addrlen); + return addr; } /++ +/ SocketAddress peerAddress() { - return null; // FIXME + SocketAddress addr; + getpeername(handle, &addr.address, &addr.addrlen); + return addr; } // for unix sockets on unix only: send/receive fd, get peer creds @@ -3009,9 +3280,17 @@ class AsyncSocket : AsyncFile { } /++ + Initiates a connection request and optionally sends initial data as soon as possible. + + Calls `ConnectEx` on Windows and emulates it on other systems. + + The entire buffer is sent before the operation is considered complete. + + NOT IMPLEMENTED / NOT STABLE +/ class AsyncConnectRequest : AsyncOperationRequest { - this(AsyncSocket socket, SocketAddress address) { + // FIXME: i should take a list of addresses and take the first one that succeeds, so a getaddrinfo can be sent straight in. + this(AsyncSocket socket, SocketAddress address, ubyte[] dataToWrite) { } @@ -3035,17 +3314,67 @@ class AsyncConnectResponse : AsyncOperationResponse { } +// FIXME: TransmitFile/sendfile support + /++ + Calls `AcceptEx` on Windows and emulates it on other systems. + + NOT IMPLEMENTED / NOT STABLE +/ class AsyncAcceptRequest : AsyncOperationRequest { - this(AsyncSocket socket) { - - } + AsyncSocket socket; override void start() {} override void cancel() {} override bool isComplete() { return true; } override AsyncConnectResponse waitForCompletion() { assert(0); } + + + struct LowLevelOperation { + AsyncSocket file; + ubyte[] buffer; + SocketAddress* address; + + this(typeof(this.tupleof) args) { + this.tupleof = args; + } + + version(Windows) { + auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) { + WSABUF buf; + buf.len = cast(int) buffer.length; + buf.buf = cast(typeof(buf.buf)) buffer.ptr; + + uint flags; + + if(address is null) + return WSARecv(file.handle, &buf, 1, null, &flags, overlapped, ocr); + else { + return WSARecvFrom(file.handle, &buf, 1, null, &flags, &(address.address), &(address.addrlen), overlapped, ocr); + } + } + } else { + auto opCall() { + int flags; + if(address is null) + return core.sys.posix.sys.socket.recv(file.handle, buffer.ptr, buffer.length, flags); + else + return core.sys.posix.sys.socket.recvfrom(file.handle, buffer.ptr, buffer.length, flags, &(address.address), &(address.addrlen)); + } + } + + string errorString() { + return "Receive"; + } + } + mixin OverlappedIoRequest!(AsyncAcceptResponse, LowLevelOperation); + + this(AsyncSocket socket, ubyte[] buffer = null, SocketAddress* address = null) { + llo = LowLevelOperation(socket, buffer, address); + this.response = typeof(this.response).defaultConstructed; + } + + // can also look up the local address } /++ +/ @@ -3053,6 +3382,10 @@ class AsyncAcceptResponse : AsyncOperationResponse { AsyncSocket newSocket; const SystemErrorCode errorCode; + this(SystemErrorCode errorCode, ubyte[] buffer) { + this.errorCode = errorCode; + } + this(AsyncSocket newSocket, SystemErrorCode errorCode) { this.newSocket = newSocket; this.errorCode = errorCode; @@ -3070,7 +3403,7 @@ class AsyncReceiveRequest : AsyncOperationRequest { AsyncSocket file; ubyte[] buffer; int flags; - SocketAddressBuffer* address; + SocketAddress* address; this(typeof(this.tupleof) args) { this.tupleof = args; @@ -3105,7 +3438,7 @@ class AsyncReceiveRequest : AsyncOperationRequest { } mixin OverlappedIoRequest!(AsyncReceiveResponse, LowLevelOperation); - this(AsyncSocket socket, ubyte[] buffer, SocketAddressBuffer* address, int flags) { + this(AsyncSocket socket, ubyte[] buffer, SocketAddress* address, int flags) { llo = LowLevelOperation(socket, buffer, flags, address); this.response = typeof(this.response).defaultConstructed; } @@ -3134,7 +3467,7 @@ class AsyncSendRequest : AsyncOperationRequest { AsyncSocket file; const(ubyte)[] buffer; int flags; - SocketAddress address; + SocketAddress* address; this(typeof(this.tupleof) args) { this.tupleof = args; @@ -3167,7 +3500,7 @@ class AsyncSendRequest : AsyncOperationRequest { } mixin OverlappedIoRequest!(AsyncSendResponse, LowLevelOperation); - this(AsyncSocket socket, const(ubyte)[] buffer, SocketAddress address, int flags) { + this(AsyncSocket socket, const(ubyte)[] buffer, SocketAddress* address, int flags) { llo = LowLevelOperation(socket, buffer, flags, address); this.response = typeof(this.response).defaultConstructed; } @@ -3191,22 +3524,58 @@ class AsyncSendResponse : AsyncOperationResponse { } /++ - A socket bound and ready to accept connections. + A set of sockets bound and ready to accept connections on worker threads. + + Depending on the specified address, it can be tcp, tcpv6, unix domain, or all of the above. - Depending on the specified address, it can be tcp, tcpv6, or unix domain. + NOT IMPLEMENTED / NOT STABLE +/ class StreamServer { - this(SocketAddress listenTo) { + AsyncSocket[] sockets; + this(SocketAddress[] listenTo, int backlog = 8) { + foreach(listen; listenTo) { + auto socket = new AsyncSocket(listen, SOCK_STREAM); + + // FIXME: allInterfaces for ipv6 also covers ipv4 so the bind can fail... + // so we have to permit it to fail w/ address in use if we know we already + // are listening to ipv6 + + // or there is a setsockopt ipv6 only thing i could set. + + socket.bind(listen); + socket.listen(backlog); + sockets ~= socket; + + // writeln(socket.localAddress.port); + } + + // i have to start accepting on each thread for each socket... } // when a new connection arrives, it calls your callback // can be on a specific thread or on any thread + + + void start() { + foreach(socket; sockets) { + auto request = socket.accept(); + request.start(); + } + } +} + +/+ +unittest { + auto ss = new StreamServer(SocketAddress.localhost(0)); } ++/ /++ A socket bound and ready to use receiveFrom Depending on the address, it can be udp or unix domain. + + NOT IMPLEMENTED / NOT STABLE +/ class DatagramListener { // whenever a udp message arrives, it calls your callback @@ -3407,6 +3776,7 @@ unittest { // dispatches change event to either your thread or maybe the any task` queue. /++ + PARTIALLY IMPLEMENTED / NOT STABLE +/ class DirectoryWatcher { @@ -3965,11 +4335,158 @@ class AsyncReadResponse : AsyncOperationResponse { runHelperFunction() - whomever it reports to is the parent +/ -/+ -class Task : Fiber { +class ScheduableTask : Fiber { + private void delegate() dg; + + // linked list stuff + private static ScheduableTask taskRoot; + private ScheduableTask previous; + private ScheduableTask next; + + // need the controlling thread to know how to wake it up if it receives a message + private Thread controllingThread; + + // the api + + this(void delegate() dg) { + assert(dg !is null); + + this.dg = dg; + super(&taskRunner); + + if(taskRoot !is null) { + this.next = taskRoot; + taskRoot.previous = this; + } + taskRoot = this; + } + + /+ + enum BehaviorOnCtrlC { + ignore, + cancel, + deliverMessage + } + +/ + private bool cancelled; + + public void cancel() { + this.cancelled = true; + // if this is running, we can throw immediately + // otherwise if we're calling from an appropriate thread, we can call it immediately + // otherwise we need to queue a wakeup to its own thread. + // tbh we should prolly just queue it every time + } + + private void taskRunner() { + try { + dg(); + } catch(TaskCancelledException tce) { + // this space intentionally left blank; + // the purpose of this exception is to just + // let the fiber's destructors run before we + // let it die. + } catch(Throwable t) { + if(taskUncaughtException is null) { + throw t; + } else { + taskUncaughtException(t); + } + } finally { + if(this is taskRoot) { + taskRoot = taskRoot.next; + if(taskRoot !is null) + taskRoot.previous = null; + } else { + assert(this.previous !is null); + assert(this.previous.next is this); + this.previous.next = this.next; + if(this.next !is null) + this.next.previous = this.previous; + } + } + } } + +/++ + +/ +void delegate(Throwable t) taskUncaughtException; + +/++ + Gets an object that lets you control a schedulable task (which is a specialization of a fiber) and can be used in an `if` statement. + + --- + if(auto controller = inSchedulableTask()) { + controller.yieldUntilReadable(...); + } + --- + + History: + Added August 11, 2023 (dub v11.1) ++/ +SchedulableTaskController inSchedulableTask() { + import core.thread.fiber; + + if(auto fiber = Fiber.getThis) { + return SchedulableTaskController(cast(ScheduableTask) fiber); + } + + return SchedulableTaskController(null); +} + +/// ditto +struct SchedulableTaskController { + private this(ScheduableTask fiber) { + this.fiber = fiber; + } + + private ScheduableTask fiber; + + /++ + + +/ + bool opCast(T : bool)() { + return fiber !is null; + } + + /++ + + +/ + version(Posix) + void yieldUntilReadable(NativeFileHandle handle) { + assert(fiber !is null); + + auto cb = new CallbackHelper(() { fiber.call(); }); + + // FIXME: if the fd is already registered in this thread it can throw... + version(Windows) + auto rearmToken = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(handle, cb); + else + auto rearmToken = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(handle, cb); + + // FIXME: this is only valid if the fiber is only ever going to run in this thread! + fiber.yield(); + + rearmToken.unregister(); + + // what if there are other messages, like a ctrl+c? + if(fiber.cancelled) + throw new TaskCancelledException(); + } + + version(Windows) + void yieldUntilSignaled(NativeFileHandle handle) { + // add it to the WaitForMultipleObjects thing w/ a cb + } +} + +class TaskCancelledException : object.Exception { + this() { + super("Task cancelled"); + } +} private class CoreWorkerThread : Thread { this(EventLoopType type) { @@ -3987,7 +4504,13 @@ private class CoreWorkerThread : Thread { atomicOp!"-="(runningCount, 1); } - eventLoop.run(() => true); + eventLoop.run(() => cancelled); + } + + private bool cancelled; + + void cancel() { + cancelled = true; } EventLoopType type; @@ -4030,6 +4553,14 @@ private class CoreWorkerThread : Thread { started = true; } } + + void cancelAll() { + foreach(runner; taskRunners) + runner.cancel(); + foreach(runner; helperRunners) + runner.cancel(); + + } } } @@ -4055,6 +4586,7 @@ private int numberOfCpus() { Its destructor runs the event loop then waits to for the workers to finish to clean them up. +/ +// FIXME: single instance? struct ArsdCoreApplication { private ICoreEventLoop impl; @@ -4085,21 +4617,25 @@ struct ArsdCoreApplication { @disable new(); ~this() { - run(); + if(!alreadyRun) + run(); exitApplication(); waitForWorkersToExit(3000); } void exitApplication() { - + CoreWorkerThread.cancelAll(); } void waitForWorkersToExit(int timeoutMilliseconds) { } + private bool alreadyRun; + void run() { - impl.run(() => true); + impl.run(() => false); + alreadyRun = true; } } @@ -4941,7 +5477,7 @@ class WritableStream { /++ +/ - final void put(T)(T value, ByteOrder byteOrder = ByteOrder.irrelevant) { + final void put(T)(T value, ByteOrder byteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) { static if(T.sizeof == 8) ulong b; else static if(T.sizeof == 4) @@ -4953,7 +5489,7 @@ class WritableStream { else static assert(0, "unimplemented type, try using just the basic types"); if(byteOrder == ByteOrder.irrelevant && T.sizeof > 1) - throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte"); + throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte", "WritableStream.put", file, line); final switch(byteOrder) { case ByteOrder.irrelevant: @@ -4976,9 +5512,9 @@ class WritableStream { } /// ditto - final void put(T : E[], E)(T value, ByteOrder elementByteOrder = ByteOrder.irrelevant) { + final void put(T : E[], E)(T value, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) { foreach(item; value) - put(item, elementByteOrder); + put(item, elementByteOrder, file, line); } /++ @@ -5051,9 +5587,9 @@ class ReadableStream { ubyte[] data = stream.get!(ubyte[])(i); --- +/ - final T get(T)(ByteOrder byteOrder = ByteOrder.irrelevant) { + final T get(T)(ByteOrder byteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) { if(byteOrder == ByteOrder.irrelevant && T.sizeof > 1) - throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte"); + throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line); // FIXME: what if it is a struct? @@ -5093,9 +5629,9 @@ class ReadableStream { } /// ditto - final T get(T : E[], E)(size_t length, ByteOrder elementByteOrder = ByteOrder.irrelevant) { + final T get(T : E[], E)(size_t length, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) { if(elementByteOrder == ByteOrder.irrelevant && E.sizeof > 1) - throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte"); + throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line); // if the stream is closed before getting the length or the terminator, should we send partial stuff // or just throw? @@ -5120,9 +5656,9 @@ class ReadableStream { } /// ditto - final T get(T : E[], E)(scope bool delegate(E e) isTerminatingSentinel, ByteOrder elementByteOrder = ByteOrder.irrelevant) { + final T get(T : E[], E)(scope bool delegate(E e) isTerminatingSentinel, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) { if(byteOrder == ByteOrder.irrelevant && E.sizeof > 1) - throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte"); + throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line); assert(0, "Not implemented"); } @@ -5234,6 +5770,8 @@ unittest { } /++ + UNSTABLE, NOT FULLY IMPLEMENTED. DO NOT USE YET. + You might use this like: --- @@ -5256,11 +5794,12 @@ unittest { proc.start(); --- - Please note that this does not currently and I have no plans as of this writing to add support for any kind of direct file descriptor passing. It always pipes them back to the parent for processing. If you don't want this, call the lower level functions yourself; the reason this class is here is to aid integration in the arsd.core event loop. + Please note that this does not currently and I have no plans as of this writing to add support for any kind of direct file descriptor passing. It always pipes them back to the parent for processing. If you don't want this, call the lower level functions yourself; the reason this class is here is to aid integration in the arsd.core event loop. Of course, I might change my mind on this. - Of course, I might change my mind on this. + Bugs: + Not implemented at all on Windows yet. +/ -class ExternalProcess { +class ExternalProcess /*: AsyncOperationRequest*/ { private static version(Posix) { __gshared ExternalProcess[pid_t] activeChildren; @@ -5292,12 +5831,14 @@ class ExternalProcess { version(Posix) { assert(0, "not implemented command line to posix args yet"); } + else throw new NotYetImplementedException(); } this(string commandLine) { version(Posix) { assert(0, "not implemented command line to posix args yet"); } + else throw new NotYetImplementedException(); } this(string[] args) { @@ -5305,7 +5846,7 @@ class ExternalProcess { this.program = FilePath(args[0]); this.args = args; } - + else throw new NotYetImplementedException(); } /++ @@ -5316,6 +5857,7 @@ class ExternalProcess { this.program = program; this.args = args; } + else throw new NotYetImplementedException(); } // you can modify these before calling start @@ -5488,6 +6030,7 @@ class ExternalProcess { // also need to listen to SIGCHLD to queue up the terminated callback. FIXME stdoutUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stdoutFd, new CallbackHelper(&stdoutReadable)); + stderrUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stderrFd, new CallbackHelper(&stderrReadable)); } } } @@ -5501,6 +6044,7 @@ class ExternalProcess { int stderrFd = -1; ICoreEventLoop.UnregisterToken stdoutUnregisterToken; + ICoreEventLoop.UnregisterToken stderrUnregisterToken; pid_t pid = -1; @@ -5510,12 +6054,13 @@ class ExternalProcess { string[] args; void stdoutReadable() { - ubyte[1024] buffer; - auto ret = read(stdoutFd, buffer.ptr, buffer.length); + if(stdoutReadBuffer is null) + stdoutReadBuffer = new ubyte[](stdoutBufferSize); + auto ret = read(stdoutFd, stdoutReadBuffer.ptr, stdoutReadBuffer.length); if(ret == -1) throw new ErrnoApiException("read", errno); if(onStdoutAvailable) { - onStdoutAvailable(buffer[0 .. ret]); + onStdoutAvailable(stdoutReadBuffer[0 .. ret]); } if(ret == 0) { @@ -5525,8 +6070,29 @@ class ExternalProcess { stdoutFd = -1; } } + + void stderrReadable() { + if(stderrReadBuffer is null) + stderrReadBuffer = new ubyte[](stderrBufferSize); + auto ret = read(stderrFd, stderrReadBuffer.ptr, stderrReadBuffer.length); + if(ret == -1) + throw new ErrnoApiException("read", errno); + if(onStderrAvailable) { + onStderrAvailable(stderrReadBuffer[0 .. ret]); + } + + if(ret == 0) { + stderrUnregisterToken.unregister(); + + close(stderrFd); + stderrFd = -1; + } + } } + private ubyte[] stdoutReadBuffer; + private ubyte[] stderrReadBuffer; + void waitForCompletion() { getThisThreadEventLoop().run(&this.isComplete); } @@ -5594,22 +6160,6 @@ unittest { static int received; - static void tester() { - received++; - //writeln(cast(void*) Thread.getThis, " ", received); - } - - foreach(ref thread; pool) { - thread = new Thread(() { - getThisThreadEventLoop().run(() { - return shouldExit; - }); - }); - thread.start(); - } - - - proc.writeToStdin("hello!"); proc.writeToStdin(null); // closes the pipe @@ -5618,6 +6168,8 @@ unittest { assert(proc.status == 0); assert(c == 2); + + // writeln("here"); } +/ @@ -5651,42 +6203,53 @@ unittest { ================= +/ +private void appendToBuffer(ref char[] buffer, ref int pos, scope const(char)[] what) { + auto required = pos + what.length; + if(buffer.length < required) + buffer.length = required; + buffer[pos .. pos + what.length] = what[]; + pos += what.length; +} + +private void appendToBuffer(ref char[] buffer, ref int pos, long what) { + if(buffer.length < pos + 16) + buffer.length = pos + 16; + auto sliced = intToString(what, buffer[pos .. $]); + pos += sliced.length; +} + /++ - A `writeln` that actually works. + A `writeln` that actually works, at least for some basic types. It works correctly on Windows, using the correct functions to write unicode to the console. even allocating a console if needed. If the output has been redirected to a file or pipe, it writes UTF-8. - This always does text. See also WritableStream and WritableTextStream + This always does text. See also WritableStream and WritableTextStream when they are implemented. +/ void writeln(T...)(T t) { char[256] bufferBacking; char[] buffer = bufferBacking[]; int pos; + foreach(arg; t) { static if(is(typeof(arg) : const char[])) { - buffer[pos .. pos + arg.length] = arg[]; - pos += arg.length; + appendToBuffer(buffer, pos, arg); } else static if(is(typeof(arg) : stringz)) { - auto b = arg.borrow; - buffer[pos .. pos + b.length] = b[]; - pos += b.length; + appendToBuffer(buffer, pos, arg.borrow); } else static if(is(typeof(arg) : long)) { - auto sliced = intToString(arg, buffer[pos .. $]); - pos += sliced.length; + appendToBuffer(buffer, pos, arg); } else static if(is(typeof(arg.toString()) : const char[])) { - auto s = arg.toString(); - buffer[pos .. pos + s.length] = s[]; - pos += s.length; + appendToBuffer(buffer, pos, arg.toString()); } else { - auto s = "<unsupported type>"; - buffer[pos .. pos + s.length] = s[]; - pos += s.length; - // static assert(0, "Unsupported type: " ~ T.stringof); + appendToBuffer(buffer, pos, "<" ~ typeof(arg).stringof ~ ">"); } } - buffer[pos++] = '\n'; + appendToBuffer(buffer, pos, "\n"); + + actuallyWriteToStdout(buffer[0 .. pos]); +} +private void actuallyWriteToStdout(scope char[] buffer) @trusted { version(Windows) { import core.sys.windows.wincon; @@ -5698,17 +6261,17 @@ void writeln(T...)(T t) { if(GetFileType(hStdOut) == FILE_TYPE_CHAR) { wchar[256] wbuffer; - auto toWrite = makeWindowsString(buffer[0 .. pos], wbuffer, WindowsStringConversionFlags.convertNewLines); + auto toWrite = makeWindowsString(buffer, wbuffer, WindowsStringConversionFlags.convertNewLines); DWORD written; WriteConsoleW(hStdOut, toWrite.ptr, cast(DWORD) toWrite.length, &written, null); } else { DWORD written; - WriteFile(hStdOut, buffer.ptr, pos, &written, null); + WriteFile(hStdOut, buffer.ptr, cast(DWORD) buffer.length, &written, null); } } else { import unix = core.sys.posix.unistd; - unix.write(1, buffer.ptr, pos); + unix.write(1, buffer.ptr, buffer.length); } } |