1 module osc.server;
2 
3 import std.socket;
4 import std.container;
5 import core.thread;
6 import core.sync.mutex;
7 import osc.message;
8 import osc.packet;
9 import osc.bundle;
10 
11 
/++
Polling (pull-style) OSC receiver.

Owns a non-blocking UDP socket bound at construction time. Nothing is read
in the background: the caller invokes `receive` whenever it wants to collect
the datagrams that have arrived since the previous call.
+/
class PullServer {
    public{
        /// Binds to `"localhost"` on the given UDP `port`.
        this(ushort port){
            this(new InternetAddress ("localhost", port));
        }

        /++
        Creates the UDP socket and binds it to `internetAddress`.

        The socket is switched to non-blocking mode so that `receive` returns
        as soon as no more datagrams are queued instead of waiting for one.
        +/
        this(InternetAddress internetAddress){
            _socket = new UdpSocket();
            // Do not leave an open descriptor behind if setup fails.
            scope(failure) _socket.close();
            _socket.blocking = false;
            _socket.bind (internetAddress);
        }

        /++
        Drains the datagrams currently queued on the socket.

        Each datagram is parsed as a `Packet` and flattened into its messages.
        Reading stops at the first call that yields no data: an empty
        datagram, "would block" on the non-blocking socket, or any other
        socket error.

        Returns: the messages decoded during this call, in arrival order;
                 empty when nothing was pending.
        +/
        const(Message)[] receive(){
            const(Message)[] received;
            ubyte[maxDatagramSize] recvRaw;
            while(true){
                // Socket.receive reports failure as Socket.ERROR (-1), so the
                // length has to stay signed; an unsigned length would turn the
                // error into a huge slice bound.
                immutable ptrdiff_t l = _socket.receive(recvRaw);
                if(l <= 0){
                    break;
                }
                received ~= Packet(recvRaw[0..l]).messages;
            }
            return received;
        }
    }//public

    private{
        /// Size of the receive buffer; only this many bytes of a datagram are read.
        enum size_t maxDatagramSize = 512;

        UdpSocket _socket;
    }//private
}//class PullServer
46 
/++
Background (push-style) OSC receiver.

The constructor binds a UDP socket and starts a worker thread that parses
every incoming datagram and queues the resulting messages. The owner
collects them with `popMessages` and must call `close` to stop the worker
and release the socket.
+/
class Server{
    public{
        /// Binds to `"localhost"` on the given UDP `port` and starts receiving.
        this(ushort port){
            this(new InternetAddress ("localhost", port));
        }
        
        /++
        Binds a UDP socket to `internetAddress` and starts the receiver thread.
        +/
        this(InternetAddress internetAddress){
            import core.atomic : atomicStore;
            import core.time : msecs;

            _messages = new Messages;
            _socket = new UdpSocket();
            // Do not leave an open descriptor behind if setup fails.
            scope(failure) _socket.close();
            _socket.bind (internetAddress);
            // A finite receive timeout makes the worker wake up regularly so
            // it can notice the stop request issued by close().
            _socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 100.msecs);

            atomicStore(_running, true);
            // Assign the member (not a new local) so close() can join the thread.
            _thread = new Thread((){ receive(_socket); });
            _thread.start();
        }
        
        /// Intentionally empty: shutdown is explicit via `close`.
        ~this(){
        }

        /++
        Removes and returns every message queued so far.

        Returns: the queued messages in arrival order; empty if none arrived
                 since the previous call.
        +/
        const(Message)[] popMessages(){
            return _messages.popMessages;
        }

        /++
        Stops the receiver thread, waits for it to finish and closes the socket.

        May be called more than once; calls after the first do nothing.
        +/
        void close(){
            import core.atomic : atomicStore;

            if(_thread is null){
                return;
            }
            atomicStore(_running, false);
            _thread.join;
            _thread = null;
            // Closed only after the join so the worker never reads a closed socket.
            _socket.close();
        }
    }//public

    private{
        Messages _messages;
        Thread _thread;
        UdpSocket _socket;
        /// Stop flag shared between the owner thread and the worker.
        shared bool _running;
        
        /++
        Worker loop: reads datagrams from `socket` and queues their messages
        until `close` clears the running flag.
        +/
        void receive(Socket socket){
            import core.atomic : atomicLoad;

            ubyte[512] recvRaw;
            while(atomicLoad(_running)){
                // Socket.receive reports failure (including the receive
                // timeout) as Socket.ERROR (-1); keep the length signed so it
                // is never used as a slice bound.
                immutable ptrdiff_t l = socket.receive(recvRaw);
                if(l <= 0){
                    continue;
                }
                _messages.pushMessages(Packet(recvRaw[0..l]).messages);
            }
        }
    }//private
}//class Server
99 
/++
Mutex-protected message buffer.

`pushMessages` appends to the buffer and `popMessages` hands the whole
buffer over and empties it; both run while holding `mtx`.
+/
private class Messages {
    public{
        Mutex mtx;

        /// Creates the mutex guarding the buffer.
        this(){
            mtx = new Mutex();
        }

        /++
        Takes every buffered message out of the buffer.

        Returns: the messages buffered so far, in insertion order; the
                 buffer is empty afterwards.
        +/
        const(Message)[] popMessages(){
            synchronized(mtx){
                const(Message)[] drained = _contents;
                _contents = [];
                return drained;
            }
        }

        /// Appends `messages` to the end of the buffer.
        void pushMessages(const(Message)[] messages){
            synchronized(mtx){
                _contents ~= messages;
            }
        }
    }//public

    private{
        const(Message)[] _contents;
    }//private
}//class Messages
127 
128 private{
129     const(Message)[] messages(in Packet packet){
130         const(Message)[] list;
131         if(packet.hasMessage){
132             list ~= packet.message;
133         }
134         if(packet.hasBundle){
135             list = messagesRecur(packet.bundle);
136         }
137         return list;
138         
139     }
140     
141     const(Message)[] messagesRecur(in Bundle bundle){
142         const(Message)[] list;
143         foreach (ref element; bundle.elements) {
144             if(element.hasMessage){
145                 list ~= element.message;
146             }
147             if(element.hasBundle){
148                 list ~= element.bundle.messagesRecur;
149             }
150         }
151         return list;
152     }
153 }