NTP Analyzer  0.8.2
Analyze the operation of time servers
Scheduler.cs
Go to the documentation of this file.
1 //
2 // Copyright (c) 2013-2017 Carsten Sonne Larsen <cs@innolan.net>
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21 
22 using System;
23 using System.Collections;
24 using System.Collections.Generic;
25 using System.Globalization;
26 using System.Linq;
27 using System.Threading;
28 using Ntp.Common.Log;
29 
30 namespace Ntp.Common.Process
31 {
35  public sealed class Scheduler : IScheduler, IDisposable
36  {
41  public Scheduler(LogBase log)
42  {
43  StartTime = DateTime.Now;
44  firstRun = true;
45  Active = false;
46 
47  runningThreads = new List<Thread>();
48  WaitHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
49 
50  schedule = new List<ScheduledJob>();
51  jobs = new List<Job>();
53 
54  LogGroup logGroup = LogFactory.CreateGroupLog();
55  logGroup.Add(log);
56  logGroup.Add(ActivityLog);
57 
58  Log = logGroup;
59  }
60 
61  private readonly List<Job> jobs;
62  private readonly List<Thread> runningThreads;
63  private readonly List<ScheduledJob> schedule;
64  private bool firstRun;
65 
70  public EventWaitHandle WaitHandle { get; }
71 
76  public DateTime StartTime { get; }
77 
82  public LogBase Log { get; }
83 
88  public IEnumerator<Job> GetEnumerator()
89  {
90  return jobs.GetEnumerator();
91  }
92 
97  IEnumerator IEnumerable.GetEnumerator()
98  {
99  return jobs.GetEnumerator();
100  }
101 
106  public bool Active { get; private set; }
107 
112  public ActivityLog ActivityLog { get; }
113 
118  public IEnumerable<ScheduledJob> Schedule => schedule;
119 
124  public ScheduledJob NextJob { get; private set; }
125 
130  public void Add(JobDescription description)
131  {
132  if (description.Configuration == null || description.Configuration.Frequency == -1)
133  return;
134 
135  var scheduleDescription = new JobScheduleDescription(
136  description.Configuration.InitialRun,
137  description.Configuration.FixedRun,
138  description.Configuration.Frequency);
139 
140  var job = new Job(description, scheduleDescription, Log);
141  jobs.Add(job);
142 
143  Log.SchedulerJobAdded(description, job);
144 
145  QueueJob(job, StartTime);
146  }
147 
151  public void RunOneCycle()
152  {
153  Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture;
154 
155  if (firstRun)
156  {
157  firstRun = false;
158  Active = true;
159 
160  lock (schedule)
161  {
162  Log.SchedulerStart(schedule.Count);
163  }
164  }
165 
166  // Take the next job of the queue
167  ScheduledJob next;
168  lock (schedule)
169  {
170  next = schedule.OrderBy(j => j.Run).ThenBy(j => j.Job.Description.Priority).First();
171  schedule.Remove(next);
172  next.Job.Queued = false;
173  NextJob = next;
174  }
175 
176  // Wait until job should be run
177  int wait = Convert.ToInt32(next.Run.Subtract(DateTime.Now).TotalMilliseconds);
178  if (wait < 0)
179  {
180  if (wait < -5000)
181  Log.SchedulerBehind();
182 
183  wait = 0;
184  }
185 
186  bool signal = WaitHandle.WaitOne(wait);
187  if (signal)
188  {
189  // Abort
190  return;
191  }
192 
193  // Check if the job is already running and postpone if needed
194  lock (schedule)
195  {
196  if (next.Job.Description.ThreadType == ThreadType.SingleThreaded &&
197  schedule.Count(j => j.Job.Description.ThreadType == ThreadType.SingleThreaded && j.Job.Running) != 0)
198  {
199  PostponeJob(next.Job);
200  return;
201  }
202  }
203 
204  Log.SchedulerJobExecuting(next);
205  if (next.Job.Description.ThreadType != ThreadType.NoThread)
206  {
207  // Queue the job on the threadpool
208  ThreadPool.QueueUserWorkItem(JobThreadStart, next.Job);
209  }
210  else
211  {
212  // Execute directly in scheduler thread
213  ExecuteJob(next.Job);
214  }
215 
216  // Dont re-schedule "run-only-once" jobs.
217  if (next.Job.Schedule.Frequency == 0)
218  return;
219 
220  QueueJob(next.Job, DateTime.Now);
221  }
222 
223  public void Stop()
224  {
225  int count;
226 
227  // Remove finished threads
228  lock (runningThreads)
229  {
230  foreach (var finished in runningThreads.Where(t => !t.IsAlive).ToList())
231  runningThreads.Remove(finished);
232 
233  count = runningThreads.Count;
234  }
235 
236  if (count != 0)
237  Log.SchedulerWaiting(count);
238 
239  lock (runningThreads)
240  {
241  foreach (var thread in runningThreads.ToList())
242  {
243  // Wait maximum 15 seconds
244  thread.Join(15000/count);
245  if (!thread.IsAlive)
246  continue;
247 
248  Log.SchedulerAbort(thread);
249  thread.Abort();
250  }
251  }
252 
253  Log.SchedulerFinished();
254  }
255 
256  private void ExecuteJob(Job job)
257  {
258  try
259  {
260  job.Execute();
261  }
262  catch (Exception e)
263  {
264  Log.SchedulerError(job.Description.Name, e);
265  }
266  }
267 
268  private void JobThreadStart(object stateInfo)
269  {
270  try
271  {
272  lock (runningThreads)
273  runningThreads.Add(Thread.CurrentThread);
274 
275  var job = (Job) stateInfo;
276  Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture;
277  //Thread.CurrentThread.Name = job.Description.Name;
278  job.Execute();
279  }
280  catch (Exception e)
281  {
282  Log.SchedulerError(Thread.CurrentThread.Name, e);
283  }
284  finally
285  {
286  lock (runningThreads)
287  {
288  if (runningThreads.Contains(Thread.CurrentThread))
289  {
290  runningThreads.Remove(Thread.CurrentThread);
291  }
292  }
293  }
294  }
295 
300  private void PostponeJob(Job job)
301  {
302  lock (schedule)
303  {
304  ScheduledJob scheduledJob = job.Schedule.CreatePostponed(job);
305  schedule.Add(scheduledJob);
306  job.Queued = true;
307  job.Postponed = true;
308  Log.SchedulerJobStatus(scheduledJob);
309  }
310  }
311 
317  private void QueueJob(Job job, DateTime run)
318  {
319  DateTime next = job.Schedule.CalculateNextRun(run);
320  double offset = 0;
321 
322  // Adjust offset to avoid simultaneously job execution.
323  if (job.Schedule.CanMove)
324  {
325  lock (schedule)
326  {
327  while (schedule.Count(j => Math.Abs(next.Subtract(j.Run).TotalMilliseconds) < 5000) != 0)
328  {
329  double move = job.Schedule.Frequency/40.0;
330  next = next.AddMinutes(move);
331  offset += move;
332  }
333  }
334  }
335 
336  lock (schedule)
337  {
338  ScheduledJob scheduledJob = job.Schedule.CreateNew(job, run, offset);
339  schedule.Add(scheduledJob);
340  job.Queued = true;
341  job.Postponed = false;
342  Log.SchedulerJobStatus(scheduledJob);
343  }
344  }
345 
346  #region IDisposable Support
347 
348  private bool disposedValue;
349 
350  private void Dispose(bool disposing)
351  {
352  if (disposedValue)
353  return;
354 
355  if (disposing)
356  {
357  WaitHandle.Dispose();
358  }
359 
360  disposedValue = true;
361  }
362 
364  {
365  Dispose(false);
366  }
367 
368  public void Dispose()
369  {
370  Dispose(true);
371  GC.SuppressFinalize(this);
372  }
373 
374  #endregion
375  }
376 }
void Add(LogBase log)
Definition: LogGroup.cs:71
bool Queued
Gets or sets a value indicating whether this Job is queued for run.
Definition: Job.cs:77
readonly List< ScheduledJob > schedule
Definition: Scheduler.cs:63
IJobConfiguration Configuration
Gets the configuration for the job.
bool Postponed
Gets or sets a value indicating whether this Job has been postponed.
Definition: Job.cs:83
A scheduler performs scheduling of jobs according to job schedule descriptions.
Definition: Scheduler.cs:35
ScheduledJob CreatePostponed(Job job)
Creates a postponed job schedule.
void Add(JobDescription description)
Add the specified job to the scheduler queue.
Definition: Scheduler.cs:130
DateTime Run
Gets the time of planned execution.
Definition: ScheduledJob.cs:52
ScheduledJob CreateNew(Job job, DateTime start, double offset)
Creates a new scheduled job based on description and parameters.
static ActivityLog CreateActivityLog()
Definition: LogFactory.cs:45
void QueueJob(Job job, DateTime run)
Queue job for scheduled run.
Definition: Scheduler.cs:317
A description of job schedule rules.
Job Job
Gets the job to execute.
Definition: ScheduledJob.cs:46
static LogGroup CreateGroupLog()
Definition: LogFactory.cs:63
readonly List< Job > jobs
Definition: Scheduler.cs:61
abstract ThreadType ThreadType
Gets a value indicating whether this JobDescription should run as a single thread.
DateTime CalculateNextRun(DateTime start)
Calculates the time of next run according to description parameters.
var e
Definition: bootstrap.min.js:6
void JobThreadStart(object stateInfo)
Definition: Scheduler.cs:268
JobScheduleDescription Schedule
Gets the schedule.
Definition: Job.cs:65
void Execute()
Execute this Job.
Definition: Job.cs:135
Scheduler(LogBase log)
Initializes a new instance of the Scheduler class.
Definition: Scheduler.cs:41
A job which have been scheduled for execution.
Definition: ScheduledJob.cs:29
void RunOneCycle()
Run the scheduler "queue pump" method.
Definition: Scheduler.cs:151
bool CanMove
Gets a value indicating whether this description allow schedules to be moved.
int Frequency
Gets the schedule frequency in minutes.
Base class for jobs following the GoF Command Pattern.
void Dispose(bool disposing)
Definition: Scheduler.cs:350
IEnumerator< Job > GetEnumerator()
Gets the enumerator.
Definition: Scheduler.cs:88
void PostponeJob(Job job)
Postpones the job when putting it to the queue.
Definition: Scheduler.cs:300
readonly List< Thread > runningThreads
Definition: Scheduler.cs:62
JobDescription Description
Gets the description.
Definition: Job.cs:71